Commit 1a52bb0b686844021597d190e562ab55d1210104

Authored by Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: ensure prealloc_blob is in place when removing xattr
  rbd: initialize snap_rwsem in rbd_add()
  ceph: enable/disable dentry complete flags via mount option
  vfs: export symbol d_find_any_alias()
  ceph: always initialize the dentry in open_root_dentry()
  libceph: remove useless return value for osd_client __send_request()
  ceph: avoid iput() while holding spinlock in ceph_dir_fsync
  ceph: avoid useless dget/dput in encode_fh
  ceph: dereference pointer after checking for NULL
  crush: fix force for non-root TAKE
  ceph: remove unnecessary d_fsdata conditional checks
  ceph: Use kmemdup rather than duplicating its implementation

Fix up conflicts in fs/ceph/super.c (d_alloc_root() failure handling vs
always initialize the dentry in open_root_dentry)

Showing 14 changed files (side-by-side diff)

Documentation/filesystems/ceph.txt
... ... @@ -119,12 +119,20 @@
119 119 must rely on TCP's error correction to detect data corruption
120 120 in the data payload.
121 121  
122   - noasyncreaddir
123   - Disable client's use its local cache to satisfy readdir
124   - requests. (This does not change correctness; the client uses
125   - cached metadata only when a lease or capability ensures it is
126   - valid.)
  122 + dcache
  123 + Use the dcache contents to perform negative lookups and
  124 + readdir when the client has the entire directory contents in
  125 + its cache. (This does not change correctness; the client uses
  126 + cached metadata only when a lease or capability ensures it is
  127 + valid.)
127 128  
  129 + nodcache
  130 + Do not use the dcache as above. This avoids a significant amount of
  131 + complex code, sacrificing performance without affecting correctness,
  132 + and is useful for tracking down bugs.
  133 +
  134 + noasyncreaddir
  135 + Do not use the dcache as above for readdir.
128 136  
129 137 More Information
130 138 ================
drivers/block/rbd.c
... ... @@ -2184,6 +2184,8 @@
2184 2184 INIT_LIST_HEAD(&rbd_dev->node);
2185 2185 INIT_LIST_HEAD(&rbd_dev->snaps);
2186 2186  
  2187 + init_rwsem(&rbd_dev->header.snap_rwsem);
  2188 +
