Commit 808012fbb23a52ec59352445d2076d175ad4ab26

Authored by Chuck Lever
Committed by Trond Myklebust
1 parent 262965f53d

[PATCH] RPC: skip over transport-specific headers automatically

Add a generic mechanism for skipping over transport-specific headers
 when constructing an RPC request.  This removes another "xprt->stream"
 dependency.

 Test-plan:
 Write-intensive workload on a single mount point (try both UDP and
 TCP).

 Signed-off-by: Chuck Lever <cel@netapp.com>
 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>

Showing 5 changed files with 53 additions and 14 deletions Side-by-side Diff

include/linux/sunrpc/msg_prot.h
... ... @@ -76,6 +76,31 @@
76 76  
77 77 #define RPC_MAXNETNAMELEN 256
78 78  
  79 +/*
  80 + * From RFC 1831:
  81 + *
  82 + * "A record is composed of one or more record fragments. A record
  83 + * fragment is a four-byte header followed by 0 to (2**31) - 1 bytes of
  84 + * fragment data. The bytes encode an unsigned binary number; as with
  85 + * XDR integers, the byte order is from highest to lowest. The number
  86 + * encodes two values -- a boolean which indicates whether the fragment
  87 + * is the last fragment of the record (bit value 1 implies the fragment
  88 + * is the last fragment) and a 31-bit unsigned binary value which is the
  89 + * length in bytes of the fragment's data. The boolean value is the
  90 + * highest-order bit of the header; the length is the 31 low-order bits.
  91 + * (Note that this record specification is NOT in XDR standard form!)"
  92 + *
  93 + * The Linux RPC client always sends its requests in a single record
  94 + * fragment, limiting the maximum payload size for stream transports to
  95 + * 2GB.
  96 + */
  97 +
  98 +typedef u32 rpc_fraghdr;
  99 +
  100 +#define RPC_LAST_STREAM_FRAGMENT (1U << 31)
  101 +#define RPC_FRAGMENT_SIZE_MASK (~RPC_LAST_STREAM_FRAGMENT)
  102 +#define RPC_MAX_FRAGMENT_SIZE ((1U << 31) - 1)
  103 +
79 104 #endif /* __KERNEL__ */
80 105 #endif /* _LINUX_SUNRPC_MSGPROT_H_ */
include/linux/sunrpc/xprt.h
... ... @@ -155,6 +155,8 @@
155 155  
156 156 size_t max_payload; /* largest RPC payload size,
157 157 in bytes */
  158 + unsigned int tsh_size; /* size of transport specific
  159 + header */
158 160  
159 161 struct rpc_wait_queue sending; /* requests waiting to send */
160 162 struct rpc_wait_queue resend; /* requests waiting to resend */
... ... @@ -235,6 +237,11 @@
235 237 int xprt_adjust_timeout(struct rpc_rqst *req);
236 238 void xprt_release(struct rpc_task *task);
237 239 int xprt_destroy(struct rpc_xprt *xprt);
  240 +
  241 +static inline u32 *xprt_skip_transport_header(struct rpc_xprt *xprt, u32 *p)
  242 +{
  243 + return p + xprt->tsh_size;
  244 +}
238 245  
239 246 /*
240 247 * Transport switch helper functions
net/sunrpc/auth_gss/auth_gss.c
... ... @@ -844,10 +844,8 @@
844 844  
845 845 /* We compute the checksum for the verifier over the xdr-encoded bytes
846 846 * starting with the xid and ending at the end of the credential: */
847   - iov.iov_base = req->rq_snd_buf.head[0].iov_base;
848   - if (task->tk_client->cl_xprt->stream)
849   - /* See clnt.c:call_header() */
850   - iov.iov_base += 4;
  847 + iov.iov_base = xprt_skip_transport_header(task->tk_xprt,
  848 + req->rq_snd_buf.head[0].iov_base);
851 849 iov.iov_len = (u8 *)p - (u8 *)iov.iov_base;
852 850 xdr_buf_from_iov(&iov, &verf_buf);
853 851  
net/sunrpc/clnt.c
... ... @@ -1075,13 +1075,12 @@
1075 1075 call_header(struct rpc_task *task)
1076 1076 {
1077 1077 struct rpc_clnt *clnt = task->tk_client;
1078   - struct rpc_xprt *xprt = clnt->cl_xprt;
1079 1078 struct rpc_rqst *req = task->tk_rqstp;
1080 1079 u32 *p = req->rq_svec[0].iov_base;
1081 1080  
1082 1081 /* FIXME: check buffer size? */
1083   - if (xprt->stream)
1084   - *p++ = 0; /* fill in later */
  1082 +
  1083 + p = xprt_skip_transport_header(task->tk_xprt, p);
1085 1084 *p++ = req->rq_xid; /* XID */
1086 1085 *p++ = htonl(RPC_CALL); /* CALL */
1087 1086 *p++ = htonl(RPC_VERSION); /* RPC version */
net/sunrpc/xprtsock.c
... ... @@ -282,6 +282,13 @@
282 282 return status;
283 283 }
284 284  
  285 +static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
  286 +{
  287 + u32 reclen = buf->len - sizeof(rpc_fraghdr);
  288 + rpc_fraghdr *base = buf->head[0].iov_base;
  289 + *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
  290 +}
  291 +
285 292 /**
286 293 * xs_tcp_send_request - write an RPC request to a TCP socket
287 294 * @task: address of RPC task that manages the state of an RPC request
288 295  
... ... @@ -301,11 +308,9 @@
301 308 struct rpc_rqst *req = task->tk_rqstp;
302 309 struct rpc_xprt *xprt = req->rq_xprt;
303 310 struct xdr_buf *xdr = &req->rq_snd_buf;
304   - u32 *marker = req->rq_svec[0].iov_base;
305 311 int status, retry = 0;
306 312  
307   - /* Write the record marker */
308   - *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
  313 + xs_encode_tcp_record_marker(&req->rq_snd_buf);
309 314  
310 315 xs_pktdump("packet data:",
311 316 req->rq_svec->iov_base,
312 317  
313 318  
314 319  
315 320  
... ... @@ -503,16 +508,19 @@
503 508 xprt->tcp_offset += used;
504 509 if (used != len)
505 510 return;
  511 +
506 512 xprt->tcp_reclen = ntohl(xprt->tcp_recm);
507   - if (xprt->tcp_reclen & 0x80000000)
  513 + if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
508 514 xprt->tcp_flags |= XPRT_LAST_FRAG;
509 515 else
510 516 xprt->tcp_flags &= ~XPRT_LAST_FRAG;
511   - xprt->tcp_reclen &= 0x7fffffff;
  517 + xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
  518 +
512 519 xprt->tcp_flags &= ~XPRT_COPY_RECM;
513 520 xprt->tcp_offset = 0;
  521 +
514 522 /* Sanity check of the record length */
515   - if (xprt->tcp_reclen < 4) {
  523 + if (unlikely(xprt->tcp_reclen < 4)) {
516 524 dprintk("RPC: invalid TCP record fragment length\n");
517 525 xprt_disconnect(xprt);
518 526 return;
... ... @@ -1065,6 +1073,7 @@
1065 1073  
1066 1074 xprt->prot = IPPROTO_UDP;
1067 1075 xprt->port = XS_MAX_RESVPORT;
  1076 + xprt->tsh_size = 0;
1068 1077 xprt->stream = 0;
1069 1078 xprt->nocong = 0;
1070 1079 xprt->cwnd = RPC_INITCWND;
1071 1080  
... ... @@ -1105,11 +1114,12 @@
1105 1114  
1106 1115 xprt->prot = IPPROTO_TCP;
1107 1116 xprt->port = XS_MAX_RESVPORT;
  1117 + xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1108 1118 xprt->stream = 1;
1109 1119 xprt->nocong = 1;
1110 1120 xprt->cwnd = RPC_MAXCWND(xprt);
1111 1121 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1112   - xprt->max_payload = (1U << 31) - 1;
  1122 + xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1113 1123  
1114 1124 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1115 1125