Commit 76db8ac45fc738f7d7664fe9b56d15c594a45228

Authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: fix readdir EOVERFLOW on 32-bit archs
  ceph: fix frag offset for non-leftmost frags
  ceph: fix dangling pointer
  ceph: explicitly specify page alignment in network messages
  ceph: make page alignment explicit in osd interface
  ceph: fix comment, remove extraneous args
  ceph: fix update of ctime from MDS
  ceph: fix version check on racing inode updates
  ceph: fix uid/gid on resent mds requests
  ceph: fix rdcache_gen usage and invalidate
  ceph: re-request max_size if cap auth changes
  ceph: only let auth caps update max_size
  ceph: fix open for write on clustered mds
  ceph: fix bad pointer dereference in ceph_fill_trace
  ceph: fix small seq message skipping
  Revert "ceph: update issue_seq on cap grant"

Showing 14 changed files Side-by-side Diff

fs/ceph/addr.c
... ... @@ -204,7 +204,7 @@
204 204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
205 205 page->index << PAGE_CACHE_SHIFT, &len,
206 206 ci->i_truncate_seq, ci->i_truncate_size,
207   - &page, 1);
  207 + &page, 1, 0);
208 208 if (err == -ENOENT)
209 209 err = 0;
210 210 if (err < 0) {
... ... @@ -287,7 +287,7 @@
287 287 rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
288 288 offset, &len,
289 289 ci->i_truncate_seq, ci->i_truncate_size,
290   - pages, nr_pages);
  290 + pages, nr_pages, 0);
291 291 if (rc == -ENOENT)
292 292 rc = 0;
293 293 if (rc < 0)
... ... @@ -774,7 +774,7 @@
774 774 snapc, do_sync,
775 775 ci->i_truncate_seq,
776 776 ci->i_truncate_size,
777   - &inode->i_mtime, true, 1);
  777 + &inode->i_mtime, true, 1, 0);
778 778 max_pages = req->r_num_pages;
779 779  
780 780 alloc_page_vec(fsc, req);
fs/ceph/caps.c
... ... @@ -1430,8 +1430,8 @@
1430 1430 invalidating_gen == ci->i_rdcache_gen) {
1431 1431 /* success. */
1432 1432 dout("try_nonblocking_invalidate %p success\n", inode);
1433   - ci->i_rdcache_gen = 0;
1434   - ci->i_rdcache_revoking = 0;
  1433 + /* save any racing async invalidate some trouble */
  1434 + ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
1435 1435 return 0;
1436 1436 }
1437 1437 dout("try_nonblocking_invalidate %p failed\n", inode);
... ... @@ -2273,8 +2273,7 @@
2273 2273 {
2274 2274 struct ceph_inode_info *ci = ceph_inode(inode);
2275 2275 int mds = session->s_mds;
2276   - unsigned seq = le32_to_cpu(grant->seq);
2277   - unsigned issue_seq = le32_to_cpu(grant->issue_seq);
  2276 + int seq = le32_to_cpu(grant->seq);
2278 2277 int newcaps = le32_to_cpu(grant->caps);
2279 2278 int issued, implemented, used, wanted, dirty;
2280 2279 u64 size = le64_to_cpu(grant->size);
... ... @@ -2286,8 +2285,8 @@
2286 2285 int revoked_rdcache = 0;
2287 2286 int queue_invalidate = 0;
2288 2287  
2289   - dout("handle_cap_grant inode %p cap %p mds%d seq %u/%u %s\n",
2290   - inode, cap, mds, seq, issue_seq, ceph_cap_string(newcaps));
  2288 + dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
  2289 + inode, cap, mds, seq, ceph_cap_string(newcaps));
2291 2290 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
2292 2291 inode->i_size);
2293 2292  
... ... @@ -2383,7 +2382,6 @@
2383 2382 }
2384 2383  
2385 2384 cap->seq = seq;
2386   - cap->issue_seq = issue_seq;
2387 2385  
2388 2386 /* file layout may have changed */
2389 2387 ci->i_layout = grant->layout;
... ... @@ -2691,6 +2689,11 @@
2691 2689 NULL /* no caps context */);
2692 2690 try_flush_caps(inode, session, NULL);
2693 2691 up_read(&mdsc->snap_rwsem);
  2692 +
  2693 + /* make sure we re-request max_size, if necessary */
  2694 + spin_lock(&inode->i_lock);
  2695 + ci->i_requested_max_size = 0;
  2696 + spin_unlock(&inode->i_lock);
