dm cache: add fail io mode and needs_check flag

If a cache metadata operation fails (e.g. transaction commit), the
cache's metadata device will abort the current transaction and set a
new needs_check flag, and the cache will transition to "read-only"
mode.  If aborting the transaction or setting the needs_check flag
fails, the cache will transition to "fail-io" mode.
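
In outline, the escalation behaves as sketched below (a minimal
userspace sketch: the enum and its ordering mirror the patch, while
escalate(), its boolean parameters and main() are illustrative only):

  #include <stdbool.h>
  #include <stdio.h>

  enum cache_metadata_mode { CM_WRITE, CM_READ_ONLY, CM_FAIL };

  static enum cache_metadata_mode
  escalate(bool set_needs_check_ok, bool abort_ok)
  {
          /* If the damage can't be recorded or rolled back, give up. */
          if (!set_needs_check_ok || !abort_ok)
                  return CM_FAIL;

          /* Otherwise degrade gracefully and wait for a repair. */
          return CM_READ_ONLY;
  }

  int main(void)
  {
          printf("%d\n", escalate(true, true));   /* 1 == CM_READ_ONLY */
          printf("%d\n", escalate(false, true));  /* 2 == CM_FAIL */
          return 0;
  }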

Once needs_check is set, the cache device will not be allowed to
activate, because activation requires write access to metadata.  Future
work is needed to add proper support for running the cache in read-only
mode.

Once in fail-io mode, the cache will report a status of "Fail".

Also, add a commit() wrapper that disallows commits while in read-only
or fail mode.
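
The wrapper's guard exploits the enum ordering CM_WRITE < CM_READ_ONLY
< CM_FAIL, so a single ">= CM_READ_ONLY" comparison rejects both
degraded modes.  A standalone illustration (the enum mirrors the patch;
commits_allowed() is a hypothetical helper, not part of the patch):

  #include <assert.h>

  enum cache_metadata_mode { CM_WRITE, CM_READ_ONLY, CM_FAIL };

  /* One ">= CM_READ_ONLY" test covers both read-only and fail-io. */
  static int commits_allowed(enum cache_metadata_mode mode)
  {
          return mode < CM_READ_ONLY;
  }

  int main(void)
  {
          assert(commits_allowed(CM_WRITE));
          assert(!commits_allowed(CM_READ_ONLY));
          assert(!commits_allowed(CM_FAIL));
          return 0;
  }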

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
diff --git a/drivers/md/dm-cache-target.c b/drivers/md/dm-cache-target.c
index 6d36ed3..dae0321 100644
--- a/drivers/md/dm-cache-target.c
+++ b/drivers/md/dm-cache-target.c
@@ -150,12 +150,10 @@
 #define DATA_DEV_BLOCK_SIZE_MIN_SECTORS (32 * 1024 >> SECTOR_SHIFT)
 #define DATA_DEV_BLOCK_SIZE_MAX_SECTORS (1024 * 1024 * 1024 >> SECTOR_SHIFT)
 
-/*
- * FIXME: the cache is read/write for the time being.
- */
 enum cache_metadata_mode {
 	CM_WRITE,		/* metadata may be changed */
 	CM_READ_ONLY,		/* metadata may not be changed */
+	CM_FAIL
 };
 
 enum cache_io_mode {
@@ -385,6 +383,8 @@
 	struct dm_bio_prison_cell *cell2;
 };
 
+static enum cache_metadata_mode get_cache_mode(struct cache *cache);
+
 static void wake_worker(struct cache *cache)
 {
 	queue_work(cache->wq, &cache->worker);
@@ -699,6 +699,9 @@
 {
 	struct dm_cache_statistics stats;
 
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return;
+
 	stats.read_hits = atomic_read(&cache->stats.read_hit);
 	stats.read_misses = atomic_read(&cache->stats.read_miss);
 	stats.write_hits = atomic_read(&cache->stats.write_hit);
@@ -958,6 +961,84 @@
 }
 
 /*----------------------------------------------------------------
+ * Failure modes
+ *--------------------------------------------------------------*/
+static enum cache_metadata_mode get_cache_mode(struct cache *cache)
+{
+	return cache->features.mode;
+}
+
+static void notify_mode_switch(struct cache *cache, enum cache_metadata_mode mode)
+{
+	const char *descs[] = {
+		"write",
+		"read-only",
+		"fail"
+	};
+
+	dm_table_event(cache->ti->table);
+	DMINFO("switching cache to %s mode", descs[(int)mode]);
+}
+
+static void set_cache_mode(struct cache *cache, enum cache_metadata_mode new_mode)
+{
+	bool needs_check = dm_cache_metadata_needs_check(cache->cmd);
+	enum cache_metadata_mode old_mode = get_cache_mode(cache);
+
+	if (new_mode == CM_WRITE && needs_check) {
+		DMERR("unable to switch cache to write mode until repaired.");
+		if (old_mode != new_mode)
+			new_mode = old_mode;
+		else
+			new_mode = CM_READ_ONLY;
+	}
+
+	/* Never move out of fail mode */
+	if (old_mode == CM_FAIL)
+		new_mode = CM_FAIL;
+
+	switch (new_mode) {
+	case CM_FAIL:
+	case CM_READ_ONLY:
+		dm_cache_metadata_set_read_only(cache->cmd);
+		break;
+
+	case CM_WRITE:
+		dm_cache_metadata_set_read_write(cache->cmd);
+		break;
+	}
+
+	cache->features.mode = new_mode;
+
+	if (new_mode != old_mode)
+		notify_mode_switch(cache, new_mode);
+}
+
+static void abort_transaction(struct cache *cache)
+{
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return;
+
+	if (dm_cache_metadata_set_needs_check(cache->cmd)) {
+		DMERR("failed to set 'needs_check' flag in metadata");
+		set_cache_mode(cache, CM_FAIL);
+	}
+
+	DMERR_LIMIT("aborting current metadata transaction");
+	if (dm_cache_metadata_abort(cache->cmd)) {
+		DMERR("failed to abort metadata transaction");
+		set_cache_mode(cache, CM_FAIL);
+	}
+}
+
+static void metadata_operation_failed(struct cache *cache, const char *op, int r)
+{
+	DMERR_LIMIT("metadata operation '%s' failed: error = %d", op, r);
+	abort_transaction(cache);
+	set_cache_mode(cache, CM_READ_ONLY);
+}
+
+/*----------------------------------------------------------------
  * Migration processing
  *
  * Migration covers moving data from the origin device to the cache, or
@@ -1063,6 +1144,7 @@
 
 static void migration_success_pre_commit(struct dm_cache_migration *mg)
 {
+	int r;
 	unsigned long flags;
 	struct cache *cache = mg->cache;
 
@@ -1073,8 +1155,10 @@
 		return;
 
 	} else if (mg->demote) {
-		if (dm_cache_remove_mapping(cache->cmd, mg->cblock)) {
+		r = dm_cache_remove_mapping(cache->cmd, mg->cblock);
+		if (r) {
 			DMWARN_LIMIT("demotion failed; couldn't update on disk metadata");
+			metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
 			policy_force_mapping(cache->policy, mg->new_oblock,
 					     mg->old_oblock);
 			if (mg->promote)
@@ -1083,8 +1167,10 @@
 			return;
 		}
 	} else {
-		if (dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock)) {
+		r = dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock);
+		if (r) {
 			DMWARN_LIMIT("promotion failed; couldn't update on disk metadata");
+			metadata_operation_failed(cache, "dm_cache_insert_mapping", r);
 			policy_remove_mapping(cache->policy, mg->new_oblock);
 			free_io_migration(mg);
 			return;
@@ -1812,15 +1898,32 @@
 	       jiffies > cache->last_commit_jiffies + COMMIT_PERIOD;
 }
 
+/*
+ * A non-zero return indicates read_only or fail_io mode.
+ */
+static int commit(struct cache *cache, bool clean_shutdown)
+{
+	int r;
+
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return -EINVAL;
+
+	atomic_inc(&cache->stats.commit_count);
+	r = dm_cache_commit(cache->cmd, clean_shutdown);
+	if (r)
+		metadata_operation_failed(cache, "dm_cache_commit", r);
+
+	return r;
+}
+
 static int commit_if_needed(struct cache *cache)
 {
 	int r = 0;
 
 	if ((cache->commit_requested || need_commit_due_to_time(cache)) &&
 	    dm_cache_changed_this_transaction(cache->cmd)) {
-		atomic_inc(&cache->stats.commit_count);
+		r = commit(cache, false);
 		cache->commit_requested = false;
-		r = dm_cache_commit(cache->cmd, false);
 		cache->last_commit_jiffies = jiffies;
 	}
 
@@ -1988,8 +2091,10 @@
 		r = policy_remove_cblock(cache->policy, to_cblock(begin));
 		if (!r) {
 			r = dm_cache_remove_mapping(cache->cmd, to_cblock(begin));
-			if (r)
+			if (r) {
+				metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
 				break;
+			}
 
 		} else if (r == -ENODATA) {
 			/* harmless, already unmapped */
@@ -2133,12 +2238,6 @@
 		if (commit_if_needed(cache)) {
 			process_deferred_flush_bios(cache, false);
 			process_migrations(cache, &cache->need_commit_migrations, migration_failure);
-
-			/*
-			 * FIXME: rollback metadata or just go into a
-			 * failure mode and error everything
-			 */
-
 		} else {
 			process_deferred_flush_bios(cache, true);
 			process_migrations(cache, &cache->need_commit_migrations,
@@ -2711,6 +2810,12 @@
 		goto bad;
 	}
 	cache->cmd = cmd;
+	set_cache_mode(cache, CM_WRITE);
+	if (get_cache_mode(cache) != CM_WRITE) {
+		*error = "Unable to get write access to metadata, please check/repair metadata.";
+		r = -EINVAL;
+		goto bad;
+	}
 
 	if (passthrough_mode(&cache->features)) {
 		bool all_clean;
@@ -3043,11 +3148,16 @@
 {
 	unsigned i, r;
 
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return -EINVAL;
+
 	for (i = 0; i < from_cblock(cache->cache_size); i++) {
 		r = dm_cache_set_dirty(cache->cmd, to_cblock(i),
 				       is_dirty(cache, to_cblock(i)));
-		if (r)
+		if (r) {
+			metadata_operation_failed(cache, "dm_cache_set_dirty", r);
 			return r;
+		}
 	}
 
 	return 0;
@@ -3057,18 +3167,40 @@
 {
 	unsigned i, r;
 
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return -EINVAL;
+
 	r = dm_cache_discard_bitset_resize(cache->cmd, cache->discard_block_size,
 					   cache->discard_nr_blocks);
 	if (r) {
 		DMERR("could not resize on-disk discard bitset");
+		metadata_operation_failed(cache, "dm_cache_discard_bitset_resize", r);
 		return r;
 	}
 
 	for (i = 0; i < from_dblock(cache->discard_nr_blocks); i++) {
 		r = dm_cache_set_discard(cache->cmd, to_dblock(i),
 					 is_discarded(cache, to_dblock(i)));
-		if (r)
+		if (r) {
+			metadata_operation_failed(cache, "dm_cache_set_discard", r);
 			return r;
+		}
+	}
+
+	return 0;
+}
+
+static int write_hints(struct cache *cache)
+{
+	int r;
+
+	if (get_cache_mode(cache) >= CM_READ_ONLY)
+		return -EINVAL;
+
+	r = dm_cache_write_hints(cache->cmd, cache->policy);
+	if (r) {
+		metadata_operation_failed(cache, "dm_cache_write_hints", r);
+		return r;
 	}
 
 	return 0;
@@ -3091,7 +3223,7 @@
 
 	save_stats(cache);
 
-	r3 = dm_cache_write_hints(cache->cmd, cache->policy);
+	r3 = write_hints(cache);
 	if (r3)
 		DMERR("could not write hints");
 
@@ -3100,9 +3232,9 @@
 	 * set the clean shutdown flag.  This will effectively force every
 	 * dirty bit to be set on reload.
 	 */
-	r4 = dm_cache_commit(cache->cmd, !r1 && !r2 && !r3);
+	r4 = commit(cache, !r1 && !r2 && !r3);
 	if (r4)
-		DMERR("could not write cache metadata.  Data loss may occur.");
+		DMERR("could not write cache metadata.");
 
 	return !r1 && !r2 && !r3 && !r4;
 }
@@ -3118,7 +3250,8 @@
 	requeue_deferred_cells(cache);
 	stop_quiescing(cache);
 
-	(void) sync_metadata(cache);
+	if (get_cache_mode(cache) == CM_WRITE)
+		(void) sync_metadata(cache);
 }
 
 static int load_mapping(void *context, dm_oblock_t oblock, dm_cblock_t cblock,
@@ -3257,6 +3390,7 @@
 	r = dm_cache_resize(cache->cmd, new_size);
 	if (r) {
 		DMERR("could not resize cache metadata");
+		metadata_operation_failed(cache, "dm_cache_resize", r);
 		return r;
 	}
 
@@ -3295,6 +3429,7 @@
 					   load_mapping, cache);
 		if (r) {
 			DMERR("could not load cache mappings");
+			metadata_operation_failed(cache, "dm_cache_load_mappings", r);
 			return r;
 		}
 
@@ -3315,6 +3450,7 @@
 		r = dm_cache_load_discards(cache->cmd, load_discard, &li);
 		if (r) {
 			DMERR("could not load origin discards");
+			metadata_operation_failed(cache, "dm_cache_load_discards", r);
 			return r;
 		}
 		set_discard_range(&li);
@@ -3342,7 +3478,7 @@
  * <#demotions> <#promotions> <#dirty>
  * <#features> <features>*
  * <#core args> <core args>
- * <policy name> <#policy args> <policy args>*
+ * <policy name> <#policy args> <policy args>* <cache metadata mode>
  */
 static void cache_status(struct dm_target *ti, status_type_t type,
 			 unsigned status_flags, char *result, unsigned maxlen)
@@ -3358,13 +3494,15 @@
 
 	switch (type) {
 	case STATUSTYPE_INFO:
-		/* Commit to ensure statistics aren't out-of-date */
-		if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti)) {
-			r = dm_cache_commit(cache->cmd, false);
-			if (r)
-				DMERR("could not commit metadata for accurate status");
+		if (get_cache_mode(cache) == CM_FAIL) {
+			DMEMIT("Fail");
+			break;
 		}
 
+		/* Commit to ensure statistics aren't out-of-date */
+		if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti))
+			(void) commit(cache, false);
+
 		r = dm_cache_get_free_metadata_block_count(cache->cmd,
 							   &nr_free_blocks_metadata);
 		if (r) {
@@ -3413,11 +3551,16 @@
 
 		DMEMIT("%s ", dm_cache_policy_get_name(cache->policy));
 		if (sz < maxlen) {
-			r = policy_emit_config_values(cache->policy, result + sz, maxlen - sz);
+			r = policy_emit_config_values(cache->policy, result, maxlen, &sz);
 			if (r)
 				DMERR("policy_emit_config_values returned %d", r);
 		}
 
+		if (get_cache_mode(cache) == CM_READ_ONLY)
+			DMEMIT("ro ");
+		else
+			DMEMIT("rw ");
+
 		break;
 
 	case STATUSTYPE_TABLE:
@@ -3573,6 +3716,11 @@
 	if (!argc)
 		return -EINVAL;
 
+	if (get_cache_mode(cache) >= CM_READ_ONLY) {
+		DMERR("unable to service cache target messages in READ_ONLY or FAIL mode");
+		return -EOPNOTSUPP;
+	}
+
 	if (!strcasecmp(argv[0], "invalidate_cblocks"))
 		return process_invalidate_cblocks_message(cache, argc - 1, (const char **) argv + 1);
 
@@ -3646,7 +3794,7 @@
 
 static struct target_type cache_target = {
 	.name = "cache",
-	.version = {1, 6, 0},
+	.version = {1, 7, 0},
 	.module = THIS_MODULE,
 	.ctr = cache_ctr,
 	.dtr = cache_dtr,