2187 2189 /* generate unique id: find highest unique id, add one */
2188 2190 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2189 2191  
fs/ceph/dir.c
... ... @@ -973,7 +973,7 @@
973 973  
974 974 spin_lock(&dentry->d_lock);
975 975 di = ceph_dentry(dentry);
976   - if (di && di->lease_session) {
  976 + if (di->lease_session) {
977 977 s = di->lease_session;
978 978 spin_lock(&s->s_cap_lock);
979 979 gen = s->s_cap_gen;
... ... @@ -1072,13 +1072,11 @@
1072 1072 struct ceph_dentry_info *di = ceph_dentry(dentry);
1073 1073  
1074 1074 dout("d_release %p\n", dentry);
1075   - if (di) {
1076   - ceph_dentry_lru_del(dentry);
1077   - if (di->lease_session)
1078   - ceph_put_mds_session(di->lease_session);
1079   - kmem_cache_free(ceph_dentry_cachep, di);
1080   - dentry->d_fsdata = NULL;
1081   - }
  1075 + ceph_dentry_lru_del(dentry);
  1076 + if (di->lease_session)
  1077 + ceph_put_mds_session(di->lease_session);
  1078 + kmem_cache_free(ceph_dentry_cachep, di);
  1079 + dentry->d_fsdata = NULL;
1082 1080 }
1083 1081  
1084 1082 static int ceph_snapdir_d_revalidate(struct dentry *dentry,
1085 1083  
1086 1084  
... ... @@ -1096,17 +1094,36 @@
1096 1094 */
1097 1095 void ceph_dir_set_complete(struct inode *inode)
1098 1096 {
1099   - /* not yet implemented */
  1097 + struct dentry *dentry = d_find_any_alias(inode);
  1098 +
  1099 + if (dentry && ceph_dentry(dentry) &&
  1100 + ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
  1101 + dout(" marking %p (%p) complete\n", inode, dentry);
  1102 + set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
  1103 + }
  1104 + dput(dentry);
1100 1105 }
1101 1106  
1102 1107 void ceph_dir_clear_complete(struct inode *inode)
1103 1108 {
1104   - /* not yet implemented */
  1109 + struct dentry *dentry = d_find_any_alias(inode);
  1110 +
  1111 + if (dentry && ceph_dentry(dentry)) {
  1112 + dout(" marking %p (%p) complete\n", inode, dentry);
  1113 + set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
  1114 + }
  1115 + dput(dentry);
1105 1116 }
1106 1117  
1107 1118 bool ceph_dir_test_complete(struct inode *inode)
1108 1119 {
1109   - /* not yet implemented */
  1120 + struct dentry *dentry = d_find_any_alias(inode);
  1121 +
  1122 + if (dentry && ceph_dentry(dentry)) {
  1123 + dout(" marking %p (%p) NOT complete\n", inode, dentry);
  1124 + clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
  1125 + }
  1126 + dput(dentry);
1110 1127 return false;
1111 1128 }
1112 1129  
... ... @@ -1220,6 +1237,7 @@
1220 1237 do {
1221 1238 ceph_mdsc_get_request(req);
1222 1239 spin_unlock(&ci->i_unsafe_lock);
  1240 +
1223 1241 dout("dir_fsync %p wait on tid %llu (until %llu)\n",
1224 1242 inode, req->r_tid, last_tid);
1225 1243 if (req->r_timeout) {
1226 1244  
... ... @@ -1232,9 +1250,9 @@
1232 1250 } else {
1233 1251 wait_for_completion(&req->r_safe_completion);
1234 1252 }
1235   - spin_lock(&ci->i_unsafe_lock);
1236 1253 ceph_mdsc_put_request(req);
1237 1254  
  1255 + spin_lock(&ci->i_unsafe_lock);
1238 1256 if (ret || list_empty(head))
1239 1257 break;
1240 1258 req = list_entry(head->next,
... ... @@ -1259,13 +1277,11 @@
1259 1277  
1260 1278 dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
1261 1279 dn->d_name.len, dn->d_name.name);
1262   - if (di) {
1263   - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1264   - spin_lock(&mdsc->dentry_lru_lock);
1265   - list_add_tail(&di->lru, &mdsc->dentry_lru);
1266   - mdsc->num_dentry++;
1267   - spin_unlock(&mdsc->dentry_lru_lock);
1268   - }
  1280 + mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
  1281 + spin_lock(&mdsc->dentry_lru_lock);
  1282 + list_add_tail(&di->lru, &mdsc->dentry_lru);
  1283 + mdsc->num_dentry++;
  1284 + spin_unlock(&mdsc->dentry_lru_lock);
1269 1285 }
1270 1286  
1271 1287 void ceph_dentry_lru_touch(struct dentry *dn)
... ... @@ -1275,12 +1291,10 @@
1275 1291  
1276 1292 dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
1277 1293 dn->d_name.len, dn->d_name.name, di->offset);
1278   - if (di) {
1279   - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1280   - spin_lock(&mdsc->dentry_lru_lock);
1281   - list_move_tail(&di->lru, &mdsc->dentry_lru);
1282   - spin_unlock(&mdsc->dentry_lru_lock);
1283   - }
  1294 + mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
  1295 + spin_lock(&mdsc->dentry_lru_lock);
  1296 + list_move_tail(&di->lru, &mdsc->dentry_lru);
  1297 + spin_unlock(&mdsc->dentry_lru_lock);
1284 1298 }
1285 1299  
1286 1300 void ceph_dentry_lru_del(struct dentry *dn)
... ... @@ -1290,13 +1304,11 @@
1290 1304  
1291 1305 dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
1292 1306 dn->d_name.len, dn->d_name.name);
1293   - if (di) {
1294   - mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1295   - spin_lock(&mdsc->dentry_lru_lock);
1296   - list_del_init(&di->lru);
1297   - mdsc->num_dentry--;
1298   - spin_unlock(&mdsc->dentry_lru_lock);
1299   - }
  1307 + mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
  1308 + spin_lock(&mdsc->dentry_lru_lock);
  1309 + list_del_init(&di->lru);
  1310 + mdsc->num_dentry--;
  1311 + spin_unlock(&mdsc->dentry_lru_lock);