2694 2697 }
2695 2698  
2696 2699 /*
fs/ceph/dir.c
... ... @@ -336,7 +336,10 @@
336 336 if (req->r_reply_info.dir_end) {
337 337 kfree(fi->last_name);
338 338 fi->last_name = NULL;
339   - fi->next_offset = 2;
  339 + if (ceph_frag_is_rightmost(frag))
  340 + fi->next_offset = 2;
  341 + else
  342 + fi->next_offset = 0;
340 343 } else {
341 344 rinfo = &req->r_reply_info;
342 345 err = note_last_dentry(fi,
343 346  
344 347  
... ... @@ -355,18 +358,22 @@
355 358 u64 pos = ceph_make_fpos(frag, off);
356 359 struct ceph_mds_reply_inode *in =
357 360 rinfo->dir_in[off - fi->offset].in;
  361 + struct ceph_vino vino;
  362 + ino_t ino;
  363 +
358 364 dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
359 365 off, off - fi->offset, rinfo->dir_nr, pos,
360 366 rinfo->dir_dname_len[off - fi->offset],
361 367 rinfo->dir_dname[off - fi->offset], in);
362 368 BUG_ON(!in);
363 369 ftype = le32_to_cpu(in->mode) >> 12;
  370 + vino.ino = le64_to_cpu(in->ino);
  371 + vino.snap = le64_to_cpu(in->snapid);
  372 + ino = ceph_vino_to_ino(vino);
364 373 if (filldir(dirent,
365 374 rinfo->dir_dname[off - fi->offset],
366 375 rinfo->dir_dname_len[off - fi->offset],
367   - pos,
368   - le64_to_cpu(in->ino),
369   - ftype) < 0) {
  376 + pos, ino, ftype) < 0) {
370 377 dout("filldir stopping us...\n");
371 378 return 0;
372 379 }
... ... @@ -414,6 +421,7 @@
414 421 fi->last_readdir = NULL;
415 422 }
416 423 kfree(fi->last_name);
  424 + fi->last_name = NULL;
417 425 fi->next_offset = 2; /* compensate for . and .. */
418 426 if (fi->dentry) {
419 427 dput(fi->dentry);
fs/ceph/file.c
... ... @@ -154,11 +154,13 @@
154 154 }
155 155  
156 156 /*
157   - * No need to block if we have any caps. Update wanted set
  157 + * No need to block if we have caps on the auth MDS (for
  158 + * write) or any MDS (for read). Update wanted set
158 159 * asynchronously.
159 160 */
160 161 spin_lock(&inode->i_lock);
161   - if (__ceph_is_any_real_caps(ci)) {
  162 + if (__ceph_is_any_real_caps(ci) &&
  163 + (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
162 164 int mds_wanted = __ceph_caps_mds_wanted(ci);
163 165 int issued = __ceph_caps_issued(ci, NULL);
164 166  
165 167  
... ... @@ -280,11 +282,12 @@
280 282 static int striped_read(struct inode *inode,
281 283 u64 off, u64 len,
282 284 struct page **pages, int num_pages,
283   - int *checkeof)
  285 + int *checkeof, bool align_to_pages)
284 286 {
285 287 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
286 288 struct ceph_inode_info *ci = ceph_inode(inode);
287 289 u64 pos, this_len;
  290 + int io_align, page_align;
288 291 int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */
289 292 int left, pages_left;
290 293 int read;
291 294  
292 295  
... ... @@ -300,14 +303,19 @@
300 303 page_pos = pages;
301 304 pages_left = num_pages;
302 305 read = 0;
  306 + io_align = off & ~PAGE_MASK;
303 307  
304 308 more:
  309 + if (align_to_pages)
  310 + page_align = (pos - io_align) & ~PAGE_MASK;
  311 + else
  312 + page_align = pos & ~PAGE_MASK;
305 313 this_len = left;
306 314 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
307 315 &ci->i_layout, pos, &this_len,
308 316 ci->i_truncate_seq,
309 317 ci->i_truncate_size,
310   - page_pos, pages_left);
  318 + page_pos, pages_left, page_align);
311 319 hit_stripe = this_len < left;
312 320 was_short = ret >= 0 && ret < this_len;
313 321 if (ret == -ENOENT)
314 322  
315 323  
316 324  
... ... @@ -374,26 +382,25 @@
374 382 dout("sync_read on file %p %llu~%u %s\n", file, off, len,
375 383 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
376 384  
377   - if (file->f_flags & O_DIRECT) {
378   - pages = ceph_get_direct_page_vector(data, num_pages, off, len);
379   -
380   - /*
381   - * flush any page cache pages in this range. this
382   - * will make concurrent normal and O_DIRECT io slow,
383   - * but it will at least behave sensibly when they are
384   - * in sequence.
385   - */
386   - } else {
  385 + if (file->f_flags & O_DIRECT)
  386 + pages = ceph_get_direct_page_vector(data, num_pages);
  387 + else
387 388 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
388   - }
389 389 if (IS_ERR(pages))
390 390 return PTR_ERR(pages);
391 391  
  392 + /*
  393 + * flush any page cache pages in this range. this
  394 + * will make concurrent normal and sync io slow,
  395 + * but it will at least behave sensibly when they are
  396 + * in sequence.
  397 + */
392 398 ret = filemap_write_and_wait(inode->i_mapping);
393 399 if (ret < 0)
394 400 goto done;
395 401  
396   - ret = striped_read(inode, off, len, pages, num_pages, checkeof);
  402 + ret = striped_read(inode, off, len, pages, num_pages, checkeof,
  403 + file->f_flags & O_DIRECT);
397 404  
398 405 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
399 406 ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
... ... @@ -448,6 +455,7 @@
448 455 int flags;
449 456 int do_sync = 0;
450 457 int check_caps = 0;
  458 + int page_align, io_align;
451 459 int ret;
452 460 struct timespec mtime = CURRENT_TIME;
453 461  
... ... @@ -462,6 +470,8 @@
462 470 else
463 471 pos = *offset;
464 472  
  473 + io_align = pos & ~PAGE_MASK;
  474 +
465 475 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
466 476 if (ret < 0)
467 477 return ret;
468 478  
469 479  
... ... @@ -486,20 +496,26 @@
486 496 */
487 497 more:
488 498 len = left;
  499 + if (file->f_flags & O_DIRECT)
  500 + /* write from beginning of first page, regardless of
  501 + io alignment */
  502 + page_align = (pos - io_align) & ~PAGE_MASK;
  503 + else
  504 + page_align = pos & ~PAGE_MASK;
489 505 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
490 506 ceph_vino(inode), pos, &len,
491 507 CEPH_OSD_OP_WRITE, flags,
492 508 ci->i_snap_realm->cached_context,
493 509 do_sync,
494 510 ci->i_truncate_seq, ci->i_truncate_size,
495   - &mtime, false, 2);
  511 + &mtime, false, 2, page_align);
