Commit 2f2d76cc3e938389feee671b46252dde6880b3b7

Authored by Benjamin Poirier
Committed by David S. Miller
1 parent 0343c5543b

dlm: Do not allocate a fd for peeloff

Avoid allocating a file descriptor for the peeled-off SCTP socket, since
such an fd a) propagates to every kernel thread and usermodehelper and
b) is not properly released.

References: http://article.gmane.org/gmane.linux.network.drbd/22529
Signed-off-by: Benjamin Poirier <bpoirier@suse.de>
Signed-off-by: David S. Miller <davem@davemloft.net>

Showing 1 changed file with 8 additions and 14 deletions Inline Diff

1 /****************************************************************************** 1 /******************************************************************************
2 ******************************************************************************* 2 *******************************************************************************
3 ** 3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 5 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6 ** 6 **
7 ** This copyrighted material is made available to anyone wishing to use, 7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2. 9 ** of the GNU General Public License v.2.
10 ** 10 **
11 ******************************************************************************* 11 *******************************************************************************
12 ******************************************************************************/ 12 ******************************************************************************/
13 13
14 /* 14 /*
15 * lowcomms.c 15 * lowcomms.c
16 * 16 *
17 * This is the "low-level" comms layer. 17 * This is the "low-level" comms layer.
18 * 18 *
19 * It is responsible for sending/receiving messages 19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster. 20 * from other nodes in the cluster.
21 * 21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are 22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to 23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is its 24 * be expanded for the cluster infrastructure then that is its
25 * responsibility. It is this layer's 25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or 26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication. 27 * whatever it needs for inter-node communication.
28 * 28 *
29 * The comms level is two kernel threads that deal mainly with 29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them 30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the 31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and 32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections 33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed 34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times 35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together 36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block. 37 * messages bound for one node and send them in one block.
38 * 38 *
39 * lowcomms will choose to use either TCP or SCTP as its transport layer 39 * lowcomms will choose to use either TCP or SCTP as its transport layer
40 * depending on the configuration variable 'protocol'. This should be set 40 * depending on the configuration variable 'protocol'. This should be set
41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster 42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function. 43 * for the DLM to function.
44 * 44 *
45 */ 45 */
46 46
47 #include <asm/ioctls.h> 47 #include <asm/ioctls.h>
48 #include <net/sock.h> 48 #include <net/sock.h>
49 #include <net/tcp.h> 49 #include <net/tcp.h>
50 #include <linux/pagemap.h> 50 #include <linux/pagemap.h>
51 #include <linux/file.h> 51 #include <linux/file.h>
52 #include <linux/mutex.h> 52 #include <linux/mutex.h>
53 #include <linux/sctp.h> 53 #include <linux/sctp.h>
54 #include <linux/slab.h> 54 #include <linux/slab.h>
55 #include <net/sctp/sctp.h>
55 #include <net/sctp/user.h> 56 #include <net/sctp/user.h>
56 #include <net/ipv6.h> 57 #include <net/ipv6.h>
57 58
58 #include "dlm_internal.h" 59 #include "dlm_internal.h"
59 #include "lowcomms.h" 60 #include "lowcomms.h"
60 #include "midcomms.h" 61 #include "midcomms.h"
61 #include "config.h" 62 #include "config.h"
62 63
63 #define NEEDED_RMEM (4*1024*1024) 64 #define NEEDED_RMEM (4*1024*1024)
64 #define CONN_HASH_SIZE 32 65 #define CONN_HASH_SIZE 32
65 66
66 /* Number of messages to send before rescheduling */ 67 /* Number of messages to send before rescheduling */
67 #define MAX_SEND_MSG_COUNT 25 68 #define MAX_SEND_MSG_COUNT 25
68 69
/* Circular-buffer bookkeeping for the receive path.  The buffer size
 * must be a power of two so indices can be wrapped with a bitwise AND
 * against 'mask' (== size - 1); see cbuf_init()/cbuf_data()/cbuf_eat(). */
struct cbuf {
	unsigned int base;	/* index of the first unconsumed byte */
	unsigned int len;	/* number of bytes currently held */
	unsigned int mask;	/* size - 1, used to wrap indices */
};
74 75
75 static void cbuf_add(struct cbuf *cb, int n) 76 static void cbuf_add(struct cbuf *cb, int n)
76 { 77 {
77 cb->len += n; 78 cb->len += n;
78 } 79 }
79 80
80 static int cbuf_data(struct cbuf *cb) 81 static int cbuf_data(struct cbuf *cb)
81 { 82 {
82 return ((cb->base + cb->len) & cb->mask); 83 return ((cb->base + cb->len) & cb->mask);
83 } 84 }
84 85
85 static void cbuf_init(struct cbuf *cb, int size) 86 static void cbuf_init(struct cbuf *cb, int size)
86 { 87 {
87 cb->base = cb->len = 0; 88 cb->base = cb->len = 0;
88 cb->mask = size-1; 89 cb->mask = size-1;
89 } 90 }
90 91
91 static void cbuf_eat(struct cbuf *cb, int n) 92 static void cbuf_eat(struct cbuf *cb, int n)
92 { 93 {
93 cb->len -= n; 94 cb->len -= n;
94 cb->base += n; 95 cb->base += n;
95 cb->base &= cb->mask; 96 cb->base &= cb->mask;
96 } 97 }
97 98
98 static bool cbuf_empty(struct cbuf *cb) 99 static bool cbuf_empty(struct cbuf *cb)
99 { 100 {
100 return cb->len == 0; 101 return cb->len == 0;
101 } 102 }
102 103
/* Per-remote-node connection state.  One entry lives in
 * connection_hash per nodeid; nodeid 0 carries the listen socket. */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;	/* serialises access to 'sock' */
	unsigned long flags;	/* CF_* bits below, atomic bit ops */
#define CF_READ_PENDING 1	/* receive work queued (see lowcomms_data_ready) */
#define CF_WRITE_PENDING 2	/* send work queued (see lowcomms_write_space) */
#define CF_CONNECT_PENDING 3	/* connect requested (lowcomms_connect_sock) */
#define CF_INIT_PENDING 4	/* cleared when SCTP COMM_UP arrives */
#define CF_IS_OTHERCON 5	/* NOTE(review): marks the 'othercon' twin -- confirm */
#define CF_CLOSE 6	/* closing: suppresses further connect attempts */
#define CF_APP_LIMITED 7	/* sender blocked on socket buffer space */
	struct list_head writequeue; /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;	/* protects 'writequeue' */
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *); /* What to do to connect */
	struct page *rx_page;	/* receive buffer page, freed in close_connection */
	struct cbuf cb;	/* read cursor, presumably into rx_page -- verify */
	int retries;	/* reset to 0 by close_connection */
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;	/* SCTP association id; 0 means none (see assoc2con) */
	struct hlist_node list;	/* link in a connection_hash bucket */
	struct connection *othercon;	/* secondary connection for the same node */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};