1300 1312 }
1301 1313  
1302 1314 /*
fs/ceph/export.c
... ... @@ -56,9 +56,7 @@
56 56 return -EINVAL;
57 57  
58 58 spin_lock(&dentry->d_lock);
59   - parent = dget(dentry->d_parent);
60   - spin_unlock(&dentry->d_lock);
61   -
  59 + parent = dentry->d_parent;
62 60 if (*max_len >= connected_handle_length) {
63 61 dout("encode_fh %p connectable\n", dentry);
64 62 cfh->ino = ceph_ino(dentry->d_inode);
... ... @@ -81,7 +79,7 @@
81 79 *max_len = handle_length;
82 80 type = 255;
83 81 }
84   - dput(parent);
  82 + spin_unlock(&dentry->d_lock);
85 83 return type;
86 84 }
87 85  
fs/ceph/inode.c
... ... @@ -850,11 +850,12 @@
850 850 {
851 851 struct dentry *dir = dn->d_parent;
852 852 struct inode *inode = dir->d_inode;
853   - struct ceph_inode_info *ci = ceph_inode(inode);
  853 + struct ceph_inode_info *ci;
854 854 struct ceph_dentry_info *di;
855 855  
856 856 BUG_ON(!inode);
857 857  
  858 + ci = ceph_inode(inode);
858 859 di = ceph_dentry(dn);
859 860  
860 861 spin_lock(&ci->i_ceph_lock);
fs/ceph/mds_client.c
... ... @@ -2772,7 +2772,7 @@
2772 2772 di = ceph_dentry(dentry);
2773 2773 switch (h->action) {
2774 2774 case CEPH_MDS_LEASE_REVOKE:
2775   - if (di && di->lease_session == session) {
  2775 + if (di->lease_session == session) {
2776 2776 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2777 2777 h->seq = cpu_to_le32(di->lease_seq);
2778 2778 __ceph_mdsc_drop_dentry_lease(dentry);
... ... @@ -2781,7 +2781,7 @@
2781 2781 break;
2782 2782  
2783 2783 case CEPH_MDS_LEASE_RENEW:
2784   - if (di && di->lease_session == session &&
  2784 + if (di->lease_session == session &&
2785 2785 di->lease_gen == session->s_cap_gen &&
2786 2786 di->lease_renew_from &&
2787 2787 di->lease_renew_after == 0) {
fs/ceph/super.c
... ... @@ -131,6 +131,8 @@
131 131 Opt_rbytes,
132 132 Opt_norbytes,
133 133 Opt_noasyncreaddir,
  134 + Opt_dcache,
  135 + Opt_nodcache,
134 136 Opt_ino32,
135 137 };
136 138  
... ... @@ -152,6 +154,8 @@
152 154 {Opt_rbytes, "rbytes"},
153 155 {Opt_norbytes, "norbytes"},
154 156 {Opt_noasyncreaddir, "noasyncreaddir"},
  157 + {Opt_dcache, "dcache"},
  158 + {Opt_nodcache, "nodcache"},
155 159 {Opt_ino32, "ino32"},
156 160 {-1, NULL}
157 161 };
... ... @@ -231,6 +235,12 @@
231 235 case Opt_noasyncreaddir:
232 236 fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
233 237 break;
  238 + case Opt_dcache:
  239 + fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
  240 + break;
  241 + case Opt_nodcache:
  242 + fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
  243 + break;
234 244 case Opt_ino32:
235 245 fsopt->flags |= CEPH_MOUNT_OPT_INO32;
236 246 break;
... ... @@ -377,6 +387,10 @@
377 387 seq_puts(m, ",norbytes");
378 388 if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
379 389 seq_puts(m, ",noasyncreaddir");
  390 + if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
  391 + seq_puts(m, ",dcache");
  392 + else
  393 + seq_puts(m, ",nodcache");
380 394  
381 395 if (fsopt->wsize)
382 396 seq_printf(m, ",wsize=%d", fsopt->wsize);
383 397  
... ... @@ -647,10 +661,10 @@
647 661 root = ERR_PTR(-ENOMEM);
648 662 goto out;
649 663 }
650   - ceph_init_dentry(root);
651 664 } else {
652 665 root = d_obtain_alias(inode);
653 666 }
  667 + ceph_init_dentry(root);
654 668 dout("open_root_inode success, root dentry is %p\n", root);
655 669 } else {
656 670 root = ERR_PTR(err);
fs/ceph/super.h
... ... @@ -28,6 +28,7 @@
28 28 #define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */
29 29 #define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
30 30 #define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */
  31 +#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
