Commit 57e6a7dde8cfca9ac1d6702cf9104d22bc11ba90

Authored by Linus Torvalds

Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-3.0-nmw

* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-3.0-nmw:
  GFS2: Fix nlink setting on inode creation
  GFS2: fail mount if journal recovery fails
  GFS2: let spectator mount do read only recovery
  GFS2: Fix a use-after-free that coverity spotted
  GFS2: dlm based recovery coordination

Showing 12 changed files Side-by-side Diff

... ... @@ -1353,7 +1353,7 @@
1353 1353 spin_lock(&gl->gl_spin);
1354 1354 gl->gl_reply = ret;
1355 1355  
1356   - if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))) {
  1356 + if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) {
1357 1357 if (gfs2_should_freeze(gl)) {
1358 1358 set_bit(GLF_FROZEN, &gl->gl_flags);
1359 1359 spin_unlock(&gl->gl_spin);
... ... @@ -121,8 +121,11 @@
121 121  
122 122 struct lm_lockops {
123 123 const char *lm_proto_name;
124   - int (*lm_mount) (struct gfs2_sbd *sdp, const char *fsname);
125   - void (*lm_unmount) (struct gfs2_sbd *sdp);
  124 + int (*lm_mount) (struct gfs2_sbd *sdp, const char *table);
  125 + void (*lm_first_done) (struct gfs2_sbd *sdp);
  126 + void (*lm_recovery_result) (struct gfs2_sbd *sdp, unsigned int jid,
  127 + unsigned int result);
  128 + void (*lm_unmount) (struct gfs2_sbd *sdp);
126 129 void (*lm_withdraw) (struct gfs2_sbd *sdp);
127 130 void (*lm_put_lock) (struct gfs2_glock *gl);
128 131 int (*lm_lock) (struct gfs2_glock *gl, unsigned int req_state,
... ... @@ -139,8 +139,45 @@
139 139 #define GDLM_STRNAME_BYTES 25
140 140 #define GDLM_LVB_SIZE 32
141 141  
  142 +/*
  143 + * ls_recover_flags:
  144 + *
  145 + * DFL_BLOCK_LOCKS: dlm is in recovery and will grant locks that had been
  146 + * held by failed nodes whose journals need recovery. Those locks should
  147 + * only be used for journal recovery until the journal recovery is done.
  148 + * This is set by the dlm recover_prep callback and cleared by the
  149 + * gfs2_control thread when journal recovery is complete. To avoid
  150 + * races between recover_prep setting and gfs2_control clearing, recover_spin
  151 + * is held while changing this bit and reading/writing recover_block
  152 + * and recover_start.
  153 + *
  154 + * DFL_NO_DLM_OPS: dlm lockspace ops/callbacks are not being used.
  155 + *
  156 + * DFL_FIRST_MOUNT: this node is the first to mount this fs and is doing
  157 + * recovery of all journals before allowing other nodes to mount the fs.
  158 + * This is cleared when FIRST_MOUNT_DONE is set.
  159 + *
  160 + * DFL_FIRST_MOUNT_DONE: this node was the first mounter, and has finished
  161 + * recovery of all journals, and now allows other nodes to mount the fs.
  162 + *
  163 + * DFL_MOUNT_DONE: gdlm_mount has completed successfully and cleared
  164 + * BLOCK_LOCKS for the first time. The gfs2_control thread should now
  165 + * control clearing BLOCK_LOCKS for further recoveries.
  166 + *
  167 + * DFL_UNMOUNT: gdlm_unmount sets this to keep sdp off gfs2_control_wq.
  168 + *
  169 + * DFL_DLM_RECOVERY: set while dlm is in recovery, between recover_prep()
  170 + * and recover_done(), i.e. set while recover_block == recover_start.
  171 + */
  172 +
142 173 enum {
143 174 DFL_BLOCK_LOCKS = 0,
  175 + DFL_NO_DLM_OPS = 1,
  176 + DFL_FIRST_MOUNT = 2,
  177 + DFL_FIRST_MOUNT_DONE = 3,
  178 + DFL_MOUNT_DONE = 4,
  179 + DFL_UNMOUNT = 5,
  180 + DFL_DLM_RECOVERY = 6,
144 181 };
145 182  
146 183 struct lm_lockname {
... ... @@ -392,6 +429,7 @@
392 429 #define JDF_RECOVERY 1
393 430 unsigned int jd_jid;
394 431 unsigned int jd_blocks;
  432 + int jd_recover_error;
395 433 };
396 434  
397 435 struct gfs2_statfs_change_host {
... ... @@ -461,6 +499,7 @@
461 499 SDF_NORECOVERY = 4,
462 500 SDF_DEMOTE = 5,
463 501 SDF_NOJOURNALID = 6,
  502 + SDF_RORECOVERY = 7, /* read only recovery */
464 503 };
465 504  
466 505 #define GFS2_FSNAME_LEN 256
467 506  
468 507  
... ... @@ -499,14 +538,26 @@
499 538 struct lm_lockstruct {
500 539 int ls_jid;
501 540 unsigned int ls_first;
502   - unsigned int ls_first_done;
503 541 unsigned int ls_nodir;
504 542 const struct lm_lockops *ls_ops;
505   - unsigned long ls_flags;
506 543 dlm_lockspace_t *ls_dlm;
507 544  
508   - int ls_recover_jid_done;
509   - int ls_recover_jid_status;
  545 + int ls_recover_jid_done; /* These two are deprecated, */
  546 + int ls_recover_jid_status; /* used previously by gfs_controld */
  547 +
  548 + struct dlm_lksb ls_mounted_lksb; /* mounted_lock */
  549 + struct dlm_lksb ls_control_lksb; /* control_lock */
  550 + char ls_control_lvb[GDLM_LVB_SIZE]; /* control_lock lvb */
  551 + struct completion ls_sync_wait; /* {control,mounted}_{lock,unlock} */
  552 +
  553 + spinlock_t ls_recover_spin; /* protects following fields */
  554 + unsigned long ls_recover_flags; /* DFL_ */
  555 + uint32_t ls_recover_mount; /* gen in first recover_done cb */
  556 + uint32_t ls_recover_start; /* gen in last recover_done cb */
  557 + uint32_t ls_recover_block; /* copy recover_start in last recover_prep */
  558 + uint32_t ls_recover_size; /* size of recover_submit, recover_result */
  559 + uint32_t *ls_recover_submit; /* gen in last recover_slot cb per jid */
  560 + uint32_t *ls_recover_result; /* result of last jid recovery */
510 561 };
511 562  
512 563 struct gfs2_sbd {
... ... @@ -544,6 +595,7 @@
544 595 wait_queue_head_t sd_glock_wait;
545 596 atomic_t sd_glock_disposal;
546 597 struct completion sd_locking_init;
  598 + struct delayed_work sd_control_work;
547 599  
548 600 /* Inode Stuff */
549 601  
... ... @@ -599,9 +599,7 @@
599 599 error = gfs2_meta_inode_buffer(ip, &dibh);
600 600 if (error)
601 601 goto fail_end_trans;
602   - inc_nlink(&ip->i_inode);
603   - if (S_ISDIR(ip->i_inode.i_mode))
604   - inc_nlink(&ip->i_inode);
  602 + set_nlink(&ip->i_inode, S_ISDIR(ip->i_inode.i_mode) ? 2 : 1);
605 603 gfs2_trans_add_bh(ip->i_gl, dibh, 1);
606 604 gfs2_dinode_out(ip, dibh->b_data);
607 605 brelse(dibh);
Changes suppressed. Click to show
1 1 /*
2 2 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
3   - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
  3 + * Copyright 2004-2011 Red Hat, Inc.
4 4 *
5 5 * This copyrighted material is made available to anyone wishing to use,
6 6 * modify, copy, or redistribute it subject to the terms and conditions
7 7  
8 8  
... ... @@ -11,12 +11,15 @@
11 11 #include <linux/dlm.h>
12 12 #include <linux/slab.h>
13 13 #include <linux/types.h>
  14 +#include <linux/delay.h>
14 15 #include <linux/gfs2_ondisk.h>
15 16  
16 17 #include "incore.h"
17 18 #include "glock.h"
18 19 #include "util.h"
  20 +#include "sys.h"
19 21  
  22 +extern struct workqueue_struct *gfs2_control_wq;
20 23  
21 24 static void gdlm_ast(void *arg)
22 25 {
23 26  
24 27  
25 28  
26 29  
27 30  
28 31  
29 32  
30 33  
... ... @@ -185,34 +188,1002 @@
185 188 dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
186 189 }
187 190  
188   -static int gdlm_mount(struct gfs2_sbd *sdp, const char *fsname)
  191 +/*
  192 + * dlm/gfs2 recovery coordination using dlm_recover callbacks
  193 + *
  194 + * 1. dlm_controld sees lockspace members change
  195 + * 2. dlm_controld blocks dlm-kernel locking activity
  196 + * 3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
  197 + * 4. dlm_controld starts and finishes its own user level recovery
  198 + * 5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
  199 + * 6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
  200 + * 7. dlm_recoverd does its own lock recovery
  201 + * 8. dlm_recoverd unblocks dlm-kernel locking activity
  202 + * 9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
  203 + * 10. gfs2_control updates control_lock lvb with new generation and jid bits
  204 + * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
  205 + * 12. gfs2_recover dequeues and recovers journals of failed nodes
  206 + * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
  207 + * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
  208 + * 15. gfs2_control unblocks normal locking when all journals are recovered
  209 + *
  210 + * - failures during recovery
  211 + *
  212 + * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
  213 + * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
  214 + * recovering for a prior failure. gfs2_control needs a way to detect
  215 + * this so it can leave BLOCK_LOCKS set in step 15. This is managed using
  216 + * the recover_block and recover_start values.
  217 + *
  218 + * recover_done() provides a new lockspace generation number each time it
  219 + * is called (step 9). This generation number is saved as recover_start.
  220 + * When recover_prep() is called, it sets BLOCK_LOCKS and sets
  221 + * recover_block = recover_start. So, while recover_block is equal to
  222 + * recover_start, BLOCK_LOCKS should remain set. (recover_spin must
  223 + * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
  224 + *
  225 + * - more specific gfs2 steps in sequence above
  226 + *
  227 + * 3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
  228 + * 6. recover_slot records any failed jids (maybe none)
  229 + * 9. recover_done sets recover_start = new generation number
  230 + * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
  231 + * 12. gfs2_recover does journal recoveries for failed jids identified above
  232 + * 14. gfs2_control clears control_lock lvb bits for recovered jids
  233 + * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
  234 + * again) then do nothing, otherwise if recover_start > recover_block
  235 + * then clear BLOCK_LOCKS.
  236 + *
  237 + * - parallel recovery steps across all nodes
  238 + *
  239 + * All nodes attempt to update the control_lock lvb with the new generation
  240 + * number and jid bits, but only the first to get the control_lock EX will
  241 + * do so; others will see that it's already done (lvb already contains new
  242 + * generation number.)
  243 + *
  244 + * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
  245 + * . All nodes attempt to set control_lock lvb gen + bits for the new gen
  246 + * . One node gets control_lock first and writes the lvb, others see it's done
  247 + * . All nodes attempt to recover jids for which they see control_lock bits set
  248 + * . One node succeeds for a jid, and that one clears the jid bit in the lvb
  249 + * . All nodes will eventually see all lvb bits clear and unblock locks
  250 + *
  251 + * - is there a problem with clearing an lvb bit that should be set
  252 + * and missing a journal recovery?
  253 + *
  254 + * 1. jid fails
  255 + * 2. lvb bit set for step 1
  256 + * 3. jid recovered for step 1
  257 + * 4. jid taken again (new mount)
  258 + * 5. jid fails (for step 4)
  259 + * 6. lvb bit set for step 5 (will already be set)
  260 + * 7. lvb bit cleared for step 3
  261 + *
  262 + * This is not a problem because the failure in step 5 does not
  263 + * require recovery, because the mount in step 4 could not have
  264 + * progressed far enough to unblock locks and access the fs. The
  265 + * control_mount() function waits for all recoveries to be complete
  266 + * for the latest lockspace generation before ever unblocking locks
  267 + * and returning. The mount in step 4 waits until the recovery in
  268 + * step 1 is done.
  269 + *
  270 + * - special case of first mounter: first node to mount the fs
  271 + *
  272 + * The first node to mount a gfs2 fs needs to check all the journals
  273 + * and recover any that need recovery before other nodes are allowed
  274 + * to mount the fs. (Others may begin mounting, but they must wait
  275 + * for the first mounter to be done before taking locks on the fs
  276 + * or accessing the fs.) This has two parts:
  277 + *
  278 + * 1. The mounted_lock tells a node it's the first to mount the fs.
  279 + * Each node holds the mounted_lock in PR while it's mounted.
  280 + * Each node tries to acquire the mounted_lock in EX when it mounts.
  281 + * If a node is granted the mounted_lock EX it means there are no
  282 + * other mounted nodes (no PR locks exist), and it is the first mounter.
  283 + * The mounted_lock is demoted to PR when first recovery is done, so
  284 + * others will fail to get an EX lock, but will get a PR lock.
  285 + *
  286 + * 2. The control_lock blocks others in control_mount() while the first
  287 + * mounter is doing first mount recovery of all journals.
  288 + * A mounting node needs to acquire control_lock in EX mode before
  289 + * it can proceed. The first mounter holds control_lock in EX while doing
  290 + * the first mount recovery, blocking mounts from other nodes, then demotes
  291 + * control_lock to NL when it's done (others_may_mount/first_done),
  292 + * allowing other nodes to continue mounting.
  293 + *
  294 + * first mounter:
  295 + * control_lock EX/NOQUEUE success
  296 + * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
  297 + * set first=1
  298 + * do first mounter recovery
  299 + * mounted_lock EX->PR
  300 + * control_lock EX->NL, write lvb generation
  301 + *
  302 + * other mounter:
  303 + * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
  304 + * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
  305 + * mounted_lock PR/NOQUEUE success
  306 + * read lvb generation
  307 + * control_lock EX->NL
  308 + * set first=0
  309 + *
  310 + * - mount during recovery
  311 + *
  312 + * If a node mounts while others are doing recovery (not first mounter),
  313 + * the mounting node will get its initial recover_done() callback without
  314 + * having seen any previous failures/callbacks.
  315 + *
  316 + * It must wait for all recoveries preceding its mount to be finished
  317 + * before it unblocks locks. It does this by repeating the "other mounter"
  318 + * steps above until the lvb generation number is >= its mount generation
  319 + * number (from initial recover_done) and all lvb bits are clear.
  320 + *
  321 + * - control_lock lvb format
  322 + *
  323 + * 4 bytes generation number: the latest dlm lockspace generation number
  324 + * from recover_done callback. Indicates the jid bitmap has been updated
  325 + * to reflect all slot failures through that generation.
  326 + * 4 bytes unused.
  327 + * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
  328 + * that jid N needs recovery.
  329 + */
  330 +
  331 +#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */
  332 +
  333 +static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
  334 + char *lvb_bits)
189 335 {
  336 + uint32_t gen;
  337 + memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
  338 + memcpy(&gen, lvb_bits, sizeof(uint32_t));
  339 + *lvb_gen = le32_to_cpu(gen);
  340 +}
  341 +
  342 +static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
  343 + char *lvb_bits)
  344 +{
  345 + uint32_t gen;
  346 + memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
  347 + gen = cpu_to_le32(lvb_gen);
  348 + memcpy(ls->ls_control_lvb, &gen, sizeof(uint32_t));
  349 +}
  350 +
  351 +static int all_jid_bits_clear(char *lvb)
  352 +{
  353 + int i;
  354 + for (i = JID_BITMAP_OFFSET; i < GDLM_LVB_SIZE; i++) {
  355 + if (lvb[i])
  356 + return 0;
  357 + }
  358 + return 1;
  359 +}
  360 +
  361 +static void sync_wait_cb(void *arg)
  362 +{
  363 + struct lm_lockstruct *ls = arg;
  364 + complete(&ls->ls_sync_wait);
  365 +}
  366 +
  367 +static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
  368 +{
190 369 struct lm_lockstruct *ls = &sdp->sd_lockstruct;
191 370 int error;
192 371  
193   - if (fsname == NULL) {
194   - fs_info(sdp, "no fsname found\n");
195   - return -EINVAL;
  372 + error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
  373 + if (error) {
  374 + fs_err(sdp, "%s lkid %x error %d\n",
  375 + name, lksb->sb_lkid, error);
  376 + return error;
196 377 }
197 378  
198   - error = dlm_new_lockspace(fsname, NULL,
199   - DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
200   - (ls->ls_nodir ? DLM_LSFL_NODIR : 0),
201   - GDLM_LVB_SIZE, NULL, NULL, NULL, &ls->ls_dlm);
  379 + wait_for_completion(&ls->ls_sync_wait);
  380 +
  381 + if (lksb->sb_status != -DLM_EUNLOCK) {
  382 + fs_err(sdp, "%s lkid %x status %d\n",
  383 + name, lksb->sb_lkid, lksb->sb_status);
  384 + return -1;
  385 + }
  386 + return 0;
  387 +}
  388 +
  389 +static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
  390 + unsigned int num, struct dlm_lksb *lksb, char *name)
  391 +{
  392 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  393 + char strname[GDLM_STRNAME_BYTES];
  394 + int error, status;
  395 +
  396 + memset(strname, 0, GDLM_STRNAME_BYTES);
  397 + snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
  398 +
  399 + error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
  400 + strname, GDLM_STRNAME_BYTES - 1,
  401 + 0, sync_wait_cb, ls, NULL);
  402 + if (error) {
  403 + fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
  404 + name, lksb->sb_lkid, flags, mode, error);
  405 + return error;
  406 + }
  407 +
  408 + wait_for_completion(&ls->ls_sync_wait);
  409 +
  410 + status = lksb->sb_status;
  411 +
  412 + if (status && status != -EAGAIN) {
  413 + fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
  414 + name, lksb->sb_lkid, flags, mode, status);
  415 + }
  416 +
  417 + return status;
  418 +}
  419 +
  420 +static int mounted_unlock(struct gfs2_sbd *sdp)
  421 +{
  422 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  423 + return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
  424 +}
  425 +
  426 +static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
  427 +{
  428 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  429 + return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
  430 + &ls->ls_mounted_lksb, "mounted_lock");
  431 +}
  432 +
  433 +static int control_unlock(struct gfs2_sbd *sdp)
  434 +{
  435 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  436 + return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
  437 +}
  438 +
  439 +static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
  440 +{
  441 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  442 + return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
  443 + &ls->ls_control_lksb, "control_lock");
  444 +}
  445 +
  446 +static void gfs2_control_func(struct work_struct *work)
  447 +{
  448 + struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
  449 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  450 + char lvb_bits[GDLM_LVB_SIZE];
  451 + uint32_t block_gen, start_gen, lvb_gen, flags;
  452 + int recover_set = 0;
  453 + int write_lvb = 0;
  454 + int recover_size;
  455 + int i, error;
  456 +
  457 + spin_lock(&ls->ls_recover_spin);
  458 + /*
  459 + * No MOUNT_DONE means we're still mounting; control_mount()
  460 + * will set this flag, after which this thread will take over
  461 + * all further clearing of BLOCK_LOCKS.
  462 + *
  463 + * FIRST_MOUNT means this node is doing first mounter recovery,
  464 + * for which recovery control is handled by
  465 + * control_mount()/control_first_done(), not this thread.
  466 + */
  467 + if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
  468 + test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
  469 + spin_unlock(&ls->ls_recover_spin);
  470 + return;
  471 + }
  472 + block_gen = ls->ls_recover_block;
  473 + start_gen = ls->ls_recover_start;
  474 + spin_unlock(&ls->ls_recover_spin);
  475 +
  476 + /*
  477 + * Equal block_gen and start_gen implies we are between
  478 + * recover_prep and recover_done callbacks, which means
  479 + * dlm recovery is in progress and dlm locking is blocked.
  480 + * There's no point trying to do any work until recover_done.
  481 + */
  482 +
  483 + if (block_gen == start_gen)
  484 + return;
  485 +
  486 + /*
  487 + * Propagate recover_submit[] and recover_result[] to lvb:
  488 + * dlm_recoverd adds to recover_submit[] jids needing recovery
  489 + * gfs2_recover adds to recover_result[] journal recovery results
  490 + *
  491 + * set lvb bit for jids in recover_submit[] if the lvb has not
  492 + * yet been updated for the generation of the failure
  493 + *
  494 + * clear lvb bit for jids in recover_result[] if the result of
  495 + * the journal recovery is SUCCESS
  496 + */
  497 +
  498 + error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
  499 + if (error) {
  500 + fs_err(sdp, "control lock EX error %d\n", error);
  501 + return;
  502 + }
  503 +
  504 + control_lvb_read(ls, &lvb_gen, lvb_bits);
  505 +
  506 + spin_lock(&ls->ls_recover_spin);
  507 + if (block_gen != ls->ls_recover_block ||
  508 + start_gen != ls->ls_recover_start) {
  509 + fs_info(sdp, "recover generation %u block1 %u %u\n",
  510 + start_gen, block_gen, ls->ls_recover_block);
  511 + spin_unlock(&ls->ls_recover_spin);
  512 + control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
  513 + return;
  514 + }
  515 +
  516 + recover_size = ls->ls_recover_size;
  517 +
  518 + if (lvb_gen <= start_gen) {
  519 + /*
  520 + * Clear lvb bits for jids we've successfully recovered.
  521 + * Because all nodes attempt to recover failed journals,
  522 + * a journal can be recovered multiple times successfully
  523 + * in succession. Only the first will really do recovery,
  524 + * the others find it clean, but still report a successful
  525 + * recovery. So, another node may have already recovered
  526 + * the jid and cleared the lvb bit for it.
  527 + */
  528 + for (i = 0; i < recover_size; i++) {
  529 + if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
  530 + continue;
  531 +
  532 + ls->ls_recover_result[i] = 0;
  533 +
  534 + if (!test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET))
  535 + continue;
  536 +
  537 + __clear_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
  538 + write_lvb = 1;
  539 + }
  540 + }
  541 +
  542 + if (lvb_gen == start_gen) {
  543 + /*
  544 + * Failed slots before start_gen are already set in lvb.
  545 + */
  546 + for (i = 0; i < recover_size; i++) {
  547 + if (!ls->ls_recover_submit[i])
  548 + continue;
  549 + if (ls->ls_recover_submit[i] < lvb_gen)
  550 + ls->ls_recover_submit[i] = 0;
  551 + }
  552 + } else if (lvb_gen < start_gen) {
  553 + /*
  554 + * Failed slots before start_gen are not yet set in lvb.
  555 + */
  556 + for (i = 0; i < recover_size; i++) {
  557 + if (!ls->ls_recover_submit[i])
  558 + continue;
  559 + if (ls->ls_recover_submit[i] < start_gen) {
  560 + ls->ls_recover_submit[i] = 0;
  561 + __set_bit_le(i, lvb_bits + JID_BITMAP_OFFSET);
  562 + }
  563 + }
  564 + /* even if there are no bits to set, we need to write the
  565 + latest generation to the lvb */
  566 + write_lvb = 1;
  567 + } else {
  568 + /*
  569 + * we should be getting a recover_done() for lvb_gen soon
  570 + */
  571 + }
  572 + spin_unlock(&ls->ls_recover_spin);
  573 +
  574 + if (write_lvb) {
  575 + control_lvb_write(ls, start_gen, lvb_bits);
  576 + flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
  577 + } else {
  578 + flags = DLM_LKF_CONVERT;
  579 + }
  580 +
  581 + error = control_lock(sdp, DLM_LOCK_NL, flags);
  582 + if (error) {
  583 + fs_err(sdp, "control lock NL error %d\n", error);
  584 + return;
  585 + }
  586 +
  587 + /*
  588 + * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
  589 + * and clear a jid bit in the lvb if the recovery is a success.
  590 + * Eventually all journals will be recovered, all jid bits will
  591 + * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
  592 + */
  593 +
  594 + for (i = 0; i < recover_size; i++) {
  595 + if (test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET)) {
  596 + fs_info(sdp, "recover generation %u jid %d\n",
  597 + start_gen, i);
  598 + gfs2_recover_set(sdp, i);
  599 + recover_set++;
  600 + }
  601 + }
  602 + if (recover_set)
  603 + return;
  604 +
  605 + /*
  606 + * No more jid bits set in lvb, all recovery is done, unblock locks
  608 + * (unless a new recover_prep callback has occurred blocking locks
  608 + * again while working above)
  609 + */
  610 +
  611 + spin_lock(&ls->ls_recover_spin);
  612 + if (ls->ls_recover_block == block_gen &&
  613 + ls->ls_recover_start == start_gen) {
  614 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
  615 + spin_unlock(&ls->ls_recover_spin);
  616 + fs_info(sdp, "recover generation %u done\n", start_gen);
  617 + gfs2_glock_thaw(sdp);
  618 + } else {
  619 + fs_info(sdp, "recover generation %u block2 %u %u\n",
  620 + start_gen, block_gen, ls->ls_recover_block);
  621 + spin_unlock(&ls->ls_recover_spin);
  622 + }
  623 +}
  624 +
  625 +static int control_mount(struct gfs2_sbd *sdp)
  626 +{
  627 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  628 + char lvb_bits[GDLM_LVB_SIZE];
  629 + uint32_t start_gen, block_gen, mount_gen, lvb_gen;
  630 + int mounted_mode;
  631 + int retries = 0;
  632 + int error;
  633 +
  634 + memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
  635 + memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
  636 + memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
  637 + ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
  638 + init_completion(&ls->ls_sync_wait);
  639 +
  640 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
  641 +
  642 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
  643 + if (error) {
  644 + fs_err(sdp, "control_mount control_lock NL error %d\n", error);
  645 + return error;
  646 + }
  647 +
  648 + error = mounted_lock(sdp, DLM_LOCK_NL, 0);
  649 + if (error) {
  650 + fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
  651 + control_unlock(sdp);
  652 + return error;
  653 + }
  654 + mounted_mode = DLM_LOCK_NL;
  655 +
  656 +restart:
  657 + if (retries++ && signal_pending(current)) {
  658 + error = -EINTR;
  659 + goto fail;
  660 + }
  661 +
  662 + /*
  663 + * We always start with both locks in NL. control_lock is
  664 + * demoted to NL below so we don't need to do it here.
  665 + */
  666 +
  667 + if (mounted_mode != DLM_LOCK_NL) {
  668 + error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
  669 + if (error)
  670 + goto fail;
  671 + mounted_mode = DLM_LOCK_NL;
  672 + }
  673 +
  674 + /*
  675 + * Other nodes need to do some work in dlm recovery and gfs2_control
  676 + * before the recover_done and control_lock will be ready for us below.
  677 + * A delay here is not required but often avoids having to retry.
  678 + */
  679 +
  680 + msleep_interruptible(500);
  681 +
  682 + /*
  683 + * Acquire control_lock in EX and mounted_lock in either EX or PR.
  684 + * control_lock lvb keeps track of any pending journal recoveries.
  685 + * mounted_lock indicates if any other nodes have the fs mounted.
  686 + */
  687 +
  688 + error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
  689 + if (error == -EAGAIN) {
  690 + goto restart;
  691 + } else if (error) {
  692 + fs_err(sdp, "control_mount control_lock EX error %d\n", error);
  693 + goto fail;
  694 + }
  695 +
  696 + error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
  697 + if (!error) {
  698 + mounted_mode = DLM_LOCK_EX;
  699 + goto locks_done;
  700 + } else if (error != -EAGAIN) {
  701 + fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
  702 + goto fail;
  703 + }
  704 +
  705 + error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
  706 + if (!error) {
  707 + mounted_mode = DLM_LOCK_PR;
  708 + goto locks_done;
  709 + } else {
  710 + /* not even -EAGAIN should happen here */
  711 + fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
  712 + goto fail;
  713 + }
  714 +
  715 +locks_done:
  716 + /*
  717 + * If we got both locks above in EX, then we're the first mounter.
  718 + * If not, then we need to wait for the control_lock lvb to be
  719 + * updated by other mounted nodes to reflect our mount generation.
  720 + *
  721 + * In simple first mounter cases, first mounter will see zero lvb_gen,
  722 + * but in cases where all existing nodes leave/fail before mounting
  723 + * nodes finish control_mount, then all nodes will be mounting and
  724 + * lvb_gen will be non-zero.
  725 + */
  726 +
  727 + control_lvb_read(ls, &lvb_gen, lvb_bits);
  728 +
  729 + if (lvb_gen == 0xFFFFFFFF) {
  730 + /* special value to force mount attempts to fail */
  731 + fs_err(sdp, "control_mount control_lock disabled\n");
  732 + error = -EINVAL;
  733 + goto fail;
  734 + }
  735 +
  736 + if (mounted_mode == DLM_LOCK_EX) {
  737 + /* first mounter, keep both EX while doing first recovery */
  738 + spin_lock(&ls->ls_recover_spin);
  739 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
  740 + set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
  741 + set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
  742 + spin_unlock(&ls->ls_recover_spin);
  743 + fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
  744 + return 0;
  745 + }
  746 +
  747 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
202 748 if (error)
203   - printk(KERN_ERR "dlm_new_lockspace error %d", error);
  749 + goto fail;
204 750  
  751 + /*
  752 + * We are not first mounter, now we need to wait for the control_lock
  753 + * lvb generation to be >= the generation from our first recover_done
  754 + * and all lvb bits to be clear (no pending journal recoveries.)
  755 + */
  756 +
  757 + if (!all_jid_bits_clear(lvb_bits)) {
  758 + /* journals need recovery, wait until all are clear */
  759 + fs_info(sdp, "control_mount wait for journal recovery\n");
  760 + goto restart;
  761 + }
  762 +
  763 + spin_lock(&ls->ls_recover_spin);
  764 + block_gen = ls->ls_recover_block;
  765 + start_gen = ls->ls_recover_start;
  766 + mount_gen = ls->ls_recover_mount;
  767 +
  768 + if (lvb_gen < mount_gen) {
  769 + /* wait for mounted nodes to update control_lock lvb to our
  770 + generation, which might include new recovery bits set */
  771 + fs_info(sdp, "control_mount wait1 block %u start %u mount %u "
  772 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
  773 + lvb_gen, ls->ls_recover_flags);
  774 + spin_unlock(&ls->ls_recover_spin);
  775 + goto restart;
  776 + }
  777 +
  778 + if (lvb_gen != start_gen) {
  779 + /* wait for mounted nodes to update control_lock lvb to the
  780 + latest recovery generation */
  781 + fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
  782 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
  783 + lvb_gen, ls->ls_recover_flags);
  784 + spin_unlock(&ls->ls_recover_spin);
  785 + goto restart;
  786 + }
  787 +
  788 + if (block_gen == start_gen) {
  789 + /* dlm recovery in progress, wait for it to finish */
  790 + fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
  791 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
  792 + lvb_gen, ls->ls_recover_flags);
  793 + spin_unlock(&ls->ls_recover_spin);
  794 + goto restart;
  795 + }
  796 +
  797 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
  798 + set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
  799 + memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
  800 + memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
  801 + spin_unlock(&ls->ls_recover_spin);
  802 + return 0;
  803 +
  804 +fail:
  805 + mounted_unlock(sdp);
  806 + control_unlock(sdp);