/* Recover the owning connection from a struct sock's user data
 * (installed by add_sock()). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
130 131
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;	/* link in connection->writequeue */
	struct page *page;	/* page backing the queued data */
	int offset;	/* NOTE(review): assumed start of unsent data -- confirm */
	int len;	/* NOTE(review): assumed bytes remaining to send -- confirm */
	int end;	/* NOTE(review): assumed end of used data in page -- confirm */
	int users;	/* presumably a count of in-progress writers; verify */
	struct connection *con;	/* owning connection */
};
141 142
142 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 143 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
143 static int dlm_local_count; 144 static int dlm_local_count;
144 145
145 /* Work queues */ 146 /* Work queues */
146 static struct workqueue_struct *recv_workqueue; 147 static struct workqueue_struct *recv_workqueue;
147 static struct workqueue_struct *send_workqueue; 148 static struct workqueue_struct *send_workqueue;
148 149
149 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 150 static struct hlist_head connection_hash[CONN_HASH_SIZE];
150 static DEFINE_MUTEX(connections_lock); 151 static DEFINE_MUTEX(connections_lock);
151 static struct kmem_cache *con_cache; 152 static struct kmem_cache *con_cache;
152 153
153 static void process_recv_sockets(struct work_struct *work); 154 static void process_recv_sockets(struct work_struct *work);
154 static void process_send_sockets(struct work_struct *work); 155 static void process_send_sockets(struct work_struct *work);
155 156
156 157
157 /* This is deliberately very simple because most clusters have simple 158 /* This is deliberately very simple because most clusters have simple
158 sequential nodeids, so we should be able to go straight to a connection 159 sequential nodeids, so we should be able to go straight to a connection
159 struct in the array */ 160 struct in the array */
160 static inline int nodeid_hash(int nodeid) 161 static inline int nodeid_hash(int nodeid)
161 { 162 {
162 return nodeid & (CONN_HASH_SIZE-1); 163 return nodeid & (CONN_HASH_SIZE-1);
163 } 164 }
164 165
/* Look up an existing connection by nodeid; returns NULL if none.
 * The double-underscore prefix and the locked wrapper nodeid2con()
 * indicate callers must hold connections_lock -- confirm for all
 * call sites outside this chunk. */
static struct connection *__find_con(int nodeid)
{
	int r;
	struct hlist_node *h;
	struct connection *con;

	r = nodeid_hash(nodeid);

	/* Three-argument iterator: the pre-3.9 hlist API form. */
	hlist_for_each_entry(con, h, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}
179 180
/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	/* Reuse an existing connection when there is one. */
	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);
		/* NOTE(review): zerocon is dereferenced unchecked --
		 * assumes the nodeid-0 listen connection is always
		 * created first; confirm against init code. */

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}
218 219
/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *h, *n;
	struct connection *con;

	/* The _safe iterator allows conn_func to unlink/free 'con'. */
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
			conn_func(con);
		}
	}
}
232 233
233 static struct connection *nodeid2con(int nodeid, gfp_t allocation) 234 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
234 { 235 {
235 struct connection *con; 236 struct connection *con;
236 237
237 mutex_lock(&connections_lock); 238 mutex_lock(&connections_lock);
238 con = __nodeid2con(nodeid, allocation); 239 con = __nodeid2con(nodeid, allocation);
239 mutex_unlock(&connections_lock); 240 mutex_unlock(&connections_lock);
240 241
241 return con; 242 return con;
242 } 243 }
243 244
/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
	int i;
	struct hlist_node *h;
	struct connection *con;

	mutex_lock(&connections_lock);

	/* The hash is keyed by nodeid, not association id, so every
	 * bucket must be scanned linearly. */
	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry(con, h, &connection_hash[i], list) {
			if (con->sctp_assoc == assoc_id) {
				mutex_unlock(&connections_lock);
				return con;
			}
		}
	}
	mutex_unlock(&connections_lock);
	return NULL;
}
264 265
/* Resolve 'nodeid' to an address, copying only the host part into
 * 'retaddr' (port and family fields of 'retaddr' are not written
 * here).  Returns 0 on success, -1 if no local addresses are
 * configured, or the error from dlm_nodeid_to_addr(). */
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
	struct sockaddr_storage addr;
	int error;

	if (!dlm_local_count)
		return -1;

	error = dlm_nodeid_to_addr(nodeid, &addr);
	if (error)
		return error;

	/* The first local address decides the family of the cluster. */
	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}
289 290
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);
	/* test_and_set avoids queueing duplicate receive work while a
	 * previous one is still pending. */
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}
297 298
/* Socket callback: output space became available; kick the sender. */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	/* If a sender previously hit the buffer limit (CF_APP_LIMITED),
	 * undo its pending-write accounting and async NOSPACE flag now
	 * that space exists again. */
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
	}

	/* Queue send work unless it is already scheduled. */
	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}
315 316
316 static inline void lowcomms_connect_sock(struct connection *con) 317 static inline void lowcomms_connect_sock(struct connection *con)
317 { 318 {
318 if (test_bit(CF_CLOSE, &con->flags)) 319 if (test_bit(CF_CLOSE, &con->flags))
319 return; 320 return;
320 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 321 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
321 queue_work(send_workqueue, &con->swork); 322 queue_work(send_workqueue, &con->swork);
322 } 323 }
323 324
324 static void lowcomms_state_change(struct sock *sk) 325 static void lowcomms_state_change(struct sock *sk)
325 { 326 {
326 if (sk->sk_state == TCP_ESTABLISHED) 327 if (sk->sk_state == TCP_ESTABLISHED)
327 lowcomms_write_space(sk); 328 lowcomms_write_space(sk);
328 } 329 }
329 330
/* Initiate a connection to 'nodeid' ahead of traffic.  Returns 0 on
 * success or when no connect is needed, -ENOMEM on allocation
 * failure. */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	/* with sctp there's no connecting without sending */
	if (dlm_config.ci_protocol != 0)
		return 0;

	/* No socket is needed to talk to ourselves. */
	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}
347 348
348 /* Make a socket active */ 349 /* Make a socket active */
349 static int add_sock(struct socket *sock, struct connection *con) 350 static int add_sock(struct socket *sock, struct connection *con)
350 { 351 {
351 con->sock = sock; 352 con->sock = sock;
352 353
353 /* Install a data_ready callback */ 354 /* Install a data_ready callback */
354 con->sock->sk->sk_data_ready = lowcomms_data_ready; 355 con->sock->sk->sk_data_ready = lowcomms_data_ready;
355 con->sock->sk->sk_write_space = lowcomms_write_space; 356 con->sock->sk->sk_write_space = lowcomms_write_space;
356 con->sock->sk->sk_state_change = lowcomms_state_change; 357 con->sock->sk->sk_state_change = lowcomms_state_change;
357 con->sock->sk->sk_user_data = con; 358 con->sock->sk->sk_user_data = con;
358 con->sock->sk->sk_allocation = GFP_NOFS; 359 con->sock->sk->sk_allocation = GFP_NOFS;
359 return 0; 360 return 0;
360 } 361 }
361 362
/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	/* Family is taken from the first configured local address. */
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* Zero the unused tail of the storage beyond the real address. */
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
380 381
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	/* Drop any partially received data buffered on this connection. */
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}
402 403
/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	/* Send via the nodeid-0 connection's socket. */
	con = nodeid2con(0,0);
	BUG_ON(con == NULL);

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	/* Build an SCTP_SNDRCV ancillary header targeting 'associd'. */
	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	/* MSG_EOF with a zero-length payload requests association
	 * shutdown (the "EOF" logged below on failure). */
	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}