496 512 if (!req)
497 513 return -ENOMEM;
498 514  
499 515 num_pages = calc_pages_for(pos, len);
500 516  
501 517 if (file->f_flags & O_DIRECT) {
502   - pages = ceph_get_direct_page_vector(data, num_pages, pos, len);
  518 + pages = ceph_get_direct_page_vector(data, num_pages);
503 519 if (IS_ERR(pages)) {
504 520 ret = PTR_ERR(pages);
505 521 goto out;
fs/ceph/inode.c
... ... @@ -470,7 +470,9 @@
470 470  
471 471 if (issued & (CEPH_CAP_FILE_EXCL|
472 472 CEPH_CAP_FILE_WR|
473   - CEPH_CAP_FILE_BUFFER)) {
  473 + CEPH_CAP_FILE_BUFFER|
  474 + CEPH_CAP_AUTH_EXCL|
  475 + CEPH_CAP_XATTR_EXCL)) {
474 476 if (timespec_compare(ctime, &inode->i_ctime) > 0) {
475 477 dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
476 478 inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
... ... @@ -510,7 +512,7 @@
510 512 warn = 1;
511 513 }
512 514 } else {
513   - /* we have no write caps; whatever the MDS says is true */
  515 + /* we have no write|excl caps; whatever the MDS says is true */
514 516 if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
515 517 inode->i_ctime = *ctime;
516 518 inode->i_mtime = *mtime;
517 519  
... ... @@ -566,12 +568,17 @@
566 568  
567 569 /*
568 570 * provided version will be odd if inode value is projected,
569   - * even if stable. skip the update if we have a newer info
570   - * (e.g., due to inode info racing form multiple MDSs), or if
571   - * we are getting projected (unstable) inode info.
  571 + * even if stable. skip the update if we have newer stable
  572 + * info (ours>=theirs, e.g. due to racing mds replies), unless
  573 + * we are getting projected (unstable) info (in which case the
  574 + * version is odd, and we want ours>theirs).
  575 + * us them
  576 + * 2 2 skip
  577 + * 3 2 skip
  578 + * 3 3 update
572 579 */
573 580 if (le64_to_cpu(info->version) > 0 &&
574   - (ci->i_version & ~1) > le64_to_cpu(info->version))
  581 + (ci->i_version & ~1) >= le64_to_cpu(info->version))
575 582 goto no_change;
576 583  
577 584 issued = __ceph_caps_issued(ci, &implemented);
... ... @@ -605,7 +612,14 @@
605 612 le32_to_cpu(info->time_warp_seq),
606 613 &ctime, &mtime, &atime);
607 614  
608   - ci->i_max_size = le64_to_cpu(info->max_size);
  615 + /* only update max_size on auth cap */
  616 + if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
  617 + ci->i_max_size != le64_to_cpu(info->max_size)) {
  618 + dout("max_size %lld -> %llu\n", ci->i_max_size,
  619 + le64_to_cpu(info->max_size));
  620 + ci->i_max_size = le64_to_cpu(info->max_size);
  621 + }
  622 +