205 807 return error;
206 808 }
207 809  
  810 +static int dlm_recovery_wait(void *word)
  811 +{
  812 + schedule();
  813 + return 0;
  814 +}
  815 +
  816 +static int control_first_done(struct gfs2_sbd *sdp)
  817 +{
  818 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  819 + char lvb_bits[GDLM_LVB_SIZE];
  820 + uint32_t start_gen, block_gen;
  821 + int error;
  822 +
  823 +restart:
  824 + spin_lock(&ls->ls_recover_spin);
  825 + start_gen = ls->ls_recover_start;
  826 + block_gen = ls->ls_recover_block;
  827 +
  828 + if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
  829 + !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
  830 + !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
  831 + /* sanity check, should not happen */
  832 + fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
  833 + start_gen, block_gen, ls->ls_recover_flags);
  834 + spin_unlock(&ls->ls_recover_spin);
  835 + control_unlock(sdp);
  836 + return -1;
  837 + }
  838 +
  839 + if (start_gen == block_gen) {
  840 + /*
  841 + * Wait for the end of a dlm recovery cycle to switch from
  842 + * first mounter recovery. We can ignore any recover_slot
  843 + * callbacks between the recover_prep and next recover_done
  844 + * because we are still the first mounter and any failed nodes
  845 + * have not fully mounted, so they don't need recovery.
  846 + */
  847 + spin_unlock(&ls->ls_recover_spin);
  848 + fs_info(sdp, "control_first_done wait gen %u\n", start_gen);
  849 +
  850 + wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
  851 + dlm_recovery_wait, TASK_UNINTERRUPTIBLE);
  852 + goto restart;
  853 + }
  854 +
  855 + clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
  856 + set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
  857 + memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
  858 + memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
  859 + spin_unlock(&ls->ls_recover_spin);
  860 +
  861 + memset(lvb_bits, 0, sizeof(lvb_bits));
  862 + control_lvb_write(ls, start_gen, lvb_bits);
  863 +
  864 + error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
  865 + if (error)
  866 + fs_err(sdp, "control_first_done mounted PR error %d\n", error);
  867 +
  868 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
  869 + if (error)
  870 + fs_err(sdp, "control_first_done control NL error %d\n", error);
  871 +
  872 + return error;
  873 +}
  874 +
  875 +/*
  876 + * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
  877 + * to accomodate the largest slot number. (NB dlm slot numbers start at 1,
  878 + * gfs2 jids start at 0, so jid = slot - 1)
  879 + */
  880 +
  881 +#define RECOVER_SIZE_INC 16
  882 +
  883 +static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
  884 + int num_slots)
  885 +{
  886 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  887 + uint32_t *submit = NULL;
  888 + uint32_t *result = NULL;
  889 + uint32_t old_size, new_size;
  890 + int i, max_jid;
  891 +
  892 + max_jid = 0;
  893 + for (i = 0; i < num_slots; i++) {
  894 + if (max_jid < slots[i].slot - 1)
  895 + max_jid = slots[i].slot - 1;
  896 + }
  897 +
  898 + old_size = ls->ls_recover_size;
  899 +
  900 + if (old_size >= max_jid + 1)
  901 + return 0;
  902 +
  903 + new_size = old_size + RECOVER_SIZE_INC;
  904 +
  905 + submit = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
  906 + result = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS);
  907 + if (!submit || !result) {
  908 + kfree(submit);
  909 + kfree(result);
  910 + return -ENOMEM;
  911 + }
  912 +
  913 + spin_lock(&ls->ls_recover_spin);
  914 + memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
  915 + memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
  916 + kfree(ls->ls_recover_submit);
  917 + kfree(ls->ls_recover_result);
  918 + ls->ls_recover_submit = submit;
  919 + ls->ls_recover_result = result;
  920 + ls->ls_recover_size = new_size;
  921 + spin_unlock(&ls->ls_recover_spin);
  922 + return 0;
  923 +}
  924 +
  925 +static void free_recover_size(struct lm_lockstruct *ls)
  926 +{
  927 + kfree(ls->ls_recover_submit);
  928 + kfree(ls->ls_recover_result);
  929 + ls->ls_recover_submit = NULL;
  930 + ls->ls_recover_result = NULL;
  931 + ls->ls_recover_size = 0;
  932 +}
  933 +
  934 +/* dlm calls before it does lock recovery */
  935 +
  936 +static void gdlm_recover_prep(void *arg)
  937 +{
  938 + struct gfs2_sbd *sdp = arg;
  939 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  940 +
  941 + spin_lock(&ls->ls_recover_spin);
  942 + ls->ls_recover_block = ls->ls_recover_start;
  943 + set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
  944 +
  945 + if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
  946 + test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
  947 + spin_unlock(&ls->ls_recover_spin);
  948 + return;
  949 + }
  950 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
  951 + spin_unlock(&ls->ls_recover_spin);
  952 +}
  953 +
  954 +/* dlm calls after recover_prep has been completed on all lockspace members;
  955 + identifies slot/jid of failed member */
  956 +
  957 +static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
  958 +{
  959 + struct gfs2_sbd *sdp = arg;
  960 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  961 + int jid = slot->slot - 1;
  962 +
  963 + spin_lock(&ls->ls_recover_spin);
  964 + if (ls->ls_recover_size < jid + 1) {
  965 + fs_err(sdp, "recover_slot jid %d gen %u short size %d",
  966 + jid, ls->ls_recover_block, ls->ls_recover_size);
  967 + spin_unlock(&ls->ls_recover_spin);
  968 + return;
  969 + }
  970 +
  971 + if (ls->ls_recover_submit[jid]) {
  972 + fs_info(sdp, "recover_slot jid %d gen %u prev %u",
  973 + jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
  974 + }
  975 + ls->ls_recover_submit[jid] = ls->ls_recover_block;
  976 + spin_unlock(&ls->ls_recover_spin);
  977 +}
  978 +
  979 +/* dlm calls after recover_slot and after it completes lock recovery */
  980 +
  981 +static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
  982 + int our_slot, uint32_t generation)
  983 +{
  984 + struct gfs2_sbd *sdp = arg;
  985 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  986 +
  987 + /* ensure the ls jid arrays are large enough */
  988 + set_recover_size(sdp, slots, num_slots);
  989 +
  990 + spin_lock(&ls->ls_recover_spin);
  991 + ls->ls_recover_start = generation;
  992 +
  993 + if (!ls->ls_recover_mount) {
  994 + ls->ls_recover_mount = generation;
  995 + ls->ls_jid = our_slot - 1;
  996 + }
  997 +
  998 + if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
  999 + queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
  1000 +
  1001 + clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
  1002 + smp_mb__after_clear_bit();
  1003 + wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
  1004 + spin_unlock(&ls->ls_recover_spin);
  1005 +}
  1006 +
  1007 +/* gfs2_recover thread has a journal recovery result */
  1008 +
  1009 +static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
  1010 + unsigned int result)
  1011 +{
  1012 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  1013 +
  1014 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
  1015 + return;
  1016 +
  1017 + /* don't care about the recovery of own journal during mount */
  1018 + if (jid == ls->ls_jid)
  1019 + return;
  1020 +
  1021 + spin_lock(&ls->ls_recover_spin);
  1022 + if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
  1023 + spin_unlock(&ls->ls_recover_spin);
  1024 + return;
  1025 + }
  1026 + if (ls->ls_recover_size < jid + 1) {
  1027 + fs_err(sdp, "recovery_result jid %d short size %d",
  1028 + jid, ls->ls_recover_size);
  1029 + spin_unlock(&ls->ls_recover_spin);
  1030 + return;
  1031 + }
  1032 +
  1033 + fs_info(sdp, "recover jid %d result %s\n", jid,
  1034 + result == LM_RD_GAVEUP ? "busy" : "success");
  1035 +
  1036 + ls->ls_recover_result[jid] = result;
  1037 +
  1038 + /* GAVEUP means another node is recovering the journal; delay our
  1039 + next attempt to recover it, to give the other node a chance to
  1040 + finish before trying again */
  1041 +
  1042 + if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
  1043 + queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
  1044 + result == LM_RD_GAVEUP ? HZ : 0);
  1045 + spin_unlock(&ls->ls_recover_spin);
  1046 +}
  1047 +
  1048 +const struct dlm_lockspace_ops gdlm_lockspace_ops = {
  1049 + .recover_prep = gdlm_recover_prep,
  1050 + .recover_slot = gdlm_recover_slot,
  1051 + .recover_done = gdlm_recover_done,
  1052 +};
  1053 +
  1054 +static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
  1055 +{
  1056 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  1057 + char cluster[GFS2_LOCKNAME_LEN];
  1058 + const char *fsname;
  1059 + uint32_t flags;
  1060 + int error, ops_result;
  1061 +
  1062 + /*
  1063 + * initialize everything
  1064 + */
  1065 +
  1066 + INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
  1067 + spin_lock_init(&ls->ls_recover_spin);
  1068 + ls->ls_recover_flags = 0;
  1069 + ls->ls_recover_mount = 0;
  1070 + ls->ls_recover_start = 0;
  1071 + ls->ls_recover_block = 0;
  1072 + ls->ls_recover_size = 0;
  1073 + ls->ls_recover_submit = NULL;
  1074 + ls->ls_recover_result = NULL;
  1075 +
  1076 + error = set_recover_size(sdp, NULL, 0);
  1077 + if (error)
  1078 + goto fail;
  1079 +
  1080 + /*
  1081 + * prepare dlm_new_lockspace args
  1082 + */
  1083 +
  1084 + fsname = strchr(table, ':');
  1085 + if (!fsname) {
  1086 + fs_info(sdp, "no fsname found\n");
  1087 + error = -EINVAL;
  1088 + goto fail_free;
  1089 + }
  1090 + memset(cluster, 0, sizeof(cluster));
  1091 + memcpy(cluster, table, strlen(table) - strlen(fsname));
  1092 + fsname++;
  1093 +
  1094 + flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
  1095 + if (ls->ls_nodir)
  1096 + flags |= DLM_LSFL_NODIR;
  1097 +
  1098 + /*
  1099 + * create/join lockspace
  1100 + */
  1101 +
  1102 + error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
  1103 + &gdlm_lockspace_ops, sdp, &ops_result,
  1104 + &ls->ls_dlm);
  1105 + if (error) {
  1106 + fs_err(sdp, "dlm_new_lockspace error %d\n", error);
  1107 + goto fail_free;
  1108 + }
  1109 +
  1110 + if (ops_result < 0) {
  1111 + /*
  1112 + * dlm does not support ops callbacks,
  1113 + * old dlm_controld/gfs_controld are used, try without ops.
  1114 + */
  1115 + fs_info(sdp, "dlm lockspace ops not used\n");
  1116 + free_recover_size(ls);
  1117 + set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
  1118 + return 0;
  1119 + }
  1120 +
  1121 + if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
  1122 + fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
  1123 + error = -EINVAL;
  1124 + goto fail_release;
  1125 + }
  1126 +
  1127 + /*
  1128 + * control_mount() uses control_lock to determine first mounter,
  1129 + * and for later mounts, waits for any recoveries to be cleared.
  1130 + */
  1131 +
  1132 + error = control_mount(sdp);
  1133 + if (error) {
  1134 + fs_err(sdp, "mount control error %d\n", error);
  1135 + goto fail_release;
  1136 + }
  1137 +
  1138 + ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
  1139 + clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
  1140 + smp_mb__after_clear_bit();
  1141 + wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
  1142 + return 0;
  1143 +
  1144 +fail_release:
  1145 + dlm_release_lockspace(ls->ls_dlm, 2);
  1146 +fail_free:
  1147 + free_recover_size(ls);
  1148 +fail:
  1149 + return error;
  1150 +}
  1151 +
  1152 +static void gdlm_first_done(struct gfs2_sbd *sdp)
  1153 +{
  1154 + struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  1155 + int error;
  1156 +
  1157 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
  1158 + return;
  1159 +
  1160 + error = control_first_done(sdp);
  1161 + if (error)
  1162 + fs_err(sdp, "mount first_done error %d\n", error);
  1163 +}
  1164 +
208 1165 static void gdlm_unmount(struct gfs2_sbd *sdp)
209 1166 {
210 1167 struct lm_lockstruct *ls = &sdp->sd_lockstruct;
211 1168  
  1169 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
  1170 + goto release;
  1171 +
  1172 + /* wait for gfs2_control_wq to be done with this mount */
  1173 +
  1174 + spin_lock(&ls->ls_recover_spin);
  1175 + set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
  1176 + spin_unlock(&ls->ls_recover_spin);
  1177 + flush_delayed_work_sync(&sdp->sd_control_work);
  1178 +
  1179 + /* mounted_lock and control_lock will be purged in dlm recovery */
  1180 +release:
212 1181 if (ls->ls_dlm) {
213 1182 dlm_release_lockspace(ls->ls_dlm, 2);
214 1183 ls->ls_dlm = NULL;
215 1184 }
  1185 +
  1186 + free_recover_size(ls);
216 1187 }
217 1188  
218 1189 static const match_table_t dlm_tokens = {
... ... @@ -226,6 +1197,8 @@
226 1197 const struct lm_lockops gfs2_dlm_ops = {
227 1198 .lm_proto_name = "lock_dlm",
228 1199 .lm_mount = gdlm_mount,
  1200 + .lm_first_done = gdlm_first_done,
  1201 + .lm_recovery_result = gdlm_recovery_result,
229 1202 .lm_unmount = gdlm_unmount,
230 1203 .lm_put_lock = gdlm_put_lock,
231 1204 .lm_lock = gdlm_lock,
... ... @@ -28,6 +28,8 @@
28 28 #include "recovery.h"
29 29 #include "dir.h"
30 30  
  31 +struct workqueue_struct *gfs2_control_wq;
  32 +
31 33 static struct shrinker qd_shrinker = {
32 34 .shrink = gfs2_shrink_qd_memory,
33 35 .seeks = DEFAULT_SEEKS,
34 36  
... ... @@ -146,12 +148,19 @@
146 148 if (!gfs_recovery_wq)
147 149 goto fail_wq;
148 150  
  151 + gfs2_control_wq = alloc_workqueue("gfs2_control",
  152 + WQ_NON_REENTRANT | WQ_UNBOUND | WQ_FREEZABLE, 0);
  153 + if (!gfs2_control_wq)
  154 + goto fail_control;
  155 +
149 156 gfs2_register_debugfs();
150 157  
151 158 printk("GFS2 installed\n");
152 159  
153 160 return 0;
154 161  
  162 +fail_control:
  163 + destroy_workqueue(gfs_recovery_wq);
155 164 fail_wq:
156 165 unregister_filesystem(&gfs2meta_fs_type);
157 166 fail_unregister:
... ... @@ -195,6 +204,7 @@
195 204 unregister_filesystem(&gfs2_fs_type);
196 205 unregister_filesystem(&gfs2meta_fs_type);
197 206 destroy_workqueue(gfs_recovery_wq);
  207 + destroy_workqueue(gfs2_control_wq);
198 208  
199 209 rcu_barrier();
200 210  
fs/gfs2/ops_fstype.c
... ... @@ -562,8 +562,12 @@
562 562 {
563 563 char *message = "FIRSTMOUNT=Done";
564 564 char *envp[] = { message, NULL };
565   - struct lm_lockstruct *ls = &sdp->sd_lockstruct;
566   - ls->ls_first_done = 1;
  565 +
  566 + fs_info(sdp, "first mount done, others may mount\n");
  567 +
  568 + if (sdp->sd_lockstruct.ls_ops->lm_first_done)
  569 + sdp->sd_lockstruct.ls_ops->lm_first_done(sdp);
  570 +
567 571 kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp);
568 572 }
569 573  
... ... @@ -944,7 +948,6 @@
944 948 struct gfs2_args *args = &sdp->sd_args;
945 949 const char *proto = sdp->sd_proto_name;
946 950 const char *table = sdp->sd_table_name;
947   - const char *fsname;
948 951 char *o, *options;
949 952 int ret;
950 953  
951 954  
... ... @@ -1004,21 +1007,12 @@
1004 1007 }
1005 1008 }
1006 1009  
1007   - if (sdp->sd_args.ar_spectator)
1008   - snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.s", table);
1009   - else
1010   - snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.%u", table,
1011   - sdp->sd_lockstruct.ls_jid);
1012   -
1013   - fsname = strchr(table, ':');
1014   - if (fsname)
1015   - fsname++;
1016 1010 if (lm->lm_mount == NULL) {
1017 1011 fs_info(sdp, "Now mounting FS...\n");
1018 1012 complete_all(&sdp->sd_locking_init);
1019 1013 return 0;
1020 1014 }
1021   - ret = lm->lm_mount(sdp, fsname);
  1015 + ret = lm->lm_mount(sdp, table);
1022 1016 if (ret == 0)
1023 1017 fs_info(sdp, "Joined cluster. Now mounting FS...\n");
1024 1018 complete_all(&sdp->sd_locking_init);
... ... @@ -1084,7 +1078,7 @@
1084 1078  
1085 1079 if (sdp->sd_args.ar_spectator) {
1086 1080 sb->s_flags |= MS_RDONLY;
1087   - set_bit(SDF_NORECOVERY, &sdp->sd_flags);
  1081 + set_bit(SDF_RORECOVERY, &sdp->sd_flags);
1088 1082 }
1089 1083 if (sdp->sd_args.ar_posix_acl)
1090 1084 sb->s_flags |= MS_POSIXACL;
... ... @@ -1124,6 +1118,8 @@
1124 1118 if (error)
1125 1119 goto fail;
1126 1120  
  1121 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s", sdp->sd_table_name);
  1122 +
1127 1123 gfs2_create_debugfs_file(sdp);
1128 1124  
1129 1125 error = gfs2_sys_fs_add(sdp);
... ... @@ -1159,6 +1155,13 @@
1159 1155 sdp->sd_lockstruct.ls_jid = 0;
1160 1156 goto fail_sb;
1161 1157 }
  1158 +
  1159 + if (sdp->sd_args.ar_spectator)
  1160 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.s",
  1161 + sdp->sd_table_name);
  1162 + else
  1163 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.%u",
  1164 + sdp->sd_table_name, sdp->sd_lockstruct.ls_jid);
1162 1165  
1163 1166 error = init_inodes(sdp, DO);
1164 1167 if (error)
... ... @@ -436,12 +436,16 @@
436 436 char env_status[20];
437 437 char *envp[] = { env_jid, env_status, NULL };
438 438 struct lm_lockstruct *ls = &sdp->sd_lockstruct;
  439 +
439 440 ls->ls_recover_jid_done = jid;
440 441 ls->ls_recover_jid_status = message;
441 442 sprintf(env_jid, "JID=%d", jid);
442 443 sprintf(env_status, "RECOVERY=%s",
443 444 message == LM_RD_SUCCESS ? "Done" : "Failed");
444 445 kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp);
  446 +
  447 + if (sdp->sd_lockstruct.ls_ops->lm_recovery_result)
  448 + sdp->sd_lockstruct.ls_ops->lm_recovery_result(sdp, jid, message);
445 449 }
446 450  
447 451 void gfs2_recover_func(struct work_struct *work)
... ... @@ -512,7 +516,9 @@
512 516 if (error)
513 517 goto fail_gunlock_ji;
514 518  
515   - if (test_bit(SDF_JOURNAL_CHECKED, &sdp->sd_flags)) {
  519 + if (test_bit(SDF_RORECOVERY, &sdp->sd_flags)) {
  520 + ro = 1;
  521 + } else if (test_bit(SDF_JOURNAL_CHECKED, &sdp->sd_flags)) {
516 522 if (!test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags))
517 523 ro = 1;
518 524 } else {
... ... @@ -577,6 +583,7 @@
577 583  
578 584 fs_info(sdp, "jid=%u: %s\n", jd->jd_jid, (error) ? "Failed" : "Done");
579 585 fail:
  586 + jd->jd_recover_error = error;
580 587 gfs2_recovery_done(sdp, jd->jd_jid, LM_RD_GAVEUP);
581 588 done:
582 589 clear_bit(JDF_RECOVERY, &jd->jd_flags);
... ... @@ -605,6 +612,6 @@
605 612 wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait,
606 613 TASK_UNINTERRUPTIBLE);
607 614  
608   - return 0;
  615 + return wait ? jd->jd_recover_error : 0;
609 616 }
... ... @@ -1108,9 +1108,9 @@
1108 1108 {
1109 1109 struct gfs2_blkreserv *rs = ip->i_res;
1110 1110  
1111   - gfs2_blkrsv_put(ip);
1112 1111 if (rs->rs_rgd_gh.gh_gl)
1113 1112 gfs2_glock_dq_uninit(&rs->rs_rgd_gh);
  1113 + gfs2_blkrsv_put(ip);
1114 1114 }
1115 1115  
1116 1116 /**
... ... @@ -298,7 +298,7 @@
298 298 ssize_t ret;
299 299 int val = 0;
300 300  
301   - if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))
  301 + if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))
302 302 val = 1;
303 303 ret = sprintf(buf, "%d\n", val);
304 304 return ret;
305 305  
... ... @@ -313,9 +313,9 @@
313 313 val = simple_strtol(buf, NULL, 0);
314 314  
315 315 if (val == 1)
316   - set_bit(DFL_BLOCK_LOCKS, &ls->ls_flags);
  316 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
317 317 else if (val == 0) {
318   - clear_bit(DFL_BLOCK_LOCKS, &ls->ls_flags);
  318 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
319 319 smp_mb__after_clear_bit();
320 320 gfs2_glock_thaw(sdp);
321 321 } else {
... ... @@ -350,8 +350,8 @@
350 350 goto out;
351 351 if (sdp->sd_lockstruct.ls_ops->lm_mount == NULL)
352 352 goto out;
353   - sdp->sd_lockstruct.ls_first = first;
354   - rv = 0;
  353 + sdp->sd_lockstruct.ls_first = first;
  354 + rv = 0;
355 355 out:
356 356 spin_unlock(&sdp->sd_jindex_spin);
357 357 return rv ? rv : len;
358 358  
359 359  
360 360  
... ... @@ -360,19 +360,14 @@
360 360 static ssize_t first_done_show(struct gfs2_sbd *sdp, char *buf)
361 361 {
362 362 struct lm_lockstruct *ls = &sdp->sd_lockstruct;
363   - return sprintf(buf, "%d\n", ls->ls_first_done);
  363 + return sprintf(buf, "%d\n", !!test_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags));
364 364 }
365 365  
366   -static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
  366 +int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid)
367 367 {
368   - unsigned jid;
369 368 struct gfs2_jdesc *jd;
370 369 int rv;
371 370  
372   - rv = sscanf(buf, "%u", &jid);
373   - if (rv != 1)
374   - return -EINVAL;
375   -
376 371 rv = -ESHUTDOWN;
377 372 spin_lock(&sdp->sd_jindex_spin);
378 373 if (test_bit(SDF_NORECOVERY, &sdp->sd_flags))
... ... @@ -389,6 +384,20 @@
389 384 }
390 385 out:
391 386 spin_unlock(&sdp->sd_jindex_spin);
  387 + return rv;
  388 +}
  389 +
  390 +static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
  391 +{
  392 + unsigned jid;
  393 + int rv;
  394 +
  395 + rv = sscanf(buf, "%u", &jid);
  396 + if (rv != 1)
  397 + return -EINVAL;
  398 +
  399 + rv = gfs2_recover_set(sdp, jid);
  400 +
392 401 return rv ? rv : len;
393 402 }
394 403  
... ... @@ -19,5 +19,7 @@
19 19 int gfs2_sys_init(void);
20 20 void gfs2_sys_uninit(void);
21 21  
  22 +int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid);
  23 +
22 24 #endif /* __SYS_DOT_H__ */
include/linux/gfs2_ondisk.h
... ... @@ -22,6 +22,8 @@
22 22 #define GFS2_LIVE_LOCK 1
23 23 #define GFS2_TRANS_LOCK 2
24 24 #define GFS2_RENAME_LOCK 3
  25 +#define GFS2_CONTROL_LOCK 4
  26 +#define GFS2_MOUNTED_LOCK 5
25 27  
26 28 /* Format numbers for various metadata types */
27 29