Blame view

fs/ocfs2/dlm/dlmmaster.c 95.4 KB
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  /* -*- mode: c; c-basic-offset: 8; -*-
   * vim: noexpandtab sw=8 ts=8 sts=0:
   *
   * dlmmod.c
   *
   * standalone DLM module
   *
   * Copyright (C) 2004 Oracle.  All rights reserved.
   *
   * This program is free software; you can redistribute it and/or
   * modify it under the terms of the GNU General Public
   * License as published by the Free Software Foundation; either
   * version 2 of the License, or (at your option) any later version.
   *
   * This program is distributed in the hope that it will be useful,
   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   * General Public License for more details.
   *
   * You should have received a copy of the GNU General Public
   * License along with this program; if not, write to the
   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   * Boston, MA 021110-1307, USA.
   *
   */
  
  
  #include <linux/module.h>
  #include <linux/fs.h>
  #include <linux/types.h>
  #include <linux/slab.h>
  #include <linux/highmem.h>
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
  #include <linux/init.h>
  #include <linux/sysctl.h>
  #include <linux/random.h>
  #include <linux/blkdev.h>
  #include <linux/socket.h>
  #include <linux/inet.h>
  #include <linux/spinlock.h>
  #include <linux/delay.h>
  
  
  #include "cluster/heartbeat.h"
  #include "cluster/nodemanager.h"
  #include "cluster/tcp.h"
  
  #include "dlmapi.h"
  #include "dlmcommon.h"
82353b594   Adrian Bunk   [PATCH] This patc...
49
  #include "dlmdomain.h"
e5a0334cb   Sunil Mushran   ocfs2/dlm: Move d...
50
  #include "dlmdebug.h"
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
51
52
53
  
  #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
  #include "cluster/masklog.h"
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
54
55
56
57
58
59
60
61
62
63
  static void dlm_mle_node_down(struct dlm_ctxt *dlm,
  			      struct dlm_master_list_entry *mle,
  			      struct o2nm_node *node,
  			      int idx);
  static void dlm_mle_node_up(struct dlm_ctxt *dlm,
  			    struct dlm_master_list_entry *mle,
  			    struct o2nm_node *node,
  			    int idx);
  
  static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
64
65
66
  static int dlm_do_assert_master(struct dlm_ctxt *dlm,
  				struct dlm_lock_resource *res,
  				void *nodemap, u32 flags);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
67
  static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
68
69
70
71
72
73
  
  static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
  				struct dlm_master_list_entry *mle,
  				const char *name,
  				unsigned int namelen)
  {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
74
75
  	if (dlm != mle->dlm)
  		return 0;
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
76
77
  	if (namelen != mle->mnamelen ||
  	    memcmp(name, mle->mname, namelen) != 0)
f77a9a78c   Sunil Mushran   ocfs2/dlm: Clean ...
78
  		return 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
79
80
  	return 1;
  }
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
81
82
  static struct kmem_cache *dlm_lockres_cache = NULL;
  static struct kmem_cache *dlm_lockname_cache = NULL;
e18b890bb   Christoph Lameter   [PATCH] slab: rem...
83
  static struct kmem_cache *dlm_mle_cache = NULL;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
84

6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
85
86
87
88
89
90
91
92
93
94
95
96
  static void dlm_mle_release(struct kref *kref);
  static void dlm_init_mle(struct dlm_master_list_entry *mle,
  			enum dlm_mle_type type,
  			struct dlm_ctxt *dlm,
  			struct dlm_lock_resource *res,
  			const char *name,
  			unsigned int namelen);
  static void dlm_put_mle(struct dlm_master_list_entry *mle);
  static void __dlm_put_mle(struct dlm_master_list_entry *mle);
  static int dlm_find_mle(struct dlm_ctxt *dlm,
  			struct dlm_master_list_entry **mle,
  			char *name, unsigned int namelen);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
97
98
  static int dlm_do_master_request(struct dlm_lock_resource *res,
  				 struct dlm_master_list_entry *mle, int to);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
  
  
  static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
  				     struct dlm_lock_resource *res,
  				     struct dlm_master_list_entry *mle,
  				     int *blocked);
  static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
  				    struct dlm_lock_resource *res,
  				    struct dlm_master_list_entry *mle,
  				    int blocked);
  static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
  				 struct dlm_lock_resource *res,
  				 struct dlm_master_list_entry *mle,
  				 struct dlm_master_list_entry **oldmle,
  				 const char *name, unsigned int namelen,
  				 u8 new_master, u8 master);
  
  static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
  				    struct dlm_lock_resource *res);
  static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
  				      struct dlm_lock_resource *res);
  static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
  				       struct dlm_lock_resource *res,
  				       u8 target);
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
123
124
  static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
  				       struct dlm_lock_resource *res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
  
  
  int dlm_is_host_down(int errno)
  {
  	switch (errno) {
  		case -EBADF:
  		case -ECONNREFUSED:
  		case -ENOTCONN:
  		case -ECONNRESET:
  		case -EPIPE:
  		case -EHOSTDOWN:
  		case -EHOSTUNREACH:
  		case -ETIMEDOUT:
  		case -ECONNABORTED:
  		case -ENETDOWN:
  		case -ENETUNREACH:
  		case -ENETRESET:
  		case -ESHUTDOWN:
  		case -ENOPROTOOPT:
  		case -EINVAL:   /* if returned from our tcp code,
  				   this means there is no socket */
  			return 1;
  	}
  	return 0;
  }
  
  
  /*
   * MASTER LIST FUNCTIONS
   */
  
  
  /*
   * regarding master list entries and heartbeat callbacks:
   *
   * in order to avoid sleeping and allocation that occurs in
   * heartbeat, master list entries are simply attached to the
   * dlm's established heartbeat callbacks.  the mle is attached
   * when it is created, and since the dlm->spinlock is held at
   * that time, any heartbeat event will be properly discovered
   * by the mle.  the mle needs to be detached from the
   * dlm->mle_hb_events list as soon as heartbeat events are no
   * longer useful to the mle, and before the mle is freed.
   *
   * as a general rule, heartbeat events are no longer needed by
   * the mle once an "answer" regarding the lock master has been
   * received.
   */
  static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
  					      struct dlm_master_list_entry *mle)
  {
  	assert_spin_locked(&dlm->spinlock);
  
  	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
  }
  
  
  static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
  					      struct dlm_master_list_entry *mle)
  {
  	if (!list_empty(&mle->hb_events))
  		list_del_init(&mle->hb_events);
  }
  
  
  static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
  					    struct dlm_master_list_entry *mle)
  {
  	spin_lock(&dlm->spinlock);
  	__dlm_mle_detach_hb_events(dlm, mle);
  	spin_unlock(&dlm->spinlock);
  }
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
  static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
  {
  	struct dlm_ctxt *dlm;
  	dlm = mle->dlm;
  
  	assert_spin_locked(&dlm->spinlock);
  	assert_spin_locked(&dlm->master_lock);
  	mle->inuse++;
  	kref_get(&mle->mle_refs);
  }
  
  static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
  {
  	struct dlm_ctxt *dlm;
  	dlm = mle->dlm;
  
  	spin_lock(&dlm->spinlock);
  	spin_lock(&dlm->master_lock);
  	mle->inuse--;
  	__dlm_put_mle(mle);
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
221
222
223
224
225
226
227
228
  /* remove from list and free */
  static void __dlm_put_mle(struct dlm_master_list_entry *mle)
  {
  	struct dlm_ctxt *dlm;
  	dlm = mle->dlm;
  
  	assert_spin_locked(&dlm->spinlock);
  	assert_spin_locked(&dlm->master_lock);
aa8523542   Kurt Hackel   ocfs2: mle ref co...
229
230
231
232
233
234
235
236
237
  	if (!atomic_read(&mle->mle_refs.refcount)) {
  		/* this may or may not crash, but who cares.
  		 * it's a BUG. */
  		mlog(ML_ERROR, "bad mle: %p
  ", mle);
  		dlm_print_one_mle(mle);
  		BUG();
  	} else
  		kref_put(&mle->mle_refs, dlm_mle_release);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
  }
  
  
  /* must not have any spinlocks coming in */
  static void dlm_put_mle(struct dlm_master_list_entry *mle)
  {
  	struct dlm_ctxt *dlm;
  	dlm = mle->dlm;
  
  	spin_lock(&dlm->spinlock);
  	spin_lock(&dlm->master_lock);
  	__dlm_put_mle(mle);
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  }
  
  static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
  {
  	kref_get(&mle->mle_refs);
  }
  
  static void dlm_init_mle(struct dlm_master_list_entry *mle,
  			enum dlm_mle_type type,
  			struct dlm_ctxt *dlm,
  			struct dlm_lock_resource *res,
  			const char *name,
  			unsigned int namelen)
  {
  	assert_spin_locked(&dlm->spinlock);
  
  	mle->dlm = dlm;
  	mle->type = type;
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
270
  	INIT_HLIST_NODE(&mle->master_hash_node);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
271
272
273
274
275
276
277
278
279
  	INIT_LIST_HEAD(&mle->hb_events);
  	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
  	spin_lock_init(&mle->spinlock);
  	init_waitqueue_head(&mle->wq);
  	atomic_set(&mle->woken, 0);
  	kref_init(&mle->mle_refs);
  	memset(mle->response_map, 0, sizeof(mle->response_map));
  	mle->master = O2NM_MAX_NODES;
  	mle->new_master = O2NM_MAX_NODES;
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
280
  	mle->inuse = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
281

f77a9a78c   Sunil Mushran   ocfs2/dlm: Clean ...
282
283
284
  	BUG_ON(mle->type != DLM_MLE_BLOCK &&
  	       mle->type != DLM_MLE_MASTER &&
  	       mle->type != DLM_MLE_MIGRATION);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
285
286
  	if (mle->type == DLM_MLE_MASTER) {
  		BUG_ON(!res);
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
287
288
289
290
  		mle->mleres = res;
  		memcpy(mle->mname, res->lockname.name, res->lockname.len);
  		mle->mnamelen = res->lockname.len;
  		mle->mnamehash = res->lockname.hash;
f77a9a78c   Sunil Mushran   ocfs2/dlm: Clean ...
291
  	} else {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
292
  		BUG_ON(!name);
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
293
294
295
296
  		mle->mleres = NULL;
  		memcpy(mle->mname, name, namelen);
  		mle->mnamelen = namelen;
  		mle->mnamehash = dlm_lockid_hash(name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
297
  	}
2041d8fdc   Sunil Mushran   ocfs2/dlm: Track ...
298
299
  	atomic_inc(&dlm->mle_tot_count[mle->type]);
  	atomic_inc(&dlm->mle_cur_count[mle->type]);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
300
301
302
303
304
305
306
307
308
  	/* copy off the node_map and register hb callbacks on our copy */
  	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
  	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
  	clear_bit(dlm->node_num, mle->vote_map);
  	clear_bit(dlm->node_num, mle->node_map);
  
  	/* attach the mle to the domain node up/down events */
  	__dlm_mle_attach_hb_events(dlm, mle);
  }
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
309
310
311
312
  void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
  {
  	assert_spin_locked(&dlm->spinlock);
  	assert_spin_locked(&dlm->master_lock);
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
313
314
  	if (!hlist_unhashed(&mle->master_hash_node))
  		hlist_del_init(&mle->master_hash_node);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
315
316
317
318
  }
  
  void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
  {
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
319
  	struct hlist_head *bucket;
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
320

1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
321
  	assert_spin_locked(&dlm->master_lock);
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
322
  	bucket = dlm_master_hash(dlm, mle->mnamehash);
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
323
  	hlist_add_head(&mle->master_hash_node, bucket);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
324
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
325
326
327
328
329
330
331
  
  /* returns 1 if found, 0 if not */
  static int dlm_find_mle(struct dlm_ctxt *dlm,
  			struct dlm_master_list_entry **mle,
  			char *name, unsigned int namelen)
  {
  	struct dlm_master_list_entry *tmpmle;
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
332
333
334
  	struct hlist_head *bucket;
  	struct hlist_node *list;
  	unsigned int hash;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
335
336
  
  	assert_spin_locked(&dlm->master_lock);
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
337
338
339
340
341
  	hash = dlm_lockid_hash(name, namelen);
  	bucket = dlm_master_hash(dlm, hash);
  	hlist_for_each(list, bucket) {
  		tmpmle = hlist_entry(list, struct dlm_master_list_entry,
  				     master_hash_node);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
342
343
344
345
346
347
348
349
350
351
352
353
  		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
  			continue;
  		dlm_get_mle(tmpmle);
  		*mle = tmpmle;
  		return 1;
  	}
  	return 0;
  }
  
  void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
  {
  	struct dlm_master_list_entry *mle;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
354
355
  
  	assert_spin_locked(&dlm->spinlock);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
356

800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
357
  	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
  		if (node_up)
  			dlm_mle_node_up(dlm, mle, NULL, idx);
  		else
  			dlm_mle_node_down(dlm, mle, NULL, idx);
  	}
  }
  
  static void dlm_mle_node_down(struct dlm_ctxt *dlm,
  			      struct dlm_master_list_entry *mle,
  			      struct o2nm_node *node, int idx)
  {
  	spin_lock(&mle->spinlock);
  
  	if (!test_bit(idx, mle->node_map))
  		mlog(0, "node %u already removed from nodemap!
  ", idx);
  	else
  		clear_bit(idx, mle->node_map);
  
  	spin_unlock(&mle->spinlock);
  }
  
  static void dlm_mle_node_up(struct dlm_ctxt *dlm,
  			    struct dlm_master_list_entry *mle,
  			    struct o2nm_node *node, int idx)
  {
  	spin_lock(&mle->spinlock);
  
  	if (test_bit(idx, mle->node_map))
  		mlog(0, "node %u already in node map!
  ", idx);
  	else
  		set_bit(idx, mle->node_map);
  
  	spin_unlock(&mle->spinlock);
  }
  
  
  int dlm_init_mle_cache(void)
  {
12eb0035d   Sunil Mushran   ocfs2/dlm: Rename...
398
  	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
399
400
  					  sizeof(struct dlm_master_list_entry),
  					  0, SLAB_HWCACHE_ALIGN,
20c2df83d   Paul Mundt   mm: Remove slab d...
401
  					  NULL);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
  	if (dlm_mle_cache == NULL)
  		return -ENOMEM;
  	return 0;
  }
  
  void dlm_destroy_mle_cache(void)
  {
  	if (dlm_mle_cache)
  		kmem_cache_destroy(dlm_mle_cache);
  }
  
  static void dlm_mle_release(struct kref *kref)
  {
  	struct dlm_master_list_entry *mle;
  	struct dlm_ctxt *dlm;
  
  	mlog_entry_void();
  
  	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
  	dlm = mle->dlm;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
422
423
  	assert_spin_locked(&dlm->spinlock);
  	assert_spin_locked(&dlm->master_lock);
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
424
425
426
  	mlog(0, "Releasing mle for %.*s, type %d
  ", mle->mnamelen, mle->mname,
  	     mle->type);
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
427

6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
428
  	/* remove from list if not already */
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
429
  	__dlm_unlink_mle(dlm, mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
430
431
432
  
  	/* detach the mle from the domain node up/down events */
  	__dlm_mle_detach_hb_events(dlm, mle);
2041d8fdc   Sunil Mushran   ocfs2/dlm: Track ...
433
  	atomic_dec(&dlm->mle_cur_count[mle->type]);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
434
435
436
437
438
439
440
441
442
  	/* NOTE: kfree under spinlock here.
  	 * if this is bad, we can move this to a freelist. */
  	kmem_cache_free(dlm_mle_cache, mle);
  }
  
  
  /*
   * LOCK RESOURCE FUNCTIONS
   */
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
  int dlm_init_master_caches(void)
  {
  	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
  					      sizeof(struct dlm_lock_resource),
  					      0, SLAB_HWCACHE_ALIGN, NULL);
  	if (!dlm_lockres_cache)
  		goto bail;
  
  	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
  					       DLM_LOCKID_NAME_MAX, 0,
  					       SLAB_HWCACHE_ALIGN, NULL);
  	if (!dlm_lockname_cache)
  		goto bail;
  
  	return 0;
  bail:
  	dlm_destroy_master_caches();
  	return -ENOMEM;
  }
  
  void dlm_destroy_master_caches(void)
  {
  	if (dlm_lockname_cache)
  		kmem_cache_destroy(dlm_lockname_cache);
  
  	if (dlm_lockres_cache)
  		kmem_cache_destroy(dlm_lockres_cache);
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
471
472
473
  static void dlm_lockres_release(struct kref *kref)
  {
  	struct dlm_lock_resource *res;
b0d4f817b   Sunil Mushran   ocfs2/dlm: Fix ra...
474
  	struct dlm_ctxt *dlm;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
475
476
  
  	res = container_of(kref, struct dlm_lock_resource, refs);
b0d4f817b   Sunil Mushran   ocfs2/dlm: Fix ra...
477
  	dlm = res->dlm;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
478
479
480
481
482
483
484
485
  
  	/* This should not happen -- all lockres' have a name
  	 * associated with them at init time. */
  	BUG_ON(!res->lockname.name);
  
  	mlog(0, "destroying lockres %.*s
  ", res->lockname.len,
  	     res->lockname.name);
b0d4f817b   Sunil Mushran   ocfs2/dlm: Fix ra...
486
  	spin_lock(&dlm->track_lock);
29576f8bb   Sunil Mushran   ocfs2/dlm: Link a...
487
488
489
490
491
492
493
494
  	if (!list_empty(&res->tracking))
  		list_del_init(&res->tracking);
  	else {
  		mlog(ML_ERROR, "Resource %.*s not on the Tracking list
  ",
  		     res->lockname.len, res->lockname.name);
  		dlm_print_one_lock_resource(res);
  	}
b0d4f817b   Sunil Mushran   ocfs2/dlm: Fix ra...
495
  	spin_unlock(&dlm->track_lock);
6800791ab   Sunil Mushran   ocfs2/dlm: Improv...
496
  	atomic_dec(&dlm->res_cur_count);
a7f90d83e   Kurt Hackel   ocfs2: dump lockr...
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
  	if (!hlist_unhashed(&res->hash_node) ||
  	    !list_empty(&res->granted) ||
  	    !list_empty(&res->converting) ||
  	    !list_empty(&res->blocked) ||
  	    !list_empty(&res->dirty) ||
  	    !list_empty(&res->recovering) ||
  	    !list_empty(&res->purge)) {
  		mlog(ML_ERROR,
  		     "Going to BUG for resource %.*s."
  		     "  We're on a list! [%c%c%c%c%c%c%c]
  ",
  		     res->lockname.len, res->lockname.name,
  		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
  		     !list_empty(&res->granted) ? 'G' : ' ',
  		     !list_empty(&res->converting) ? 'C' : ' ',
  		     !list_empty(&res->blocked) ? 'B' : ' ',
  		     !list_empty(&res->dirty) ? 'D' : ' ',
  		     !list_empty(&res->recovering) ? 'R' : ' ',
  		     !list_empty(&res->purge) ? 'P' : ' ');
  
  		dlm_print_one_lock_resource(res);
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
519
520
  	/* By the time we're ready to blow this guy away, we shouldn't
  	 * be on any lists. */
81f2094a6   Mark Fasheh   [PATCH] ocfs2: us...
521
  	BUG_ON(!hlist_unhashed(&res->hash_node));
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
522
523
524
525
526
527
  	BUG_ON(!list_empty(&res->granted));
  	BUG_ON(!list_empty(&res->converting));
  	BUG_ON(!list_empty(&res->blocked));
  	BUG_ON(!list_empty(&res->dirty));
  	BUG_ON(!list_empty(&res->recovering));
  	BUG_ON(!list_empty(&res->purge));
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
528
  	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
529

724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
530
  	kmem_cache_free(dlm_lockres_cache, res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
531
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
  void dlm_lockres_put(struct dlm_lock_resource *res)
  {
  	kref_put(&res->refs, dlm_lockres_release);
  }
  
  static void dlm_init_lockres(struct dlm_ctxt *dlm,
  			     struct dlm_lock_resource *res,
  			     const char *name, unsigned int namelen)
  {
  	char *qname;
  
  	/* If we memset here, we lose our reference to the kmalloc'd
  	 * res->lockname.name, so be sure to init every field
  	 * correctly! */
  
  	qname = (char *) res->lockname.name;
  	memcpy(qname, name, namelen);
  
  	res->lockname.len = namelen;
a3d332915   Mark Fasheh   ocfs2: calculate ...
551
  	res->lockname.hash = dlm_lockid_hash(name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
552
553
554
  
  	init_waitqueue_head(&res->wq);
  	spin_lock_init(&res->spinlock);
81f2094a6   Mark Fasheh   [PATCH] ocfs2: us...
555
  	INIT_HLIST_NODE(&res->hash_node);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
556
557
558
559
560
561
  	INIT_LIST_HEAD(&res->granted);
  	INIT_LIST_HEAD(&res->converting);
  	INIT_LIST_HEAD(&res->blocked);
  	INIT_LIST_HEAD(&res->dirty);
  	INIT_LIST_HEAD(&res->recovering);
  	INIT_LIST_HEAD(&res->purge);
29576f8bb   Sunil Mushran   ocfs2/dlm: Link a...
562
  	INIT_LIST_HEAD(&res->tracking);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
563
564
  	atomic_set(&res->asts_reserved, 0);
  	res->migration_pending = 0;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
565
  	res->inflight_locks = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
566

b0d4f817b   Sunil Mushran   ocfs2/dlm: Fix ra...
567
  	res->dlm = dlm;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
568
  	kref_init(&res->refs);
6800791ab   Sunil Mushran   ocfs2/dlm: Improv...
569
570
  	atomic_inc(&dlm->res_tot_count);
  	atomic_inc(&dlm->res_cur_count);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
571
572
573
574
575
576
577
578
  	/* just for consistency */
  	spin_lock(&res->spinlock);
  	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
  	spin_unlock(&res->spinlock);
  
  	res->state = DLM_LOCK_RES_IN_PROGRESS;
  
  	res->last_used = 0;
18c6ac383   Sunil Mushran   [PATCH] ocfs2/dlm...
579
  	spin_lock(&dlm->spinlock);
29576f8bb   Sunil Mushran   ocfs2/dlm: Link a...
580
  	list_add_tail(&res->tracking, &dlm->tracking_list);
18c6ac383   Sunil Mushran   [PATCH] ocfs2/dlm...
581
  	spin_unlock(&dlm->spinlock);
29576f8bb   Sunil Mushran   ocfs2/dlm: Link a...
582

6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
583
  	memset(res->lvb, 0, DLM_LVB_LEN);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
584
  	memset(res->refmap, 0, sizeof(res->refmap));
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
585
586
587
588
589
590
  }
  
  struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
  				   const char *name,
  				   unsigned int namelen)
  {
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
591
  	struct dlm_lock_resource *res = NULL;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
592

3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
593
  	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
594
  	if (!res)
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
595
  		goto error;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
596

3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
597
  	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
598
599
  	if (!res->lockname.name)
  		goto error;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
600
601
602
  
  	dlm_init_lockres(dlm, res, name, namelen);
  	return res;
724bdca9b   Sunil Mushran   ocfs2/dlm: Create...
603
604
605
606
607
608
609
610
  
  error:
  	if (res && res->lockname.name)
  		kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
  
  	if (res)
  		kmem_cache_free(dlm_lockres_cache, res);
  	return NULL;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
611
  }
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
  void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
  				   struct dlm_lock_resource *res,
  				   int new_lockres,
  				   const char *file,
  				   int line)
  {
  	if (!new_lockres)
  		assert_spin_locked(&res->spinlock);
  
  	if (!test_bit(dlm->node_num, res->refmap)) {
  		BUG_ON(res->inflight_locks != 0);
  		dlm_lockres_set_refmap_bit(dlm->node_num, res);
  	}
  	res->inflight_locks++;
  	mlog(0, "%s:%.*s: inflight++: now %u
  ",
  	     dlm->name, res->lockname.len, res->lockname.name,
  	     res->inflight_locks);
  }
  
  void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
  				   struct dlm_lock_resource *res,
  				   const char *file,
  				   int line)
  {
  	assert_spin_locked(&res->spinlock);
  
  	BUG_ON(res->inflight_locks == 0);
  	res->inflight_locks--;
  	mlog(0, "%s:%.*s: inflight--: now %u
  ",
  	     dlm->name, res->lockname.len, res->lockname.name,
  	     res->inflight_locks);
  	if (res->inflight_locks == 0)
  		dlm_lockres_clear_refmap_bit(dlm->node_num, res);
  	wake_up(&res->wq);
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
  /*
   * lookup a lock resource by name.
   * may already exist in the hashtable.
   * lockid is null terminated
   *
   * if not, allocate enough for the lockres and for
   * the temporary structure used in doing the mastering.
   *
   * also, do a lookup in the dlm->master_list to see
   * if another node has begun mastering the same lock.
   * if so, there should be a block entry in there
   * for this name, and we should *not* attempt to master
   * the lock here.   need to wait around for that node
   * to assert_master (or die).
   *
   */
  struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
  					  const char *lockid,
3384f3df5   Mark Fasheh   ocfs2: Allow bina...
667
  					  int namelen,
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
668
669
670
671
672
673
674
675
  					  int flags)
  {
  	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
  	struct dlm_master_list_entry *mle = NULL;
  	struct dlm_master_list_entry *alloc_mle = NULL;
  	int blocked = 0;
  	int ret, nodenum;
  	struct dlm_node_iter iter;
3384f3df5   Mark Fasheh   ocfs2: Allow bina...
676
  	unsigned int hash;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
677
  	int tries = 0;
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
678
  	int bit, wait_on_recovery = 0;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
679
  	int drop_inflight_if_nonlocal = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
680
681
  
  	BUG_ON(!lockid);
a3d332915   Mark Fasheh   ocfs2: calculate ...
682
  	hash = dlm_lockid_hash(lockid, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
683
684
685
686
687
688
  
  	mlog(0, "get lockres %s (len %d)
  ", lockid, namelen);
  
  lookup:
  	spin_lock(&dlm->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
689
  	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
690
  	if (tmpres) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
691
  		int dropping_ref = 0;
7b791d685   Sunil Mushran   ocfs2/dlm: Fix ra...
692
  		spin_unlock(&dlm->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
693
  		spin_lock(&tmpres->spinlock);
7b791d685   Sunil Mushran   ocfs2/dlm: Fix ra...
694
695
696
697
698
  		/* We wait for the other thread that is mastering the resource */
  		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
  			__dlm_wait_on_lockres(tmpres);
  			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
  		}
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
699
700
701
702
703
704
  		if (tmpres->owner == dlm->node_num) {
  			BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
  			dlm_lockres_grab_inflight_ref(dlm, tmpres);
  		} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
  			dropping_ref = 1;
  		spin_unlock(&tmpres->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
705
706
707
708
709
710
711
712
713
714
715
  
  		/* wait until done messaging the master, drop our ref to allow
  		 * the lockres to be purged, start over. */
  		if (dropping_ref) {
  			spin_lock(&tmpres->spinlock);
  			__dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
  			spin_unlock(&tmpres->spinlock);
  			dlm_lockres_put(tmpres);
  			tmpres = NULL;
  			goto lookup;
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
716
717
718
719
720
721
722
723
724
725
726
727
728
  		mlog(0, "found in hash!
  ");
  		if (res)
  			dlm_lockres_put(res);
  		res = tmpres;
  		goto leave;
  	}
  
  	if (!res) {
  		spin_unlock(&dlm->spinlock);
  		mlog(0, "allocating a new resource
  ");
  		/* nothing found and we need to allocate one. */
3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
729
  		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
  		if (!alloc_mle)
  			goto leave;
  		res = dlm_new_lockres(dlm, lockid, namelen);
  		if (!res)
  			goto leave;
  		goto lookup;
  	}
  
  	mlog(0, "no lockres found, allocated our own: %p
  ", res);
  
  	if (flags & LKM_LOCAL) {
  		/* caller knows it's safe to assume it's not mastered elsewhere
  		 * DONE!  return right away */
  		spin_lock(&res->spinlock);
  		dlm_change_lockres_owner(dlm, res, dlm->node_num);
  		__dlm_insert_lockres(dlm, res);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
747
  		dlm_lockres_grab_inflight_ref(dlm, res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
748
749
750
751
752
753
754
755
756
757
758
759
  		spin_unlock(&res->spinlock);
  		spin_unlock(&dlm->spinlock);
  		/* lockres still marked IN_PROGRESS */
  		goto wake_waiters;
  	}
  
  	/* check master list to see if another node has started mastering it */
  	spin_lock(&dlm->master_lock);
  
  	/* if we found a block, wait for lock to be mastered by another node */
  	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
  	if (blocked) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
760
  		int mig;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
761
762
763
764
  		if (mle->type == DLM_MLE_MASTER) {
  			mlog(ML_ERROR, "master entry for nonexistent lock!
  ");
  			BUG();
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
  		}
  		mig = (mle->type == DLM_MLE_MIGRATION);
  		/* if there is a migration in progress, let the migration
  		 * finish before continuing.  we can wait for the absence
  		 * of the MIGRATION mle: either the migrate finished or
  		 * one of the nodes died and the mle was cleaned up.
  		 * if there is a BLOCK here, but it already has a master
  		 * set, we are too late.  the master does not have a ref
  		 * for us in the refmap.  detach the mle and drop it.
  		 * either way, go back to the top and start over. */
  		if (mig || mle->master != O2NM_MAX_NODES) {
  			BUG_ON(mig && mle->master == dlm->node_num);
  			/* we arrived too late.  the master does not
  			 * have a ref for us. retry. */
  			mlog(0, "%s:%.*s: late on %s
  ",
  			     dlm->name, namelen, lockid,
  			     mig ?  "MIGRATION" : "BLOCK");
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
783
  			spin_unlock(&dlm->master_lock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
784
785
786
  			spin_unlock(&dlm->spinlock);
  
  			/* master is known, detach */
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
787
788
  			if (!mig)
  				dlm_mle_detach_hb_events(dlm, mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
789
790
  			dlm_put_mle(mle);
  			mle = NULL;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
791
792
793
794
795
  			/* this is lame, but we cant wait on either
  			 * the mle or lockres waitqueue here */
  			if (mig)
  				msleep(100);
  			goto lookup;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
796
797
798
799
800
801
802
803
  		}
  	} else {
  		/* go ahead and try to master lock on this node */
  		mle = alloc_mle;
  		/* make sure this does not get freed below */
  		alloc_mle = NULL;
  		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
  		set_bit(dlm->node_num, mle->maybe_map);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
804
  		__dlm_insert_mle(dlm, mle);
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
805
806
  
  		/* still holding the dlm spinlock, check the recovery map
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
807
  		 * to see if there are any nodes that still need to be
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
808
809
810
811
  		 * considered.  these will not appear in the mle nodemap
  		 * but they might own this lockres.  wait on them. */
  		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
  		if (bit < O2NM_MAX_NODES) {
2759236f8   Joe Perches   [PATCH] fs/ocfs2:...
812
  			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
813
814
815
816
817
  			     "recover before lock mastery can begin
  ",
  			     dlm->name, namelen, (char *)lockid, bit);
  			wait_on_recovery = 1;
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
818
819
820
821
822
823
824
825
826
  	}
  
  	/* at this point there is either a DLM_MLE_BLOCK or a
  	 * DLM_MLE_MASTER on the master list, so it's safe to add the
  	 * lockres to the hashtable.  anyone who finds the lock will
  	 * still have to wait on the IN_PROGRESS. */
  
  	/* finally add the lockres to its hash bucket */
  	__dlm_insert_lockres(dlm, res);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
827
828
829
830
831
832
  	/* since this lockres is new it doesnt not require the spinlock */
  	dlm_lockres_grab_inflight_ref_new(dlm, res);
  
  	/* if this node does not become the master make sure to drop
  	 * this inflight reference below */
  	drop_inflight_if_nonlocal = 1;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
833
834
835
836
  	/* get an extra ref on the mle in case this is a BLOCK
  	 * if so, the creator of the BLOCK may try to put the last
  	 * ref at this time in the assert master handler, so we
  	 * need an extra one to keep from a bad ptr deref. */
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
837
  	dlm_get_mle_inuse(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
838
839
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
e7e69eb38   Kurt Hackel   ocfs2: teach dlm_...
840
  redo_request:
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
  	while (wait_on_recovery) {
  		/* any cluster changes that occurred after dropping the
  		 * dlm spinlock would be detectable be a change on the mle,
  		 * so we only need to clear out the recovery map once. */
  		if (dlm_is_recovery_lock(lockid, namelen)) {
  			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
  			     "must master $RECOVERY lock now
  ", dlm->name);
  			if (!dlm_pre_master_reco_lockres(dlm, res))
  				wait_on_recovery = 0;
  			else {
  				mlog(0, "%s: waiting 500ms for heartbeat state "
  				    "change
  ", dlm->name);
  				msleep(500);
  			}
  			continue;
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
858
  		}
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
859
860
  
  		dlm_kick_recovery_thread(dlm);
aa087b849   Kurt Hackel   ocfs2: increase b...
861
  		msleep(1000);
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
862
863
864
865
866
  		dlm_wait_for_recovery(dlm);
  
  		spin_lock(&dlm->spinlock);
  		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
  		if (bit < O2NM_MAX_NODES) {
2759236f8   Joe Perches   [PATCH] fs/ocfs2:...
867
  			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
868
869
870
871
872
873
874
  			     "recover before lock mastery can begin
  ",
  			     dlm->name, namelen, (char *)lockid, bit);
  			wait_on_recovery = 1;
  		} else
  			wait_on_recovery = 0;
  		spin_unlock(&dlm->spinlock);
b7084ab53   Kurt Hackel   ocfs2: wait for r...
875
876
877
  
  		if (wait_on_recovery)
  			dlm_wait_for_node_recovery(dlm, bit, 10000);
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
878
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
879
880
881
  	/* must wait for lock to be mastered elsewhere */
  	if (blocked)
  		goto wait;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
882
883
884
  	ret = -EINVAL;
  	dlm_node_iter_init(mle->vote_map, &iter);
  	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
885
  		ret = dlm_do_master_request(res, mle, nodenum);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
886
887
888
889
  		if (ret < 0)
  			mlog_errno(ret);
  		if (mle->master != O2NM_MAX_NODES) {
  			/* found a master ! */
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
890
891
892
893
894
895
896
897
898
899
  			if (mle->master <= nodenum)
  				break;
  			/* if our master request has not reached the master
  			 * yet, keep going until it does.  this is how the
  			 * master will know that asserts are needed back to
  			 * the lower nodes. */
  			mlog(0, "%s:%.*s: requests only up to %u but master "
  			     "is %u, keep going
  ", dlm->name, namelen,
  			     lockid, nodenum, mle->master);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
900
901
902
903
904
905
906
  		}
  	}
  
  wait:
  	/* keep going until the response map includes all nodes */
  	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
  	if (ret < 0) {
e7e69eb38   Kurt Hackel   ocfs2: teach dlm_...
907
  		wait_on_recovery = 1;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
908
909
910
911
912
913
914
  		mlog(0, "%s:%.*s: node map changed, redo the "
  		     "master request now, blocked=%d
  ",
  		     dlm->name, res->lockname.len,
  		     res->lockname.name, blocked);
  		if (++tries > 20) {
  			mlog(ML_ERROR, "%s:%.*s: spinning on "
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
915
916
917
  			     "dlm_wait_for_lock_mastery, blocked=%d
  ",
  			     dlm->name, res->lockname.len,
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
918
919
  			     res->lockname.name, blocked);
  			dlm_print_one_lock_resource(res);
8a9343fa2   Mark Fasheh   ocfs2: dlm_print_...
920
  			dlm_print_one_mle(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
921
922
923
924
925
926
927
928
929
930
931
932
933
934
  			tries = 0;
  		}
  		goto redo_request;
  	}
  
  	mlog(0, "lockres mastered by %u
  ", res->owner);
  	/* make sure we never continue without this */
  	BUG_ON(res->owner == O2NM_MAX_NODES);
  
  	/* master is known, detach if not already detached */
  	dlm_mle_detach_hb_events(dlm, mle);
  	dlm_put_mle(mle);
  	/* put the extra ref */
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
935
  	dlm_put_mle_inuse(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
936
937
938
  
  wake_waiters:
  	spin_lock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
939
940
  	if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
  		dlm_lockres_drop_inflight_ref(dlm, res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
  	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
  	spin_unlock(&res->spinlock);
  	wake_up(&res->wq);
  
  leave:
  	/* need to free the unused mle */
  	if (alloc_mle)
  		kmem_cache_free(dlm_mle_cache, alloc_mle);
  
  	return res;
  }
  
  
  #define DLM_MASTERY_TIMEOUT_MS   5000
  
  static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
  				     struct dlm_lock_resource *res,
  				     struct dlm_master_list_entry *mle,
  				     int *blocked)
  {
  	u8 m;
  	int ret, bit;
  	int map_changed, voting_done;
  	int assert, sleep;
  
  recheck:
  	ret = 0;
  	assert = 0;
  
  	/* check if another node has already become the owner */
  	spin_lock(&res->spinlock);
  	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
973
974
975
  		mlog(0, "%s:%.*s: owner is suddenly %u
  ", dlm->name,
  		     res->lockname.len, res->lockname.name, res->owner);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
976
  		spin_unlock(&res->spinlock);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
977
978
  		/* this will cause the master to re-assert across
  		 * the whole cluster, freeing up mles */
588e00902   Kurt Hackel   ocfs2: do not sen...
979
  		if (res->owner != dlm->node_num) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
980
  			ret = dlm_do_master_request(res, mle, res->owner);
588e00902   Kurt Hackel   ocfs2: do not sen...
981
982
983
984
985
986
987
  			if (ret < 0) {
  				/* give recovery a chance to run */
  				mlog(ML_ERROR, "link to %u went down?: %d
  ", res->owner, ret);
  				msleep(500);
  				goto recheck;
  			}
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
988
989
  		}
  		ret = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
  		goto leave;
  	}
  	spin_unlock(&res->spinlock);
  
  	spin_lock(&mle->spinlock);
  	m = mle->master;
  	map_changed = (memcmp(mle->vote_map, mle->node_map,
  			      sizeof(mle->vote_map)) != 0);
  	voting_done = (memcmp(mle->vote_map, mle->response_map,
  			     sizeof(mle->vote_map)) == 0);
  
  	/* restart if we hit any errors */
  	if (map_changed) {
  		int b;
  		mlog(0, "%s: %.*s: node map changed, restarting
  ",
  		     dlm->name, res->lockname.len, res->lockname.name);
  		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
  		b = (mle->type == DLM_MLE_BLOCK);
  		if ((*blocked && !b) || (!*blocked && b)) {
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1010
1011
  			mlog(0, "%s:%.*s: status change: old=%d new=%d
  ",
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
  			     dlm->name, res->lockname.len, res->lockname.name,
  			     *blocked, b);
  			*blocked = b;
  		}
  		spin_unlock(&mle->spinlock);
  		if (ret < 0) {
  			mlog_errno(ret);
  			goto leave;
  		}
  		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
  		     "rechecking now
  ", dlm->name, res->lockname.len,
  		     res->lockname.name);
  		goto recheck;
aa8523542   Kurt Hackel   ocfs2: mle ref co...
1026
1027
1028
1029
1030
1031
1032
  	} else {
  		if (!voting_done) {
  			mlog(0, "map not changed and voting not done "
  			     "for %s:%.*s
  ", dlm->name, res->lockname.len,
  			     res->lockname.name);
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
  	}
  
  	if (m != O2NM_MAX_NODES) {
  		/* another node has done an assert!
  		 * all done! */
  		sleep = 0;
  	} else {
  		sleep = 1;
  		/* have all nodes responded? */
  		if (voting_done && !*blocked) {
  			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
  			if (dlm->node_num <= bit) {
  				/* my node number is lowest.
  			 	 * now tell other nodes that I am
  				 * mastering this. */
  				mle->master = dlm->node_num;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1049
1050
  				/* ref was grabbed in get_lock_resource
  				 * will be dropped in dlmlock_master */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
  				assert = 1;
  				sleep = 0;
  			}
  			/* if voting is done, but we have not received
  			 * an assert master yet, we must sleep */
  		}
  	}
  
  	spin_unlock(&mle->spinlock);
  
  	/* sleep if we haven't finished voting yet */
  	if (sleep) {
  		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
  
  		/*
  		if (atomic_read(&mle->mle_refs.refcount) < 2)
  			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s
  ", mle,
  			atomic_read(&mle->mle_refs.refcount),
  			res->lockname.len, res->lockname.name);
  		*/
  		atomic_set(&mle->woken, 0);
  		(void)wait_event_timeout(mle->wq,
  					 (atomic_read(&mle->woken) == 1),
  					 timeo);
  		if (res->owner == O2NM_MAX_NODES) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1077
1078
1079
  			mlog(0, "%s:%.*s: waiting again
  ", dlm->name,
  			     res->lockname.len, res->lockname.name);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
  			goto recheck;
  		}
  		mlog(0, "done waiting, master is %u
  ", res->owner);
  		ret = 0;
  		goto leave;
  	}
  
  	ret = 0;   /* done */
  	if (assert) {
  		m = dlm->node_num;
  		mlog(0, "about to master %.*s here, this=%u
  ",
  		     res->lockname.len, res->lockname.name, m);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1094
  		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
  		if (ret) {
  			/* This is a failure in the network path,
  			 * not in the response to the assert_master
  			 * (any nonzero response is a BUG on this node).
  			 * Most likely a socket just got disconnected
  			 * due to node death. */
  			mlog_errno(ret);
  		}
  		/* no longer need to restart lock mastery.
  		 * all living nodes have been contacted. */
  		ret = 0;
  	}
  
  	/* set the lockres owner */
  	spin_lock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1110
1111
  	/* mastery reference obtained either during
  	 * assert_master_handler or in get_lock_resource */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
  	dlm_change_lockres_owner(dlm, res, m);
  	spin_unlock(&res->spinlock);
  
  leave:
  	return ret;
  }
  
  struct dlm_bitmap_diff_iter
  {
  	int curnode;
  	unsigned long *orig_bm;
  	unsigned long *cur_bm;
  	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
  };
  
  enum dlm_node_state_change
  {
  	NODE_DOWN = -1,
  	NODE_NO_CHANGE = 0,
  	NODE_UP
  };
  
  static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
  				      unsigned long *orig_bm,
  				      unsigned long *cur_bm)
  {
  	unsigned long p1, p2;
  	int i;
  
  	iter->curnode = -1;
  	iter->orig_bm = orig_bm;
  	iter->cur_bm = cur_bm;
  
  	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
         		p1 = *(iter->orig_bm + i);
  	       	p2 = *(iter->cur_bm + i);
  		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
  	}
  }
  
  static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
  				     enum dlm_node_state_change *state)
  {
  	int bit;
  
  	if (iter->curnode >= O2NM_MAX_NODES)
  		return -ENOENT;
  
  	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
  			    iter->curnode+1);
  	if (bit >= O2NM_MAX_NODES) {
  		iter->curnode = O2NM_MAX_NODES;
  		return -ENOENT;
  	}
  
  	/* if it was there in the original then this node died */
  	if (test_bit(bit, iter->orig_bm))
  		*state = NODE_DOWN;
  	else
  		*state = NODE_UP;
  
  	iter->curnode = bit;
  	return bit;
  }
  
  
  static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
  				    struct dlm_lock_resource *res,
  				    struct dlm_master_list_entry *mle,
  				    int blocked)
  {
  	struct dlm_bitmap_diff_iter bdi;
  	enum dlm_node_state_change sc;
  	int node;
  	int ret = 0;
  
  	mlog(0, "something happened such that the "
  	     "master process may need to be restarted!
  ");
  
  	assert_spin_locked(&mle->spinlock);
  
  	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
  	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
  	while (node >= 0) {
  		if (sc == NODE_UP) {
e2faea4ce   Kurt Hackel   [PATCH] ocfs2/dlm...
1198
1199
1200
1201
1202
  			/* a node came up.  clear any old vote from
  			 * the response map and set it in the vote map
  			 * then restart the mastery. */
  			mlog(ML_NOTICE, "node %d up while restarting
  ", node);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1203
1204
1205
1206
1207
1208
1209
1210
1211
  
  			/* redo the master request, but only for the new node */
  			mlog(0, "sending request to new node
  ");
  			clear_bit(node, mle->response_map);
  			set_bit(node, mle->vote_map);
  		} else {
  			mlog(ML_ERROR, "node down! %d
  ", node);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1212
1213
1214
1215
1216
1217
  			if (blocked) {
  				int lowest = find_next_bit(mle->maybe_map,
  						       O2NM_MAX_NODES, 0);
  
  				/* act like it was never there */
  				clear_bit(node, mle->maybe_map);
e7e69eb38   Kurt Hackel   ocfs2: teach dlm_...
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
  			       	if (node == lowest) {
  					mlog(0, "expected master %u died"
  					    " while this node was blocked "
  					    "waiting on it!
  ", node);
  					lowest = find_next_bit(mle->maybe_map,
  						       	O2NM_MAX_NODES,
  						       	lowest+1);
  					if (lowest < O2NM_MAX_NODES) {
  						mlog(0, "%s:%.*s:still "
  						     "blocked. waiting on %u "
  						     "now
  ", dlm->name,
  						     res->lockname.len,
  						     res->lockname.name,
  						     lowest);
  					} else {
  						/* mle is an MLE_BLOCK, but
  						 * there is now nothing left to
  						 * block on.  we need to return
  						 * all the way back out and try
  						 * again with an MLE_MASTER.
  						 * dlm_do_local_recovery_cleanup
  						 * has already run, so the mle
  						 * refcount is ok */
  						mlog(0, "%s:%.*s: no "
  						     "longer blocking. try to "
  						     "master this here
  ",
  						     dlm->name,
  						     res->lockname.len,
  						     res->lockname.name);
  						mle->type = DLM_MLE_MASTER;
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
1251
  						mle->mleres = res;
e7e69eb38   Kurt Hackel   ocfs2: teach dlm_...
1252
  					}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1253
  				}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1254
  			}
e7e69eb38   Kurt Hackel   ocfs2: teach dlm_...
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
  			/* now blank out everything, as if we had never
  			 * contacted anyone */
  			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
  			memset(mle->response_map, 0, sizeof(mle->response_map));
  			/* reset the vote_map to the current node_map */
  			memcpy(mle->vote_map, mle->node_map,
  			       sizeof(mle->node_map));
  			/* put myself into the maybe map */
  			if (mle->type != DLM_MLE_BLOCK)
  				set_bit(dlm->node_num, mle->maybe_map);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1265
1266
  		}
  		ret = -EAGAIN;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
  		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
  	}
  	return ret;
  }
  
  
  /*
   * DLM_MASTER_REQUEST_MSG
   *
   * returns: 0 on success,
   *          -errno on a network error
   *
   * on error, the caller should assume the target node is "dead"
   *
   */
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1282
1283
  static int dlm_do_master_request(struct dlm_lock_resource *res,
  				 struct dlm_master_list_entry *mle, int to)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1284
1285
1286
1287
1288
1289
1290
1291
1292
  {
  	struct dlm_ctxt *dlm = mle->dlm;
  	struct dlm_master_request request;
  	int ret, response=0, resend;
  
  	memset(&request, 0, sizeof(request));
  	request.node_idx = dlm->node_num;
  
  	BUG_ON(mle->type == DLM_MLE_MIGRATION);
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
1293
1294
  	request.namelen = (u8)mle->mnamelen;
  	memcpy(request.name, mle->mname, request.namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
  
  again:
  	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
  				 sizeof(request), to, &response);
  	if (ret < 0)  {
  		if (ret == -ESRCH) {
  			/* should never happen */
  			mlog(ML_ERROR, "TCP stack not ready!
  ");
  			BUG();
  		} else if (ret == -EINVAL) {
  			mlog(ML_ERROR, "bad args passed to o2net!
  ");
  			BUG();
  		} else if (ret == -ENOMEM) {
  			mlog(ML_ERROR, "out of memory while trying to send "
  			     "network message!  retrying
  ");
  			/* this is totally crude */
  			msleep(50);
  			goto again;
  		} else if (!dlm_is_host_down(ret)) {
  			/* not a network error. bad. */
  			mlog_errno(ret);
  			mlog(ML_ERROR, "unhandled error!");
  			BUG();
  		}
  		/* all other errors should be network errors,
  		 * and likely indicate node death */
  		mlog(ML_ERROR, "link to %d went down!
  ", to);
  		goto out;
  	}
  
  	ret = 0;
  	resend = 0;
  	spin_lock(&mle->spinlock);
  	switch (response) {
  		case DLM_MASTER_RESP_YES:
  			set_bit(to, mle->response_map);
  			mlog(0, "node %u is the master, response=YES
  ", to);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1337
1338
1339
1340
  			mlog(0, "%s:%.*s: master node %u now knows I have a "
  			     "reference
  ", dlm->name, res->lockname.len,
  			     res->lockname.name, to);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
  			mle->master = to;
  			break;
  		case DLM_MASTER_RESP_NO:
  			mlog(0, "node %u not master, response=NO
  ", to);
  			set_bit(to, mle->response_map);
  			break;
  		case DLM_MASTER_RESP_MAYBE:
  			mlog(0, "node %u not master, response=MAYBE
  ", to);
  			set_bit(to, mle->response_map);
  			set_bit(to, mle->maybe_map);
  			break;
  		case DLM_MASTER_RESP_ERROR:
  			mlog(0, "node %u hit an error, resending
  ", to);
  			resend = 1;
  			response = 0;
  			break;
  		default:
  			mlog(ML_ERROR, "bad response! %u
  ", response);
  			BUG();
  	}
  	spin_unlock(&mle->spinlock);
  	if (resend) {
  		/* this is also totally crude */
  		msleep(50);
  		goto again;
  	}
  
  out:
  	return ret;
  }
  
  /*
   * locks that can be taken here:
   * dlm->spinlock
   * res->spinlock
   * mle->spinlock
   * dlm->master_list
   *
   * if possible, TRIM THIS DOWN!!!
   */
d74c9803a   Kurt Hackel   ocfs2: Added post...
1385
1386
  int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
  			       void **ret_data)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1387
1388
1389
  {
  	u8 response = DLM_MASTER_RESP_MAYBE;
  	struct dlm_ctxt *dlm = data;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1390
  	struct dlm_lock_resource *res = NULL;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1391
1392
1393
  	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
  	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
  	char *name;
a3d332915   Mark Fasheh   ocfs2: calculate ...
1394
  	unsigned int namelen, hash;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1395
1396
  	int found, ret;
  	int set_maybe;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1397
  	int dispatch_assert = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
  
  	if (!dlm_grab(dlm))
  		return DLM_MASTER_RESP_NO;
  
  	if (!dlm_domain_fully_joined(dlm)) {
  		response = DLM_MASTER_RESP_NO;
  		goto send_response;
  	}
  
  	name = request->name;
  	namelen = request->namelen;
a3d332915   Mark Fasheh   ocfs2: calculate ...
1409
  	hash = dlm_lockid_hash(name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1410
1411
1412
1413
1414
1415
1416
1417
  
  	if (namelen > DLM_LOCKID_NAME_MAX) {
  		response = DLM_IVBUFLEN;
  		goto send_response;
  	}
  
  way_up_top:
  	spin_lock(&dlm->spinlock);
a3d332915   Mark Fasheh   ocfs2: calculate ...
1418
  	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1419
1420
1421
1422
1423
  	if (res) {
  		spin_unlock(&dlm->spinlock);
  
  		/* take care of the easy cases up front */
  		spin_lock(&res->spinlock);
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
1424
1425
  		if (res->state & (DLM_LOCK_RES_RECOVERING|
  				  DLM_LOCK_RES_MIGRATING)) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1426
1427
  			spin_unlock(&res->spinlock);
  			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
1428
1429
  			     "being recovered/migrated
  ");
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1430
1431
1432
1433
1434
1435
1436
  			response = DLM_MASTER_RESP_ERROR;
  			if (mle)
  				kmem_cache_free(dlm_mle_cache, mle);
  			goto send_response;
  		}
  
  		if (res->owner == dlm->node_num) {
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1437
1438
1439
1440
  			mlog(0, "%s:%.*s: setting bit %u in refmap
  ",
  			     dlm->name, namelen, name, request->node_idx);
  			dlm_lockres_set_refmap_bit(request->node_idx, res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1441
  			spin_unlock(&res->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
  			response = DLM_MASTER_RESP_YES;
  			if (mle)
  				kmem_cache_free(dlm_mle_cache, mle);
  
  			/* this node is the owner.
  			 * there is some extra work that needs to
  			 * happen now.  the requesting node has
  			 * caused all nodes up to this one to
  			 * create mles.  this node now needs to
  			 * go back and clean those up. */
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1452
  			dispatch_assert = 1;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
  			goto send_response;
  		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
  			spin_unlock(&res->spinlock);
  			// mlog(0, "node %u is the master
  ", res->owner);
  			response = DLM_MASTER_RESP_NO;
  			if (mle)
  				kmem_cache_free(dlm_mle_cache, mle);
  			goto send_response;
  		}
  
  		/* ok, there is no owner.  either this node is
  		 * being blocked, or it is actively trying to
  		 * master this lock. */
  		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
  			mlog(ML_ERROR, "lock with no owner should be "
  			     "in-progress!
  ");
  			BUG();
  		}
  
  		// mlog(0, "lockres is in progress...
  ");
  		spin_lock(&dlm->master_lock);
  		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
  		if (!found) {
  			mlog(ML_ERROR, "no mle found for this lock!
  ");
  			BUG();
  		}
  		set_maybe = 1;
  		spin_lock(&tmpmle->spinlock);
  		if (tmpmle->type == DLM_MLE_BLOCK) {
  			// mlog(0, "this node is waiting for "
  			// "lockres to be mastered
  ");
  			response = DLM_MASTER_RESP_NO;
  		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
  			mlog(0, "node %u is master, but trying to migrate to "
  			     "node %u.
  ", tmpmle->master, tmpmle->new_master);
  			if (tmpmle->master == dlm->node_num) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
  				mlog(ML_ERROR, "no owner on lockres, but this "
  				     "node is trying to migrate it to %u?!
  ",
  				     tmpmle->new_master);
  				BUG();
  			} else {
  				/* the real master can respond on its own */
  				response = DLM_MASTER_RESP_NO;
  			}
  		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
  			set_maybe = 0;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1506
  			if (tmpmle->master == dlm->node_num) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1507
  				response = DLM_MASTER_RESP_YES;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1508
1509
1510
1511
  				/* this node will be the owner.
  				 * go back and clean the mles on any
  				 * other nodes */
  				dispatch_assert = 1;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1512
1513
1514
1515
1516
  				dlm_lockres_set_refmap_bit(request->node_idx, res);
  				mlog(0, "%s:%.*s: setting bit %u in refmap
  ",
  				     dlm->name, namelen, name,
  				     request->node_idx);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1517
  			} else
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
  				response = DLM_MASTER_RESP_NO;
  		} else {
  			// mlog(0, "this node is attempting to "
  			// "master lockres
  ");
  			response = DLM_MASTER_RESP_MAYBE;
  		}
  		if (set_maybe)
  			set_bit(request->node_idx, tmpmle->maybe_map);
  		spin_unlock(&tmpmle->spinlock);
  
  		spin_unlock(&dlm->master_lock);
  		spin_unlock(&res->spinlock);
  
  		/* keep the mle attached to heartbeat events */
  		dlm_put_mle(tmpmle);
  		if (mle)
  			kmem_cache_free(dlm_mle_cache, mle);
  		goto send_response;
  	}
  
  	/*
  	 * lockres doesn't exist on this node
  	 * if there is an MLE_BLOCK, return NO
  	 * if there is an MLE_MASTER, return MAYBE
  	 * otherwise, add an MLE_BLOCK, return NO
  	 */
  	spin_lock(&dlm->master_lock);
  	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
  	if (!found) {
  		/* this lockid has never been seen on this node yet */
  		// mlog(0, "no mle found
  ");
  		if (!mle) {
  			spin_unlock(&dlm->master_lock);
  			spin_unlock(&dlm->spinlock);
3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
1554
  			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1555
  			if (!mle) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1556
  				response = DLM_MASTER_RESP_ERROR;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1557
  				mlog_errno(-ENOMEM);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1558
1559
  				goto send_response;
  			}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1560
1561
1562
1563
1564
1565
  			goto way_up_top;
  		}
  
  		// mlog(0, "this is second time thru, already allocated, "
  		// "add the block.
  ");
41b8c8a10   Kurt Hackel   ocfs2: properly i...
1566
  		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1567
  		set_bit(request->node_idx, mle->maybe_map);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
1568
  		__dlm_insert_mle(dlm, mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1569
1570
1571
1572
1573
1574
  		response = DLM_MASTER_RESP_NO;
  	} else {
  		// mlog(0, "mle was found
  ");
  		set_maybe = 1;
  		spin_lock(&tmpmle->spinlock);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1575
1576
1577
1578
1579
  		if (tmpmle->master == dlm->node_num) {
  			mlog(ML_ERROR, "no lockres, but an mle with this node as master!
  ");
  			BUG();
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1580
1581
1582
1583
1584
1585
  		if (tmpmle->type == DLM_MLE_BLOCK)
  			response = DLM_MASTER_RESP_NO;
  		else if (tmpmle->type == DLM_MLE_MIGRATION) {
  			mlog(0, "migration mle was found (%u->%u)
  ",
  			     tmpmle->master, tmpmle->new_master);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1586
1587
  			/* real master can respond on its own */
  			response = DLM_MASTER_RESP_NO;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1588
1589
  		} else
  			response = DLM_MASTER_RESP_MAYBE;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
  		if (set_maybe)
  			set_bit(request->node_idx, tmpmle->maybe_map);
  		spin_unlock(&tmpmle->spinlock);
  	}
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  
  	if (found) {
  		/* keep the mle attached to heartbeat events */
  		dlm_put_mle(tmpmle);
  	}
  send_response:
b31cfc023   Sunil Mushran   ocfs2/dlm: Add mi...
1602
1603
1604
1605
1606
1607
  	/*
  	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
  	 * The reference is released by dlm_assert_master_worker() under
  	 * the call to dlm_dispatch_assert_master().  If
  	 * dlm_assert_master_worker() isn't called, we drop it here.
  	 */
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
  	if (dispatch_assert) {
  		if (response != DLM_MASTER_RESP_YES)
  			mlog(ML_ERROR, "invalid response %d
  ", response);
  		if (!res) {
  			mlog(ML_ERROR, "bad lockres while trying to assert!
  ");
  			BUG();
  		}
  		mlog(0, "%u is the owner of %.*s, cleaning everyone else
  ",
  			     dlm->node_num, res->lockname.len, res->lockname.name);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1620
  		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1621
1622
1623
1624
1625
  						 DLM_ASSERT_MASTER_MLE_CLEANUP);
  		if (ret < 0) {
  			mlog(ML_ERROR, "failed to dispatch assert master work
  ");
  			response = DLM_MASTER_RESP_ERROR;
b31cfc023   Sunil Mushran   ocfs2/dlm: Add mi...
1626
  			dlm_lockres_put(res);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1627
  		}
b31cfc023   Sunil Mushran   ocfs2/dlm: Add mi...
1628
1629
1630
  	} else {
  		if (res)
  			dlm_lockres_put(res);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1631
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
  	dlm_put(dlm);
  	return response;
  }
  
  /*
   * DLM_ASSERT_MASTER_MSG
   */
  
  
  /*
   * NOTE: this can be used for debugging
   * can periodically run all locks owned by this node
   * and re-assert across the cluster...
   */
05488bbeb   Adrian Bunk   [2.6 patch] ocfs2...
1646
1647
1648
  static int dlm_do_assert_master(struct dlm_ctxt *dlm,
  				struct dlm_lock_resource *res,
  				void *nodemap, u32 flags)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1649
1650
1651
1652
1653
  {
  	struct dlm_assert_master assert;
  	int to, tmpret;
  	struct dlm_node_iter iter;
  	int ret = 0;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1654
  	int reassert;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1655
1656
  	const char *lockname = res->lockname.name;
  	unsigned int namelen = res->lockname.len;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1657
1658
  
  	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
1659
1660
1661
1662
  
  	spin_lock(&res->spinlock);
  	res->state |= DLM_LOCK_RES_SETREF_INPROG;
  	spin_unlock(&res->spinlock);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1663
1664
  again:
  	reassert = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1665
1666
1667
1668
1669
  
  	/* note that if this nodemap is empty, it returns 0 */
  	dlm_node_iter_init(nodemap, &iter);
  	while ((to = dlm_node_iter_next(&iter)) >= 0) {
  		int r = 0;
a9ee4c8a6   Kurt Hackel   ocfs2: better err...
1670
  		struct dlm_master_list_entry *mle = NULL;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
  		mlog(0, "sending assert master to %d (%.*s)
  ", to,
  		     namelen, lockname);
  		memset(&assert, 0, sizeof(assert));
  		assert.node_idx = dlm->node_num;
  		assert.namelen = namelen;
  		memcpy(assert.name, lockname, namelen);
  		assert.flags = cpu_to_be32(flags);
  
  		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
  					    &assert, sizeof(assert), to, &r);
  		if (tmpret < 0) {
a5196ec5e   Wengang Wang   ocfs2: print node...
1683
1684
1685
1686
  			mlog(ML_ERROR, "Error %d when sending message %u (key "
  			     "0x%x) to node %u
  ", tmpret,
  			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1687
  			if (!dlm_is_host_down(tmpret)) {
3b3b84a89   Kurt Hackel   ocfs2: tune down ...
1688
1689
  				mlog(ML_ERROR, "unhandled error=%d!
  ", tmpret);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1690
1691
1692
  				BUG();
  			}
  			/* a node died.  finish out the rest of the nodes. */
3b3b84a89   Kurt Hackel   ocfs2: tune down ...
1693
1694
  			mlog(0, "link to %d went down!
  ", to);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1695
1696
  			/* any nonzero status return will do */
  			ret = tmpret;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1697
  			r = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1698
1699
1700
1701
1702
  		} else if (r < 0) {
  			/* ok, something horribly messed.  kill thyself. */
  			mlog(ML_ERROR,"during assert master of %.*s to %u, "
  			     "got %d.
  ", namelen, lockname, to, r);
a9ee4c8a6   Kurt Hackel   ocfs2: better err...
1703
1704
1705
1706
1707
1708
1709
1710
1711
  			spin_lock(&dlm->spinlock);
  			spin_lock(&dlm->master_lock);
  			if (dlm_find_mle(dlm, &mle, (char *)lockname,
  					 namelen)) {
  				dlm_print_one_mle(mle);
  				__dlm_put_mle(mle);
  			}
  			spin_unlock(&dlm->master_lock);
  			spin_unlock(&dlm->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1712
  			BUG();
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
  		}
  
  		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
  		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
  				mlog(ML_ERROR, "%.*s: very strange, "
  				     "master MLE but no lockres on %u
  ",
  				     namelen, lockname, to);
  		}
  
  		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1724
  			mlog(0, "%.*s: node %u create mles on other "
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1725
1726
  			     "nodes and requests a re-assert
  ",
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1727
1728
  			     namelen, lockname, to);
  			reassert = 1;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1729
  		}
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1730
1731
1732
1733
1734
1735
1736
1737
1738
  		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
  			mlog(0, "%.*s: node %u has a reference to this "
  			     "lockres, set the bit in the refmap
  ",
  			     namelen, lockname, to);
  			spin_lock(&res->spinlock);
  			dlm_lockres_set_refmap_bit(to, res);
  			spin_unlock(&res->spinlock);
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1739
  	}
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1740
1741
  	if (reassert)
  		goto again;
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
1742
1743
1744
1745
  	spin_lock(&res->spinlock);
  	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
  	spin_unlock(&res->spinlock);
  	wake_up(&res->wq);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
  	return ret;
  }
  
  /*
   * locks that can be taken here:
   * dlm->spinlock
   * res->spinlock
   * mle->spinlock
   * dlm->master_list
   *
   * if possible, TRIM THIS DOWN!!!
   */
d74c9803a   Kurt Hackel   ocfs2: Added post...
1758
1759
  int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
  			      void **ret_data)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1760
1761
1762
1763
1764
1765
  {
  	struct dlm_ctxt *dlm = data;
  	struct dlm_master_list_entry *mle = NULL;
  	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
  	struct dlm_lock_resource *res = NULL;
  	char *name;
a3d332915   Mark Fasheh   ocfs2: calculate ...
1766
  	unsigned int namelen, hash;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1767
  	u32 flags;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1768
  	int master_request = 0, have_lockres_ref = 0;
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1769
  	int ret = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1770
1771
1772
1773
1774
1775
  
  	if (!dlm_grab(dlm))
  		return 0;
  
  	name = assert->name;
  	namelen = assert->namelen;
a3d332915   Mark Fasheh   ocfs2: calculate ...
1776
  	hash = dlm_lockid_hash(name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
  	flags = be32_to_cpu(assert->flags);
  
  	if (namelen > DLM_LOCKID_NAME_MAX) {
  		mlog(ML_ERROR, "Invalid name length!");
  		goto done;
  	}
  
  	spin_lock(&dlm->spinlock);
  
  	if (flags)
  		mlog(0, "assert_master with flags: %u
  ", flags);
  
  	/* find the MLE */
  	spin_lock(&dlm->master_lock);
  	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
  		/* not an error, could be master just re-asserting */
  		mlog(0, "just got an assert_master from %u, but no "
  		     "MLE for it! (%.*s)
  ", assert->node_idx,
  		     namelen, name);
  	} else {
  		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
  		if (bit >= O2NM_MAX_NODES) {
  			/* not necessarily an error, though less likely.
  			 * could be master just re-asserting. */
aa8523542   Kurt Hackel   ocfs2: mle ref co...
1803
  			mlog(0, "no bits set in the maybe_map, but %u "
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
  			     "is asserting! (%.*s)
  ", assert->node_idx,
  			     namelen, name);
  		} else if (bit != assert->node_idx) {
  			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
  				mlog(0, "master %u was found, %u should "
  				     "back off
  ", assert->node_idx, bit);
  			} else {
  				/* with the fix for bug 569, a higher node
  				 * number winning the mastery will respond
  				 * YES to mastery requests, but this node
  				 * had no way of knowing.  let it pass. */
aa8523542   Kurt Hackel   ocfs2: mle ref co...
1817
  				mlog(0, "%u is the lowest node, "
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1818
1819
1820
1821
1822
1823
1824
  				     "%u is asserting. (%.*s)  %u must "
  				     "have begun after %u won.
  ", bit,
  				     assert->node_idx, namelen, name, bit,
  				     assert->node_idx);
  			}
  		}
2d1a868c5   Kurt Hackel   ocfs2: take mle r...
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
  		if (mle->type == DLM_MLE_MIGRATION) {
  			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
  				mlog(0, "%s:%.*s: got cleanup assert"
  				     " from %u for migration
  ",
  				     dlm->name, namelen, name,
  				     assert->node_idx);
  			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
  				mlog(0, "%s:%.*s: got unrelated assert"
  				     " from %u for migration, ignoring
  ",
  				     dlm->name, namelen, name,
  				     assert->node_idx);
  				__dlm_put_mle(mle);
  				spin_unlock(&dlm->master_lock);
  				spin_unlock(&dlm->spinlock);
  				goto done;
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1842
  			}
2d1a868c5   Kurt Hackel   ocfs2: take mle r...
1843
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1844
1845
1846
1847
1848
  	}
  	spin_unlock(&dlm->master_lock);
  
  	/* ok everything checks out with the MLE
  	 * now check to see if there is a lockres */
a3d332915   Mark Fasheh   ocfs2: calculate ...
1849
  	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1850
1851
1852
1853
1854
1855
1856
1857
1858
  	if (res) {
  		spin_lock(&res->spinlock);
  		if (res->state & DLM_LOCK_RES_RECOVERING)  {
  			mlog(ML_ERROR, "%u asserting but %.*s is "
  			     "RECOVERING!
  ", assert->node_idx, namelen, name);
  			goto kill;
  		}
  		if (!mle) {
dc2ed195d   Kurt Hackel   ocfs2: allow for ...
1859
1860
  			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
  			    res->owner != assert->node_idx) {
53ecd25e1   Sunil Mushran   ocfs2/dlm: Make d...
1861
1862
1863
1864
1865
1866
1867
  				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
  				     "but current owner is %u! (%.*s)
  ",
  				     assert->node_idx, res->owner, namelen,
  				     name);
  				__dlm_print_one_lock_resource(res);
  				BUG();
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
  			}
  		} else if (mle->type != DLM_MLE_MIGRATION) {
  			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
  				/* owner is just re-asserting */
  				if (res->owner == assert->node_idx) {
  					mlog(0, "owner %u re-asserting on "
  					     "lock %.*s
  ", assert->node_idx,
  					     namelen, name);
  					goto ok;
  				}
  				mlog(ML_ERROR, "got assert_master from "
  				     "node %u, but %u is the owner! "
  				     "(%.*s)
  ", assert->node_idx,
  				     res->owner, namelen, name);
  				goto kill;
  			}
  			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
  				mlog(ML_ERROR, "got assert from %u, but lock "
  				     "with no owner should be "
  				     "in-progress! (%.*s)
  ",
  				     assert->node_idx,
  				     namelen, name);
  				goto kill;
  			}
  		} else /* mle->type == DLM_MLE_MIGRATION */ {
  			/* should only be getting an assert from new master */
  			if (assert->node_idx != mle->new_master) {
  				mlog(ML_ERROR, "got assert from %u, but "
  				     "new master is %u, and old master "
  				     "was %u (%.*s)
  ",
  				     assert->node_idx, mle->new_master,
  				     mle->master, namelen, name);
  				goto kill;
  			}
  
  		}
  ok:
  		spin_unlock(&res->spinlock);
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1911
1912
1913
1914
1915
  
  	// mlog(0, "woo!  got an assert_master from node %u!
  ",
  	// 	     assert->node_idx);
  	if (mle) {
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1916
1917
  		int extra_ref = 0;
  		int nn = -1;
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1918
  		int rr, err = 0;
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1919

6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1920
  		spin_lock(&mle->spinlock);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1921
1922
1923
1924
1925
1926
  		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
  			extra_ref = 1;
  		else {
  			/* MASTER mle: if any bits set in the response map
  			 * then the calling node needs to re-assert to clear
  			 * up nodes that this node contacted */
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
1927
  			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
1928
1929
1930
1931
1932
  						    nn+1)) < O2NM_MAX_NODES) {
  				if (nn != dlm->node_num && nn != assert->node_idx)
  					master_request = 1;
  			}
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1933
1934
1935
1936
  		mle->master = assert->node_idx;
  		atomic_set(&mle->woken, 1);
  		wake_up(&mle->wq);
  		spin_unlock(&mle->spinlock);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1937
  		if (res) {
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
1938
  			int wake = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1939
  			spin_lock(&res->spinlock);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1940
1941
1942
1943
1944
1945
1946
  			if (mle->type == DLM_MLE_MIGRATION) {
  				mlog(0, "finishing off migration of lockres %.*s, "
  			     		"from %u to %u
  ",
  			       		res->lockname.len, res->lockname.name,
  			       		dlm->node_num, mle->new_master);
  				res->state &= ~DLM_LOCK_RES_MIGRATING;
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
1947
  				wake = 1;
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1948
1949
1950
1951
1952
  				dlm_change_lockres_owner(dlm, res, mle->new_master);
  				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
  			} else {
  				dlm_change_lockres_owner(dlm, res, mle->master);
  			}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1953
  			spin_unlock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
1954
  			have_lockres_ref = 1;
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
1955
1956
  			if (wake)
  				wake_up(&res->wq);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1957
  		}
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1958
1959
1960
1961
  
  		/* master is known, detach if not already detached.
  		 * ensures that only one assert_master call will happen
  		 * on this mle. */
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
  		spin_lock(&dlm->master_lock);
  
  		rr = atomic_read(&mle->mle_refs.refcount);
  		if (mle->inuse > 0) {
  			if (extra_ref && rr < 3)
  				err = 1;
  			else if (!extra_ref && rr < 2)
  				err = 1;
  		} else {
  			if (extra_ref && rr < 2)
  				err = 1;
  			else if (!extra_ref && rr < 1)
  				err = 1;
  		}
  		if (err) {
  			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
  			     "that will mess up this node, refs=%d, extra=%d, "
  			     "inuse=%d
  ", dlm->name, namelen, name,
  			     assert->node_idx, rr, extra_ref, mle->inuse);
  			dlm_print_one_mle(mle);
  		}
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
1984
  		__dlm_unlink_mle(dlm, mle);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1985
1986
  		__dlm_mle_detach_hb_events(dlm, mle);
  		__dlm_put_mle(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
1987
1988
1989
1990
1991
  		if (extra_ref) {
  			/* the assert master message now balances the extra
  		 	 * ref given by the master / migration request message.
  		 	 * if this is the last put, it will be removed
  		 	 * from the list. */
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1992
1993
1994
  			__dlm_put_mle(mle);
  		}
  		spin_unlock(&dlm->master_lock);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
1995
1996
1997
1998
1999
2000
  	} else if (res) {
  		if (res->owner != assert->node_idx) {
  			mlog(0, "assert_master from %u, but current "
  			     "owner is %u (%.*s), no mle
  ", assert->node_idx,
  			     res->owner, namelen, name);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2001
2002
  		}
  	}
14741472a   Srinivas Eeda   ocfs2: Fix a race...
2003
  	spin_unlock(&dlm->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2004
2005
  
  done:
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
2006
  	ret = 0;
3b8118cff   Kurt Hackel   ocfs2_dlm: Callin...
2007
2008
2009
2010
2011
2012
  	if (res) {
  		spin_lock(&res->spinlock);
  		res->state |= DLM_LOCK_RES_SETREF_INPROG;
  		spin_unlock(&res->spinlock);
  		*ret_data = (void *)res;
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2013
  	dlm_put(dlm);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
2014
2015
2016
  	if (master_request) {
  		mlog(0, "need to tell master to reassert
  ");
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
  		/* positive. negative would shoot down the node. */
  		ret |= DLM_ASSERT_RESPONSE_REASSERT;
  		if (!have_lockres_ref) {
  			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
  			     "mle present here for %s:%.*s, but no lockres!
  ",
  			     assert->node_idx, dlm->name, namelen, name);
  		}
  	}
  	if (have_lockres_ref) {
  		/* let the master know we have a reference to the lockres */
  		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
  		mlog(0, "%s:%.*s: got assert from %u, need a ref
  ",
  		     dlm->name, namelen, name, assert->node_idx);
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
2032
2033
  	}
  	return ret;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2034
2035
2036
  
  kill:
  	/* kill the caller! */
a9ee4c8a6   Kurt Hackel   ocfs2: better err...
2037
2038
2039
2040
  	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
  	     "and killing the other node now!  This node is OK and can continue.
  ");
  	__dlm_print_one_lock_resource(res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2041
2042
  	spin_unlock(&res->spinlock);
  	spin_unlock(&dlm->spinlock);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2043
  	*ret_data = (void *)res;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2044
2045
2046
  	dlm_put(dlm);
  	return -EINVAL;
  }
3b8118cff   Kurt Hackel   ocfs2_dlm: Callin...
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
  void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
  {
  	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
  
  	if (ret_data) {
  		spin_lock(&res->spinlock);
  		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
  		spin_unlock(&res->spinlock);
  		wake_up(&res->wq);
  		dlm_lockres_put(res);
  	}
  	return;
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2060
2061
2062
2063
2064
  int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
  			       struct dlm_lock_resource *res,
  			       int ignore_higher, u8 request_from, u32 flags)
  {
  	struct dlm_work_item *item;
cd8612808   Robert P. J. Day   [PATCH] Fix numer...
2065
  	item = kzalloc(sizeof(*item), GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
  	if (!item)
  		return -ENOMEM;
  
  
  	/* queue up work for dlm_assert_master_worker */
  	dlm_grab(dlm);  /* get an extra ref for the work item */
  	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
  	item->u.am.lockres = res; /* already have a ref */
  	/* can optionally ignore node numbers higher than this node */
  	item->u.am.ignore_higher = ignore_higher;
  	item->u.am.request_from = request_from;
  	item->u.am.flags = flags;
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2078
2079
2080
  	if (ignore_higher)
  		mlog(0, "IGNORE HIGHER: %.*s
  ", res->lockname.len,
9c6510a5b   Kurt Hackel   [PATCH] ocfs2: fi...
2081
  		     res->lockname.name);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2082

6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2083
2084
2085
  	spin_lock(&dlm->work_lock);
  	list_add_tail(&item->list, &dlm->work_list);
  	spin_unlock(&dlm->work_lock);
3156d2670   Kurt Hackel   ocfs2: move dlm w...
2086
  	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
  	return 0;
  }
  
  static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
  {
  	struct dlm_ctxt *dlm = data;
  	int ret = 0;
  	struct dlm_lock_resource *res;
  	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
  	int ignore_higher;
  	int bit;
  	u8 request_from;
  	u32 flags;
  
  	dlm = item->dlm;
  	res = item->u.am.lockres;
  	ignore_higher = item->u.am.ignore_higher;
  	request_from = item->u.am.request_from;
  	flags = item->u.am.flags;
  
  	spin_lock(&dlm->spinlock);
  	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
  	spin_unlock(&dlm->spinlock);
  
  	clear_bit(dlm->node_num, nodemap);
  	if (ignore_higher) {
  		/* if is this just to clear up mles for nodes below
  		 * this node, do not send the message to the original
  		 * caller or any node number higher than this */
  		clear_bit(request_from, nodemap);
  		bit = dlm->node_num;
  		while (1) {
  			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
  					    bit+1);
  		       	if (bit >= O2NM_MAX_NODES)
  				break;
  			clear_bit(bit, nodemap);
  		}
  	}
36407488b   Kurt Hackel   ocfs2: pending ma...
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
  	/*
  	 * If we're migrating this lock to someone else, we are no
  	 * longer allowed to assert out own mastery.  OTOH, we need to
  	 * prevent migration from starting while we're still asserting
  	 * our dominance.  The reserved ast delays migration.
  	 */
  	spin_lock(&res->spinlock);
  	if (res->state & DLM_LOCK_RES_MIGRATING) {
  		mlog(0, "Someone asked us to assert mastery, but we're "
  		     "in the middle of migration.  Skipping assert, "
  		     "the new master will handle that.
  ");
  		spin_unlock(&res->spinlock);
  		goto put;
  	} else
  		__dlm_lockres_reserve_ast(res);
  	spin_unlock(&res->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2143
2144
2145
2146
2147
  	/* this call now finishes out the nodemap
  	 * even if one or more nodes die */
  	mlog(0, "worker about to master %.*s here, this=%u
  ",
  		     res->lockname.len, res->lockname.name, dlm->node_num);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2148
  	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2149
2150
  	if (ret < 0) {
  		/* no need to restart, we are done */
3b3b84a89   Kurt Hackel   ocfs2: tune down ...
2151
2152
  		if (!dlm_is_host_down(ret))
  			mlog_errno(ret);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2153
  	}
36407488b   Kurt Hackel   ocfs2: pending ma...
2154
2155
2156
2157
  	/* Ok, we've asserted ourselves.  Let's let migration start. */
  	dlm_lockres_release_ast(dlm, res);
  
  put:
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2158
2159
2160
2161
2162
  	dlm_lockres_put(res);
  
  	mlog(0, "finished with dlm_assert_master_worker
  ");
  }
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
2163
2164
2165
2166
2167
2168
2169
  /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
   * We cannot wait for node recovery to complete to begin mastering this
   * lockres because this lockres is used to kick off recovery! ;-)
   * So, do a pre-check on all living nodes to see if any of those nodes
   * think that $RECOVERY is currently mastered by a dead node.  If so,
   * we wait a short time to allow that node to get notified by its own
   * heartbeat stack, then check again.  All $RECOVERY lock resources
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2170
   * mastered by dead nodes are purged when the hearbeat callback is
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
   * fired, so we can know for sure that it is safe to continue once
   * the node returns a live node or no node.  */
  static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
  				       struct dlm_lock_resource *res)
  {
  	struct dlm_node_iter iter;
  	int nodenum;
  	int ret = 0;
  	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
  
  	spin_lock(&dlm->spinlock);
  	dlm_node_iter_init(dlm->domain_map, &iter);
  	spin_unlock(&dlm->spinlock);
  
  	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
  		/* do not send to self */
  		if (nodenum == dlm->node_num)
  			continue;
  		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
  		if (ret < 0) {
  			mlog_errno(ret);
  			if (!dlm_is_host_down(ret))
  				BUG();
  			/* host is down, so answer for that node would be
  			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
f42a100b2   Kurt Hackel   ocfs2: have dlm_p...
2196
  			ret = 0;
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
  		}
  
  		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
  			/* check to see if this master is in the recovery map */
  			spin_lock(&dlm->spinlock);
  			if (test_bit(master, dlm->recovery_map)) {
  				mlog(ML_NOTICE, "%s: node %u has not seen "
  				     "node %u go down yet, and thinks the "
  				     "dead node is mastering the recovery "
  				     "lock.  must wait.
  ", dlm->name,
  				     nodenum, master);
  				ret = -EAGAIN;
  			}
  			spin_unlock(&dlm->spinlock);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2212
2213
  			mlog(0, "%s: reco lock master is %u
  ", dlm->name,
c03872f5f   Kurt Hackel   [PATCH] ocfs2: dl...
2214
2215
2216
2217
2218
2219
  			     master);
  			break;
  		}
  	}
  	return ret;
  }
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
  /*
   * DLM_DEREF_LOCKRES_MSG
   */
  
  int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
  {
  	struct dlm_deref_lockres deref;
  	int ret = 0, r;
  	const char *lockname;
  	unsigned int namelen;
  
  	lockname = res->lockname.name;
  	namelen = res->lockname.len;
  	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
  
  	mlog(0, "%s:%.*s: sending deref to %d
  ",
  	     dlm->name, namelen, lockname, res->owner);
  	memset(&deref, 0, sizeof(deref));
  	deref.node_idx = dlm->node_num;
  	deref.namelen = namelen;
  	memcpy(deref.name, lockname, namelen);
  
  	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
  				 &deref, sizeof(deref), res->owner, &r);
  	if (ret < 0)
a5196ec5e   Wengang Wang   ocfs2: print node...
2246
2247
2248
2249
  		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
  		     "node %u
  ", ret, DLM_DEREF_LOCKRES_MSG, dlm->key,
  		     res->owner);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
  	else if (r < 0) {
  		/* BAD.  other node says I did not have a ref. */
  		mlog(ML_ERROR,"while dropping ref on %s:%.*s "
  		    "(master=%u) got %d.
  ", dlm->name, namelen,
  		    lockname, res->owner, r);
  		dlm_print_one_lock_resource(res);
  		BUG();
  	}
  	return ret;
  }
d74c9803a   Kurt Hackel   ocfs2: Added post...
2261
2262
  int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
  			      void **ret_data)
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2263
2264
2265
2266
2267
2268
2269
2270
2271
  {
  	struct dlm_ctxt *dlm = data;
  	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
  	struct dlm_lock_resource *res = NULL;
  	char *name;
  	unsigned int namelen;
  	int ret = -EINVAL;
  	u8 node;
  	unsigned int hash;
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2272
2273
2274
  	struct dlm_work_item *item;
  	int cleared = 0;
  	int dispatch = 0;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
  
  	if (!dlm_grab(dlm))
  		return 0;
  
  	name = deref->name;
  	namelen = deref->namelen;
  	node = deref->node_idx;
  
  	if (namelen > DLM_LOCKID_NAME_MAX) {
  		mlog(ML_ERROR, "Invalid name length!");
  		goto done;
  	}
  	if (deref->node_idx >= O2NM_MAX_NODES) {
  		mlog(ML_ERROR, "Invalid node number: %u
  ", node);
  		goto done;
  	}
  
  	hash = dlm_lockid_hash(name, namelen);
  
  	spin_lock(&dlm->spinlock);
  	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
  	if (!res) {
  		spin_unlock(&dlm->spinlock);
  		mlog(ML_ERROR, "%s:%.*s: bad lockres name
  ",
  		     dlm->name, namelen, name);
  		goto done;
  	}
  	spin_unlock(&dlm->spinlock);
  
  	spin_lock(&res->spinlock);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2307
2308
2309
2310
2311
2312
2313
2314
  	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
  		dispatch = 1;
  	else {
  		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
  		if (test_bit(node, res->refmap)) {
  			dlm_lockres_clear_refmap_bit(node, res);
  			cleared = 1;
  		}
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2315
2316
  	}
  	spin_unlock(&res->spinlock);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2317
2318
2319
2320
2321
2322
2323
2324
  	if (!dispatch) {
  		if (cleared)
  			dlm_lockres_calc_usage(dlm, res);
  		else {
  			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
  		     	"but it is already dropped!
  ", dlm->name,
  		     	res->lockname.len, res->lockname.name, node);
2af37ce82   Tao Ma   ocfs2: Use dlm_pr...
2325
  			dlm_print_one_lock_resource(res);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
  		}
  		ret = 0;
  		goto done;
  	}
  
  	item = kzalloc(sizeof(*item), GFP_NOFS);
  	if (!item) {
  		ret = -ENOMEM;
  		mlog_errno(ret);
  		goto done;
  	}
  
  	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
  	item->u.dl.deref_res = res;
  	item->u.dl.deref_node = node;
  
  	spin_lock(&dlm->work_lock);
  	list_add_tail(&item->list, &dlm->work_list);
  	spin_unlock(&dlm->work_lock);
  
  	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
  	return 0;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2348
2349
2350
2351
  done:
  	if (res)
  		dlm_lockres_put(res);
  	dlm_put(dlm);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2352

ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2353
2354
  	return ret;
  }
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
  static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
  {
  	struct dlm_ctxt *dlm;
  	struct dlm_lock_resource *res;
  	u8 node;
  	u8 cleared = 0;
  
  	dlm = item->dlm;
  	res = item->u.dl.deref_res;
  	node = item->u.dl.deref_node;
  
  	spin_lock(&res->spinlock);
  	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
  	if (test_bit(node, res->refmap)) {
  		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
  		dlm_lockres_clear_refmap_bit(node, res);
  		cleared = 1;
  	}
  	spin_unlock(&res->spinlock);
  
  	if (cleared) {
  		mlog(0, "%s:%.*s node %u ref dropped in dispatch
  ",
  		     dlm->name, res->lockname.len, res->lockname.name, node);
  		dlm_lockres_calc_usage(dlm, res);
  	} else {
  		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
  		     "but it is already dropped!
  ", dlm->name,
  		     res->lockname.len, res->lockname.name, node);
2af37ce82   Tao Ma   ocfs2: Use dlm_pr...
2385
  		dlm_print_one_lock_resource(res);
f3f854648   Sunil Mushran   ocfs2_dlm: Ensure...
2386
2387
2388
2389
  	}
  
  	dlm_lockres_put(res);
  }
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
  /* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
   * if not. If 0, numlocks is set to the number of locks in the lockres.
   */
  static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
  				      struct dlm_lock_resource *res,
  				      int *numlocks)
  {
  	int ret;
  	int i;
  	int count = 0;
800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
2400
  	struct list_head *queue;
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
  	struct dlm_lock *lock;
  
  	assert_spin_locked(&res->spinlock);
  
  	ret = -EINVAL;
  	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
  		mlog(0, "cannot migrate lockres with unknown owner!
  ");
  		goto leave;
  	}
  
  	if (res->owner != dlm->node_num) {
  		mlog(0, "cannot migrate lockres this node doesn't own!
  ");
  		goto leave;
  	}
  
  	ret = 0;
  	queue = &res->granted;
  	for (i = 0; i < 3; i++) {
800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
2421
  		list_for_each_entry(lock, queue, list) {
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
  			++count;
  			if (lock->ml.node == dlm->node_num) {
  				mlog(0, "found a lock owned by this node still "
  				     "on the %s queue!  will not migrate this "
  				     "lockres
  ", (i == 0 ? "granted" :
  						   (i == 1 ? "converting" :
  						    "blocked")));
  				ret = -ENOTEMPTY;
  				goto leave;
  			}
  		}
  		queue++;
  	}
  
  	*numlocks = count;
  	mlog(0, "migrateable lockres having %d locks
  ", *numlocks);
  
  leave:
  	return ret;
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2444
2445
2446
2447
  
  /*
   * DLM_MIGRATE_LOCKRES
   */
faf0ec9f1   Adrian Bunk   [PATCH] fs/ocfs2/...
2448
2449
2450
  static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
  			       struct dlm_lock_resource *res,
  			       u8 target)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2451
2452
2453
2454
  {
  	struct dlm_master_list_entry *mle = NULL;
  	struct dlm_master_list_entry *oldmle = NULL;
   	struct dlm_migratable_lockres *mres = NULL;
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2455
  	int ret = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2456
2457
2458
  	const char *name;
  	unsigned int namelen;
  	int mle_added = 0;
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2459
2460
  	int numlocks;
  	int wake = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
  
  	if (!dlm_grab(dlm))
  		return -EINVAL;
  
  	name = res->lockname.name;
  	namelen = res->lockname.len;
  
  	mlog(0, "migrating %.*s to %u
  ", namelen, name, target);
  
  	/*
  	 * ensure this lockres is a proper candidate for migration
  	 */
  	spin_lock(&res->spinlock);
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2475
2476
  	ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
  	if (ret < 0) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2477
2478
2479
  		spin_unlock(&res->spinlock);
  		goto leave;
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2480
2481
2482
  	spin_unlock(&res->spinlock);
  
  	/* no work to do */
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2483
  	if (numlocks == 0) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2484
2485
  		mlog(0, "no locks were found on this lockres! done!
  ");
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2486
2487
2488
2489
2490
2491
2492
2493
2494
  		goto leave;
  	}
  
  	/*
  	 * preallocate up front
  	 * if this fails, abort
  	 */
  
  	ret = -ENOMEM;
ad8100e0d   Kurt Hackel   ocfs2: use GFP_NO...
2495
  	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2496
2497
2498
2499
  	if (!mres) {
  		mlog_errno(ret);
  		goto leave;
  	}
3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
2500
  	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
  	if (!mle) {
  		mlog_errno(ret);
  		goto leave;
  	}
  	ret = 0;
  
  	/*
  	 * find a node to migrate the lockres to
  	 */
  
  	mlog(0, "picking a migration node
  ");
  	spin_lock(&dlm->spinlock);
  	/* pick a new node */
  	if (!test_bit(target, dlm->domain_map) ||
  	    target >= O2NM_MAX_NODES) {
  		target = dlm_pick_migration_target(dlm, res);
  	}
  	mlog(0, "node %u chosen for migration
  ", target);
  
  	if (target >= O2NM_MAX_NODES ||
  	    !test_bit(target, dlm->domain_map)) {
  		/* target chosen is not alive */
  		ret = -EINVAL;
  	}
  
  	if (ret) {
  		spin_unlock(&dlm->spinlock);
  		goto fail;
  	}
  
  	mlog(0, "continuing with target = %u
  ", target);
  
  	/*
  	 * clear any existing master requests and
  	 * add the migration mle to the list
  	 */
  	spin_lock(&dlm->master_lock);
  	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
  				    namelen, target, dlm->node_num);
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  
  	if (ret == -EEXIST) {
  		mlog(0, "another process is already migrating it
  ");
  		goto fail;
  	}
  	mle_added = 1;
  
  	/*
  	 * set the MIGRATING flag and flush asts
  	 * if we fail after this we need to re-dirty the lockres
  	 */
  	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
  		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
  		     "the target went down.
  ", res->lockname.len,
  		     res->lockname.name, target);
  		spin_lock(&res->spinlock);
  		res->state &= ~DLM_LOCK_RES_MIGRATING;
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
2564
  		wake = 1;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
  		spin_unlock(&res->spinlock);
  		ret = -EINVAL;
  	}
  
  fail:
  	if (oldmle) {
  		/* master is known, detach if not already detached */
  		dlm_mle_detach_hb_events(dlm, oldmle);
  		dlm_put_mle(oldmle);
  	}
  
  	if (ret < 0) {
  		if (mle_added) {
  			dlm_mle_detach_hb_events(dlm, mle);
  			dlm_put_mle(mle);
  		} else if (mle) {
  			kmem_cache_free(dlm_mle_cache, mle);
  		}
  		goto leave;
  	}
  
  	/*
  	 * at this point, we have a migration target, an mle
  	 * in the master list, and the MIGRATING flag set on
  	 * the lockres
  	 */
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
2591
2592
2593
  	/* now that remote nodes are spinning on the MIGRATING flag,
  	 * ensure that all assert_master work is flushed. */
  	flush_workqueue(dlm->dlm_worker);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2594
2595
2596
2597
2598
2599
2600
2601
  
  	/* get an extra reference on the mle.
  	 * otherwise the assert_master from the new
  	 * master will destroy this.
  	 * also, make sure that all callers of dlm_get_mle
  	 * take both dlm->spinlock and dlm->master_lock */
  	spin_lock(&dlm->spinlock);
  	spin_lock(&dlm->master_lock);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
2602
  	dlm_get_mle_inuse(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  
  	/* notify new node and send all lock state */
  	/* call send_one_lockres with migration flag.
  	 * this serves as notice to the target node that a
  	 * migration is starting. */
  	ret = dlm_send_one_lockres(dlm, res, mres, target,
  				   DLM_MRES_MIGRATION);
  
  	if (ret < 0) {
  		mlog(0, "migration to node %u failed with %d
  ",
  		     target, ret);
  		/* migration failed, detach and clean up mle */
  		dlm_mle_detach_hb_events(dlm, mle);
  		dlm_put_mle(mle);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
2620
2621
2622
  		dlm_put_mle_inuse(mle);
  		spin_lock(&res->spinlock);
  		res->state &= ~DLM_LOCK_RES_MIGRATING;
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
2623
  		wake = 1;
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
2624
  		spin_unlock(&res->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
  		goto leave;
  	}
  
  	/* at this point, the target sends a message to all nodes,
  	 * (using dlm_do_migrate_request).  this node is skipped since
  	 * we had to put an mle in the list to begin the process.  this
  	 * node now waits for target to do an assert master.  this node
  	 * will be the last one notified, ensuring that the migration
  	 * is complete everywhere.  if the target dies while this is
  	 * going on, some nodes could potentially see the target as the
  	 * master, so it is important that my recovery finds the migration
af901ca18   André Goddard Rosa   tree-wide: fix as...
2636
  	 * mle and sets the master to UNKNOWN. */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
  
  
  	/* wait for new node to assert master */
  	while (1) {
  		ret = wait_event_interruptible_timeout(mle->wq,
  					(atomic_read(&mle->woken) == 1),
  					msecs_to_jiffies(5000));
  
  		if (ret >= 0) {
  		       	if (atomic_read(&mle->woken) == 1 ||
  			    res->owner == target)
  				break;
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
2649
2650
2651
  			mlog(0, "%s:%.*s: timed out during migration
  ",
  			     dlm->name, res->lockname.len, res->lockname.name);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2652
  			/* avoid hang during shutdown when migrating lockres
e2faea4ce   Kurt Hackel   [PATCH] ocfs2/dlm...
2653
2654
  			 * to a node which also goes down */
  			if (dlm_is_node_dead(dlm, target)) {
aa8523542   Kurt Hackel   ocfs2: mle ref co...
2655
2656
2657
  				mlog(0, "%s:%.*s: expected migration "
  				     "target %u is no longer up, restarting
  ",
e2faea4ce   Kurt Hackel   [PATCH] ocfs2/dlm...
2658
2659
  				     dlm->name, res->lockname.len,
  				     res->lockname.name, target);
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
2660
2661
2662
2663
2664
2665
2666
  				ret = -EINVAL;
  				/* migration failed, detach and clean up mle */
  				dlm_mle_detach_hb_events(dlm, mle);
  				dlm_put_mle(mle);
  				dlm_put_mle_inuse(mle);
  				spin_lock(&res->spinlock);
  				res->state &= ~DLM_LOCK_RES_MIGRATING;
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
2667
  				wake = 1;
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
2668
2669
  				spin_unlock(&res->spinlock);
  				goto leave;
e2faea4ce   Kurt Hackel   [PATCH] ocfs2/dlm...
2670
  			}
1cd04dbe3   Kurt Hackel   ocfs2_dlm: Flush ...
2671
2672
2673
2674
  		} else
  			mlog(0, "%s:%.*s: caught signal during migration
  ",
  			     dlm->name, res->lockname.len, res->lockname.name);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
  	}
  
  	/* all done, set the owner, clear the flag */
  	spin_lock(&res->spinlock);
  	dlm_set_lockres_owner(dlm, res, target);
  	res->state &= ~DLM_LOCK_RES_MIGRATING;
  	dlm_remove_nonlocal_locks(dlm, res);
  	spin_unlock(&res->spinlock);
  	wake_up(&res->wq);
  
  	/* master is known, detach if not already detached */
  	dlm_mle_detach_hb_events(dlm, mle);
a2bf04774   Kurt Hackel   ocfs2: mle ref co...
2687
  	dlm_put_mle_inuse(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2688
2689
2690
2691
2692
2693
2694
2695
  	ret = 0;
  
  	dlm_lockres_calc_usage(dlm, res);
  
  leave:
  	/* re-dirty the lockres if we failed */
  	if (ret < 0)
  		dlm_kick_thread(dlm, res);
a6fa36402   Kurt Hackel   ocfs2_dlm: wake u...
2696
2697
2698
2699
  	/* wake up waiters if the MIGRATING flag got set
  	 * but migration failed */
  	if (wake)
  		wake_up(&res->wq);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
  	/* TODO: cleanup */
  	if (mres)
  		free_page((unsigned long)mres);
  
  	dlm_put(dlm);
  
  	mlog(0, "returning %d
  ", ret);
  	return ret;
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2710

ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
  #define DLM_MIGRATION_RETRY_MS  100
  
  /* Should be called only after beginning the domain leave process.
   * There should not be any remaining locks on nonlocal lock resources,
   * and there should be no local locks left on locally mastered resources.
   *
   * Called with the dlm spinlock held, may drop it to do migration, but
   * will re-acquire before exit.
   *
   * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
  int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
  {
  	int ret;
  	int lock_dropped = 0;
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2725
  	int numlocks;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2726

b36c3f849   Sunil Mushran   ocfs2_dlm: Add mi...
2727
  	spin_lock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2728
2729
2730
2731
2732
2733
2734
  	if (res->owner != dlm->node_num) {
  		if (!__dlm_lockres_unused(res)) {
  			mlog(ML_ERROR, "%s:%.*s: this node is not master, "
  			     "trying to free this but locks remain
  ",
  			     dlm->name, res->lockname.len, res->lockname.name);
  		}
b36c3f849   Sunil Mushran   ocfs2_dlm: Add mi...
2735
  		spin_unlock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2736
2737
  		goto leave;
  	}
2f5bf1f2d   Sunil Mushran   ocfs2_dlm: Check ...
2738
2739
2740
2741
2742
2743
2744
  
  	/* No need to migrate a lockres having no locks */
  	ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
  	if (ret >= 0 && numlocks == 0) {
  		spin_unlock(&res->spinlock);
  		goto leave;
  	}
b36c3f849   Sunil Mushran   ocfs2_dlm: Add mi...
2745
  	spin_unlock(&res->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
  
  	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
  	spin_unlock(&dlm->spinlock);
  	lock_dropped = 1;
  	while (1) {
  		ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
  		if (ret >= 0)
  			break;
  		if (ret == -ENOTEMPTY) {
  			mlog(ML_ERROR, "lockres %.*s still has local locks!
  ",
  		     		res->lockname.len, res->lockname.name);
  			BUG();
  		}
  
  		mlog(0, "lockres %.*s: migrate failed, "
  		     "retrying
  ", res->lockname.len,
  		     res->lockname.name);
  		msleep(DLM_MIGRATION_RETRY_MS);
  	}
  	spin_lock(&dlm->spinlock);
  leave:
  	return lock_dropped;
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
  int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
  {
  	int ret;
  	spin_lock(&dlm->ast_lock);
  	spin_lock(&lock->spinlock);
  	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
  	spin_unlock(&lock->spinlock);
  	spin_unlock(&dlm->ast_lock);
  	return ret;
  }
  
  static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
  				     struct dlm_lock_resource *res,
  				     u8 mig_target)
  {
  	int can_proceed;
  	spin_lock(&res->spinlock);
  	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
  	spin_unlock(&res->spinlock);
2bd632165   Sunil Mushran   ocfs2/trivial: Re...
2790
  	/* target has died, so make the caller break out of the
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2791
2792
2793
2794
2795
2796
2797
  	 * wait_event, but caller must recheck the domain_map */
  	spin_lock(&dlm->spinlock);
  	if (!test_bit(mig_target, dlm->domain_map))
  		can_proceed = 1;
  	spin_unlock(&dlm->spinlock);
  	return can_proceed;
  }
faf0ec9f1   Adrian Bunk   [PATCH] fs/ocfs2/...
2798
2799
  static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
  				struct dlm_lock_resource *res)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
  {
  	int ret;
  	spin_lock(&res->spinlock);
  	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
  	spin_unlock(&res->spinlock);
  	return ret;
  }
  
  
  static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
  				       struct dlm_lock_resource *res,
  				       u8 target)
  {
  	int ret = 0;
  
  	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u
  ",
  	       res->lockname.len, res->lockname.name, dlm->node_num,
  	       target);
  	/* need to set MIGRATING flag on lockres.  this is done by
  	 * ensuring that all asts have been flushed for this lockres. */
  	spin_lock(&res->spinlock);
  	BUG_ON(res->migration_pending);
  	res->migration_pending = 1;
  	/* strategy is to reserve an extra ast then release
  	 * it below, letting the release do all of the work */
  	__dlm_lockres_reserve_ast(res);
  	spin_unlock(&res->spinlock);
ddc09c8dd   Kurt Hackel   ocfs2_dlm: Fixes ...
2828
  	/* now flush all the pending asts */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2829
  	dlm_kick_thread(dlm, res);
ddc09c8dd   Kurt Hackel   ocfs2_dlm: Fixes ...
2830
2831
2832
2833
2834
2835
2836
  	/* before waiting on DIRTY, block processes which may
  	 * try to dirty the lockres before MIGRATING is set */
  	spin_lock(&res->spinlock);
  	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
  	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
  	spin_unlock(&res->spinlock);
  	/* now wait on any pending asts and the DIRTY state */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
  	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
  	dlm_lockres_release_ast(dlm, res);
  
  	mlog(0, "about to wait on migration_wq, dirty=%s
  ",
  	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
  	/* if the extra ref we just put was the final one, this
  	 * will pass thru immediately.  otherwise, we need to wait
  	 * for the last ast to finish. */
  again:
  	ret = wait_event_interruptible_timeout(dlm->migration_wq,
  		   dlm_migration_can_proceed(dlm, res, target),
  		   msecs_to_jiffies(1000));
  	if (ret < 0) {
  		mlog(0, "woken again: migrating? %s, dead? %s
  ",
  		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
  		       test_bit(target, dlm->domain_map) ? "no":"yes");
  	} else {
  		mlog(0, "all is well: migrating? %s, dead? %s
  ",
  		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
  		       test_bit(target, dlm->domain_map) ? "no":"yes");
  	}
  	if (!dlm_migration_can_proceed(dlm, res, target)) {
  		mlog(0, "trying again...
  ");
  		goto again;
  	}
a39953dd9   Wengang Wang   ocfs2/dlm: Remove...
2866
  	ret = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
  	/* did the target go down or die? */
  	spin_lock(&dlm->spinlock);
  	if (!test_bit(target, dlm->domain_map)) {
  		mlog(ML_ERROR, "aha. migration target %u just went down
  ",
  		     target);
  		ret = -EHOSTDOWN;
  	}
  	spin_unlock(&dlm->spinlock);
  
  	/*
a39953dd9   Wengang Wang   ocfs2/dlm: Remove...
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
  	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
  	 * another try; otherwise, we are sure the MIGRATING state is there,
  	 * drop the unneded state which blocked threads trying to DIRTY
  	 */
  	spin_lock(&res->spinlock);
  	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
  	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
  	if (!ret)
  		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
  	spin_unlock(&res->spinlock);
  
  	/*
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2890
2891
  	 * at this point:
  	 *
a39953dd9   Wengang Wang   ocfs2/dlm: Remove...
2892
  	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
  	 *   o there are no pending asts on this lockres
  	 *   o all processes trying to reserve an ast on this
  	 *     lockres must wait for the MIGRATING flag to clear
  	 */
  	return ret;
  }
  
  /* last step in the migration process.
   * original master calls this to free all of the dlm_lock
   * structures that used to be for other nodes. */
  static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
  				      struct dlm_lock_resource *res)
  {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2906
  	struct list_head *queue = &res->granted;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2907
  	int i, bit;
800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
2908
  	struct dlm_lock *lock, *next;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2909
2910
2911
2912
2913
2914
  
  	assert_spin_locked(&res->spinlock);
  
  	BUG_ON(res->owner == dlm->node_num);
  
  	for (i=0; i<3; i++) {
800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
2915
  		list_for_each_entry_safe(lock, next, queue, list) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2916
2917
2918
2919
2920
2921
2922
2923
2924
  			if (lock->ml.node != dlm->node_num) {
  				mlog(0, "putting lock for node %u
  ",
  				     lock->ml.node);
  				/* be extra careful */
  				BUG_ON(!list_empty(&lock->ast_list));
  				BUG_ON(!list_empty(&lock->bast_list));
  				BUG_ON(lock->ast_pending);
  				BUG_ON(lock->bast_pending);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2925
  				dlm_lockres_clear_refmap_bit(lock->ml.node, res);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2926
2927
  				list_del_init(&lock->list);
  				dlm_lock_put(lock);
2c5c54aca   Sunil Mushran   ocfs2/dlm: Add mi...
2928
2929
2930
  				/* In a normal unlock, we would have added a
  				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
  				dlm_lock_put(lock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2931
2932
2933
2934
  			}
  		}
  		queue++;
  	}
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
  	bit = 0;
  	while (1) {
  		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
  		if (bit >= O2NM_MAX_NODES)
  			break;
  		/* do not clear the local node reference, if there is a
  		 * process holding this, let it drop the ref itself */
  		if (bit != dlm->node_num) {
  			mlog(0, "%s:%.*s: node %u had a ref to this "
  			     "migrating lockres, clearing
  ", dlm->name,
  			     res->lockname.len, res->lockname.name, bit);
  			dlm_lockres_clear_refmap_bit(bit, res);
  		}
  		bit++;
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
  }
  
  /* for now this is not too intelligent.  we will
   * need stats to make this do the right thing.
   * this just finds the first lock on one of the
   * queues and uses that node as the target. */
  static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
  				    struct dlm_lock_resource *res)
  {
  	int i;
  	struct list_head *queue = &res->granted;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2962
2963
2964
2965
2966
2967
2968
  	struct dlm_lock *lock;
  	int nodenum;
  
  	assert_spin_locked(&dlm->spinlock);
  
  	spin_lock(&res->spinlock);
  	for (i=0; i<3; i++) {
800deef3f   Christoph Hellwig   [PATCH] ocfs2: us...
2969
  		list_for_each_entry(lock, queue, list) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2970
2971
  			/* up to the caller to make sure this node
  			 * is alive */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
  			if (lock->ml.node != dlm->node_num) {
  				spin_unlock(&res->spinlock);
  				return lock->ml.node;
  			}
  		}
  		queue++;
  	}
  	spin_unlock(&res->spinlock);
  	mlog(0, "have not found a suitable target yet! checking domain map
  ");
  
  	/* ok now we're getting desperate.  pick anyone alive. */
  	nodenum = -1;
  	while (1) {
  		nodenum = find_next_bit(dlm->domain_map,
  					O2NM_MAX_NODES, nodenum+1);
  		mlog(0, "found %d in domain map
  ", nodenum);
  		if (nodenum >= O2NM_MAX_NODES)
  			break;
  		if (nodenum != dlm->node_num) {
  			mlog(0, "picking %d
  ", nodenum);
  			return nodenum;
  		}
  	}
  
  	mlog(0, "giving up.  no master to migrate to
  ");
  	return DLM_LOCK_RES_OWNER_UNKNOWN;
  }
  
  
  
  /* this is called by the new master once all lockres
   * data has been received */
  static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
  				  struct dlm_lock_resource *res,
  				  u8 master, u8 new_master,
  				  struct dlm_node_iter *iter)
  {
  	struct dlm_migrate_request migrate;
2b8325640   Sunil Mushran   ocfs2/dlm: Fix a ...
3014
  	int ret, skip, status = 0;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
  	int nodenum;
  
  	memset(&migrate, 0, sizeof(migrate));
  	migrate.namelen = res->lockname.len;
  	memcpy(migrate.name, res->lockname.name, migrate.namelen);
  	migrate.new_master = new_master;
  	migrate.master = master;
  
  	ret = 0;
  
  	/* send message to all nodes, except the master and myself */
  	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
  		if (nodenum == master ||
  		    nodenum == new_master)
  			continue;
2b8325640   Sunil Mushran   ocfs2/dlm: Fix a ...
3030
3031
3032
3033
3034
3035
3036
3037
  		/* We could race exit domain. If exited, skip. */
  		spin_lock(&dlm->spinlock);
  		skip = (!test_bit(nodenum, dlm->domain_map));
  		spin_unlock(&dlm->spinlock);
  		if (skip) {
  			clear_bit(nodenum, iter->node_map);
  			continue;
  		}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3038
3039
3040
  		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
  					 &migrate, sizeof(migrate), nodenum,
  					 &status);
2b8325640   Sunil Mushran   ocfs2/dlm: Fix a ...
3041
  		if (ret < 0) {
a5196ec5e   Wengang Wang   ocfs2: print node...
3042
3043
3044
3045
  			mlog(ML_ERROR, "Error %d when sending message %u (key "
  			     "0x%x) to node %u
  ", ret, DLM_MIGRATE_REQUEST_MSG,
  			     dlm->key, nodenum);
2b8325640   Sunil Mushran   ocfs2/dlm: Fix a ...
3046
3047
3048
3049
3050
3051
3052
3053
  			if (!dlm_is_host_down(ret)) {
  				mlog(ML_ERROR, "unhandled error=%d!
  ", ret);
  				BUG();
  			}
  			clear_bit(nodenum, iter->node_map);
  			ret = 0;
  		} else if (status < 0) {
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3054
3055
3056
3057
  			mlog(0, "migrate request (node %u) returned %d!
  ",
  			     nodenum, status);
  			ret = status;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
  		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
  			/* during the migration request we short-circuited
  			 * the mastery of the lockres.  make sure we have
  			 * a mastery ref for nodenum */
  			mlog(0, "%s:%.*s: need ref for node %u
  ",
  			     dlm->name, res->lockname.len, res->lockname.name,
  			     nodenum);
  			spin_lock(&res->spinlock);
  			dlm_lockres_set_refmap_bit(nodenum, res);
  			spin_unlock(&res->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
  		}
  	}
  
  	if (ret < 0)
  		mlog_errno(ret);
  
  	mlog(0, "returning ret=%d
  ", ret);
  	return ret;
  }
  
  
  /* if there is an existing mle for this lockres, we now know who the master is.
   * (the one who sent us *this* message) we can clear it up right away.
   * since the process that put the mle on the list still has a reference to it,
   * we can unhash it now, set the master and wake the process.  as a result,
   * we will have no mle in the list to start with.  now we can add an mle for
   * the migration and this should be the only one found for those scanning the
   * list.  */
d74c9803a   Kurt Hackel   ocfs2: Added post...
3088
3089
  int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
  				void **ret_data)
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3090
3091
3092
3093
3094
3095
  {
  	struct dlm_ctxt *dlm = data;
  	struct dlm_lock_resource *res = NULL;
  	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
  	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
  	const char *name;
a3d332915   Mark Fasheh   ocfs2: calculate ...
3096
  	unsigned int namelen, hash;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3097
3098
3099
3100
3101
3102
3103
  	int ret = 0;
  
  	if (!dlm_grab(dlm))
  		return -EINVAL;
  
  	name = migrate->name;
  	namelen = migrate->namelen;
a3d332915   Mark Fasheh   ocfs2: calculate ...
3104
  	hash = dlm_lockid_hash(name, namelen);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3105
3106
  
  	/* preallocate.. if this fails, abort */
3914ed0ce   Julia Lawall   fs/ocfs2/dlm: Dro...
3107
  	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3108
3109
3110
3111
3112
3113
3114
3115
  
  	if (!mle) {
  		ret = -ENOMEM;
  		goto leave;
  	}
  
  	/* check for pre-existing lock */
  	spin_lock(&dlm->spinlock);
a3d332915   Mark Fasheh   ocfs2: calculate ...
3116
  	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
  	if (res) {
  		spin_lock(&res->spinlock);
  		if (res->state & DLM_LOCK_RES_RECOVERING) {
  			/* if all is working ok, this can only mean that we got
  		 	* a migrate request from a node that we now see as
  		 	* dead.  what can we do here?  drop it to the floor? */
  			spin_unlock(&res->spinlock);
  			mlog(ML_ERROR, "Got a migrate request, but the "
  			     "lockres is marked as recovering!");
  			kmem_cache_free(dlm_mle_cache, mle);
  			ret = -EINVAL; /* need a better solution */
  			goto unlock;
  		}
  		res->state |= DLM_LOCK_RES_MIGRATING;
  		spin_unlock(&res->spinlock);
  	}
6d98c3ccb   Wengang Wang   ocfs2/dlm: fix a ...
3133
  	spin_lock(&dlm->master_lock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3134
3135
3136
3137
3138
  	/* ignore status.  only nonzero status would BUG. */
  	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
  				    name, namelen,
  				    migrate->new_master,
  				    migrate->master);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3139
  	spin_unlock(&dlm->master_lock);
6d98c3ccb   Wengang Wang   ocfs2/dlm: fix a ...
3140
  unlock:
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
  	spin_unlock(&dlm->spinlock);
  
  	if (oldmle) {
  		/* master is known, detach if not already detached */
  		dlm_mle_detach_hb_events(dlm, oldmle);
  		dlm_put_mle(oldmle);
  	}
  
  	if (res)
  		dlm_lockres_put(res);
  leave:
  	dlm_put(dlm);
  	return ret;
  }
  
  /* must be holding dlm->spinlock and dlm->master_lock
   * when adding a migration mle, we can clear any other mles
   * in the master list because we know with certainty that
   * the master is "master".  so we remove any old mle from
   * the list after setting it's master field, and then add
   * the new migration mle.  this way we can hold with the rule
   * of having only one mle for a given lock name at all times. */
  static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
  				 struct dlm_lock_resource *res,
  				 struct dlm_master_list_entry *mle,
  				 struct dlm_master_list_entry **oldmle,
  				 const char *name, unsigned int namelen,
  				 u8 new_master, u8 master)
  {
  	int found;
  	int ret = 0;
  
  	*oldmle = NULL;
  
  	mlog_entry_void();
  
  	assert_spin_locked(&dlm->spinlock);
  	assert_spin_locked(&dlm->master_lock);
  
  	/* caller is responsible for any ref taken here on oldmle */
  	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
  	if (found) {
  		struct dlm_master_list_entry *tmp = *oldmle;
  		spin_lock(&tmp->spinlock);
  		if (tmp->type == DLM_MLE_MIGRATION) {
  			if (master == dlm->node_num) {
  				/* ah another process raced me to it */
  				mlog(0, "tried to migrate %.*s, but some "
  				     "process beat me to it
  ",
  				     namelen, name);
  				ret = -EEXIST;
  			} else {
  				/* bad.  2 NODES are trying to migrate! */
  				mlog(ML_ERROR, "migration error  mle: "
  				     "master=%u new_master=%u // request: "
  				     "master=%u new_master=%u // "
  				     "lockres=%.*s
  ",
  				     tmp->master, tmp->new_master,
  				     master, new_master,
  				     namelen, name);
  				BUG();
  			}
  		} else {
  			/* this is essentially what assert_master does */
  			tmp->master = master;
  			atomic_set(&tmp->woken, 1);
  			wake_up(&tmp->wq);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
3210
3211
  			/* remove it so that only one mle will be found */
  			__dlm_unlink_mle(dlm, tmp);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3212
3213
3214
3215
3216
3217
3218
  			__dlm_mle_detach_hb_events(dlm, tmp);
  			ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
  			mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
  			    "telling master to get ref for cleared out mle "
  			    "during migration
  ", dlm->name, namelen, name,
  			    master, new_master);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3219
3220
3221
3222
3223
3224
3225
  		}
  		spin_unlock(&tmp->spinlock);
  	}
  
  	/* now add a migration mle to the tail of the list */
  	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
  	mle->new_master = new_master;
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3226
3227
  	/* the new master will be sending an assert master for this.
  	 * at that point we will get the refmap reference */
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3228
3229
3230
  	mle->master = master;
  	/* do this for consistency with other mle types */
  	set_bit(new_master, mle->maybe_map);
1c0845773   Sunil Mushran   ocfs2/dlm: Encaps...
3231
  	__dlm_insert_mle(dlm, mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3232
3233
3234
  
  	return ret;
  }
c2cd4a443   Sunil Mushran   ocfs2/dlm: Refact...
3235
3236
3237
3238
3239
3240
3241
  /*
   * Sets the owner of the lockres, associated to the mle, to UNKNOWN
   */
  static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
  					struct dlm_master_list_entry *mle)
  {
  	struct dlm_lock_resource *res;
c2cd4a443   Sunil Mushran   ocfs2/dlm: Refact...
3242
3243
  
  	/* Find the lockres associated to the mle and set its owner to UNK */
7141514b8   Sunil Mushran   ocfs2/dlm: Remove...
3244
3245
  	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
  				   mle->mnamehash);
c2cd4a443   Sunil Mushran   ocfs2/dlm: Refact...
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
  	if (res) {
  		spin_unlock(&dlm->master_lock);
  
  		/* move lockres onto recovery list */
  		spin_lock(&res->spinlock);
  		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
  		dlm_move_lockres_to_recovery_list(dlm, res);
  		spin_unlock(&res->spinlock);
  		dlm_lockres_put(res);
  
  		/* about to get rid of mle, detach from heartbeat */
  		__dlm_mle_detach_hb_events(dlm, mle);
  
  		/* dump the mle */
  		spin_lock(&dlm->master_lock);
  		__dlm_put_mle(mle);
  		spin_unlock(&dlm->master_lock);
  	}
  
  	return res;
  }
  
  static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
  				    struct dlm_master_list_entry *mle)
  {
  	__dlm_mle_detach_hb_events(dlm, mle);
  
  	spin_lock(&mle->spinlock);
  	__dlm_unlink_mle(dlm, mle);
  	atomic_set(&mle->woken, 1);
  	spin_unlock(&mle->spinlock);
  
  	wake_up(&mle->wq);
  }
  
  static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
  				struct dlm_master_list_entry *mle, u8 dead_node)
  {
  	int bit;
  
  	BUG_ON(mle->type != DLM_MLE_BLOCK);
  
  	spin_lock(&mle->spinlock);
  	bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
  	if (bit != dead_node) {
  		mlog(0, "mle found, but dead node %u would not have been "
  		     "master
  ", dead_node);
  		spin_unlock(&mle->spinlock);
  	} else {
  		/* Must drop the refcount by one since the assert_master will
  		 * never arrive. This may result in the mle being unlinked and
  		 * freed, but there may still be a process waiting in the
  		 * dlmlock path which is fine. */
  		mlog(0, "node %u was expected master
  ", dead_node);
  		atomic_set(&mle->woken, 1);
  		spin_unlock(&mle->spinlock);
  		wake_up(&mle->wq);
  
  		/* Do not need events any longer, so detach from heartbeat */
  		__dlm_mle_detach_hb_events(dlm, mle);
  		__dlm_put_mle(mle);
  	}
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3311
3312
3313
  
  void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
  {
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
3314
  	struct dlm_master_list_entry *mle;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3315
  	struct dlm_lock_resource *res;
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
3316
3317
3318
  	struct hlist_head *bucket;
  	struct hlist_node *list;
  	unsigned int i;
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3319
3320
3321
3322
3323
3324
3325
3326
  
  	mlog_entry("dlm=%s, dead node=%u
  ", dlm->name, dead_node);
  top:
  	assert_spin_locked(&dlm->spinlock);
  
  	/* clean the master list */
  	spin_lock(&dlm->master_lock);
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
3327
3328
3329
3330
3331
  	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
  		bucket = dlm_master_hash(dlm, i);
  		hlist_for_each(list, bucket) {
  			mle = hlist_entry(list, struct dlm_master_list_entry,
  					  master_hash_node);
67ae1f060   Sunil Mushran   ocfs2/dlm: Indent...
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
  			BUG_ON(mle->type != DLM_MLE_BLOCK &&
  			       mle->type != DLM_MLE_MASTER &&
  			       mle->type != DLM_MLE_MIGRATION);
  
  			/* MASTER mles are initiated locally. The waiting
  			 * process will notice the node map change shortly.
  			 * Let that happen as normal. */
  			if (mle->type == DLM_MLE_MASTER)
  				continue;
  
  			/* BLOCK mles are initiated by other nodes. Need to
  			 * clean up if the dead node would have been the
  			 * master. */
  			if (mle->type == DLM_MLE_BLOCK) {
  				dlm_clean_block_mle(dlm, mle, dead_node);
  				continue;
  			}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3349

67ae1f060   Sunil Mushran   ocfs2/dlm: Indent...
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
3374
3375
3376
3377
3378
3379
3380
3381
3382
3383
3384
3385
  			/* Everything else is a MIGRATION mle */
  
  			/* The rule for MIGRATION mles is that the master
  			 * becomes UNKNOWN if *either* the original or the new
  			 * master dies. All UNKNOWN lockres' are sent to
  			 * whichever node becomes the recovery master. The new
  			 * master is responsible for determining if there is
  			 * still a master for this lockres, or if he needs to
  			 * take over mastery. Either way, this node should
  			 * expect another message to resolve this. */
  
  			if (mle->master != dead_node &&
  			    mle->new_master != dead_node)
  				continue;
  
  			/* If we have reached this point, this mle needs to be
  			 * removed from the list and freed. */
  			dlm_clean_migration_mle(dlm, mle);
  
  			mlog(0, "%s: node %u died during migration from "
  			     "%u to %u!
  ", dlm->name, dead_node, mle->master,
  			     mle->new_master);
  
  			/* If we find a lockres associated with the mle, we've
  			 * hit this rare case that messes up our lock ordering.
  			 * If so, we need to drop the master lock so that we can
  			 * take the lockres lock, meaning that we will have to
  			 * restart from the head of list. */
  			res = dlm_reset_mleres_owner(dlm, mle);
  			if (res)
  				/* restart */
  				goto top;
  
  			/* This may be the last reference */
  			__dlm_put_mle(mle);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3386
  		}
2ed6c750d   Sunil Mushran   ocfs2/dlm: Activa...
3387
  	}
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3388
3389
  	spin_unlock(&dlm->master_lock);
  }
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
  int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
  			 u8 old_master)
  {
  	struct dlm_node_iter iter;
  	int ret = 0;
  
  	spin_lock(&dlm->spinlock);
  	dlm_node_iter_init(dlm->domain_map, &iter);
  	clear_bit(old_master, iter.node_map);
  	clear_bit(dlm->node_num, iter.node_map);
  	spin_unlock(&dlm->spinlock);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3401
3402
3403
3404
3405
3406
  	/* ownership of the lockres is changing.  account for the
  	 * mastery reference here since old_master will briefly have
  	 * a reference after the migration completes */
  	spin_lock(&res->spinlock);
  	dlm_lockres_set_refmap_bit(old_master, res);
  	spin_unlock(&res->spinlock);
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
  	mlog(0, "now time to do a migrate request to other nodes
  ");
  	ret = dlm_do_migrate_request(dlm, res, old_master,
  				     dlm->node_num, &iter);
  	if (ret < 0) {
  		mlog_errno(ret);
  		goto leave;
  	}
  
  	mlog(0, "doing assert master of %.*s to all except the original node
  ",
  	     res->lockname.len, res->lockname.name);
  	/* this call now finishes out the nodemap
  	 * even if one or more nodes die */
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3421
  	ret = dlm_do_assert_master(dlm, res, iter.node_map,
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
  				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
  	if (ret < 0) {
  		/* no longer need to retry.  all living nodes contacted. */
  		mlog_errno(ret);
  		ret = 0;
  	}
  
  	memset(iter.node_map, 0, sizeof(iter.node_map));
  	set_bit(old_master, iter.node_map);
  	mlog(0, "doing assert master of %.*s back to %u
  ",
  	     res->lockname.len, res->lockname.name, old_master);
ba2bf2185   Kurt Hackel   ocfs2_dlm: fix cl...
3434
  	ret = dlm_do_assert_master(dlm, res, iter.node_map,
6714d8e86   Kurt Hackel   [PATCH] OCFS2: Th...
3435
3436
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
  				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
  	if (ret < 0) {
  		mlog(0, "assert master to original master failed "
  		     "with %d.
  ", ret);
  		/* the only nonzero status here would be because of
  		 * a dead original node.  we're done. */
  		ret = 0;
  	}
  
  	/* all done, set the owner, clear the flag */
  	spin_lock(&res->spinlock);
  	dlm_set_lockres_owner(dlm, res, dlm->node_num);
  	res->state &= ~DLM_LOCK_RES_MIGRATING;
  	spin_unlock(&res->spinlock);
  	/* re-dirty it on the new master */
  	dlm_kick_thread(dlm, res);
  	wake_up(&res->wq);
  leave:
  	return ret;
  }
  
  /*
   * LOCKRES AST REFCOUNT
   * this is integral to migration
   */
  
  /* for future intent to call an ast, reserve one ahead of time.
   * this should be called only after waiting on the lockres
   * with dlm_wait_on_lockres, and while still holding the
   * spinlock after the call. */
  void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
  {
  	assert_spin_locked(&res->spinlock);
  	if (res->state & DLM_LOCK_RES_MIGRATING) {
  		__dlm_print_one_lock_resource(res);
  	}
  	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
  
  	atomic_inc(&res->asts_reserved);
  }
  
  /*
   * used to drop the reserved ast, either because it went unused,
   * or because the ast/bast was actually called.
   *
   * also, if there is a pending migration on this lockres,
   * and this was the last pending ast on the lockres,
   * atomically set the MIGRATING flag before we drop the lock.
   * this is how we ensure that migration can proceed with no
   * asts in progress.  note that it is ok if the state of the
   * queues is such that a lock should be granted in the future
   * or that a bast should be fired, because the new master will
   * shuffle the lists on this lockres as soon as it is migrated.
   */
  void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
  			     struct dlm_lock_resource *res)
  {
  	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
  		return;
  
  	if (!res->migration_pending) {
  		spin_unlock(&res->spinlock);
  		return;
  	}
  
  	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
  	res->migration_pending = 0;
  	res->state |= DLM_LOCK_RES_MIGRATING;
  	spin_unlock(&res->spinlock);
  	wake_up(&res->wq);
  	wake_up(&dlm->migration_wq);
  }
5dad6c39d   Srinivas Eeda   o2dlm: force free...
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
  
  void dlm_force_free_mles(struct dlm_ctxt *dlm)
  {
  	int i;
  	struct hlist_head *bucket;
  	struct dlm_master_list_entry *mle;
  	struct hlist_node *tmp, *list;
  
  	/*
  	 * We notified all other nodes that we are exiting the domain and
  	 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
  	 * around we force free them and wake any processes that are waiting
  	 * on the mles
  	 */
  	spin_lock(&dlm->spinlock);
  	spin_lock(&dlm->master_lock);
  
  	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
  	BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
  
  	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
  		bucket = dlm_master_hash(dlm, i);
  		hlist_for_each_safe(list, tmp, bucket) {
  			mle = hlist_entry(list, struct dlm_master_list_entry,
  					  master_hash_node);
  			if (mle->type != DLM_MLE_BLOCK) {
  				mlog(ML_ERROR, "bad mle: %p
  ", mle);
  				dlm_print_one_mle(mle);
  			}
  			atomic_set(&mle->woken, 1);
  			wake_up(&mle->wq);
  
  			__dlm_unlink_mle(dlm, mle);
  			__dlm_mle_detach_hb_events(dlm, mle);
  			__dlm_put_mle(mle);
  		}
  	}
  	spin_unlock(&dlm->master_lock);
  	spin_unlock(&dlm->spinlock);
  }