Commit ec8a2e5621db2da24badb3969eda7fd359e1869f

Authored by Jon Paul Maloy
Committed by David S. Miller
1 parent b786e2b0fa

tipc: same receive code path for connection protocol and data messages

As a preparation to eliminate port_lock we need to bring reception
of connection protocol messages under proper protection of bh_lock_sock
or socket owner.

We fix this by letting those messages follow the same code path as
incoming data messages.

As a side effect of this change, the last reference to the function
net_route_msg() disappears, and we can eliminate that function.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Showing 6 changed files with 19 additions and 55 deletions Side-by-side Diff

... ... @@ -1479,6 +1479,7 @@
1479 1479 case TIPC_MEDIUM_IMPORTANCE:
1480 1480 case TIPC_HIGH_IMPORTANCE:
1481 1481 case TIPC_CRITICAL_IMPORTANCE:
  1482 + case CONN_MANAGER:
1482 1483 tipc_node_unlock(n_ptr);
1483 1484 tipc_sk_rcv(buf);
1484 1485 continue;
... ... @@ -1493,10 +1494,6 @@
1493 1494 tipc_node_unlock(n_ptr);
1494 1495 tipc_named_rcv(buf);
1495 1496 continue;
1496   - case CONN_MANAGER:
1497   - tipc_node_unlock(n_ptr);
1498   - tipc_port_proto_rcv(buf);
1499   - continue;
1500 1497 case BCAST_PROTOCOL:
1501 1498 tipc_link_sync_rcv(n_ptr, buf);
1502 1499 break;
... ... @@ -2106,6 +2103,7 @@
2106 2103 u32 msgcount = msg_msgcnt(buf_msg(buf));
2107 2104 u32 pos = INT_H_SIZE;
2108 2105 struct sk_buff *obuf;
  2106 + struct tipc_msg *omsg;
2109 2107  
2110 2108 while (msgcount--) {
2111 2109 obuf = buf_extract(buf, pos);
... ... @@ -2113,8 +2111,16 @@
2113 2111 pr_warn("Link unable to unbundle message(s)\n");
2114 2112 break;
2115 2113 }
2116   - pos += align(msg_size(buf_msg(obuf)));
2117   - tipc_net_route_msg(obuf);
  2114 + omsg = buf_msg(obuf);
  2115 + pos += align(msg_size(omsg));
  2116 + if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
  2117 + tipc_sk_rcv(obuf);
  2118 + } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
  2119 + tipc_named_rcv(obuf);
  2120 + } else {
  2121 + pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
  2122 + kfree_skb(obuf);
  2123 + }
2118 2124 }
2119 2125 kfree_skb(buf);
2120 2126 }
... ... @@ -103,46 +103,6 @@
103 103 * This is always used within the scope of a tipc_nametbl_lock(read).
104 104 * - A local spin_lock protecting the queue of subscriber events.
105 105 */
106   -void tipc_net_route_msg(struct sk_buff *buf)
107   -{
108   - struct tipc_msg *msg;
109   - u32 dnode;
110   -
111   - if (!buf)
112   - return;
113   - msg = buf_msg(buf);
114   -
115   - /* Handle message for this node */
116   - dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
117   - if (tipc_in_scope(dnode, tipc_own_addr)) {
118   - if (msg_isdata(msg)) {
119   - if (msg_mcast(msg))
120   - tipc_port_mcast_rcv(buf, NULL);
121   - else if (msg_destport(msg)) {
122   - tipc_sk_rcv(buf);
123   - } else {
124   - pr_warn("Cannot route msg; no destination\n");
125   - kfree_skb(buf);
126   - }
127   - return;
128   - }
129   - switch (msg_user(msg)) {
130   - case NAME_DISTRIBUTOR:
131   - tipc_named_rcv(buf);
132   - break;
133   - case CONN_MANAGER:
134   - tipc_port_proto_rcv(buf);
135   - break;
136   - default:
137   - kfree_skb(buf);
138   - }
139   - return;
140   - }
141   -
142   - /* Handle message for another node */
143   - skb_trim(buf, msg_size(msg));
144   - tipc_link_xmit(buf, dnode, msg_link_selector(msg));
145   -}
146 106  
147 107 int tipc_net_start(u32 addr)
148 108 {
... ... @@ -37,8 +37,6 @@
37 37 #ifndef _TIPC_NET_H
38 38 #define _TIPC_NET_H
39 39  
40   -void tipc_net_route_msg(struct sk_buff *buf);
41   -
42 40 int tipc_net_start(u32 addr);
43 41 void tipc_net_stop(void);
44 42  
... ... @@ -365,16 +365,14 @@
365 365 return buf;
366 366 }
367 367  
368   -void tipc_port_proto_rcv(struct sk_buff *buf)
  368 +void tipc_port_proto_rcv(struct tipc_port *p_ptr, struct sk_buff *buf)
369 369 {
370 370 struct tipc_msg *msg = buf_msg(buf);
371   - struct tipc_port *p_ptr;
372 371 struct sk_buff *r_buf = NULL;
373 372 u32 destport = msg_destport(msg);
374 373 int wakeable;
375 374  
376 375 /* Validate connection */
377   - p_ptr = tipc_port_lock(destport);
378 376 if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
379 377 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
380 378 if (r_buf) {
... ... @@ -385,8 +383,6 @@
385 383 msg_set_origport(msg, destport);
386 384 msg_set_destport(msg, msg_origport(msg));
387 385 }
388   - if (p_ptr)
389   - tipc_port_unlock(p_ptr);
390 386 goto exit;
391 387 }
392 388  
... ... @@ -409,7 +405,6 @@
409 405 break;
410 406 }
411 407 p_ptr->probing_state = CONFIRMED;
412   - tipc_port_unlock(p_ptr);
413 408 exit:
414 409 tipc_link_xmit2(r_buf, msg_destnode(msg), msg_link_selector(msg));
415 410 kfree_skb(buf);
... ... @@ -140,7 +140,7 @@
140 140 unsigned int len);
141 141  
142 142 struct sk_buff *tipc_port_get_ports(void);
143   -void tipc_port_proto_rcv(struct sk_buff *buf);
  143 +void tipc_port_proto_rcv(struct tipc_port *port, struct sk_buff *buf);
144 144 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
145 145 void tipc_port_reinit(void);
146 146  
... ... @@ -1416,6 +1416,11 @@
1416 1416 unsigned int limit = rcvbuf_limit(sk, buf);
1417 1417 int rc = TIPC_OK;
1418 1418  
  1419 + if (unlikely(msg_user(msg) == CONN_MANAGER)) {
  1420 + tipc_port_proto_rcv(&tsk->port, buf);
  1421 + return TIPC_OK;
  1422 + }
  1423 +
1419 1424 /* Reject message if it is wrong sort of message for socket */
1420 1425 if (msg_type(msg) > TIPC_DIRECT_MSG)
1421 1426 return -TIPC_ERR_NO_PORT;