Blame view

net/sunrpc/xprtrdma/rpc_rdma.c 27.6 KB
f58851e6b   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
1
  /*
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
   * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   *
   * This software is available to you under a choice of one of two
   * licenses.  You may choose to be licensed under the terms of the GNU
   * General Public License (GPL) Version 2, available from the file
   * COPYING in the main directory of this source tree, or the BSD-type
   * license below:
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   *      Redistributions of source code must retain the above copyright
   *      notice, this list of conditions and the following disclaimer.
   *
   *      Redistributions in binary form must reproduce the above
   *      copyright notice, this list of conditions and the following
   *      disclaimer in the documentation and/or other materials provided
   *      with the distribution.
   *
   *      Neither the name of the Network Appliance, Inc. nor the names of
   *      its contributors may be used to endorse or promote products
   *      derived from this software without specific prior written
   *      permission.
   *
   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
   * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
   * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
   * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   */
  
  /*
   * rpc_rdma.c
   *
   * This file contains the guts of the RPC RDMA protocol, and
   * does marshaling/unmarshaling, etc. It is also where interfacing
   * to the Linux RPC framework lives.
f58851e6b   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
46
47
48
   */
  
  #include "xprt_rdma.h"
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
  #include <linux/highmem.h>
  
  #ifdef RPC_DEBUG
  # define RPCDBG_FACILITY	RPCDBG_TRANS
  #endif
  
  enum rpcrdma_chunktype {
  	rpcrdma_noch = 0,
  	rpcrdma_readch,
  	rpcrdma_areadch,
  	rpcrdma_writech,
  	rpcrdma_replych
  };
  
  #ifdef RPC_DEBUG
  static const char transfertypes[][12] = {
  	"pure inline",	/* no chunks */
  	" read chunk",	/* some argument via rdma read */
  	"*read chunk",	/* entire request via rdma read */
  	"write chunk",	/* some result via rdma write */
  	"reply chunk"	/* entire reply via rdma write */
  };
  #endif
  
  /*
   * Chunk assembly from upper layer xdr_buf.
   *
   * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
   * elements. Segments are then coalesced when registered, if possible
   * within the selected memreg mode.
   *
   * Note, this routine is never called if the connection's memory
   * registration strategy is 0 (bounce buffers).
   */
  
  static int
2a428b2b8   Chuck Lever   SUNRPC: Prevent m...
85
  rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
86
87
88
  	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
  {
  	int len, n = 0, p;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
89
90
  	int page_base;
  	struct page **ppages;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
91
92
93
94
95
  
  	if (pos == 0 && xdrbuf->head[0].iov_len) {
  		seg[n].mr_page = NULL;
  		seg[n].mr_offset = xdrbuf->head[0].iov_base;
  		seg[n].mr_len = xdrbuf->head[0].iov_len;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
96
97
  		++n;
  	}
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
98
99
100
101
102
103
104
105
106
107
  	len = xdrbuf->page_len;
  	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
  	page_base = xdrbuf->page_base & ~PAGE_MASK;
  	p = 0;
  	while (len && n < nsegs) {
  		seg[n].mr_page = ppages[p];
  		seg[n].mr_offset = (void *)(unsigned long) page_base;
  		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
  		BUG_ON(seg[n].mr_len > PAGE_SIZE);
  		len -= seg[n].mr_len;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
108
  		++n;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
109
110
  		++p;
  		page_base = 0;	/* page offset only applies to first page */
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
111
  	}
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
112
113
114
  	/* Message overflows the seg array */
  	if (len && n == nsegs)
  		return 0;
50e1092b3   James Lentini   SUNRPC xprtrdma: ...
115
  	if (xdrbuf->tail[0].iov_len) {
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
116
117
118
119
  		/* the rpcrdma protocol allows us to omit any trailing
  		 * xdr pad bytes, saving the server an RDMA operation. */
  		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
  			return n;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
120
  		if (n == nsegs)
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
121
  			/* Tail remains, but we're out of segments */
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
122
123
124
125
  			return 0;
  		seg[n].mr_page = NULL;
  		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
  		seg[n].mr_len = xdrbuf->tail[0].iov_len;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
126
127
  		++n;
  	}
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
  	return n;
  }
  
  /*
   * Create read/write chunk lists, and reply chunks, for RDMA
   *
   *   Assume check against THRESHOLD has been done, and chunks are required.
   *   Assume only encoding one list entry for read|write chunks. The NFSv3
   *     protocol is simple enough to allow this as it only has a single "bulk
   *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
   *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
   *
   * When used for a single reply chunk (which is a special write
   * chunk used for the entire reply, rather than just the data), it
   * is used primarily for READDIR and READLINK which would otherwise
   * be severely size-limited by a small rdma inline read max. The server
   * response will come back as an RDMA Write, followed by a message
   * of type RDMA_NOMSG carrying the xid and length. As a result, reply
   * chunks do not provide data alignment, however they do not require
   * "fixup" (moving the response to the upper layer buffer) either.
   *
   * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
   *
   *  Read chunklist (a linked list):
   *   N elements, position P (same P for all chunks of same arg!):
   *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
   *
   *  Write chunklist (a list of (one) counted array):
   *   N elements:
   *    1 - N - HLOO - HLOO - ... - HLOO - 0
   *
   *  Reply chunk (a counted array):
   *   N elements:
   *    1 - N - HLOO - HLOO - ... - HLOO
   */
  
  static unsigned int
  rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
  		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
  {
  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
  	int nsegs, nchunks = 0;
2a428b2b8   Chuck Lever   SUNRPC: Prevent m...
171
  	unsigned int pos;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
172
173
174
175
  	struct rpcrdma_mr_seg *seg = req->rl_segments;
  	struct rpcrdma_read_chunk *cur_rchunk = NULL;
  	struct rpcrdma_write_array *warray = NULL;
  	struct rpcrdma_write_chunk *cur_wchunk = NULL;
2d8a97266   Al Viro   SUNRPC endianness...
176
  	__be32 *iptr = headerp->rm_body.rm_chunks;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
  
  	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
  		/* a read chunk - server will RDMA Read our memory */
  		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
  	} else {
  		/* a write or reply chunk - server will RDMA Write our memory */
  		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
  		if (type == rpcrdma_replych)
  			*iptr++ = xdr_zero;	/* a NULL write chunk list */
  		warray = (struct rpcrdma_write_array *) iptr;
  		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
  	}
  
  	if (type == rpcrdma_replych || type == rpcrdma_areadch)
  		pos = 0;
  	else
  		pos = target->head[0].iov_len;
  
  	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
  	if (nsegs == 0)
  		return 0;
  
  	do {
  		/* bind/register the memory, then build chunk from result. */
  		int n = rpcrdma_register_external(seg, nsegs,
  						cur_wchunk != NULL, r_xprt);
  		if (n <= 0)
  			goto out;
  		if (cur_rchunk) {	/* read */
  			cur_rchunk->rc_discrim = xdr_one;
  			/* all read chunks have the same "position" */
  			cur_rchunk->rc_position = htonl(pos);
  			cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
  			cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
  			xdr_encode_hyper(
2d8a97266   Al Viro   SUNRPC endianness...
212
  					(__be32 *)&cur_rchunk->rc_target.rs_offset,
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
213
214
  					seg->mr_base);
  			dprintk("RPC:       %s: read chunk "
2a428b2b8   Chuck Lever   SUNRPC: Prevent m...
215
216
  				"elem %d@0x%llx:0x%x pos %u (%s)
  ", __func__,
e08a132b0   Stephen Rothwell   [SUNRPC] rpc_rdma...
217
218
  				seg->mr_len, (unsigned long long)seg->mr_base,
  				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
219
220
221
222
223
224
  			cur_rchunk++;
  			r_xprt->rx_stats.read_chunk_count++;
  		} else {		/* write/reply */
  			cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
  			cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
  			xdr_encode_hyper(
2d8a97266   Al Viro   SUNRPC endianness...
225
  					(__be32 *)&cur_wchunk->wc_target.rs_offset,
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
226
227
228
229
230
  					seg->mr_base);
  			dprintk("RPC:       %s: %s chunk "
  				"elem %d@0x%llx:0x%x (%s)
  ", __func__,
  				(type == rpcrdma_replych) ? "reply" : "write",
e08a132b0   Stephen Rothwell   [SUNRPC] rpc_rdma...
231
232
  				seg->mr_len, (unsigned long long)seg->mr_base,
  				seg->mr_rkey, n < nsegs ? "more" : "last");
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
  			cur_wchunk++;
  			if (type == rpcrdma_replych)
  				r_xprt->rx_stats.reply_chunk_count++;
  			else
  				r_xprt->rx_stats.write_chunk_count++;
  			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
  		}
  		nchunks++;
  		seg   += n;
  		nsegs -= n;
  	} while (nsegs);
  
  	/* success. all failures return above */
  	req->rl_nchunks = nchunks;
  
  	BUG_ON(nchunks == 0);
15cdc644b   Tom Tucker   rpcrdma: Fix SQ s...
249
250
  	BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
  	       && (nchunks > 3));
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
251
252
253
254
255
  
  	/*
  	 * finish off header. If write, marshal discrim and nchunks.
  	 */
  	if (cur_rchunk) {
2d8a97266   Al Viro   SUNRPC endianness...
256
  		iptr = (__be32 *) cur_rchunk;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
257
258
259
260
261
262
  		*iptr++ = xdr_zero;	/* finish the read chunk list */
  		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
  		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
  	} else {
  		warray->wc_discrim = xdr_one;
  		warray->wc_nchunks = htonl(nchunks);
2d8a97266   Al Viro   SUNRPC endianness...
263
  		iptr = (__be32 *) cur_wchunk;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
  		if (type == rpcrdma_writech) {
  			*iptr++ = xdr_zero; /* finish the write chunk list */
  			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
  		}
  	}
  
  	/*
  	 * Return header size.
  	 */
  	return (unsigned char *)iptr - (unsigned char *)headerp;
  
  out:
  	for (pos = 0; nchunks--;)
  		pos += rpcrdma_deregister_external(
  				&req->rl_segments[pos], r_xprt, NULL);
  	return 0;
  }
  
  /*
   * Copy write data inline.
   * This function is used for "small" requests. Data which is passed
   * to RPC via iovecs (or page list) is copied directly into the
   * pre-registered memory buffer for this request. For small amounts
   * of data, this is efficient. The cutoff value is tunable.
   */
  static int
  rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
  {
  	int i, npages, curlen;
  	int copy_len;
  	unsigned char *srcp, *destp;
  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
296
297
  	int page_base;
  	struct page **ppages;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
  
  	destp = rqst->rq_svec[0].iov_base;
  	curlen = rqst->rq_svec[0].iov_len;
  	destp += curlen;
  	/*
  	 * Do optional padding where it makes sense. Alignment of write
  	 * payload can help the server, if our setting is accurate.
  	 */
  	pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
  	if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
  		pad = 0;	/* don't pad this request */
  
  	dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d
  ",
  		__func__, pad, destp, rqst->rq_slen, curlen);
  
  	copy_len = rqst->rq_snd_buf.page_len;
b38ab40ad   Tom Talpey   XPRTRDMA: correct...
315
316
317
318
319
320
321
322
323
324
325
326
327
  
  	if (rqst->rq_snd_buf.tail[0].iov_len) {
  		curlen = rqst->rq_snd_buf.tail[0].iov_len;
  		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
  			memmove(destp + copy_len,
  				rqst->rq_snd_buf.tail[0].iov_base, curlen);
  			r_xprt->rx_stats.pullup_copy_count += curlen;
  		}
  		dprintk("RPC:       %s: tail destp 0x%p len %d
  ",
  			__func__, destp + copy_len, curlen);
  		rqst->rq_svec[0].iov_len += curlen;
  	}
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
328
  	r_xprt->rx_stats.pullup_copy_count += copy_len;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
329
330
331
332
333
  
  	page_base = rqst->rq_snd_buf.page_base;
  	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
  	page_base &= ~PAGE_MASK;
  	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
334
  	for (i = 0; copy_len && i < npages; i++) {
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
335
  		curlen = PAGE_SIZE - page_base;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
336
337
338
339
340
  		if (curlen > copy_len)
  			curlen = copy_len;
  		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d
  ",
  			__func__, i, destp, copy_len, curlen);
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
341
342
  		srcp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
  		memcpy(destp, srcp+page_base, curlen);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
343
344
345
346
  		kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
  		rqst->rq_svec[0].iov_len += curlen;
  		destp += curlen;
  		copy_len -= curlen;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
347
  		page_base = 0;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
348
  	}
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
  	/* header now contains entire send message */
  	return pad;
  }
  
  /*
   * Marshal a request: the primary job of this routine is to choose
   * the transfer modes. See comments below.
   *
   * Uses multiple RDMA IOVs for a request:
   *  [0] -- RPC RDMA header, which uses memory from the *start* of the
   *         preregistered buffer that already holds the RPC data in
   *         its middle.
   *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
   *  [2] -- optional padding.
   *  [3] -- if padded, header only in [1] and data here.
   */
  
  int
  rpcrdma_marshal_req(struct rpc_rqst *rqst)
  {
  	struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
  	char *base;
  	size_t hdrlen, rpclen, padlen;
  	enum rpcrdma_chunktype rtype, wtype;
  	struct rpcrdma_msg *headerp;
  
  	/*
  	 * rpclen gets amount of data in first buffer, which is the
  	 * pre-registered buffer.
  	 */
  	base = rqst->rq_svec[0].iov_base;
  	rpclen = rqst->rq_svec[0].iov_len;
  
  	/* build RDMA header in private area at front */
  	headerp = (struct rpcrdma_msg *) req->rl_base;
  	/* don't htonl XID, it's already done in request */
  	headerp->rm_xid = rqst->rq_xid;
  	headerp->rm_vers = xdr_one;
  	headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
8d614434a   YOSHIFUJI Hideaki   [SUNRPC]: Use hto...
390
  	headerp->rm_type = htonl(RDMA_MSG);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
  
  	/*
  	 * Chunks needed for results?
  	 *
  	 * o If the expected result is under the inline threshold, all ops
  	 *   return as inline (but see later).
  	 * o Large non-read ops return as a single reply chunk.
  	 * o Large read ops return data as write chunk(s), header as inline.
  	 *
  	 * Note: the NFS code sending down multiple result segments implies
  	 * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
  	 */
  
  	/*
  	 * This code can handle read chunks, write chunks OR reply
  	 * chunks -- only one type. If the request is too big to fit
  	 * inline, then we will choose read chunks. If the request is
  	 * a READ, then use write chunks to separate the file data
  	 * into pages; otherwise use reply chunks.
  	 */
  	if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
  		wtype = rpcrdma_noch;
  	else if (rqst->rq_rcv_buf.page_len == 0)
  		wtype = rpcrdma_replych;
  	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
  		wtype = rpcrdma_writech;
  	else
  		wtype = rpcrdma_replych;
  
  	/*
  	 * Chunks needed for arguments?
  	 *
  	 * o If the total request is under the inline threshold, all ops
  	 *   are sent as inline.
  	 * o Large non-write ops are sent with the entire message as a
  	 *   single read chunk (protocol 0-position special case).
  	 * o Large write ops transmit data as read chunk(s), header as
  	 *   inline.
  	 *
  	 * Note: the NFS code sending down multiple argument segments
  	 * implies the op is a write.
  	 * TBD check NFSv4 setacl
  	 */
  	if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
  		rtype = rpcrdma_noch;
  	else if (rqst->rq_snd_buf.page_len == 0)
  		rtype = rpcrdma_areadch;
  	else
  		rtype = rpcrdma_readch;
  
  	/* The following simplification is not true forever */
  	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
  		wtype = rpcrdma_noch;
  	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
  
  	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
  	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
  		/* forced to "pure inline"? */
  		dprintk("RPC:       %s: too much data (%d/%d) for inline
  ",
  			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
  		return -1;
  	}
  
  	hdrlen = 28; /*sizeof *headerp;*/
  	padlen = 0;
  
  	/*
  	 * Pull up any extra send data into the preregistered buffer.
  	 * When padding is in use and applies to the transfer, insert
  	 * it and change the message type.
  	 */
  	if (rtype == rpcrdma_noch) {
  
  		padlen = rpcrdma_inline_pullup(rqst,
  						RPCRDMA_INLINE_PAD_VALUE(rqst));
  
  		if (padlen) {
8d614434a   YOSHIFUJI Hideaki   [SUNRPC]: Use hto...
469
  			headerp->rm_type = htonl(RDMA_MSGP);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
470
471
472
  			headerp->rm_body.rm_padded.rm_align =
  				htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
  			headerp->rm_body.rm_padded.rm_thresh =
8d614434a   YOSHIFUJI Hideaki   [SUNRPC]: Use hto...
473
  				htonl(RPCRDMA_INLINE_PAD_THRESH);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
  			headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
  			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
  			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
  			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
  			BUG_ON(wtype != rpcrdma_noch);
  
  		} else {
  			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
  			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
  			headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
  			/* new length after pullup */
  			rpclen = rqst->rq_svec[0].iov_len;
  			/*
  			 * Currently we try to not actually use read inline.
  			 * Reply chunks have the desirable property that
  			 * they land, packed, directly in the target buffers
  			 * without headers, so they require no fixup. The
  			 * additional RDMA Write op sends the same amount
  			 * of data, streams on-the-wire and adds no overhead
  			 * on receive. Therefore, we request a reply chunk
  			 * for non-writes wherever feasible and efficient.
  			 */
  			if (wtype == rpcrdma_noch &&
  			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
  				wtype = rpcrdma_replych;
  		}
  	}
  
  	/*
  	 * Marshal chunks. This routine will return the header length
  	 * consumed by marshaling.
  	 */
  	if (rtype != rpcrdma_noch) {
  		hdrlen = rpcrdma_create_chunks(rqst,
  					&rqst->rq_snd_buf, headerp, rtype);
  		wtype = rtype;	/* simplify dprintk */
  
  	} else if (wtype != rpcrdma_noch) {
  		hdrlen = rpcrdma_create_chunks(rqst,
  					&rqst->rq_rcv_buf, headerp, wtype);
  	}
  
  	if (hdrlen == 0)
  		return -1;
5f37d561e   Tom Talpey   RPC/RDMA: reforma...
518
519
520
  	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
  		" headerp 0x%p base 0x%p lkey 0x%x
  ",
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
  		__func__, transfertypes[wtype], hdrlen, rpclen, padlen,
  		headerp, base, req->rl_iov.lkey);
  
  	/*
  	 * initialize send_iov's - normally only two: rdma chunk header and
  	 * single preregistered RPC header buffer, but if padding is present,
  	 * then use a preregistered (and zeroed) pad buffer between the RPC
  	 * header and any write data. In all non-rdma cases, any following
  	 * data has been copied into the RPC header buffer.
  	 */
  	req->rl_send_iov[0].addr = req->rl_iov.addr;
  	req->rl_send_iov[0].length = hdrlen;
  	req->rl_send_iov[0].lkey = req->rl_iov.lkey;
  
  	req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
  	req->rl_send_iov[1].length = rpclen;
  	req->rl_send_iov[1].lkey = req->rl_iov.lkey;
  
  	req->rl_niovs = 2;
  
  	if (padlen) {
  		struct rpcrdma_ep *ep = &r_xprt->rx_ep;
  
  		req->rl_send_iov[2].addr = ep->rep_pad.addr;
  		req->rl_send_iov[2].length = padlen;
  		req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
  
  		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
  		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
  		req->rl_send_iov[3].lkey = req->rl_iov.lkey;
  
  		req->rl_niovs = 4;
  	}
  
  	return 0;
  }
  
  /*
   * Chase down a received write or reply chunklist to get length
   * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
   */
  static int
d4b37ff73   Chuck Lever   SUNRPC: Fix an un...
563
  rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
564
565
566
567
568
569
570
571
572
573
574
575
576
  {
  	unsigned int i, total_len;
  	struct rpcrdma_write_chunk *cur_wchunk;
  
  	i = ntohl(**iptrp);	/* get array count */
  	if (i > max)
  		return -1;
  	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
  	total_len = 0;
  	while (i--) {
  		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
  		ifdebug(FACILITY) {
  			u64 off;
2d8a97266   Al Viro   SUNRPC endianness...
577
  			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
578
579
580
581
  			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x
  ",
  				__func__,
  				ntohl(seg->rs_length),
e08a132b0   Stephen Rothwell   [SUNRPC] rpc_rdma...
582
  				(unsigned long long)off,
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
583
584
585
586
587
588
589
  				ntohl(seg->rs_handle));
  		}
  		total_len += ntohl(seg->rs_length);
  		++cur_wchunk;
  	}
  	/* check and adjust for properly terminated write chunk */
  	if (wrchunk) {
2d8a97266   Al Viro   SUNRPC endianness...
590
  		__be32 *w = (__be32 *) cur_wchunk;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
591
592
593
594
595
596
  		if (*w++ != xdr_zero)
  			return -1;
  		cur_wchunk = (struct rpcrdma_write_chunk *) w;
  	}
  	if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
  		return -1;
2d8a97266   Al Viro   SUNRPC endianness...
597
  	*iptrp = (__be32 *) cur_wchunk;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
598
599
600
601
602
603
604
  	return total_len;
  }
  
  /*
   * Scatter inline received data back into provided iov's.
   */
  static void
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
605
  rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
606
607
608
  {
  	int i, npages, curlen, olen;
  	char *destp;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
609
610
  	struct page **ppages;
  	int page_base;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
  
  	curlen = rqst->rq_rcv_buf.head[0].iov_len;
  	if (curlen > copy_len) {	/* write chunk header fixup */
  		curlen = copy_len;
  		rqst->rq_rcv_buf.head[0].iov_len = curlen;
  	}
  
  	dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d
  ",
  		__func__, srcp, copy_len, curlen);
  
  	/* Shift pointer for first receive segment only */
  	rqst->rq_rcv_buf.head[0].iov_base = srcp;
  	srcp += curlen;
  	copy_len -= curlen;
  
  	olen = copy_len;
  	i = 0;
  	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
630
631
632
  	page_base = rqst->rq_rcv_buf.page_base;
  	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
  	page_base &= ~PAGE_MASK;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
633
  	if (copy_len && rqst->rq_rcv_buf.page_len) {
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
634
  		npages = PAGE_ALIGN(page_base +
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
635
636
  			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
  		for (; i < npages; i++) {
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
637
  			curlen = PAGE_SIZE - page_base;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
638
639
640
641
642
643
  			if (curlen > copy_len)
  				curlen = copy_len;
  			dprintk("RPC:       %s: page %d"
  				" srcp 0x%p len %d curlen %d
  ",
  				__func__, i, srcp, copy_len, curlen);
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
644
645
646
  			destp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
  			memcpy(destp + page_base, srcp, curlen);
  			flush_dcache_page(ppages[i]);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
647
648
649
650
651
  			kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
  			srcp += curlen;
  			copy_len -= curlen;
  			if (copy_len == 0)
  				break;
bd7ea31b9   Tom Tucker   RPCRDMA: Fix to X...
652
  			page_base = 0;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
653
654
655
656
657
658
659
660
661
662
  		}
  		rqst->rq_rcv_buf.page_len = olen - copy_len;
  	} else
  		rqst->rq_rcv_buf.page_len = 0;
  
  	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
  		curlen = copy_len;
  		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
  			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
  		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
b38ab40ad   Tom Talpey   XPRTRDMA: correct...
663
  			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
664
665
666
667
668
669
670
  		dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d
  ",
  			__func__, srcp, copy_len, curlen);
  		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
  		copy_len -= curlen; ++i;
  	} else
  		rqst->rq_rcv_buf.tail[0].iov_len = 0;
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
671
672
673
674
675
676
  	if (pad) {
  		/* implicit padding on terminal chunk */
  		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
  		while (pad--)
  			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
  	}
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
  	if (copy_len)
  		dprintk("RPC:       %s: %d bytes in"
  			" %d extra segments (%d lost)
  ",
  			__func__, olen, i, copy_len);
  
  	/* TBD avoid a warning from call_decode() */
  	rqst->rq_private_buf = rqst->rq_rcv_buf;
  }
  
  /*
   * This function is called when an async event is posted to
   * the connection which changes the connection state. All it
   * does at this point is mark the connection up/down, the rpc
   * timers do the rest.
   */
  void
  rpcrdma_conn_func(struct rpcrdma_ep *ep)
  {
  	struct rpc_xprt *xprt = ep->rep_xprt;
  
  	spin_lock_bh(&xprt->transport_lock);
575448bd3   Tom Talpey   RPC/RDMA: suppres...
699
700
  	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
  		++xprt->connect_cookie;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
701
702
703
704
705
  	if (ep->rep_connected > 0) {
  		if (!xprt_test_and_set_connected(xprt))
  			xprt_wake_pending_tasks(xprt, 0);
  	} else {
  		if (xprt_test_and_clear_connected(xprt))
926449ba6   Tom Talpey   RPC/RDMA: return ...
706
  			xprt_wake_pending_tasks(xprt, -ENOTCONN);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
  	}
  	spin_unlock_bh(&xprt->transport_lock);
  }
  
  /*
   * This function is called when memory window unbind which we are waiting
   * for completes. Just use rr_func (zeroed by upcall) to signal completion.
   */
  static void
  rpcrdma_unbind_func(struct rpcrdma_rep *rep)
  {
  	wake_up(&rep->rr_unbind);
  }
  
  /*
   * Called as a tasklet to do req/reply match and complete a request
   * Errors must result in the RPC task either being awakened, or
   * allowed to timeout, to discover the errors at that time.
   */
  void
  rpcrdma_reply_handler(struct rpcrdma_rep *rep)
  {
  	struct rpcrdma_msg *headerp;
  	struct rpcrdma_req *req;
  	struct rpc_rqst *rqst;
  	struct rpc_xprt *xprt = rep->rr_xprt;
  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
2d8a97266   Al Viro   SUNRPC endianness...
734
  	__be32 *iptr;
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
  	int i, rdmalen, status;
  
  	/* Check status. If bad, signal disconnect and return rep to pool */
  	if (rep->rr_len == ~0U) {
  		rpcrdma_recv_buffer_put(rep);
  		if (r_xprt->rx_ep.rep_connected == 1) {
  			r_xprt->rx_ep.rep_connected = -EIO;
  			rpcrdma_conn_func(&r_xprt->rx_ep);
  		}
  		return;
  	}
  	if (rep->rr_len < 28) {
  		dprintk("RPC:       %s: short/invalid reply
  ", __func__);
  		goto repost;
  	}
  	headerp = (struct rpcrdma_msg *) rep->rr_base;
  	if (headerp->rm_vers != xdr_one) {
  		dprintk("RPC:       %s: invalid version %d
  ",
  			__func__, ntohl(headerp->rm_vers));
  		goto repost;
  	}
  
  	/* Get XID and try for a match. */
  	spin_lock(&xprt->transport_lock);
  	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
  	if (rqst == NULL) {
  		spin_unlock(&xprt->transport_lock);
  		dprintk("RPC:       %s: reply 0x%p failed "
  			"to match any request xid 0x%08x len %d
  ",
  			__func__, rep, headerp->rm_xid, rep->rr_len);
  repost:
  		r_xprt->rx_stats.bad_reply_count++;
  		rep->rr_func = rpcrdma_reply_handler;
  		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
  			rpcrdma_recv_buffer_put(rep);
  
  		return;
  	}
  
  	/* get request object */
  	req = rpcr_to_rdmar(rqst);
  
  	dprintk("RPC:       %s: reply 0x%p completes request 0x%p
  "
  		"                   RPC request 0x%p xid 0x%08x
  ",
  			__func__, rep, req, rqst, headerp->rm_xid);
  
  	BUG_ON(!req || req->rl_reply);
  
  	/* from here on, the reply is no longer an orphan */
  	req->rl_reply = rep;
  
  	/* check for expected message types */
  	/* The order of some of these tests is important. */
  	switch (headerp->rm_type) {
606780404   Arnaldo Carvalho de Melo   net: Use hton[sl]...
794
  	case htonl(RDMA_MSG):
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
  		/* never expect read chunks */
  		/* never expect reply chunks (two ways to check) */
  		/* never expect write chunks without having offered RDMA */
  		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
  		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
  		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
  		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
  		     req->rl_nchunks == 0))
  			goto badheader;
  		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
  			/* count any expected write chunks in read reply */
  			/* start at write chunk array count */
  			iptr = &headerp->rm_body.rm_chunks[2];
  			rdmalen = rpcrdma_count_chunks(rep,
  						req->rl_nchunks, 1, &iptr);
  			/* check for validity, and no reply chunk after */
  			if (rdmalen < 0 || *iptr++ != xdr_zero)
  				goto badheader;
  			rep->rr_len -=
  			    ((unsigned char *)iptr - (unsigned char *)headerp);
  			status = rep->rr_len + rdmalen;
  			r_xprt->rx_stats.total_rdma_reply += rdmalen;
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
817
818
819
820
821
  			/* special case - last chunk may omit padding */
  			if (rdmalen &= 3) {
  				rdmalen = 4 - rdmalen;
  				status += rdmalen;
  			}
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
822
823
  		} else {
  			/* else ordinary inline */
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
824
  			rdmalen = 0;
2d8a97266   Al Viro   SUNRPC endianness...
825
  			iptr = (__be32 *)((unsigned char *)headerp + 28);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
826
827
828
829
  			rep->rr_len -= 28; /*sizeof *headerp;*/
  			status = rep->rr_len;
  		}
  		/* Fix up the rpc results for upper layer */
9191ca3b3   Tom Talpey   RPC/RDMA: adhere ...
830
  		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
831
  		break;
606780404   Arnaldo Carvalho de Melo   net: Use hton[sl]...
832
  	case htonl(RDMA_NOMSG):
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
833
834
835
836
837
838
  		/* never expect read or write chunks, always reply chunks */
  		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
  		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
  		    headerp->rm_body.rm_chunks[2] != xdr_one ||
  		    req->rl_nchunks == 0)
  			goto badheader;
2d8a97266   Al Viro   SUNRPC endianness...
839
  		iptr = (__be32 *)((unsigned char *)headerp + 28);
e96018280   \"Talpey, Thomas\   RPCRDMA: rpc rdma...
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
  		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
  		if (rdmalen < 0)
  			goto badheader;
  		r_xprt->rx_stats.total_rdma_reply += rdmalen;
  		/* Reply chunk buffer already is the reply vector - no fixup. */
  		status = rdmalen;
  		break;
  
  badheader:
  	default:
  		dprintk("%s: invalid rpcrdma reply header (type %d):"
  				" chunks[012] == %d %d %d"
  				" expected chunks <= %d
  ",
  				__func__, ntohl(headerp->rm_type),
  				headerp->rm_body.rm_chunks[0],
  				headerp->rm_body.rm_chunks[1],
  				headerp->rm_body.rm_chunks[2],
  				req->rl_nchunks);
  		status = -EIO;
  		r_xprt->rx_stats.bad_reply_count++;
  		break;
  	}
  
  	/* If using mw bind, start the deregister process now. */
  	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */
  	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
  	case RPCRDMA_MEMWINDOWS:
  		for (i = 0; req->rl_nchunks-- > 1;)
  			i += rpcrdma_deregister_external(
  				&req->rl_segments[i], r_xprt, NULL);
  		/* Optionally wait (not here) for unbinds to complete */
  		rep->rr_func = rpcrdma_unbind_func;
  		(void) rpcrdma_deregister_external(&req->rl_segments[i],
  						   r_xprt, rep);
  		break;
  	case RPCRDMA_MEMWINDOWS_ASYNC:
  		for (i = 0; req->rl_nchunks--;)
  			i += rpcrdma_deregister_external(&req->rl_segments[i],
  							 r_xprt, NULL);
  		break;
  	default:
  		break;
  	}
  
  	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)
  ",
  			__func__, xprt, rqst, status);
  	xprt_complete_rqst(rqst->rq_task, status);
  	spin_unlock(&xprt->transport_lock);
  }