609 623 ci->i_layout = info->layout;
610 624 inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
611 625  
... ... @@ -1054,7 +1068,8 @@
1054 1068 ininfo = rinfo->targeti.in;
1055 1069 vino.ino = le64_to_cpu(ininfo->ino);
1056 1070 vino.snap = le64_to_cpu(ininfo->snapid);
1057   - if (!dn->d_inode) {
  1071 + in = dn->d_inode;
  1072 + if (!in) {
1058 1073 in = ceph_get_inode(sb, vino);
1059 1074 if (IS_ERR(in)) {
1060 1075 pr_err("fill_trace bad get_inode "
1061 1076  
... ... @@ -1385,11 +1400,8 @@
1385 1400 spin_lock(&inode->i_lock);
1386 1401 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1387 1402 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1388   - if (ci->i_rdcache_gen == 0 ||
1389   - ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1390   - BUG_ON(ci->i_rdcache_revoking > ci->i_rdcache_gen);
  1403 + if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1391 1404 /* nevermind! */
1392   - ci->i_rdcache_revoking = 0;
1393 1405 spin_unlock(&inode->i_lock);
1394 1406 goto out;
1395 1407 }
1396 1408  
1397 1409  
... ... @@ -1399,15 +1411,16 @@
1399 1411 ceph_invalidate_nondirty_pages(inode->i_mapping);
1400 1412  
1401 1413 spin_lock(&inode->i_lock);
1402   - if (orig_gen == ci->i_rdcache_gen) {
  1414 + if (orig_gen == ci->i_rdcache_gen &&
  1415 + orig_gen == ci->i_rdcache_revoking) {
1403 1416 dout("invalidate_pages %p gen %d successful\n", inode,
1404 1417 ci->i_rdcache_gen);
1405   - ci->i_rdcache_gen = 0;
1406   - ci->i_rdcache_revoking = 0;
  1418 + ci->i_rdcache_revoking--;
1407 1419 check = 1;
1408 1420 } else {
1409   - dout("invalidate_pages %p gen %d raced, gen now %d\n",
1410   - inode, orig_gen, ci->i_rdcache_gen);
  1421 + dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
  1422 + inode, orig_gen, ci->i_rdcache_gen,
  1423 + ci->i_rdcache_revoking);
1411 1424 }
1412 1425 spin_unlock(&inode->i_lock);
1413 1426  
... ... @@ -1738,7 +1751,7 @@
1738 1751 return 0;
1739 1752 }
1740 1753  
1741   - dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
  1754 + dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
1742 1755 if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1743 1756 return 0;
1744 1757  
fs/ceph/mds_client.c
... ... @@ -528,6 +528,9 @@
528 528 ceph_mdsc_get_request(req);
529 529 __insert_request(mdsc, req);
530 530  
  531 + req->r_uid = current_fsuid();
  532 + req->r_gid = current_fsgid();
  533 +
531 534 if (dir) {
532 535 struct ceph_inode_info *ci = ceph_inode(dir);
533 536  
... ... @@ -1587,8 +1590,8 @@
1587 1590  
1588 1591 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1589 1592 head->op = cpu_to_le32(req->r_op);
1590   - head->caller_uid = cpu_to_le32(current_fsuid());
1591   - head->caller_gid = cpu_to_le32(current_fsgid());
  1593 + head->caller_uid = cpu_to_le32(req->r_uid);
  1594 + head->caller_gid = cpu_to_le32(req->r_gid);
1592 1595 head->args = req->r_args;
1593 1596  
1594 1597 ceph_encode_filepath(&p, end, ino1, path1);
fs/ceph/mds_client.h
... ... @@ -170,6 +170,8 @@
170 170  
171 171 union ceph_mds_request_args r_args;
172 172 int r_fmode; /* file mode, if expecting cap */
  173 + uid_t r_uid;
  174 + gid_t r_gid;
173 175  
174 176 /* for choosing which mds to send this request to */
175 177 int r_direct_mode;
fs/ceph/super.h
... ... @@ -293,9 +293,7 @@
293 293 int i_rd_ref, i_rdcache_ref, i_wr_ref;
294 294 int i_wrbuffer_ref, i_wrbuffer_ref_head;
295 295 u32 i_shared_gen; /* increment each time we get FILE_SHARED */
296   - u32 i_rdcache_gen; /* we increment this each time we get
297   - FILE_CACHE. If it's non-zero, we
298   - _may_ have cached pages. */
  296 + u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */
299 297 u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
300 298  
301 299 struct list_head i_unsafe_writes; /* uncommitted sync writes */
include/linux/ceph/libceph.h
... ... @@ -227,8 +227,7 @@
227 227 extern void ceph_release_page_vector(struct page **pages, int num_pages);
228 228  
229 229 extern struct page **ceph_get_direct_page_vector(const char __user *data,
230   - int num_pages,
231   - loff_t off, size_t len);
  230 + int num_pages);
232 231 extern void ceph_put_page_vector(struct page **pages, int num_pages);
233 232 extern void ceph_release_page_vector(struct page **pages, int num_pages);
234 233 extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
include/linux/ceph/messenger.h
... ... @@ -82,6 +82,7 @@
82 82 struct ceph_buffer *middle;
83 83 struct page **pages; /* data payload. NOT OWNER. */
84 84 unsigned nr_pages; /* size of page array */
  85 + unsigned page_alignment; /* io offset in first page */
85 86 struct ceph_pagelist *pagelist; /* instead of pages */
86 87 struct list_head list_head;
87 88 struct kref kref;
include/linux/ceph/osd_client.h
... ... @@ -79,6 +79,7 @@
79 79 struct ceph_file_layout r_file_layout;
80 80 struct ceph_snap_context *r_snapc; /* snap context for writes */
81 81 unsigned r_num_pages; /* size of page array (follows) */
  82 + unsigned r_page_alignment; /* io offset in first page */
82 83 struct page **r_pages; /* pages for data payload */
83 84 int r_pages_from_pool;
84 85 int r_own_pages; /* if true, i own page list */
... ... @@ -194,7 +195,8 @@
194 195 int do_sync, u32 truncate_seq,
195 196 u64 truncate_size,
196 197 struct timespec *mtime,
197   - bool use_mempool, int num_reply);
  198 + bool use_mempool, int num_reply,
  199 + int page_align);
198 200  
199 201 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
200 202 {
... ... @@ -218,7 +220,8 @@
218 220 struct ceph_file_layout *layout,
219 221 u64 off, u64 *plen,
220 222 u32 truncate_seq, u64 truncate_size,
221   - struct page **pages, int nr_pages);
  223 + struct page **pages, int nr_pages,
  224 + int page_align);
222 225  
223 226 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
224 227 struct ceph_vino vino,
net/ceph/messenger.c
... ... @@ -540,8 +540,7 @@
540 540 /* initialize page iterator */
541 541 con->out_msg_pos.page = 0;
542 542 if (m->pages)
543   - con->out_msg_pos.page_pos =
544   - le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
  543 + con->out_msg_pos.page_pos = m->page_alignment;
545 544 else
546 545 con->out_msg_pos.page_pos = 0;
547 546 con->out_msg_pos.data_pos = 0;
... ... @@ -1491,7 +1490,7 @@
1491 1490 struct ceph_msg *m = con->in_msg;
1492 1491 int ret;
1493 1492 int to, left;
1494   - unsigned front_len, middle_len, data_len, data_off;
  1493 + unsigned front_len, middle_len, data_len;
1495 1494 int datacrc = con->msgr->nocrc;
1496 1495 int skip;
1497 1496 u64 seq;
1498 1497  
1499 1498  
... ... @@ -1527,19 +1526,17 @@
1527 1526 data_len = le32_to_cpu(con->in_hdr.data_len);
1528 1527 if (data_len > CEPH_MSG_MAX_DATA_LEN)
1529 1528 return -EIO;
1530   - data_off = le16_to_cpu(con->in_hdr.data_off);
1531 1529  
1532 1530 /* verify seq# */
1533 1531 seq = le64_to_cpu(con->in_hdr.seq);
1534 1532 if ((s64)seq - (s64)con->in_seq < 1) {
1535   - pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
  1533 + pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1536 1534 ENTITY_NAME(con->peer_name),
1537 1535 ceph_pr_addr(&con->peer_addr.in_addr),
1538 1536 seq, con->in_seq + 1);
1539 1537 con->in_base_pos = -front_len - middle_len - data_len -
1540 1538 sizeof(m->footer);
1541 1539 con->in_tag = CEPH_MSGR_TAG_READY;
1542   - con->in_seq++;
1543 1540 return 0;
1544 1541 } else if ((s64)seq - (s64)con->in_seq > 1) {
1545 1542 pr_err("read_partial_message bad seq %lld expected %lld\n",
... ... @@ -1576,7 +1573,7 @@
1576 1573  
1577 1574 con->in_msg_pos.page = 0;
1578 1575 if (m->pages)
1579   - con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
  1576 + con->in_msg_pos.page_pos = m->page_alignment;
1580 1577 else
1581 1578 con->in_msg_pos.page_pos = 0;
1582 1579 con->in_msg_pos.data_pos = 0;
... ... @@ -2301,6 +2298,7 @@
2301 2298  
2302 2299 /* data */
2303 2300 m->nr_pages = 0;
  2301 + m->page_alignment = 0;
2304 2302 m->pages = NULL;
2305 2303 m->pagelist = NULL;
2306 2304 m->bio = NULL;
... ... @@ -2370,6 +2368,7 @@
2370 2368 type, front_len);
2371 2369 return NULL;
2372 2370 }
  2371 + msg->page_alignment = le16_to_cpu(hdr->data_off);
2373 2372 }
2374 2373 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2375 2374  
net/ceph/osd_client.c
... ... @@ -71,6 +71,7 @@
71 71 op->extent.length = objlen;
72 72 }
73 73 req->r_num_pages = calc_pages_for(off, *plen);
  74 + req->r_page_alignment = off & ~PAGE_MASK;
74 75 if (op->op == CEPH_OSD_OP_WRITE)
75 76 op->payload_len = *plen;
76 77  
... ... @@ -390,6 +391,8 @@
390 391 req->r_request->hdr.data_len = cpu_to_le32(data_len);
391 392 }
392 393  
  394 + req->r_request->page_alignment = req->r_page_alignment;
  395 +
393 396 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
394 397 msg_size = p - msg->front.iov_base;
395 398 msg->front.iov_len = msg_size;
... ... @@ -419,7 +422,8 @@
419 422 u32 truncate_seq,
420 423 u64 truncate_size,
421 424 struct timespec *mtime,
422   - bool use_mempool, int num_reply)
  425 + bool use_mempool, int num_reply,
  426 + int page_align)
423 427 {
424 428 struct ceph_osd_req_op ops[3];
425 429 struct ceph_osd_request *req;
... ... @@ -447,6 +451,10 @@
447 451 calc_layout(osdc, vino, layout, off, plen, req, ops);
448 452 req->r_file_layout = *layout; /* keep a copy */
449 453  
  454 + /* in case it differs from natural alignment that calc_layout
  455 + filled in for us */
  456 + req->r_page_alignment = page_align;
  457 +
450 458 ceph_osdc_build_request(req, off, plen, ops,
451 459 snapc,
452 460 mtime,
... ... @@ -1489,7 +1497,7 @@
1489 1497 struct ceph_vino vino, struct ceph_file_layout *layout,
1490 1498 u64 off, u64 *plen,
1491 1499 u32 truncate_seq, u64 truncate_size,
1492   - struct page **pages, int num_pages)
  1500 + struct page **pages, int num_pages, int page_align)
1493 1501 {
1494 1502 struct ceph_osd_request *req;
1495 1503 int rc = 0;
1496 1504  
... ... @@ -1499,15 +1507,15 @@
1499 1507 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1500 1508 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1501 1509 NULL, 0, truncate_seq, truncate_size, NULL,
1502   - false, 1);
  1510 + false, 1, page_align);
1503 1511 if (!req)
1504 1512 return -ENOMEM;
1505 1513  
1506 1514 /* it may be a short read due to an object boundary */
1507 1515 req->r_pages = pages;
1508 1516  
1509   - dout("readpages final extent is %llu~%llu (%d pages)\n",
1510   - off, *plen, req->r_num_pages);
  1517 + dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
  1518 + off, *plen, req->r_num_pages, page_align);
1511 1519  
1512 1520 rc = ceph_osdc_start_request(osdc, req, false);
1513 1521 if (!rc)
... ... @@ -1533,6 +1541,7 @@
1533 1541 {
1534 1542 struct ceph_osd_request *req;
1535 1543 int rc = 0;
  1544 + int page_align = off & ~PAGE_MASK;
1536 1545  
1537 1546 BUG_ON(vino.snap != CEPH_NOSNAP);
1538 1547 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
... ... @@ -1541,7 +1550,7 @@
1541 1550 CEPH_OSD_FLAG_WRITE,
1542 1551 snapc, do_sync,
1543 1552 truncate_seq, truncate_size, mtime,
1544   - nofail, 1);
  1553 + nofail, 1, page_align);
1545 1554 if (!req)
1546 1555 return -ENOMEM;
1547 1556  
... ... @@ -1638,8 +1647,7 @@
1638 1647 m = ceph_msg_get(req->r_reply);
1639 1648  
1640 1649 if (data_len > 0) {
1641   - unsigned data_off = le16_to_cpu(hdr->data_off);
1642   - int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
  1650 + int want = calc_pages_for(req->r_page_alignment, data_len);
1643 1651  
1644 1652 if (unlikely(req->r_num_pages < want)) {
1645 1653 pr_warning("tid %lld reply %d > expected %d pages\n",
... ... @@ -1651,6 +1659,7 @@
1651 1659 }
1652 1660 m->pages = req->r_pages;
1653 1661 m->nr_pages = req->r_num_pages;
  1662 + m->page_alignment = req->r_page_alignment;
1654 1663 #ifdef CONFIG_BLOCK
1655 1664 m->bio = req->r_bio;
1656 1665 #endif
net/ceph/pagevec.c
... ... @@ -13,8 +13,7 @@
13 13 * build a vector of user pages
14 14 */
15 15 struct page **ceph_get_direct_page_vector(const char __user *data,
16   - int num_pages,
17   - loff_t off, size_t len)
  16 + int num_pages)
18 17 {
19 18 struct page **pages;
20 19 int rc;