438 439
/* Per-connection helper for sctp_init_failed(): forget the failed
 * association and, if a connect was pending, requeue send work so
 * the INIT is retried. */
static void sctp_init_failed_foreach(struct connection *con)
{
	con->sctp_assoc = 0;
	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
			queue_work(send_workqueue, &con->swork);
	}
}
447 448
/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
	/* connections_lock protects the hash walk in foreach_conn(). */
	mutex_lock(&connections_lock);

	foreach_conn(sctp_init_failed_foreach);

	mutex_unlock(&connections_lock);
}
458 459
459 /* Something happened to an association */ 460 /* Something happened to an association */
460 static void process_sctp_notification(struct connection *con, 461 static void process_sctp_notification(struct connection *con,
461 struct msghdr *msg, char *buf) 462 struct msghdr *msg, char *buf)
462 { 463 {
463 union sctp_notification *sn = (union sctp_notification *)buf; 464 union sctp_notification *sn = (union sctp_notification *)buf;
464 465
465 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 466 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
466 switch (sn->sn_assoc_change.sac_state) { 467 switch (sn->sn_assoc_change.sac_state) {
467 468
468 case SCTP_COMM_UP: 469 case SCTP_COMM_UP:
469 case SCTP_RESTART: 470 case SCTP_RESTART:
470 { 471 {
471 /* Check that the new node is in the lockspace */ 472 /* Check that the new node is in the lockspace */
472 struct sctp_prim prim; 473 struct sctp_prim prim;
473 int nodeid; 474 int nodeid;
474 int prim_len, ret; 475 int prim_len, ret;
475 int addr_len; 476 int addr_len;
476 struct connection *new_con; 477 struct connection *new_con;
477 sctp_peeloff_arg_t parg;
478 int parglen = sizeof(parg);
479 int err;
480 478
481 /* 479 /*
482 * We get this before any data for an association. 480 * We get this before any data for an association.
483 * We verify that the node is in the cluster and 481 * We verify that the node is in the cluster and
484 * then peel off a socket for it. 482 * then peel off a socket for it.
485 */ 483 */
486 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 484 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
487 log_print("COMM_UP for invalid assoc ID %d", 485 log_print("COMM_UP for invalid assoc ID %d",
488 (int)sn->sn_assoc_change.sac_assoc_id); 486 (int)sn->sn_assoc_change.sac_assoc_id);
489 sctp_init_failed(); 487 sctp_init_failed();
490 return; 488 return;
491 } 489 }
492 memset(&prim, 0, sizeof(struct sctp_prim)); 490 memset(&prim, 0, sizeof(struct sctp_prim));
493 prim_len = sizeof(struct sctp_prim); 491 prim_len = sizeof(struct sctp_prim);
494 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; 492 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
495 493
496 ret = kernel_getsockopt(con->sock, 494 ret = kernel_getsockopt(con->sock,
497 IPPROTO_SCTP, 495 IPPROTO_SCTP,
498 SCTP_PRIMARY_ADDR, 496 SCTP_PRIMARY_ADDR,
499 (char*)&prim, 497 (char*)&prim,
500 &prim_len); 498 &prim_len);
501 if (ret < 0) { 499 if (ret < 0) {
502 log_print("getsockopt/sctp_primary_addr on " 500 log_print("getsockopt/sctp_primary_addr on "
503 "new assoc %d failed : %d", 501 "new assoc %d failed : %d",
504 (int)sn->sn_assoc_change.sac_assoc_id, 502 (int)sn->sn_assoc_change.sac_assoc_id,
505 ret); 503 ret);
506 504
507 /* Retry INIT later */ 505 /* Retry INIT later */
508 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 506 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
509 if (new_con) 507 if (new_con)
510 clear_bit(CF_CONNECT_PENDING, &con->flags); 508 clear_bit(CF_CONNECT_PENDING, &con->flags);
511 return; 509 return;
512 } 510 }
513 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 511 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
514 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 512 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
515 unsigned char *b=(unsigned char *)&prim.ssp_addr; 513 unsigned char *b=(unsigned char *)&prim.ssp_addr;
516 log_print("reject connect from unknown addr"); 514 log_print("reject connect from unknown addr");
517 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 515 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
518 b, sizeof(struct sockaddr_storage)); 516 b, sizeof(struct sockaddr_storage));
519 sctp_send_shutdown(prim.ssp_assoc_id); 517 sctp_send_shutdown(prim.ssp_assoc_id);
520 return; 518 return;
521 } 519 }
522 520
523 new_con = nodeid2con(nodeid, GFP_NOFS); 521 new_con = nodeid2con(nodeid, GFP_NOFS);
524 if (!new_con) 522 if (!new_con)
525 return; 523 return;
526 524
527 /* Peel off a new sock */ 525 /* Peel off a new sock */
528 parg.associd = sn->sn_assoc_change.sac_assoc_id; 526 sctp_lock_sock(con->sock->sk);
529 ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, 527 ret = sctp_do_peeloff(con->sock->sk,
530 SCTP_SOCKOPT_PEELOFF, 528 sn->sn_assoc_change.sac_assoc_id,
531 (void *)&parg, &parglen); 529 &new_con->sock);
530 sctp_release_sock(con->sock->sk);
532 if (ret < 0) { 531 if (ret < 0) {
533 log_print("Can't peel off a socket for " 532 log_print("Can't peel off a socket for "
534 "connection %d to node %d: err=%d", 533 "connection %d to node %d: err=%d",
535 parg.associd, nodeid, ret); 534 (int)sn->sn_assoc_change.sac_assoc_id,
535 nodeid, ret);
536 return; 536 return;
537 } 537 }
538 new_con->sock = sockfd_lookup(parg.sd, &err);
539 if (!new_con->sock) {
540 log_print("sockfd_lookup error %d", err);
541 return;
542 }
543 add_sock(new_con->sock, new_con); 538 add_sock(new_con->sock, new_con);
544 sockfd_put(new_con->sock);
545 539
546 log_print("connecting to %d sctp association %d", 540 log_print("connecting to %d sctp association %d",
547 nodeid, (int)sn->sn_assoc_change.sac_assoc_id); 541 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
548 542
549 /* Send any pending writes */ 543 /* Send any pending writes */
550 clear_bit(CF_CONNECT_PENDING, &new_con->flags); 544 clear_bit(CF_CONNECT_PENDING, &new_con->flags);
551 clear_bit(CF_INIT_PENDING, &con->flags); 545 clear_bit(CF_INIT_PENDING, &con->flags);
552 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { 546 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
553 queue_work(send_workqueue, &new_con->swork); 547 queue_work(send_workqueue, &new_con->swork);
554 } 548 }
555 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) 549 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
556 queue_work(recv_workqueue, &new_con->rwork); 550 queue_work(recv_workqueue, &new_con->rwork);
557 } 551 }
558 break; 552 break;
559 553
560 case SCTP_COMM_LOST: 554 case SCTP_COMM_LOST:
561 case SCTP_SHUTDOWN_COMP: 555 case SCTP_SHUTDOWN_COMP:
562 { 556 {
563 con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 557 con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
564 if (con) { 558 if (con) {
565 con->sctp_assoc = 0; 559 con->sctp_assoc = 0;
566 } 560 }
567 } 561 }
568 break; 562 break;
569 563
570 /* We don't know which INIT failed, so clear the PENDING flags 564 /* We don't know which INIT failed, so clear the PENDING flags
571 * on them all. if assoc_id is zero then it will then try 565 * on them all. if assoc_id is zero then it will then try
572 * again */ 566 * again */
573 567
574 case SCTP_CANT_STR_ASSOC: 568 case SCTP_CANT_STR_ASSOC:
575 { 569 {
576 log_print("Can't start SCTP association - retrying"); 570 log_print("Can't start SCTP association - retrying");
577 sctp_init_failed(); 571 sctp_init_failed();
578 } 572 }
579 break; 573 break;
580 574
581 default: 575 default:
582 log_print("unexpected SCTP assoc change id=%d state=%d", 576 log_print("unexpected SCTP assoc change id=%d state=%d",
583 (int)sn->sn_assoc_change.sac_assoc_id, 577 (int)sn->sn_assoc_change.sac_assoc_id,
584 sn->sn_assoc_change.sac_state); 578 sn->sn_assoc_change.sac_state);
585 } 579 }
586 } 580 }
587 } 581 }
588 582
/* Data received from remote end.
 *
 * Drains whatever is ready on con->sock (non-blocking) into the per-
 * connection circular receive buffer (con->rx_page managed by the cbuf_*
 * helpers) and hands complete messages to dlm_process_incoming_buffer().
 * SCTP notification messages are diverted to process_sctp_notification().
 *
 * Returns 0 on clean progress, -EAGAIN when the caller should retry later,
 * or a negative error after the connection has been closed.
 * Called with no locks held; takes con->sock_mutex for the duration. */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	/* Socket may have been torn down by close_connection() before we
	   got the mutex. */
	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	BUG_ON(con->nodeid == 0);

	/* We filled the whole iovec; more data is probably waiting. */
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	/* Consume however many bytes made up complete messages. */
	cbuf_eat(&con->cb, ret);

	/* Free the page when the buffer drains completely; it is
	   reallocated lazily on the next receive. */
	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}
