Commit 0c3141e910eaaa0b617e2f26c69b266d1cd1f035

Authored by Allan Stephens
Committed by David S. Miller
1 parent b89741a0cc

[TIPC]: Overhaul of socket locking logic

This patch modifies TIPC's socket code to follow the same approach
used by other protocols.  This change eliminates the need for a
mutex in the TIPC-specific portion of the socket protocol data
structure -- in its place, the standard Linux socket backlog queue
and associated locking routines are utilized.  These changes fix
a long-standing receive queue bug on SMP systems, and also enable
individual read and write threads to utilize a socket without
unnecessarily interfering with each other.

Signed-off-by: Allan Stephens <allan.stephens@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Showing 3 changed files with 608 additions and 431 deletions Side-by-side Diff

include/net/tipc/tipc_port.h
... ... @@ -96,6 +96,12 @@
96 96  
97 97 void *tipc_get_handle(const u32 ref);
98 98  
  99 +/*
  100 + * The following routines require that the port be locked on entry
  101 + */
  102 +
  103 +int tipc_disconnect_port(struct tipc_port *tp_ptr);
  104 +
99 105  
100 106 #endif
101 107  
... ... @@ -1240,6 +1240,28 @@
1240 1240 return res;
1241 1241 }
1242 1242  
  1243 +/**
  1244 + * tipc_disconnect_port - disconnect port from peer
  1245 + *
  1246 + * Port must be locked.
  1247 + */
  1248 +
  1249 +int tipc_disconnect_port(struct tipc_port *tp_ptr)
  1250 +{
  1251 + int res;
  1252 +
  1253 + if (tp_ptr->connected) {
  1254 + tp_ptr->connected = 0;
  1255 + /* let timer expire on its own to avoid deadlock! */
  1256 + tipc_nodesub_unsubscribe(
  1257 + &((struct port *)tp_ptr)->subscription);
  1258 + res = TIPC_OK;
  1259 + } else {
  1260 + res = -ENOTCONN;
  1261 + }
  1262 + return res;
  1263 +}
  1264 +
1243 1265 /*
1244 1266 * tipc_disconnect(): Disconnect port from peer.
1245 1267 * This is a node local operation.
1246 1268  
... ... @@ -1248,17 +1270,12 @@
1248 1270 int tipc_disconnect(u32 ref)
1249 1271 {
1250 1272 struct port *p_ptr;
1251   - int res = -ENOTCONN;
  1273 + int res;
1252 1274  
1253 1275 p_ptr = tipc_port_lock(ref);
1254 1276 if (!p_ptr)
1255 1277 return -EINVAL;
1256   - if (p_ptr->publ.connected) {
1257   - p_ptr->publ.connected = 0;
1258   - /* let timer expire on it's own to avoid deadlock! */
1259   - tipc_nodesub_unsubscribe(&p_ptr->subscription);
1260   - res = TIPC_OK;
1261   - }
  1278 + res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1262 1279 tipc_port_unlock(p_ptr);
1263 1280 return res;
1264 1281 }
Changes suppressed. Click to show
... ... @@ -43,7 +43,6 @@
43 43 #include <linux/slab.h>
44 44 #include <linux/poll.h>
45 45 #include <linux/fcntl.h>
46   -#include <linux/mutex.h>
47 46 #include <asm/string.h>
48 47 #include <asm/atomic.h>
49 48 #include <net/sock.h>
50 49  
51 50  
... ... @@ -64,11 +63,12 @@
64 63 struct tipc_sock {
65 64 struct sock sk;
66 65 struct tipc_port *p;
67   - struct mutex lock;
68 66 };
69 67  
70   -#define tipc_sk(sk) ((struct tipc_sock*)sk)
  68 +#define tipc_sk(sk) ((struct tipc_sock *)(sk))
  69 +#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
71 70  
  71 +static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
72 72 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
73 73 static void wakeupdispatch(struct tipc_port *tport);
74 74  
75 75  
76 76  
77 77  
78 78  
79 79  
80 80  
81 81  
82 82  
83 83  
84 84  
85 85  
86 86  
87 87  
88 88  
89 89  
90 90  
91 91  
92 92  
93 93  
94 94  
95 95  
96 96  
97 97  
98 98  
99 99  
100 100  
101 101  
102 102  
103 103  
104 104  
105 105  
... ... @@ -82,109 +82,172 @@
82 82  
83 83 static atomic_t tipc_queue_size = ATOMIC_INIT(0);
84 84  
85   -
86 85 /*
87   - * sock_lock(): Lock a port/socket pair. lock_sock() can
88   - * not be used here, since the same lock must protect ports
89   - * with non-socket interfaces.
90   - * See net.c for description of locking policy.
  86 + * Revised TIPC socket locking policy:
  87 + *
  88 + * Most socket operations take the standard socket lock when they start
  89 + * and hold it until they finish (or until they need to sleep). Acquiring
  90 + * this lock grants the owner exclusive access to the fields of the socket
  91 + * data structures, with the exception of the backlog queue. A few socket
  92 + * operations can be done without taking the socket lock because they only
  93 + * read socket information that never changes during the life of the socket.
  94 + *
  95 + * Socket operations may acquire the lock for the associated TIPC port if they
  96 + * need to perform an operation on the port. If any routine needs to acquire
  97 + * both the socket lock and the port lock it must take the socket lock first
  98 + * to avoid the risk of deadlock.
  99 + *
  100 + * The dispatcher handling incoming messages cannot grab the socket lock in
  101 + * the standard fashion, since when invoked it runs at the BH level and cannot block.
  102 + * Instead, it checks to see if the socket lock is currently owned by someone,
  103 + * and either handles the message itself or adds it to the socket's backlog
  104 + * queue; in the latter case the queued message is processed once the process
  105 + * owning the socket lock releases it.
  106 + *
  107 + * NOTE: Releasing the socket lock while an operation is sleeping overcomes
  108 + * the problem of a blocked socket operation preventing any other operations
  109 + * from occurring. However, applications must be careful if they have
  110 + * multiple threads trying to send (or receive) on the same socket, as these
  111 + * operations might interfere with each other. For example, doing a connect
  112 + * and a receive at the same time might allow the receive to consume the
  113 + * ACK message meant for the connect. While additional work could be done
  114 + * to try and overcome this, it doesn't seem to be worthwhile at the present.
  115 + *
  116 + * NOTE: Releasing the socket lock while an operation is sleeping also ensures
  117 + * that another operation that must be performed in a non-blocking manner is
  118 + * not delayed for very long because the lock has already been taken.
  119 + *
  120 + * NOTE: This code assumes that certain fields of a port/socket pair are
  121 + * constant over its lifetime; such fields can be examined without taking
  122 + * the socket lock and/or port lock, and do not need to be re-read even
  123 + * after resuming processing after waiting. These fields include:
  124 + * - socket type
  125 + * - pointer to socket sk structure (aka tipc_sock structure)
  126 + * - pointer to port structure
  127 + * - port reference
91 128 */
92   -static void sock_lock(struct tipc_sock* tsock)
  129 +
  130 +/**
  131 + * advance_rx_queue - discard first buffer in socket receive queue
  132 + *
  133 + * Caller must hold socket lock
  134 + */
  135 +
  136 +static void advance_rx_queue(struct sock *sk)
93 137 {
94   - spin_lock_bh(tsock->p->lock);
  138 + buf_discard(__skb_dequeue(&sk->sk_receive_queue));
  139 + atomic_dec(&tipc_queue_size);
95 140 }
96 141  
97   -/*
98   - * sock_unlock(): Unlock a port/socket pair
  142 +/**
  143 + * discard_rx_queue - discard all buffers in socket receive queue
  144 + *
  145 + * Caller must hold socket lock
99 146 */
100   -static void sock_unlock(struct tipc_sock* tsock)
  147 +
  148 +static void discard_rx_queue(struct sock *sk)
101 149 {
102   - spin_unlock_bh(tsock->p->lock);
  150 + struct sk_buff *buf;
  151 +
  152 + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
  153 + atomic_dec(&tipc_queue_size);
  154 + buf_discard(buf);
  155 + }
103 156 }
104 157  
105 158 /**
106   - * advance_queue - discard first buffer in queue
107   - * @tsock: TIPC socket
  159 + * reject_rx_queue - reject all buffers in socket receive queue
  160 + *
  161 + * Caller must hold socket lock
108 162 */
109 163  
110   -static void advance_queue(struct tipc_sock *tsock)
  164 +static void reject_rx_queue(struct sock *sk)
111 165 {
112   - sock_lock(tsock);
113   - buf_discard(skb_dequeue(&tsock->sk.sk_receive_queue));
114   - sock_unlock(tsock);
115   - atomic_dec(&tipc_queue_size);
  166 + struct sk_buff *buf;
  167 +
  168 + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
  169 + tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
  170 + atomic_dec(&tipc_queue_size);
  171 + }
116 172 }
117 173  
118 174 /**
119 175 * tipc_create - create a TIPC socket
  176 + * @net: network namespace (must be default network)
120 177 * @sock: pre-allocated socket structure
121 178 * @protocol: protocol indicator (must be 0)
122 179 *
123   - * This routine creates and attaches a 'struct sock' to the 'struct socket',
124   - * then create and attaches a TIPC port to the 'struct sock' part.
  180 + * This routine creates additional data structures used by the TIPC socket,
  181 + * initializes them, and links them together.
125 182 *
126 183 * Returns 0 on success, errno otherwise
127 184 */
  185 +
128 186 static int tipc_create(struct net *net, struct socket *sock, int protocol)
129 187 {
130   - struct tipc_sock *tsock;
131   - struct tipc_port *port;
  188 + const struct proto_ops *ops;
  189 + socket_state state;
132 190 struct sock *sk;
133   - u32 ref;
  191 + u32 portref;
134 192  
  193 + /* Validate arguments */
  194 +
135 195 if (net != &init_net)
136 196 return -EAFNOSUPPORT;
137 197  
138 198 if (unlikely(protocol != 0))
139 199 return -EPROTONOSUPPORT;
140 200  
141   - ref = tipc_createport_raw(NULL, &dispatch, &wakeupdispatch, TIPC_LOW_IMPORTANCE);
142   - if (unlikely(!ref))
143   - return -ENOMEM;
144   -
145   - sock->state = SS_UNCONNECTED;
146   -
147 201 switch (sock->type) {
148 202 case SOCK_STREAM:
149   - sock->ops = &stream_ops;
  203 + ops = &stream_ops;
  204 + state = SS_UNCONNECTED;
150 205 break;
151 206 case SOCK_SEQPACKET:
152   - sock->ops = &packet_ops;
  207 + ops = &packet_ops;
  208 + state = SS_UNCONNECTED;
153 209 break;
154 210 case SOCK_DGRAM:
155   - tipc_set_portunreliable(ref, 1);
156   - /* fall through */
157 211 case SOCK_RDM:
158   - tipc_set_portunreturnable(ref, 1);
159   - sock->ops = &msg_ops;
160   - sock->state = SS_READY;
  212 + ops = &msg_ops;
  213 + state = SS_READY;
161 214 break;
162 215 default:
163   - tipc_deleteport(ref);
164 216 return -EPROTOTYPE;
165 217 }
166 218  
  219 + /* Allocate socket's protocol area */
  220 +
167 221 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
168   - if (!sk) {
169   - tipc_deleteport(ref);
  222 + if (sk == NULL)
170 223 return -ENOMEM;
171   - }
172 224  
173   - sock_init_data(sock, sk);
174   - sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
  225 + /* Allocate TIPC port for socket to use */
175 226  
176   - tsock = tipc_sk(sk);
177   - port = tipc_get_port(ref);
  227 + portref = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
  228 + TIPC_LOW_IMPORTANCE);
  229 + if (unlikely(portref == 0)) {
  230 + sk_free(sk);
  231 + return -ENOMEM;
  232 + }
178 233  
179   - tsock->p = port;
180   - port->usr_handle = tsock;
  234 + /* Finish initializing socket data structures */
181 235  
182   - mutex_init(&tsock->lock);
  236 + sock->ops = ops;
  237 + sock->state = state;
183 238  
184   - dbg("sock_create: %x\n",tsock);
  239 + sock_init_data(sock, sk);
  240 + sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
  241 + sk->sk_backlog_rcv = backlog_rcv;
  242 + tipc_sk(sk)->p = tipc_get_port(portref);
185 243  
186   - atomic_inc(&tipc_user_count);
  244 + if (sock->state == SS_READY) {
  245 + tipc_set_portunreturnable(portref, 1);
  246 + if (sock->type == SOCK_DGRAM)
  247 + tipc_set_portunreliable(portref, 1);
  248 + }
187 249  
  250 + atomic_inc(&tipc_user_count);
188 251 return 0;
189 252 }
190 253  
191 254  
192 255  
193 256  
194 257  
195 258  
196 259  
197 260  
198 261  
199 262  
200 263  
201 264  
202 265  
203 266  
204 267  
205 268  
206 269  
207 270  
... ... @@ -207,52 +270,62 @@
207 270  
208 271 static int release(struct socket *sock)
209 272 {
210   - struct tipc_sock *tsock = tipc_sk(sock->sk);
211 273 struct sock *sk = sock->sk;
212   - int res = TIPC_OK;
  274 + struct tipc_port *tport;
213 275 struct sk_buff *buf;
  276 + int res;
214 277  
215   - dbg("sock_delete: %x\n",tsock);
216   - if (!tsock)
  278 + /*
  279 + * Exit if socket isn't fully initialized (occurs when a failed accept()
  280 + * releases a pre-allocated child socket that was never used)
  281 + */
  282 +
  283 + if (sk == NULL)
217 284 return 0;
218   - mutex_lock(&tsock->lock);
219   - if (!sock->sk) {
220   - mutex_unlock(&tsock->lock);
221   - return 0;
222   - }
223 285  
224   - /* Reject unreceived messages, unless no longer connected */
  286 + tport = tipc_sk_port(sk);
  287 + lock_sock(sk);
225 288  
  289 + /*
  290 + * Reject all unreceived messages, except on an active connection
  291 + * (which disconnects locally & sends a 'FIN+' to peer)
  292 + */
  293 +
226 294 while (sock->state != SS_DISCONNECTING) {
227   - sock_lock(tsock);
228   - buf = skb_dequeue(&sk->sk_receive_queue);
229   - if (!buf)
230   - tsock->p->usr_handle = NULL;
231   - sock_unlock(tsock);
232   - if (!buf)
  295 + buf = __skb_dequeue(&sk->sk_receive_queue);
  296 + if (buf == NULL)
233 297 break;
  298 + atomic_dec(&tipc_queue_size);
234 299 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
235 300 buf_discard(buf);
236   - else
  301 + else {
  302 + if ((sock->state == SS_CONNECTING) ||
  303 + (sock->state == SS_CONNECTED)) {
  304 + sock->state = SS_DISCONNECTING;
  305 + tipc_disconnect(tport->ref);
  306 + }
237 307 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
238   - atomic_dec(&tipc_queue_size);
  308 + }
239 309 }
240 310  
241   - /* Delete TIPC port */
  311 + /*
  312 + * Delete TIPC port; this ensures no more messages are queued
  313 + * (also disconnects an active connection & sends a 'FIN-' to peer)
  314 + */
242 315  
243   - res = tipc_deleteport(tsock->p->ref);
244   - sock->sk = NULL;
  316 + res = tipc_deleteport(tport->ref);
245 317  
246   - /* Discard any remaining messages */
  318 + /* Discard any remaining (connection-based) messages in receive queue */
247 319  
248   - while ((buf = skb_dequeue(&sk->sk_receive_queue))) {
249   - buf_discard(buf);
250   - atomic_dec(&tipc_queue_size);
251   - }
  320 + discard_rx_queue(sk);
252 321  
253   - mutex_unlock(&tsock->lock);
  322 + /* Reject any messages that accumulated in backlog queue */
254 323  
  324 + sock->state = SS_DISCONNECTING;
  325 + release_sock(sk);
  326 +
255 327 sock_put(sk);
  328 + sock->sk = NULL;
256 329  
257 330 atomic_dec(&tipc_user_count);
258 331 return res;
259 332  
260 333  
261 334  
262 335  
263 336  
264 337  
265 338  
... ... @@ -269,47 +342,32 @@
269 342 * (i.e. a socket address length of 0) unbinds all names from the socket.
270 343 *
271 344 * Returns 0 on success, errno otherwise
  345 + *
  346 + * NOTE: This routine doesn't need to take the socket lock since it doesn't
  347 + * access any non-constant socket information.
272 348 */
273 349  
274 350 static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
275 351 {
276   - struct tipc_sock *tsock = tipc_sk(sock->sk);
277 352 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
278   - int res;
  353 + u32 portref = tipc_sk_port(sock->sk)->ref;
279 354  
280   - if (mutex_lock_interruptible(&tsock->lock))
281   - return -ERESTARTSYS;
  355 + if (unlikely(!uaddr_len))
  356 + return tipc_withdraw(portref, 0, NULL);
282 357  
283   - if (unlikely(!uaddr_len)) {
284   - res = tipc_withdraw(tsock->p->ref, 0, NULL);
285   - goto exit;
286   - }
  358 + if (uaddr_len < sizeof(struct sockaddr_tipc))
  359 + return -EINVAL;
  360 + if (addr->family != AF_TIPC)
  361 + return -EAFNOSUPPORT;
287 362  
288   - if (uaddr_len < sizeof(struct sockaddr_tipc)) {
289   - res = -EINVAL;
290   - goto exit;
291   - }
292   -
293   - if (addr->family != AF_TIPC) {
294   - res = -EAFNOSUPPORT;
295   - goto exit;
296   - }
297 363 if (addr->addrtype == TIPC_ADDR_NAME)
298 364 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
299   - else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
300   - res = -EAFNOSUPPORT;
301   - goto exit;
302   - }
  365 + else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
  366 + return -EAFNOSUPPORT;
303 367  
304   - if (addr->scope > 0)
305   - res = tipc_publish(tsock->p->ref, addr->scope,
306   - &addr->addr.nameseq);
307   - else
308   - res = tipc_withdraw(tsock->p->ref, -addr->scope,
309   - &addr->addr.nameseq);
310   -exit:
311   - mutex_unlock(&tsock->lock);
312   - return res;
  368 + return (addr->scope > 0) ?
  369 + tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
  370 + tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
313 371 }
314 372  
315 373 /**
316 374  
317 375  
318 376  
319 377  
320 378  
... ... @@ -320,30 +378,33 @@
320 378 * @peer: 0 to obtain socket name, 1 to obtain peer socket name
321 379 *
322 380 * Returns 0 on success, errno otherwise
  381 + *
  382 + * NOTE: This routine doesn't need to take the socket lock since it doesn't
  383 + * access any non-constant socket information.
323 384 */
324 385  
325 386 static int get_name(struct socket *sock, struct sockaddr *uaddr,
326 387 int *uaddr_len, int peer)
327 388 {
328   - struct tipc_sock *tsock = tipc_sk(sock->sk);
329 389 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
  390 + u32 portref = tipc_sk_port(sock->sk)->ref;
330 391 u32 res;
331 392  
332   - if (mutex_lock_interruptible(&tsock->lock))
333   - return -ERESTARTSYS;
  393 + if (peer) {
  394 + res = tipc_peer(portref, &addr->addr.id);
  395 + if (res)
  396 + return res;
  397 + } else {
  398 + tipc_ownidentity(portref, &addr->addr.id);
  399 + }
334 400  
335 401 *uaddr_len = sizeof(*addr);
336 402 addr->addrtype = TIPC_ADDR_ID;
337 403 addr->family = AF_TIPC;
338 404 addr->scope = 0;
339   - if (peer)
340   - res = tipc_peer(tsock->p->ref, &addr->addr.id);
341   - else
342   - res = tipc_ownidentity(tsock->p->ref, &addr->addr.id);
343 405 addr->addr.name.domain = 0;
344 406  
345   - mutex_unlock(&tsock->lock);
346   - return res;
  407 + return 0;
347 408 }
348 409  
349 410 /**
... ... @@ -414,7 +475,6 @@
414 475 return 0;
415 476 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
416 477 return 0;
417   -
418 478 if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
419 479 return -EACCES;
420 480  
... ... @@ -428,7 +488,7 @@
428 488  
429 489 /**
430 490 * send_msg - send message in connectionless manner
431   - * @iocb: (unused)
  491 + * @iocb: if NULL, indicates that socket lock is already held
432 492 * @sock: socket structure
433 493 * @m: message to send
434 494 * @total_len: length of message
435 495  
... ... @@ -444,9 +504,9 @@
444 504 static int send_msg(struct kiocb *iocb, struct socket *sock,
445 505 struct msghdr *m, size_t total_len)
446 506 {
447   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  507 + struct sock *sk = sock->sk;
  508 + struct tipc_port *tport = tipc_sk_port(sk);
448 509 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
449   - struct sk_buff *buf;
450 510 int needs_conn;
451 511 int res = -EINVAL;
452 512  
453 513  
454 514  
455 515  
456 516  
457 517  
458 518  
459 519  
... ... @@ -456,48 +516,46 @@
456 516 (dest->family != AF_TIPC)))
457 517 return -EINVAL;
458 518  
  519 + if (iocb)
  520 + lock_sock(sk);
  521 +
459 522 needs_conn = (sock->state != SS_READY);
460 523 if (unlikely(needs_conn)) {
461   - if (sock->state == SS_LISTENING)
462   - return -EPIPE;
463   - if (sock->state != SS_UNCONNECTED)
464   - return -EISCONN;
465   - if ((tsock->p->published) ||
466   - ((sock->type == SOCK_STREAM) && (total_len != 0)))
467   - return -EOPNOTSUPP;
  524 + if (sock->state == SS_LISTENING) {
  525 + res = -EPIPE;
  526 + goto exit;
  527 + }
  528 + if (sock->state != SS_UNCONNECTED) {
  529 + res = -EISCONN;
  530 + goto exit;
  531 + }
  532 + if ((tport->published) ||
  533 + ((sock->type == SOCK_STREAM) && (total_len != 0))) {
  534 + res = -EOPNOTSUPP;
  535 + goto exit;
  536 + }
468 537 if (dest->addrtype == TIPC_ADDR_NAME) {
469   - tsock->p->conn_type = dest->addr.name.name.type;
470   - tsock->p->conn_instance = dest->addr.name.name.instance;
  538 + tport->conn_type = dest->addr.name.name.type;
  539 + tport->conn_instance = dest->addr.name.name.instance;
471 540 }
472   - }
473 541  
474   - if (mutex_lock_interruptible(&tsock->lock))
475   - return -ERESTARTSYS;
476   -
477   - if (needs_conn) {
478   -
479 542 /* Abort any pending connection attempts (very unlikely) */
480 543  
481   - while ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) {
482   - tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
483   - atomic_dec(&tipc_queue_size);
484   - }
485   -
486   - sock->state = SS_CONNECTING;
  544 + reject_rx_queue(sk);
487 545 }
488 546  
489 547 do {
490 548 if (dest->addrtype == TIPC_ADDR_NAME) {
491 549 if ((res = dest_name_check(dest, m)))
492   - goto exit;
493   - res = tipc_send2name(tsock->p->ref,
  550 + break;
  551 + res = tipc_send2name(tport->ref,
494 552 &dest->addr.name.name,
495 553 dest->addr.name.domain,
496 554 m->msg_iovlen,
497 555 m->msg_iov);
498 556 }
499 557 else if (dest->addrtype == TIPC_ADDR_ID) {
500   - res = tipc_send2port(tsock->p->ref,
  558 + res = tipc_send2port(tport->ref,
501 559 &dest->addr.id,
502 560 m->msg_iovlen,
503 561 m->msg_iov);
504 562  
505 563  
506 564  
507 565  
508 566  
509 567  
... ... @@ -505,36 +563,43 @@
505 563 else if (dest->addrtype == TIPC_ADDR_MCAST) {
506 564 if (needs_conn) {
507 565 res = -EOPNOTSUPP;
508   - goto exit;
  566 + break;
509 567 }
510 568 if ((res = dest_name_check(dest, m)))
511   - goto exit;
512   - res = tipc_multicast(tsock->p->ref,
  569 + break;
  570 + res = tipc_multicast(tport->ref,
513 571 &dest->addr.nameseq,
514 572 0,
515 573 m->msg_iovlen,
516 574 m->msg_iov);
517 575 }
518 576 if (likely(res != -ELINKCONG)) {
519   -exit:
520   - mutex_unlock(&tsock->lock);
521   - return res;
  577 + if (needs_conn && (res >= 0)) {
  578 + sock->state = SS_CONNECTING;
  579 + }
  580 + break;
522 581 }
523 582 if (m->msg_flags & MSG_DONTWAIT) {
524 583 res = -EWOULDBLOCK;
525   - goto exit;
  584 + break;
526 585 }
527   - if (wait_event_interruptible(*sock->sk->sk_sleep,
528   - !tsock->p->congested)) {
529   - res = -ERESTARTSYS;
530   - goto exit;
531   - }
  586 + release_sock(sk);
  587 + res = wait_event_interruptible(*sk->sk_sleep,
  588 + !tport->congested);
  589 + lock_sock(sk);
  590 + if (res)
  591 + break;
532 592 } while (1);
  593 +
  594 +exit:
  595 + if (iocb)
  596 + release_sock(sk);
  597 + return res;
533 598 }
534 599  
535 600 /**
536 601 * send_packet - send a connection-oriented message
537   - * @iocb: (unused)
  602 + * @iocb: if NULL, indicates that socket lock is already held
538 603 * @sock: socket structure
539 604 * @m: message to send
540 605 * @total_len: length of message
... ... @@ -547,7 +612,8 @@
547 612 static int send_packet(struct kiocb *iocb, struct socket *sock,
548 613 struct msghdr *m, size_t total_len)
549 614 {
550   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  615 + struct sock *sk = sock->sk;
  616 + struct tipc_port *tport = tipc_sk_port(sk);
551 617 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
552 618 int res;
553 619  
... ... @@ -556,9 +622,8 @@
556 622 if (unlikely(dest))
557 623 return send_msg(iocb, sock, m, total_len);
558 624  
559   - if (mutex_lock_interruptible(&tsock->lock)) {
560   - return -ERESTARTSYS;
561   - }
  625 + if (iocb)
  626 + lock_sock(sk);
562 627  
563 628 do {
564 629 if (unlikely(sock->state != SS_CONNECTED)) {
565 630  
566 631  
567 632  
568 633  
569 634  
... ... @@ -566,25 +631,28 @@
566 631 res = -EPIPE;
567 632 else
568 633 res = -ENOTCONN;
569   - goto exit;
  634 + break;
570 635 }
571 636  
572   - res = tipc_send(tsock->p->ref, m->msg_iovlen, m->msg_iov);
  637 + res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov);
573 638 if (likely(res != -ELINKCONG)) {
574   -exit:
575   - mutex_unlock(&tsock->lock);
576   - return res;
  639 + break;
577 640 }
578 641 if (m->msg_flags & MSG_DONTWAIT) {
579 642 res = -EWOULDBLOCK;
580   - goto exit;
  643 + break;
581 644 }
582   - if (wait_event_interruptible(*sock->sk->sk_sleep,
583   - !tsock->p->congested)) {
584   - res = -ERESTARTSYS;
585   - goto exit;
586   - }
  645 + release_sock(sk);
  646 + res = wait_event_interruptible(*sk->sk_sleep,
  647 + (!tport->congested || !tport->connected));
  648 + lock_sock(sk);
  649 + if (res)
  650 + break;
587 651 } while (1);
  652 +
  653 + if (iocb)
  654 + release_sock(sk);
  655 + return res;
588 656 }
589 657  
590 658 /**
591 659  
... ... @@ -600,11 +668,11 @@
600 668 * or errno if no data sent
601 669 */
602 670  
603   -
604 671 static int send_stream(struct kiocb *iocb, struct socket *sock,
605 672 struct msghdr *m, size_t total_len)
606 673 {
607   - struct tipc_port *tport;
  674 + struct sock *sk = sock->sk;
  675 + struct tipc_port *tport = tipc_sk_port(sk);
608 676 struct msghdr my_msg;
609 677 struct iovec my_iov;
610 678 struct iovec *curr_iov;
611 679  
612 680  
... ... @@ -616,19 +684,27 @@
616 684 int bytes_sent;
617 685 int res;
618 686  
  687 + lock_sock(sk);
  688 +
619 689 /* Handle special cases where there is no connection */
620 690  
621 691 if (unlikely(sock->state != SS_CONNECTED)) {
622   - if (sock->state == SS_UNCONNECTED)
623   - return send_packet(iocb, sock, m, total_len);
624   - else if (sock->state == SS_DISCONNECTING)
625   - return -EPIPE;
626   - else
627   - return -ENOTCONN;
  692 + if (sock->state == SS_UNCONNECTED) {
  693 + res = send_packet(NULL, sock, m, total_len);
  694 + goto exit;
  695 + } else if (sock->state == SS_DISCONNECTING) {
  696 + res = -EPIPE;
  697 + goto exit;
  698 + } else {
  699 + res = -ENOTCONN;
  700 + goto exit;
  701 + }
628 702 }
629 703  
630   - if (unlikely(m->msg_name))
631   - return -EISCONN;
  704 + if (unlikely(m->msg_name)) {
  705 + res = -EISCONN;
  706 + goto exit;
  707 + }
632 708  
633 709 /*
634 710 * Send each iovec entry using one or more messages
... ... @@ -646,7 +722,6 @@
646 722 my_msg.msg_name = NULL;
647 723 bytes_sent = 0;
648 724  
649   - tport = tipc_sk(sock->sk)->p;
650 725 hdr_size = msg_hdr_sz(&tport->phdr);
651 726  
652 727 while (curr_iovlen--) {
653 728  
... ... @@ -661,10 +736,10 @@
661 736 bytes_to_send = curr_left;
662 737 my_iov.iov_base = curr_start;
663 738 my_iov.iov_len = bytes_to_send;
664   - if ((res = send_packet(iocb, sock, &my_msg, 0)) < 0) {
665   - if (bytes_sent != 0)
  739 + if ((res = send_packet(NULL, sock, &my_msg, 0)) < 0) {
  740 + if (bytes_sent)
666 741 res = bytes_sent;
667   - return res;
  742 + goto exit;
668 743 }
669 744 curr_left -= bytes_to_send;
670 745 curr_start += bytes_to_send;
671 746  
672 747  
673 748  
... ... @@ -673,22 +748,23 @@
673 748  
674 749 curr_iov++;
675 750 }
676   -
677   - return bytes_sent;
  751 + res = bytes_sent;
  752 +exit:
  753 + release_sock(sk);
  754 + return res;
678 755 }
679 756  
680 757 /**
681 758 * auto_connect - complete connection setup to a remote port
682 759 * @sock: socket structure
683   - * @tsock: TIPC-specific socket structure
684 760 * @msg: peer's response message
685 761 *
686 762 * Returns 0 on success, errno otherwise
687 763 */
688 764  
689   -static int auto_connect(struct socket *sock, struct tipc_sock *tsock,
690   - struct tipc_msg *msg)
  765 +static int auto_connect(struct socket *sock, struct tipc_msg *msg)
691 766 {
  767 + struct tipc_port *tport = tipc_sk_port(sock->sk);
692 768 struct tipc_portid peer;
693 769  
694 770 if (msg_errcode(msg)) {
... ... @@ -698,8 +774,8 @@
698 774  
699 775 peer.ref = msg_origport(msg);
700 776 peer.node = msg_orignode(msg);
701   - tipc_connect2port(tsock->p->ref, &peer);
702   - tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
  777 + tipc_connect2port(tport->ref, &peer);
  778 + tipc_set_portimportance(tport->ref, msg_importance(msg));
703 779 sock->state = SS_CONNECTED;
704 780 return 0;
705 781 }
706 782  
707 783  
708 784  
709 785  
710 786  
711 787  
712 788  
713 789  
714 790  
715 791  
716 792  
... ... @@ -812,62 +888,54 @@
812 888 static int recv_msg(struct kiocb *iocb, struct socket *sock,
813 889 struct msghdr *m, size_t buf_len, int flags)
814 890 {
815   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  891 + struct sock *sk = sock->sk;
  892 + struct tipc_port *tport = tipc_sk_port(sk);
816 893 struct sk_buff *buf;
817 894 struct tipc_msg *msg;
818   - unsigned int q_len;
819 895 unsigned int sz;
820 896 u32 err;
821 897 int res;
822 898  
823   - /* Currently doesn't support receiving into multiple iovec entries */
  899 + /* Catch invalid receive requests */
824 900  
825 901 if (m->msg_iovlen != 1)
826   - return -EOPNOTSUPP;
  902 + return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
827 903  
828   - /* Catch invalid receive attempts */
829   -
830 904 if (unlikely(!buf_len))
831 905 return -EINVAL;
832 906  
833   - if (sock->type == SOCK_SEQPACKET) {
834   - if (unlikely(sock->state == SS_UNCONNECTED))
835   - return -ENOTCONN;
836   - if (unlikely((sock->state == SS_DISCONNECTING) &&
837   - (skb_queue_len(&sock->sk->sk_receive_queue) == 0)))
838   - return -ENOTCONN;
839   - }
  907 + lock_sock(sk);
840 908  
841   - /* Look for a message in receive queue; wait if necessary */
842   -
843   - if (unlikely(mutex_lock_interruptible(&tsock->lock)))
844   - return -ERESTARTSYS;
845   -
846   -restart:
847   - if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) &&
848   - (flags & MSG_DONTWAIT))) {
849   - res = -EWOULDBLOCK;
  909 + if (unlikely(sock->state == SS_UNCONNECTED)) {
  910 + res = -ENOTCONN;
850 911 goto exit;
851 912 }
852 913  
853   - if ((res = wait_event_interruptible(
854   - *sock->sk->sk_sleep,
855   - ((q_len = skb_queue_len(&sock->sk->sk_receive_queue)) ||
856   - (sock->state == SS_DISCONNECTING))) )) {
857   - goto exit;
858   - }
  914 +restart:
859 915  
860   - /* Catch attempt to receive on an already terminated connection */
861   - /* [THIS CHECK MAY OVERLAP WITH AN EARLIER CHECK] */
  916 + /* Look for a message in receive queue; wait if necessary */
862 917  
863   - if (!q_len) {
864   - res = -ENOTCONN;
865   - goto exit;
  918 + while (skb_queue_empty(&sk->sk_receive_queue)) {
  919 + if (sock->state == SS_DISCONNECTING) {
  920 + res = -ENOTCONN;
  921 + goto exit;
  922 + }
  923 + if (flags & MSG_DONTWAIT) {
  924 + res = -EWOULDBLOCK;
  925 + goto exit;
  926 + }
  927 + release_sock(sk);
  928 + res = wait_event_interruptible(*sk->sk_sleep,
  929 + (!skb_queue_empty(&sk->sk_receive_queue) ||
  930 + (sock->state == SS_DISCONNECTING)));
  931 + lock_sock(sk);
  932 + if (res)
  933 + goto exit;
866 934 }
867 935  
868   - /* Get access to first message in receive queue */
  936 + /* Look at first message in receive queue */
869 937  
870   - buf = skb_peek(&sock->sk->sk_receive_queue);
  938 + buf = skb_peek(&sk->sk_receive_queue);
871 939 msg = buf_msg(buf);
872 940 sz = msg_data_sz(msg);
873 941 err = msg_errcode(msg);
874 942  
... ... @@ -875,14 +943,15 @@
875 943 /* Complete connection setup for an implied connect */
876 944  
877 945 if (unlikely(sock->state == SS_CONNECTING)) {
878   - if ((res = auto_connect(sock, tsock, msg)))
  946 + res = auto_connect(sock, msg);
  947 + if (res)
879 948 goto exit;
880 949 }
881 950  
882 951 /* Discard an empty non-errored message & try again */
883 952  
884 953 if ((!sz) && (!err)) {
885   - advance_queue(tsock);
  954 + advance_rx_queue(sk);
886 955 goto restart;
887 956 }
888 957  
... ... @@ -892,7 +961,8 @@
892 961  
893 962 /* Capture ancillary data (optional) */
894 963  
895   - if ((res = anc_data_recv(m, msg, tsock->p)))
  964 + res = anc_data_recv(m, msg, tport);
  965 + if (res)
896 966 goto exit;
897 967  
898 968 /* Capture message data (if valid) & compute return value (always) */
899 969  
... ... @@ -920,12 +990,12 @@
920 990  
921 991 if (likely(!(flags & MSG_PEEK))) {
922 992 if ((sock->state != SS_READY) &&
923   - (++tsock->p->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
924   - tipc_acknowledge(tsock->p->ref, tsock->p->conn_unacked);
925   - advance_queue(tsock);
  993 + (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
  994 + tipc_acknowledge(tport->ref, tport->conn_unacked);
  995 + advance_rx_queue(sk);
926 996 }
927 997 exit:
928   - mutex_unlock(&tsock->lock);
  998 + release_sock(sk);
929 999 return res;
930 1000 }
931 1001  
932 1002  
... ... @@ -945,10 +1015,10 @@
945 1015 static int recv_stream(struct kiocb *iocb, struct socket *sock,
946 1016 struct msghdr *m, size_t buf_len, int flags)
947 1017 {
948   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  1018 + struct sock *sk = sock->sk;
  1019 + struct tipc_port *tport = tipc_sk_port(sk);
949 1020 struct sk_buff *buf;
950 1021 struct tipc_msg *msg;
951   - unsigned int q_len;
952 1022 unsigned int sz;
953 1023 int sz_to_copy;
954 1024 int sz_copied = 0;
955 1025  
956 1026  
957 1027  
958 1028  
959 1029  
960 1030  
961 1031  
962 1032  
963 1033  
964 1034  
... ... @@ -956,54 +1026,49 @@
956 1026 char __user *crs = m->msg_iov->iov_base;
957 1027 unsigned char *buf_crs;
958 1028 u32 err;
959   - int res;
  1029 + int res = 0;
960 1030  
961   - /* Currently doesn't support receiving into multiple iovec entries */
  1031 + /* Catch invalid receive attempts */
962 1032  
963 1033 if (m->msg_iovlen != 1)
964   - return -EOPNOTSUPP;
  1034 + return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
965 1035  
966   - /* Catch invalid receive attempts */
967   -
968 1036 if (unlikely(!buf_len))
969 1037 return -EINVAL;
970 1038  
971   - if (unlikely(sock->state == SS_DISCONNECTING)) {
972   - if (skb_queue_len(&sock->sk->sk_receive_queue) == 0)
973   - return -ENOTCONN;
974   - } else if (unlikely(sock->state != SS_CONNECTED))
975   - return -ENOTCONN;
  1039 + lock_sock(sk);
976 1040  
977   - /* Look for a message in receive queue; wait if necessary */
978   -
979   - if (unlikely(mutex_lock_interruptible(&tsock->lock)))
980   - return -ERESTARTSYS;
981   -
982   -restart:
983   - if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) &&
984   - (flags & MSG_DONTWAIT))) {
985   - res = -EWOULDBLOCK;
  1041 + if (unlikely((sock->state == SS_UNCONNECTED) ||
  1042 + (sock->state == SS_CONNECTING))) {
  1043 + res = -ENOTCONN;
986 1044 goto exit;
987 1045 }
988 1046  
989   - if ((res = wait_event_interruptible(
990   - *sock->sk->sk_sleep,
991   - ((q_len = skb_queue_len(&sock->sk->sk_receive_queue)) ||
992   - (sock->state == SS_DISCONNECTING))) )) {
993   - goto exit;
994   - }
  1047 +restart:
995 1048  
996   - /* Catch attempt to receive on an already terminated connection */
997   - /* [THIS CHECK MAY OVERLAP WITH AN EARLIER CHECK] */
  1049 + /* Look for a message in receive queue; wait if necessary */
998 1050  
999   - if (!q_len) {
1000   - res = -ENOTCONN;
1001   - goto exit;
  1051 + while (skb_queue_empty(&sk->sk_receive_queue)) {
  1052 + if (sock->state == SS_DISCONNECTING) {
  1053 + res = -ENOTCONN;
  1054 + goto exit;
  1055 + }
  1056 + if (flags & MSG_DONTWAIT) {
  1057 + res = -EWOULDBLOCK;
  1058 + goto exit;
  1059 + }
  1060 + release_sock(sk);
  1061 + res = wait_event_interruptible(*sk->sk_sleep,
  1062 + (!skb_queue_empty(&sk->sk_receive_queue) ||
  1063 + (sock->state == SS_DISCONNECTING)));
  1064 + lock_sock(sk);
  1065 + if (res)
  1066 + goto exit;
1002 1067 }
1003 1068  
1004   - /* Get access to first message in receive queue */
  1069 + /* Look at first message in receive queue */
1005 1070  
1006   - buf = skb_peek(&sock->sk->sk_receive_queue);
  1071 + buf = skb_peek(&sk->sk_receive_queue);
1007 1072 msg = buf_msg(buf);
1008 1073 sz = msg_data_sz(msg);
1009 1074 err = msg_errcode(msg);
... ... @@ -1011,7 +1076,7 @@
1011 1076 /* Discard an empty non-errored message & try again */
1012 1077  
1013 1078 if ((!sz) && (!err)) {
1014   - advance_queue(tsock);
  1079 + advance_rx_queue(sk);
1015 1080 goto restart;
1016 1081 }
1017 1082  
... ... @@ -1019,7 +1084,8 @@
1019 1084  
1020 1085 if (sz_copied == 0) {
1021 1086 set_orig_addr(m, msg);
1022   - if ((res = anc_data_recv(m, msg, tsock->p)))
  1087 + res = anc_data_recv(m, msg, tport);
  1088 + if (res)
1023 1089 goto exit;
1024 1090 }
1025 1091  
... ... @@ -1057,9 +1123,9 @@
1057 1123 /* Consume received message (optional) */
1058 1124  
1059 1125 if (likely(!(flags & MSG_PEEK))) {
1060   - if (unlikely(++tsock->p->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1061   - tipc_acknowledge(tsock->p->ref, tsock->p->conn_unacked);
1062   - advance_queue(tsock);
  1126 + if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
  1127 + tipc_acknowledge(tport->ref, tport->conn_unacked);
  1128 + advance_rx_queue(sk);
1063 1129 }
1064 1130  
1065 1131 /* Loop around if more data is required */
... ... @@ -1074,7 +1140,7 @@
1074 1140 goto restart;
1075 1141  
1076 1142 exit:
1077   - mutex_unlock(&tsock->lock);
  1143 + release_sock(sk);
1078 1144 return sz_copied ? sz_copied : res;
1079 1145 }
1080 1146  
1081 1147  
1082 1148  
1083 1149  
1084 1150  
1085 1151  
1086 1152  
... ... @@ -1108,37 +1174,24 @@
1108 1174 }
1109 1175  
1110 1176 /**
1111   - * async_disconnect - wrapper function used to disconnect port
1112   - * @portref: TIPC port reference (passed as pointer-sized value)
1113   - */
1114   -
1115   -static void async_disconnect(unsigned long portref)
1116   -{
1117   - tipc_disconnect((u32)portref);
1118   -}
1119   -
1120   -/**
1121   - * dispatch - handle arriving message
1122   - * @tport: TIPC port that received message
  1177 + * filter_rcv - validate incoming message
  1178 + * @sk: socket
1123 1179 * @buf: message
1124 1180 *
1125   - * Called with port locked. Must not take socket lock to avoid deadlock risk.
  1181 + * Enqueues message on receive queue if acceptable; optionally handles
  1182 + * disconnect indication for a connected socket.
1126 1183 *
  1184 + * Called with socket lock already taken; port lock may also be taken.
  1185 + *
1127 1186 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1128 1187 */
1129 1188  
1130   -static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
  1189 +static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1131 1190 {
  1191 + struct socket *sock = sk->sk_socket;
1132 1192 struct tipc_msg *msg = buf_msg(buf);
1133   - struct tipc_sock *tsock = (struct tipc_sock *)tport->usr_handle;
1134   - struct socket *sock;
1135 1193 u32 recv_q_len;
1136 1194  
1137   - /* Reject message if socket is closing */
1138   -
1139   - if (!tsock)
1140   - return TIPC_ERR_NO_PORT;
1141   -
1142 1195 /* Reject message if it is wrong sort of message for socket */
1143 1196  
1144 1197 /*
... ... @@ -1146,7 +1199,7 @@
1146 1199 * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
1147 1200 * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
1148 1201 */
1149   - sock = tsock->sk.sk_socket;
  1202 +
1150 1203 if (sock->state == SS_READY) {
1151 1204 if (msg_connected(msg)) {
1152 1205 msg_dbg(msg, "dispatch filter 1\n");
1153 1206  
1154 1207  
1155 1208  
1156 1209  
1157 1210  
1158 1211  
1159 1212  
1160 1213  
1161 1214  
... ... @@ -1194,45 +1247,98 @@
1194 1247 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1195 1248 return TIPC_ERR_OVERLOAD;
1196 1249 }
1197   - recv_q_len = skb_queue_len(&tsock->sk.sk_receive_queue);
  1250 + recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1198 1251 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1199 1252 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1200 1253 return TIPC_ERR_OVERLOAD;
1201 1254 }
1202 1255  
  1256 + /* Enqueue message (finally!) */
  1257 +
  1258 + msg_dbg(msg, "<DISP<: ");
  1259 + TIPC_SKB_CB(buf)->handle = msg_data(msg);
  1260 + atomic_inc(&tipc_queue_size);
  1261 + __skb_queue_tail(&sk->sk_receive_queue, buf);
  1262 +
1203 1263 /* Initiate connection termination for an incoming 'FIN' */
1204 1264  
1205 1265 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1206 1266 sock->state = SS_DISCONNECTING;
1207   - /* Note: Use signal since port lock is already taken! */
1208   - tipc_k_signal((Handler)async_disconnect, tport->ref);
  1267 + tipc_disconnect_port(tipc_sk_port(sk));
1209 1268 }
1210 1269  
1211   - /* Enqueue message (finally!) */
  1270 + if (waitqueue_active(sk->sk_sleep))
  1271 + wake_up_interruptible(sk->sk_sleep);
  1272 + return TIPC_OK;
  1273 +}
1212 1274  
1213   - msg_dbg(msg,"<DISP<: ");
1214   - TIPC_SKB_CB(buf)->handle = msg_data(msg);
1215   - atomic_inc(&tipc_queue_size);
1216   - skb_queue_tail(&sock->sk->sk_receive_queue, buf);
  1275 +/**
  1276 + * backlog_rcv - handle incoming message from backlog queue
  1277 + * @sk: socket
  1278 + * @buf: message
  1279 + *
  1280 + * Caller must hold socket lock, but not port lock.
  1281 + *
  1282 + * Returns 0
  1283 + */
1217 1284  
1218   - if (waitqueue_active(sock->sk->sk_sleep))
1219   - wake_up_interruptible(sock->sk->sk_sleep);
1220   - return TIPC_OK;
  1285 +static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
  1286 +{
  1287 + u32 res;
  1288 +
  1289 + res = filter_rcv(sk, buf);
  1290 + if (res)
  1291 + tipc_reject_msg(buf, res);
  1292 + return 0;
1221 1293 }
1222 1294  
1223 1295 /**
  1296 + * dispatch - handle incoming message
  1297 + * @tport: TIPC port that received message
  1298 + * @buf: message
  1299 + *
  1300 + * Called with port lock already taken.
  1301 + *
  1302 + * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
  1303 + */
  1304 +
  1305 +static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
  1306 +{
  1307 + struct sock *sk = (struct sock *)tport->usr_handle;
  1308 + u32 res;
  1309 +
  1310 + /*
  1311 + * Process message if socket is unlocked; otherwise add to backlog queue
  1312 + *
  1313 + * This code is based on sk_receive_skb(), but must be distinct from it
  1314 + * since a TIPC-specific filter/reject mechanism is utilized
  1315 + */
  1316 +
  1317 + bh_lock_sock(sk);
  1318 + if (!sock_owned_by_user(sk)) {
  1319 + res = filter_rcv(sk, buf);
  1320 + } else {
  1321 + sk_add_backlog(sk, buf);
  1322 + res = TIPC_OK;
  1323 + }
  1324 + bh_unlock_sock(sk);
  1325 +
  1326 + return res;
  1327 +}
  1328 +
  1329 +/**
1224 1330 * wakeupdispatch - wake up port after congestion
1225 1331 * @tport: port to wakeup
1226 1332 *
1227   - * Called with port lock on.
  1333 + * Called with port lock already taken.
1228 1334 */
1229 1335  
1230 1336 static void wakeupdispatch(struct tipc_port *tport)
1231 1337 {
1232   - struct tipc_sock *tsock = (struct tipc_sock *)tport->usr_handle;
  1338 + struct sock *sk = (struct sock *)tport->usr_handle;
1233 1339  
1234   - if (waitqueue_active(tsock->sk.sk_sleep))
1235   - wake_up_interruptible(tsock->sk.sk_sleep);
  1340 + if (waitqueue_active(sk->sk_sleep))
  1341 + wake_up_interruptible(sk->sk_sleep);
1236 1342 }
1237 1343  
1238 1344 /**
... ... @@ -1240,7 +1346,7 @@
1240 1346 * @sock: socket structure
1241 1347 * @dest: socket address for destination port
1242 1348 * @destlen: size of socket address data structure
1243   - * @flags: (unused)
  1349 + * @flags: file-related flags associated with socket
1244 1350 *
1245 1351 * Returns 0 on success, errno otherwise
1246 1352 */
1247 1353  
1248 1354  
1249 1355  
1250 1356  
... ... @@ -1248,31 +1354,43 @@
1248 1354 static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1249 1355 int flags)
1250 1356 {
1251   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  1357 + struct sock *sk = sock->sk;
1252 1358 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1253 1359 struct msghdr m = {NULL,};
1254 1360 struct sk_buff *buf;
1255 1361 struct tipc_msg *msg;
1256 1362 int res;
1257 1363  
  1364 + lock_sock(sk);
  1365 +
1258 1366 /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1259 1367  
1260   - if (sock->state == SS_READY)
1261   - return -EOPNOTSUPP;
  1368 + if (sock->state == SS_READY) {
  1369 + res = -EOPNOTSUPP;
  1370 + goto exit;
  1371 + }
1262 1372  
1263 1373 /* For now, TIPC does not support the non-blocking form of connect() */
1264 1374  
1265   - if (flags & O_NONBLOCK)
1266   - return -EWOULDBLOCK;
  1375 + if (flags & O_NONBLOCK) {
  1376 + res = -EWOULDBLOCK;
  1377 + goto exit;
  1378 + }
1267 1379  
1268 1380 /* Issue Posix-compliant error code if socket is in the wrong state */
1269 1381  
1270   - if (sock->state == SS_LISTENING)
1271   - return -EOPNOTSUPP;
1272   - if (sock->state == SS_CONNECTING)
1273   - return -EALREADY;
1274   - if (sock->state != SS_UNCONNECTED)
1275   - return -EISCONN;
  1382 + if (sock->state == SS_LISTENING) {
  1383 + res = -EOPNOTSUPP;
  1384 + goto exit;
  1385 + }
  1386 + if (sock->state == SS_CONNECTING) {
  1387 + res = -EALREADY;
  1388 + goto exit;
  1389 + }
  1390 + if (sock->state != SS_UNCONNECTED) {
  1391 + res = -EISCONN;
  1392 + goto exit;
  1393 + }
1276 1394  
1277 1395 /*
1278 1396 * Reject connection attempt using multicast address
1279 1397  
1280 1398  
1281 1399  
1282 1400  
1283 1401  
1284 1402  
... ... @@ -1281,34 +1399,48 @@
1281 1399 * so there's no need to do it here
1282 1400 */
1283 1401  
1284   - if (dst->addrtype == TIPC_ADDR_MCAST)
1285   - return -EINVAL;
  1402 + if (dst->addrtype == TIPC_ADDR_MCAST) {
  1403 + res = -EINVAL;
  1404 + goto exit;
  1405 + }
1286 1406  
  1407 + /* Reject any messages already in receive queue (very unlikely) */
  1408 +
  1409 + reject_rx_queue(sk);
  1410 +
1287 1411 /* Send a 'SYN-' to destination */
1288 1412  
1289 1413 m.msg_name = dest;
1290 1414 m.msg_namelen = destlen;
1291 1415 res = send_msg(NULL, sock, &m, 0);
1292 1416 if (res < 0) {
1293   - sock->state = SS_DISCONNECTING;
1294   - return res;
  1417 + goto exit;
1295 1418 }
1296 1419  
1297   - if (mutex_lock_interruptible(&tsock->lock))
1298   - return -ERESTARTSYS;
  1420 + /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1299 1421  
1300   - /* Wait for destination's 'ACK' response */
  1422 + release_sock(sk);
  1423 + res = wait_event_interruptible_timeout(*sk->sk_sleep,
  1424 + (!skb_queue_empty(&sk->sk_receive_queue) ||
  1425 + (sock->state != SS_CONNECTING)),
  1426 + sk->sk_rcvtimeo);
  1427 + lock_sock(sk);
1301 1428  
1302   - res = wait_event_interruptible_timeout(*sock->sk->sk_sleep,
1303   - skb_queue_len(&sock->sk->sk_receive_queue),
1304   - sock->sk->sk_rcvtimeo);
1305   - buf = skb_peek(&sock->sk->sk_receive_queue);
1306 1429 if (res > 0) {
1307   - msg = buf_msg(buf);
1308   - res = auto_connect(sock, tsock, msg);
1309   - if (!res) {
1310   - if (!msg_data_sz(msg))
1311   - advance_queue(tsock);
  1430 + buf = skb_peek(&sk->sk_receive_queue);
  1431 + if (buf != NULL) {
  1432 + msg = buf_msg(buf);
  1433 + res = auto_connect(sock, msg);
  1434 + if (!res) {
  1435 + if (!msg_data_sz(msg))
  1436 + advance_rx_queue(sk);
  1437 + }
  1438 + } else {
  1439 + if (sock->state == SS_CONNECTED) {
  1440 + res = -EISCONN;
  1441 + } else {
  1442 + res = -ECONNREFUSED;
  1443 + }
1312 1444 }
1313 1445 } else {
1314 1446 if (res == 0)
... ... @@ -1318,7 +1450,8 @@
1318 1450 sock->state = SS_DISCONNECTING;
1319 1451 }
1320 1452  
1321   - mutex_unlock(&tsock->lock);
  1453 +exit:
  1454 + release_sock(sk);
1322 1455 return res;
1323 1456 }
1324 1457  
1325 1458  
1326 1459  
... ... @@ -1332,14 +1465,22 @@
1332 1465  
1333 1466 static int listen(struct socket *sock, int len)
1334 1467 {
1335   - /* REQUIRES SOCKET LOCKING OF SOME SORT? */
  1468 + struct sock *sk = sock->sk;
  1469 + int res;
1336 1470  
  1471 + lock_sock(sk);
  1472 +
1337 1473 if (sock->state == SS_READY)
1338   - return -EOPNOTSUPP;
1339   - if (sock->state != SS_UNCONNECTED)
1340   - return -EINVAL;
1341   - sock->state = SS_LISTENING;
1342   - return 0;
  1474 + res = -EOPNOTSUPP;
  1475 + else if (sock->state != SS_UNCONNECTED)
  1476 + res = -EINVAL;
  1477 + else {
  1478 + sock->state = SS_LISTENING;
  1479 + res = 0;
  1480 + }
  1481 +
  1482 + release_sock(sk);
  1483 + return res;
1343 1484 }
1344 1485  
1345 1486 /**
1346 1487  
1347 1488  
1348 1489  
1349 1490  
1350 1491  
1351 1492  
1352 1493  
1353 1494  
1354 1495  
1355 1496  
1356 1497  
1357 1498  
... ... @@ -1351,50 +1492,69 @@
1351 1492 * Returns 0 on success, errno otherwise
1352 1493 */
1353 1494  
1354   -static int accept(struct socket *sock, struct socket *newsock, int flags)
  1495 +static int accept(struct socket *sock, struct socket *new_sock, int flags)
1355 1496 {
1356   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  1497 + struct sock *sk = sock->sk;
1357 1498 struct sk_buff *buf;
1358   - int res = -EFAULT;
  1499 + int res;
1359 1500  
1360   - if (sock->state == SS_READY)
1361   - return -EOPNOTSUPP;
1362   - if (sock->state != SS_LISTENING)
1363   - return -EINVAL;
  1501 + lock_sock(sk);
1364 1502  
1365   - if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) &&
1366   - (flags & O_NONBLOCK)))
1367   - return -EWOULDBLOCK;
1368   -
1369   - if (mutex_lock_interruptible(&tsock->lock))
1370   - return -ERESTARTSYS;
1371   -
1372   - if (wait_event_interruptible(*sock->sk->sk_sleep,
1373   - skb_queue_len(&sock->sk->sk_receive_queue))) {
1374   - res = -ERESTARTSYS;
  1503 + if (sock->state == SS_READY) {
  1504 + res = -EOPNOTSUPP;
1375 1505 goto exit;
1376 1506 }
1377   - buf = skb_peek(&sock->sk->sk_receive_queue);
  1507 + if (sock->state != SS_LISTENING) {
  1508 + res = -EINVAL;
  1509 + goto exit;
  1510 + }
1378 1511  
1379   - res = tipc_create(sock_net(sock->sk), newsock, 0);
  1512 + while (skb_queue_empty(&sk->sk_receive_queue)) {
  1513 + if (flags & O_NONBLOCK) {
  1514 + res = -EWOULDBLOCK;
  1515 + goto exit;
  1516 + }
  1517 + release_sock(sk);
  1518 + res = wait_event_interruptible(*sk->sk_sleep,
  1519 + (!skb_queue_empty(&sk->sk_receive_queue)));
  1520 + lock_sock(sk);
  1521 + if (res)
  1522 + goto exit;
  1523 + }
  1524 +
  1525 + buf = skb_peek(&sk->sk_receive_queue);
  1526 +
  1527 + res = tipc_create(sock_net(sock->sk), new_sock, 0);
1380 1528 if (!res) {
1381   - struct tipc_sock *new_tsock = tipc_sk(newsock->sk);
  1529 + struct sock *new_sk = new_sock->sk;
  1530 + struct tipc_port *new_tport = tipc_sk_port(new_sk);
  1531 + u32 new_ref = new_tport->ref;
1382 1532 struct tipc_portid id;
1383 1533 struct tipc_msg *msg = buf_msg(buf);
1384   - u32 new_ref = new_tsock->p->ref;
1385 1534  
  1535 + lock_sock(new_sk);
  1536 +
  1537 + /*
  1538 + * Reject any stray messages received by new socket
  1539 + * before the socket lock was taken (very, very unlikely)
  1540 + */
  1541 +
  1542 + reject_rx_queue(new_sk);
  1543 +
  1544 + /* Connect new socket to its peer */
  1545 +
1386 1546 id.ref = msg_origport(msg);
1387 1547 id.node = msg_orignode(msg);
1388 1548 tipc_connect2port(new_ref, &id);
1389   - newsock->state = SS_CONNECTED;
  1549 + new_sock->state = SS_CONNECTED;
1390 1550  
1391 1551 tipc_set_portimportance(new_ref, msg_importance(msg));
1392 1552 if (msg_named(msg)) {
1393   - new_tsock->p->conn_type = msg_nametype(msg);
1394   - new_tsock->p->conn_instance = msg_nameinst(msg);
  1553 + new_tport->conn_type = msg_nametype(msg);
  1554 + new_tport->conn_instance = msg_nameinst(msg);
1395 1555 }
1396 1556  
1397   - /*
  1557 + /*
1398 1558 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1399 1559 * Respond to 'SYN+' by queuing it on new socket.
1400 1560 */
1401 1561  
1402 1562  
1403 1563  
... ... @@ -1403,17 +1563,16 @@
1403 1563 if (!msg_data_sz(msg)) {
1404 1564 struct msghdr m = {NULL,};
1405 1565  
1406   - send_packet(NULL, newsock, &m, 0);
1407   - advance_queue(tsock);
  1566 + advance_rx_queue(sk);
  1567 + send_packet(NULL, new_sock, &m, 0);
1408 1568 } else {
1409   - sock_lock(tsock);
1410   - skb_dequeue(&sock->sk->sk_receive_queue);
1411   - sock_unlock(tsock);
1412   - skb_queue_head(&newsock->sk->sk_receive_queue, buf);
  1569 + __skb_dequeue(&sk->sk_receive_queue);
  1570 + __skb_queue_head(&new_sk->sk_receive_queue, buf);
1413 1571 }
  1572 + release_sock(new_sk);
1414 1573 }
1415 1574 exit:
1416   - mutex_unlock(&tsock->lock);
  1575 + release_sock(sk);
1417 1576 return res;
1418 1577 }
1419 1578  
1420 1579  
1421 1580  
1422 1581  
1423 1582  
1424 1583  
1425 1584  
1426 1585  
1427 1586  
1428 1587  
1429 1588  
1430 1589  
... ... @@ -1429,54 +1588,46 @@
1429 1588  
1430 1589 static int shutdown(struct socket *sock, int how)
1431 1590 {
1432   - struct tipc_sock* tsock = tipc_sk(sock->sk);
  1591 + struct sock *sk = sock->sk;
  1592 + struct tipc_port *tport = tipc_sk_port(sk);
1433 1593 struct sk_buff *buf;
1434 1594 int res;
1435 1595  
1436 1596 if (how != SHUT_RDWR)
1437 1597 return -EINVAL;
1438 1598  
1439   - if (mutex_lock_interruptible(&tsock->lock))
1440   - return -ERESTARTSYS;
  1599 + lock_sock(sk);
1441 1600  
1442   - sock_lock(tsock);
1443   -
1444 1601 switch (sock->state) {
  1602 + case SS_CONNECTING:
1445 1603 case SS_CONNECTED:
1446 1604  
1447   - /* Send 'FIN+' or 'FIN-' message to peer */
1448   -
1449   - sock_unlock(tsock);
  1605 + /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1450 1606 restart:
1451   - if ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) {
  1607 + buf = __skb_dequeue(&sk->sk_receive_queue);
  1608 + if (buf) {
1452 1609 atomic_dec(&tipc_queue_size);
1453 1610 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
1454 1611 buf_discard(buf);
1455 1612 goto restart;
1456 1613 }
  1614 + tipc_disconnect(tport->ref);
1457 1615 tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
  1616 + } else {
  1617 + tipc_shutdown(tport->ref);
1458 1618 }
1459   - else {
1460   - tipc_shutdown(tsock->p->ref);
1461   - }
1462   - sock_lock(tsock);
1463 1619  
  1620 + sock->state = SS_DISCONNECTING;
  1621 +
1464 1622 /* fall through */
1465 1623  
1466 1624 case SS_DISCONNECTING:
1467 1625  
1468   - /* Discard any unreceived messages */
  1626 + /* Discard any unreceived messages; wake up sleeping tasks */
1469 1627  
1470   - while ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) {
1471   - atomic_dec(&tipc_queue_size);
1472   - buf_discard(buf);
1473   - }
1474   - tsock->p->conn_unacked = 0;
1475   -
1476   - /* fall through */
1477   -
1478   - case SS_CONNECTING:
1479   - sock->state = SS_DISCONNECTING;
  1628 + discard_rx_queue(sk);
  1629 + if (waitqueue_active(sk->sk_sleep))
  1630 + wake_up_interruptible(sk->sk_sleep);
1480 1631 res = 0;
1481 1632 break;
1482 1633  
... ... @@ -1484,9 +1635,7 @@
1484 1635 res = -ENOTCONN;
1485 1636 }
1486 1637  
1487   - sock_unlock(tsock);
1488   -
1489   - mutex_unlock(&tsock->lock);
  1638 + release_sock(sk);
1490 1639 return res;
1491 1640 }
1492 1641  
... ... @@ -1507,7 +1656,8 @@
1507 1656 static int setsockopt(struct socket *sock,
1508 1657 int lvl, int opt, char __user *ov, int ol)
1509 1658 {
1510   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  1659 + struct sock *sk = sock->sk;
  1660 + struct tipc_port *tport = tipc_sk_port(sk);
1511 1661 u32 value;
1512 1662 int res;
1513 1663  
1514 1664  
1515 1665  
1516 1666  
1517 1667  
1518 1668  
... ... @@ -1520,30 +1670,31 @@
1520 1670 if ((res = get_user(value, (u32 __user *)ov)))
1521 1671 return res;
1522 1672  
1523   - if (mutex_lock_interruptible(&tsock->lock))
1524   - return -ERESTARTSYS;
  1673 + lock_sock(sk);
1525 1674  
1526 1675 switch (opt) {
1527 1676 case TIPC_IMPORTANCE:
1528   - res = tipc_set_portimportance(tsock->p->ref, value);
  1677 + res = tipc_set_portimportance(tport->ref, value);
1529 1678 break;
1530 1679 case TIPC_SRC_DROPPABLE:
1531 1680 if (sock->type != SOCK_STREAM)
1532   - res = tipc_set_portunreliable(tsock->p->ref, value);
  1681 + res = tipc_set_portunreliable(tport->ref, value);
1533 1682 else
1534 1683 res = -ENOPROTOOPT;
1535 1684 break;
1536 1685 case TIPC_DEST_DROPPABLE:
1537   - res = tipc_set_portunreturnable(tsock->p->ref, value);
  1686 + res = tipc_set_portunreturnable(tport->ref, value);
1538 1687 break;
1539 1688 case TIPC_CONN_TIMEOUT:
1540   - sock->sk->sk_rcvtimeo = msecs_to_jiffies(value);
  1689 + sk->sk_rcvtimeo = msecs_to_jiffies(value);
  1690 + /* no need to set "res", since already 0 at this point */
1541 1691 break;
1542 1692 default:
1543 1693 res = -EINVAL;
1544 1694 }
1545 1695  
1546   - mutex_unlock(&tsock->lock);
  1696 + release_sock(sk);
  1697 +
1547 1698 return res;
1548 1699 }
1549 1700  
... ... @@ -1564,7 +1715,8 @@
1564 1715 static int getsockopt(struct socket *sock,
1565 1716 int lvl, int opt, char __user *ov, int __user *ol)
1566 1717 {
1567   - struct tipc_sock *tsock = tipc_sk(sock->sk);
  1718 + struct sock *sk = sock->sk;
  1719 + struct tipc_port *tport = tipc_sk_port(sk);
1568 1720 int len;
1569 1721 u32 value;
1570 1722 int res;
1571 1723  
1572 1724  
1573 1725  
1574 1726  
1575 1727  
... ... @@ -1576,26 +1728,28 @@
1576 1728 if ((res = get_user(len, ol)))
1577 1729 return res;
1578 1730  
1579   - if (mutex_lock_interruptible(&tsock->lock))
1580   - return -ERESTARTSYS;
  1731 + lock_sock(sk);
1581 1732  
1582 1733 switch (opt) {
1583 1734 case TIPC_IMPORTANCE:
1584   - res = tipc_portimportance(tsock->p->ref, &value);
  1735 + res = tipc_portimportance(tport->ref, &value);
1585 1736 break;
1586 1737 case TIPC_SRC_DROPPABLE:
1587   - res = tipc_portunreliable(tsock->p->ref, &value);
  1738 + res = tipc_portunreliable(tport->ref, &value);
1588 1739 break;
1589 1740 case TIPC_DEST_DROPPABLE:
1590   - res = tipc_portunreturnable(tsock->p->ref, &value);
  1741 + res = tipc_portunreturnable(tport->ref, &value);
1591 1742 break;
1592 1743 case TIPC_CONN_TIMEOUT:
1593   - value = jiffies_to_msecs(sock->sk->sk_rcvtimeo);
  1744 + value = jiffies_to_msecs(sk->sk_rcvtimeo);
  1745 + /* no need to set "res", since already 0 at this point */
1594 1746 break;
1595 1747 default:
1596 1748 res = -EINVAL;
1597 1749 }
1598 1750  
  1751 + release_sock(sk);
  1752 +
1599 1753 if (res) {
1600 1754 /* "get" failed */
1601 1755 }
... ... @@ -1609,7 +1763,6 @@
1609 1763 res = put_user(sizeof(value), ol);
1610 1764 }
1611 1765  
1612   - mutex_unlock(&tsock->lock);
1613 1766 return res;
1614 1767 }
1615 1768  
... ... @@ -1722,6 +1875,7 @@
1722 1875 /**
1723 1876 * tipc_socket_stop - stop TIPC socket interface
1724 1877 */
  1878 +
1725 1879 void tipc_socket_stop(void)
1726 1880 {
1727 1881 if (!sockets_enabled)