raid5: only wakeup necessary threads

If there are not enough stripes to handle, we should not always queue
all available work_structs. If a worker ends up handling only a few
stripes, or even none, it hurts request merging and creates lock
contention.

With this patch, the number of running work_structs depends on the
number of pending stripes. Note: some of the statistics used in the
patch are accessed without lock protection. This should not matter;
we only make a best effort to avoid queueing unnecessary work_structs.

Signed-off-by: Shaohua Li <shli@fusionio.com>
Signed-off-by: NeilBrown <neilb@suse.de>
diff --git a/drivers/md/raid5.c b/drivers/md/raid5.c
index 663a8e5..7ff4f25 100644
--- a/drivers/md/raid5.c
+++ b/drivers/md/raid5.c
@@ -77,6 +77,7 @@
 #define BYPASS_THRESHOLD	1
 #define NR_HASH			(PAGE_SIZE / sizeof(struct hlist_head))
 #define HASH_MASK		(NR_HASH - 1)
+#define MAX_STRIPE_BATCH	8
 
 static inline struct hlist_head *stripe_hash(struct r5conf *conf, sector_t sect)
 {
@@ -209,6 +210,7 @@
 {
 	struct r5conf *conf = sh->raid_conf;
 	struct r5worker_group *group;
+	int thread_cnt;
 	int i, cpu = sh->cpu;
 
 	if (!cpu_online(cpu)) {
@@ -220,6 +222,8 @@
 		struct r5worker_group *group;
 		group = conf->worker_groups + cpu_to_group(cpu);
 		list_add_tail(&sh->lru, &group->handle_list);
+		group->stripes_cnt++;
+		sh->group = group;
 	}
 
 	if (conf->worker_cnt_per_group == 0) {
@@ -229,8 +233,20 @@
 
 	group = conf->worker_groups + cpu_to_group(sh->cpu);
 
-	for (i = 0; i < conf->worker_cnt_per_group; i++)
-		queue_work_on(sh->cpu, raid5_wq, &group->workers[i].work);
+	group->workers[0].working = true;
+	/* at least one worker should run to avoid race */
+	queue_work_on(sh->cpu, raid5_wq, &group->workers[0].work);
+
+	thread_cnt = group->stripes_cnt / MAX_STRIPE_BATCH - 1;
+	/* wakeup more workers */
+	for (i = 1; i < conf->worker_cnt_per_group && thread_cnt > 0; i++) {
+		if (group->workers[i].working == false) {
+			group->workers[i].working = true;
+			queue_work_on(sh->cpu, raid5_wq,
+				      &group->workers[i].work);
+			thread_cnt--;
+		}
+	}
 }
 
 static void do_release_stripe(struct r5conf *conf, struct stripe_head *sh)
@@ -589,6 +605,10 @@
 				    !test_bit(STRIPE_EXPANDING, &sh->state))
 					BUG();
 				list_del_init(&sh->lru);
+				if (sh->group) {
+					sh->group->stripes_cnt--;
+					sh->group = NULL;
+				}
 			}
 		}
 	} while (sh == NULL);
@@ -4153,15 +4173,18 @@
 {
 	struct stripe_head *sh = NULL, *tmp;
 	struct list_head *handle_list = NULL;
+	struct r5worker_group *wg = NULL;
 
 	if (conf->worker_cnt_per_group == 0) {
 		handle_list = &conf->handle_list;
 	} else if (group != ANY_GROUP) {
 		handle_list = &conf->worker_groups[group].handle_list;
+		wg = &conf->worker_groups[group];
 	} else {
 		int i;
 		for (i = 0; i < conf->group_cnt; i++) {
 			handle_list = &conf->worker_groups[i].handle_list;
+			wg = &conf->worker_groups[i];
 			if (!list_empty(handle_list))
 				break;
 		}
@@ -4208,11 +4231,16 @@
 			if (conf->bypass_count < 0)
 				conf->bypass_count = 0;
 		}
+		wg = NULL;
 	}
 
 	if (!sh)
 		return NULL;
 
+	if (wg) {
+		wg->stripes_cnt--;
+		sh->group = NULL;
+	}
 	list_del_init(&sh->lru);
 	atomic_inc(&sh->count);
 	BUG_ON(atomic_read(&sh->count) != 1);
@@ -4919,8 +4947,8 @@
 	return handled;
 }
 
-#define MAX_STRIPE_BATCH 8
-static int handle_active_stripes(struct r5conf *conf, int group)
+static int handle_active_stripes(struct r5conf *conf, int group,
+				 struct r5worker *worker)
 {
 	struct stripe_head *batch[MAX_STRIPE_BATCH], *sh;
 	int i, batch_size = 0;
@@ -4963,7 +4991,8 @@
 
 		released = release_stripe_list(conf);
 
-		batch_size = handle_active_stripes(conf, group_id);
+		batch_size = handle_active_stripes(conf, group_id, worker);
+		worker->working = false;
 		if (!batch_size && !released)
 			break;
 		handled += batch_size;
@@ -5025,7 +5054,7 @@
 			handled++;
 		}
 
-		batch_size = handle_active_stripes(conf, ANY_GROUP);
+		batch_size = handle_active_stripes(conf, ANY_GROUP, NULL);
 		if (!batch_size && !released)
 			break;
 		handled += batch_size;