707 701
/* Listening socket is busy, accept a connection.
 *
 * Accepts one pending TCP connection on the listening socket 'con',
 * maps the peer address to a cluster nodeid, and attaches the new socket
 * to that node's connection.  If the node already has a socket (the two
 * ends connected simultaneously and the connections crossed on the wire)
 * the incoming socket is parked in newcon->othercon instead.
 *
 * Returns 0 on success or a negative errno; -EAGAIN means a duplicate
 * connection attempt that the caller may retry.
 * Lock order: con->sock_mutex (level 0) then newcon->sock_mutex (level 1),
 * hence the mutex_lock_nested() annotations. */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer (final arg 2 selects the remote
	   address in this kernel's getname API) */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b=(unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		/* Lazily allocate the secondary connection slot. */
		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		}
		else {
			/* Both primary and secondary slots are occupied:
			   reject this third attempt. */
			printk("Extra connection from node %d attempted\n", nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}
831 825
832 static void free_entry(struct writequeue_entry *e) 826 static void free_entry(struct writequeue_entry *e)
833 { 827 {
834 __free_page(e->page); 828 __free_page(e->page);
835 kfree(e); 829 kfree(e);
836 } 830 }
837 831
/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.

   Sends the first queued write for 'con' through the one-to-many base
   socket (nodeid 0); the kernel implicitly starts the association.
   CF_INIT_PENDING guards against concurrent initiations; it is cleared
   on send failure so a later attempt can retry.  No-op once the retry
   budget (MAX_CONNECT_RETRIES) is exhausted. */
static void sctp_init_assoc(struct connection *con)
{
	struct sockaddr_storage rem_addr;
	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	struct connection *base_con;
	struct writequeue_entry *e;
	int len, offset;
	int ret;
	int addrlen;
	struct kvec iov[1];

	/* Only one initiation in flight per connection. */
	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
		return;

	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}
	/* nodeid 0 is the listening/base SCTP connection. */
	base_con = nodeid2con(0, 0);
	BUG_ON(base_con == NULL);

	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	spin_lock(&con->writequeue_lock);

	if (list_empty(&con->writequeue)) {
		spin_unlock(&con->writequeue_lock);
		log_print("writequeue empty for nodeid %d", con->nodeid);
		return;
	}

	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
	len = e->len;
	offset = e->offset;
	spin_unlock(&con->writequeue_lock);

	/* Send the first block off the write queue */
	iov[0].iov_base = page_address(e->page)+offset;
	iov[0].iov_len = len;

	/* Build SCTP_SNDRCV ancillary data carrying our nodeid as ppid so
	   the receiver can identify us. */
	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
	outmessage.msg_controllen = cmsg->cmsg_len;

	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
	if (ret < 0) {
		log_print("Send first packet to node %d failed: %d",
			  con->nodeid, ret);

		/* Try again later */
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_INIT_PENDING, &con->flags);
	}
	else {
		/* Partial or full send: advance the entry and free it once
		   fully consumed with no remaining users. */
		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
		spin_unlock(&con->writequeue_lock);
	}
}
925 919
/* Connect a new socket to its peer.
 *
 * Creates a TCP socket for 'con', binds it to our first cluster address
 * (to pin the source address for routing), and starts a non-blocking
 * connect to the peer's configured port.  -EINPROGRESS is treated as
 * success; completion is signalled asynchronously via the socket
 * callbacks installed by add_sock().  On non-fatal errors the connect is
 * requeued via lowcomms_connect_sock() until MAX_CONNECT_RETRIES is hit.
 * Runs with con->sock_mutex held throughout. */
static void tcp_connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;

	/* nodeid 0 is the listening connection, never an outgoing peer. */
	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result =
		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/* add_sock() may have published the socket on con; release
	   whichever reference is live. */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL
	    && result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
1013 1007
/* Create, configure, bind and listen on the TCP listening socket.
 *
 * 'saddr' is the local address to bind (port taken from the cluster
 * config).  Installs tcp_accept_from_sock as the connection's rx_action
 * so incoming connects are serviced from the recv workqueue.
 * Returns the listening socket, or NULL on any failure (already logged);
 * con->sock is left NULL on failure. */
