Blame view

net/ceph/osd_client.c 55.8 KB
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  #include <linux/ceph/ceph_debug.h>
f24e9980e   Sage Weil   ceph: OSD client

3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  #include <linux/module.h>
f24e9980e   Sage Weil   ceph: OSD client
  #include <linux/err.h>
  #include <linux/highmem.h>
  #include <linux/mm.h>
  #include <linux/pagemap.h>
  #include <linux/slab.h>
  #include <linux/uaccess.h>
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  #ifdef CONFIG_BLOCK
  #include <linux/bio.h>
  #endif
f24e9980e   Sage Weil   ceph: OSD client

3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  #include <linux/ceph/libceph.h>
  #include <linux/ceph/osd_client.h>
  #include <linux/ceph/messenger.h>
  #include <linux/ceph/decode.h>
  #include <linux/ceph/auth.h>
  #include <linux/ceph/pagelist.h>
f24e9980e   Sage Weil   ceph: OSD client

c16e78692   Sage Weil   ceph: use single ...
  #define OSD_OP_FRONT_LEN	4096
  #define OSD_OPREPLY_FRONT_LEN	512
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...

9e32789f6   Tobias Klauser   ceph: Storage cla...
  static const struct ceph_connection_operations osd_con_ops;
f24e9980e   Sage Weil   ceph: OSD client

6f6c70067   Sage Weil   libceph: fix osd ...
  static void send_queued(struct ceph_osd_client *osdc);
  static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  static void __register_request(struct ceph_osd_client *osdc,
  			       struct ceph_osd_request *req);
  static void __unregister_linger_request(struct ceph_osd_client *osdc,
  					struct ceph_osd_request *req);
  static int __send_request(struct ceph_osd_client *osdc,
  			  struct ceph_osd_request *req);
f24e9980e   Sage Weil   ceph: OSD client

68b4476b0   Yehuda Sadeh   ceph: messenger a...
  static int op_needs_trail(int op)
  {
  	switch (op) {
  	case CEPH_OSD_OP_GETXATTR:
  	case CEPH_OSD_OP_SETXATTR:
  	case CEPH_OSD_OP_CMPXATTR:
  	case CEPH_OSD_OP_CALL:
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	case CEPH_OSD_OP_NOTIFY:
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  		return 1;
  	default:
  		return 0;
  	}
  }
  
  static int op_has_extent(int op)
  {
  	return (op == CEPH_OSD_OP_READ ||
  		op == CEPH_OSD_OP_WRITE);
  }
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
  			struct ceph_file_layout *layout,
  			u64 snapid,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  			u64 off, u64 *plen, u64 *bno,
  			struct ceph_osd_request *req,
  			struct ceph_osd_req_op *op)
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  {
  	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	u64 orig_len = *plen;
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	u64 objoff, objlen;    /* extent in object */
  
  	reqhead->snapid = cpu_to_le64(snapid);
  
  	/* object extent? */
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	ceph_calc_file_object_mapping(layout, off, plen, bno,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  				      &objoff, &objlen);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	if (*plen < orig_len)
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  		dout(" skipping last %llu, final file extent %llu~%llu
  ",
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  		     orig_len - *plen, off, *plen);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...

68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	if (op_has_extent(op->op)) {
  		op->extent.offset = objoff;
  		op->extent.length = objlen;
  	}
  	req->r_num_pages = calc_pages_for(off, *plen);
b7495fc2f   Sage Weil   ceph: make page a...
  	req->r_page_alignment = off & ~PAGE_MASK;
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  	if (op->op == CEPH_OSD_OP_WRITE)
  		op->payload_len = *plen;
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  
  	dout("calc_layout bno=%llx %llu~%llu (%d pages)
  ",
  	     *bno, objoff, objlen, req->r_num_pages);
  
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  EXPORT_SYMBOL(ceph_calc_raw_layout);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...

f24e9980e   Sage Weil   ceph: OSD client
  /*
   * Implement client access to distributed object storage cluster.
   *
   * All data objects are stored within a cluster/cloud of OSDs, or
   * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
   * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
   * remote daemons serving up and coordinating consistent and safe
   * access to storage.
   *
   * Cluster membership and the mapping of data objects onto storage devices
   * are described by the osd map.
   *
   * We keep track of pending OSD requests (read, write), resubmit
   * requests to different OSDs when the cluster topology/data layout
   * change, or retry the affected requests when the communications
   * channel with an OSD is reset.
   */
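
As a hedged illustration of the request lifecycle described above (not part of the original file): a caller such as fs/ceph allocates a request, submits it, and waits for the reply.  The helper name example_sync_read is hypothetical, and the sketch assumes the ceph_osdc_start_request()/ceph_osdc_wait_request() helpers defined later in this file.

static int example_sync_read(struct ceph_osd_client *osdc,
			     struct ceph_file_layout *layout,
			     struct ceph_vino vino, u64 off, u64 *plen,
			     struct page **pages)
{
	struct ceph_osd_request *req;
	int rc;

	/* build a read request; the extent may be trimmed at an object boundary */
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, 0, 0, NULL, false, 1, 0);
	if (!req)
		return -ENOMEM;
	req->r_pages = pages;

	/* submit and wait; resends on osdmap changes are handled internally */
	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);
	ceph_osdc_put_request(req);
	return rc;
}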
  
  /*
   * calculate the mapping of a file extent onto an object, and fill out the
   * request accordingly.  shorten extent as necessary if it crosses an
   * object boundary.
   *
   * fill osd op in request message.
   */
  static void calc_layout(struct ceph_osd_client *osdc,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  			struct ceph_vino vino,
  			struct ceph_file_layout *layout,
f24e9980e   Sage Weil   ceph: OSD client
  			u64 off, u64 *plen,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  			struct ceph_osd_request *req,
  			struct ceph_osd_req_op *op)
f24e9980e   Sage Weil   ceph: OSD client
  {
f24e9980e   Sage Weil   ceph: OSD client
  	u64 bno;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
  			     plen, &bno, req, op);
f24e9980e   Sage Weil   ceph: OSD client

2dab036b8   Sage Weil   libceph: use snpr...
  	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
f24e9980e   Sage Weil   ceph: OSD client
  	req->r_oid_len = strlen(req->r_oid);
f24e9980e   Sage Weil   ceph: OSD client
  }
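
A hedged worked example of the trimming described above (assuming the common default layout of 4 MB objects with no striping): a 2 MB read at file offset 3 MB crosses the first object boundary, so ceph_calc_file_object_mapping() returns bno=0 with objoff=3 MB and objlen=1 MB, *plen is trimmed to 1 MB, and the oid becomes "<ino>.00000000"; the caller is then expected to issue a separate request against the next object (bno=1) for the remaining megabyte.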
f24e9980e   Sage Weil   ceph: OSD client
  /*
   * requests
   */
415e49a9c   Sage Weil   ceph: use kref fo...
  void ceph_osdc_release_request(struct kref *kref)
f24e9980e   Sage Weil   ceph: OSD client
  {
415e49a9c   Sage Weil   ceph: use kref fo...
  	struct ceph_osd_request *req = container_of(kref,
  						    struct ceph_osd_request,
  						    r_kref);
  
  	if (req->r_request)
  		ceph_msg_put(req->r_request);
  	if (req->r_reply)
  		ceph_msg_put(req->r_reply);
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
  	if (req->r_con_filling_msg) {
350b1c32e   Sage Weil   ceph: control acc...
  		dout("release_request revoking pages %p from con %p
  ",
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
  		     req->r_pages, req->r_con_filling_msg);
  		ceph_con_revoke_message(req->r_con_filling_msg,
  				      req->r_reply);
  		ceph_con_put(req->r_con_filling_msg);
350b1c32e   Sage Weil   ceph: control acc...
  	}
415e49a9c   Sage Weil   ceph: use kref fo...
  	if (req->r_own_pages)
  		ceph_release_page_vector(req->r_pages,
  					 req->r_num_pages);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  #ifdef CONFIG_BLOCK
  	if (req->r_bio)
  		bio_put(req->r_bio);
  #endif
415e49a9c   Sage Weil   ceph: use kref fo...
  	ceph_put_snap_context(req->r_snapc);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	if (req->r_trail) {
  		ceph_pagelist_release(req->r_trail);
  		kfree(req->r_trail);
  	}
415e49a9c   Sage Weil   ceph: use kref fo...
  	if (req->r_mempool)
  		mempool_free(req, req->r_osdc->req_mempool);
  	else
  		kfree(req);
f24e9980e   Sage Weil   ceph: OSD client
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  EXPORT_SYMBOL(ceph_osdc_release_request);
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  
  static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
  {
  	int i = 0;
  
  	if (needs_trail)
  		*needs_trail = 0;
  	while (ops[i].op) {
  		if (needs_trail && op_needs_trail(ops[i].op))
  			*needs_trail = 1;
  		i++;
  	}
  
  	return i;
  }
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
  					       int flags,
f24e9980e   Sage Weil   ceph: OSD client
  					       struct ceph_snap_context *snapc,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  					       struct ceph_osd_req_op *ops,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  					       bool use_mempool,
  					       gfp_t gfp_flags,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  					       struct page **pages,
  					       struct bio *bio)
f24e9980e   Sage Weil   ceph: OSD client
  {
  	struct ceph_osd_request *req;
  	struct ceph_msg *msg;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	int needs_trail;
  	int num_op = get_num_ops(ops, &needs_trail);
  	size_t msg_size = sizeof(struct ceph_osd_request_head);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...

68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	msg_size += num_op*sizeof(struct ceph_osd_op);
f24e9980e   Sage Weil   ceph: OSD client
  
  	if (use_mempool) {
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  		req = mempool_alloc(osdc->req_mempool, gfp_flags);
f24e9980e   Sage Weil   ceph: OSD client
  		memset(req, 0, sizeof(*req));
  	} else {
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  		req = kzalloc(sizeof(*req), gfp_flags);
f24e9980e   Sage Weil   ceph: OSD client
  	}
  	if (req == NULL)
a79832f26   Sage Weil   ceph: make ceph_m...
  		return NULL;
f24e9980e   Sage Weil   ceph: OSD client

f24e9980e   Sage Weil   ceph: OSD client
  	req->r_osdc = osdc;
  	req->r_mempool = use_mempool;
68b4476b0   Yehuda Sadeh   ceph: messenger a...

415e49a9c   Sage Weil   ceph: use kref fo...
  	kref_init(&req->r_kref);
f24e9980e   Sage Weil   ceph: OSD client
  	init_completion(&req->r_completion);
  	init_completion(&req->r_safe_completion);
  	INIT_LIST_HEAD(&req->r_unsafe_item);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	INIT_LIST_HEAD(&req->r_linger_item);
  	INIT_LIST_HEAD(&req->r_linger_osd);
935b639a0   Sage Weil   libceph: fix ling...
  	INIT_LIST_HEAD(&req->r_req_lru_item);
f24e9980e   Sage Weil   ceph: OSD client
  	req->r_flags = flags;
  
  	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
c16e78692   Sage Weil   ceph: use single ...
  	/* create reply message */
  	if (use_mempool)
  		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
  	else
  		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
a79832f26   Sage Weil   ceph: make ceph_m...
  	if (!msg) {
c16e78692   Sage Weil   ceph: use single ...
  		ceph_osdc_put_request(req);
a79832f26   Sage Weil   ceph: make ceph_m...
  		return NULL;
c16e78692   Sage Weil   ceph: use single ...
  	}
  	req->r_reply = msg;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	/* allocate space for the trailing data */
  	if (needs_trail) {
  		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
  		if (!req->r_trail) {
  			ceph_osdc_put_request(req);
  			return NULL;
  		}
  		ceph_pagelist_init(req->r_trail);
  	}
c16e78692   Sage Weil   ceph: use single ...
  	/* create request message; allow space for oid */
f24e9980e   Sage Weil   ceph: OSD client
  	msg_size += 40;
  	if (snapc)
  		msg_size += sizeof(u64) * snapc->num_snaps;
  	if (use_mempool)
8f3bc053c   Sage Weil   ceph: warn on all...
  		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
f24e9980e   Sage Weil   ceph: OSD client
  	else
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
a79832f26   Sage Weil   ceph: make ceph_m...
  	if (!msg) {
f24e9980e   Sage Weil   ceph: OSD client
  		ceph_osdc_put_request(req);
a79832f26   Sage Weil   ceph: make ceph_m...
  		return NULL;
f24e9980e   Sage Weil   ceph: OSD client
  	}
68b4476b0   Yehuda Sadeh   ceph: messenger a...

f24e9980e   Sage Weil   ceph: OSD client
  	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
  	memset(msg->front.iov_base, 0, msg->front.iov_len);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  
  	req->r_request = msg;
  	req->r_pages = pages;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  #ifdef CONFIG_BLOCK
  	if (bio) {
  		req->r_bio = bio;
  		bio_get(req->r_bio);
  	}
  #endif
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  
  	return req;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  EXPORT_SYMBOL(ceph_osdc_alloc_request);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...

68b4476b0   Yehuda Sadeh   ceph: messenger a...
  static void osd_req_encode_op(struct ceph_osd_request *req,
  			      struct ceph_osd_op *dst,
  			      struct ceph_osd_req_op *src)
  {
  	dst->op = cpu_to_le16(src->op);
  
  	switch (dst->op) {
  	case CEPH_OSD_OP_READ:
  	case CEPH_OSD_OP_WRITE:
  		dst->extent.offset =
  			cpu_to_le64(src->extent.offset);
  		dst->extent.length =
  			cpu_to_le64(src->extent.length);
  		dst->extent.truncate_size =
  			cpu_to_le64(src->extent.truncate_size);
  		dst->extent.truncate_seq =
  			cpu_to_le32(src->extent.truncate_seq);
  		break;
  
  	case CEPH_OSD_OP_GETXATTR:
  	case CEPH_OSD_OP_SETXATTR:
  	case CEPH_OSD_OP_CMPXATTR:
  		BUG_ON(!req->r_trail);
  
  		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
  		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
  		dst->xattr.cmp_op = src->xattr.cmp_op;
  		dst->xattr.cmp_mode = src->xattr.cmp_mode;
  		ceph_pagelist_append(req->r_trail, src->xattr.name,
  				     src->xattr.name_len);
  		ceph_pagelist_append(req->r_trail, src->xattr.val,
  				     src->xattr.value_len);
  		break;
ae1533b62   Yehuda Sadeh   ceph-rbd: osdc su...
  	case CEPH_OSD_OP_CALL:
  		BUG_ON(!req->r_trail);
  
  		dst->cls.class_len = src->cls.class_len;
  		dst->cls.method_len = src->cls.method_len;
  		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
  
  		ceph_pagelist_append(req->r_trail, src->cls.class_name,
  				     src->cls.class_len);
  		ceph_pagelist_append(req->r_trail, src->cls.method_name,
  				     src->cls.method_len);
  		ceph_pagelist_append(req->r_trail, src->cls.indata,
  				     src->cls.indata_len);
  		break;
  	case CEPH_OSD_OP_ROLLBACK:
  		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
  		break;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	case CEPH_OSD_OP_STARTSYNC:
  		break;
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	case CEPH_OSD_OP_NOTIFY:
  		{
  			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
  			__le32 timeout = cpu_to_le32(src->watch.timeout);
  
  			BUG_ON(!req->r_trail);
  
  			ceph_pagelist_append(req->r_trail,
  						&prot_ver, sizeof(prot_ver));
  			ceph_pagelist_append(req->r_trail,
  						&timeout, sizeof(timeout));
  		}
  	case CEPH_OSD_OP_NOTIFY_ACK:
  	case CEPH_OSD_OP_WATCH:
  		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
  		dst->watch.ver = cpu_to_le64(src->watch.ver);
  		dst->watch.flag = src->watch.flag;
  		break;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	default:
  		pr_err("unrecognized osd opcode %d
  ", dst->op);
  		WARN_ON(1);
  		break;
  	}
  	dst->payload_len = cpu_to_le32(src->payload_len);
  }
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  /*
   * build new request AND message
   *
   */
  void ceph_osdc_build_request(struct ceph_osd_request *req,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  			     u64 off, u64 *plen,
  			     struct ceph_osd_req_op *src_ops,
  			     struct ceph_snap_context *snapc,
  			     struct timespec *mtime,
  			     const char *oid,
  			     int oid_len)
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  {
  	struct ceph_msg *msg = req->r_request;
  	struct ceph_osd_request_head *head;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	struct ceph_osd_req_op *src_op;
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	struct ceph_osd_op *op;
  	void *p;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	int num_op = get_num_ops(src_ops, NULL);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	int flags = req->r_flags;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	u64 data_len = 0;
  	int i;
3499e8a5d   Yehuda Sadeh   ceph: refactor os...

f24e9980e   Sage Weil   ceph: OSD client
  	head = msg->front.iov_base;
  	op = (void *)(head + 1);
  	p = (void *)(op + num_op);
f24e9980e   Sage Weil   ceph: OSD client
  	req->r_snapc = ceph_get_snap_context(snapc);
  
  	head->client_inc = cpu_to_le32(1); /* always, for now. */
  	head->flags = cpu_to_le32(flags);
  	if (flags & CEPH_OSD_FLAG_WRITE)
  		ceph_encode_timespec(&head->mtime, mtime);
  	head->num_ops = cpu_to_le16(num_op);
f24e9980e   Sage Weil   ceph: OSD client

f24e9980e   Sage Weil   ceph: OSD client
  
  	/* fill in oid */
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	head->object_len = cpu_to_le32(oid_len);
  	memcpy(p, oid, oid_len);
  	p += oid_len;
f24e9980e   Sage Weil   ceph: OSD client

68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	src_op = src_ops;
  	while (src_op->op) {
  		osd_req_encode_op(req, op, src_op);
  		src_op++;
f24e9980e   Sage Weil   ceph: OSD client
  		op++;
f24e9980e   Sage Weil   ceph: OSD client
  	}
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  
  	if (req->r_trail)
  		data_len += req->r_trail->length;
f24e9980e   Sage Weil   ceph: OSD client
  	if (snapc) {
  		head->snap_seq = cpu_to_le64(snapc->seq);
  		head->num_snaps = cpu_to_le32(snapc->num_snaps);
  		for (i = 0; i < snapc->num_snaps; i++) {
  			put_unaligned_le64(snapc->snaps[i], p);
  			p += sizeof(u64);
  		}
  	}
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	if (flags & CEPH_OSD_FLAG_WRITE) {
  		req->r_request->hdr.data_off = cpu_to_le16(off);
  		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
  	} else if (data_len) {
  		req->r_request->hdr.data_off = 0;
  		req->r_request->hdr.data_len = cpu_to_le32(data_len);
  	}
c5c6b19d4   Sage Weil   ceph: explicitly ...
  	req->r_request->page_alignment = req->r_page_alignment;
f24e9980e   Sage Weil   ceph: OSD client
  	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
6f863e712   Sage Weil   ceph: set osd req...
  	msg_size = p - msg->front.iov_base;
  	msg->front.iov_len = msg_size;
  	msg->hdr.front_len = cpu_to_le32(msg_size);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	return;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  EXPORT_SYMBOL(ceph_osdc_build_request);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  
  /*
   * build new request AND message, calculate layout, and adjust file
   * extent as needed.
   *
   * if the file was recently truncated, we include information about its
   * old and new size so that the object can be updated appropriately.  (we
   * avoid synchronously deleting truncated objects because it's slow.)
   *
   * if @do_sync, include a 'startsync' command so that the osd will flush
   * data quickly.
   */
  struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
  					       struct ceph_file_layout *layout,
  					       struct ceph_vino vino,
  					       u64 off, u64 *plen,
  					       int opcode, int flags,
  					       struct ceph_snap_context *snapc,
  					       int do_sync,
  					       u32 truncate_seq,
  					       u64 truncate_size,
  					       struct timespec *mtime,
b7495fc2f   Sage Weil   ceph: make page a...
  					       bool use_mempool, int num_reply,
  					       int page_align)
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  {
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	struct ceph_osd_req_op ops[3];
  	struct ceph_osd_request *req;
  
  	ops[0].op = opcode;
  	ops[0].extent.truncate_seq = truncate_seq;
  	ops[0].extent.truncate_size = truncate_size;
  	ops[0].payload_len = 0;
  
  	if (do_sync) {
  		ops[1].op = CEPH_OSD_OP_STARTSYNC;
  		ops[1].payload_len = 0;
  		ops[2].op = 0;
  	} else
  		ops[1].op = 0;
  
  	req = ceph_osdc_alloc_request(osdc, flags,
  					 snapc, ops,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  					 use_mempool,
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  					 GFP_NOFS, NULL, NULL);
4ad12621e   Sage Weil   libceph: fix ceph...
  	if (!req)
  		return NULL;
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  
  	/* calculate max write size */
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	calc_layout(osdc, vino, layout, off, plen, req, ops);
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  	req->r_file_layout = *layout;  /* keep a copy */
9bb0ce2b0   Sage Weil   libceph: fix page...
  	/* in case it differs from natural (file) alignment that
  	   calc_layout filled in for us */
  	req->r_num_pages = calc_pages_for(page_align, *plen);
b7495fc2f   Sage Weil   ceph: make page a...
  	req->r_page_alignment = page_align;
68b4476b0   Yehuda Sadeh   ceph: messenger a...
  	ceph_osdc_build_request(req, off, plen, ops,
  				snapc,
3499e8a5d   Yehuda Sadeh   ceph: refactor os...
  				mtime,
  				req->r_oid, req->r_oid_len);
f24e9980e   Sage Weil   ceph: OSD client
  	return req;
  }
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  EXPORT_SYMBOL(ceph_osdc_new_request);
f24e9980e   Sage Weil   ceph: OSD client
  
  /*
   * We keep osd requests in an rbtree, sorted by ->r_tid.
   */
  static void __insert_request(struct ceph_osd_client *osdc,
  			     struct ceph_osd_request *new)
  {
  	struct rb_node **p = &osdc->requests.rb_node;
  	struct rb_node *parent = NULL;
  	struct ceph_osd_request *req = NULL;
  
  	while (*p) {
  		parent = *p;
  		req = rb_entry(parent, struct ceph_osd_request, r_node);
  		if (new->r_tid < req->r_tid)
  			p = &(*p)->rb_left;
  		else if (new->r_tid > req->r_tid)
  			p = &(*p)->rb_right;
  		else
  			BUG();
  	}
  
  	rb_link_node(&new->r_node, parent, p);
  	rb_insert_color(&new->r_node, &osdc->requests);
  }
  
  static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
  						 u64 tid)
  {
  	struct ceph_osd_request *req;
  	struct rb_node *n = osdc->requests.rb_node;
  
  	while (n) {
  		req = rb_entry(n, struct ceph_osd_request, r_node);
  		if (tid < req->r_tid)
  			n = n->rb_left;
  		else if (tid > req->r_tid)
  			n = n->rb_right;
  		else
  			return req;
  	}
  	return NULL;
  }
  
  static struct ceph_osd_request *
  __lookup_request_ge(struct ceph_osd_client *osdc,
  		    u64 tid)
  {
  	struct ceph_osd_request *req;
  	struct rb_node *n = osdc->requests.rb_node;
  
  	while (n) {
  		req = rb_entry(n, struct ceph_osd_request, r_node);
  		if (tid < req->r_tid) {
  			if (!n->rb_left)
  				return req;
  			n = n->rb_left;
  		} else if (tid > req->r_tid) {
  			n = n->rb_right;
  		} else {
  			return req;
  		}
  	}
  	return NULL;
  }
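
A minimal sketch (not in the original file) of how the tid-ordered tree above can be walked from a given tid upward, e.g. by code that wants to visit every request at least as new as some tid.  The helper name example_walk_from_tid and its debug message are illustrative only; osdc->request_mutex is assumed to be held by the caller.

static void example_walk_from_tid(struct ceph_osd_client *osdc, u64 want_tid)
{
	struct ceph_osd_request *req = __lookup_request_ge(osdc, want_tid);

	while (req) {
		struct rb_node *n = rb_next(&req->r_node);

		dout("visiting tid %llu\n", req->r_tid);
		req = n ? rb_entry(n, struct ceph_osd_request, r_node) : NULL;
	}
}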
6f6c70067   Sage Weil   libceph: fix osd ...
  /*
   * Resubmit requests pending on the given osd.
   */
  static void __kick_osd_requests(struct ceph_osd_client *osdc,
  				struct ceph_osd *osd)
  {
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	struct ceph_osd_request *req, *nreq;
6f6c70067   Sage Weil   libceph: fix osd ...
  	int err;
  
  	dout("__kick_osd_requests osd%d
  ", osd->o_osd);
  	err = __reset_osd(osdc, osd);
  	if (err == -EAGAIN)
  		return;
  
  	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
  		list_move(&req->r_req_lru_item, &osdc->req_unsent);
  		dout("requeued %p tid %llu osd%d
  ", req, req->r_tid,
  		     osd->o_osd);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  		if (!req->r_linger)
  			req->r_flags |= CEPH_OSD_FLAG_RETRY;
  	}
  
  	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
  				 r_linger_osd) {
77f38e0ee   Sage Weil   libceph: fix ling...
  		/*
  		 * reregister request prior to unregistering linger so
  		 * that r_osd is preserved.
  		 */
  		BUG_ON(!list_empty(&req->r_req_lru_item));
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  		__register_request(osdc, req);
77f38e0ee   Sage Weil   libceph: fix ling...
  		list_add(&req->r_req_lru_item, &osdc->req_unsent);
  		list_add(&req->r_osd_item, &req->r_osd->o_requests);
  		__unregister_linger_request(osdc, req);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  		dout("requeued lingering %p tid %llu osd%d
  ", req, req->r_tid,
  		     osd->o_osd);
6f6c70067   Sage Weil   libceph: fix osd ...
  	}
  }
  
  static void kick_osd_requests(struct ceph_osd_client *osdc,
  			      struct ceph_osd *kickosd)
  {
  	mutex_lock(&osdc->request_mutex);
  	__kick_osd_requests(osdc, kickosd);
  	mutex_unlock(&osdc->request_mutex);
  }
f24e9980e   Sage Weil   ceph: OSD client
  
  /*
81b024e70   Sage Weil   ceph: reset osd s...
   * If the osd connection drops, we need to resubmit all requests.
f24e9980e   Sage Weil   ceph: OSD client
   */
  static void osd_reset(struct ceph_connection *con)
  {
  	struct ceph_osd *osd = con->private;
  	struct ceph_osd_client *osdc;
  
  	if (!osd)
  		return;
  	dout("osd_reset osd%d
  ", osd->o_osd);
  	osdc = osd->o_osdc;
f24e9980e   Sage Weil   ceph: OSD client
  	down_read(&osdc->map_sem);
6f6c70067   Sage Weil   libceph: fix osd ...
  	kick_osd_requests(osdc, osd);
  	send_queued(osdc);
f24e9980e   Sage Weil   ceph: OSD client
  	up_read(&osdc->map_sem);
  }
  
  /*
   * Track open sessions with osds.
   */
  static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
  {
  	struct ceph_osd *osd;
  
  	osd = kzalloc(sizeof(*osd), GFP_NOFS);
  	if (!osd)
  		return NULL;
  
  	atomic_set(&osd->o_ref, 1);
  	osd->o_osdc = osdc;
  	INIT_LIST_HEAD(&osd->o_requests);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	INIT_LIST_HEAD(&osd->o_linger_requests);
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	INIT_LIST_HEAD(&osd->o_osd_lru);
f24e9980e   Sage Weil   ceph: OSD client
  	osd->o_incarnation = 1;
  
  	ceph_con_init(osdc->client->msgr, &osd->o_con);
  	osd->o_con.private = osd;
  	osd->o_con.ops = &osd_con_ops;
  	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
4e7a5dcd1   Sage Weil   ceph: negotiate a...

422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	INIT_LIST_HEAD(&osd->o_keepalive_item);
f24e9980e   Sage Weil   ceph: OSD client
  	return osd;
  }
  
  static struct ceph_osd *get_osd(struct ceph_osd *osd)
  {
  	if (atomic_inc_not_zero(&osd->o_ref)) {
  		dout("get_osd %p %d -> %d
  ", osd, atomic_read(&osd->o_ref)-1,
  		     atomic_read(&osd->o_ref));
  		return osd;
  	} else {
  		dout("get_osd %p FAIL
  ", osd);
  		return NULL;
  	}
  }
  
  static void put_osd(struct ceph_osd *osd)
  {
  	dout("put_osd %p %d -> %d
  ", osd, atomic_read(&osd->o_ref),
  	     atomic_read(&osd->o_ref) - 1);
79494d1b9   Sage Weil   ceph: fix leak of...
  	if (atomic_dec_and_test(&osd->o_ref)) {
  		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
  
  		if (osd->o_authorizer)
  			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
f24e9980e   Sage Weil   ceph: OSD client
  		kfree(osd);
79494d1b9   Sage Weil   ceph: fix leak of...
  	}
f24e9980e   Sage Weil   ceph: OSD client
  }
  
  /*
   * remove an osd from our map
   */
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980e   Sage Weil   ceph: OSD client
  {
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	dout("__remove_osd %p
  ", osd);
f24e9980e   Sage Weil   ceph: OSD client
  	BUG_ON(!list_empty(&osd->o_requests));
  	rb_erase(&osd->o_node, &osdc->osds);
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	list_del_init(&osd->o_osd_lru);
f24e9980e   Sage Weil   ceph: OSD client
  	ceph_con_close(&osd->o_con);
  	put_osd(osd);
  }
aca420bc5   Sage Weil   libceph: fix leak...
  static void remove_all_osds(struct ceph_osd_client *osdc)
  {
  	dout("__remove_old_osds %p
  ", osdc);
  	mutex_lock(&osdc->request_mutex);
  	while (!RB_EMPTY_ROOT(&osdc->osds)) {
  		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
  						struct ceph_osd, o_node);
  		__remove_osd(osdc, osd);
  	}
  	mutex_unlock(&osdc->request_mutex);
  }
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  static void __move_osd_to_lru(struct ceph_osd_client *osdc,
  			      struct ceph_osd *osd)
  {
  	dout("__move_osd_to_lru %p
  ", osd);
  	BUG_ON(!list_empty(&osd->o_osd_lru));
  	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  }
  
  static void __remove_osd_from_lru(struct ceph_osd *osd)
  {
  	dout("__remove_osd_from_lru %p
  ", osd);
  	if (!list_empty(&osd->o_osd_lru))
  		list_del_init(&osd->o_osd_lru);
  }
aca420bc5   Sage Weil   libceph: fix leak...
  static void remove_old_osds(struct ceph_osd_client *osdc)
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  {
  	struct ceph_osd *osd, *nosd;
  
  	dout("__remove_old_osds %p
  ", osdc);
  	mutex_lock(&osdc->request_mutex);
  	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
aca420bc5   Sage Weil   libceph: fix leak...
  		if (time_before(jiffies, osd->lru_ttl))
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  			break;
  		__remove_osd(osdc, osd);
  	}
  	mutex_unlock(&osdc->request_mutex);
  }
f24e9980e   Sage Weil   ceph: OSD client
  /*
   * reset osd connect
   */
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980e   Sage Weil   ceph: OSD client
  {
87b315a5b   Sage Weil   ceph: avoid reope...
  	struct ceph_osd_request *req;
f24e9980e   Sage Weil   ceph: OSD client
  	int ret = 0;
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	dout("__reset_osd %p osd%d
  ", osd, osd->o_osd);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	if (list_empty(&osd->o_requests) &&
  	    list_empty(&osd->o_linger_requests)) {
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  		__remove_osd(osdc, osd);
87b315a5b   Sage Weil   ceph: avoid reope...
  	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
  			  &osd->o_con.peer_addr,
  			  sizeof(osd->o_con.peer_addr)) == 0 &&
  		   !ceph_con_opened(&osd->o_con)) {
  		dout(" osd addr hasn't changed and connection never opened,"
  		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
  		list_for_each_entry(req, &osd->o_requests, r_osd_item)
  			req->r_stamp = jiffies;
  		ret = -EAGAIN;
f24e9980e   Sage Weil   ceph: OSD client
  	} else {
  		ceph_con_close(&osd->o_con);
  		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
  		osd->o_incarnation++;
  	}
  	return ret;
  }
  
  static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
  {
  	struct rb_node **p = &osdc->osds.rb_node;
  	struct rb_node *parent = NULL;
  	struct ceph_osd *osd = NULL;
aca420bc5   Sage Weil   libceph: fix leak...
  	dout("__insert_osd %p osd%d
  ", new, new->o_osd);
f24e9980e   Sage Weil   ceph: OSD client
  	while (*p) {
  		parent = *p;
  		osd = rb_entry(parent, struct ceph_osd, o_node);
  		if (new->o_osd < osd->o_osd)
  			p = &(*p)->rb_left;
  		else if (new->o_osd > osd->o_osd)
  			p = &(*p)->rb_right;
  		else
  			BUG();
  	}
  
  	rb_link_node(&new->o_node, parent, p);
  	rb_insert_color(&new->o_node, &osdc->osds);
  }
  
  static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
  {
  	struct ceph_osd *osd;
  	struct rb_node *n = osdc->osds.rb_node;
  
  	while (n) {
  		osd = rb_entry(n, struct ceph_osd, o_node);
  		if (o < osd->o_osd)
  			n = n->rb_left;
  		else if (o > osd->o_osd)
  			n = n->rb_right;
  		else
  			return osd;
  	}
  	return NULL;
  }
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
  {
  	schedule_delayed_work(&osdc->timeout_work,
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  			osdc->client->options->osd_keepalive_timeout * HZ);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  }
  
  static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
  {
  	cancel_delayed_work(&osdc->timeout_work);
  }
f24e9980e   Sage Weil   ceph: OSD client
  
  /*
   * Register request, assign tid.  If this is the first request, set up
   * the timeout event.
   */
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  static void __register_request(struct ceph_osd_client *osdc,
  			       struct ceph_osd_request *req)
f24e9980e   Sage Weil   ceph: OSD client
  {
f24e9980e   Sage Weil   ceph: OSD client
  	req->r_tid = ++osdc->last_tid;
6df058c02   Sage Weil   ceph: include tra...
  	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
77f38e0ee   Sage Weil   libceph: fix ling...
  	dout("__register_request %p tid %lld
  ", req, req->r_tid);
f24e9980e   Sage Weil   ceph: OSD client
  	__insert_request(osdc, req);
  	ceph_osdc_get_request(req);
  	osdc->num_requests++;
f24e9980e   Sage Weil   ceph: OSD client
  	if (osdc->num_requests == 1) {
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  		dout(" first request, scheduling timeout
  ");
  		__schedule_osd_timeout(osdc);
f24e9980e   Sage Weil   ceph: OSD client
  	}
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  }
  
  static void register_request(struct ceph_osd_client *osdc,
  			     struct ceph_osd_request *req)
  {
  	mutex_lock(&osdc->request_mutex);
  	__register_request(osdc, req);
f24e9980e   Sage Weil   ceph: OSD client
  	mutex_unlock(&osdc->request_mutex);
  }
  
  /*
   * called under osdc->request_mutex
   */
  static void __unregister_request(struct ceph_osd_client *osdc,
  				 struct ceph_osd_request *req)
  {
  	dout("__unregister_request %p tid %lld
  ", req, req->r_tid);
  	rb_erase(&req->r_node, &osdc->requests);
  	osdc->num_requests--;
0ba6478df   Sage Weil   ceph: revoke osd ...
  	if (req->r_osd) {
  		/* make sure the original request isn't in flight. */
  		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
  
  		list_del_init(&req->r_osd_item);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  		if (list_empty(&req->r_osd->o_requests) &&
  		    list_empty(&req->r_osd->o_linger_requests)) {
  			dout("moving osd to %p lru
  ", req->r_osd);
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  			__move_osd_to_lru(osdc, req->r_osd);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  		}
fbdb91904   Sage Weil   libceph: fix null...
  		if (list_empty(&req->r_linger_item))
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  			req->r_osd = NULL;
0ba6478df   Sage Weil   ceph: revoke osd ...
  	}
f24e9980e   Sage Weil   ceph: OSD client
  
  	ceph_osdc_put_request(req);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	list_del_init(&req->r_req_lru_item);
  	if (osdc->num_requests == 0) {
  		dout(" no requests, canceling timeout
  ");
  		__cancel_osd_timeout(osdc);
f24e9980e   Sage Weil   ceph: OSD client
  	}
  }
  
  /*
   * Cancel a previously queued request message
   */
  static void __cancel_request(struct ceph_osd_request *req)
  {
6bc18876b   Sage Weil   ceph: avoid null ...
  	if (req->r_sent && req->r_osd) {
f24e9980e   Sage Weil   ceph: OSD client
  		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
  		req->r_sent = 0;
  	}
  }
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  static void __register_linger_request(struct ceph_osd_client *osdc,
  				    struct ceph_osd_request *req)
  {
  	dout("__register_linger_request %p
  ", req);
  	list_add_tail(&req->r_linger_item, &osdc->req_linger);
  	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
  }
  
  static void __unregister_linger_request(struct ceph_osd_client *osdc,
  					struct ceph_osd_request *req)
  {
  	dout("__unregister_linger_request %p
  ", req);
  	if (req->r_osd) {
  		list_del_init(&req->r_linger_item);
  		list_del_init(&req->r_linger_osd);
  
  		if (list_empty(&req->r_osd->o_requests) &&
  		    list_empty(&req->r_osd->o_linger_requests)) {
  			dout("moving osd to %p lru
  ", req->r_osd);
  			__move_osd_to_lru(osdc, req->r_osd);
  		}
fbdb91904   Sage Weil   libceph: fix null...
  		if (list_empty(&req->r_osd_item))
  			req->r_osd = NULL;
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	}
  }
  
  void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
  					 struct ceph_osd_request *req)
  {
  	mutex_lock(&osdc->request_mutex);
  	if (req->r_linger) {
  		__unregister_linger_request(osdc, req);
  		ceph_osdc_put_request(req);
  	}
  	mutex_unlock(&osdc->request_mutex);
  }
  EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
  
  void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
  				  struct ceph_osd_request *req)
  {
  	if (!req->r_linger) {
  		dout("set_request_linger %p
  ", req);
  		req->r_linger = 1;
  		/*
  		 * caller is now responsible for calling
  		 * unregister_linger_request
  		 */
  		ceph_osdc_get_request(req);
  	}
  }
  EXPORT_SYMBOL(ceph_osdc_set_request_linger);
f24e9980e   Sage Weil   ceph: OSD client
  /*
   * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
   * (as needed), and set the request r_osd appropriately.  If there is
25985edce   Lucas De Marchi   Fix common misspe...
   * no up osd, set r_osd to NULL.  Move the request to the appropriate list
6f6c70067   Sage Weil   libceph: fix osd ...
   * (unsent, homeless) or leave on in-flight lru.
f24e9980e   Sage Weil   ceph: OSD client
   *
   * Return 0 if unchanged, 1 if changed, or negative on error.
   *
   * Caller should hold map_sem for read and request_mutex.
   */
6f6c70067   Sage Weil   libceph: fix osd ...
  static int __map_request(struct ceph_osd_client *osdc,
  			 struct ceph_osd_request *req)
f24e9980e   Sage Weil   ceph: OSD client
  {
  	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
51042122d   Sage Weil   ceph: fix endian ...
  	struct ceph_pg pgid;
d85b70566   Sage Weil   ceph: resubmit re...
  	int acting[CEPH_PG_MAX_SIZE];
  	int o = -1, num = 0;
f24e9980e   Sage Weil   ceph: OSD client
  	int err;
f24e9980e   Sage Weil   ceph: OSD client

6f6c70067   Sage Weil   libceph: fix osd ...
  	dout("map_request %p tid %lld
  ", req, req->r_tid);
f24e9980e   Sage Weil   ceph: OSD client
  	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
  				      &req->r_file_layout, osdc->osdmap);
6f6c70067   Sage Weil   libceph: fix osd ...
  	if (err) {
  		list_move(&req->r_req_lru_item, &osdc->req_notarget);
f24e9980e   Sage Weil   ceph: OSD client
  		return err;
6f6c70067   Sage Weil   libceph: fix osd ...
  	}
51042122d   Sage Weil   ceph: fix endian ...
  	pgid = reqhead->layout.ol_pgid;
7740a42f8   Sage Weil   ceph: display pgi...
  	req->r_pgid = pgid;
d85b70566   Sage Weil   ceph: resubmit re...
  	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
  	if (err > 0) {
  		o = acting[0];
  		num = err;
  	}
f24e9980e   Sage Weil   ceph: OSD client
  
  	if ((req->r_osd && req->r_osd->o_osd == o &&
d85b70566   Sage Weil   ceph: resubmit re...
  	     req->r_sent >= req->r_osd->o_incarnation &&
  	     req->r_num_pg_osds == num &&
  	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
f24e9980e   Sage Weil   ceph: OSD client
  	    (req->r_osd == NULL && o == -1))
  		return 0;  /* no change */
6f6c70067   Sage Weil   libceph: fix osd ...
  	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)
  ",
51042122d   Sage Weil   ceph: fix endian ...
  	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
f24e9980e   Sage Weil   ceph: OSD client
  	     req->r_osd ? req->r_osd->o_osd : -1);
d85b70566   Sage Weil   ceph: resubmit re...
  	/* record full pg acting set */
  	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
  	req->r_num_pg_osds = num;
f24e9980e   Sage Weil   ceph: OSD client
  	if (req->r_osd) {
  		__cancel_request(req);
  		list_del_init(&req->r_osd_item);
f24e9980e   Sage Weil   ceph: OSD client
  		req->r_osd = NULL;
  	}
  
  	req->r_osd = __lookup_osd(osdc, o);
  	if (!req->r_osd && o >= 0) {
c99eb1c72   Sage Weil   ceph: remove frag...
  		err = -ENOMEM;
  		req->r_osd = create_osd(osdc);
6f6c70067   Sage Weil   libceph: fix osd ...
  		if (!req->r_osd) {
  			list_move(&req->r_req_lru_item, &osdc->req_notarget);
c99eb1c72   Sage Weil   ceph: remove frag...
  			goto out;
6f6c70067   Sage Weil   libceph: fix osd ...
  		}
f24e9980e   Sage Weil   ceph: OSD client

6f6c70067   Sage Weil   libceph: fix osd ...
  		dout("map_request osd %p is osd%d
  ", req->r_osd, o);
f24e9980e   Sage Weil   ceph: OSD client
  		req->r_osd->o_osd = o;
  		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
  		__insert_osd(osdc, req->r_osd);
  
  		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
  	}
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	if (req->r_osd) {
  		__remove_osd_from_lru(req->r_osd);
f24e9980e   Sage Weil   ceph: OSD client
  		list_add(&req->r_osd_item, &req->r_osd->o_requests);
6f6c70067   Sage Weil   libceph: fix osd ...
  		list_move(&req->r_req_lru_item, &osdc->req_unsent);
  	} else {
  		list_move(&req->r_req_lru_item, &osdc->req_notarget);
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	}
d85b70566   Sage Weil   ceph: resubmit re...
  	err = 1;   /* osd or pg changed */
f24e9980e   Sage Weil   ceph: OSD client
  
  out:
f24e9980e   Sage Weil   ceph: OSD client
  	return err;
  }
  
  /*
   * caller should hold map_sem (for read) and request_mutex
   */
  static int __send_request(struct ceph_osd_client *osdc,
  			  struct ceph_osd_request *req)
  {
  	struct ceph_osd_request_head *reqhead;
f24e9980e   Sage Weil   ceph: OSD client
  
  	dout("send_request %p tid %llu to osd%d flags %d
  ",
  	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
  
  	reqhead = req->r_request->front.iov_base;
  	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
  	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
  	reqhead->reassert_version = req->r_reassert_version;
3dd72fc0e   Sage Weil   ceph: rename r_se...
  	req->r_stamp = jiffies;
07a27e226   Henry C Chang   ceph: fix osd req...
  	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
f24e9980e   Sage Weil   ceph: OSD client
  
  	ceph_msg_get(req->r_request); /* send consumes a ref */
  	ceph_con_send(&req->r_osd->o_con, req->r_request);
  	req->r_sent = req->r_osd->o_incarnation;
  	return 0;
  }
  
  /*
6f6c70067   Sage Weil   libceph: fix osd ...
   * Send any requests in the queue (req_unsent).
   */
  static void send_queued(struct ceph_osd_client *osdc)
  {
  	struct ceph_osd_request *req, *tmp;
  
  	dout("send_queued
  ");
  	mutex_lock(&osdc->request_mutex);
  	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
  		__send_request(osdc, req);
  	}
  	mutex_unlock(&osdc->request_mutex);
  }
  
  /*
f24e9980e   Sage Weil   ceph: OSD client
   * Timeout callback, called every N seconds when 1 or more osd
   * requests has been active for more than N seconds.  When this
   * happens, we ping all OSDs with requests who have timed out to
   * ensure any communications channel reset is detected.  Reset the
   * request timeouts another N seconds in the future as we go.
   * Reschedule the timeout event another N seconds in future (unless
   * there are no open requests).
   */
  static void handle_timeout(struct work_struct *work)
  {
  	struct ceph_osd_client *osdc =
  		container_of(work, struct ceph_osd_client, timeout_work.work);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	struct ceph_osd_request *req, *last_req = NULL;
f24e9980e   Sage Weil   ceph: OSD client
  	struct ceph_osd *osd;
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	unsigned long keepalive =
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  		osdc->client->options->osd_keepalive_timeout * HZ;
3dd72fc0e   Sage Weil   ceph: rename r_se...
  	unsigned long last_stamp = 0;
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	struct list_head slow_osds;
f24e9980e   Sage Weil   ceph: OSD client
  	dout("timeout
  ");
  	down_read(&osdc->map_sem);
  
  	ceph_monc_request_next_osdmap(&osdc->client->monc);
  
  	mutex_lock(&osdc->request_mutex);
f24e9980e   Sage Weil   ceph: OSD client

422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	/*
  	 * reset osds that appear to be _really_ unresponsive.  this
  	 * is a failsafe measure.. we really shouldn't be getting to
  	 * this point if the system is working properly.  the monitors
  	 * should mark the osd as failed and we should find out about
  	 * it from an updated osd map.
  	 */
f26e681d5   Sage Weil   ceph: osdtimeout=...
  	while (timeout && !list_empty(&osdc->req_lru)) {
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
  				 r_req_lru_item);
4cf9d5446   Sage Weil   libceph: don't ti...
  		/* hasn't been long enough since we sent it? */
3dd72fc0e   Sage Weil   ceph: rename r_se...
  		if (time_before(jiffies, req->r_stamp + timeout))
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  			break;
4cf9d5446   Sage Weil   libceph: don't ti...
  
  		/* hasn't been long enough since it was acked? */
  		if (req->r_request->ack_stamp == 0 ||
  		    time_before(jiffies, req->r_request->ack_stamp + timeout))
  			break;
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...

3dd72fc0e   Sage Weil   ceph: rename r_se...
  		BUG_ON(req == last_req && req->r_stamp == last_stamp);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  		last_req = req;
3dd72fc0e   Sage Weil   ceph: rename r_se...
  		last_stamp = req->r_stamp;
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  
  		osd = req->r_osd;
  		BUG_ON(!osd);
  		pr_warning(" tid %llu timed out on osd%d, will reset osd
  ",
  			   req->r_tid, osd->o_osd);
6f6c70067   Sage Weil   libceph: fix osd ...
  		__kick_osd_requests(osdc, osd);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	}
  
  	/*
  	 * ping osds that are a bit slow.  this ensures that if there
  	 * is a break in the TCP connection we will notice, and reopen
  	 * a connection with that osd (from the fault callback).
  	 */
  	INIT_LIST_HEAD(&slow_osds);
  	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
3dd72fc0e   Sage Weil   ceph: rename r_se...
  		if (time_before(jiffies, req->r_stamp + keepalive))
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  			break;
  
  		osd = req->r_osd;
  		BUG_ON(!osd);
  		dout(" tid %llu is slow, will send keepalive on osd%d
  ",
f24e9980e   Sage Weil   ceph: OSD client
  		     req->r_tid, osd->o_osd);
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  		list_move_tail(&osd->o_keepalive_item, &slow_osds);
  	}
  	while (!list_empty(&slow_osds)) {
  		osd = list_entry(slow_osds.next, struct ceph_osd,
  				 o_keepalive_item);
  		list_del_init(&osd->o_keepalive_item);
f24e9980e   Sage Weil   ceph: OSD client
  		ceph_con_keepalive(&osd->o_con);
  	}
422d2cb8f   Yehuda Sadeh   ceph: reset osd a...
  	__schedule_osd_timeout(osdc);
f24e9980e   Sage Weil   ceph: OSD client
  	mutex_unlock(&osdc->request_mutex);
6f6c70067   Sage Weil   libceph: fix osd ...
  	send_queued(osdc);
f24e9980e   Sage Weil   ceph: OSD client
  	up_read(&osdc->map_sem);
  }
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  static void handle_osds_timeout(struct work_struct *work)
  {
  	struct ceph_osd_client *osdc =
  		container_of(work, struct ceph_osd_client,
  			     osds_timeout_work.work);
  	unsigned long delay =
3d14c5d2b   Yehuda Sadeh   ceph: factor out ...
  		osdc->client->options->osd_idle_ttl * HZ >> 2;
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  
  	dout("osds timeout
  ");
  	down_read(&osdc->map_sem);
aca420bc5   Sage Weil   libceph: fix leak...
  	remove_old_osds(osdc);
f5a2041bd   Yehuda Sadeh   ceph: put unused ...
  	up_read(&osdc->map_sem);
  
  	schedule_delayed_work(&osdc->osds_timeout_work,
  			      round_jiffies_relative(delay));
  }
258454723   Sage Weil   ceph: fix sync vs...
  static void complete_request(struct ceph_osd_request *req)
  {
  	if (req->r_safe_callback)
  		req->r_safe_callback(req, NULL);
  	complete_all(&req->r_safe_completion);  /* fsync waiter */
  }
f24e9980e   Sage Weil   ceph: OSD client
  /*
   * handle osd op reply.  either call the callback if it is specified,
   * or do the completion to wake up the waiting thread.
   */
350b1c32e   Sage Weil   ceph: control acc...
  static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
  			 struct ceph_connection *con)
f24e9980e   Sage Weil   ceph: OSD client
  {
  	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
  	struct ceph_osd_request *req;
  	u64 tid;
  	int numops, object_len, flags;
0ceed5db3   Sage Weil   ceph: unregister ...
  	s32 result;
f24e9980e   Sage Weil   ceph: OSD client

6df058c02   Sage Weil   ceph: include tra...
  	tid = le64_to_cpu(msg->hdr.tid);
f24e9980e   Sage Weil   ceph: OSD client
  	if (msg->front.iov_len < sizeof(*rhead))
  		goto bad;
f24e9980e   Sage Weil   ceph: OSD client
  	numops = le32_to_cpu(rhead->num_ops);
  	object_len = le32_to_cpu(rhead->object_len);
0ceed5db3   Sage Weil   ceph: unregister ...
  	result = le32_to_cpu(rhead->result);
f24e9980e   Sage Weil   ceph: OSD client
  	if (msg->front.iov_len != sizeof(*rhead) + object_len +
  	    numops * sizeof(struct ceph_osd_op))
  		goto bad;
0ceed5db3   Sage Weil   ceph: unregister ...
  	dout("handle_reply %p tid %llu result %d
  ", msg, tid, (int)result);
f24e9980e   Sage Weil   ceph: OSD client
  	/* lookup */
  	mutex_lock(&osdc->request_mutex);
  	req = __lookup_request(osdc, tid);
  	if (req == NULL) {
  		dout("handle_reply tid %llu dne
  ", tid);
  		mutex_unlock(&osdc->request_mutex);
  		return;
  	}
  	ceph_osdc_get_request(req);
  	flags = le32_to_cpu(rhead->flags);
350b1c32e   Sage Weil   ceph: control acc...
  	/*
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
  	 * if this connection filled our message, drop our reference now, to
350b1c32e   Sage Weil   ceph: control acc...
  	 * avoid a (safe but slower) revoke later.
  	 */
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
  	if (req->r_con_filling_msg == con && req->r_reply == msg) {
c16e78692   Sage Weil   ceph: use single ...
  		dout(" dropping con_filling_msg ref %p
  ", con);
0d59ab81c   Yehuda Sadeh   ceph: keep reserv...
  		req->r_con_filling_msg = NULL;
350b1c32e   Sage Weil   ceph: control acc...
  		ceph_con_put(con);
  	}
f24e9980e   Sage Weil   ceph: OSD client
  	if (!req->r_got_reply) {
  		unsigned bytes;
  
  		req->r_result = le32_to_cpu(rhead->result);
  		bytes = le32_to_cpu(msg->hdr.data_len);
  		dout("handle_reply result %d bytes %d
  ", req->r_result,
  		     bytes);
  		if (req->r_result == 0)
  			req->r_result = bytes;
  
  		/* in case this is a write and we need to replay, */
  		req->r_reassert_version = rhead->reassert_version;
  
  		req->r_got_reply = 1;
  	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
  		dout("handle_reply tid %llu dup ack
  ", tid);
34b43a56b   Sage Weil   ceph: plug leak o...
  		mutex_unlock(&osdc->request_mutex);
f24e9980e   Sage Weil   ceph: OSD client
  		goto done;
  	}
  
  	dout("handle_reply tid %llu flags %d
  ", tid, flags);
a40c4f10e   Yehuda Sadeh   libceph: add ling...
  	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
  		__register_linger_request(osdc, req);
f24e9980e   Sage Weil   ceph: OSD client
  	/* either this is a read, or we got the safe response */
0ceed5db3   Sage Weil   ceph: unregister ...
  	if (result < 0 ||
  	    (flags & CEPH_OSD_FLAG_ONDISK) ||
f24e9980e   Sage Weil   ceph: OSD client
  	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
  		__unregister_request(osdc, req);
  
  	mutex_unlock(&osdc->request_mutex);
  
  	if (req->r_callback)
  		req->r_callback(req, msg);
  	else
03066f234   Yehuda Sadeh   ceph: use complet...
1252
  		complete_all(&req->r_completion);
f24e9980e   Sage Weil   ceph: OSD client
1253

258454723   Sage Weil   ceph: fix sync vs...
1254
1255
  	if (flags & CEPH_OSD_FLAG_ONDISK)
  		complete_request(req);
f24e9980e   Sage Weil   ceph: OSD client
1256
1257
  
  done:
a40c4f10e   Yehuda Sadeh   libceph: add ling...
1258
1259
  	dout("req=%p req->r_linger=%d
  ", req, req->r_linger);
f24e9980e   Sage Weil   ceph: OSD client
1260
1261
1262
1263
1264
1265
1266
1267
  	ceph_osdc_put_request(req);
  	return;
  
  bad:
  	pr_err("corrupt osd_op_reply got %d %d expected %d
  ",
  	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
  	       (int)sizeof(*rhead));
9ec7cab14   Sage Weil   ceph: hex dump co...
1268
  	ceph_msg_dump(msg);
f24e9980e   Sage Weil   ceph: OSD client
1269
  }
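
/*
 * Resync osd connections after an osdmap change: reset the connection to
 * any osd that is marked down in the new map or whose address has changed,
 * so that in-flight requests get redirected appropriately.  Called from
 * ceph_osdc_handle_map() after an incremental map has been applied.
 */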
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
  }
  
/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests\n");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		err = __map_request(osdc, req);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			if (!req->r_linger)
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__unregister_linger_request(osdc, req);
		__register_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			kick_requests(osdc);
			reset_changed_osds(osdc);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
			kick_requests(osdc);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	send_queued(osdc);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
   * watch/notify callback event infrastructure
   *
   * These callbacks are used both for watch and notify operations.
   */
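
/*
 * Events are refcounted (kref) and indexed in osdc->event_tree by a
 * cookie assigned when the event is created.  handle_watch_notify()
 * looks the cookie up and queues the callback on osdc->notify_wq via a
 * struct ceph_osd_event_work, so user callbacks run from the workqueue
 * rather than from the messenger's dispatch path.
 */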
  static void __release_event(struct kref *kref)
  {
  	struct ceph_osd_event *event =
  		container_of(kref, struct ceph_osd_event, kref);
  
  	dout("__release_event %p
  ", event);
  	kfree(event);
  }
  
  static void get_event(struct ceph_osd_event *event)
  {
  	kref_get(&event->kref);
  }
  
  void ceph_osdc_put_event(struct ceph_osd_event *event)
  {
  	kref_put(&event->kref, __release_event);
  }
  EXPORT_SYMBOL(ceph_osdc_put_event);
  
  static void __insert_event(struct ceph_osd_client *osdc,
  			     struct ceph_osd_event *new)
  {
  	struct rb_node **p = &osdc->event_tree.rb_node;
  	struct rb_node *parent = NULL;
  	struct ceph_osd_event *event = NULL;
  
  	while (*p) {
  		parent = *p;
  		event = rb_entry(parent, struct ceph_osd_event, node);
  		if (new->cookie < event->cookie)
  			p = &(*p)->rb_left;
  		else if (new->cookie > event->cookie)
  			p = &(*p)->rb_right;
  		else
  			BUG();
  	}
  
  	rb_link_node(&new->node, parent, p);
  	rb_insert_color(&new->node, &osdc->event_tree);
  }
  
  static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
  					        u64 cookie)
  {
  	struct rb_node **p = &osdc->event_tree.rb_node;
  	struct rb_node *parent = NULL;
  	struct ceph_osd_event *event = NULL;
  
  	while (*p) {
  		parent = *p;
  		event = rb_entry(parent, struct ceph_osd_event, node);
  		if (cookie < event->cookie)
  			p = &(*p)->rb_left;
  		else if (cookie > event->cookie)
  			p = &(*p)->rb_right;
  		else
  			return event;
  	}
  	return NULL;
  }
  
  static void __remove_event(struct ceph_osd_event *event)
  {
  	struct ceph_osd_client *osdc = event->osdc;
  
  	if (!RB_EMPTY_NODE(&event->node)) {
  		dout("__remove_event removed %p
  ", event);
  		rb_erase(&event->node, &osdc->event_tree);
  		ceph_osdc_put_event(event);
  	} else {
  		dout("__remove_event didn't remove %p
  ", event);
  	}
  }
  
  int ceph_osdc_create_event(struct ceph_osd_client *osdc,
  			   void (*event_cb)(u64, u64, u8, void *),
  			   int one_shot, void *data,
  			   struct ceph_osd_event **pevent)
  {
  	struct ceph_osd_event *event;
  
  	event = kmalloc(sizeof(*event), GFP_NOIO);
  	if (!event)
  		return -ENOMEM;
  
  	dout("create_event %p
  ", event);
  	event->cb = event_cb;
  	event->one_shot = one_shot;
  	event->data = data;
  	event->osdc = osdc;
  	INIT_LIST_HEAD(&event->osd_node);
  	kref_init(&event->kref);   /* one ref for us */
  	kref_get(&event->kref);    /* one ref for the caller */
  	init_completion(&event->completion);
  
  	spin_lock(&osdc->event_lock);
  	event->cookie = ++osdc->event_count;
  	__insert_event(osdc, event);
  	spin_unlock(&osdc->event_lock);
  
  	*pevent = event;
  	return 0;
  }
  EXPORT_SYMBOL(ceph_osdc_create_event);
  
  void ceph_osdc_cancel_event(struct ceph_osd_event *event)
  {
  	struct ceph_osd_client *osdc = event->osdc;
  
  	dout("cancel_event %p
  ", event);
  	spin_lock(&osdc->event_lock);
  	__remove_event(event);
  	spin_unlock(&osdc->event_lock);
  	ceph_osdc_put_event(event); /* caller's */
  }
  EXPORT_SYMBOL(ceph_osdc_cancel_event);
  
  
  static void do_event_work(struct work_struct *work)
  {
  	struct ceph_osd_event_work *event_work =
  		container_of(work, struct ceph_osd_event_work, work);
  	struct ceph_osd_event *event = event_work->event;
  	u64 ver = event_work->ver;
  	u64 notify_id = event_work->notify_id;
  	u8 opcode = event_work->opcode;
  
  	dout("do_event_work completing %p
  ", event);
  	event->cb(ver, notify_id, opcode, event->data);
  	complete(&event->completion);
  	dout("do_event_work completed %p
  ", event);
  	ceph_osdc_put_event(event);
  	kfree(event_work);
  }
  
  
  /*
   * Process osd watch notifications
   */
  void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
  {
  	void *p, *end;
  	u8 proto_ver;
  	u64 cookie, ver, notify_id;
  	u8 opcode;
  	struct ceph_osd_event *event;
  	struct ceph_osd_event_work *event_work;
  
  	p = msg->front.iov_base;
  	end = p + msg->front.iov_len;
  
  	ceph_decode_8_safe(&p, end, proto_ver, bad);
  	ceph_decode_8_safe(&p, end, opcode, bad);
  	ceph_decode_64_safe(&p, end, cookie, bad);
  	ceph_decode_64_safe(&p, end, ver, bad);
  	ceph_decode_64_safe(&p, end, notify_id, bad);
  
  	spin_lock(&osdc->event_lock);
  	event = __find_event(osdc, cookie);
  	if (event) {
  		get_event(event);
  		if (event->one_shot)
  			__remove_event(event);
  	}
  	spin_unlock(&osdc->event_lock);
  	dout("handle_watch_notify cookie %lld ver %lld event %p
  ",
  	     cookie, ver, event);
  	if (event) {
  		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	complete(&event->completion);
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
  }
  
  int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
  {
  	int err;
  
  	dout("wait_event %p
  ", event);
  	err = wait_for_completion_interruptible_timeout(&event->completion,
  							timeout * HZ);
  	ceph_osdc_put_event(event);
  	if (err > 0)
  		err = 0;
  	dout("wait_event %p returns %d
  ", event, err);
  	return err;
  }
  EXPORT_SYMBOL(ceph_osdc_wait_event);
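
/*
 * A rough usage sketch for a one-shot notify through this API; my_notify_cb
 * and my_data are hypothetical caller-supplied names, building the osd
 * request that carries CEPH_OSD_OP_NOTIFY is the caller's job, and error
 * handling is omitted:
 *
 *	ret = ceph_osdc_create_event(osdc, my_notify_cb, 1, my_data, &event);
 *	... submit an osd request including CEPH_OSD_OP_NOTIFY, passing
 *	    event->cookie so the notification can be matched to the event ...
 *	ret = ceph_osdc_wait_event(event, 10);    (timeout is in seconds)
 *
 * ceph_osdc_wait_event() drops the caller's reference on return; one-shot
 * events are unregistered automatically when the notification arrives,
 * while longer-lived ones need ceph_osdc_cancel_event().
 */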
  
/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __map_request(osdc, req);
		if (rc < 0) {
			if (nofail) {
				dout("osdc_start_request failed map, "
				     " will retry %lld\n", req->r_tid);
				rc = 0;
			}
			goto out_unlock;
		}
		if (req->r_osd == NULL) {
			dout("send_request %p no up osds in pg\n", req);
			ceph_monc_request_next_osdmap(&osdc->client->monc);
		} else {
			rc = __send_request(osdc, req);
			if (rc) {
				if (nofail) {
					dout("osdc_start_request failed send, "
					     " will retry %lld\n", req->r_tid);
					rc = 0;
				} else {
					__unregister_request(osdc, req);
				}
			}
		}
	}

out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
  }
  EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
  EXPORT_SYMBOL(ceph_osdc_wait_request);
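
/*
 * The usual synchronous calling pattern, as used by ceph_osdc_readpages()
 * and ceph_osdc_writepages() below, is:
 *
 *	req = ceph_osdc_new_request(...);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 *
 * Callers that want asynchronous completion set req->r_callback before
 * starting the request instead of waiting here.
 */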

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
  EXPORT_SYMBOL(ceph_osdc_sync);
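
/*
 * Note that ceph_osdc_sync() waits on r_safe_completion, i.e. for the
 * on-disk (ONDISK) ack rather than the initial ack, and it only covers
 * writes that were already registered when it sampled osdc->last_tid.
 */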

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (IS_ERR(osdc->notify_wq)) {
		err = PTR_ERR(osdc->notify_wq);
		osdc->notify_wq = NULL;
		goto out_msgpool;
	}
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
  EXPORT_SYMBOL(ceph_osdc_init);
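
/*
 * ceph_osdc_init() and ceph_osdc_stop() are normally driven by the
 * ceph_client create/destroy path rather than called directly.  Stop
 * must undo everything init set up: the osd idle timer armed above, the
 * request mempool, both msgpools, and the notify workqueue.
 */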

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
  EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
  EXPORT_SYMBOL(ceph_osdc_readpages);
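
/*
 * A minimal sketch of a read through this helper, assuming the caller
 * already has the ceph_vino, layout, offset and truncate parameters in
 * hand (error handling omitted):
 *
 *	int page_align = off & ~PAGE_MASK;
 *	int num_pages = calc_pages_for(page_align, len);
 *	struct page **pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 *
 *	rc = ceph_osdc_readpages(osdc, vino, layout, off, &len,
 *				 truncate_seq, truncate_size,
 *				 pages, num_pages, page_align);
 *	...
 *	ceph_release_page_vector(pages, num_pages);
 *
 * On success rc is the number of bytes actually read, which may be less
 * than requested when the extent crosses an object boundary (*plen is
 * shortened accordingly).
 */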

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
  EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
  }

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply has %d bytes %d pages, we"
				   " had only %d pages ready\n", tid, data_len,
				   want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
  }
  
  static struct ceph_msg *alloc_msg(struct ceph_connection *con,
  				  struct ceph_msg_header *hdr,
  				  int *skip)
  {
  	struct ceph_osd *osd = con->private;
  	int type = le16_to_cpu(hdr->type);
  	int front = le32_to_cpu(hdr->front_len);
  
  	switch (type) {
  	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
  }
  
  /*
   * Wrappers to refcount containing ceph_osd struct
   */
  static struct ceph_connection *get_osd_con(struct ceph_connection *con)
  {
  	struct ceph_osd *osd = con->private;
  	if (get_osd(osd))
  		return con;
  	return NULL;
  }
  
  static void put_osd_con(struct ceph_connection *con)
  {
  	struct ceph_osd *osd = con->private;
  	put_osd(osd);
}

/*
   * authentication
   */
  static int get_authorizer(struct ceph_connection *con,
  			  void **buf, int *len, int *proto,
  			  void **reply_buf, int *reply_len, int force_new)
  {
  	struct ceph_osd *o = con->private;
  	struct ceph_osd_client *osdc = o->o_osdc;
  	struct ceph_auth_client *ac = osdc->client->monc.auth;
  	int ret = 0;
  
  	if (force_new && o->o_authorizer) {
  		ac->ops->destroy_authorizer(ac, o->o_authorizer);
  		o->o_authorizer = NULL;
  	}
  	if (o->o_authorizer == NULL) {
  		ret = ac->ops->create_authorizer(
  			ac, CEPH_ENTITY_TYPE_OSD,
  			&o->o_authorizer,
  			&o->o_authorizer_buf,
  			&o->o_authorizer_buf_len,
  			&o->o_authorizer_reply_buf,
  			&o->o_authorizer_reply_buf_len);
  		if (ret)
  			return ret;
  	}
  
  	*proto = ac->protocol;
  	*buf = o->o_authorizer_buf;
  	*len = o->o_authorizer_buf_len;
  	*reply_buf = o->o_authorizer_reply_buf;
  	*reply_len = o->o_authorizer_reply_buf_len;
  	return 0;
  }
  
  
  static int verify_authorizer_reply(struct ceph_connection *con, int len)
  {
  	struct ceph_osd *o = con->private;
  	struct ceph_osd_client *osdc = o->o_osdc;
  	struct ceph_auth_client *ac = osdc->client->monc.auth;
  
  	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
  {
  	struct ceph_osd *o = con->private;
  	struct ceph_osd_client *osdc = o->o_osdc;
  	struct ceph_auth_client *ac = osdc->client->monc.auth;
  
  	if (ac->ops->invalidate_authorizer)
  		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
  
  	return ceph_monc_validate_auth(&osdc->client->monc);
  }

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
  };