Blame view
net/smc/smc_rx.c
11.6 KB
b24413180 License cleanup: ... |
1 |
// SPDX-License-Identifier: GPL-2.0 |
952310ccf smc: receive data... |
2 3 4 5 6 7 8 9 10 11 12 13 14 |
/* * Shared Memory Communications over RDMA (SMC-R) and RoCE * * Manage RMBE * copy new RMBE data into user space * * Copyright IBM Corp. 2016 * * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com> */ #include <linux/net.h> #include <linux/rcupdate.h> |
c3edc4010 sched/headers: Mo... |
15 |
#include <linux/sched/signal.h> |
952310ccf smc: receive data... |
16 17 18 19 20 21 22 |
#include <net/sock.h> #include "smc.h" #include "smc_core.h" #include "smc_cdc.h" #include "smc_tx.h" /* smc_tx_consumer_update() */ #include "smc_rx.h" |
b51fa1b13 smc: make smc_rx_... |
23 |
/* callback implementation to wakeup consumers blocked with smc_rx_wait(). |
952310ccf smc: receive data... |
24 25 |
* indirectly called by smc_cdc_msg_recv_action(). */ |
b51fa1b13 smc: make smc_rx_... |
26 |
static void smc_rx_wake_up(struct sock *sk) |
952310ccf smc: receive data... |
27 28 29 30 31 32 33 34 |
{ struct socket_wq *wq; /* derived from sock_def_readable() */ /* called already in smc_listen_work() */ rcu_read_lock(); wq = rcu_dereference(sk->sk_wq); if (skwq_has_sleeper(wq)) |
a9a08845e vfs: do bulk POLL... |
35 36 |
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI | EPOLLRDNORM | EPOLLRDBAND); |
90e9517ed net/smc: always c... |
37 |
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); |
952310ccf smc: receive data... |
38 39 40 |
if ((sk->sk_shutdown == SHUTDOWN_MASK) || (sk->sk_state == SMC_CLOSED)) sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP); |
952310ccf smc: receive data... |
41 42 |
rcu_read_unlock(); } |
9014db202 smc: add support ... |
43 44 45 46 |
/* Update consumer cursor * @conn connection to update * @cons consumer cursor * @len number of Bytes consumed |
de8474eb9 net/smc: urgent d... |
47 48 |
* Returns: * 1 if we should end our receive, 0 otherwise |
9014db202 smc: add support ... |
49 |
*/ |
de8474eb9 net/smc: urgent d... |
50 51 |
static int smc_rx_update_consumer(struct smc_sock *smc, union smc_host_cursor cons, size_t len) |
9014db202 smc: add support ... |
52 |
{ |
de8474eb9 net/smc: urgent d... |
53 54 55 56 |
struct smc_connection *conn = &smc->conn; struct sock *sk = &smc->sk; bool force = false; int diff, rc = 0; |
69cb7dc02 net/smc: add comm... |
57 |
smc_curs_add(conn->rmb_desc->len, &cons, len); |
de8474eb9 net/smc: urgent d... |
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
/* did we process urgent data? */ if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) { diff = smc_curs_comp(conn->rmb_desc->len, &cons, &conn->urg_curs); if (sock_flag(sk, SOCK_URGINLINE)) { if (diff == 0) { force = true; rc = 1; conn->urg_state = SMC_URG_READ; } } else { if (diff == 1) { /* skip urgent byte */ force = true; smc_curs_add(conn->rmb_desc->len, &cons, 1); conn->urg_rx_skip_pend = false; } else if (diff < -1) /* we read past urgent byte */ conn->urg_state = SMC_URG_READ; } } |
bac6de7b6 net/smc: eliminat... |
80 |
smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn); |
de8474eb9 net/smc: urgent d... |
81 |
|
9014db202 smc: add support ... |
82 83 |
/* send consumer cursor update if required */ /* similar to advertising new TCP rcv_wnd if required */ |
de8474eb9 net/smc: urgent d... |
84 85 86 87 88 89 90 91 92 |
smc_tx_consumer_update(conn, force); return rc; } static void smc_rx_update_cons(struct smc_sock *smc, size_t len) { struct smc_connection *conn = &smc->conn; union smc_host_cursor cons; |
bac6de7b6 net/smc: eliminat... |
93 |
smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); |
de8474eb9 net/smc: urgent d... |
94 |
smc_rx_update_consumer(smc, cons, len); |
9014db202 smc: add support ... |
95 96 97 98 99 100 101 102 103 104 105 106 107 |
} struct smc_spd_priv { struct smc_sock *smc; size_t len; }; static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe, struct pipe_buffer *buf) { struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private; struct smc_sock *smc = priv->smc; struct smc_connection *conn; |
9014db202 smc: add support ... |
108 109 110 111 112 113 114 115 |
struct sock *sk = &smc->sk; if (sk->sk_state == SMC_CLOSED || sk->sk_state == SMC_PEERFINCLOSEWAIT || sk->sk_state == SMC_APPFINCLOSEWAIT) goto out; conn = &smc->conn; lock_sock(sk); |
de8474eb9 net/smc: urgent d... |
116 |
smc_rx_update_cons(smc, priv->len); |
9014db202 smc: add support ... |
117 118 119 120 121 122 123 124 |
release_sock(sk); if (atomic_sub_and_test(priv->len, &conn->splice_pending)) smc_rx_wake_up(sk); out: kfree(priv); put_page(buf->page); sock_put(sk); } |
9014db202 smc: add support ... |
125 |
static const struct pipe_buf_operations smc_pipe_ops = { |
9014db202 smc: add support ... |
126 |
.release = smc_rx_pipe_buf_release, |
9014db202 smc: add support ... |
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
.get = generic_pipe_buf_get }; static void smc_rx_spd_release(struct splice_pipe_desc *spd, unsigned int i) { put_page(spd->pages[i]); } static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len, struct smc_sock *smc) { struct splice_pipe_desc spd; struct partial_page partial; struct smc_spd_priv *priv; |
9014db202 smc: add support ... |
142 |
int bytes; |
9014db202 smc: add support ... |
143 144 145 146 147 148 149 150 151 152 153 |
priv = kzalloc(sizeof(*priv), GFP_KERNEL); if (!priv) return -ENOMEM; priv->len = len; priv->smc = smc; partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr; partial.len = len; partial.private = (unsigned long)priv; spd.nr_pages_max = 1; spd.nr_pages = 1; |
48bf52317 net/smc: remove l... |
154 |
spd.pages = &smc->conn.rmb_desc->pages; |
9014db202 smc: add support ... |
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
spd.partial = &partial; spd.ops = &smc_pipe_ops; spd.spd_release = smc_rx_spd_release; bytes = splice_to_pipe(pipe, &spd); if (bytes > 0) { sock_hold(&smc->sk); get_page(smc->conn.rmb_desc->pages); atomic_add(bytes, &smc->conn.splice_pending); } return bytes; } static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn) { return atomic_read(&conn->bytes_to_rcv) && !atomic_read(&conn->splice_pending); } |
952310ccf smc: receive data... |
174 175 176 |
/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted * @smc smc socket * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout |
b51fa1b13 smc: make smc_rx_... |
177 |
* @fcrit add'l criterion to evaluate as function pointer |
952310ccf smc: receive data... |
178 179 180 181 |
* Returns: * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown. * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted). */ |
b51fa1b13 smc: make smc_rx_... |
182 183 |
int smc_rx_wait(struct smc_sock *smc, long *timeo, int (*fcrit)(struct smc_connection *conn)) |
952310ccf smc: receive data... |
184 185 186 |
{ DEFINE_WAIT_FUNC(wait, woken_wake_function); struct smc_connection *conn = &smc->conn; |
b29009809 net/smc: cancel s... |
187 188 |
struct smc_cdc_conn_state_flags *cflags = &conn->local_tx_ctrl.conn_state_flags; |
952310ccf smc: receive data... |
189 190 |
struct sock *sk = &smc->sk; int rc; |
b51fa1b13 smc: make smc_rx_... |
191 |
if (fcrit(conn)) |
952310ccf smc: receive data... |
192 193 194 195 196 |
return 1; sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk); add_wait_queue(sk_sleep(sk), &wait); rc = sk_wait_event(sk, timeo, sk->sk_err || |
b29009809 net/smc: cancel s... |
197 |
cflags->peer_conn_abort || |
952310ccf smc: receive data... |
198 |
sk->sk_shutdown & RCV_SHUTDOWN || |
b29009809 net/smc: cancel s... |
199 |
conn->killed || |
882dcfe5a net/smc: receive ... |
200 |
fcrit(conn), |
952310ccf smc: receive data... |
201 202 203 204 205 |
&wait); remove_wait_queue(sk_sleep(sk), &wait); sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk); return rc; } |
de8474eb9 net/smc: urgent d... |
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, int flags) { struct smc_connection *conn = &smc->conn; union smc_host_cursor cons; struct sock *sk = &smc->sk; int rc = 0; if (sock_flag(sk, SOCK_URGINLINE) || !(conn->urg_state == SMC_URG_VALID) || conn->urg_state == SMC_URG_READ) return -EINVAL; if (conn->urg_state == SMC_URG_VALID) { if (!(flags & MSG_PEEK)) smc->conn.urg_state = SMC_URG_READ; msg->msg_flags |= MSG_OOB; if (len > 0) { if (!(flags & MSG_TRUNC)) rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1); len = 1; |
bac6de7b6 net/smc: eliminat... |
227 |
smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); |
de8474eb9 net/smc: urgent d... |
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
if (smc_curs_diff(conn->rmb_desc->len, &cons, &conn->urg_curs) > 1) conn->urg_rx_skip_pend = true; /* Urgent Byte was already accounted for, but trigger * skipping the urgent byte in non-inline case */ if (!(flags & MSG_PEEK)) smc_rx_update_consumer(smc, cons, 0); } else { msg->msg_flags |= MSG_TRUNC; } return rc ? -EFAULT : len; } if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN) return 0; return -EAGAIN; } |
107529e31 net/smc: receive ... |
248 249 250 251 252 253 254 255 256 257 258 |
/* Check whether smc_rx_recvmsg() has data to copy.
 * If there is none but a valid urgent Byte is pending, run a zero-length
 * consumer update so that the urgent Byte handling gets a chance to skip it.
 * Returns true only if smc_rx_data_available() reports data.
 */
static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
{
	struct smc_connection *conn = &smc->conn;

	if (smc_rx_data_available(conn))
		return true;

	if (conn->urg_state == SMC_URG_VALID) {
		/* we received a single urgent Byte - skip */
		smc_rx_update_cons(smc, 0);
	}

	return false;
}
9014db202 smc: add support ... |
259 260 261 262 263 264 |
/* smc_rx_recvmsg - receive data from RMBE * @msg: copy data to receive buffer * @pipe: copy data to pipe if set - indicates splice() call * * rcvbuf consumer: main API called by socket layer. * Called under sk lock. |
952310ccf smc: receive data... |
265 |
*/ |
9014db202 smc: add support ... |
266 267 |
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, struct pipe_inode_info *pipe, size_t len, int flags) |
952310ccf smc: receive data... |
268 269 270 271 |
{ size_t copylen, read_done = 0, read_remaining = len; size_t chunk_len, chunk_off, chunk_len_sum; struct smc_connection *conn = &smc->conn; |
9014db202 smc: add support ... |
272 |
int (*func)(struct smc_connection *conn); |
952310ccf smc: receive data... |
273 274 275 276 |
union smc_host_cursor cons; int readable, chunk; char *rcvbuf_base; struct sock *sk; |
9014db202 smc: add support ... |
277 |
int splbytes; |
952310ccf smc: receive data... |
278 279 280 281 282 283 |
long timeo; int target; /* Read at least these many bytes */ int rc; if (unlikely(flags & MSG_ERRQUEUE)) return -EINVAL; /* future work for sk.sk_family == AF_SMC */ |
952310ccf smc: receive data... |
284 285 286 287 |
sk = &smc->sk; if (sk->sk_state == SMC_LISTEN) return -ENOTCONN; |
de8474eb9 net/smc: urgent d... |
288 289 |
if (flags & MSG_OOB) return smc_rx_recv_urg(smc, msg, len, flags); |
952310ccf smc: receive data... |
290 291 |
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); |
952310ccf smc: receive data... |
292 |
/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */ |
be244f28d net/smc: add SMC-... |
293 |
rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr; |
952310ccf smc: receive data... |
294 295 |
do { /* while (read_remaining) */ |
9014db202 smc: add support ... |
296 |
if (read_done >= target || (pipe && read_done)) |
952310ccf smc: receive data... |
297 |
break; |
b29009809 net/smc: cancel s... |
298 299 |
if (conn->killed) break; |
107529e31 net/smc: receive ... |
300 |
if (smc_rx_recvmsg_data_available(smc)) |
952310ccf smc: receive data... |
301 |
goto copy; |
b29009809 net/smc: cancel s... |
302 |
if (sk->sk_shutdown & RCV_SHUTDOWN) { |
107529e31 net/smc: receive ... |
303 304 305 306 307 |
/* smc_cdc_msg_recv_action() could have run after * above smc_rx_recvmsg_data_available() */ if (smc_rx_recvmsg_data_available(smc)) goto copy; |
c8b8ec8e0 smc: simplify abo... |
308 |
break; |
107529e31 net/smc: receive ... |
309 |
} |
c8b8ec8e0 smc: simplify abo... |
310 |
|
952310ccf smc: receive data... |
311 312 313 |
if (read_done) { if (sk->sk_err || sk->sk_state == SMC_CLOSED || |
952310ccf smc: receive data... |
314 |
!timeo || |
c8b8ec8e0 smc: simplify abo... |
315 |
signal_pending(current)) |
952310ccf smc: receive data... |
316 317 |
break; } else { |
952310ccf smc: receive data... |
318 319 320 321 |
if (sk->sk_err) { read_done = sock_error(sk); break; } |
952310ccf smc: receive data... |
322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
if (sk->sk_state == SMC_CLOSED) { if (!sock_flag(sk, SOCK_DONE)) { /* This occurs when user tries to read * from never connected socket. */ read_done = -ENOTCONN; break; } break; } if (signal_pending(current)) { read_done = sock_intr_errno(timeo); break; } |
846e344eb net/smc: add rece... |
336 337 |
if (!timeo) return -EAGAIN; |
952310ccf smc: receive data... |
338 |
} |
b51fa1b13 smc: make smc_rx_... |
339 340 |
if (!smc_rx_data_available(conn)) { smc_rx_wait(smc, &timeo, smc_rx_data_available); |
952310ccf smc: receive data... |
341 342 343 344 345 |
continue; } copy: /* initialize variables for 1st iteration of subsequent loop */ |
b51fa1b13 smc: make smc_rx_... |
346 |
/* could be just 1 byte, even after waiting on data above */ |
952310ccf smc: receive data... |
347 |
readable = atomic_read(&conn->bytes_to_rcv); |
9014db202 smc: add support ... |
348 349 350 351 352 353 354 355 356 |
splbytes = atomic_read(&conn->splice_pending); if (!readable || (msg && splbytes)) { if (splbytes) func = smc_rx_data_available_and_no_splice_pend; else func = smc_rx_data_available; smc_rx_wait(smc, &timeo, func); continue; } |
bac6de7b6 net/smc: eliminat... |
357 |
smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); |
9014db202 smc: add support ... |
358 359 |
/* subsequent splice() calls pick up where previous left */ if (splbytes) |
69cb7dc02 net/smc: add comm... |
360 |
smc_curs_add(conn->rmb_desc->len, &cons, splbytes); |
de8474eb9 net/smc: urgent d... |
361 362 363 364 365 366 |
if (conn->urg_state == SMC_URG_VALID && sock_flag(&smc->sk, SOCK_URGINLINE) && readable > 1) readable--; /* always stop at urgent Byte */ /* not more than what user space asked for */ copylen = min_t(size_t, read_remaining, readable); |
952310ccf smc: receive data... |
367 368 |
/* determine chunks where to read from rcvbuf */ /* either unwrapped case, or 1st chunk of wrapped case */ |
69cb7dc02 net/smc: add comm... |
369 370 |
chunk_len = min_t(size_t, copylen, conn->rmb_desc->len - cons.count); |
952310ccf smc: receive data... |
371 372 |
chunk_len_sum = chunk_len; chunk_off = cons.count; |
10428dd83 net/smc: synchron... |
373 |
smc_rmb_sync_sg_for_cpu(conn); |
952310ccf smc: receive data... |
374 375 |
for (chunk = 0; chunk < 2; chunk++) { if (!(flags & MSG_TRUNC)) { |
9014db202 smc: add support ... |
376 377 378 379 380 381 382 383 384 385 |
if (msg) { rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off, chunk_len); } else { rc = smc_rx_splice(pipe, rcvbuf_base + chunk_off, chunk_len, smc); } if (rc < 0) { |
952310ccf smc: receive data... |
386 387 |
if (!read_done) read_done = -EFAULT; |
10428dd83 net/smc: synchron... |
388 |
smc_rmb_sync_sg_for_device(conn); |
952310ccf smc: receive data... |
389 390 391 392 393 394 395 396 397 398 399 400 401 |
goto out; } } read_remaining -= chunk_len; read_done += chunk_len; if (chunk_len_sum == copylen) break; /* either on 1st or 2nd iteration */ /* prepare next (== 2nd) iteration */ chunk_len = copylen - chunk_len; /* remainder */ chunk_len_sum += chunk_len; chunk_off = 0; /* modulo offset in recv ring buffer */ } |
10428dd83 net/smc: synchron... |
402 |
smc_rmb_sync_sg_for_device(conn); |
952310ccf smc: receive data... |
403 404 405 |
/* update cursors */ if (!(flags & MSG_PEEK)) { |
952310ccf smc: receive data... |
406 407 408 |
/* increased in recv tasklet smc_cdc_msg_rcv() */ smp_mb__before_atomic(); atomic_sub(copylen, &conn->bytes_to_rcv); |
69cb7dc02 net/smc: add comm... |
409 |
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ |
952310ccf smc: receive data... |
410 |
smp_mb__after_atomic(); |
de8474eb9 net/smc: urgent d... |
411 412 |
if (msg && smc_rx_update_consumer(smc, cons, copylen)) goto out; |
952310ccf smc: receive data... |
413 414 415 416 417 418 419 420 421 |
} } while (read_remaining); out: return read_done; } /* Initialize receive properties on connection establishment. NB: not __init! */ void smc_rx_init(struct smc_sock *smc) { |
b51fa1b13 smc: make smc_rx_... |
422 |
smc->sk.sk_data_ready = smc_rx_wake_up; |
9014db202 smc: add support ... |
423 |
atomic_set(&smc->conn.splice_pending, 0); |
de8474eb9 net/smc: urgent d... |
424 |
smc->conn.urg_state = SMC_URG_READ; |
952310ccf smc: receive data... |
425 |
} |