Blame view

net/ceph/messenger.c 63.3 KB
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1
  #include <linux/ceph/ceph_debug.h>
31b8006e1   Sage Weil   ceph: messenger l...
2
3
4
5
6
7
8
  
  #include <linux/crc32c.h>
  #include <linux/ctype.h>
  #include <linux/highmem.h>
  #include <linux/inet.h>
  #include <linux/kthread.h>
  #include <linux/net.h>
5a0e3ad6a   Tejun Heo   include cleanup: ...
9
  #include <linux/slab.h>
31b8006e1   Sage Weil   ceph: messenger l...
10
11
  #include <linux/socket.h>
  #include <linux/string.h>
68b4476b0   Yehuda Sadeh   ceph: messenger a...
12
13
  #include <linux/bio.h>
  #include <linux/blkdev.h>
ee3b56f26   Noah Watkins   ceph: use kernel ...
14
  #include <linux/dns_resolver.h>
31b8006e1   Sage Weil   ceph: messenger l...
15
  #include <net/tcp.h>
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
16
17
18
19
  #include <linux/ceph/libceph.h>
  #include <linux/ceph/messenger.h>
  #include <linux/ceph/decode.h>
  #include <linux/ceph/pagelist.h>
bc3b2d7fb   Paul Gortmaker   net: Add export.h...
20
  #include <linux/export.h>
31b8006e1   Sage Weil   ceph: messenger l...
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  
  /*
   * Ceph uses the messenger to exchange ceph_msg messages with other
   * hosts in the system.  The messenger provides ordered and reliable
   * delivery.  We tolerate TCP disconnects by reconnecting (with
   * exponential backoff) in the case of a fault (disconnection, bad
   * crc, protocol error).  Acks allow sent messages to be discarded by
   * the sender.
   */
  
  /* static tag bytes (protocol control messages) */
  static char tag_msg = CEPH_MSGR_TAG_MSG;
  static char tag_ack = CEPH_MSGR_TAG_ACK;
  static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
a6a5349d1   Sage Weil   ceph: use separat...
35
36
37
  #ifdef CONFIG_LOCKDEP
  static struct lock_class_key socket_class;
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
38
39
40
41
  
  static void queue_con(struct ceph_connection *con);
  static void con_work(struct work_struct *);
  static void ceph_fault(struct ceph_connection *con);
31b8006e1   Sage Weil   ceph: messenger l...
42
43
44
45
  /*
   * nicely render a sockaddr as a string.
   */
  #define MAX_ADDR_STR 20
d06dbaf6c   Sage Weil   ceph: fix printin...
46
47
  #define MAX_ADDR_STR_LEN 60
  static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
31b8006e1   Sage Weil   ceph: messenger l...
48
49
  static DEFINE_SPINLOCK(addr_str_lock);
  static int last_addr_str;
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
50
  const char *ceph_pr_addr(const struct sockaddr_storage *ss)
31b8006e1   Sage Weil   ceph: messenger l...
51
52
53
54
  {
  	int i;
  	char *s;
  	struct sockaddr_in *in4 = (void *)ss;
31b8006e1   Sage Weil   ceph: messenger l...
55
56
57
58
59
60
61
62
63
64
65
  	struct sockaddr_in6 *in6 = (void *)ss;
  
  	spin_lock(&addr_str_lock);
  	i = last_addr_str++;
  	if (last_addr_str == MAX_ADDR_STR)
  		last_addr_str = 0;
  	spin_unlock(&addr_str_lock);
  	s = addr_str[i];
  
  	switch (ss->ss_family) {
  	case AF_INET:
d06dbaf6c   Sage Weil   ceph: fix printin...
66
67
  		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
  			 (unsigned int)ntohs(in4->sin_port));
31b8006e1   Sage Weil   ceph: messenger l...
68
69
70
  		break;
  
  	case AF_INET6:
d06dbaf6c   Sage Weil   ceph: fix printin...
71
72
  		snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
  			 (unsigned int)ntohs(in6->sin6_port));
31b8006e1   Sage Weil   ceph: messenger l...
73
74
75
  		break;
  
  	default:
12a2f643b   Sage Weil   libceph: use snpr...
76
77
  		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
  			 (int)ss->ss_family);
31b8006e1   Sage Weil   ceph: messenger l...
78
79
80
81
  	}
  
  	return s;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
82
  EXPORT_SYMBOL(ceph_pr_addr);
31b8006e1   Sage Weil   ceph: messenger l...
83

63f2d2119   Sage Weil   ceph: use fixed e...
84
85
86
87
88
  /*
   * Refresh msgr->my_enc_addr: copy msgr->inst.addr into it, then run
   * ceph_encode_addr() on the copy.
   */
  static void encode_my_addr(struct ceph_messenger *msgr)
  {
  	void *enc = &msgr->my_enc_addr;

  	memcpy(enc, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
  	ceph_encode_addr(&msgr->my_enc_addr);
  }
31b8006e1   Sage Weil   ceph: messenger l...
89
90
91
92
  /*
   * work queue for all reading and writing to/from the socket.
   */
  struct workqueue_struct *ceph_msgr_wq;
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
93
  int ceph_msgr_init(void)
31b8006e1   Sage Weil   ceph: messenger l...
94
  {
f363e45fd   Tejun Heo   net/ceph: make ce...
95
  	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
d96c9043d   Sage Weil   ceph: fix msgr_in...
96
97
98
99
  	if (!ceph_msgr_wq) {
  		pr_err("msgr_init failed to create workqueue
  ");
  		return -ENOMEM;
31b8006e1   Sage Weil   ceph: messenger l...
100
101
102
  	}
  	return 0;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
103
  EXPORT_SYMBOL(ceph_msgr_init);
31b8006e1   Sage Weil   ceph: messenger l...
104
105
106
107
108
  
  /*
   * Tear down the messenger workqueue (ceph_msgr_wq) that
   * ceph_msgr_init() allocated.
   */
  void ceph_msgr_exit(void)
  {
  	destroy_workqueue(ceph_msgr_wq);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
109
  EXPORT_SYMBOL(ceph_msgr_exit);
31b8006e1   Sage Weil   ceph: messenger l...
110

cd84db6e4   Yehuda Sadeh   ceph: code cleanup
111
  void ceph_msgr_flush(void)
a922d38fd   Sage Weil   ceph: close out m...
112
113
114
  {
  	flush_workqueue(ceph_msgr_wq);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
115
  EXPORT_SYMBOL(ceph_msgr_flush);
a922d38fd   Sage Weil   ceph: close out m...
116

31b8006e1   Sage Weil   ceph: messenger l...
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
  /*
   * socket callback functions
   */
  
  /*
   * data available on socket, or listen socket received a connect
   *
   * Queues connection work for the ceph_connection stored in
   * sk->sk_user_data, unless the socket is already in TCP_CLOSE_WAIT.
   * @count_unused is ignored.
   */
  static void ceph_data_ready(struct sock *sk, int count_unused)
  {
  	struct ceph_connection *con = sk->sk_user_data;

  	/* nothing is queued once the socket has reached CLOSE_WAIT */
  	if (sk->sk_state == TCP_CLOSE_WAIT)
  		return;

  	dout("ceph_data_ready on %p state = %lu, queueing work\n",
  	     con, con->state);
  	queue_con(con);
  }
  
  /*
   * socket has buffer space for writing
   *
   * Queues connection work only when WRITE_PENDING is set on the
   * connection, then clears SOCK_NOSPACE on the underlying socket.
   */
  static void ceph_write_space(struct sock *sk)
  {
  	struct ceph_connection *con = sk->sk_user_data;

  	/* only queue to workqueue if there is data we want to write. */
  	if (test_bit(WRITE_PENDING, &con->state)) {
  		dout("ceph_write_space %p queueing write work\n", con);
  		queue_con(con);
  	} else {
  		dout("ceph_write_space %p nothing to write\n", con);
  	}

  	/* since we have our own write_space, clear the SOCK_NOSPACE flag */
  	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
  }
  
  /* socket's state has changed */
  static void ceph_state_change(struct sock *sk)
  {
  	struct ceph_connection *con =
  		(struct ceph_connection *)sk->sk_user_data;
  
  	dout("ceph_state_change %p state = %lu sk_state = %u
  ",
  	     con, con->state, sk->sk_state);
  
  	if (test_bit(CLOSED, &con->state))
  		return;
  
  	switch (sk->sk_state) {
  	case TCP_CLOSE:
  		dout("ceph_state_change TCP_CLOSE
  ");
  	case TCP_CLOSE_WAIT:
  		dout("ceph_state_change TCP_CLOSE_WAIT
  ");
  		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
  			if (test_bit(CONNECTING, &con->state))
  				con->error_msg = "connection failed";
  			else
  				con->error_msg = "socket closed";
  			queue_con(con);
  		}
  		break;
  	case TCP_ESTABLISHED:
  		dout("ceph_state_change TCP_ESTABLISHED
  ");
  		queue_con(con);
  		break;
  	}
  }
  
  /*
   * set up socket callbacks
   *
   * Points sock->sk at @con through sk_user_data and installs the ceph
   * data_ready / write_space / state_change handlers on it.
   */
  static void set_sock_callbacks(struct socket *sock,
  			       struct ceph_connection *con)
  {
  	sock->sk->sk_user_data = con;

  	sock->sk->sk_data_ready = ceph_data_ready;
  	sock->sk->sk_write_space = ceph_write_space;
  	sock->sk->sk_state_change = ceph_state_change;
  }
  
  
  /*
   * socket helpers
   */
  
  /*
   * initiate connection to a remote socket.
   */
  static struct socket *ceph_tcp_connect(struct ceph_connection *con)
  {
f91d3471c   Sage Weil   ceph: fix creatio...
213
  	struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
31b8006e1   Sage Weil   ceph: messenger l...
214
215
216
217
  	struct socket *sock;
  	int ret;
  
  	BUG_ON(con->sock);
f91d3471c   Sage Weil   ceph: fix creatio...
218
219
  	ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
  			       IPPROTO_TCP, &sock);
31b8006e1   Sage Weil   ceph: messenger l...
220
221
222
223
  	if (ret)
  		return ERR_PTR(ret);
  	con->sock = sock;
  	sock->sk->sk_allocation = GFP_NOFS;
a6a5349d1   Sage Weil   ceph: use separat...
224
225
226
  #ifdef CONFIG_LOCKDEP
  	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
227
  	set_sock_callbacks(sock, con);
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
228
229
  	dout("connect %s
  ", ceph_pr_addr(&con->peer_addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
230

f91d3471c   Sage Weil   ceph: fix creatio...
231
232
  	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
  				 O_NONBLOCK);
31b8006e1   Sage Weil   ceph: messenger l...
233
234
235
  	if (ret == -EINPROGRESS) {
  		dout("connect %s EINPROGRESS sk_state = %u
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
236
  		     ceph_pr_addr(&con->peer_addr.in_addr),
31b8006e1   Sage Weil   ceph: messenger l...
237
238
239
240
241
242
  		     sock->sk->sk_state);
  		ret = 0;
  	}
  	if (ret < 0) {
  		pr_err("connect %s error %d
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
243
  		       ceph_pr_addr(&con->peer_addr.in_addr), ret);
31b8006e1   Sage Weil   ceph: messenger l...
244
245
246
247
248
249
250
251
252
253
254
255
256
257
  		sock_release(sock);
  		con->sock = NULL;
  		con->error_msg = "connect error";
  	}
  
  	if (ret < 0)
  		return ERR_PTR(ret);
  	return sock;
  }
  
  /*
   * Non-blocking read of up to @len bytes from @sock into @buf.
   *
   * Returns whatever kernel_recvmsg() returned, except that -EAGAIN is
   * reported as 0.
   */
  static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  {
  	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  	struct kvec iov = { .iov_base = buf, .iov_len = len };
  	int got;

  	got = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
  	return got == -EAGAIN ? 0 : got;
  }
  
  /*
   * write something.  @more is true if caller will be sending more data
   * shortly.
   *
   * Sends the @kvlen kvecs at @iov (@len bytes in total) without
   * blocking.  Returns whatever kernel_sendmsg() returned, except that
   * -EAGAIN is reported as 0.
   */
  static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  		     size_t kvlen, size_t len, int more)
  {
  	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  	int sent;

  	/* MSG_EOR is superfluous, but what the hell */
  	msg.msg_flags |= more ? MSG_MORE : MSG_EOR;

  	sent = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  	if (sent != -EAGAIN)
  		return sent;
  	return 0;
  }
  
  
  /*
   * Shutdown/close the socket for the given connection.
   */
  static int con_close_socket(struct ceph_connection *con)
  {
  	int rc;
  
  	dout("con_close_socket on %p sock %p
  ", con, con->sock);
  	if (!con->sock)
  		return 0;
  	set_bit(SOCK_CLOSED, &con->state);
  	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
  	sock_release(con->sock);
  	con->sock = NULL;
  	clear_bit(SOCK_CLOSED, &con->state);
  	return rc;
  }
  
  /*
   * Reset a connection.  Discard all incoming and outgoing messages
   * and clear *_seq state.
   */
  /* Unlink @msg from the list it is on, then call ceph_msg_put() on it. */
  static void ceph_msg_remove(struct ceph_msg *msg)
  {
  	list_del_init(&msg->list_head);
  	ceph_msg_put(msg);
  }
  /* Drain @head: ceph_msg_remove() the first entry until the list is empty. */
  static void ceph_msg_remove_list(struct list_head *head)
  {
  	struct ceph_msg *first;

  	for (;;) {
  		if (list_empty(head))
  			break;
  		first = list_first_entry(head, struct ceph_msg, list_head);
  		ceph_msg_remove(first);
  	}
  }
  
  static void reset_connection(struct ceph_connection *con)
  {
  	/* reset connection, out_queue, msg_ and connect_seq */
  	/* discard existing out_queue and msg_seq */
31b8006e1   Sage Weil   ceph: messenger l...
328
329
  	ceph_msg_remove_list(&con->out_queue);
  	ceph_msg_remove_list(&con->out_sent);
cf3e5c409   Sage Weil   ceph: plug leak o...
330
331
332
333
  	if (con->in_msg) {
  		ceph_msg_put(con->in_msg);
  		con->in_msg = NULL;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
334
335
  	con->connect_seq = 0;
  	con->out_seq = 0;
c86a2930c   Sage Weil   ceph: carry expli...
336
337
338
339
  	if (con->out_msg) {
  		ceph_msg_put(con->out_msg);
  		con->out_msg = NULL;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
340
  	con->in_seq = 0;
0e0d5e0c4   Sage Weil   ceph: fix ack cou...
341
  	con->in_seq_acked = 0;
31b8006e1   Sage Weil   ceph: messenger l...
342
343
344
345
346
347
348
  }
  
  /*
   * mark a peer down.  drop any open connections.
   */
  void ceph_con_close(struct ceph_connection *con)
  {
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
349
350
351
  	dout("con_close %p peer %s
  ", con,
  	     ceph_pr_addr(&con->peer_addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
352
353
  	set_bit(CLOSED, &con->state);  /* in case there's queued work */
  	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
1679f876a   Sage Weil   ceph: reset bits ...
354
355
356
  	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
  	clear_bit(KEEPALIVE_PENDING, &con->state);
  	clear_bit(WRITE_PENDING, &con->state);
ec302645f   Sage Weil   ceph: use connect...
357
  	mutex_lock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
358
  	reset_connection(con);
6f2bc3ff4   Sage Weil   ceph: clean up co...
359
  	con->peer_global_seq = 0;
91e45ce38   Sage Weil   ceph: cancel dela...
360
  	cancel_delayed_work(&con->work);
ec302645f   Sage Weil   ceph: use connect...
361
  	mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
362
363
  	queue_con(con);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
364
  EXPORT_SYMBOL(ceph_con_close);
31b8006e1   Sage Weil   ceph: messenger l...
365
366
  
  /*
31b8006e1   Sage Weil   ceph: messenger l...
367
368
369
370
   * Reopen a closed connection, with a new peer address.
   */
  void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
  {
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
371
372
  	dout("con_open %p %s
  ", con, ceph_pr_addr(&addr->in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
373
374
375
  	set_bit(OPENING, &con->state);
  	clear_bit(CLOSED, &con->state);
  	memcpy(&con->peer_addr, addr, sizeof(*addr));
03c677e1d   Sage Weil   ceph: reset msgr ...
376
  	con->delay = 0;      /* reset backoff memory */
31b8006e1   Sage Weil   ceph: messenger l...
377
378
  	queue_con(con);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
379
  EXPORT_SYMBOL(ceph_con_open);
31b8006e1   Sage Weil   ceph: messenger l...
380
381
  
  /*
87b315a5b   Sage Weil   ceph: avoid reope...
382
383
384
385
386
387
388
389
   * return true if this connection ever successfully opened
   */
  bool ceph_con_opened(struct ceph_connection *con)
  {
  	return con->connect_seq > 0;
  }
  
  /*
   * generic get/put
   */

  /*
   * Take a reference on @con.  Returns @con, or NULL if the reference
   * count had already dropped to zero.
   */
  struct ceph_connection *ceph_con_get(struct ceph_connection *con)
  {
  	dout("con_get %p nref = %d -> %d\n", con,
  	     atomic_read(&con->nref), atomic_read(&con->nref) + 1);
  	if (atomic_inc_not_zero(&con->nref))
  		return con;
  	return NULL;
  }
  
  /*
   * Drop a reference on @con.  When the count reaches zero the
   * connection is freed with kfree(); it must no longer own a socket at
   * that point.  Calling this with a zero count is a BUG.
   */
  void ceph_con_put(struct ceph_connection *con)
  {
  	dout("con_put %p nref = %d -> %d\n", con,
  	     atomic_read(&con->nref), atomic_read(&con->nref) - 1);
  	BUG_ON(atomic_read(&con->nref) == 0);
  	if (atomic_dec_and_test(&con->nref)) {
  		BUG_ON(con->sock);
  		kfree(con);
  	}
  }
  
  /*
   * initialize a new connection.
   */
  void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
  {
  	dout("con_init %p
  ", con);
  	memset(con, 0, sizeof(*con));
  	atomic_set(&con->nref, 1);
  	con->msgr = msgr;
ec302645f   Sage Weil   ceph: use connect...
424
  	mutex_init(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
425
426
427
428
  	INIT_LIST_HEAD(&con->out_queue);
  	INIT_LIST_HEAD(&con->out_sent);
  	INIT_DELAYED_WORK(&con->work, con_work);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
429
  EXPORT_SYMBOL(ceph_con_init);
31b8006e1   Sage Weil   ceph: messenger l...
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
  
  
  /*
   * We maintain a global counter to order connection attempts.  Get
   * a unique seq greater than @gt.
   *
   * Under msgr->global_seq_lock the counter is first raised to @gt if it
   * is below it, then incremented; the incremented value is returned.
   */
  static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
  {
  	u32 seq;

  	spin_lock(&msgr->global_seq_lock);
  	if (gt > msgr->global_seq)
  		msgr->global_seq = gt;
  	msgr->global_seq++;
  	seq = msgr->global_seq;
  	spin_unlock(&msgr->global_seq_lock);

  	return seq;
  }
  
  
  /*
   * Prepare footer for currently outgoing message, and finish things
   * off.  Assumes out_kvec* are already valid.. we just add on to the end.
   */
  static void prepare_write_message_footer(struct ceph_connection *con, int v)
  {
  	struct ceph_msg *m = con->out_msg;
  
  	dout("prepare_write_message_footer %p
  ", con);
  	con->out_kvec_is_msg = true;
  	con->out_kvec[v].iov_base = &m->footer;
  	con->out_kvec[v].iov_len = sizeof(m->footer);
  	con->out_kvec_bytes += sizeof(m->footer);
  	con->out_kvec_left++;
  	con->out_more = m->more_to_follow;
c86a2930c   Sage Weil   ceph: carry expli...
465
  	con->out_msg_done = true;
31b8006e1   Sage Weil   ceph: messenger l...
466
467
468
469
470
471
472
473
474
475
476
477
  }
  
  /*
   * Prepare headers for the next outgoing message.
   */
  static void prepare_write_message(struct ceph_connection *con)
  {
  	struct ceph_msg *m;
  	int v = 0;
  
  	con->out_kvec_bytes = 0;
  	con->out_kvec_is_msg = true;
c86a2930c   Sage Weil   ceph: carry expli...
478
  	con->out_msg_done = false;
31b8006e1   Sage Weil   ceph: messenger l...
479
480
481
482
483
484
485
486
487
488
489
490
  
  	/* Sneak an ack in there first?  If we can get it into the same
  	 * TCP packet that's a good thing. */
  	if (con->in_seq > con->in_seq_acked) {
  		con->in_seq_acked = con->in_seq;
  		con->out_kvec[v].iov_base = &tag_ack;
  		con->out_kvec[v++].iov_len = 1;
  		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
  		con->out_kvec[v].iov_base = &con->out_temp_ack;
  		con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
  		con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
  	}
31b8006e1   Sage Weil   ceph: messenger l...
491
492
  	m = list_first_entry(&con->out_queue,
  		       struct ceph_msg, list_head);
c86a2930c   Sage Weil   ceph: carry expli...
493
  	con->out_msg = m;
4cf9d5446   Sage Weil   libceph: don't ti...
494
495
496
497
  
  	/* put message on sent list */
  	ceph_msg_get(m);
  	list_move_tail(&m->list_head, &con->out_sent);
31b8006e1   Sage Weil   ceph: messenger l...
498

e84346b72   Sage Weil   ceph: preserve se...
499
500
501
502
503
504
505
506
  	/*
  	 * only assign outgoing seq # if we haven't sent this message
  	 * yet.  if it is requeued, resend with it's original seq.
  	 */
  	if (m->needs_out_seq) {
  		m->hdr.seq = cpu_to_le64(++con->out_seq);
  		m->needs_out_seq = false;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
  
  	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs
  ",
  	     m, con->out_seq, le16_to_cpu(m->hdr.type),
  	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
  	     le32_to_cpu(m->hdr.data_len),
  	     m->nr_pages);
  	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
  
  	/* tag + hdr + front + middle */
  	con->out_kvec[v].iov_base = &tag_msg;
  	con->out_kvec[v++].iov_len = 1;
  	con->out_kvec[v].iov_base = &m->hdr;
  	con->out_kvec[v++].iov_len = sizeof(m->hdr);
  	con->out_kvec[v++] = m->front;
  	if (m->middle)
  		con->out_kvec[v++] = m->middle->vec;
  	con->out_kvec_left = v;
  	con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
  		(m->middle ? m->middle->vec.iov_len : 0);
  	con->out_kvec_cur = con->out_kvec;
  
  	/* fill in crc (except data pages), footer */
  	con->out_msg->hdr.crc =
  		cpu_to_le32(crc32c(0, (void *)&m->hdr,
  				      sizeof(m->hdr) - sizeof(m->hdr.crc)));
  	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
  	con->out_msg->footer.front_crc =
  		cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
  	if (m->middle)
  		con->out_msg->footer.middle_crc =
  			cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
  					   m->middle->vec.iov_len));
  	else
  		con->out_msg->footer.middle_crc = 0;
  	con->out_msg->footer.data_crc = 0;
  	dout("prepare_write_message front_crc %u data_crc %u
  ",
  	     le32_to_cpu(con->out_msg->footer.front_crc),
  	     le32_to_cpu(con->out_msg->footer.middle_crc));
  
  	/* is there a data payload? */
  	if (le32_to_cpu(m->hdr.data_len) > 0) {
  		/* initialize page iterator */
  		con->out_msg_pos.page = 0;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
552
  		if (m->pages)
c5c6b19d4   Sage Weil   ceph: explicitly ...
553
  			con->out_msg_pos.page_pos = m->page_alignment;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
554
555
  		else
  			con->out_msg_pos.page_pos = 0;
31b8006e1   Sage Weil   ceph: messenger l...
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
  		con->out_msg_pos.data_pos = 0;
  		con->out_msg_pos.did_page_crc = 0;
  		con->out_more = 1;  /* data + footer will follow */
  	} else {
  		/* no, queue up footer too and be done */
  		prepare_write_message_footer(con, v);
  	}
  
  	set_bit(WRITE_PENDING, &con->state);
  }
  
  /*
   * Prepare an ack.
   *
   * Records in_seq as acked and loads out_kvec[] with the ack tag byte
   * followed by the little-endian acked sequence number.  Sets
   * WRITE_PENDING.
   */
  static void prepare_write_ack(struct ceph_connection *con)
  {
  	dout("prepare_write_ack %p %llu -> %llu\n", con,
  	     con->in_seq_acked, con->in_seq);
  	con->in_seq_acked = con->in_seq;

  	con->out_kvec[0].iov_base = &tag_ack;
  	con->out_kvec[0].iov_len = 1;
  	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
  	con->out_kvec[1].iov_base = &con->out_temp_ack;
  	con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
  	con->out_kvec_left = 2;
  	con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
  	con->out_kvec_cur = con->out_kvec;
  	con->out_more = 1;  /* more will follow.. eventually.. */
  	set_bit(WRITE_PENDING, &con->state);
  }
  
  /*
   * Prepare to write keepalive byte.
   */
  static void prepare_write_keepalive(struct ceph_connection *con)
  {
  	dout("prepare_write_keepalive %p
  ", con);
  	con->out_kvec[0].iov_base = &tag_keepalive;
  	con->out_kvec[0].iov_len = 1;
  	con->out_kvec_left = 1;
  	con->out_kvec_bytes = 1;
  	con->out_kvec_cur = con->out_kvec;
  	set_bit(WRITE_PENDING, &con->state);
  }
  
  /*
   * Connection negotiation.
   */
0da5d7036   Sage Weil   libceph: handle c...
607
  static int prepare_connect_authorizer(struct ceph_connection *con)
4e7a5dcd1   Sage Weil   ceph: negotiate a...
608
609
610
611
  {
  	void *auth_buf;
  	int auth_len = 0;
  	int auth_protocol = 0;
ec302645f   Sage Weil   ceph: use connect...
612
  	mutex_unlock(&con->mutex);
4e7a5dcd1   Sage Weil   ceph: negotiate a...
613
614
615
616
617
  	if (con->ops->get_authorizer)
  		con->ops->get_authorizer(con, &auth_buf, &auth_len,
  					 &auth_protocol, &con->auth_reply_buf,
  					 &con->auth_reply_buf_len,
  					 con->auth_retry);
ec302645f   Sage Weil   ceph: use connect...
618
  	mutex_lock(&con->mutex);
4e7a5dcd1   Sage Weil   ceph: negotiate a...
619

0da5d7036   Sage Weil   libceph: handle c...
620
621
622
  	if (test_bit(CLOSED, &con->state) ||
  	    test_bit(OPENING, &con->state))
  		return -EAGAIN;
4e7a5dcd1   Sage Weil   ceph: negotiate a...
623
624
  	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
  	con->out_connect.authorizer_len = cpu_to_le32(auth_len);
e8f54ce16   Sage Weil   libceph: fix unin...
625
626
627
628
629
630
  	if (auth_len) {
  		con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
  		con->out_kvec[con->out_kvec_left].iov_len = auth_len;
  		con->out_kvec_left++;
  		con->out_kvec_bytes += auth_len;
  	}
0da5d7036   Sage Weil   libceph: handle c...
631
  	return 0;
4e7a5dcd1   Sage Weil   ceph: negotiate a...
632
  }
31b8006e1   Sage Weil   ceph: messenger l...
633
634
635
  /*
   * We connected to a peer and are saying hello.
   */
eed0ef2ca   Sage Weil   ceph: separate ba...
636
637
  static void prepare_write_banner(struct ceph_messenger *msgr,
  				 struct ceph_connection *con)
31b8006e1   Sage Weil   ceph: messenger l...
638
639
  {
  	int len = strlen(CEPH_BANNER);
eed0ef2ca   Sage Weil   ceph: separate ba...
640
641
642
643
644
645
646
647
648
649
650
  
  	con->out_kvec[0].iov_base = CEPH_BANNER;
  	con->out_kvec[0].iov_len = len;
  	con->out_kvec[1].iov_base = &msgr->my_enc_addr;
  	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
  	con->out_kvec_left = 2;
  	con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
  	con->out_kvec_cur = con->out_kvec;
  	con->out_more = 0;
  	set_bit(WRITE_PENDING, &con->state);
  }
0da5d7036   Sage Weil   libceph: handle c...
651
652
653
  static int prepare_write_connect(struct ceph_messenger *msgr,
  				 struct ceph_connection *con,
  				 int after_banner)
eed0ef2ca   Sage Weil   ceph: separate ba...
654
  {
31b8006e1   Sage Weil   ceph: messenger l...
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
  	unsigned global_seq = get_global_seq(con->msgr, 0);
  	int proto;
  
  	switch (con->peer_name.type) {
  	case CEPH_ENTITY_TYPE_MON:
  		proto = CEPH_MONC_PROTOCOL;
  		break;
  	case CEPH_ENTITY_TYPE_OSD:
  		proto = CEPH_OSDC_PROTOCOL;
  		break;
  	case CEPH_ENTITY_TYPE_MDS:
  		proto = CEPH_MDSC_PROTOCOL;
  		break;
  	default:
  		BUG();
  	}
  
  	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d
  ", con,
  	     con->connect_seq, global_seq, proto);
4e7a5dcd1   Sage Weil   ceph: negotiate a...
675

3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
676
  	con->out_connect.features = cpu_to_le64(msgr->supported_features);
31b8006e1   Sage Weil   ceph: messenger l...
677
678
679
680
681
  	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
  	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
  	con->out_connect.global_seq = cpu_to_le32(global_seq);
  	con->out_connect.protocol_version = cpu_to_le32(proto);
  	con->out_connect.flags = 0;
31b8006e1   Sage Weil   ceph: messenger l...
682

eed0ef2ca   Sage Weil   ceph: separate ba...
683
684
685
686
687
688
689
690
  	if (!after_banner) {
  		con->out_kvec_left = 0;
  		con->out_kvec_bytes = 0;
  	}
  	con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
  	con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
  	con->out_kvec_left++;
  	con->out_kvec_bytes += sizeof(con->out_connect);
31b8006e1   Sage Weil   ceph: messenger l...
691
692
693
  	con->out_kvec_cur = con->out_kvec;
  	con->out_more = 0;
  	set_bit(WRITE_PENDING, &con->state);
4e7a5dcd1   Sage Weil   ceph: negotiate a...
694

0da5d7036   Sage Weil   libceph: handle c...
695
  	return prepare_connect_authorizer(con);
31b8006e1   Sage Weil   ceph: messenger l...
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
  }
  
  
  /*
   * write as much of pending kvecs to the socket as we can.
   *  1 -> done
   *  0 -> socket full, but more to do
   * <0 -> error
   */
  static int write_partial_kvec(struct ceph_connection *con)
  {
  	int ret;
  
  	dout("write_partial_kvec %p %d left
  ", con, con->out_kvec_bytes);
  	while (con->out_kvec_bytes > 0) {
  		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
  				       con->out_kvec_left, con->out_kvec_bytes,
  				       con->out_more);
  		if (ret <= 0)
  			goto out;
  		con->out_kvec_bytes -= ret;
  		if (con->out_kvec_bytes == 0)
  			break;            /* done */
  		while (ret > 0) {
  			if (ret >= con->out_kvec_cur->iov_len) {
  				ret -= con->out_kvec_cur->iov_len;
  				con->out_kvec_cur++;
  				con->out_kvec_left--;
  			} else {
  				con->out_kvec_cur->iov_len -= ret;
  				con->out_kvec_cur->iov_base += ret;
  				ret = 0;
  				break;
  			}
  		}
  	}
  	con->out_kvec_left = 0;
  	con->out_kvec_is_msg = false;
  	ret = 1;
  out:
  	dout("write_partial_kvec %p %d left in %d kvecs ret = %d
  ", con,
  	     con->out_kvec_bytes, con->out_kvec_left, ret);
  	return ret;  /* done! */
  }
68b4476b0   Yehuda Sadeh   ceph: messenger a...
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
  #ifdef CONFIG_BLOCK
  /*
   * Point the (*iter, *seg) cursor at the start of @bio: *iter becomes
   * @bio and *seg its bi_idx.  A NULL @bio yields (NULL, 0).
   */
  static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
  {
  	*iter = bio;
  	*seg = bio ? bio->bi_idx : 0;
  }
  
  /*
   * Advance the (*bio_iter, *seg) cursor by one segment.  When the
   * current bio's bi_vcnt is reached, the cursor is re-initialized on
   * its bi_next.  Does nothing when *bio_iter is NULL.
   */
  static void iter_bio_next(struct bio **bio_iter, int *seg)
  {
  	struct bio *cur = *bio_iter;

  	if (!cur)
  		return;

  	BUG_ON(*seg >= cur->bi_vcnt);

  	if (++(*seg) == cur->bi_vcnt)
  		init_bio_iter(cur->bi_next, bio_iter, seg);
  }
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
766
767
768
769
770
771
772
773
774
775
776
777
778
779
  /*
   * Write as much message data payload as we can.  If we finish, queue
   * up the footer.
   *  1 -> done, footer is now queued in out_kvec[].
   *  0 -> socket full, but more to do
   * <0 -> error
   */
  static int write_partial_msg_pages(struct ceph_connection *con)
  {
  	struct ceph_msg *msg = con->out_msg;
  	unsigned data_len = le32_to_cpu(msg->hdr.data_len);
  	size_t len;
  	int crc = con->msgr->nocrc;
  	int ret;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
780
781
782
  	int total_max_write;
  	int in_trail = 0;
  	size_t trail_len = (msg->trail ? msg->trail->length : 0);
31b8006e1   Sage Weil   ceph: messenger l...
783
784
785
786
787
  
  	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d
  ",
  	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
  	     con->out_msg_pos.page_pos);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
788
789
790
791
792
793
  #ifdef CONFIG_BLOCK
  	if (msg->bio && !msg->bio_iter)
  		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
  #endif
  
  	while (data_len > con->out_msg_pos.data_pos) {
31b8006e1   Sage Weil   ceph: messenger l...
794
795
  		struct page *page = NULL;
  		void *kaddr = NULL;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
796
797
798
799
800
  		int max_write = PAGE_SIZE;
  		int page_shift = 0;
  
  		total_max_write = data_len - trail_len -
  			con->out_msg_pos.data_pos;
31b8006e1   Sage Weil   ceph: messenger l...
801
802
803
804
805
806
  
  		/*
  		 * if we are calculating the data crc (the default), we need
  		 * to map the page.  if our pages[] has been revoked, use the
  		 * zero page.
  		 */
68b4476b0   Yehuda Sadeh   ceph: messenger a...
807
808
809
810
811
812
813
814
815
816
817
818
819
  
  		/* have we reached the trail part of the data? */
  		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
  			in_trail = 1;
  
  			total_max_write = data_len - con->out_msg_pos.data_pos;
  
  			page = list_first_entry(&msg->trail->head,
  						struct page, lru);
  			if (crc)
  				kaddr = kmap(page);
  			max_write = PAGE_SIZE;
  		} else if (msg->pages) {
31b8006e1   Sage Weil   ceph: messenger l...
820
821
822
  			page = msg->pages[con->out_msg_pos.page];
  			if (crc)
  				kaddr = kmap(page);
58bb3b374   Sage Weil   ceph: support cep...
823
824
825
826
827
  		} else if (msg->pagelist) {
  			page = list_first_entry(&msg->pagelist->head,
  						struct page, lru);
  			if (crc)
  				kaddr = kmap(page);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
828
829
830
831
832
833
834
835
836
837
838
  #ifdef CONFIG_BLOCK
  		} else if (msg->bio) {
  			struct bio_vec *bv;
  
  			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
  			page = bv->bv_page;
  			page_shift = bv->bv_offset;
  			if (crc)
  				kaddr = kmap(page) + page_shift;
  			max_write = bv->bv_len;
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
839
840
841
842
843
  		} else {
  			page = con->msgr->zero_page;
  			if (crc)
  				kaddr = page_address(con->msgr->zero_page);
  		}
68b4476b0   Yehuda Sadeh   ceph: messenger a...
844
845
  		len = min_t(int, max_write - con->out_msg_pos.page_pos,
  			    total_max_write);
31b8006e1   Sage Weil   ceph: messenger l...
846
847
848
849
850
851
852
853
854
  		if (crc && !con->out_msg_pos.did_page_crc) {
  			void *base = kaddr + con->out_msg_pos.page_pos;
  			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
  
  			BUG_ON(kaddr == NULL);
  			con->out_msg->footer.data_crc =
  				cpu_to_le32(crc32c(tmpcrc, base, len));
  			con->out_msg_pos.did_page_crc = 1;
  		}
31b8006e1   Sage Weil   ceph: messenger l...
855
  		ret = kernel_sendpage(con->sock, page,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
856
857
  				      con->out_msg_pos.page_pos + page_shift,
  				      len,
31b8006e1   Sage Weil   ceph: messenger l...
858
859
  				      MSG_DONTWAIT | MSG_NOSIGNAL |
  				      MSG_MORE);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
860
861
  		if (crc &&
  		    (msg->pages || msg->pagelist || msg->bio || in_trail))
31b8006e1   Sage Weil   ceph: messenger l...
862
  			kunmap(page);
42961d233   Sage Weil   libceph: fix sock...
863
864
  		if (ret == -EAGAIN)
  			ret = 0;
31b8006e1   Sage Weil   ceph: messenger l...
865
866
867
868
869
870
871
872
873
  		if (ret <= 0)
  			goto out;
  
  		con->out_msg_pos.data_pos += ret;
  		con->out_msg_pos.page_pos += ret;
  		if (ret == len) {
  			con->out_msg_pos.page_pos = 0;
  			con->out_msg_pos.page++;
  			con->out_msg_pos.did_page_crc = 0;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
874
875
876
877
  			if (in_trail)
  				list_move_tail(&page->lru,
  					       &msg->trail->head);
  			else if (msg->pagelist)
58bb3b374   Sage Weil   ceph: support cep...
878
879
  				list_move_tail(&page->lru,
  					       &msg->pagelist->head);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
880
881
882
883
  #ifdef CONFIG_BLOCK
  			else if (msg->bio)
  				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
  		}
  	}
  
  	dout("write_partial_msg_pages %p msg %p done
  ", con, msg);
  
  	/* prepare and queue up footer, too */
  	if (!crc)
  		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
  	con->out_kvec_bytes = 0;
  	con->out_kvec_left = 0;
  	con->out_kvec_cur = con->out_kvec;
  	prepare_write_message_footer(con, 0);
  	ret = 1;
  out:
  	return ret;
  }
  
  /*
   * write some zeros
   */
  static int write_partial_skip(struct ceph_connection *con)
  {
  	int ret;
  
  	while (con->out_skip > 0) {
  		struct kvec iov = {
  			.iov_base = page_address(con->msgr->zero_page),
  			.iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
  		};
  
  		ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
  		if (ret <= 0)
  			goto out;
  		con->out_skip -= ret;
  	}
  	ret = 1;
  out:
  	return ret;
  }
  
  /*
   * Prepare to read connection handshake, or an ack.
   */
eed0ef2ca   Sage Weil   ceph: separate ba...
928
929
930
931
932
933
  static void prepare_read_banner(struct ceph_connection *con)
  {
  	dout("prepare_read_banner %p
  ", con);
  	con->in_base_pos = 0;
  }
31b8006e1   Sage Weil   ceph: messenger l...
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
  /* Rewind the input offset before reading the peer's connect reply. */
  static void prepare_read_connect(struct ceph_connection *con)
  {
  	con->in_base_pos = 0;
  	dout("prepare_read_connect %p\n", con);
  }
  
  /* Rewind the input offset before reading an ack. */
  static void prepare_read_ack(struct ceph_connection *con)
  {
  	con->in_base_pos = 0;
  	dout("prepare_read_ack %p\n", con);
  }
  
  static void prepare_read_tag(struct ceph_connection *con)
  {
  	dout("prepare_read_tag %p
  ", con);
  	con->in_base_pos = 0;
  	con->in_tag = CEPH_MSGR_TAG_READY;
  }
  
  /*
   * Prepare to read a message.
   *
   * No incoming message may be pending (con->in_msg must be NULL).  The
   * input offset and the three running section crcs are reset.  Always
   * returns 0.
   */
  static int prepare_read_message(struct ceph_connection *con)
  {
  	dout("prepare_read_message %p\n", con);
  	BUG_ON(con->in_msg != NULL);
  
  	con->in_data_crc = 0;
  	con->in_middle_crc = 0;
  	con->in_front_crc = 0;
  	con->in_base_pos = 0;
  
  	return 0;
  }
  
  
  /*
   * Read one fixed-size object of a multi-object sequence.
   *
   * @to:     running end offset of the sequence; advanced by @size here
   * @size:   size of this object in bytes
   * @object: destination buffer for this object
   *
   * con->in_base_pos records how far into the whole sequence we already
   * are, so an object that was completed by an earlier call is skipped
   * and a half-read one is resumed at the right place.
   *
   * Returns 1 when the object is complete, otherwise the non-positive
   * result of ceph_tcp_recvmsg().
   */
  static int read_partial(struct ceph_connection *con,
  			int *to, int size, void *object)
  {
  	int end;
  
  	*to += size;
  	end = *to;
  
  	for (;;) {
  		int remaining = end - con->in_base_pos;
  		int got;
  
  		if (remaining <= 0)
  			break;
  
  		got = ceph_tcp_recvmsg(con->sock,
  				       object + (size - remaining), remaining);
  		if (got <= 0)
  			return got;
  		con->in_base_pos += got;
  	}
  	return 1;
  }
  
  
  /*
   * Read all or part of the connect-side handshake on a new connection
   */
eed0ef2ca   Sage Weil   ceph: separate ba...
989
  static int read_partial_banner(struct ceph_connection *con)
31b8006e1   Sage Weil   ceph: messenger l...
990
991
  {
  	int ret, to = 0;
eed0ef2ca   Sage Weil   ceph: separate ba...
992
993
  	dout("read_partial_banner %p at %d
  ", con, con->in_base_pos);
31b8006e1   Sage Weil   ceph: messenger l...
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
  
  	/* peer's banner */
  	ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
  	if (ret <= 0)
  		goto out;
  	ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
  			   &con->actual_peer_addr);
  	if (ret <= 0)
  		goto out;
  	ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
  			   &con->peer_addr_for_me);
  	if (ret <= 0)
  		goto out;
eed0ef2ca   Sage Weil   ceph: separate ba...
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
  out:
  	return ret;
  }
  
  static int read_partial_connect(struct ceph_connection *con)
  {
  	int ret, to = 0;
  
  	dout("read_partial_connect %p at %d
  ", con, con->in_base_pos);
31b8006e1   Sage Weil   ceph: messenger l...
1017
1018
1019
  	ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
  	if (ret <= 0)
  		goto out;
4e7a5dcd1   Sage Weil   ceph: negotiate a...
1020
1021
1022
1023
  	ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
  			   con->auth_reply_buf);
  	if (ret <= 0)
  		goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1024

4e7a5dcd1   Sage Weil   ceph: negotiate a...
1025
1026
1027
1028
  	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u
  ",
  	     con, (int)con->in_reply.tag,
  	     le32_to_cpu(con->in_reply.connect_seq),
31b8006e1   Sage Weil   ceph: messenger l...
1029
1030
1031
  	     le32_to_cpu(con->in_reply.global_seq));
  out:
  	return ret;
eed0ef2ca   Sage Weil   ceph: separate ba...
1032

31b8006e1   Sage Weil   ceph: messenger l...
1033
1034
1035
1036
1037
1038
1039
1040
  }
  
  /*
   * Verify the hello banner looks okay.
   */
  static int verify_hello(struct ceph_connection *con)
  {
  	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
13e38c8ae   Sage Weil   ceph: update to m...
1041
1042
  		pr_err("connect to %s got bad banner
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1043
  		       ceph_pr_addr(&con->peer_addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
  		con->error_msg = "protocol error, bad banner";
  		return -1;
  	}
  	return 0;
  }
  
  static bool addr_is_blank(struct sockaddr_storage *ss)
  {
  	switch (ss->ss_family) {
  	case AF_INET:
  		return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
  	case AF_INET6:
  		return
  		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
  		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
  		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
  		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
  	}
  	return false;
  }
  
  static int addr_port(struct sockaddr_storage *ss)
  {
  	switch (ss->ss_family) {
  	case AF_INET:
f28bcfbe6   Sage Weil   ceph: convert por...
1069
  		return ntohs(((struct sockaddr_in *)ss)->sin_port);
31b8006e1   Sage Weil   ceph: messenger l...
1070
  	case AF_INET6:
f28bcfbe6   Sage Weil   ceph: convert por...
1071
  		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
31b8006e1   Sage Weil   ceph: messenger l...
1072
1073
1074
1075
1076
1077
1078
1079
1080
  	}
  	return 0;
  }
  
  static void addr_set_port(struct sockaddr_storage *ss, int p)
  {
  	switch (ss->ss_family) {
  	case AF_INET:
  		((struct sockaddr_in *)ss)->sin_port = htons(p);
a2a79609c   Sage Weil   libceph: add miss...
1081
  		break;
31b8006e1   Sage Weil   ceph: messenger l...
1082
1083
  	case AF_INET6:
  		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
a2a79609c   Sage Weil   libceph: add miss...
1084
  		break;
31b8006e1   Sage Weil   ceph: messenger l...
1085
1086
1087
1088
  	}
  }
  
  /*
   * Parse a textual IPv4 or IPv6 address into @ss.  @ss is zeroed first;
   * IPv4 syntax is tried before IPv6 and ss_family is set to match.
   *
   * Unlike other *_pton function semantics, zero indicates success.
   * -EINVAL is returned when neither parser accepts the string.
   */
  static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
  		char delim, const char **ipend)
  {
  	struct sockaddr_in *v4 = (void *)ss;
  	struct sockaddr_in6 *v6 = (void *)ss;
  
  	memset(ss, 0, sizeof(*ss));
  
  	if (in4_pton(str, len, (u8 *)&v4->sin_addr.s_addr, delim, ipend))
  		ss->ss_family = AF_INET;
  	else if (in6_pton(str, len, (u8 *)&v6->sin6_addr.s6_addr, delim, ipend))
  		ss->ss_family = AF_INET6;
  	else
  		return -EINVAL;
  
  	return 0;
  }
  
  /*
   * Extract hostname string and resolve using kernel DNS facility.
   */
  #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
  /*
   * Resolve the hostname at the start of @name with dns_query() and parse
   * the answer into @ss.
   *
   * *ipend is set to the first character after the hostname whenever a
   * lookup was attempted.  Returns 0 on success, -EINVAL for an empty
   * hostname, -ESRCH when the query yields nothing, or ceph_pton()'s
   * error when the answer is not a valid address.
   */
  static int ceph_dns_resolve_name(const char *name, size_t namelen,
  		struct sockaddr_storage *ss, char delim, const char **ipend)
  {
  	const char *delim_p = memchr(name, delim, namelen);
  	const char *colon_p = memchr(name, ':', namelen);
  	const char *end;
  	char *ip_addr = NULL;
  	int ip_len, ret;
  
  	/*
  	 * The end of the hostname occurs immediately preceding the delimiter
  	 * or the port marker (':'), whichever comes first; with neither
  	 * present the hostname runs to the end of the input
  	 * (case: hostname:/).
  	 */
  	end = delim_p;
  	if (colon_p && (!end || colon_p < end))
  		end = colon_p;
  	if (!end)
  		end = name + namelen;
  
  	if (end <= name)
  		return -EINVAL;
  
  	/* do dns_resolve upcall */
  	ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
  	ret = (ip_len > 0) ? ceph_pton(ip_addr, ip_len, ss, -1, NULL) : -ESRCH;
  
  	kfree(ip_addr);
  
  	*ipend = end;
  
  	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
  			ret, ret ? "failed" : ceph_pr_addr(ss));
  
  	return ret;
  }
  #else
  /*
   * Stub used when CONFIG_CEPH_LIB_USE_DNS_RESOLVER is not set: no
   * hostname lookup is attempted, every call fails with -EINVAL and
   * neither *ss nor *ipend is written.
   */
  static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
  		struct sockaddr_storage *ss, char delim, const char **ipend)
  {
  	return -EINVAL;
  }
  #endif
  
  /*
   * Parse a server name (IP or hostname). If a valid IP address is not found
   * then try to extract a hostname to resolve using userspace DNS upcall.
   *
   * Returns 0 on success, otherwise the error from the DNS path.
   */
  static int ceph_parse_server_name(const char *name, size_t namelen,
  			struct sockaddr_storage *ss, char delim, const char **ipend)
  {
  	if (ceph_pton(name, namelen, ss, delim, ipend) == 0)
  		return 0;
  
  	return ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
  }
  
  /*
31b8006e1   Sage Weil   ceph: messenger l...
1185
1186
1187
1188
1189
1190
1191
   * Parse an ip[:port] list into an addr array.  Use the default
   * monitor port if a port isn't specified.
   */
  int ceph_parse_ips(const char *c, const char *end,
  		   struct ceph_entity_addr *addr,
  		   int max_count, int *count)
  {
ee3b56f26   Noah Watkins   ceph: use kernel ...
1192
  	int i, ret = -EINVAL;
31b8006e1   Sage Weil   ceph: messenger l...
1193
1194
1195
1196
1197
1198
1199
  	const char *p = c;
  
  	dout("parse_ips on '%.*s'
  ", (int)(end-c), c);
  	for (i = 0; i < max_count; i++) {
  		const char *ipend;
  		struct sockaddr_storage *ss = &addr[i].in_addr;
31b8006e1   Sage Weil   ceph: messenger l...
1200
  		int port;
39139f64e   Sage Weil   ceph: fix parsing...
1201
1202
1203
1204
1205
1206
  		char delim = ',';
  
  		if (*p == '[') {
  			delim = ']';
  			p++;
  		}
31b8006e1   Sage Weil   ceph: messenger l...
1207

ee3b56f26   Noah Watkins   ceph: use kernel ...
1208
1209
  		ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
  		if (ret)
31b8006e1   Sage Weil   ceph: messenger l...
1210
  			goto bad;
ee3b56f26   Noah Watkins   ceph: use kernel ...
1211
  		ret = -EINVAL;
31b8006e1   Sage Weil   ceph: messenger l...
1212
  		p = ipend;
39139f64e   Sage Weil   ceph: fix parsing...
1213
1214
1215
1216
1217
1218
1219
1220
  		if (delim == ']') {
  			if (*p != ']') {
  				dout("missing matching ']'
  ");
  				goto bad;
  			}
  			p++;
  		}
31b8006e1   Sage Weil   ceph: messenger l...
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
  		/* port? */
  		if (p < end && *p == ':') {
  			port = 0;
  			p++;
  			while (p < end && *p >= '0' && *p <= '9') {
  				port = (port * 10) + (*p - '0');
  				p++;
  			}
  			if (port > 65535 || port == 0)
  				goto bad;
  		} else {
  			port = CEPH_MON_PORT;
  		}
  
  		addr_set_port(ss, port);
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1236
1237
  		dout("parse_ips got %s
  ", ceph_pr_addr(ss));
31b8006e1   Sage Weil   ceph: messenger l...
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
  
  		if (p == end)
  			break;
  		if (*p != ',')
  			goto bad;
  		p++;
  	}
  
  	if (p != end)
  		goto bad;
  
  	if (count)
  		*count = i + 1;
  	return 0;
  
  bad:
39139f64e   Sage Weil   ceph: fix parsing...
1254
1255
  	pr_err("parse_ips bad ip '%.*s'
  ", (int)(end - c), c);
ee3b56f26   Noah Watkins   ceph: use kernel ...
1256
  	return ret;
31b8006e1   Sage Weil   ceph: messenger l...
1257
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1258
  EXPORT_SYMBOL(ceph_parse_ips);
31b8006e1   Sage Weil   ceph: messenger l...
1259

eed0ef2ca   Sage Weil   ceph: separate ba...
1260
  static int process_banner(struct ceph_connection *con)
31b8006e1   Sage Weil   ceph: messenger l...
1261
  {
eed0ef2ca   Sage Weil   ceph: separate ba...
1262
1263
  	dout("process_banner on %p
  ", con);
31b8006e1   Sage Weil   ceph: messenger l...
1264
1265
1266
  
  	if (verify_hello(con) < 0)
  		return -1;
63f2d2119   Sage Weil   ceph: use fixed e...
1267
1268
  	ceph_decode_addr(&con->actual_peer_addr);
  	ceph_decode_addr(&con->peer_addr_for_me);
31b8006e1   Sage Weil   ceph: messenger l...
1269
1270
1271
1272
1273
  	/*
  	 * Make sure the other end is who we wanted.  note that the other
  	 * end may not yet know their ip address, so if it's 0.0.0.0, give
  	 * them the benefit of the doubt.
  	 */
103e2d3ae   Sage Weil   ceph: remove unus...
1274
1275
  	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
  		   sizeof(con->peer_addr)) != 0 &&
31b8006e1   Sage Weil   ceph: messenger l...
1276
1277
  	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
  	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
cd84db6e4   Yehuda Sadeh   ceph: code cleanup
1278
1279
  		pr_warning("wrong peer, want %s/%d, got %s/%d
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1280
  			   ceph_pr_addr(&con->peer_addr.in_addr),
cd84db6e4   Yehuda Sadeh   ceph: code cleanup
1281
  			   (int)le32_to_cpu(con->peer_addr.nonce),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1282
  			   ceph_pr_addr(&con->actual_peer_addr.in_addr),
cd84db6e4   Yehuda Sadeh   ceph: code cleanup
1283
  			   (int)le32_to_cpu(con->actual_peer_addr.nonce));
58bb3b374   Sage Weil   ceph: support cep...
1284
  		con->error_msg = "wrong peer at address";
31b8006e1   Sage Weil   ceph: messenger l...
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
  		return -1;
  	}
  
  	/*
  	 * did we learn our address?
  	 */
  	if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
  		int port = addr_port(&con->msgr->inst.addr.in_addr);
  
  		memcpy(&con->msgr->inst.addr.in_addr,
  		       &con->peer_addr_for_me.in_addr,
  		       sizeof(con->peer_addr_for_me.in_addr));
  		addr_set_port(&con->msgr->inst.addr.in_addr, port);
63f2d2119   Sage Weil   ceph: use fixed e...
1298
  		encode_my_addr(con->msgr);
eed0ef2ca   Sage Weil   ceph: separate ba...
1299
1300
  		dout("process_banner learned my addr is %s
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1301
  		     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
1302
  	}
eed0ef2ca   Sage Weil   ceph: separate ba...
1303
1304
1305
1306
  	set_bit(NEGOTIATING, &con->state);
  	prepare_read_connect(con);
  	return 0;
  }
04a419f90   Sage Weil   ceph: add feature...
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
  /*
   * Give up on a connection after a negotiation mismatch (process_connect()
   * calls this for feature-set and protocol-version failures): reset the
   * connection, flag it CLOSED, and let the owner's optional bad_proto
   * hook react.
   */
  static void fail_protocol(struct ceph_connection *con)
  {
  	reset_connection(con);
  	set_bit(CLOSED, &con->state);  /* in case there's queued work */
  
  	/*
  	 * con->mutex is dropped around the callback and re-taken afterwards,
  	 * so this function returns with the mutex held again.
  	 * NOTE(review): this implies callers enter with con->mutex held --
  	 * confirm against the call sites.
  	 */
  	mutex_unlock(&con->mutex);
  	if (con->ops->bad_proto)
  		con->ops->bad_proto(con);
  	mutex_lock(&con->mutex);
  }
eed0ef2ca   Sage Weil   ceph: separate ba...
1317
1318
  static int process_connect(struct ceph_connection *con)
  {
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1319
1320
  	u64 sup_feat = con->msgr->supported_features;
  	u64 req_feat = con->msgr->required_features;
04a419f90   Sage Weil   ceph: add feature...
1321
  	u64 server_feat = le64_to_cpu(con->in_reply.features);
0da5d7036   Sage Weil   libceph: handle c...
1322
  	int ret;
04a419f90   Sage Weil   ceph: add feature...
1323

eed0ef2ca   Sage Weil   ceph: separate ba...
1324
1325
  	dout("process_connect on %p tag %d
  ", con, (int)con->in_tag);
31b8006e1   Sage Weil   ceph: messenger l...
1326
  	switch (con->in_reply.tag) {
04a419f90   Sage Weil   ceph: add feature...
1327
1328
1329
1330
1331
  	case CEPH_MSGR_TAG_FEATURES:
  		pr_err("%s%lld %s feature set mismatch,"
  		       " my %llx < server's %llx, missing %llx
  ",
  		       ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1332
  		       ceph_pr_addr(&con->peer_addr.in_addr),
04a419f90   Sage Weil   ceph: add feature...
1333
1334
1335
1336
  		       sup_feat, server_feat, server_feat & ~sup_feat);
  		con->error_msg = "missing required protocol features";
  		fail_protocol(con);
  		return -1;
31b8006e1   Sage Weil   ceph: messenger l...
1337
  	case CEPH_MSGR_TAG_BADPROTOVER:
31b8006e1   Sage Weil   ceph: messenger l...
1338
1339
1340
1341
  		pr_err("%s%lld %s protocol version mismatch,"
  		       " my %d != server's %d
  ",
  		       ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1342
  		       ceph_pr_addr(&con->peer_addr.in_addr),
31b8006e1   Sage Weil   ceph: messenger l...
1343
1344
1345
  		       le32_to_cpu(con->out_connect.protocol_version),
  		       le32_to_cpu(con->in_reply.protocol_version));
  		con->error_msg = "protocol version mismatch";
04a419f90   Sage Weil   ceph: add feature...
1346
  		fail_protocol(con);
31b8006e1   Sage Weil   ceph: messenger l...
1347
  		return -1;
4e7a5dcd1   Sage Weil   ceph: negotiate a...
1348
1349
1350
1351
1352
1353
1354
  	case CEPH_MSGR_TAG_BADAUTHORIZER:
  		con->auth_retry++;
  		dout("process_connect %p got BADAUTHORIZER attempt %d
  ", con,
  		     con->auth_retry);
  		if (con->auth_retry == 2) {
  			con->error_msg = "connect authorization failure";
4e7a5dcd1   Sage Weil   ceph: negotiate a...
1355
1356
1357
  			return -1;
  		}
  		con->auth_retry = 1;
0da5d7036   Sage Weil   libceph: handle c...
1358
1359
1360
  		ret = prepare_write_connect(con->msgr, con, 0);
  		if (ret < 0)
  			return ret;
63733a0fc   Sage Weil   ceph: fix authent...
1361
  		prepare_read_connect(con);
4e7a5dcd1   Sage Weil   ceph: negotiate a...
1362
  		break;
31b8006e1   Sage Weil   ceph: messenger l...
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
  
  	case CEPH_MSGR_TAG_RESETSESSION:
  		/*
  		 * If we connected with a large connect_seq but the peer
  		 * has no record of a session with us (no connection, or
  		 * connect_seq == 0), they will send RESETSESION to indicate
  		 * that they must have reset their session, and may have
  		 * dropped messages.
  		 */
  		dout("process_connect got RESET peer seq %u
  ",
  		     le32_to_cpu(con->in_connect.connect_seq));
  		pr_err("%s%lld %s connection reset
  ",
  		       ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1378
  		       ceph_pr_addr(&con->peer_addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
1379
  		reset_connection(con);
eed0ef2ca   Sage Weil   ceph: separate ba...
1380
  		prepare_write_connect(con->msgr, con, 0);
31b8006e1   Sage Weil   ceph: messenger l...
1381
1382
1383
  		prepare_read_connect(con);
  
  		/* Tell ceph about it. */
ec302645f   Sage Weil   ceph: use connect...
1384
  		mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
1385
1386
1387
1388
  		pr_info("reset on %s%lld
  ", ENTITY_NAME(con->peer_name));
  		if (con->ops->peer_reset)
  			con->ops->peer_reset(con);
ec302645f   Sage Weil   ceph: use connect...
1389
  		mutex_lock(&con->mutex);
0da5d7036   Sage Weil   libceph: handle c...
1390
1391
1392
  		if (test_bit(CLOSED, &con->state) ||
  		    test_bit(OPENING, &con->state))
  			return -EAGAIN;
31b8006e1   Sage Weil   ceph: messenger l...
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
  		break;
  
  	case CEPH_MSGR_TAG_RETRY_SESSION:
  		/*
  		 * If we sent a smaller connect_seq than the peer has, try
  		 * again with a larger value.
  		 */
  		dout("process_connect got RETRY my seq = %u, peer_seq = %u
  ",
  		     le32_to_cpu(con->out_connect.connect_seq),
  		     le32_to_cpu(con->in_connect.connect_seq));
  		con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
eed0ef2ca   Sage Weil   ceph: separate ba...
1405
  		prepare_write_connect(con->msgr, con, 0);
31b8006e1   Sage Weil   ceph: messenger l...
1406
1407
1408
1409
1410
1411
1412
1413
  		prepare_read_connect(con);
  		break;
  
  	case CEPH_MSGR_TAG_RETRY_GLOBAL:
  		/*
  		 * If we sent a smaller global_seq than the peer has, try
  		 * again with a larger value.
  		 */
eed0ef2ca   Sage Weil   ceph: separate ba...
1414
1415
  		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u
  ",
31b8006e1   Sage Weil   ceph: messenger l...
1416
1417
1418
1419
  		     con->peer_global_seq,
  		     le32_to_cpu(con->in_connect.global_seq));
  		get_global_seq(con->msgr,
  			       le32_to_cpu(con->in_connect.global_seq));
eed0ef2ca   Sage Weil   ceph: separate ba...
1420
  		prepare_write_connect(con->msgr, con, 0);
31b8006e1   Sage Weil   ceph: messenger l...
1421
1422
1423
1424
  		prepare_read_connect(con);
  		break;
  
  	case CEPH_MSGR_TAG_READY:
04a419f90   Sage Weil   ceph: add feature...
1425
1426
1427
1428
1429
  		if (req_feat & ~server_feat) {
  			pr_err("%s%lld %s protocol feature mismatch,"
  			       " my required %llx > server's %llx, need %llx
  ",
  			       ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1430
  			       ceph_pr_addr(&con->peer_addr.in_addr),
04a419f90   Sage Weil   ceph: add feature...
1431
1432
1433
1434
1435
  			       req_feat, server_feat, req_feat & ~server_feat);
  			con->error_msg = "missing required protocol features";
  			fail_protocol(con);
  			return -1;
  		}
31b8006e1   Sage Weil   ceph: messenger l...
1436
  		clear_bit(CONNECTING, &con->state);
31b8006e1   Sage Weil   ceph: messenger l...
1437
1438
  		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
  		con->connect_seq++;
aba558e28   Sage Weil   ceph: save peer f...
1439
  		con->peer_features = server_feat;
31b8006e1   Sage Weil   ceph: messenger l...
1440
1441
1442
1443
1444
1445
1446
  		dout("process_connect got READY gseq %d cseq %d (%d)
  ",
  		     con->peer_global_seq,
  		     le32_to_cpu(con->in_reply.connect_seq),
  		     con->connect_seq);
  		WARN_ON(con->connect_seq !=
  			le32_to_cpu(con->in_reply.connect_seq));
92ac41d0a   Sage Weil   ceph: detect loss...
1447
1448
1449
  
  		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
  			set_bit(LOSSYTX, &con->state);
31b8006e1   Sage Weil   ceph: messenger l...
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
  		prepare_read_tag(con);
  		break;
  
  	case CEPH_MSGR_TAG_WAIT:
  		/*
  		 * If there is a connection race (we are opening
  		 * connections to each other), one of us may just have
  		 * to WAIT.  This shouldn't happen if we are the
  		 * client.
  		 */
041778822   Sage Weil   libceph: fix TAG_...
1460
1461
1462
1463
  		pr_err("process_connect got WAIT as client
  ");
  		con->error_msg = "protocol error, got WAIT as client";
  		return -1;
31b8006e1   Sage Weil   ceph: messenger l...
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
  
  	default:
  		pr_err("connect protocol error, will retry
  ");
  		con->error_msg = "protocol error, garbage tag during connect";
  		return -1;
  	}
  	return 0;
  }
  
  
  /*
   * read (part of) an ack
   *
   * The raw value lands in con->in_temp_ack.  Returns whatever
   * read_partial() returns: 1 when complete, <= 0 otherwise.
   */
  static int read_partial_ack(struct ceph_connection *con)
  {
  	int end = 0;
  	int rc;
  
  	rc = read_partial(con, &end, sizeof(con->in_temp_ack),
  			  &con->in_temp_ack);
  	return rc;
  }
  
  
  /*
   * We can finally discard anything that's been acked.
   */
  static void process_ack(struct ceph_connection *con)
  {
  	struct ceph_msg *m;
  	u64 ack = le64_to_cpu(con->in_temp_ack);
  	u64 seq;
31b8006e1   Sage Weil   ceph: messenger l...
1495
1496
1497
1498
1499
1500
1501
1502
1503
  	while (!list_empty(&con->out_sent)) {
  		m = list_first_entry(&con->out_sent, struct ceph_msg,
  				     list_head);
  		seq = le64_to_cpu(m->hdr.seq);
  		if (seq > ack)
  			break;
  		dout("got ack for seq %llu type %d at %p
  ", seq,
  		     le16_to_cpu(m->hdr.type), m);
4cf9d5446   Sage Weil   libceph: don't ti...
1504
  		m->ack_stamp = jiffies;
31b8006e1   Sage Weil   ceph: messenger l...
1505
1506
  		ceph_msg_remove(m);
  	}
31b8006e1   Sage Weil   ceph: messenger l...
1507
1508
  	prepare_read_tag(con);
  }
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1509
  /*
   * Read (more of) one kvec-backed message section.
   *
   * section->iov_len tracks how much has arrived so far; reading resumes
   * there until @sec_len bytes are present.  When the call that completes
   * the section finishes, *crc is set to the crc32c of the whole section.
   *
   * Returns 1 when the section is complete (including the case where
   * nothing needed reading), otherwise the non-positive result of
   * ceph_tcp_recvmsg().
   */
  static int read_partial_message_section(struct ceph_connection *con,
  					struct kvec *section,
  					unsigned int sec_len, u32 *crc)
  {
  	BUG_ON(!section);
  
  	while (section->iov_len < sec_len) {
  		char *dst;
  		int want, got;
  
  		BUG_ON(section->iov_base == NULL);
  
  		want = sec_len - section->iov_len;
  		dst = (char *)section->iov_base + section->iov_len;
  
  		got = ceph_tcp_recvmsg(con->sock, dst, want);
  		if (got <= 0)
  			return got;
  
  		section->iov_len += got;
  		if (section->iov_len == sec_len)
  			*crc = crc32c(0, section->iov_base,
  				      section->iov_len);
  	}
  
  	return 1;
  }
31b8006e1   Sage Weil   ceph: messenger l...
1532

2450418c4   Yehuda Sadeh   ceph: allocate mi...
1533
1534
1535
  static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
  				struct ceph_msg_header *hdr,
  				int *skip);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
  
  
  /*
   * Receive message data into the current page of @pages.
   *
   * One call does a single receive, limited to what is left of the
   * current page and of the @data_len bytes expected in total.  With
   * @datacrc set, the received bytes are folded into con->in_data_crc.
   * The position in con->in_msg_pos moves on to the next page once the
   * current one is full.
   *
   * Returns the number of bytes received, or the non-positive result of
   * ceph_tcp_recvmsg().
   */
  static int read_partial_message_pages(struct ceph_connection *con,
  				      struct page **pages,
  				      unsigned data_len, int datacrc)
  {
  	struct page *pg;
  	void *dst;
  	int want, got;
  
  	want = min((int)(data_len - con->in_msg_pos.data_pos),
  		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
  
  	/* (page) data */
  	BUG_ON(pages == NULL);
  	pg = pages[con->in_msg_pos.page];
  	dst = kmap(pg) + con->in_msg_pos.page_pos;
  
  	got = ceph_tcp_recvmsg(con->sock, dst, want);
  	if (got > 0 && datacrc)
  		con->in_data_crc = crc32c(con->in_data_crc, dst, got);
  	kunmap(pg);
  
  	if (got <= 0)
  		return got;
  
  	con->in_msg_pos.data_pos += got;
  	con->in_msg_pos.page_pos += got;
  	if (con->in_msg_pos.page_pos == PAGE_SIZE) {
  		con->in_msg_pos.page_pos = 0;
  		con->in_msg_pos.page++;
  	}
  
  	return got;
  }
  
  #ifdef CONFIG_BLOCK
  /*
   * Receive message data into the current segment of a bio chain.
   *
   * One call does a single receive, limited to what is left of the
   * current bio_vec and of the @data_len bytes expected in total.  With
   * @datacrc set, the received bytes are folded into con->in_data_crc.
   * Once the segment is full the (bio, segment) cursor is advanced with
   * iter_bio_next().
   *
   * Returns the number of bytes received, the non-positive result of
   * ceph_tcp_recvmsg(), or PTR_ERR() of the segment lookup.
   */
  static int read_partial_message_bio(struct ceph_connection *con,
  				    struct bio **bio_iter, int *bio_seg,
  				    unsigned data_len, int datacrc)
  {
  	struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
  	void *dst;
  	int want, got;
  
  	if (IS_ERR(bv))
  		return PTR_ERR(bv);
  
  	want = min((int)(data_len - con->in_msg_pos.data_pos),
  		   (int)(bv->bv_len - con->in_msg_pos.page_pos));
  
  	dst = kmap(bv->bv_page) + bv->bv_offset + con->in_msg_pos.page_pos;
  
  	got = ceph_tcp_recvmsg(con->sock, dst, want);
  	if (got > 0 && datacrc)
  		con->in_data_crc = crc32c(con->in_data_crc, dst, got);
  	kunmap(bv->bv_page);
  
  	if (got <= 0)
  		return got;
  
  	con->in_msg_pos.data_pos += got;
  	con->in_msg_pos.page_pos += got;
  	if (con->in_msg_pos.page_pos == bv->bv_len) {
  		con->in_msg_pos.page_pos = 0;
  		iter_bio_next(bio_iter, bio_seg);
  	}
  
  	return got;
  }
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
1606
1607
1608
1609
1610
1611
  /*
   * read (part of) a message.
   */
  static int read_partial_message(struct ceph_connection *con)
  {
  	struct ceph_msg *m = con->in_msg;
31b8006e1   Sage Weil   ceph: messenger l...
1612
  	int ret;
9d7f0f139   Yehuda Sadeh   ceph: refactor me...
1613
  	int to, left;
c5c6b19d4   Sage Weil   ceph: explicitly ...
1614
  	unsigned front_len, middle_len, data_len;
31b8006e1   Sage Weil   ceph: messenger l...
1615
  	int datacrc = con->msgr->nocrc;
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1616
  	int skip;
ae18756b9   Sage Weil   ceph: discard inc...
1617
  	u64 seq;
31b8006e1   Sage Weil   ceph: messenger l...
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
  
  	dout("read_partial_message con %p msg %p
  ", con, m);
  
  	/* header */
  	while (con->in_base_pos < sizeof(con->in_hdr)) {
  		left = sizeof(con->in_hdr) - con->in_base_pos;
  		ret = ceph_tcp_recvmsg(con->sock,
  				       (char *)&con->in_hdr + con->in_base_pos,
  				       left);
  		if (ret <= 0)
  			return ret;
  		con->in_base_pos += ret;
  		if (con->in_base_pos == sizeof(con->in_hdr)) {
  			u32 crc = crc32c(0, (void *)&con->in_hdr,
  				 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
  			if (crc != le32_to_cpu(con->in_hdr.crc)) {
  				pr_err("read_partial_message bad hdr "
  				       " crc %u != expected %u
  ",
  				       crc, con->in_hdr.crc);
  				return -EBADMSG;
  			}
  		}
  	}
31b8006e1   Sage Weil   ceph: messenger l...
1643
1644
1645
1646
1647
1648
1649
1650
1651
  	front_len = le32_to_cpu(con->in_hdr.front_len);
  	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
  		return -EIO;
  	middle_len = le32_to_cpu(con->in_hdr.middle_len);
  	if (middle_len > CEPH_MSG_MAX_DATA_LEN)
  		return -EIO;
  	data_len = le32_to_cpu(con->in_hdr.data_len);
  	if (data_len > CEPH_MSG_MAX_DATA_LEN)
  		return -EIO;
ae18756b9   Sage Weil   ceph: discard inc...
1652
1653
1654
  	/* verify seq# */
  	seq = le64_to_cpu(con->in_hdr.seq);
  	if ((s64)seq - (s64)con->in_seq < 1) {
df9f86faf   Sage Weil   ceph: fix small s...
1655
1656
  		pr_info("skipping %s%lld %s seq %lld expected %lld
  ",
ae18756b9   Sage Weil   ceph: discard inc...
1657
  			ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
1658
  			ceph_pr_addr(&con->peer_addr.in_addr),
ae18756b9   Sage Weil   ceph: discard inc...
1659
1660
1661
1662
  			seq, con->in_seq + 1);
  		con->in_base_pos = -front_len - middle_len - data_len -
  			sizeof(m->footer);
  		con->in_tag = CEPH_MSGR_TAG_READY;
ae18756b9   Sage Weil   ceph: discard inc...
1663
1664
1665
1666
1667
1668
1669
1670
  		return 0;
  	} else if ((s64)seq - (s64)con->in_seq > 1) {
  		pr_err("read_partial_message bad seq %lld expected %lld
  ",
  		       seq, con->in_seq + 1);
  		con->error_msg = "bad message sequence # for incoming message";
  		return -EBADMSG;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
1671
1672
1673
1674
1675
  	/* allocate message? */
  	if (!con->in_msg) {
  		dout("got hdr type %d front %d data %d
  ", con->in_hdr.type,
  		     con->in_hdr.front_len, con->in_hdr.data_len);
ae32be313   Sage Weil   ceph: fix message...
1676
  		skip = 0;
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1677
1678
  		con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
  		if (skip) {
31b8006e1   Sage Weil   ceph: messenger l...
1679
  			/* skip this message */
a79832f26   Sage Weil   ceph: make ceph_m...
1680
1681
  			dout("alloc_msg said skip message
  ");
ae32be313   Sage Weil   ceph: fix message...
1682
  			BUG_ON(con->in_msg);
31b8006e1   Sage Weil   ceph: messenger l...
1683
1684
1685
  			con->in_base_pos = -front_len - middle_len - data_len -
  				sizeof(m->footer);
  			con->in_tag = CEPH_MSGR_TAG_READY;
684be25c5   Sage Weil   ceph: fix seq cou...
1686
  			con->in_seq++;
31b8006e1   Sage Weil   ceph: messenger l...
1687
1688
  			return 0;
  		}
a79832f26   Sage Weil   ceph: make ceph_m...
1689
  		if (!con->in_msg) {
5b3a4db3e   Sage Weil   ceph: fix up unex...
1690
1691
  			con->error_msg =
  				"error allocating memory for incoming message";
a79832f26   Sage Weil   ceph: make ceph_m...
1692
  			return -ENOMEM;
31b8006e1   Sage Weil   ceph: messenger l...
1693
1694
1695
  		}
  		m = con->in_msg;
  		m->front.iov_len = 0;    /* haven't read it yet */
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1696
1697
  		if (m->middle)
  			m->middle->vec.iov_len = 0;
9d7f0f139   Yehuda Sadeh   ceph: refactor me...
1698
1699
  
  		con->in_msg_pos.page = 0;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
1700
  		if (m->pages)
c5c6b19d4   Sage Weil   ceph: explicitly ...
1701
  			con->in_msg_pos.page_pos = m->page_alignment;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
1702
1703
  		else
  			con->in_msg_pos.page_pos = 0;
9d7f0f139   Yehuda Sadeh   ceph: refactor me...
1704
  		con->in_msg_pos.data_pos = 0;
31b8006e1   Sage Weil   ceph: messenger l...
1705
1706
1707
  	}
  
  	/* front */
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1708
1709
1710
1711
  	ret = read_partial_message_section(con, &m->front, front_len,
  					   &con->in_front_crc);
  	if (ret <= 0)
  		return ret;
31b8006e1   Sage Weil   ceph: messenger l...
1712
1713
  
  	/* middle */
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1714
  	if (m->middle) {
213c99ee0   Sage Weil   ceph: whitespace ...
1715
1716
  		ret = read_partial_message_section(con, &m->middle->vec,
  						   middle_len,
2450418c4   Yehuda Sadeh   ceph: allocate mi...
1717
  						   &con->in_middle_crc);
31b8006e1   Sage Weil   ceph: messenger l...
1718
1719
  		if (ret <= 0)
  			return ret;
31b8006e1   Sage Weil   ceph: messenger l...
1720
  	}
68b4476b0   Yehuda Sadeh   ceph: messenger a...
1721
1722
1723
1724
  #ifdef CONFIG_BLOCK
  	if (m->bio && !m->bio_iter)
  		init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
  #endif
31b8006e1   Sage Weil   ceph: messenger l...
1725
1726
  
  	/* (page) data */
31b8006e1   Sage Weil   ceph: messenger l...
1727
  	while (con->in_msg_pos.data_pos < data_len) {
68b4476b0   Yehuda Sadeh   ceph: messenger a...
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
  		if (m->pages) {
  			ret = read_partial_message_pages(con, m->pages,
  						 data_len, datacrc);
  			if (ret <= 0)
  				return ret;
  #ifdef CONFIG_BLOCK
  		} else if (m->bio) {
  
  			ret = read_partial_message_bio(con,
  						 &m->bio_iter, &m->bio_seg,
  						 data_len, datacrc);
  			if (ret <= 0)
  				return ret;
  #endif
  		} else {
  			BUG_ON(1);
31b8006e1   Sage Weil   ceph: messenger l...
1744
1745
  		}
  	}
31b8006e1   Sage Weil   ceph: messenger l...
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
  	/* footer */
  	to = sizeof(m->hdr) + sizeof(m->footer);
  	while (con->in_base_pos < to) {
  		left = to - con->in_base_pos;
  		ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
  				       (con->in_base_pos - sizeof(m->hdr)),
  				       left);
  		if (ret <= 0)
  			return ret;
  		con->in_base_pos += ret;
  	}
  	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)
  ",
  	     m, front_len, m->footer.front_crc, middle_len,
  	     m->footer.middle_crc, data_len, m->footer.data_crc);
  
  	/* crc ok? */
  	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
  		pr_err("read_partial_message %p front crc %u != exp. %u
  ",
  		       m, con->in_front_crc, m->footer.front_crc);
  		return -EBADMSG;
  	}
  	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
  		pr_err("read_partial_message %p middle crc %u != exp %u
  ",
  		       m, con->in_middle_crc, m->footer.middle_crc);
  		return -EBADMSG;
  	}
  	if (datacrc &&
  	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
  	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
  		pr_err("read_partial_message %p data crc %u != exp. %u
  ", m,
  		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
  		return -EBADMSG;
  	}
  
  	return 1; /* done! */
  }
  
  /*
   * Process message.  This happens in the worker thread.  The callback should
   * be careful not to do anything that waits on other incoming messages or it
   * may deadlock.
   */
  static void process_message(struct ceph_connection *con)
  {
5e095e8b4   Sage Weil   ceph: plug msg le...
1794
  	struct ceph_msg *msg;
31b8006e1   Sage Weil   ceph: messenger l...
1795

5e095e8b4   Sage Weil   ceph: plug msg le...
1796
  	msg = con->in_msg;
31b8006e1   Sage Weil   ceph: messenger l...
1797
1798
1799
1800
  	con->in_msg = NULL;
  
  	/* if first message, set peer_name */
  	if (con->peer_name.type == 0)
dbad185d4   Sage Weil   ceph: drop src ad...
1801
  		con->peer_name = msg->hdr.src;
31b8006e1   Sage Weil   ceph: messenger l...
1802

31b8006e1   Sage Weil   ceph: messenger l...
1803
  	con->in_seq++;
ec302645f   Sage Weil   ceph: use connect...
1804
  	mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
1805
1806
1807
1808
  
  	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====
  ",
  	     msg, le64_to_cpu(msg->hdr.seq),
dbad185d4   Sage Weil   ceph: drop src ad...
1809
  	     ENTITY_NAME(msg->hdr.src),
31b8006e1   Sage Weil   ceph: messenger l...
1810
1811
1812
1813
1814
1815
  	     le16_to_cpu(msg->hdr.type),
  	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
  	     le32_to_cpu(msg->hdr.front_len),
  	     le32_to_cpu(msg->hdr.data_len),
  	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
  	con->ops->dispatch(con, msg);
ec302645f   Sage Weil   ceph: use connect...
1816
1817
  
  	mutex_lock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
  	prepare_read_tag(con);
  }
  
  
  /*
   * Write something to the socket.  Called in a worker thread when the
   * socket appears to be writeable and we have something ready to send.
   */
  static int try_write(struct ceph_connection *con)
  {
  	struct ceph_messenger *msgr = con->msgr;
  	int ret = 1;
  
  	dout("try_write start %p state %lu nref %d
  ", con, con->state,
  	     atomic_read(&con->nref));
31b8006e1   Sage Weil   ceph: messenger l...
1834
1835
1836
1837
1838
1839
  more:
  	dout("try_write out_kvec_bytes %d
  ", con->out_kvec_bytes);
  
  	/* open the socket first? */
  	if (con->sock == NULL) {
eed0ef2ca   Sage Weil   ceph: separate ba...
1840
1841
1842
  		prepare_write_banner(msgr, con);
  		prepare_write_connect(msgr, con, 1);
  		prepare_read_banner(con);
31b8006e1   Sage Weil   ceph: messenger l...
1843
  		set_bit(CONNECTING, &con->state);
eed0ef2ca   Sage Weil   ceph: separate ba...
1844
  		clear_bit(NEGOTIATING, &con->state);
31b8006e1   Sage Weil   ceph: messenger l...
1845

cf3e5c409   Sage Weil   ceph: plug leak o...
1846
  		BUG_ON(con->in_msg);
31b8006e1   Sage Weil   ceph: messenger l...
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
  		con->in_tag = CEPH_MSGR_TAG_READY;
  		dout("try_write initiating connect on %p new state %lu
  ",
  		     con, con->state);
  		con->sock = ceph_tcp_connect(con);
  		if (IS_ERR(con->sock)) {
  			con->sock = NULL;
  			con->error_msg = "connect error";
  			ret = -1;
  			goto out;
  		}
  	}
  
  more_kvec:
  	/* kvec data queued? */
  	if (con->out_skip) {
  		ret = write_partial_skip(con);
  		if (ret <= 0)
42961d233   Sage Weil   libceph: fix sock...
1865
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1866
1867
1868
1869
  	}
  	if (con->out_kvec_left) {
  		ret = write_partial_kvec(con);
  		if (ret <= 0)
42961d233   Sage Weil   libceph: fix sock...
1870
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1871
1872
1873
1874
  	}
  
  	/* msg pages? */
  	if (con->out_msg) {
c86a2930c   Sage Weil   ceph: carry expli...
1875
1876
1877
1878
1879
  		if (con->out_msg_done) {
  			ceph_msg_put(con->out_msg);
  			con->out_msg = NULL;   /* we're done with this one */
  			goto do_next;
  		}
31b8006e1   Sage Weil   ceph: messenger l...
1880
1881
1882
1883
  		ret = write_partial_msg_pages(con);
  		if (ret == 1)
  			goto more_kvec;  /* we need to send the footer, too! */
  		if (ret == 0)
42961d233   Sage Weil   libceph: fix sock...
1884
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1885
1886
1887
1888
  		if (ret < 0) {
  			dout("try_write write_partial_msg_pages err %d
  ",
  			     ret);
42961d233   Sage Weil   libceph: fix sock...
1889
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1890
1891
  		}
  	}
c86a2930c   Sage Weil   ceph: carry expli...
1892
  do_next:
31b8006e1   Sage Weil   ceph: messenger l...
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
  	if (!test_bit(CONNECTING, &con->state)) {
  		/* is anything else pending? */
  		if (!list_empty(&con->out_queue)) {
  			prepare_write_message(con);
  			goto more;
  		}
  		if (con->in_seq > con->in_seq_acked) {
  			prepare_write_ack(con);
  			goto more;
  		}
  		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
  			prepare_write_keepalive(con);
  			goto more;
  		}
  	}
  
  	/* Nothing to do! */
  	clear_bit(WRITE_PENDING, &con->state);
  	dout("try_write nothing else to write.
  ");
31b8006e1   Sage Weil   ceph: messenger l...
1913
1914
  	ret = 0;
  out:
42961d233   Sage Weil   libceph: fix sock...
1915
1916
  	dout("try_write done on %p ret %d
  ", con, ret);
31b8006e1   Sage Weil   ceph: messenger l...
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
  	return ret;
  }
  
  
  
  /*
   * Read what we can from the socket.
   */
  static int try_read(struct ceph_connection *con)
  {
31b8006e1   Sage Weil   ceph: messenger l...
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
  	int ret = -1;
  
  	if (!con->sock)
  		return 0;
  
  	if (test_bit(STANDBY, &con->state))
  		return 0;
  
  	dout("try_read start on %p
  ", con);
ec302645f   Sage Weil   ceph: use connect...
1937

31b8006e1   Sage Weil   ceph: messenger l...
1938
1939
1940
1941
  more:
  	dout("try_read tag %d in_base_pos %d
  ", (int)con->in_tag,
  	     con->in_base_pos);
0da5d7036   Sage Weil   libceph: handle c...
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
  
  	/*
  	 * process_connect and process_message drop and re-take
  	 * con->mutex.  make sure we handle a racing close or reopen.
  	 */
  	if (test_bit(CLOSED, &con->state) ||
  	    test_bit(OPENING, &con->state)) {
  		ret = -EAGAIN;
  		goto out;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
1952
  	if (test_bit(CONNECTING, &con->state)) {
eed0ef2ca   Sage Weil   ceph: separate ba...
1953
1954
1955
1956
1957
  		if (!test_bit(NEGOTIATING, &con->state)) {
  			dout("try_read connecting
  ");
  			ret = read_partial_banner(con);
  			if (ret <= 0)
eed0ef2ca   Sage Weil   ceph: separate ba...
1958
  				goto out;
98bdb0aa0   Sage Weil   libceph: fix sock...
1959
1960
1961
  			ret = process_banner(con);
  			if (ret < 0)
  				goto out;
eed0ef2ca   Sage Weil   ceph: separate ba...
1962
  		}
31b8006e1   Sage Weil   ceph: messenger l...
1963
1964
  		ret = read_partial_connect(con);
  		if (ret <= 0)
31b8006e1   Sage Weil   ceph: messenger l...
1965
  			goto out;
98bdb0aa0   Sage Weil   libceph: fix sock...
1966
1967
1968
  		ret = process_connect(con);
  		if (ret < 0)
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
  		goto more;
  	}
  
  	if (con->in_base_pos < 0) {
  		/*
  		 * skipping + discarding content.
  		 *
  		 * FIXME: there must be a better way to do this!
  		 */
  		static char buf[1024];
  		int skip = min(1024, -con->in_base_pos);
  		dout("skipping %d / %d bytes
  ", skip, -con->in_base_pos);
  		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
  		if (ret <= 0)
98bdb0aa0   Sage Weil   libceph: fix sock...
1984
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
  		con->in_base_pos += ret;
  		if (con->in_base_pos)
  			goto more;
  	}
  	if (con->in_tag == CEPH_MSGR_TAG_READY) {
  		/*
  		 * what's next?
  		 */
  		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
  		if (ret <= 0)
98bdb0aa0   Sage Weil   libceph: fix sock...
1995
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
  		dout("try_read got tag %d
  ", (int)con->in_tag);
  		switch (con->in_tag) {
  		case CEPH_MSGR_TAG_MSG:
  			prepare_read_message(con);
  			break;
  		case CEPH_MSGR_TAG_ACK:
  			prepare_read_ack(con);
  			break;
  		case CEPH_MSGR_TAG_CLOSE:
  			set_bit(CLOSED, &con->state);   /* fixme */
98bdb0aa0   Sage Weil   libceph: fix sock...
2007
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
  		default:
  			goto bad_tag;
  		}
  	}
  	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
  		ret = read_partial_message(con);
  		if (ret <= 0) {
  			switch (ret) {
  			case -EBADMSG:
  				con->error_msg = "bad crc";
  				ret = -EIO;
98bdb0aa0   Sage Weil   libceph: fix sock...
2019
  				break;
31b8006e1   Sage Weil   ceph: messenger l...
2020
2021
  			case -EIO:
  				con->error_msg = "io error";
98bdb0aa0   Sage Weil   libceph: fix sock...
2022
  				break;
31b8006e1   Sage Weil   ceph: messenger l...
2023
  			}
98bdb0aa0   Sage Weil   libceph: fix sock...
2024
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
2025
2026
2027
2028
2029
2030
2031
2032
2033
  		}
  		if (con->in_tag == CEPH_MSGR_TAG_READY)
  			goto more;
  		process_message(con);
  		goto more;
  	}
  	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
  		ret = read_partial_ack(con);
  		if (ret <= 0)
98bdb0aa0   Sage Weil   libceph: fix sock...
2034
  			goto out;
31b8006e1   Sage Weil   ceph: messenger l...
2035
2036
2037
  		process_ack(con);
  		goto more;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
2038
  out:
98bdb0aa0   Sage Weil   libceph: fix sock...
2039
2040
  	dout("try_read done on %p ret %d
  ", con, ret);
31b8006e1   Sage Weil   ceph: messenger l...
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
  	return ret;
  
  bad_tag:
  	pr_err("try_read bad con->in_tag = %d
  ", (int)con->in_tag);
  	con->error_msg = "protocol error, garbage tag";
  	ret = -1;
  	goto out;
  }
  
  
  /*
   * Atomically queue work on a connection.  Bump @con reference to
   * avoid races with connection teardown.
31b8006e1   Sage Weil   ceph: messenger l...
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
   */
  static void queue_con(struct ceph_connection *con)
  {
  	if (test_bit(DEAD, &con->state)) {
  		dout("queue_con %p ignoring: DEAD
  ",
  		     con);
  		return;
  	}
  
  	if (!con->ops->get(con)) {
  		dout("queue_con %p ref count 0
  ", con);
  		return;
  	}
f363e45fd   Tejun Heo   net/ceph: make ce...
2070
  	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
31b8006e1   Sage Weil   ceph: messenger l...
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
  		dout("queue_con %p - already queued
  ", con);
  		con->ops->put(con);
  	} else {
  		dout("queue_con %p
  ", con);
  	}
  }
  
  /*
   * Do some work on a connection.  Drop a connection ref when we're done.
   */
  static void con_work(struct work_struct *work)
  {
  	struct ceph_connection *con = container_of(work, struct ceph_connection,
  						   work.work);
0da5d7036   Sage Weil   libceph: handle c...
2087
  	int ret;
31b8006e1   Sage Weil   ceph: messenger l...
2088

9dd4658db   Sage Weil   ceph: close messe...
2089
  	mutex_lock(&con->mutex);
0da5d7036   Sage Weil   libceph: handle c...
2090
  restart:
60bf8bf88   Sage Weil   libceph: fix msgr...
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
  	if (test_and_clear_bit(BACKOFF, &con->state)) {
  		dout("con_work %p backing off
  ", con);
  		if (queue_delayed_work(ceph_msgr_wq, &con->work,
  				       round_jiffies_relative(con->delay))) {
  			dout("con_work %p backoff %lu
  ", con, con->delay);
  			mutex_unlock(&con->mutex);
  			return;
  		} else {
  			con->ops->put(con);
  			dout("con_work %p FAILED to back off %lu
  ", con,
  			     con->delay);
  		}
  	}
9dd4658db   Sage Weil   ceph: close messe...
2107

e00de341f   Sage Weil   libceph: fix msgr...
2108
2109
2110
2111
2112
  	if (test_bit(STANDBY, &con->state)) {
  		dout("con_work %p STANDBY
  ", con);
  		goto done;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
  	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
  		dout("con_work CLOSED
  ");
  		con_close_socket(con);
  		goto done;
  	}
  	if (test_and_clear_bit(OPENING, &con->state)) {
  		/* reopen w/ new peer */
  		dout("con_work OPENING
  ");
  		con_close_socket(con);
  	}
0da5d7036   Sage Weil   libceph: handle c...
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
  	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
  		goto fault;
  
  	ret = try_read(con);
  	if (ret == -EAGAIN)
  		goto restart;
  	if (ret < 0)
  		goto fault;
  
  	ret = try_write(con);
  	if (ret == -EAGAIN)
  		goto restart;
  	if (ret < 0)
  		goto fault;
31b8006e1   Sage Weil   ceph: messenger l...
2139
2140
  
  done:
9dd4658db   Sage Weil   ceph: close messe...
2141
  	mutex_unlock(&con->mutex);
9dd4658db   Sage Weil   ceph: close messe...
2142
  done_unlocked:
31b8006e1   Sage Weil   ceph: messenger l...
2143
  	con->ops->put(con);
0da5d7036   Sage Weil   libceph: handle c...
2144
2145
2146
2147
2148
2149
  	return;
  
  fault:
  	mutex_unlock(&con->mutex);
  	ceph_fault(con);     /* error/fault path */
  	goto done_unlocked;
31b8006e1   Sage Weil   ceph: messenger l...
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
  }
  
  
  /*
   * Generic error/fault handler.  A retry mechanism is used with
   * exponential backoff
   */
  static void ceph_fault(struct ceph_connection *con)
  {
  	pr_err("%s%lld %s %s
  ", ENTITY_NAME(con->peer_name),
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2161
  	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
31b8006e1   Sage Weil   ceph: messenger l...
2162
2163
  	dout("fault %p state %lu to peer %s
  ",
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2164
  	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
31b8006e1   Sage Weil   ceph: messenger l...
2165
2166
2167
2168
2169
2170
  
  	if (test_bit(LOSSYTX, &con->state)) {
  		dout("fault on LOSSYTX channel
  ");
  		goto out;
  	}
ec302645f   Sage Weil   ceph: use connect...
2171
  	mutex_lock(&con->mutex);
91e45ce38   Sage Weil   ceph: cancel dela...
2172
2173
  	if (test_bit(CLOSED, &con->state))
  		goto out_unlock;
ec302645f   Sage Weil   ceph: use connect...
2174

31b8006e1   Sage Weil   ceph: messenger l...
2175
  	con_close_socket(con);
5e095e8b4   Sage Weil   ceph: plug msg le...
2176
2177
2178
2179
2180
  
  	if (con->in_msg) {
  		ceph_msg_put(con->in_msg);
  		con->in_msg = NULL;
  	}
31b8006e1   Sage Weil   ceph: messenger l...
2181

e80a52d14   Sage Weil   ceph: fix connect...
2182
2183
  	/* Requeue anything that hasn't been acked */
  	list_splice_init(&con->out_sent, &con->out_queue);
9bd2e6f8b   Sage Weil   ceph: allow renew...
2184

e76661d0a   Sage Weil   libceph: fix msgr...
2185
2186
2187
2188
  	/* If there are no messages queued or keepalive pending, place
  	 * the connection in a STANDBY state */
  	if (list_empty(&con->out_queue) &&
  	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
e00de341f   Sage Weil   libceph: fix msgr...
2189
2190
2191
  		dout("fault %p setting STANDBY clearing WRITE_PENDING
  ", con);
  		clear_bit(WRITE_PENDING, &con->state);
31b8006e1   Sage Weil   ceph: messenger l...
2192
  		set_bit(STANDBY, &con->state);
e80a52d14   Sage Weil   ceph: fix connect...
2193
2194
2195
2196
2197
2198
  	} else {
  		/* retry after a delay. */
  		if (con->delay == 0)
  			con->delay = BASE_DELAY_INTERVAL;
  		else if (con->delay < MAX_DELAY_INTERVAL)
  			con->delay *= 2;
e80a52d14   Sage Weil   ceph: fix connect...
2199
2200
  		con->ops->get(con);
  		if (queue_delayed_work(ceph_msgr_wq, &con->work,
60bf8bf88   Sage Weil   libceph: fix msgr...
2201
2202
2203
2204
  				       round_jiffies_relative(con->delay))) {
  			dout("fault queued %p delay %lu
  ", con, con->delay);
  		} else {
e80a52d14   Sage Weil   ceph: fix connect...
2205
  			con->ops->put(con);
60bf8bf88   Sage Weil   libceph: fix msgr...
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
  			dout("fault failed to queue %p delay %lu, backoff
  ",
  			     con, con->delay);
  			/*
  			 * In many cases we see a socket state change
  			 * while con_work is running and end up
  			 * queuing (non-delayed) work, such that we
  			 * can't backoff with a delay.  Set a flag so
  			 * that when con_work restarts we schedule the
  			 * delay then.
  			 */
  			set_bit(BACKOFF, &con->state);
  		}
31b8006e1   Sage Weil   ceph: messenger l...
2219
  	}
91e45ce38   Sage Weil   ceph: cancel dela...
2220
2221
  out_unlock:
  	mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
2222
  out:
161fd65ac   Sage Weil   ceph: invalidate_...
2223
2224
2225
  	/*
  	 * in case we faulted due to authentication, invalidate our
  	 * current tickets so that we can get new ones.
213c99ee0   Sage Weil   ceph: whitespace ...
2226
  	 */
161fd65ac   Sage Weil   ceph: invalidate_...
2227
2228
2229
2230
2231
  	if (con->auth_retry && con->ops->invalidate_authorizer) {
  		dout("calling invalidate_authorizer()
  ");
  		con->ops->invalidate_authorizer(con);
  	}
31b8006e1   Sage Weil   ceph: messenger l...
2232
2233
2234
2235
2236
2237
2238
2239
2240
  	if (con->ops->fault)
  		con->ops->fault(con);
  }
  
  
  
  /*
   * create a new messenger instance
   */
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2241
2242
2243
  struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
  					     u32 supported_features,
  					     u32 required_features)
31b8006e1   Sage Weil   ceph: messenger l...
2244
2245
2246
2247
2248
2249
  {
  	struct ceph_messenger *msgr;
  
  	msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
  	if (msgr == NULL)
  		return ERR_PTR(-ENOMEM);
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2250
2251
  	msgr->supported_features = supported_features;
  	msgr->required_features = required_features;
31b8006e1   Sage Weil   ceph: messenger l...
2252
2253
2254
2255
  	spin_lock_init(&msgr->global_seq_lock);
  
  	/* the zero page is needed if a request is "canceled" while the message
  	 * is being written over the socket */
31459fe4b   Yehuda Sadeh   ceph: use __page_...
2256
  	msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
31b8006e1   Sage Weil   ceph: messenger l...
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
  	if (!msgr->zero_page) {
  		kfree(msgr);
  		return ERR_PTR(-ENOMEM);
  	}
  	kmap(msgr->zero_page);
  
  	if (myaddr)
  		msgr->inst.addr = *myaddr;
  
  	/* select a random nonce */
ac8839d7b   Sage Weil   ceph: include typ...
2267
  	msgr->inst.addr.type = 0;
103e2d3ae   Sage Weil   ceph: remove unus...
2268
  	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
63f2d2119   Sage Weil   ceph: use fixed e...
2269
  	encode_my_addr(msgr);
31b8006e1   Sage Weil   ceph: messenger l...
2270
2271
2272
2273
2274
  
  	dout("messenger_create %p
  ", msgr);
  	return msgr;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2275
  EXPORT_SYMBOL(ceph_messenger_create);
31b8006e1   Sage Weil   ceph: messenger l...
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
  
  /*
   * Tear down a messenger: unmap and free its zero page, then free the
   * messenger itself.
   */
  void ceph_messenger_destroy(struct ceph_messenger *msgr)
  {
  	dout("destroy %p\n", msgr);
  	kunmap(msgr->zero_page);
  	__free_page(msgr->zero_page);
  	kfree(msgr);
  	/* msgr is only printed as an address here, never dereferenced */
  	dout("destroyed messenger %p\n", msgr);
  }
  EXPORT_SYMBOL(ceph_messenger_destroy);
31b8006e1   Sage Weil   ceph: messenger l...
2288

e00de341f   Sage Weil   libceph: fix msgr...
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
  static void clear_standby(struct ceph_connection *con)
  {
  	/* come back from STANDBY? */
  	if (test_and_clear_bit(STANDBY, &con->state)) {
  		mutex_lock(&con->mutex);
  		dout("clear_standby %p and ++connect_seq
  ", con);
  		con->connect_seq++;
  		WARN_ON(test_bit(WRITE_PENDING, &con->state));
  		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
  		mutex_unlock(&con->mutex);
  	}
  }
31b8006e1   Sage Weil   ceph: messenger l...
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
  /*
   * Queue up an outgoing message on the given connection.
   */
  void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
  {
  	if (test_bit(CLOSED, &con->state)) {
  		dout("con_send %p closed, dropping %p
  ", con, msg);
  		ceph_msg_put(msg);
  		return;
  	}
  
  	/* set src+dst */
dbad185d4   Sage Weil   ceph: drop src ad...
2315
  	msg->hdr.src = con->msgr->inst.name;
31b8006e1   Sage Weil   ceph: messenger l...
2316

3ca02ef96   Sage Weil   ceph: reset front...
2317
  	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
e84346b72   Sage Weil   ceph: preserve se...
2318
  	msg->needs_out_seq = true;
31b8006e1   Sage Weil   ceph: messenger l...
2319
  	/* queue */
ec302645f   Sage Weil   ceph: use connect...
2320
  	mutex_lock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
2321
2322
2323
2324
2325
2326
2327
2328
2329
  	BUG_ON(!list_empty(&msg->list_head));
  	list_add_tail(&msg->list_head, &con->out_queue);
  	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----
  ", msg,
  	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
  	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
  	     le32_to_cpu(msg->hdr.front_len),
  	     le32_to_cpu(msg->hdr.middle_len),
  	     le32_to_cpu(msg->hdr.data_len));
ec302645f   Sage Weil   ceph: use connect...
2330
  	mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
2331
2332
2333
  
  	/* if there wasn't anything waiting to send before, queue
  	 * new work */
e00de341f   Sage Weil   libceph: fix msgr...
2334
  	clear_standby(con);
31b8006e1   Sage Weil   ceph: messenger l...
2335
2336
2337
  	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
  		queue_con(con);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2338
  EXPORT_SYMBOL(ceph_con_send);
31b8006e1   Sage Weil   ceph: messenger l...
2339
2340
2341
2342
2343
2344
  
  /*
   * Revoke a message that was previously queued for send
   */
  void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
  {
ec302645f   Sage Weil   ceph: use connect...
2345
  	mutex_lock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
2346
  	if (!list_empty(&msg->list_head)) {
ed98adad3   Sage Weil   ceph: fix message...
2347
2348
  		dout("con_revoke %p msg %p - was on queue
  ", con, msg);
31b8006e1   Sage Weil   ceph: messenger l...
2349
2350
2351
  		list_del_init(&msg->list_head);
  		ceph_msg_put(msg);
  		msg->hdr.seq = 0;
ed98adad3   Sage Weil   ceph: fix message...
2352
2353
2354
2355
2356
  	}
  	if (con->out_msg == msg) {
  		dout("con_revoke %p msg %p - was sending
  ", con, msg);
  		con->out_msg = NULL;
31b8006e1   Sage Weil   ceph: messenger l...
2357
2358
2359
2360
  		if (con->out_kvec_is_msg) {
  			con->out_skip = con->out_kvec_bytes;
  			con->out_kvec_is_msg = false;
  		}
ed98adad3   Sage Weil   ceph: fix message...
2361
2362
  		ceph_msg_put(msg);
  		msg->hdr.seq = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2363
  	}
ec302645f   Sage Weil   ceph: use connect...
2364
  	mutex_unlock(&con->mutex);
31b8006e1   Sage Weil   ceph: messenger l...
2365
2366
2367
  }
  
  /*
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2368
   * Revoke a message that we may be reading data into
350b1c32e   Sage Weil   ceph: control acc...
2369
   */
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2370
  void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
350b1c32e   Sage Weil   ceph: control acc...
2371
2372
  {
  	mutex_lock(&con->mutex);
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2373
2374
2375
  	if (con->in_msg && con->in_msg == msg) {
  		unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
  		unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
350b1c32e   Sage Weil   ceph: control acc...
2376
2377
2378
  		unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
  
  		/* skip rest of message */
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2379
2380
  		dout("con_revoke_pages %p msg %p revoked
  ", con, msg);
350b1c32e   Sage Weil   ceph: control acc...
2381
2382
  			con->in_base_pos = con->in_base_pos -
  				sizeof(struct ceph_msg_header) -
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2383
2384
2385
  				front_len -
  				middle_len -
  				data_len -
350b1c32e   Sage Weil   ceph: control acc...
2386
  				sizeof(struct ceph_msg_footer);
350b1c32e   Sage Weil   ceph: control acc...
2387
2388
2389
  		ceph_msg_put(con->in_msg);
  		con->in_msg = NULL;
  		con->in_tag = CEPH_MSGR_TAG_READY;
684be25c5   Sage Weil   ceph: fix seq cou...
2390
  		con->in_seq++;
350b1c32e   Sage Weil   ceph: control acc...
2391
2392
2393
  	} else {
  		dout("con_revoke_pages %p msg %p pages %p no-op
  ",
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
2394
  		     con, con->in_msg, msg);
350b1c32e   Sage Weil   ceph: control acc...
2395
2396
2397
2398
2399
  	}
  	mutex_unlock(&con->mutex);
  }
  
  /*
31b8006e1   Sage Weil   ceph: messenger l...
2400
2401
2402
2403
   * Queue a keepalive byte to ensure the tcp connection is alive.
   */
  void ceph_con_keepalive(struct ceph_connection *con)
  {
e00de341f   Sage Weil   libceph: fix msgr...
2404
2405
2406
  	dout("con_keepalive %p
  ", con);
  	clear_standby(con);
31b8006e1   Sage Weil   ceph: messenger l...
2407
2408
2409
2410
  	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
  	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
  		queue_con(con);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2411
  EXPORT_SYMBOL(ceph_con_keepalive);
31b8006e1   Sage Weil   ceph: messenger l...
2412
2413
2414
2415
2416
2417
  
  
  /*
   * construct a new message with given type, size
   * the new msg has a ref count of 1.
   */
b61c27636   Sage Weil   libceph: don't co...
2418
2419
  struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
  			      bool can_fail)
31b8006e1   Sage Weil   ceph: messenger l...
2420
2421
  {
  	struct ceph_msg *m;
34d23762d   Yehuda Sadeh   ceph: all allocat...
2422
  	m = kmalloc(sizeof(*m), flags);
31b8006e1   Sage Weil   ceph: messenger l...
2423
2424
  	if (m == NULL)
  		goto out;
c2e552e76   Sage Weil   ceph: use kref fo...
2425
  	kref_init(&m->kref);
31b8006e1   Sage Weil   ceph: messenger l...
2426
  	INIT_LIST_HEAD(&m->list_head);
45c6ceb54   Sage Weil   ceph: zero unused...
2427
  	m->hdr.tid = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2428
  	m->hdr.type = cpu_to_le16(type);
45c6ceb54   Sage Weil   ceph: zero unused...
2429
2430
  	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
  	m->hdr.version = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2431
2432
  	m->hdr.front_len = cpu_to_le32(front_len);
  	m->hdr.middle_len = 0;
bb257664f   Sage Weil   ceph: simplify ce...
2433
2434
  	m->hdr.data_len = 0;
  	m->hdr.data_off = 0;
45c6ceb54   Sage Weil   ceph: zero unused...
2435
  	m->hdr.reserved = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2436
2437
2438
  	m->footer.front_crc = 0;
  	m->footer.middle_crc = 0;
  	m->footer.data_crc = 0;
45c6ceb54   Sage Weil   ceph: zero unused...
2439
  	m->footer.flags = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2440
2441
2442
  	m->front_max = front_len;
  	m->front_is_vmalloc = false;
  	m->more_to_follow = false;
c0d5f9db1   Jim Schutt   libceph: initiali...
2443
  	m->ack_stamp = 0;
31b8006e1   Sage Weil   ceph: messenger l...
2444
  	m->pool = NULL;
ca20892db   Henry C Chang   libceph: fix ceph...
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
  	/* middle */
  	m->middle = NULL;
  
  	/* data */
  	m->nr_pages = 0;
  	m->page_alignment = 0;
  	m->pages = NULL;
  	m->pagelist = NULL;
  	m->bio = NULL;
  	m->bio_iter = NULL;
  	m->bio_seg = 0;
  	m->trail = NULL;
31b8006e1   Sage Weil   ceph: messenger l...
2457
2458
2459
  	/* front */
  	if (front_len) {
  		if (front_len > PAGE_CACHE_SIZE) {
34d23762d   Yehuda Sadeh   ceph: all allocat...
2460
  			m->front.iov_base = __vmalloc(front_len, flags,
31b8006e1   Sage Weil   ceph: messenger l...
2461
2462
2463
  						      PAGE_KERNEL);
  			m->front_is_vmalloc = true;
  		} else {
34d23762d   Yehuda Sadeh   ceph: all allocat...
2464
  			m->front.iov_base = kmalloc(front_len, flags);
31b8006e1   Sage Weil   ceph: messenger l...
2465
2466
  		}
  		if (m->front.iov_base == NULL) {
b61c27636   Sage Weil   libceph: don't co...
2467
2468
  			dout("ceph_msg_new can't allocate %d bytes
  ",
31b8006e1   Sage Weil   ceph: messenger l...
2469
2470
2471
2472
2473
2474
2475
  			     front_len);
  			goto out2;
  		}
  	} else {
  		m->front.iov_base = NULL;
  	}
  	m->front.iov_len = front_len;
bb257664f   Sage Weil   ceph: simplify ce...
2476
2477
  	dout("ceph_msg_new %p front %d
  ", m, front_len);
31b8006e1   Sage Weil   ceph: messenger l...
2478
2479
2480
2481
2482
  	return m;
  
  out2:
  	ceph_msg_put(m);
  out:
b61c27636   Sage Weil   libceph: don't co...
2483
2484
2485
2486
  	if (!can_fail) {
  		pr_err("msg_new can't create type %d front %d
  ", type,
  		       front_len);
f0ed1b7ce   Sage Weil   libceph: warn on ...
2487
  		WARN_ON(1);
b61c27636   Sage Weil   libceph: don't co...
2488
2489
2490
2491
2492
  	} else {
  		dout("msg_new can't create type %d front %d
  ", type,
  		     front_len);
  	}
a79832f26   Sage Weil   ceph: make ceph_m...
2493
  	return NULL;
31b8006e1   Sage Weil   ceph: messenger l...
2494
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2495
  EXPORT_SYMBOL(ceph_msg_new);
31b8006e1   Sage Weil   ceph: messenger l...
2496
2497
  
  /*
31b8006e1   Sage Weil   ceph: messenger l...
2498
2499
2500
2501
2502
2503
   * Allocate "middle" portion of a message, if it is needed and wasn't
   * allocated by alloc_msg.  This allows us to read a small fixed-size
   * per-type header in the front and then gracefully fail (i.e.,
   * propagate the error to the caller based on info in the front) when
   * the middle is too large.
   */
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2504
  static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
31b8006e1   Sage Weil   ceph: messenger l...
2505
2506
2507
2508
2509
2510
2511
2512
2513
  {
  	int type = le16_to_cpu(msg->hdr.type);
  	int middle_len = le32_to_cpu(msg->hdr.middle_len);
  
  	dout("alloc_middle %p type %d %s middle_len %d
  ", msg, type,
  	     ceph_msg_type_name(type), middle_len);
  	BUG_ON(!middle_len);
  	BUG_ON(msg->middle);
b6c1d5b81   Sage Weil   ceph: simplify ce...
2514
  	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
31b8006e1   Sage Weil   ceph: messenger l...
2515
2516
2517
2518
  	if (!msg->middle)
  		return -ENOMEM;
  	return 0;
  }
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
  /*
   * Generic message allocator, for incoming messages.
   */
  static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
  				struct ceph_msg_header *hdr,
  				int *skip)
  {
  	int type = le16_to_cpu(hdr->type);
  	int front_len = le32_to_cpu(hdr->front_len);
  	int middle_len = le32_to_cpu(hdr->middle_len);
  	struct ceph_msg *msg = NULL;
  	int ret;
  
  	if (con->ops->alloc_msg) {
0547a9b30   Yehuda Sadeh   ceph: alloc messa...
2533
  		mutex_unlock(&con->mutex);
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2534
  		msg = con->ops->alloc_msg(con, hdr, skip);
0547a9b30   Yehuda Sadeh   ceph: alloc messa...
2535
  		mutex_lock(&con->mutex);
a79832f26   Sage Weil   ceph: make ceph_m...
2536
  		if (!msg || *skip)
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2537
2538
2539
2540
  			return NULL;
  	}
  	if (!msg) {
  		*skip = 0;
b61c27636   Sage Weil   libceph: don't co...
2541
  		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2542
2543
2544
2545
  		if (!msg) {
  			pr_err("unable to allocate msg type %d len %d
  ",
  			       type, front_len);
a79832f26   Sage Weil   ceph: make ceph_m...
2546
  			return NULL;
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2547
  		}
c5c6b19d4   Sage Weil   ceph: explicitly ...
2548
  		msg->page_alignment = le16_to_cpu(hdr->data_off);
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2549
  	}
9d7f0f139   Yehuda Sadeh   ceph: refactor me...
2550
  	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2551

bb257664f   Sage Weil   ceph: simplify ce...
2552
  	if (middle_len && !msg->middle) {
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2553
  		ret = ceph_alloc_middle(con, msg);
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2554
2555
  		if (ret < 0) {
  			ceph_msg_put(msg);
a79832f26   Sage Weil   ceph: make ceph_m...
2556
  			return NULL;
2450418c4   Yehuda Sadeh   ceph: allocate mi...
2557
2558
  		}
  	}
9d7f0f139   Yehuda Sadeh   ceph: refactor me...
2559

2450418c4   Yehuda Sadeh   ceph: allocate mi...
2560
2561
  	return msg;
  }
31b8006e1   Sage Weil   ceph: messenger l...
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
  
  /*
   * Free a generically kmalloc'd message.
   */
  void ceph_msg_kfree(struct ceph_msg *m)
  {
  	dout("msg_kfree %p
  ", m);
  	if (m->front_is_vmalloc)
  		vfree(m->front.iov_base);
  	else
  		kfree(m->front.iov_base);
  	kfree(m);
  }
  
  /*
   * Drop a msg ref.  Destroy as needed.
   */
c2e552e76   Sage Weil   ceph: use kref fo...
2580
2581
2582
  void ceph_msg_last_put(struct kref *kref)
  {
  	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
31b8006e1   Sage Weil   ceph: messenger l...
2583

c2e552e76   Sage Weil   ceph: use kref fo...
2584
2585
2586
2587
2588
2589
2590
2591
  	dout("ceph_msg_put last one on %p
  ", m);
  	WARN_ON(!list_empty(&m->list_head));
  
  	/* drop middle, data, if any */
  	if (m->middle) {
  		ceph_buffer_put(m->middle);
  		m->middle = NULL;
31b8006e1   Sage Weil   ceph: messenger l...
2592
  	}
c2e552e76   Sage Weil   ceph: use kref fo...
2593
2594
  	m->nr_pages = 0;
  	m->pages = NULL;
58bb3b374   Sage Weil   ceph: support cep...
2595
2596
2597
2598
2599
  	if (m->pagelist) {
  		ceph_pagelist_release(m->pagelist);
  		kfree(m->pagelist);
  		m->pagelist = NULL;
  	}
68b4476b0   Yehuda Sadeh   ceph: messenger a...
2600
  	m->trail = NULL;
c2e552e76   Sage Weil   ceph: use kref fo...
2601
2602
2603
2604
  	if (m->pool)
  		ceph_msgpool_put(m->pool, m);
  	else
  		ceph_msg_kfree(m);
31b8006e1   Sage Weil   ceph: messenger l...
2605
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2606
  EXPORT_SYMBOL(ceph_msg_last_put);
9ec7cab14   Sage Weil   ceph: hex dump co...
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
  
  void ceph_msg_dump(struct ceph_msg *msg)
  {
  	pr_debug("msg_dump %p (front_max %d nr_pages %d)
  ", msg,
  		 msg->front_max, msg->nr_pages);
  	print_hex_dump(KERN_DEBUG, "header: ",
  		       DUMP_PREFIX_OFFSET, 16, 1,
  		       &msg->hdr, sizeof(msg->hdr), true);
  	print_hex_dump(KERN_DEBUG, " front: ",
  		       DUMP_PREFIX_OFFSET, 16, 1,
  		       msg->front.iov_base, msg->front.iov_len, true);
  	if (msg->middle)
  		print_hex_dump(KERN_DEBUG, "middle: ",
  			       DUMP_PREFIX_OFFSET, 16, 1,
  			       msg->middle->vec.iov_base,
  			       msg->middle->vec.iov_len, true);
  	print_hex_dump(KERN_DEBUG, "footer: ",
  		       DUMP_PREFIX_OFFSET, 16, 1,
  		       &msg->footer, sizeof(msg->footer), true);
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
2628
  EXPORT_SYMBOL(ceph_msg_dump);