static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	/* SO_REUSEADDR lets us rebind quickly after a restart; failure is
	   logged but not fatal. */
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));

	if (result < 0) {
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
	}
	sock->sk->sk_user_data = con;
	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	/* Keepalive failure is also non-fatal. */
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				   (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Set keepalive failed: %d", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}
1077 1071
1078 /* Get local addresses */ 1072 /* Get local addresses */
1079 static void init_local(void) 1073 static void init_local(void)
1080 { 1074 {
1081 struct sockaddr_storage sas, *addr; 1075 struct sockaddr_storage sas, *addr;
1082 int i; 1076 int i;
1083 1077
1084 dlm_local_count = 0; 1078 dlm_local_count = 0;
1085 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { 1079 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
1086 if (dlm_our_addr(&sas, i)) 1080 if (dlm_our_addr(&sas, i))
1087 break; 1081 break;
1088 1082
1089 addr = kmalloc(sizeof(*addr), GFP_NOFS); 1083 addr = kmalloc(sizeof(*addr), GFP_NOFS);
1090 if (!addr) 1084 if (!addr)
1091 break; 1085 break;
1092 memcpy(addr, &sas, sizeof(*addr)); 1086 memcpy(addr, &sas, sizeof(*addr));
1093 dlm_local_addr[dlm_local_count++] = addr; 1087 dlm_local_addr[dlm_local_count++] = addr;
1094 } 1088 }
1095 } 1089 }
1096 1090
1097 /* Bind to an IP address. SCTP allows multiple address so it can do 1091 /* Bind to an IP address. SCTP allows multiple address so it can do
1098 multi-homing */ 1092 multi-homing */
1099 static int add_sctp_bind_addr(struct connection *sctp_con, 1093 static int add_sctp_bind_addr(struct connection *sctp_con,
1100 struct sockaddr_storage *addr, 1094 struct sockaddr_storage *addr,
1101 int addr_len, int num) 1095 int addr_len, int num)
1102 { 1096 {
1103 int result = 0; 1097 int result = 0;
1104 1098
1105 if (num == 1) 1099 if (num == 1)
1106 result = kernel_bind(sctp_con->sock, 1100 result = kernel_bind(sctp_con->sock,
1107 (struct sockaddr *) addr, 1101 (struct sockaddr *) addr,
1108 addr_len); 1102 addr_len);
1109 else 1103 else
1110 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, 1104 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1111 SCTP_SOCKOPT_BINDX_ADD, 1105 SCTP_SOCKOPT_BINDX_ADD,
1112 (char *)addr, addr_len); 1106 (char *)addr, addr_len);
1113 1107
1114 if (result < 0) 1108 if (result < 0)
1115 log_print("Can't bind to port %d addr number %d", 1109 log_print("Can't bind to port %d addr number %d",
1116 dlm_config.ci_tcp_port, num); 1110 dlm_config.ci_tcp_port, num);
1117 1111
1118 return result; 1112 return result;
1119 } 1113 }
1120 1114
1121 /* Initialise SCTP socket and bind to all interfaces */ 1115 /* Initialise SCTP socket and bind to all interfaces */
1122 static int sctp_listen_for_all(void) 1116 static int sctp_listen_for_all(void)
1123 { 1117 {
1124 struct socket *sock = NULL; 1118 struct socket *sock = NULL;
1125 struct sockaddr_storage localaddr; 1119 struct sockaddr_storage localaddr;
1126 struct sctp_event_subscribe subscribe; 1120 struct sctp_event_subscribe subscribe;
1127 int result = -EINVAL, num = 1, i, addr_len; 1121 int result = -EINVAL, num = 1, i, addr_len;
1128 struct connection *con = nodeid2con(0, GFP_NOFS); 1122 struct connection *con = nodeid2con(0, GFP_NOFS);
1129 int bufsize = NEEDED_RMEM; 1123 int bufsize = NEEDED_RMEM;
1130 1124
1131 if (!con) 1125 if (!con)
1132 return -ENOMEM; 1126 return -ENOMEM;
1133 1127
1134 log_print("Using SCTP for communications"); 1128 log_print("Using SCTP for communications");
1135 1129
1136 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, 1130 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1137 IPPROTO_SCTP, &sock); 1131 IPPROTO_SCTP, &sock);
1138 if (result < 0) { 1132 if (result < 0) {
1139 log_print("Can't create comms socket, check SCTP is loaded"); 1133 log_print("Can't create comms socket, check SCTP is loaded");
1140 goto out; 1134 goto out;
1141 } 1135 }
1142 1136
1143 /* Listen for events */ 1137 /* Listen for events */
1144 memset(&subscribe, 0, sizeof(subscribe)); 1138 memset(&subscribe, 0, sizeof(subscribe));
1145 subscribe.sctp_data_io_event = 1; 1139 subscribe.sctp_data_io_event = 1;
1146 subscribe.sctp_association_event = 1; 1140 subscribe.sctp_association_event = 1;
1147 subscribe.sctp_send_failure_event = 1; 1141 subscribe.sctp_send_failure_event = 1;
1148 subscribe.sctp_shutdown_event = 1; 1142 subscribe.sctp_shutdown_event = 1;
1149 subscribe.sctp_partial_delivery_event = 1; 1143 subscribe.sctp_partial_delivery_event = 1;
1150 1144
1151 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1145 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1152 (char *)&bufsize, sizeof(bufsize)); 1146 (char *)&bufsize, sizeof(bufsize));
1153 if (result) 1147 if (result)
1154 log_print("Error increasing buffer space on socket %d", result); 1148 log_print("Error increasing buffer space on socket %d", result);
1155 1149
1156 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, 1150 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1157 (char *)&subscribe, sizeof(subscribe)); 1151 (char *)&subscribe, sizeof(subscribe));
1158 if (result < 0) { 1152 if (result < 0) {
1159 log_print("Failed to set SCTP_EVENTS on socket: result=%d", 1153 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1160 result); 1154 result);
1161 goto create_delsock; 1155 goto create_delsock;
1162 } 1156 }
1163 1157
1164 /* Init con struct */ 1158 /* Init con struct */
1165 sock->sk->sk_user_data = con; 1159 sock->sk->sk_user_data = con;
1166 con->sock = sock; 1160 con->sock = sock;
1167 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1161 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1168 con->rx_action = receive_from_sock; 1162 con->rx_action = receive_from_sock;
1169 con->connect_action = sctp_init_assoc; 1163 con->connect_action = sctp_init_assoc;
1170 1164
1171 /* Bind to all interfaces. */ 1165 /* Bind to all interfaces. */
1172 for (i = 0; i < dlm_local_count; i++) { 1166 for (i = 0; i < dlm_local_count; i++) {
1173 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 1167 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1174 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); 1168 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1175 1169
1176 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1170 result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1177 if (result) 1171 if (result)
1178 goto create_delsock; 1172 goto create_delsock;
1179 ++num; 1173 ++num;
1180 } 1174 }
1181 1175
1182 result = sock->ops->listen(sock, 5); 1176 result = sock->ops->listen(sock, 5);
1183 if (result < 0) { 1177 if (result < 0) {
1184 log_print("Can't set socket listening"); 1178 log_print("Can't set socket listening");
1185 goto create_delsock; 1179 goto create_delsock;
1186 } 1180 }
1187 1181
1188 return 0; 1182 return 0;
1189 1183
1190 create_delsock: 1184 create_delsock:
1191 sock_release(sock); 1185 sock_release(sock);
1192 con->sock = NULL; 1186 con->sock = NULL;
1193 out: 1187 out:
1194 return result; 1188 return result;
1195 } 1189 }
1196 1190
1197 static int tcp_listen_for_all(void) 1191 static int tcp_listen_for_all(void)
1198 { 1192 {
1199 struct socket *sock = NULL; 1193 struct socket *sock = NULL;
1200 struct connection *con = nodeid2con(0, GFP_NOFS); 1194 struct connection *con = nodeid2con(0, GFP_NOFS);
1201 int result = -EINVAL; 1195 int result = -EINVAL;
1202 1196
1203 if (!con) 1197 if (!con)
1204 return -ENOMEM; 1198 return -ENOMEM;
1205 1199
1206 /* We don't support multi-homed hosts */ 1200 /* We don't support multi-homed hosts */
1207 if (dlm_local_addr[1] != NULL) { 1201 if (dlm_local_addr[1] != NULL) {
1208 log_print("TCP protocol can't handle multi-homed hosts, " 1202 log_print("TCP protocol can't handle multi-homed hosts, "
1209 "try SCTP"); 1203 "try SCTP");
1210 return -EINVAL; 1204 return -EINVAL;
1211 } 1205 }
1212 1206
1213 log_print("Using TCP for communications"); 1207 log_print("Using TCP for communications");
1214 1208
1215 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1209 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1216 if (sock) { 1210 if (sock) {
1217 add_sock(sock, con); 1211 add_sock(sock, con);
1218 result = 0; 1212 result = 0;
1219 } 1213 }
1220 else { 1214 else {
1221 result = -EADDRINUSE; 1215 result = -EADDRINUSE;
1222 } 1216 }
1223 1217
1224 return result; 1218 return result;
1225 } 1219 }
1226 1220
1227 1221
1228 1222
1229 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1223 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1230 gfp_t allocation) 1224 gfp_t allocation)
1231 { 1225 {
1232 struct writequeue_entry *entry; 1226 struct writequeue_entry *entry;
1233 1227
1234 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1228 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1235 if (!entry) 1229 if (!entry)
1236 return NULL; 1230 return NULL;
1237 1231
1238 entry->page = alloc_page(allocation); 1232 entry->page = alloc_page(allocation);
1239 if (!entry->page) { 1233 if (!entry->page) {
1240 kfree(entry); 1234 kfree(entry);
1241 return NULL; 1235 return NULL;
1242 } 1236 }
1243 1237
1244 entry->offset = 0; 1238 entry->offset = 0;
1245 entry->len = 0; 1239 entry->len = 0;
1246 entry->end = 0; 1240 entry->end = 0;
1247 entry->users = 0; 1241 entry->users = 0;
1248 entry->con = con; 1242 entry->con = con;
1249 1243
1250 return entry; 1244 return entry;
1251 } 1245 }
1252 1246
1253 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1247 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1254 { 1248 {
1255 struct connection *con; 1249 struct connection *con;
1256 struct writequeue_entry *e; 1250 struct writequeue_entry *e;
1257 int offset = 0; 1251 int offset = 0;
1258 int users = 0; 1252 int users = 0;
1259 1253
1260 con = nodeid2con(nodeid, allocation); 1254 con = nodeid2con(nodeid, allocation);
1261 if (!con) 1255 if (!con)
1262 return NULL; 1256 return NULL;
1263 1257
1264 spin_lock(&con->writequeue_lock); 1258 spin_lock(&con->writequeue_lock);
1265 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1259 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1266 if ((&e->list == &con->writequeue) || 1260 if ((&e->list == &con->writequeue) ||
1267 (PAGE_CACHE_SIZE - e->end < len)) { 1261 (PAGE_CACHE_SIZE - e->end < len)) {
1268 e = NULL; 1262 e = NULL;
1269 } else { 1263 } else {
1270 offset = e->end; 1264 offset = e->end;
1271 e->end += len; 1265 e->end += len;
1272 users = e->users++; 1266 users = e->users++;
1273 } 1267 }
1274 spin_unlock(&con->writequeue_lock); 1268 spin_unlock(&con->writequeue_lock);
1275 1269
1276 if (e) { 1270 if (e) {
1277 got_one: 1271 got_one:
1278 *ppc = page_address(e->page) + offset; 1272 *ppc = page_address(e->page) + offset;
1279 return e; 1273 return e;
1280 } 1274 }
1281 1275
1282 e = new_writequeue_entry(con, allocation); 1276 e = new_writequeue_entry(con, allocation);
1283 if (e) { 1277 if (e) {
1284 spin_lock(&con->writequeue_lock); 1278 spin_lock(&con->writequeue_lock);
1285 offset = e->end; 1279 offset = e->end;
1286 e->end += len; 1280 e->end += len;
1287 users = e->users++; 1281 users = e->users++;
1288 list_add_tail(&e->list, &con->writequeue); 1282 list_add_tail(&e->list, &con->writequeue);
1289 spin_unlock(&con->writequeue_lock); 1283 spin_unlock(&con->writequeue_lock);
1290 goto got_one; 1284 goto got_one;
1291 } 1285 }
1292 return NULL; 1286 return NULL;
1293 } 1287 }
1294 1288
1295 void dlm_lowcomms_commit_buffer(void *mh) 1289 void dlm_lowcomms_commit_buffer(void *mh)
1296 { 1290 {
1297 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1291 struct writequeue_entry *e = (struct writequeue_entry *)mh;
1298 struct connection *con = e->con; 1292 struct connection *con = e->con;
1299 int users; 1293 int users;
1300 1294
1301 spin_lock(&con->writequeue_lock); 1295 spin_lock(&con->writequeue_lock);
1302 users = --e->users; 1296 users = --e->users;
1303 if (users) 1297 if (users)
1304 goto out; 1298 goto out;
1305 e->len = e->end - e->offset; 1299 e->len = e->end - e->offset;
1306 spin_unlock(&con->writequeue_lock); 1300 spin_unlock(&con->writequeue_lock);
1307 1301
1308 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { 1302 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1309 queue_work(send_workqueue, &con->swork); 1303 queue_work(send_workqueue, &con->swork);
1310 } 1304 }
1311 return; 1305 return;
1312 1306
1313 out: 1307 out:
1314 spin_unlock(&con->writequeue_lock); 1308 spin_unlock(&con->writequeue_lock);
1315 return; 1309 return;
1316 } 1310 }
1317 1311
1318 /* Send a message */ 1312 /* Send a message */
1319 static void send_to_sock(struct connection *con) 1313 static void send_to_sock(struct connection *con)
1320 { 1314 {
1321 int ret = 0; 1315 int ret = 0;
1322 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1316 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1323 struct writequeue_entry *e; 1317 struct writequeue_entry *e;
1324 int len, offset; 1318 int len, offset;
1325 int count = 0; 1319 int count = 0;
1326 1320
1327 mutex_lock(&con->sock_mutex); 1321 mutex_lock(&con->sock_mutex);
1328 if (con->sock == NULL) 1322 if (con->sock == NULL)
1329 goto out_connect; 1323 goto out_connect;
1330 1324
1331 spin_lock(&con->writequeue_lock); 1325 spin_lock(&con->writequeue_lock);
1332 for (;;) { 1326 for (;;) {
1333 e = list_entry(con->writequeue.next, struct writequeue_entry, 1327 e = list_entry(con->writequeue.next, struct writequeue_entry,
1334 list); 1328 list);
1335 if ((struct list_head *) e == &con->writequeue) 1329 if ((struct list_head *) e == &con->writequeue)
1336 break; 1330 break;
1337 1331
1338 len = e->len; 1332 len = e->len;
1339 offset = e->offset; 1333 offset = e->offset;
1340 BUG_ON(len == 0 && e->users == 0); 1334 BUG_ON(len == 0 && e->users == 0);
1341 spin_unlock(&con->writequeue_lock); 1335 spin_unlock(&con->writequeue_lock);
1342 1336
1343 ret = 0; 1337 ret = 0;
1344 if (len) { 1338 if (len) {
1345 ret = kernel_sendpage(con->sock, e->page, offset, len, 1339 ret = kernel_sendpage(con->sock, e->page, offset, len,
1346 msg_flags); 1340 msg_flags);
1347 if (ret == -EAGAIN || ret == 0) { 1341 if (ret == -EAGAIN || ret == 0) {
1348 if (ret == -EAGAIN && 1342 if (ret == -EAGAIN &&
1349 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) && 1343 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1350 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1344 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1351 /* Notify TCP that we're limited by the 1345 /* Notify TCP that we're limited by the
1352 * application window size. 1346 * application window size.
1353 */ 1347 */
1354 set_bit(SOCK_NOSPACE, &con->sock->flags); 1348 set_bit(SOCK_NOSPACE, &con->sock->flags);
1355 con->sock->sk->sk_write_pending++; 1349 con->sock->sk->sk_write_pending++;
1356 } 1350 }
1357 cond_resched(); 1351 cond_resched();
1358 goto out; 1352 goto out;
1359 } 1353 }
1360 if (ret <= 0) 1354 if (ret <= 0)
1361 goto send_error; 1355 goto send_error;
1362 } 1356 }
1363 1357
1364 /* Don't starve people filling buffers */ 1358 /* Don't starve people filling buffers */
1365 if (++count >= MAX_SEND_MSG_COUNT) { 1359 if (++count >= MAX_SEND_MSG_COUNT) {
1366 cond_resched(); 1360 cond_resched();
1367 count = 0; 1361 count = 0;
1368 } 1362 }
1369 1363
1370 spin_lock(&con->writequeue_lock); 1364 spin_lock(&con->writequeue_lock);
1371 e->offset += ret; 1365 e->offset += ret;
1372 e->len -= ret; 1366 e->len -= ret;
1373 1367
1374 if (e->len == 0 && e->users == 0) { 1368 if (e->len == 0 && e->users == 0) {
1375 list_del(&e->list); 1369 list_del(&e->list);
1376 free_entry(e); 1370 free_entry(e);
1377 continue; 1371 continue;
1378 } 1372 }
1379 } 1373 }
1380 spin_unlock(&con->writequeue_lock); 1374 spin_unlock(&con->writequeue_lock);
1381 out: 1375 out:
1382 mutex_unlock(&con->sock_mutex); 1376 mutex_unlock(&con->sock_mutex);
1383 return; 1377 return;
1384 1378
1385 send_error: 1379 send_error:
1386 mutex_unlock(&con->sock_mutex); 1380 mutex_unlock(&con->sock_mutex);
1387 close_connection(con, false); 1381 close_connection(con, false);
1388 lowcomms_connect_sock(con); 1382 lowcomms_connect_sock(con);
1389 return; 1383 return;
1390 1384
1391 out_connect: 1385 out_connect:
1392 mutex_unlock(&con->sock_mutex); 1386 mutex_unlock(&con->sock_mutex);
1393 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1387 if (!test_bit(CF_INIT_PENDING, &con->flags))
1394 lowcomms_connect_sock(con); 1388 lowcomms_connect_sock(con);
1395 return; 1389 return;
1396 } 1390 }
1397 1391
1398 static void clean_one_writequeue(struct connection *con) 1392 static void clean_one_writequeue(struct connection *con)
1399 { 1393 {
1400 struct writequeue_entry *e, *safe; 1394 struct writequeue_entry *e, *safe;
1401 1395
1402 spin_lock(&con->writequeue_lock); 1396 spin_lock(&con->writequeue_lock);
1403 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1397 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1404 list_del(&e->list); 1398 list_del(&e->list);
1405 free_entry(e); 1399 free_entry(e);
1406 } 1400 }
1407 spin_unlock(&con->writequeue_lock); 1401 spin_unlock(&con->writequeue_lock);
1408 } 1402 }
1409 1403
1410 /* Called from recovery when it knows that a node has 1404 /* Called from recovery when it knows that a node has
1411 left the cluster */ 1405 left the cluster */
1412 int dlm_lowcomms_close(int nodeid) 1406 int dlm_lowcomms_close(int nodeid)
1413 { 1407 {
1414 struct connection *con; 1408 struct connection *con;
1415 1409
1416 log_print("closing connection to node %d", nodeid); 1410 log_print("closing connection to node %d", nodeid);
1417 con = nodeid2con(nodeid, 0); 1411 con = nodeid2con(nodeid, 0);
1418 if (con) { 1412 if (con) {
1419 clear_bit(CF_CONNECT_PENDING, &con->flags); 1413 clear_bit(CF_CONNECT_PENDING, &con->flags);
1420 clear_bit(CF_WRITE_PENDING, &con->flags); 1414 clear_bit(CF_WRITE_PENDING, &con->flags);
1421 set_bit(CF_CLOSE, &con->flags); 1415 set_bit(CF_CLOSE, &con->flags);
1422 if (cancel_work_sync(&con->swork)) 1416 if (cancel_work_sync(&con->swork))
1423 log_print("canceled swork for node %d", nodeid); 1417 log_print("canceled swork for node %d", nodeid);
1424 if (cancel_work_sync(&con->rwork)) 1418 if (cancel_work_sync(&con->rwork))
1425 log_print("canceled rwork for node %d", nodeid); 1419 log_print("canceled rwork for node %d", nodeid);
1426 clean_one_writequeue(con); 1420 clean_one_writequeue(con);
1427 close_connection(con, true); 1421 close_connection(con, true);
1428 } 1422 }
1429 return 0; 1423 return 0;
1430 } 1424 }
1431 1425
1432 /* Receive workqueue function */ 1426 /* Receive workqueue function */
1433 static void process_recv_sockets(struct work_struct *work) 1427 static void process_recv_sockets(struct work_struct *work)
1434 { 1428 {
1435 struct connection *con = container_of(work, struct connection, rwork); 1429 struct connection *con = container_of(work, struct connection, rwork);
1436 int err; 1430 int err;
1437 1431
1438 clear_bit(CF_READ_PENDING, &con->flags); 1432 clear_bit(CF_READ_PENDING, &con->flags);
1439 do { 1433 do {
1440 err = con->rx_action(con); 1434 err = con->rx_action(con);
1441 } while (!err); 1435 } while (!err);
1442 } 1436 }
1443 1437
1444 /* Send workqueue function */ 1438 /* Send workqueue function */
1445 static void process_send_sockets(struct work_struct *work) 1439 static void process_send_sockets(struct work_struct *work)
1446 { 1440 {
1447 struct connection *con = container_of(work, struct connection, swork); 1441 struct connection *con = container_of(work, struct connection, swork);
1448 1442
1449 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1443 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1450 con->connect_action(con); 1444 con->connect_action(con);
1451 set_bit(CF_WRITE_PENDING, &con->flags); 1445 set_bit(CF_WRITE_PENDING, &con->flags);
1452 } 1446 }
1453 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) 1447 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1454 send_to_sock(con); 1448 send_to_sock(con);
1455 } 1449 }
1456 1450
1457 1451
1458 /* Discard all entries on the write queues */ 1452 /* Discard all entries on the write queues */
1459 static void clean_writequeues(void) 1453 static void clean_writequeues(void)
1460 { 1454 {
1461 foreach_conn(clean_one_writequeue); 1455 foreach_conn(clean_one_writequeue);
1462 } 1456 }
1463 1457
1464 static void work_stop(void) 1458 static void work_stop(void)
1465 { 1459 {
1466 destroy_workqueue(recv_workqueue); 1460 destroy_workqueue(recv_workqueue);
1467 destroy_workqueue(send_workqueue); 1461 destroy_workqueue(send_workqueue);
1468 } 1462 }
1469 1463
1470 static int work_start(void) 1464 static int work_start(void)
1471 { 1465 {
1472 recv_workqueue = alloc_workqueue("dlm_recv", 1466 recv_workqueue = alloc_workqueue("dlm_recv",
1473 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1467 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1474 if (!recv_workqueue) { 1468 if (!recv_workqueue) {
1475 log_print("can't start dlm_recv"); 1469 log_print("can't start dlm_recv");
1476 return -ENOMEM; 1470 return -ENOMEM;
1477 } 1471 }
1478 1472
1479 send_workqueue = alloc_workqueue("dlm_send", 1473 send_workqueue = alloc_workqueue("dlm_send",
1480 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1474 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1481 if (!send_workqueue) { 1475 if (!send_workqueue) {
1482 log_print("can't start dlm_send"); 1476 log_print("can't start dlm_send");
1483 destroy_workqueue(recv_workqueue); 1477 destroy_workqueue(recv_workqueue);
1484 return -ENOMEM; 1478 return -ENOMEM;
1485 } 1479 }
1486 1480
1487 return 0; 1481 return 0;
1488 } 1482 }
1489 1483
1490 static void stop_conn(struct connection *con) 1484 static void stop_conn(struct connection *con)
1491 { 1485 {
1492 con->flags |= 0x0F; 1486 con->flags |= 0x0F;
1493 if (con->sock && con->sock->sk) 1487 if (con->sock && con->sock->sk)
1494 con->sock->sk->sk_user_data = NULL; 1488 con->sock->sk->sk_user_data = NULL;
1495 } 1489 }
1496 1490
1497 static void free_conn(struct connection *con) 1491 static void free_conn(struct connection *con)
1498 { 1492 {
1499 close_connection(con, true); 1493 close_connection(con, true);
1500 if (con->othercon) 1494 if (con->othercon)
1501 kmem_cache_free(con_cache, con->othercon); 1495 kmem_cache_free(con_cache, con->othercon);
1502 hlist_del(&con->list); 1496 hlist_del(&con->list);
1503 kmem_cache_free(con_cache, con); 1497 kmem_cache_free(con_cache, con);
1504 } 1498 }
1505 1499
1506 void dlm_lowcomms_stop(void) 1500 void dlm_lowcomms_stop(void)
1507 { 1501 {
1508 /* Set all the flags to prevent any 1502 /* Set all the flags to prevent any
1509 socket activity. 1503 socket activity.
1510 */ 1504 */
1511 mutex_lock(&connections_lock); 1505 mutex_lock(&connections_lock);
1512 foreach_conn(stop_conn); 1506 foreach_conn(stop_conn);
1513 mutex_unlock(&connections_lock); 1507 mutex_unlock(&connections_lock);
1514 1508
1515 work_stop(); 1509 work_stop();
1516 1510
1517 mutex_lock(&connections_lock); 1511 mutex_lock(&connections_lock);
1518 clean_writequeues(); 1512 clean_writequeues();
1519 1513
1520 foreach_conn(free_conn); 1514 foreach_conn(free_conn);
1521 1515
1522 mutex_unlock(&connections_lock); 1516 mutex_unlock(&connections_lock);
1523 kmem_cache_destroy(con_cache); 1517 kmem_cache_destroy(con_cache);
1524 } 1518 }
1525 1519
1526 int dlm_lowcomms_start(void) 1520 int dlm_lowcomms_start(void)
1527 { 1521 {
1528 int error = -EINVAL; 1522 int error = -EINVAL;
1529 struct connection *con; 1523 struct connection *con;
1530 int i; 1524 int i;
1531 1525
1532 for (i = 0; i < CONN_HASH_SIZE; i++) 1526 for (i = 0; i < CONN_HASH_SIZE; i++)
1533 INIT_HLIST_HEAD(&connection_hash[i]); 1527 INIT_HLIST_HEAD(&connection_hash[i]);
1534 1528
1535 init_local(); 1529 init_local();
1536 if (!dlm_local_count) { 1530 if (!dlm_local_count) {
1537 error = -ENOTCONN; 1531 error = -ENOTCONN;
1538 log_print("no local IP address has been set"); 1532 log_print("no local IP address has been set");
1539 goto out; 1533 goto out;
1540 } 1534 }
1541 1535
1542 error = -ENOMEM; 1536 error = -ENOMEM;
1543 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1537 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1544 __alignof__(struct connection), 0, 1538 __alignof__(struct connection), 0,
1545 NULL); 1539 NULL);
1546 if (!con_cache) 1540 if (!con_cache)
1547 goto out; 1541 goto out;
1548 1542
1549 /* Start listening */ 1543 /* Start listening */
1550 if (dlm_config.ci_protocol == 0) 1544 if (dlm_config.ci_protocol == 0)
1551 error = tcp_listen_for_all(); 1545 error = tcp_listen_for_all();
1552 else 1546 else
1553 error = sctp_listen_for_all(); 1547 error = sctp_listen_for_all();
1554 if (error) 1548 if (error)
1555 goto fail_unlisten; 1549 goto fail_unlisten;
1556 1550
1557 error = work_start(); 1551 error = work_start();
1558 if (error) 1552 if (error)
1559 goto fail_unlisten; 1553 goto fail_unlisten;
1560 1554
1561 return 0; 1555 return 0;
1562 1556
1563 fail_unlisten: 1557 fail_unlisten:
1564 con = nodeid2con(0,0); 1558 con = nodeid2con(0,0);
1565 if (con) { 1559 if (con) {
1566 close_connection(con, false); 1560 close_connection(con, false);
1567 kmem_cache_free(con_cache, con); 1561 kmem_cache_free(con_cache, con);
1568 } 1562 }
1569 kmem_cache_destroy(con_cache); 1563 kmem_cache_destroy(con_cache);
1570 1564
1571 out: 1565 out: