Commit 0016eedc4185a3cd7e578b027a6e69001b85d6c4

Authored by Joel Becker
1 parent e8fce482f3

ocfs2_dlmfs: Use the stackglue.

Rather than directly using o2dlm, dlmfs can now use the stackglue.  This
allows it to use userspace cluster stacks and fs/dlm.  This commit
forces o2cb for now.  A latter commit will bump the protocol version and
allow non-o2cb stacks.

This is one big sed, really.  LKM_xxMODE becomes DLM_LOCK_xx.  LKM_flag
becomes DLM_LKF_flag.

We also learn to check that the LVB is valid before reading it.  Any DLM
can lose the contents of the LVB during a complicated recovery.  userdlm
should be checking this.  Now it does.  dlmfs will return 0 from read(2)
if the LVB was invalid.

Signed-off-by: Joel Becker <joel.becker@oracle.com>

Showing 3 changed files with 166 additions and 173 deletions Side-by-side Diff

fs/ocfs2/dlmfs/dlmfs.c
... ... @@ -47,21 +47,13 @@
47 47  
48 48 #include <asm/uaccess.h>
49 49  
50   -
51   -#include "cluster/nodemanager.h"
52   -#include "cluster/heartbeat.h"
53   -#include "cluster/tcp.h"
54   -
55   -#include "dlm/dlmapi.h"
56   -
  50 +#include "stackglue.h"
57 51 #include "userdlm.h"
58   -
59 52 #include "dlmfsver.h"
60 53  
61 54 #define MLOG_MASK_PREFIX ML_DLMFS
62 55 #include "cluster/masklog.h"
63 56  
64   -#include "ocfs2_lockingver.h"
65 57  
66 58 static const struct super_operations dlmfs_ops;
67 59 static const struct file_operations dlmfs_file_operations;
... ... @@ -72,15 +64,6 @@
72 64  
73 65 struct workqueue_struct *user_dlm_worker;
74 66  
75   -/*
76   - * This is the userdlmfs locking protocol version.
77   - *
78   - * See fs/ocfs2/dlmglue.c for more details on locking versions.
79   - */
80   -static const struct dlm_protocol_version user_locking_protocol = {
81   - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
82   - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
83   -};
84 67  
85 68  
86 69 /*
... ... @@ -259,7 +242,7 @@
259 242 loff_t *ppos)
260 243 {
261 244 int bytes_left;
262   - ssize_t readlen;
  245 + ssize_t readlen, got;
263 246 char *lvb_buf;
264 247 struct inode *inode = filp->f_path.dentry->d_inode;
265 248  
... ... @@ -285,9 +268,13 @@
285 268 if (!lvb_buf)
286 269 return -ENOMEM;
287 270  
288   - user_dlm_read_lvb(inode, lvb_buf, readlen);
289   - bytes_left = __copy_to_user(buf, lvb_buf, readlen);
290   - readlen -= bytes_left;
  271 + got = user_dlm_read_lvb(inode, lvb_buf, readlen);
  272 + if (got) {
  273 + BUG_ON(got != readlen);
  274 + bytes_left = __copy_to_user(buf, lvb_buf, readlen);
  275 + readlen -= bytes_left;
  276 + } else
  277 + readlen = 0;
291 278  
292 279 kfree(lvb_buf);
293 280  
... ... @@ -346,7 +333,7 @@
346 333 struct dlmfs_inode_private *ip =
347 334 (struct dlmfs_inode_private *) foo;
348 335  
349   - ip->ip_dlm = NULL;
  336 + ip->ip_conn = NULL;
350 337 ip->ip_parent = NULL;
351 338  
352 339 inode_init_once(&ip->ip_vfs_inode);
353 340  
354 341  
... ... @@ -388,14 +375,14 @@
388 375 goto clear_fields;
389 376 }
390 377  
391   - mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm);
  378 + mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn);
392 379 /* we must be a directory. If required, lets unregister the
393 380 * dlm context now. */
394   - if (ip->ip_dlm)
395   - user_dlm_unregister_context(ip->ip_dlm);
  381 + if (ip->ip_conn)
  382 + user_dlm_unregister(ip->ip_conn);
