Commit 8db1bae30b7cd3c3abc05f467d0f7c69b33b80e9

Authored by Jon Paul Maloy
Committed by David S. Miller
1 parent 067608e9d0

tipc: separate building and sending of rejected messages

The way we build and send rejected message is currenty perceived as
hard to follow, partly because we let the transmission go via deep
call chains through functions such as tipc_reject_msg() and
net_route_msg().

We want to remove those functions, and make the call sequences shallower
and simpler. For this purpose, we separate building and sending of
rejected messages. We build the reject message using the new function
tipc_msg_reverse(), and let the transmission go via the newly introduced
tipc_link_xmit2() function, as all transmission eventually will do. We
also ensure that all calls to tipc_link_xmit2() are made outside
port_lock/bh_lock_sock.

Finally, we replace all calls to tipc_reject_msg() with the two new
calls at all locations in the code that we want to keep. The remaining
calls are made from code that we are planning to remove, along with
tipc_reject_msg() itself.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Showing 3 changed files with 64 additions and 8 deletions Side-by-side Diff

... ... @@ -37,6 +37,8 @@
37 37 #include "core.h"
38 38 #include "msg.h"
39 39  
  40 +#define MAX_FORWARD_SIZE 1024
  41 +
40 42 static unsigned int align(unsigned int i)
41 43 {
42 44 return (i + 3) & ~3u;
... ... @@ -327,5 +329,45 @@
327 329 tipc_msg_bundle(bbuf, *buf, mtu);
328 330 *buf = bbuf;
329 331 return true;
  332 +}
  333 +
  334 +/**
  335 + * tipc_msg_reverse(): swap source and destination addresses and add error code
  336 + * @buf: buffer containing message to be reversed
  337 + * @dnode: return value: node where to send message after reversal
  338 + * @err: error code to be set in message
  339 + * Consumes buffer if failure
  340 + * Returns true if success, otherwise false
  341 + */
  342 +bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
  343 +{
  344 + struct tipc_msg *msg = buf_msg(buf);
  345 + uint imp = msg_importance(msg);
  346 + struct tipc_msg ohdr;
  347 + uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
  348 +
  349 + if (skb_linearize(buf) || !msg_isdata(msg))
  350 + goto exit;
  351 + if (msg_dest_droppable(msg) || msg_errcode(msg))
  352 + goto exit;
  353 +
  354 + memcpy(&ohdr, msg, msg_hdr_sz(msg));
  355 + msg_set_importance(msg, min_t(uint, ++imp, TIPC_CRITICAL_IMPORTANCE));
  356 + msg_set_errcode(msg, err);
  357 + msg_set_origport(msg, msg_destport(&ohdr));
  358 + msg_set_destport(msg, msg_origport(&ohdr));
  359 + msg_set_prevnode(msg, tipc_own_addr);
  360 + if (!msg_short(msg)) {
  361 + msg_set_orignode(msg, msg_destnode(&ohdr));
  362 + msg_set_destnode(msg, msg_orignode(&ohdr));
  363 + }
  364 + msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
  365 + skb_trim(buf, msg_size(msg));
  366 + skb_orphan(buf);
  367 + *dnode = msg_orignode(&ohdr);
  368 + return true;
  369 +exit:
  370 + kfree_skb(buf);
  371 + return false;
330 372 }
... ... @@ -725,6 +725,8 @@
725 725 return msg_origport(m);
726 726 }
727 727  
  728 +bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
  729 +
728 730 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
729 731 u32 destnode);
730 732  
... ... @@ -39,6 +39,7 @@
39 39 #include "node.h"
40 40  
41 41 #include <linux/export.h>
  42 +#include "link.h"
42 43  
43 44 #define SS_LISTENING -1 /* socket is listening */
44 45 #define SS_READY -2 /* socket is connectionless */
45 46  
... ... @@ -123,9 +124,12 @@
123 124 static void reject_rx_queue(struct sock *sk)
124 125 {
125 126 struct sk_buff *buf;
  127 + u32 dnode;
126 128  
127   - while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
128   - tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
  129 + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
  130 + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
  131 + tipc_link_xmit2(buf, dnode, 0);
  132 + }
129 133 }
130 134  
131 135 /**
... ... @@ -303,6 +307,7 @@
303 307 struct tipc_sock *tsk;
304 308 struct tipc_port *port;
305 309 struct sk_buff *buf;
  310 + u32 dnode;
306 311  
307 312 /*
308 313 * Exit if socket isn't fully initialized (occurs when a failed accept()
... ... @@ -331,7 +336,8 @@
331 336 sock->state = SS_DISCONNECTING;
332 337 tipc_port_disconnect(port->ref);
333 338 }
334   - tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
  339 + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
  340 + tipc_link_xmit2(buf, dnode, 0);
335 341 }
336 342 }
337 343  
338 344  
339 345  
... ... @@ -1430,14 +1436,15 @@
1430 1436 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431 1437 {
1432 1438 int rc;
  1439 + u32 onode;
1433 1440 struct tipc_sock *tsk = tipc_sk(sk);
1434 1441 uint truesize = buf->truesize;
1435 1442  
1436 1443 rc = filter_rcv(sk, buf);
1437   - if (unlikely(rc))
1438   - tipc_reject_msg(buf, -rc);
1439 1444  
1440   - if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
  1445 + if (unlikely(rc && tipc_msg_reverse(buf, &onode, -rc)))
  1446 + tipc_link_xmit2(buf, onode, 0);
  1447 + else if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1441 1448 atomic_add(truesize, &tsk->dupl_rcvcnt);
1442 1449  
1443 1450 return 0;
... ... @@ -1457,6 +1464,7 @@
1457 1464 u32 dport = msg_destport(buf_msg(buf));
1458 1465 int rc = TIPC_OK;
1459 1466 uint limit;
  1467 + u32 dnode;
1460 1468  
1461 1469 /* Forward unresolved named message */
1462 1470 if (unlikely(!dport)) {
... ... @@ -1493,7 +1501,9 @@
1493 1501 if (likely(!rc))
1494 1502 return 0;
1495 1503 exit:
1496   - tipc_reject_msg(buf, -rc);
  1504 + if (!tipc_msg_reverse(buf, &dnode, -rc))
  1505 + return -EHOSTUNREACH;
  1506 + tipc_link_xmit2(buf, dnode, 0);
1497 1507 return -EHOSTUNREACH;
1498 1508 }
1499 1509  
... ... @@ -1758,6 +1768,7 @@
1758 1768 struct tipc_sock *tsk = tipc_sk(sk);
1759 1769 struct tipc_port *port = &tsk->port;
1760 1770 struct sk_buff *buf;
  1771 + u32 peer;
1761 1772 int res;
1762 1773  
1763 1774 if (how != SHUT_RDWR)
... ... @@ -1778,7 +1789,8 @@
1778 1789 goto restart;
1779 1790 }
1780 1791 tipc_port_disconnect(port->ref);
1781   - tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
  1792 + if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
  1793 + tipc_link_xmit2(buf, peer, 0);
1782 1794 } else {
1783 1795 tipc_port_shutdown(port->ref);
1784 1796 }