NFSv4.1: filelayout driver specific code for COMMIT

Implement all the hooks created in the previous patches.
This requires exporting quite a few functions and adding a few
structure fields.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4filelayout.c b/fs/nfs/nfs4filelayout.c
index 03ff80c..97e75a2 100644
--- a/fs/nfs/nfs4filelayout.c
+++ b/fs/nfs/nfs4filelayout.c
@@ -213,6 +213,37 @@
 	return 0;
 }
 
+/* Fake up some data that will cause nfs_commit_release to retry the writes. */
+static void prepare_to_resend_writes(struct nfs_write_data *data)
+{
+	struct nfs_page *first = nfs_list_entry(data->pages.next);
+
+	data->task.tk_status = 0;
+	memcpy(data->verf.verifier, first->wb_verf.verifier,
+	       sizeof(first->wb_verf.verifier));
+	data->verf.verifier[0]++; /* ensure verifier mismatch */
+}
+
+static int filelayout_commit_done_cb(struct rpc_task *task,
+				     struct nfs_write_data *data)
+{
+	int reset = 0;
+
+	if (filelayout_async_handle_error(task, data->args.context->state,
+					  data->ds_clp, &reset) == -EAGAIN) {
+		dprintk("%s calling restart ds_clp %p ds_clp->cl_session %p\n",
+			__func__, data->ds_clp, data->ds_clp->cl_session);
+		if (reset) {
+			prepare_to_resend_writes(data);
+			filelayout_set_lo_fail(data->lseg);
+		} else
+			nfs_restart_rpc(task, data->ds_clp);
+		return -EAGAIN;
+	}
+
+	return 0;
+}
+
 static void filelayout_write_prepare(struct rpc_task *task, void *data)
 {
 	struct nfs_write_data *wdata = (struct nfs_write_data *)data;
@@ -240,6 +271,16 @@
 	wdata->mds_ops->rpc_release(data);
 }
 
+static void filelayout_commit_release(void *data)
+{
+	struct nfs_write_data *wdata = (struct nfs_write_data *)data;
+
+	nfs_commit_release_pages(wdata);
+	if (atomic_dec_and_test(&NFS_I(wdata->inode)->commits_outstanding))
+		nfs_commit_clear_lock(NFS_I(wdata->inode));
+	nfs_commitdata_release(wdata);
+}
+
 struct rpc_call_ops filelayout_read_call_ops = {
 	.rpc_call_prepare = filelayout_read_prepare,
 	.rpc_call_done = filelayout_read_call_done,
@@ -252,6 +293,12 @@
 	.rpc_release = filelayout_write_release,
 };
 
+struct rpc_call_ops filelayout_commit_call_ops = {
+	.rpc_call_prepare = filelayout_write_prepare,
+	.rpc_call_done = filelayout_write_call_done,
+	.rpc_release = filelayout_commit_release,
+};
+
 static enum pnfs_try_status
 filelayout_read_pagelist(struct nfs_read_data *data)
 {
@@ -574,6 +621,191 @@
 	return (p_stripe == r_stripe);
 }
 
+static bool filelayout_mark_pnfs_commit(struct pnfs_layout_segment *lseg)
+{
+	return !FILELAYOUT_LSEG(lseg)->commit_through_mds;
+}
+
+static u32 select_bucket_index(struct nfs4_filelayout_segment *fl, u32 j)
+{
+	if (fl->stripe_type == STRIPE_SPARSE)
+		return nfs4_fl_calc_ds_index(&fl->generic_hdr, j);
+	else
+		return j;
+}
+
+struct list_head *filelayout_choose_commit_list(struct nfs_page *req)
+{
+	struct pnfs_layout_segment *lseg = req->wb_commit_lseg;
+	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
+	u32 i, j;
+	struct list_head *list;
+
+	/* Note that we are calling nfs4_fl_calc_j_index on each page
+	 * that ends up being committed to a data server.  An attractive
+	 * alternative is to add a field to nfs_write_data and nfs_page
+	 * to store the value calculated in filelayout_write_pagelist
+	 * and just use that here.
+	 */
+	j = nfs4_fl_calc_j_index(lseg,
+				 (loff_t)req->wb_index << PAGE_CACHE_SHIFT);
+	i = select_bucket_index(fl, j);
+	list = &fl->commit_buckets[i];
+	if (list_empty(list)) {
+		/* Non-empty buckets hold a reference on the lseg */
+		get_lseg(lseg);
+	}
+	return list;
+}
+
+static u32 calc_ds_index_from_commit(struct pnfs_layout_segment *lseg, u32 i)
+{
+	struct nfs4_filelayout_segment *flseg = FILELAYOUT_LSEG(lseg);
+
+	if (flseg->stripe_type == STRIPE_SPARSE)
+		return i;
+	else
+		return nfs4_fl_calc_ds_index(lseg, i);
+}
+
+static struct nfs_fh *
+select_ds_fh_from_commit(struct pnfs_layout_segment *lseg, u32 i)
+{
+	struct nfs4_filelayout_segment *flseg = FILELAYOUT_LSEG(lseg);
+
+	if (flseg->stripe_type == STRIPE_SPARSE) {
+		if (flseg->num_fh == 1)
+			i = 0;
+		else if (flseg->num_fh == 0)
+			/* Use the MDS OPEN fh set in nfs_read_rpcsetup */
+			return NULL;
+	}
+	return flseg->fh_array[i];
+}
+
+static int filelayout_initiate_commit(struct nfs_write_data *data, int how)
+{
+	struct pnfs_layout_segment *lseg = data->lseg;
+	struct nfs4_pnfs_ds *ds;
+	u32 idx;
+	struct nfs_fh *fh;
+
+	idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
+	ds = nfs4_fl_prepare_ds(lseg, idx);
+	if (!ds) {
+		printk(KERN_ERR "%s: prepare_ds failed, use MDS\n", __func__);
+		set_bit(lo_fail_bit(IOMODE_RW), &lseg->pls_layout->plh_flags);
+		set_bit(lo_fail_bit(IOMODE_READ), &lseg->pls_layout->plh_flags);
+		prepare_to_resend_writes(data);
+		data->mds_ops->rpc_release(data);
+		return -EAGAIN;
+	}
+	dprintk("%s ino %lu, how %d\n", __func__, data->inode->i_ino, how);
+	data->write_done_cb = filelayout_commit_done_cb;
+	data->ds_clp = ds->ds_clp;
+	fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
+	if (fh)
+		data->args.fh = fh;
+	return nfs_initiate_commit(data, ds->ds_clp->cl_rpcclient,
+				   &filelayout_commit_call_ops, how);
+}
+
+/*
+ * This is only useful while we are using whole file layouts.
+ */
+static struct pnfs_layout_segment *find_only_write_lseg(struct inode *inode)
+{
+	struct pnfs_layout_segment *lseg, *rv = NULL;
+
+	spin_lock(&inode->i_lock);
+	list_for_each_entry(lseg, &NFS_I(inode)->layout->plh_segs, pls_list)
+		if (lseg->pls_range.iomode == IOMODE_RW)
+			rv = get_lseg(lseg);
+	spin_unlock(&inode->i_lock);
+	return rv;
+}
+
+static int alloc_ds_commits(struct inode *inode, struct list_head *list)
+{
+	struct pnfs_layout_segment *lseg;
+	struct nfs4_filelayout_segment *fl;
+	struct nfs_write_data *data;
+	int i, j;
+
+	/* Won't need this when non-whole file layout segments are supported
+	 * instead we will use a pnfs_layout_hdr structure */
+	lseg = find_only_write_lseg(inode);
+	if (!lseg)
+		return 0;
+	fl = FILELAYOUT_LSEG(lseg);
+	for (i = 0; i < fl->number_of_buckets; i++) {
+		if (list_empty(&fl->commit_buckets[i]))
+			continue;
+		data = nfs_commitdata_alloc();
+		if (!data)
+			goto out_bad;
+		data->ds_commit_index = i;
+		data->lseg = lseg;
+		list_add(&data->pages, list);
+	}
+	put_lseg(lseg);
+	return 0;
+
+out_bad:
+	for (j = i; j < fl->number_of_buckets; j++) {
+		if (list_empty(&fl->commit_buckets[i]))
+			continue;
+		nfs_retry_commit(&fl->commit_buckets[i], lseg);
+		put_lseg(lseg);  /* associated with emptying bucket */
+	}
+	put_lseg(lseg);
+	/* Caller will clean up entries put on list */
+	return -ENOMEM;
+}
+
+/* This follows nfs_commit_list pretty closely */
+static int
+filelayout_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
+			   int how)
+{
+	struct nfs_write_data	*data, *tmp;
+	LIST_HEAD(list);
+
+	if (!list_empty(mds_pages)) {
+		data = nfs_commitdata_alloc();
+		if (!data)
+			goto out_bad;
+		data->lseg = NULL;
+		list_add(&data->pages, &list);
+	}
+
+	if (alloc_ds_commits(inode, &list))
+		goto out_bad;
+
+	list_for_each_entry_safe(data, tmp, &list, pages) {
+		list_del_init(&data->pages);
+		atomic_inc(&NFS_I(inode)->commits_outstanding);
+		if (!data->lseg) {
+			nfs_init_commit(data, mds_pages, NULL);
+			nfs_initiate_commit(data, NFS_CLIENT(inode),
+					    data->mds_ops, how);
+		} else {
+			nfs_init_commit(data, &FILELAYOUT_LSEG(data->lseg)->commit_buckets[data->ds_commit_index], data->lseg);
+			filelayout_initiate_commit(data, how);
+		}
+	}
+	return 0;
+ out_bad:
+	list_for_each_entry_safe(data, tmp, &list, pages) {
+		nfs_retry_commit(&data->pages, data->lseg);
+		list_del_init(&data->pages);
+		nfs_commit_free(data);
+	}
+	nfs_retry_commit(mds_pages, NULL);
+	nfs_commit_clear_lock(NFS_I(inode));
+	return -ENOMEM;
+}
+
 static struct pnfs_layoutdriver_type filelayout_type = {
 	.id			= LAYOUT_NFSV4_1_FILES,
 	.name			= "LAYOUT_NFSV4_1_FILES",
@@ -581,6 +813,9 @@
 	.alloc_lseg		= filelayout_alloc_lseg,
 	.free_lseg		= filelayout_free_lseg,
 	.pg_test		= filelayout_pg_test,
+	.mark_pnfs_commit	= filelayout_mark_pnfs_commit,
+	.choose_commit_list	= filelayout_choose_commit_list,
+	.commit_pagelist	= filelayout_commit_pagelist,
 	.read_pagelist		= filelayout_read_pagelist,
 	.write_pagelist		= filelayout_write_pagelist,
 };