396 383 clear_fields:
397 384 ip->ip_parent = NULL;
398   - ip->ip_dlm = NULL;
  385 + ip->ip_conn = NULL;
399 386 }
400 387  
401 388 static struct backing_dev_info dlmfs_backing_dev_info = {
... ... @@ -445,7 +432,7 @@
445 432 inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
446 433  
447 434 ip = DLMFS_I(inode);
448   - ip->ip_dlm = DLMFS_I(parent)->ip_dlm;
  435 + ip->ip_conn = DLMFS_I(parent)->ip_conn;
449 436  
450 437 switch (mode & S_IFMT) {
451 438 default:
452 439  
... ... @@ -499,13 +486,12 @@
499 486 struct inode *inode = NULL;
500 487 struct qstr *domain = &dentry->d_name;
501 488 struct dlmfs_inode_private *ip;
502   - struct dlm_ctxt *dlm;
503   - struct dlm_protocol_version proto = user_locking_protocol;
  489 + struct ocfs2_cluster_connection *conn;
504 490  
505 491 mlog(0, "mkdir %.*s\n", domain->len, domain->name);
506 492  
507 493 /* verify that we have a proper domain */
508   - if (domain->len >= O2NM_MAX_NAME_LEN) {
  494 + if (domain->len >= GROUP_NAME_MAX) {
509 495 status = -EINVAL;
510 496 mlog(ML_ERROR, "invalid domain name for directory.\n");
511 497 goto bail;
512 498  
... ... @@ -520,14 +506,14 @@
520 506  
521 507 ip = DLMFS_I(inode);
522 508  
523   - dlm = user_dlm_register_context(domain, &proto);
524   - if (IS_ERR(dlm)) {
525   - status = PTR_ERR(dlm);
  509 + conn = user_dlm_register(domain);
  510 + if (IS_ERR(conn)) {
  511 + status = PTR_ERR(conn);
526 512 mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
527 513 status, domain->len, domain->name);
528 514 goto bail;
529 515 }
530   - ip->ip_dlm = dlm;
  516 + ip->ip_conn = conn;
531 517  
532 518 inc_nlink(dir);
533 519 d_instantiate(dentry, inode);
... ... @@ -696,6 +682,7 @@
696 682 }
697 683 cleanup_worker = 1;
698 684  
  685 + user_dlm_set_locking_protocol();
699 686 status = register_filesystem(&dlmfs_fs_type);
700 687 bail:
701 688 if (status) {
fs/ocfs2/dlmfs/userdlm.c
... ... @@ -34,18 +34,19 @@
34 34 #include <linux/types.h>
35 35 #include <linux/crc32.h>
36 36  
37   -
38   -#include "cluster/nodemanager.h"
39   -#include "cluster/heartbeat.h"
40   -#include "cluster/tcp.h"
41   -
42   -#include "dlm/dlmapi.h"
43   -
  37 +#include "ocfs2_lockingver.h"
  38 +#include "stackglue.h"
44 39 #include "userdlm.h"
45 40  
46 41 #define MLOG_MASK_PREFIX ML_DLMFS
47 42 #include "cluster/masklog.h"
48 43  
  44 +
  45 +static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
  46 +{
  47 + return container_of(lksb, struct user_lock_res, l_lksb);
  48 +}
  49 +
49 50 static inline int user_check_wait_flag(struct user_lock_res *lockres,
50 51 int flag)
51 52 {
52 53  
... ... @@ -73,15 +74,15 @@
73 74 }
74 75  
75 76 /* I heart container_of... */
76   -static inline struct dlm_ctxt *
77   -dlm_ctxt_from_user_lockres(struct user_lock_res *lockres)
  77 +static inline struct ocfs2_cluster_connection *
  78 +cluster_connection_from_user_lockres(struct user_lock_res *lockres)
78 79 {
79 80 struct dlmfs_inode_private *ip;
80 81  
81 82 ip = container_of(lockres,
82 83 struct dlmfs_inode_private,
83 84 ip_lockres);
84   - return ip->ip_dlm;
  85 + return ip->ip_conn;
85 86 }
86 87  
87 88 static struct inode *
... ... @@ -103,9 +104,9 @@
103 104 }
104 105  
105 106 #define user_log_dlm_error(_func, _stat, _lockres) do { \
106   - mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \
107   - "resource %.*s: %s\n", dlm_errname(_stat), _func, \
108   - _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
  107 + mlog(ML_ERROR, "Dlm error %d while calling %s on " \
  108 + "resource %.*s\n", _stat, _func, \
  109 + _lockres->l_namelen, _lockres->l_name); \
109 110 } while (0)
110 111  
111 112 /* WARNING: This function lives in a world where the only three lock
112 113  
113 114  
114 115  
115 116  
116 117  
117 118  
... ... @@ -113,34 +114,34 @@
113 114 * lock types are added. */
114 115 static inline int user_highest_compat_lock_level(int level)
115 116 {
116   - int new_level = LKM_EXMODE;
  117 + int new_level = DLM_LOCK_EX;
117 118  
118   - if (level == LKM_EXMODE)
119   - new_level = LKM_NLMODE;
120   - else if (level == LKM_PRMODE)
121   - new_level = LKM_PRMODE;
  119 + if (level == DLM_LOCK_EX)
  120 + new_level = DLM_LOCK_NL;
  121 + else if (level == DLM_LOCK_PR)
  122 + new_level = DLM_LOCK_PR;
122 123 return new_level;
123 124 }
124 125  
125   -static void user_ast(void *opaque)
  126 +static void user_ast(struct ocfs2_dlm_lksb *lksb)
126 127 {
127   - struct user_lock_res *lockres = opaque;
128   - struct dlm_lockstatus *lksb;
  128 + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
  129 + int status;
129 130  
130 131 mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
131 132 lockres->l_name);
132 133  
133 134 spin_lock(&lockres->l_lock);
134 135  
135   - lksb = &(lockres->l_lksb);
136   - if (lksb->status != DLM_NORMAL) {
  136 + status = ocfs2_dlm_lock_status(&lockres->l_lksb);
  137 + if (status) {
137 138 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
138   - lksb->status, lockres->l_namelen, lockres->l_name);
  139 + status, lockres->l_namelen, lockres->l_name);
139 140 spin_unlock(&lockres->l_lock);
140 141 return;
141 142 }
142 143  
143   - mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
  144 + mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
144 145 "Lockres %.*s, requested ivmode. flags 0x%x\n",
145 146 lockres->l_namelen, lockres->l_name, lockres->l_flags);
146 147  
147 148  
... ... @@ -148,13 +149,13 @@
148 149 if (lockres->l_requested < lockres->l_level) {
149 150 if (lockres->l_requested <=
150 151 user_highest_compat_lock_level(lockres->l_blocking)) {
151   - lockres->l_blocking = LKM_NLMODE;
  152 + lockres->l_blocking = DLM_LOCK_NL;
152 153 lockres->l_flags &= ~USER_LOCK_BLOCKED;
153 154 }
154 155 }
155 156  
156 157 lockres->l_level = lockres->l_requested;
157   - lockres->l_requested = LKM_IVMODE;
  158 + lockres->l_requested = DLM_LOCK_IV;
158 159 lockres->l_flags |= USER_LOCK_ATTACHED;
159 160 lockres->l_flags &= ~USER_LOCK_BUSY;
160 161  
161 162  
... ... @@ -193,11 +194,11 @@
193 194 return;
194 195  
195 196 switch (lockres->l_blocking) {
196   - case LKM_EXMODE:
  197 + case DLM_LOCK_EX:
197 198 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
198 199 queue = 1;
199 200 break;
200   - case LKM_PRMODE:
  201 + case DLM_LOCK_PR:
201 202 if (!lockres->l_ex_holders)
202 203 queue = 1;
203 204 break;
204 205  
... ... @@ -209,9 +210,9 @@
209 210 __user_dlm_queue_lockres(lockres);
210 211 }
211 212  
212   -static void user_bast(void *opaque, int level)
  213 +static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
213 214 {
214   - struct user_lock_res *lockres = opaque;
  215 + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
215 216  
216 217 mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
217 218 lockres->l_namelen, lockres->l_name, level);
218 219  
219 220  
... ... @@ -227,15 +228,15 @@
227 228 wake_up(&lockres->l_event);
228 229 }
229 230  
230   -static void user_unlock_ast(void *opaque, enum dlm_status status)
  231 +static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
231 232 {
232   - struct user_lock_res *lockres = opaque;
  233 + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
233 234  
234 235 mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
235 236 lockres->l_name);
236 237  
237   - if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
238   - mlog(ML_ERROR, "Dlm returns status %d\n", status);
  238 + if (status)
  239 + mlog(ML_ERROR, "dlm returns status %d\n", status);
239 240  
240 241 spin_lock(&lockres->l_lock);
241 242 /* The teardown flag gets set early during the unlock process,
... ... @@ -243,7 +244,7 @@
243 244 * for a concurrent cancel. */
244 245 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
245 246 && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
246   - lockres->l_level = LKM_IVMODE;
  247 + lockres->l_level = DLM_LOCK_IV;
247 248 } else if (status == DLM_CANCELGRANT) {
248 249 /* We tried to cancel a convert request, but it was
249 250 * already granted. Don't clear the busy flag - the
... ... @@ -254,7 +255,7 @@
254 255 } else {
255 256 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
256 257 /* Cancel succeeded, we want to re-queue */
257   - lockres->l_requested = LKM_IVMODE; /* cancel an
  258 + lockres->l_requested = DLM_LOCK_IV; /* cancel an
258 259 * upconvert
259 260 * request. */
260 261 lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
... ... @@ -271,6 +272,21 @@
271 272 wake_up(&lockres->l_event);
272 273 }
273 274  
  275 +/*
  276 + * This is the userdlmfs locking protocol version.
  277 + *
  278 + * See fs/ocfs2/dlmglue.c for more details on locking versions.
  279 + */
  280 +static struct ocfs2_locking_protocol user_dlm_lproto = {
  281 + .lp_max_version = {
  282 + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
  283 + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
  284 + },
  285 + .lp_lock_ast = user_ast,
  286 + .lp_blocking_ast = user_bast,
  287 + .lp_unlock_ast = user_unlock_ast,
  288 +};
  289 +
274 290 static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
275 291 {
276 292 struct inode *inode;
... ... @@ -283,7 +299,8 @@
283 299 int new_level, status;
284 300 struct user_lock_res *lockres =
285 301 container_of(work, struct user_lock_res, l_work);
286   - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
  302 + struct ocfs2_cluster_connection *conn =
  303 + cluster_connection_from_user_lockres(lockres);
287 304  
288 305 mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
289 306 lockres->l_name);
290 307  
... ... @@ -322,20 +339,17 @@
322 339 lockres->l_flags |= USER_LOCK_IN_CANCEL;
323 340 spin_unlock(&lockres->l_lock);
324 341  
325   - status = dlmunlock(dlm,
326   - &lockres->l_lksb,
327   - LKM_CANCEL,
328   - user_unlock_ast,
329   - lockres);
330   - if (status != DLM_NORMAL)
331   - user_log_dlm_error("dlmunlock", status, lockres);
  342 + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
  343 + DLM_LKF_CANCEL);
  344 + if (status)
  345 + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
332 346 goto drop_ref;
333 347 }
334 348  
335 349 /* If there are still incompat holders, we can exit safely
336 350 * without worrying about re-queueing this lock as that will
337 351 * happen on the last call to user_cluster_unlock. */
338   - if ((lockres->l_blocking == LKM_EXMODE)
  352 + if ((lockres->l_blocking == DLM_LOCK_EX)
339 353 && (lockres->l_ex_holders || lockres->l_ro_holders)) {
340 354 spin_unlock(&lockres->l_lock);
341 355 mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n",
... ... @@ -343,7 +357,7 @@
343 357 goto drop_ref;
344 358 }
345 359  
346   - if ((lockres->l_blocking == LKM_PRMODE)
  360 + if ((lockres->l_blocking == DLM_LOCK_PR)
347 361 && lockres->l_ex_holders) {
348 362 spin_unlock(&lockres->l_lock);
349 363 mlog(0, "can't downconvert for pr: ex = %u\n",
... ... @@ -360,17 +374,12 @@
360 374 spin_unlock(&lockres->l_lock);
361 375  
362 376 /* need lock downconvert request now... */
363   - status = dlmlock(dlm,
364   - new_level,
365   - &lockres->l_lksb,
366   - LKM_CONVERT|LKM_VALBLK,
367   - lockres->l_name,
368   - lockres->l_namelen,
369   - user_ast,
370   - lockres,
371   - user_bast);
372   - if (status != DLM_NORMAL) {
373   - user_log_dlm_error("dlmlock", status, lockres);
  377 + status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
  378 + DLM_LKF_CONVERT|DLM_LKF_VALBLK,
  379 + lockres->l_name,
  380 + lockres->l_namelen);
  381 + if (status) {
  382 + user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
374 383 user_recover_from_dlm_error(lockres);
375 384 }
376 385  
377 386  
... ... @@ -382,10 +391,10 @@
382 391 int level)
383 392 {
384 393 switch(level) {
385   - case LKM_EXMODE:
  394 + case DLM_LOCK_EX:
386 395 lockres->l_ex_holders++;
387 396 break;
388   - case LKM_PRMODE:
  397 + case DLM_LOCK_PR:
389 398 lockres->l_ro_holders++;
390 399 break;
391 400 default:
392 401  
... ... @@ -410,10 +419,11 @@
410 419 int lkm_flags)
411 420 {
412 421 int status, local_flags;
413   - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
  422 + struct ocfs2_cluster_connection *conn =
  423 + cluster_connection_from_user_lockres(lockres);
414 424  
415   - if (level != LKM_EXMODE &&
416   - level != LKM_PRMODE) {
  425 + if (level != DLM_LOCK_EX &&
  426 + level != DLM_LOCK_PR) {
417 427 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
418 428 lockres->l_namelen, lockres->l_name);
419 429 status = -EINVAL;
... ... @@ -422,7 +432,7 @@
422 432  
423 433 mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
424 434 lockres->l_namelen, lockres->l_name,
425   - (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
  435 + (level == DLM_LOCK_EX) ? "DLM_LOCK_EX" : "DLM_LOCK_PR",
426 436 lkm_flags);
427 437  
428 438 again:
429 439  
430 440  
... ... @@ -457,35 +467,26 @@
457 467 }
458 468  
459 469 if (level > lockres->l_level) {
460   - local_flags = lkm_flags | LKM_VALBLK;
461   - if (lockres->l_level != LKM_IVMODE)
462   - local_flags |= LKM_CONVERT;
  470 + local_flags = lkm_flags | DLM_LKF_VALBLK;
  471 + if (lockres->l_level != DLM_LOCK_IV)
  472 + local_flags |= DLM_LKF_CONVERT;
463 473  
464 474 lockres->l_requested = level;
465 475 lockres->l_flags |= USER_LOCK_BUSY;
466 476 spin_unlock(&lockres->l_lock);
467 477  
468   - BUG_ON(level == LKM_IVMODE);
469   - BUG_ON(level == LKM_NLMODE);
  478 + BUG_ON(level == DLM_LOCK_IV);
  479 + BUG_ON(level == DLM_LOCK_NL);
470 480  
471 481 /* call dlm_lock to upgrade lock now */
472   - status = dlmlock(dlm,
473   - level,
474   - &lockres->l_lksb,
475   - local_flags,
476   - lockres->l_name,
477   - lockres->l_namelen,
478   - user_ast,
479   - lockres,
480   - user_bast);
481   - if (status != DLM_NORMAL) {
482   - if ((lkm_flags & LKM_NOQUEUE) &&
483   - (status == DLM_NOTQUEUED))
484   - status = -EAGAIN;
485   - else {
486   - user_log_dlm_error("dlmlock", status, lockres);
487   - status = -EINVAL;
488   - }
  482 + status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
  483 + local_flags, lockres->l_name,
  484 + lockres->l_namelen);
  485 + if (status) {
  486 + if ((lkm_flags & DLM_LKF_NOQUEUE) &&
  487 + (status != -EAGAIN))
  488 + user_log_dlm_error("ocfs2_dlm_lock",
  489 + status, lockres);
489 490 user_recover_from_dlm_error(lockres);
490 491 goto bail;
491 492 }
492 493  
... ... @@ -506,11 +507,11 @@
506 507 int level)
507 508 {
508 509 switch(level) {
509   - case LKM_EXMODE:
  510 + case DLM_LOCK_EX:
510 511 BUG_ON(!lockres->l_ex_holders);
511 512 lockres->l_ex_holders--;
512 513 break;
513   - case LKM_PRMODE:
  514 + case DLM_LOCK_PR:
514 515 BUG_ON(!lockres->l_ro_holders);
515 516 lockres->l_ro_holders--;
516 517 break;
... ... @@ -522,8 +523,8 @@
522 523 void user_dlm_cluster_unlock(struct user_lock_res *lockres,
523 524 int level)
524 525 {
525   - if (level != LKM_EXMODE &&
526   - level != LKM_PRMODE) {
  526 + if (level != DLM_LOCK_EX &&
  527 + level != DLM_LOCK_PR) {
527 528 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
528 529 lockres->l_namelen, lockres->l_name);
529 530 return;
530 531  
531 532  
532 533  
533 534  
534 535  
... ... @@ -540,33 +541,40 @@
540 541 unsigned int len)
541 542 {
542 543 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
543   - char *lvb = lockres->l_lksb.lvb;
  544 + char *lvb;
544 545  
545 546 BUG_ON(len > DLM_LVB_LEN);
546 547  
547 548 spin_lock(&lockres->l_lock);
548 549  
549   - BUG_ON(lockres->l_level < LKM_EXMODE);
  550 + BUG_ON(lockres->l_level < DLM_LOCK_EX);
  551 + lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
550 552 memcpy(lvb, val, len);
551 553  
552 554 spin_unlock(&lockres->l_lock);
553 555 }
554 556  
555   -void user_dlm_read_lvb(struct inode *inode,
556   - char *val,
557   - unsigned int len)
  557 +ssize_t user_dlm_read_lvb(struct inode *inode,
  558 + char *val,
  559 + unsigned int len)
558 560 {
559 561 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
560   - char *lvb = lockres->l_lksb.lvb;
  562 + char *lvb;
  563 + ssize_t ret = len;
561 564  
562 565 BUG_ON(len > DLM_LVB_LEN);
563 566  
564 567 spin_lock(&lockres->l_lock);
565 568  
566   - BUG_ON(lockres->l_level < LKM_PRMODE);
567   - memcpy(val, lvb, len);
  569 + BUG_ON(lockres->l_level < DLM_LOCK_PR);
  570 + if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
  571 + lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
  572 + memcpy(val, lvb, len);
  573 + } else
  574 + ret = 0;
568 575  
569 576 spin_unlock(&lockres->l_lock);
  577 + return ret;
570 578 }
571 579  
572 580 void user_dlm_lock_res_init(struct user_lock_res *lockres,
... ... @@ -576,9 +584,9 @@
576 584  
577 585 spin_lock_init(&lockres->l_lock);
578 586 init_waitqueue_head(&lockres->l_event);
579   - lockres->l_level = LKM_IVMODE;
580   - lockres->l_requested = LKM_IVMODE;
581   - lockres->l_blocking = LKM_IVMODE;
  587 + lockres->l_level = DLM_LOCK_IV;
  588 + lockres->l_requested = DLM_LOCK_IV;
  589 + lockres->l_blocking = DLM_LOCK_IV;
582 590  
583 591 /* should have been checked before getting here. */
584 592 BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
... ... @@ -592,7 +600,8 @@
592 600 int user_dlm_destroy_lock(struct user_lock_res *lockres)
593 601 {
594 602 int status = -EBUSY;
595   - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
  603 + struct ocfs2_cluster_connection *conn =
  604 + cluster_connection_from_user_lockres(lockres);
596 605  
597 606 mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
598 607  
... ... @@ -627,14 +636,9 @@
627 636 lockres->l_flags |= USER_LOCK_BUSY;
628 637 spin_unlock(&lockres->l_lock);
629 638  
630   - status = dlmunlock(dlm,
631   - &lockres->l_lksb,
632   - LKM_VALBLK,
633   - user_unlock_ast,
634   - lockres);
635   - if (status != DLM_NORMAL) {
636   - user_log_dlm_error("dlmunlock", status, lockres);
637   - status = -EINVAL;
  639 + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
  640 + if (status) {
  641 + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
638 642 goto bail;
639 643 }
640 644  
641 645  
642 646  
643 647  
644 648  
645 649  
646 650  
647 651  
... ... @@ -645,33 +649,35 @@
645 649 return status;
646 650 }
647 651  
648   -struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
649   - struct dlm_protocol_version *proto)
  652 +static void user_dlm_recovery_handler_noop(int node_num,
  653 + void *recovery_data)
650 654 {
651   - struct dlm_ctxt *dlm;
652   - u32 dlm_key;
653   - char *domain;
  655 + /* We ignore recovery events */
  656 + return;
  657 +}
654 658  
655   - domain = kmalloc(name->len + 1, GFP_NOFS);
656   - if (!domain) {
657   - mlog_errno(-ENOMEM);
658   - return ERR_PTR(-ENOMEM);
659   - }
  659 +void user_dlm_set_locking_protocol(void)
  660 +{
  661 + ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
  662 +}
660 663  
661   - dlm_key = crc32_le(0, name->name, name->len);
  664 +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
  665 +{
  666 + int rc;
  667 + struct ocfs2_cluster_connection *conn;
662 668  
663   - snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
  669 + rc = ocfs2_cluster_connect("o2cb", name->name, name->len,
  670 + &user_dlm_lproto,
  671 + user_dlm_recovery_handler_noop,
  672 + NULL, &conn);
  673 + if (rc)
  674 + mlog_errno(rc);
664 675  
665   - dlm = dlm_register_domain(domain, dlm_key, proto);
666   - if (IS_ERR(dlm))
667   - mlog_errno(PTR_ERR(dlm));
668   -
669   - kfree(domain);
670   - return dlm;
  676 + return rc ? ERR_PTR(rc) : conn;
671 677 }
672 678  
673   -void user_dlm_unregister_context(struct dlm_ctxt *dlm)
  679 +void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
674 680 {
675   - dlm_unregister_domain(dlm);
  681 + ocfs2_cluster_disconnect(conn, 0);
676 682 }
fs/ocfs2/dlmfs/userdlm.h
... ... @@ -57,7 +57,7 @@
57 57 int l_level;
58 58 unsigned int l_ro_holders;
59 59 unsigned int l_ex_holders;
60   - struct dlm_lockstatus l_lksb;
  60 + struct ocfs2_dlm_lksb l_lksb;
61 61  
62 62 int l_requested;
63 63 int l_blocking;
64 64  
... ... @@ -80,15 +80,15 @@
80 80 void user_dlm_write_lvb(struct inode *inode,
81 81 const char *val,
82 82 unsigned int len);
83   -void user_dlm_read_lvb(struct inode *inode,
84   - char *val,
85   - unsigned int len);
86   -struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
87   - struct dlm_protocol_version *proto);
88   -void user_dlm_unregister_context(struct dlm_ctxt *dlm);
  83 +ssize_t user_dlm_read_lvb(struct inode *inode,
  84 + char *val,
  85 + unsigned int len);
  86 +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name);
  87 +void user_dlm_unregister(struct ocfs2_cluster_connection *conn);
  88 +void user_dlm_set_locking_protocol(void);
89 89  
90 90 struct dlmfs_inode_private {
91   - struct dlm_ctxt *ip_dlm;
  91 + struct ocfs2_cluster_connection *ip_conn;
92 92  
93 93 struct user_lock_res ip_lockres; /* unused for directories. */
94 94 struct inode *ip_parent;