31 32  
32 33 #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
33 34  
fs/ceph/xattr.c
... ... @@ -818,6 +818,7 @@
818 818 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
819 819 int issued;
820 820 int err;
  821 + int required_blob_size;
821 822 int dirty;
822 823  
823 824 if (ceph_snap(inode) != CEPH_NOSNAP)
824 825  
825 826  
... ... @@ -833,14 +834,34 @@
833 834 return -EOPNOTSUPP;
834 835 }
835 836  
  837 + err = -ENOMEM;
836 838 spin_lock(&ci->i_ceph_lock);
837 839 __build_xattrs(inode);
  840 +retry:
838 841 issued = __ceph_caps_issued(ci, NULL);
839 842 dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
840 843  
841 844 if (!(issued & CEPH_CAP_XATTR_EXCL))
842 845 goto do_sync;
843 846  
  847 + required_blob_size = __get_required_blob_size(ci, 0, 0);
  848 +
  849 + if (!ci->i_xattrs.prealloc_blob ||
  850 + required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
  851 + struct ceph_buffer *blob;
  852 +
  853 + spin_unlock(&ci->i_ceph_lock);
  854 + dout(" preaallocating new blob size=%d\n", required_blob_size);
  855 + blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
  856 + if (!blob)
  857 + goto out;
  858 + spin_lock(&ci->i_ceph_lock);
  859 + if (ci->i_xattrs.prealloc_blob)
  860 + ceph_buffer_put(ci->i_xattrs.prealloc_blob);
  861 + ci->i_xattrs.prealloc_blob = blob;
  862 + goto retry;
  863 + }
  864 +
844 865 err = __remove_xattr_by_name(ceph_inode(inode), name);
845 866 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
846 867 ci->i_xattrs.dirty = true;
... ... @@ -853,6 +874,7 @@
853 874 do_sync:
854 875 spin_unlock(&ci->i_ceph_lock);
855 876 err = ceph_send_removexattr(dentry, name);
  877 +out:
856 878 return err;
857 879 }
fs/dcache.c
... ... @@ -1475,7 +1475,14 @@
1475 1475 return alias;
1476 1476 }
1477 1477  
1478   -static struct dentry * d_find_any_alias(struct inode *inode)
  1478 +/**
  1479 + * d_find_any_alias - find any alias for a given inode
  1480 + * @inode: inode to find an alias for
  1481 + *
  1482 + * If any aliases exist for the given inode, take and return a
  1483 + * reference for one of them. If no aliases exist, return %NULL.
  1484 + */
  1485 +struct dentry *d_find_any_alias(struct inode *inode)
1479 1486 {
1480 1487 struct dentry *de;
1481 1488  
... ... @@ -1484,7 +1491,7 @@
1484 1491 spin_unlock(&inode->i_lock);
1485 1492 return de;
1486 1493 }
1487   -
  1494 +EXPORT_SYMBOL(d_find_any_alias);
1488 1495  
1489 1496 /**
1490 1497 * d_obtain_alias - find or allocate a dentry for a given inode
include/linux/dcache.h
... ... @@ -242,6 +242,7 @@
242 242 extern struct dentry * d_alloc_pseudo(struct super_block *, const struct qstr *);
243 243 extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
244 244 extern struct dentry * d_add_ci(struct dentry *, struct inode *, struct qstr *);
  245 +extern struct dentry *d_find_any_alias(struct inode *inode);
245 246 extern struct dentry * d_obtain_alias(struct inode *);
246 247 extern void shrink_dcache_sb(struct super_block *);
247 248 extern void shrink_dcache_parent(struct dentry *);
net/ceph/crush/mapper.c
... ... @@ -510,10 +510,15 @@
510 510 switch (rule->steps[step].op) {
511 511 case CRUSH_RULE_TAKE:
512 512 w[0] = rule->steps[step].arg1;
513   - if (force_pos >= 0) {
514   - BUG_ON(force_context[force_pos] != w[0]);
  513 +
  514 + /* find position in force_context/hierarchy */
  515 + while (force_pos >= 0 &&
  516 + force_context[force_pos] != w[0])
515 517 force_pos--;
516   - }
  518 + /* and move past it */
  519 + if (force_pos >= 0)
  520 + force_pos--;
  521 +
517 522 wsize = 1;
518 523 break;
519 524  
net/ceph/crypto.c
... ... @@ -15,10 +15,9 @@
15 15 const struct ceph_crypto_key *src)
16 16 {
17 17 memcpy(dst, src, sizeof(struct ceph_crypto_key));
18   - dst->key = kmalloc(src->len, GFP_NOFS);
  18 + dst->key = kmemdup(src->key, src->len, GFP_NOFS);
19 19 if (!dst->key)
20 20 return -ENOMEM;
21   - memcpy(dst->key, src->key, src->len);
22 21 return 0;
23 22 }
24 23  
net/ceph/osd_client.c
... ... @@ -29,8 +29,8 @@
29 29 struct ceph_osd_request *req);
30 30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31 31 struct ceph_osd_request *req);
32   -static int __send_request(struct ceph_osd_client *osdc,
33   - struct ceph_osd_request *req);
  32 +static void __send_request(struct ceph_osd_client *osdc,
  33 + struct ceph_osd_request *req);
34 34  
35 35 static int op_needs_trail(int op)
36 36 {
... ... @@ -1022,8 +1022,8 @@
1022 1022 /*
1023 1023 * caller should hold map_sem (for read) and request_mutex
1024 1024 */
1025   -static int __send_request(struct ceph_osd_client *osdc,
1026   - struct ceph_osd_request *req)
  1025 +static void __send_request(struct ceph_osd_client *osdc,
  1026 + struct ceph_osd_request *req)
1027 1027 {
1028 1028 struct ceph_osd_request_head *reqhead;
1029 1029  
... ... @@ -1041,7 +1041,6 @@
1041 1041 ceph_msg_get(req->r_request); /* send consumes a ref */
1042 1042 ceph_con_send(&req->r_osd->o_con, req->r_request);
1043 1043 req->r_sent = req->r_osd->o_incarnation;
1044   - return 0;
1045 1044 }
1046 1045  
1047 1046 /*
1048 1047  
... ... @@ -1726,17 +1725,9 @@
1726 1725 dout("send_request %p no up osds in pg\n", req);
1727 1726 ceph_monc_request_next_osdmap(&osdc->client->monc);
1728 1727 } else {
1729   - rc = __send_request(osdc, req);
1730   - if (rc) {
1731   - if (nofail) {
1732   - dout("osdc_start_request failed send, "
1733   - " will retry %lld\n", req->r_tid);
1734   - rc = 0;
1735   - } else {
1736   - __unregister_request(osdc, req);
1737   - }
1738   - }
  1728 + __send_request(osdc, req);
1739 1729 }
  1730 + rc = 0;
1740 1731 }
1741 1732  
1742 1733 out_unlock: