Commit 0db638f44e7db9732d9c5704ca837f57ce061f42

Authored by Mark Fasheh
1 parent 4ba63adce0

ocfs2: warn the user on a dead timeout mismatch

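Print a warning to the user when a node configured with a different
heartbeat dead timeout (dead threshold) joins the region. As part of this,
each node now records its own dead timeout, in milliseconds, in the
hb_dead_ms field of the heartbeat block it writes.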

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>

Showing 2 changed files with 21 additions and 0 deletions Inline Diff

fs/ocfs2/cluster/heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*- 1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0: 2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 * 3 *
4 * Copyright (C) 2004, 2005 Oracle. All rights reserved. 4 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
5 * 5 *
6 * This program is free software; you can redistribute it and/or 6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public 7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either 8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version. 9 * version 2 of the License, or (at your option) any later version.
10 * 10 *
11 * This program is distributed in the hope that it will be useful, 11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details. 14 * General Public License for more details.
15 * 15 *
16 * You should have received a copy of the GNU General Public 16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the 17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA. 19 * Boston, MA 02111-1307, USA.
20 */ 20 */
21 21
22 #include <linux/kernel.h> 22 #include <linux/kernel.h>
23 #include <linux/sched.h> 23 #include <linux/sched.h>
24 #include <linux/jiffies.h> 24 #include <linux/jiffies.h>
25 #include <linux/module.h> 25 #include <linux/module.h>
26 #include <linux/fs.h> 26 #include <linux/fs.h>
27 #include <linux/bio.h> 27 #include <linux/bio.h>
28 #include <linux/blkdev.h> 28 #include <linux/blkdev.h>
29 #include <linux/delay.h> 29 #include <linux/delay.h>
30 #include <linux/file.h> 30 #include <linux/file.h>
31 #include <linux/kthread.h> 31 #include <linux/kthread.h>
32 #include <linux/configfs.h> 32 #include <linux/configfs.h>
33 #include <linux/random.h> 33 #include <linux/random.h>
34 #include <linux/crc32.h> 34 #include <linux/crc32.h>
35 #include <linux/time.h> 35 #include <linux/time.h>
36 36
37 #include "heartbeat.h" 37 #include "heartbeat.h"
38 #include "tcp.h" 38 #include "tcp.h"
39 #include "nodemanager.h" 39 #include "nodemanager.h"
40 #include "quorum.h" 40 #include "quorum.h"
41 41
42 #include "masklog.h" 42 #include "masklog.h"
43 43
44 44
45 /* 45 /*
46 * The first heartbeat pass had one global thread that would serialize all hb 46 * The first heartbeat pass had one global thread that would serialize all hb
47 * callback calls. This global serializing sem should only be removed once 47 * callback calls. This global serializing sem should only be removed once
48 * we've made sure that all callees can deal with being called concurrently 48 * we've made sure that all callees can deal with being called concurrently
49 * from multiple hb region threads. 49 * from multiple hb region threads.
50 */ 50 */
51 static DECLARE_RWSEM(o2hb_callback_sem); 51 static DECLARE_RWSEM(o2hb_callback_sem);
52 52
53 /* 53 /*
54 * multiple hb threads are watching multiple regions. A node is live 54 * multiple hb threads are watching multiple regions. A node is live
55 * whenever any of the threads sees activity from the node in its region. 55 * whenever any of the threads sees activity from the node in its region.
56 */ 56 */
57 static DEFINE_SPINLOCK(o2hb_live_lock); 57 static DEFINE_SPINLOCK(o2hb_live_lock);
58 static struct list_head o2hb_live_slots[O2NM_MAX_NODES]; 58 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
59 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 59 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
60 static LIST_HEAD(o2hb_node_events); 60 static LIST_HEAD(o2hb_node_events);
61 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue); 61 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
62 62
63 static LIST_HEAD(o2hb_all_regions); 63 static LIST_HEAD(o2hb_all_regions);
64 64
65 static struct o2hb_callback { 65 static struct o2hb_callback {
66 struct list_head list; 66 struct list_head list;
67 } o2hb_callbacks[O2HB_NUM_CB]; 67 } o2hb_callbacks[O2HB_NUM_CB];
68 68
69 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type); 69 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
70 70
71 #define O2HB_DEFAULT_BLOCK_BITS 9 71 #define O2HB_DEFAULT_BLOCK_BITS 9
72 72
73 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD; 73 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74 74
75 /* Only sets a new threshold if there are no active regions. 75 /* Only sets a new threshold if there are no active regions.
76 * 76 *
77 * No locking or otherwise interesting code is required for reading 77 * No locking or otherwise interesting code is required for reading
78 * o2hb_dead_threshold as it can't change once regions are active and 78 * o2hb_dead_threshold as it can't change once regions are active and
79 * it's not interesting to anyone until then anyway. */ 79 * it's not interesting to anyone until then anyway. */
80 static void o2hb_dead_threshold_set(unsigned int threshold) 80 static void o2hb_dead_threshold_set(unsigned int threshold)
81 { 81 {
82 if (threshold > O2HB_MIN_DEAD_THRESHOLD) { 82 if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83 spin_lock(&o2hb_live_lock); 83 spin_lock(&o2hb_live_lock);
84 if (list_empty(&o2hb_all_regions)) 84 if (list_empty(&o2hb_all_regions))
85 o2hb_dead_threshold = threshold; 85 o2hb_dead_threshold = threshold;
86 spin_unlock(&o2hb_live_lock); 86 spin_unlock(&o2hb_live_lock);
87 } 87 }
88 } 88 }
89 89
90 struct o2hb_node_event { 90 struct o2hb_node_event {
91 struct list_head hn_item; 91 struct list_head hn_item;
92 enum o2hb_callback_type hn_event_type; 92 enum o2hb_callback_type hn_event_type;
93 struct o2nm_node *hn_node; 93 struct o2nm_node *hn_node;
94 int hn_node_num; 94 int hn_node_num;
95 }; 95 };
96 96
97 struct o2hb_disk_slot { 97 struct o2hb_disk_slot {
98 struct o2hb_disk_heartbeat_block *ds_raw_block; 98 struct o2hb_disk_heartbeat_block *ds_raw_block;
99 u8 ds_node_num; 99 u8 ds_node_num;
100 u64 ds_last_time; 100 u64 ds_last_time;
101 u64 ds_last_generation; 101 u64 ds_last_generation;
102 u16 ds_equal_samples; 102 u16 ds_equal_samples;
103 u16 ds_changed_samples; 103 u16 ds_changed_samples;
104 struct list_head ds_live_item; 104 struct list_head ds_live_item;
105 }; 105 };
106 106
107 /* each thread owns a region.. when we're asked to tear down the region 107 /* each thread owns a region.. when we're asked to tear down the region
108 * we ask the thread to stop, who cleans up the region */ 108 * we ask the thread to stop, who cleans up the region */
109 struct o2hb_region { 109 struct o2hb_region {
110 struct config_item hr_item; 110 struct config_item hr_item;
111 111
112 struct list_head hr_all_item; 112 struct list_head hr_all_item;
113 unsigned hr_unclean_stop:1; 113 unsigned hr_unclean_stop:1;
114 114
115 /* protected by the hr_callback_sem */ 115 /* protected by the hr_callback_sem */
116 struct task_struct *hr_task; 116 struct task_struct *hr_task;
117 117
118 unsigned int hr_blocks; 118 unsigned int hr_blocks;
119 unsigned long long hr_start_block; 119 unsigned long long hr_start_block;
120 120
121 unsigned int hr_block_bits; 121 unsigned int hr_block_bits;
122 unsigned int hr_block_bytes; 122 unsigned int hr_block_bytes;
123 123
124 unsigned int hr_slots_per_page; 124 unsigned int hr_slots_per_page;
125 unsigned int hr_num_pages; 125 unsigned int hr_num_pages;
126 126
127 struct page **hr_slot_data; 127 struct page **hr_slot_data;
128 struct block_device *hr_bdev; 128 struct block_device *hr_bdev;
129 struct o2hb_disk_slot *hr_slots; 129 struct o2hb_disk_slot *hr_slots;
130 130
131 /* let the person setting up hb wait for it to return until it 131 /* let the person setting up hb wait for it to return until it
132 * has reached a 'steady' state. This will be fixed when we have 132 * has reached a 'steady' state. This will be fixed when we have
133 * a more complete api that doesn't lead to this sort of fragility. */ 133 * a more complete api that doesn't lead to this sort of fragility. */
134 atomic_t hr_steady_iterations; 134 atomic_t hr_steady_iterations;
135 135
136 char hr_dev_name[BDEVNAME_SIZE]; 136 char hr_dev_name[BDEVNAME_SIZE];
137 137
138 unsigned int hr_timeout_ms; 138 unsigned int hr_timeout_ms;
139 139
140 /* randomized as the region goes up and down so that a node 140 /* randomized as the region goes up and down so that a node
141 * recognizes a node going up and down in one iteration */ 141 * recognizes a node going up and down in one iteration */
142 u64 hr_generation; 142 u64 hr_generation;
143 143
144 struct work_struct hr_write_timeout_work; 144 struct work_struct hr_write_timeout_work;
145 unsigned long hr_last_timeout_start; 145 unsigned long hr_last_timeout_start;
146 146
147 /* Used during o2hb_check_slot to hold a copy of the block 147 /* Used during o2hb_check_slot to hold a copy of the block
148 * being checked because we temporarily have to zero out the 148 * being checked because we temporarily have to zero out the
149 * crc field. */ 149 * crc field. */
150 struct o2hb_disk_heartbeat_block *hr_tmp_block; 150 struct o2hb_disk_heartbeat_block *hr_tmp_block;
151 }; 151 };
152 152
153 struct o2hb_bio_wait_ctxt { 153 struct o2hb_bio_wait_ctxt {
154 atomic_t wc_num_reqs; 154 atomic_t wc_num_reqs;
155 struct completion wc_io_complete; 155 struct completion wc_io_complete;
156 int wc_error; 156 int wc_error;
157 }; 157 };
158 158
159 static void o2hb_write_timeout(void *arg) 159 static void o2hb_write_timeout(void *arg)
160 { 160 {
161 struct o2hb_region *reg = arg; 161 struct o2hb_region *reg = arg;
162 162
163 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " 163 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
164 "milliseconds\n", reg->hr_dev_name, 164 "milliseconds\n", reg->hr_dev_name,
165 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 165 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
166 o2quo_disk_timeout(); 166 o2quo_disk_timeout();
167 } 167 }
168 168
169 static void o2hb_arm_write_timeout(struct o2hb_region *reg) 169 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
170 { 170 {
171 mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS); 171 mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
172 172
173 cancel_delayed_work(&reg->hr_write_timeout_work); 173 cancel_delayed_work(&reg->hr_write_timeout_work);
174 reg->hr_last_timeout_start = jiffies; 174 reg->hr_last_timeout_start = jiffies;
175 schedule_delayed_work(&reg->hr_write_timeout_work, 175 schedule_delayed_work(&reg->hr_write_timeout_work,
176 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); 176 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
177 } 177 }
178 178
179 static void o2hb_disarm_write_timeout(struct o2hb_region *reg) 179 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
180 { 180 {
181 cancel_delayed_work(&reg->hr_write_timeout_work); 181 cancel_delayed_work(&reg->hr_write_timeout_work);
182 flush_scheduled_work(); 182 flush_scheduled_work();
183 } 183 }
184 184
185 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, 185 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
186 unsigned int num_ios) 186 unsigned int num_ios)
187 { 187 {
188 atomic_set(&wc->wc_num_reqs, num_ios); 188 atomic_set(&wc->wc_num_reqs, num_ios);
189 init_completion(&wc->wc_io_complete); 189 init_completion(&wc->wc_io_complete);
190 wc->wc_error = 0; 190 wc->wc_error = 0;
191 } 191 }
192 192
193 /* Used in error paths too */ 193 /* Used in error paths too */
194 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, 194 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
195 unsigned int num) 195 unsigned int num)
196 { 196 {
197 /* sadly atomic_sub_and_test() isn't available on all platforms. The 197 /* sadly atomic_sub_and_test() isn't available on all platforms. The
198 * good news is that the fast path only completes one at a time */ 198 * good news is that the fast path only completes one at a time */
199 while(num--) { 199 while(num--) {
200 if (atomic_dec_and_test(&wc->wc_num_reqs)) { 200 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
201 BUG_ON(num > 0); 201 BUG_ON(num > 0);
202 complete(&wc->wc_io_complete); 202 complete(&wc->wc_io_complete);
203 } 203 }
204 } 204 }
205 } 205 }
206 206
207 static void o2hb_wait_on_io(struct o2hb_region *reg, 207 static void o2hb_wait_on_io(struct o2hb_region *reg,
208 struct o2hb_bio_wait_ctxt *wc) 208 struct o2hb_bio_wait_ctxt *wc)
209 { 209 {
210 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; 210 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
211 211
212 blk_run_address_space(mapping); 212 blk_run_address_space(mapping);
213 213
214 wait_for_completion(&wc->wc_io_complete); 214 wait_for_completion(&wc->wc_io_complete);
215 } 215 }
216 216
217 static int o2hb_bio_end_io(struct bio *bio, 217 static int o2hb_bio_end_io(struct bio *bio,
218 unsigned int bytes_done, 218 unsigned int bytes_done,
219 int error) 219 int error)
220 { 220 {
221 struct o2hb_bio_wait_ctxt *wc = bio->bi_private; 221 struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
222 222
223 if (error) { 223 if (error) {
224 mlog(ML_ERROR, "IO Error %d\n", error); 224 mlog(ML_ERROR, "IO Error %d\n", error);
225 wc->wc_error = error; 225 wc->wc_error = error;
226 } 226 }
227 227
228 if (bio->bi_size) 228 if (bio->bi_size)
229 return 1; 229 return 1;
230 230
231 o2hb_bio_wait_dec(wc, 1); 231 o2hb_bio_wait_dec(wc, 1);
232 return 0; 232 return 0;
233 } 233 }
234 234
235 /* Setup a Bio to cover I/O against num_slots slots starting at 235 /* Setup a Bio to cover I/O against num_slots slots starting at
236 * start_slot. */ 236 * start_slot. */
237 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 237 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
238 struct o2hb_bio_wait_ctxt *wc, 238 struct o2hb_bio_wait_ctxt *wc,
239 unsigned int start_slot, 239 unsigned int start_slot,
240 unsigned int num_slots) 240 unsigned int num_slots)
241 { 241 {
242 int i, nr_vecs, len, first_page, last_page; 242 int i, nr_vecs, len, first_page, last_page;
243 unsigned int vec_len, vec_start; 243 unsigned int vec_len, vec_start;
244 unsigned int bits = reg->hr_block_bits; 244 unsigned int bits = reg->hr_block_bits;
245 unsigned int spp = reg->hr_slots_per_page; 245 unsigned int spp = reg->hr_slots_per_page;
246 struct bio *bio; 246 struct bio *bio;
247 struct page *page; 247 struct page *page;
248 248
249 nr_vecs = (num_slots + spp - 1) / spp; 249 nr_vecs = (num_slots + spp - 1) / spp;
250 250
251 /* Testing has shown this allocation to take long enough under 251 /* Testing has shown this allocation to take long enough under
252 * GFP_KERNEL that the local node can get fenced. It would be 252 * GFP_KERNEL that the local node can get fenced. It would be
253 * nicest if we could pre-allocate these bios and avoid this 253 * nicest if we could pre-allocate these bios and avoid this
254 * all together. */ 254 * all together. */
255 bio = bio_alloc(GFP_ATOMIC, nr_vecs); 255 bio = bio_alloc(GFP_ATOMIC, nr_vecs);
256 if (!bio) { 256 if (!bio) {
257 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 257 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
258 bio = ERR_PTR(-ENOMEM); 258 bio = ERR_PTR(-ENOMEM);
259 goto bail; 259 goto bail;
260 } 260 }
261 261
262 /* Must put everything in 512 byte sectors for the bio... */ 262 /* Must put everything in 512 byte sectors for the bio... */
263 bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9); 263 bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
264 bio->bi_bdev = reg->hr_bdev; 264 bio->bi_bdev = reg->hr_bdev;
265 bio->bi_private = wc; 265 bio->bi_private = wc;
266 bio->bi_end_io = o2hb_bio_end_io; 266 bio->bi_end_io = o2hb_bio_end_io;
267 267
268 first_page = start_slot / spp; 268 first_page = start_slot / spp;
269 last_page = first_page + nr_vecs; 269 last_page = first_page + nr_vecs;
270 vec_start = (start_slot << bits) % PAGE_CACHE_SIZE; 270 vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
271 for(i = first_page; i < last_page; i++) { 271 for(i = first_page; i < last_page; i++) {
272 page = reg->hr_slot_data[i]; 272 page = reg->hr_slot_data[i];
273 273
274 vec_len = PAGE_CACHE_SIZE; 274 vec_len = PAGE_CACHE_SIZE;
275 /* last page might be short */ 275 /* last page might be short */
276 if (((i + 1) * spp) > (start_slot + num_slots)) 276 if (((i + 1) * spp) > (start_slot + num_slots))
277 vec_len = ((num_slots + start_slot) % spp) << bits; 277 vec_len = ((num_slots + start_slot) % spp) << bits;
278 vec_len -= vec_start; 278 vec_len -= vec_start;
279 279
280 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 280 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
281 i, vec_len, vec_start); 281 i, vec_len, vec_start);
282 282
283 len = bio_add_page(bio, page, vec_len, vec_start); 283 len = bio_add_page(bio, page, vec_len, vec_start);
284 if (len != vec_len) { 284 if (len != vec_len) {
285 bio_put(bio); 285 bio_put(bio);
286 bio = ERR_PTR(-EIO); 286 bio = ERR_PTR(-EIO);
287 287
288 mlog(ML_ERROR, "Error adding page to bio i = %d, " 288 mlog(ML_ERROR, "Error adding page to bio i = %d, "
289 "vec_len = %u, len = %d\n, start = %u\n", 289 "vec_len = %u, len = %d\n, start = %u\n",
290 i, vec_len, len, vec_start); 290 i, vec_len, len, vec_start);
291 goto bail; 291 goto bail;
292 } 292 }
293 293
294 vec_start = 0; 294 vec_start = 0;
295 } 295 }
296 296
297 bail: 297 bail:
298 return bio; 298 return bio;
299 } 299 }
300 300
301 /* 301 /*
302 * Compute the maximum number of sectors the bdev can handle in one bio, 302 * Compute the maximum number of sectors the bdev can handle in one bio,
303 * as a power of two. 303 * as a power of two.
304 * 304 *
305 * Stolen from oracleasm, thanks Joel! 305 * Stolen from oracleasm, thanks Joel!
306 */ 306 */
307 static int compute_max_sectors(struct block_device *bdev) 307 static int compute_max_sectors(struct block_device *bdev)
308 { 308 {
309 int max_pages, max_sectors, pow_two_sectors; 309 int max_pages, max_sectors, pow_two_sectors;
310 310
311 struct request_queue *q; 311 struct request_queue *q;
312 312
313 q = bdev_get_queue(bdev); 313 q = bdev_get_queue(bdev);
314 max_pages = q->max_sectors >> (PAGE_SHIFT - 9); 314 max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
315 if (max_pages > BIO_MAX_PAGES) 315 if (max_pages > BIO_MAX_PAGES)
316 max_pages = BIO_MAX_PAGES; 316 max_pages = BIO_MAX_PAGES;
317 if (max_pages > q->max_phys_segments) 317 if (max_pages > q->max_phys_segments)
318 max_pages = q->max_phys_segments; 318 max_pages = q->max_phys_segments;
319 if (max_pages > q->max_hw_segments) 319 if (max_pages > q->max_hw_segments)
320 max_pages = q->max_hw_segments; 320 max_pages = q->max_hw_segments;
321 max_pages--; /* Handle I/Os that straddle a page */ 321 max_pages--; /* Handle I/Os that straddle a page */
322 322
323 max_sectors = max_pages << (PAGE_SHIFT - 9); 323 max_sectors = max_pages << (PAGE_SHIFT - 9);
324 324
325 /* Why is fls() 1-based???? */ 325 /* Why is fls() 1-based???? */
326 pow_two_sectors = 1 << (fls(max_sectors) - 1); 326 pow_two_sectors = 1 << (fls(max_sectors) - 1);
327 327
328 return pow_two_sectors; 328 return pow_two_sectors;
329 } 329 }
330 330
331 static inline void o2hb_compute_request_limits(struct o2hb_region *reg, 331 static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
332 unsigned int num_slots, 332 unsigned int num_slots,
333 unsigned int *num_bios, 333 unsigned int *num_bios,
334 unsigned int *slots_per_bio) 334 unsigned int *slots_per_bio)
335 { 335 {
336 unsigned int max_sectors, io_sectors; 336 unsigned int max_sectors, io_sectors;
337 337
338 max_sectors = compute_max_sectors(reg->hr_bdev); 338 max_sectors = compute_max_sectors(reg->hr_bdev);
339 339
340 io_sectors = num_slots << (reg->hr_block_bits - 9); 340 io_sectors = num_slots << (reg->hr_block_bits - 9);
341 341
342 *num_bios = (io_sectors + max_sectors - 1) / max_sectors; 342 *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
343 *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9); 343 *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
344 344
345 mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This " 345 mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
346 "device can handle %u sectors of I/O\n", io_sectors, num_slots, 346 "device can handle %u sectors of I/O\n", io_sectors, num_slots,
347 max_sectors); 347 max_sectors);
348 mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n", 348 mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
349 *num_bios, *slots_per_bio); 349 *num_bios, *slots_per_bio);
350 } 350 }
351 351
352 static int o2hb_read_slots(struct o2hb_region *reg, 352 static int o2hb_read_slots(struct o2hb_region *reg,
353 unsigned int max_slots) 353 unsigned int max_slots)
354 { 354 {
355 unsigned int num_bios, slots_per_bio, start_slot, num_slots; 355 unsigned int num_bios, slots_per_bio, start_slot, num_slots;
356 int i, status; 356 int i, status;
357 struct o2hb_bio_wait_ctxt wc; 357 struct o2hb_bio_wait_ctxt wc;
358 struct bio **bios; 358 struct bio **bios;
359 struct bio *bio; 359 struct bio *bio;
360 360
361 o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); 361 o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
362 362
363 bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); 363 bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
364 if (!bios) { 364 if (!bios) {
365 status = -ENOMEM; 365 status = -ENOMEM;
366 mlog_errno(status); 366 mlog_errno(status);
367 return status; 367 return status;
368 } 368 }
369 369
370 o2hb_bio_wait_init(&wc, num_bios); 370 o2hb_bio_wait_init(&wc, num_bios);
371 371
372 num_slots = slots_per_bio; 372 num_slots = slots_per_bio;
373 for(i = 0; i < num_bios; i++) { 373 for(i = 0; i < num_bios; i++) {
374 start_slot = i * slots_per_bio; 374 start_slot = i * slots_per_bio;
375 375
376 /* adjust num_slots at last bio */ 376 /* adjust num_slots at last bio */
377 if (max_slots < (start_slot + num_slots)) 377 if (max_slots < (start_slot + num_slots))
378 num_slots = max_slots - start_slot; 378 num_slots = max_slots - start_slot;
379 379
380 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots); 380 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
381 if (IS_ERR(bio)) { 381 if (IS_ERR(bio)) {
382 o2hb_bio_wait_dec(&wc, num_bios - i); 382 o2hb_bio_wait_dec(&wc, num_bios - i);
383 383
384 status = PTR_ERR(bio); 384 status = PTR_ERR(bio);
385 mlog_errno(status); 385 mlog_errno(status);
386 goto bail_and_wait; 386 goto bail_and_wait;
387 } 387 }
388 bios[i] = bio; 388 bios[i] = bio;
389 389
390 submit_bio(READ, bio); 390 submit_bio(READ, bio);
391 } 391 }
392 392
393 status = 0; 393 status = 0;
394 394
395 bail_and_wait: 395 bail_and_wait:
396 o2hb_wait_on_io(reg, &wc); 396 o2hb_wait_on_io(reg, &wc);
397 if (wc.wc_error && !status) 397 if (wc.wc_error && !status)
398 status = wc.wc_error; 398 status = wc.wc_error;
399 399
400 if (bios) { 400 if (bios) {
401 for(i = 0; i < num_bios; i++) 401 for(i = 0; i < num_bios; i++)
402 if (bios[i]) 402 if (bios[i])
403 bio_put(bios[i]); 403 bio_put(bios[i]);
404 kfree(bios); 404 kfree(bios);
405 } 405 }
406 406
407 return status; 407 return status;
408 } 408 }
409 409
410 static int o2hb_issue_node_write(struct o2hb_region *reg, 410 static int o2hb_issue_node_write(struct o2hb_region *reg,
411 struct bio **write_bio, 411 struct bio **write_bio,
412 struct o2hb_bio_wait_ctxt *write_wc) 412 struct o2hb_bio_wait_ctxt *write_wc)
413 { 413 {
414 int status; 414 int status;
415 unsigned int slot; 415 unsigned int slot;
416 struct bio *bio; 416 struct bio *bio;
417 417
418 o2hb_bio_wait_init(write_wc, 1); 418 o2hb_bio_wait_init(write_wc, 1);
419 419
420 slot = o2nm_this_node(); 420 slot = o2nm_this_node();
421 421
422 bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); 422 bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
423 if (IS_ERR(bio)) { 423 if (IS_ERR(bio)) {
424 status = PTR_ERR(bio); 424 status = PTR_ERR(bio);
425 mlog_errno(status); 425 mlog_errno(status);
426 goto bail; 426 goto bail;
427 } 427 }
428 428
429 submit_bio(WRITE, bio); 429 submit_bio(WRITE, bio);
430 430
431 *write_bio = bio; 431 *write_bio = bio;
432 status = 0; 432 status = 0;
433 bail: 433 bail:
434 return status; 434 return status;
435 } 435 }
436 436
437 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, 437 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
438 struct o2hb_disk_heartbeat_block *hb_block) 438 struct o2hb_disk_heartbeat_block *hb_block)
439 { 439 {
440 __le32 old_cksum; 440 __le32 old_cksum;
441 u32 ret; 441 u32 ret;
442 442
443 /* We want to compute the block crc with a 0 value in the 443 /* We want to compute the block crc with a 0 value in the
444 * hb_cksum field. Save it off here and replace after the 444 * hb_cksum field. Save it off here and replace after the
445 * crc. */ 445 * crc. */
446 old_cksum = hb_block->hb_cksum; 446 old_cksum = hb_block->hb_cksum;
447 hb_block->hb_cksum = 0; 447 hb_block->hb_cksum = 0;
448 448
449 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); 449 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
450 450
451 hb_block->hb_cksum = old_cksum; 451 hb_block->hb_cksum = old_cksum;
452 452
453 return ret; 453 return ret;
454 } 454 }
455 455
456 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block) 456 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
457 { 457 {
458 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, " 458 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
459 "cksum = 0x%x, generation 0x%llx\n", 459 "cksum = 0x%x, generation 0x%llx\n",
460 (long long)le64_to_cpu(hb_block->hb_seq), 460 (long long)le64_to_cpu(hb_block->hb_seq),
461 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), 461 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
462 (long long)le64_to_cpu(hb_block->hb_generation)); 462 (long long)le64_to_cpu(hb_block->hb_generation));
463 } 463 }
464 464
465 static int o2hb_verify_crc(struct o2hb_region *reg, 465 static int o2hb_verify_crc(struct o2hb_region *reg,
466 struct o2hb_disk_heartbeat_block *hb_block) 466 struct o2hb_disk_heartbeat_block *hb_block)
467 { 467 {
468 u32 read, computed; 468 u32 read, computed;
469 469
470 read = le32_to_cpu(hb_block->hb_cksum); 470 read = le32_to_cpu(hb_block->hb_cksum);
471 computed = o2hb_compute_block_crc_le(reg, hb_block); 471 computed = o2hb_compute_block_crc_le(reg, hb_block);
472 472
473 return read == computed; 473 return read == computed;
474 } 474 }
475 475
476 /* We want to make sure that nobody is heartbeating on top of us -- 476 /* We want to make sure that nobody is heartbeating on top of us --
477 * this will help detect an invalid configuration. */ 477 * this will help detect an invalid configuration. */
478 static int o2hb_check_last_timestamp(struct o2hb_region *reg) 478 static int o2hb_check_last_timestamp(struct o2hb_region *reg)
479 { 479 {
480 int node_num, ret; 480 int node_num, ret;
481 struct o2hb_disk_slot *slot; 481 struct o2hb_disk_slot *slot;
482 struct o2hb_disk_heartbeat_block *hb_block; 482 struct o2hb_disk_heartbeat_block *hb_block;
483 483
484 node_num = o2nm_this_node(); 484 node_num = o2nm_this_node();
485 485
486 ret = 1; 486 ret = 1;
487 slot = &reg->hr_slots[node_num]; 487 slot = &reg->hr_slots[node_num];
488 /* Don't check on our 1st timestamp */ 488 /* Don't check on our 1st timestamp */
489 if (slot->ds_last_time) { 489 if (slot->ds_last_time) {
490 hb_block = slot->ds_raw_block; 490 hb_block = slot->ds_raw_block;
491 491
492 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time) 492 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
493 ret = 0; 493 ret = 0;
494 } 494 }
495 495
496 return ret; 496 return ret;
497 } 497 }
498 498
499 static inline void o2hb_prepare_block(struct o2hb_region *reg, 499 static inline void o2hb_prepare_block(struct o2hb_region *reg,
500 u64 generation) 500 u64 generation)
501 { 501 {
502 int node_num; 502 int node_num;
503 u64 cputime; 503 u64 cputime;
504 struct o2hb_disk_slot *slot; 504 struct o2hb_disk_slot *slot;
505 struct o2hb_disk_heartbeat_block *hb_block; 505 struct o2hb_disk_heartbeat_block *hb_block;
506 506
507 node_num = o2nm_this_node(); 507 node_num = o2nm_this_node();
508 slot = &reg->hr_slots[node_num]; 508 slot = &reg->hr_slots[node_num];
509 509
510 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; 510 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
511 memset(hb_block, 0, reg->hr_block_bytes); 511 memset(hb_block, 0, reg->hr_block_bytes);
512 /* TODO: time stuff */ 512 /* TODO: time stuff */
513 cputime = CURRENT_TIME.tv_sec; 513 cputime = CURRENT_TIME.tv_sec;
514 if (!cputime) 514 if (!cputime)
515 cputime = 1; 515 cputime = 1;
516 516
517 hb_block->hb_seq = cpu_to_le64(cputime); 517 hb_block->hb_seq = cpu_to_le64(cputime);
518 hb_block->hb_node = node_num; 518 hb_block->hb_node = node_num;
519 hb_block->hb_generation = cpu_to_le64(generation); 519 hb_block->hb_generation = cpu_to_le64(generation);
520 hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
520 521
521 /* This step must always happen last! */ 522 /* This step must always happen last! */
522 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, 523 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
523 hb_block)); 524 hb_block));
524 525
525 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n", 526 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
526 (long long)cpu_to_le64(generation), 527 (long long)cpu_to_le64(generation),
527 le32_to_cpu(hb_block->hb_cksum)); 528 le32_to_cpu(hb_block->hb_cksum));
528 } 529 }
529 530
530 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, 531 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
531 struct o2nm_node *node, 532 struct o2nm_node *node,
532 int idx) 533 int idx)
533 { 534 {
534 struct list_head *iter; 535 struct list_head *iter;
535 struct o2hb_callback_func *f; 536 struct o2hb_callback_func *f;
536 537
537 list_for_each(iter, &hbcall->list) { 538 list_for_each(iter, &hbcall->list) {
538 f = list_entry(iter, struct o2hb_callback_func, hc_item); 539 f = list_entry(iter, struct o2hb_callback_func, hc_item);
539 mlog(ML_HEARTBEAT, "calling funcs %p\n", f); 540 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
540 (f->hc_func)(node, idx, f->hc_data); 541 (f->hc_func)(node, idx, f->hc_data);
541 } 542 }
542 } 543 }
543 544
544 /* Will run the list in order until we process the passed event */ 545 /* Will run the list in order until we process the passed event */
545 static void o2hb_run_event_list(struct o2hb_node_event *queued_event) 546 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
546 { 547 {
547 int empty; 548 int empty;
548 struct o2hb_callback *hbcall; 549 struct o2hb_callback *hbcall;
549 struct o2hb_node_event *event; 550 struct o2hb_node_event *event;
550 551
551 spin_lock(&o2hb_live_lock); 552 spin_lock(&o2hb_live_lock);
552 empty = list_empty(&queued_event->hn_item); 553 empty = list_empty(&queued_event->hn_item);
553 spin_unlock(&o2hb_live_lock); 554 spin_unlock(&o2hb_live_lock);
554 if (empty) 555 if (empty)
555 return; 556 return;
556 557
557 /* Holding callback sem assures we don't alter the callback 558 /* Holding callback sem assures we don't alter the callback
558 * lists when doing this, and serializes ourselves with other 559 * lists when doing this, and serializes ourselves with other
559 * processes wanting callbacks. */ 560 * processes wanting callbacks. */
560 down_write(&o2hb_callback_sem); 561 down_write(&o2hb_callback_sem);
561 562
562 spin_lock(&o2hb_live_lock); 563 spin_lock(&o2hb_live_lock);
563 while (!list_empty(&o2hb_node_events) 564 while (!list_empty(&o2hb_node_events)
564 && !list_empty(&queued_event->hn_item)) { 565 && !list_empty(&queued_event->hn_item)) {
565 event = list_entry(o2hb_node_events.next, 566 event = list_entry(o2hb_node_events.next,
566 struct o2hb_node_event, 567 struct o2hb_node_event,
567 hn_item); 568 hn_item);
568 list_del_init(&event->hn_item); 569 list_del_init(&event->hn_item);
569 spin_unlock(&o2hb_live_lock); 570 spin_unlock(&o2hb_live_lock);
570 571
571 mlog(ML_HEARTBEAT, "Node %s event for %d\n", 572 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
572 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN", 573 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
573 event->hn_node_num); 574 event->hn_node_num);
574 575
575 hbcall = hbcall_from_type(event->hn_event_type); 576 hbcall = hbcall_from_type(event->hn_event_type);
576 577
577 /* We should *never* have gotten on to the list with a 578 /* We should *never* have gotten on to the list with a
578 * bad type... This isn't something that we should try 579 * bad type... This isn't something that we should try
579 * to recover from. */ 580 * to recover from. */
580 BUG_ON(IS_ERR(hbcall)); 581 BUG_ON(IS_ERR(hbcall));
581 582
582 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num); 583 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
583 584
584 spin_lock(&o2hb_live_lock); 585 spin_lock(&o2hb_live_lock);
585 } 586 }
586 spin_unlock(&o2hb_live_lock); 587 spin_unlock(&o2hb_live_lock);
587 588
588 up_write(&o2hb_callback_sem); 589 up_write(&o2hb_callback_sem);
589 } 590 }
590 591
591 static void o2hb_queue_node_event(struct o2hb_node_event *event, 592 static void o2hb_queue_node_event(struct o2hb_node_event *event,
592 enum o2hb_callback_type type, 593 enum o2hb_callback_type type,
593 struct o2nm_node *node, 594 struct o2nm_node *node,
594 int node_num) 595 int node_num)
595 { 596 {
596 assert_spin_locked(&o2hb_live_lock); 597 assert_spin_locked(&o2hb_live_lock);
597 598
598 event->hn_event_type = type; 599 event->hn_event_type = type;
599 event->hn_node = node; 600 event->hn_node = node;
600 event->hn_node_num = node_num; 601 event->hn_node_num = node_num;
601 602
602 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n", 603 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
603 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num); 604 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
604 605
605 list_add_tail(&event->hn_item, &o2hb_node_events); 606 list_add_tail(&event->hn_item, &o2hb_node_events);
606 } 607 }
607 608
608 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) 609 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
609 { 610 {
610 struct o2hb_node_event event = 611 struct o2hb_node_event event =
611 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 612 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
612 struct o2nm_node *node; 613 struct o2nm_node *node;
613 614
614 node = o2nm_get_node_by_num(slot->ds_node_num); 615 node = o2nm_get_node_by_num(slot->ds_node_num);
615 if (!node) 616 if (!node)
616 return; 617 return;
617 618
618 spin_lock(&o2hb_live_lock); 619 spin_lock(&o2hb_live_lock);
619 if (!list_empty(&slot->ds_live_item)) { 620 if (!list_empty(&slot->ds_live_item)) {
620 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", 621 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
621 slot->ds_node_num); 622 slot->ds_node_num);
622 623
623 list_del_init(&slot->ds_live_item); 624 list_del_init(&slot->ds_live_item);
624 625
625 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 626 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
626 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 627 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
627 628
628 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 629 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
629 slot->ds_node_num); 630 slot->ds_node_num);
630 } 631 }
631 } 632 }
632 spin_unlock(&o2hb_live_lock); 633 spin_unlock(&o2hb_live_lock);
633 634
634 o2hb_run_event_list(&event); 635 o2hb_run_event_list(&event);
635 636
636 o2nm_node_put(node); 637 o2nm_node_put(node);
637 } 638 }
638 639
639 static int o2hb_check_slot(struct o2hb_region *reg, 640 static int o2hb_check_slot(struct o2hb_region *reg,
640 struct o2hb_disk_slot *slot) 641 struct o2hb_disk_slot *slot)
641 { 642 {
642 int changed = 0, gen_changed = 0; 643 int changed = 0, gen_changed = 0;
643 struct o2hb_node_event event = 644 struct o2hb_node_event event =
644 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 645 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
645 struct o2nm_node *node; 646 struct o2nm_node *node;
646 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; 647 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
647 u64 cputime; 648 u64 cputime;
649 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
650 unsigned int slot_dead_ms;
648 651
649 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); 652 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
650 653
651 /* Is this correct? Do we assume that the node doesn't exist 654 /* Is this correct? Do we assume that the node doesn't exist
652 * if we're not configured for him? */ 655 * if we're not configured for him? */
653 node = o2nm_get_node_by_num(slot->ds_node_num); 656 node = o2nm_get_node_by_num(slot->ds_node_num);
654 if (!node) 657 if (!node)
655 return 0; 658 return 0;
656 659
657 if (!o2hb_verify_crc(reg, hb_block)) { 660 if (!o2hb_verify_crc(reg, hb_block)) {
658 /* all paths from here will drop o2hb_live_lock for 661 /* all paths from here will drop o2hb_live_lock for
659 * us. */ 662 * us. */
660 spin_lock(&o2hb_live_lock); 663 spin_lock(&o2hb_live_lock);
661 664
662 /* Don't print an error on the console in this case - 665 /* Don't print an error on the console in this case -
663 * a freshly formatted heartbeat area will not have a 666 * a freshly formatted heartbeat area will not have a
664 * crc set on it. */ 667 * crc set on it. */
665 if (list_empty(&slot->ds_live_item)) 668 if (list_empty(&slot->ds_live_item))
666 goto out; 669 goto out;
667 670
668 /* The node is live but pushed out a bad crc. We 671 /* The node is live but pushed out a bad crc. We
669 * consider it a transient miss but don't populate any 672 * consider it a transient miss but don't populate any
670 * other values as they may be junk. */ 673 * other values as they may be junk. */
671 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", 674 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
672 slot->ds_node_num, reg->hr_dev_name); 675 slot->ds_node_num, reg->hr_dev_name);
673 o2hb_dump_slot(hb_block); 676 o2hb_dump_slot(hb_block);
674 677
675 slot->ds_equal_samples++; 678 slot->ds_equal_samples++;
676 goto fire_callbacks; 679 goto fire_callbacks;
677 } 680 }
678 681
679 /* we don't care if these wrap.. the state transitions below 682 /* we don't care if these wrap.. the state transitions below
680 * clear at the right places */ 683 * clear at the right places */
681 cputime = le64_to_cpu(hb_block->hb_seq); 684 cputime = le64_to_cpu(hb_block->hb_seq);
682 if (slot->ds_last_time != cputime) 685 if (slot->ds_last_time != cputime)
683 slot->ds_changed_samples++; 686 slot->ds_changed_samples++;
684 else 687 else
685 slot->ds_equal_samples++; 688 slot->ds_equal_samples++;
686 slot->ds_last_time = cputime; 689 slot->ds_last_time = cputime;
687 690
688 /* The node changed heartbeat generations. We assume this to 691 /* The node changed heartbeat generations. We assume this to
689 * mean it dropped off but came back before we timed out. We 692 * mean it dropped off but came back before we timed out. We
690 * want to consider it down for the time being but don't want 693 * want to consider it down for the time being but don't want
691 * to lose any changed_samples state we might build up to 694 * to lose any changed_samples state we might build up to
692 * considering it live again. */ 695 * considering it live again. */
693 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) { 696 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
694 gen_changed = 1; 697 gen_changed = 1;
695 slot->ds_equal_samples = 0; 698 slot->ds_equal_samples = 0;
696 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx " 699 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
697 "to 0x%llx)\n", slot->ds_node_num, 700 "to 0x%llx)\n", slot->ds_node_num,
698 (long long)slot->ds_last_generation, 701 (long long)slot->ds_last_generation,
699 (long long)le64_to_cpu(hb_block->hb_generation)); 702 (long long)le64_to_cpu(hb_block->hb_generation));
700 } 703 }
701 704
702 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 705 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
703 706
704 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x " 707 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
705 "seq %llu last %llu changed %u equal %u\n", 708 "seq %llu last %llu changed %u equal %u\n",
706 slot->ds_node_num, (long long)slot->ds_last_generation, 709 slot->ds_node_num, (long long)slot->ds_last_generation,
707 le32_to_cpu(hb_block->hb_cksum), 710 le32_to_cpu(hb_block->hb_cksum),
708 (unsigned long long)le64_to_cpu(hb_block->hb_seq), 711 (unsigned long long)le64_to_cpu(hb_block->hb_seq),
709 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples, 712 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
710 slot->ds_equal_samples); 713 slot->ds_equal_samples);
711 714
712 spin_lock(&o2hb_live_lock); 715 spin_lock(&o2hb_live_lock);
713 716
714 fire_callbacks: 717 fire_callbacks:
715 /* dead nodes only come to life after some number of 718 /* dead nodes only come to life after some number of
716 * changes at any time during their dead time */ 719 * changes at any time during their dead time */
717 if (list_empty(&slot->ds_live_item) && 720 if (list_empty(&slot->ds_live_item) &&
718 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) { 721 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
719 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n", 722 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
720 slot->ds_node_num, (long long)slot->ds_last_generation); 723 slot->ds_node_num, (long long)slot->ds_last_generation);
721 724
722 /* first on the list generates a callback */ 725 /* first on the list generates a callback */
723 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 726 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
724 set_bit(slot->ds_node_num, o2hb_live_node_bitmap); 727 set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
725 728
726 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node, 729 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
727 slot->ds_node_num); 730 slot->ds_node_num);
728 731
729 changed = 1; 732 changed = 1;
730 } 733 }
731 734
732 list_add_tail(&slot->ds_live_item, 735 list_add_tail(&slot->ds_live_item,
733 &o2hb_live_slots[slot->ds_node_num]); 736 &o2hb_live_slots[slot->ds_node_num]);
734 737
735 slot->ds_equal_samples = 0; 738 slot->ds_equal_samples = 0;
739
740 /* We want to be sure that all nodes agree on the
741 * number of milliseconds before a node will be
742 * considered dead. The self-fencing timeout is
743 * computed from this value, and a discrepancy might
744 * result in heartbeat calling a node dead when it
745 * hasn't self-fenced yet. */
746 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
747 if (slot_dead_ms && slot_dead_ms != dead_ms) {
748 /* TODO: Perhaps we can fail the region here. */
749 mlog(ML_ERROR, "Node %d on device %s has a dead count "
750 "of %u ms, but our count is %u ms.\n"
751 "Please double check your configuration values "
752 "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
753 slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
754 dead_ms);
755 }
736 goto out; 756 goto out;
737 } 757 }
738 758
739 /* if the list is dead, we're done.. */ 759 /* if the list is dead, we're done.. */
740 if (list_empty(&slot->ds_live_item)) 760 if (list_empty(&slot->ds_live_item))
741 goto out; 761 goto out;
742 762
743 /* live nodes only go dead after enough consequtive missed 763 /* live nodes only go dead after enough consequtive missed
744 * samples.. reset the missed counter whenever we see 764 * samples.. reset the missed counter whenever we see
745 * activity */ 765 * activity */
746 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { 766 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
747 mlog(ML_HEARTBEAT, "Node %d left my region\n", 767 mlog(ML_HEARTBEAT, "Node %d left my region\n",
748 slot->ds_node_num); 768 slot->ds_node_num);
749 769
750 /* last off the live_slot generates a callback */ 770 /* last off the live_slot generates a callback */
751 list_del_init(&slot->ds_live_item); 771 list_del_init(&slot->ds_live_item);
752 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 772 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
753 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 773 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
754 774
755 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 775 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
756 slot->ds_node_num); 776 slot->ds_node_num);
757 777
758 changed = 1; 778 changed = 1;
759 } 779 }
760 780
761 /* We don't clear this because the node is still 781 /* We don't clear this because the node is still
762 * actually writing new blocks. */ 782 * actually writing new blocks. */
763 if (!gen_changed) 783 if (!gen_changed)
764 slot->ds_changed_samples = 0; 784 slot->ds_changed_samples = 0;
765 goto out; 785 goto out;
766 } 786 }
767 if (slot->ds_changed_samples) { 787 if (slot->ds_changed_samples) {
768 slot->ds_changed_samples = 0; 788 slot->ds_changed_samples = 0;
769 slot->ds_equal_samples = 0; 789 slot->ds_equal_samples = 0;
770 } 790 }
771 out: 791 out:
772 spin_unlock(&o2hb_live_lock); 792 spin_unlock(&o2hb_live_lock);
773 793
774 o2hb_run_event_list(&event); 794 o2hb_run_event_list(&event);
775 795
776 o2nm_node_put(node); 796 o2nm_node_put(node);
777 return changed; 797 return changed;
778 } 798 }
779 799
/*
 * Return the index of the highest bit set in the first @numbits bits of
 * @nodes, or @numbits itself when no bit is set.
 *
 * This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it.
 */
static int o2hb_highest_node(unsigned long *nodes,
			     int numbits)
{
	int highest = numbits;
	int node;

	/* The walk ends when find_next_bit() hands back a value that
	 * is not below numbits, the same termination test the node
	 * scan in o2hb_do_disk_heartbeat() relies on.  The former
	 * "!= -1" loop condition never ended the loop on its own. */
	for (node = find_next_bit(nodes, numbits, 0);
	     node < numbits;
	     node = find_next_bit(nodes, numbits, node + 1))
		highest = node;

	return highest;
}
798 818
799 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) 819 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
800 { 820 {
801 int i, ret, highest_node, change = 0; 821 int i, ret, highest_node, change = 0;
802 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 822 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
803 struct bio *write_bio; 823 struct bio *write_bio;
804 struct o2hb_bio_wait_ctxt write_wc; 824 struct o2hb_bio_wait_ctxt write_wc;
805 825
806 ret = o2nm_configured_node_map(configured_nodes, 826 ret = o2nm_configured_node_map(configured_nodes,
807 sizeof(configured_nodes)); 827 sizeof(configured_nodes));
808 if (ret) { 828 if (ret) {
809 mlog_errno(ret); 829 mlog_errno(ret);
810 return ret; 830 return ret;
811 } 831 }
812 832
813 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); 833 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
814 if (highest_node >= O2NM_MAX_NODES) { 834 if (highest_node >= O2NM_MAX_NODES) {
815 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n"); 835 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
816 return -EINVAL; 836 return -EINVAL;
817 } 837 }
818 838
819 /* No sense in reading the slots of nodes that don't exist 839 /* No sense in reading the slots of nodes that don't exist
820 * yet. Of course, if the node definitions have holes in them 840 * yet. Of course, if the node definitions have holes in them
821 * then we're reading an empty slot anyway... Consider this 841 * then we're reading an empty slot anyway... Consider this
822 * best-effort. */ 842 * best-effort. */
823 ret = o2hb_read_slots(reg, highest_node + 1); 843 ret = o2hb_read_slots(reg, highest_node + 1);
824 if (ret < 0) { 844 if (ret < 0) {
825 mlog_errno(ret); 845 mlog_errno(ret);
826 return ret; 846 return ret;
827 } 847 }
828 848
829 /* With an up to date view of the slots, we can check that no 849 /* With an up to date view of the slots, we can check that no
830 * other node has been improperly configured to heartbeat in 850 * other node has been improperly configured to heartbeat in
831 * our slot. */ 851 * our slot. */
832 if (!o2hb_check_last_timestamp(reg)) 852 if (!o2hb_check_last_timestamp(reg))
833 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating " 853 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
834 "in our slot!\n", reg->hr_dev_name); 854 "in our slot!\n", reg->hr_dev_name);
835 855
836 /* fill in the proper info for our next heartbeat */ 856 /* fill in the proper info for our next heartbeat */
837 o2hb_prepare_block(reg, reg->hr_generation); 857 o2hb_prepare_block(reg, reg->hr_generation);
838 858
839 /* And fire off the write. Note that we don't wait on this I/O 859 /* And fire off the write. Note that we don't wait on this I/O
840 * until later. */ 860 * until later. */
841 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 861 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
842 if (ret < 0) { 862 if (ret < 0) {
843 mlog_errno(ret); 863 mlog_errno(ret);
844 return ret; 864 return ret;
845 } 865 }
846 866
847 i = -1; 867 i = -1;
848 while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 868 while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
849 869
850 change |= o2hb_check_slot(reg, &reg->hr_slots[i]); 870 change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
851 } 871 }
852 872
853 /* 873 /*
854 * We have to be sure we've advertised ourselves on disk 874 * We have to be sure we've advertised ourselves on disk
855 * before we can go to steady state. This ensures that 875 * before we can go to steady state. This ensures that
856 * people we find in our steady state have seen us. 876 * people we find in our steady state have seen us.
857 */ 877 */
858 o2hb_wait_on_io(reg, &write_wc); 878 o2hb_wait_on_io(reg, &write_wc);
859 bio_put(write_bio); 879 bio_put(write_bio);
860 if (write_wc.wc_error) { 880 if (write_wc.wc_error) {
861 /* Do not re-arm the write timeout on I/O error - we 881 /* Do not re-arm the write timeout on I/O error - we
862 * can't be sure that the new block ever made it to 882 * can't be sure that the new block ever made it to
863 * disk */ 883 * disk */
864 mlog(ML_ERROR, "Write error %d on device \"%s\"\n", 884 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
865 write_wc.wc_error, reg->hr_dev_name); 885 write_wc.wc_error, reg->hr_dev_name);
866 return write_wc.wc_error; 886 return write_wc.wc_error;
867 } 887 }
868 888
869 o2hb_arm_write_timeout(reg); 889 o2hb_arm_write_timeout(reg);
870 890
871 /* let the person who launched us know when things are steady */ 891 /* let the person who launched us know when things are steady */
872 if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) { 892 if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
873 if (atomic_dec_and_test(&reg->hr_steady_iterations)) 893 if (atomic_dec_and_test(&reg->hr_steady_iterations))
874 wake_up(&o2hb_steady_queue); 894 wake_up(&o2hb_steady_queue);
875 } 895 }
876 896
877 return 0; 897 return 0;
878 } 898 }
879 899
/*
 * Compute a - b in place, leaving the result in a.  The caller is
 * expected to pass an a that is not earlier than b; if a does come
 * before b the result is clamped to zero instead of going negative.
 */
static void o2hb_tv_subtract(struct timeval *a,
			     struct timeval *b)
{
	int a_before_b;

	a_before_b = a->tv_sec < b->tv_sec ||
		     (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec);
	if (a_before_b) {
		a->tv_usec = 0;
		a->tv_sec = 0;
		return;
	}

	a->tv_usec -= b->tv_usec;
	a->tv_sec -= b->tv_sec;

	/* borrow whole seconds until the microsecond part is
	 * non-negative again */
	while (a->tv_usec < 0) {
		a->tv_usec += 1000000;
		a->tv_sec--;
	}
}
900 920
/*
 * Return the number of milliseconds from @start to @end.  Neither
 * argument is modified; the subtraction is done on a local copy of
 * @end.
 */
static unsigned int o2hb_elapsed_msecs(struct timeval *start,
				       struct timeval *end)
{
	struct timeval delta;

	delta = *end;
	o2hb_tv_subtract(&delta, start);

	return delta.tv_sec * 1000 + delta.tv_usec / 1000;
}
910 930
911 /* 931 /*
912 * we ride the region ref that the region dir holds. before the region 932 * we ride the region ref that the region dir holds. before the region
913 * dir is removed and drops it ref it will wait to tear down this 933 * dir is removed and drops it ref it will wait to tear down this
914 * thread. 934 * thread.
915 */ 935 */
916 static int o2hb_thread(void *data) 936 static int o2hb_thread(void *data)
917 { 937 {
918 int i, ret; 938 int i, ret;
919 struct o2hb_region *reg = data; 939 struct o2hb_region *reg = data;
920 struct bio *write_bio; 940 struct bio *write_bio;
921 struct o2hb_bio_wait_ctxt write_wc; 941 struct o2hb_bio_wait_ctxt write_wc;
922 struct timeval before_hb, after_hb; 942 struct timeval before_hb, after_hb;
923 unsigned int elapsed_msec; 943 unsigned int elapsed_msec;
924 944
925 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); 945 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
926 946
927 set_user_nice(current, -20); 947 set_user_nice(current, -20);
928 948
929 while (!kthread_should_stop() && !reg->hr_unclean_stop) { 949 while (!kthread_should_stop() && !reg->hr_unclean_stop) {
930 /* We track the time spent inside 950 /* We track the time spent inside
931 * o2hb_do_disk_heartbeat so that we avoid more then 951 * o2hb_do_disk_heartbeat so that we avoid more then
932 * hr_timeout_ms between disk writes. On busy systems 952 * hr_timeout_ms between disk writes. On busy systems
933 * this should result in a heartbeat which is less 953 * this should result in a heartbeat which is less
934 * likely to time itself out. */ 954 * likely to time itself out. */
935 do_gettimeofday(&before_hb); 955 do_gettimeofday(&before_hb);
936 956
937 i = 0; 957 i = 0;
938 do { 958 do {
939 ret = o2hb_do_disk_heartbeat(reg); 959 ret = o2hb_do_disk_heartbeat(reg);
940 } while (ret && ++i < 2); 960 } while (ret && ++i < 2);
941 961
942 do_gettimeofday(&after_hb); 962 do_gettimeofday(&after_hb);
943 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); 963 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
944 964
945 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n", 965 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
946 before_hb.tv_sec, (unsigned long) before_hb.tv_usec, 966 before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
947 after_hb.tv_sec, (unsigned long) after_hb.tv_usec, 967 after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
948 elapsed_msec); 968 elapsed_msec);
949 969
950 if (elapsed_msec < reg->hr_timeout_ms) { 970 if (elapsed_msec < reg->hr_timeout_ms) {
951 /* the kthread api has blocked signals for us so no 971 /* the kthread api has blocked signals for us so no
952 * need to record the return value. */ 972 * need to record the return value. */
953 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); 973 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
954 } 974 }
955 } 975 }
956 976
957 o2hb_disarm_write_timeout(reg); 977 o2hb_disarm_write_timeout(reg);
958 978
959 /* unclean stop is only used in very bad situation */ 979 /* unclean stop is only used in very bad situation */
960 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) 980 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
961 o2hb_shutdown_slot(&reg->hr_slots[i]); 981 o2hb_shutdown_slot(&reg->hr_slots[i]);
962 982
963 /* Explicit down notification - avoid forcing the other nodes 983 /* Explicit down notification - avoid forcing the other nodes
964 * to timeout on this region when we could just as easily 984 * to timeout on this region when we could just as easily
965 * write a clear generation - thus indicating to them that 985 * write a clear generation - thus indicating to them that
966 * this node has left this region. 986 * this node has left this region.
967 * 987 *
968 * XXX: Should we skip this on unclean_stop? */ 988 * XXX: Should we skip this on unclean_stop? */
969 o2hb_prepare_block(reg, 0); 989 o2hb_prepare_block(reg, 0);
970 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 990 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
971 if (ret == 0) { 991 if (ret == 0) {
972 o2hb_wait_on_io(reg, &write_wc); 992 o2hb_wait_on_io(reg, &write_wc);
973 bio_put(write_bio); 993 bio_put(write_bio);
974 } else { 994 } else {
975 mlog_errno(ret); 995 mlog_errno(ret);
976 } 996 }
977 997
978 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n"); 998 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
979 999
980 return 0; 1000 return 0;
981 } 1001 }
982 1002
983 void o2hb_init(void) 1003 void o2hb_init(void)
984 { 1004 {
985 int i; 1005 int i;
986 1006
987 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++) 1007 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
988 INIT_LIST_HEAD(&o2hb_callbacks[i].list); 1008 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
989 1009
990 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) 1010 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
991 INIT_LIST_HEAD(&o2hb_live_slots[i]); 1011 INIT_LIST_HEAD(&o2hb_live_slots[i]);
992 1012
993 INIT_LIST_HEAD(&o2hb_node_events); 1013 INIT_LIST_HEAD(&o2hb_node_events);
994 1014
995 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); 1015 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
996 } 1016 }
997 1017
998 /* if we're already in a callback then we're already serialized by the sem */ 1018 /* if we're already in a callback then we're already serialized by the sem */
999 static void o2hb_fill_node_map_from_callback(unsigned long *map, 1019 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1000 unsigned bytes) 1020 unsigned bytes)
1001 { 1021 {
1002 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); 1022 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1003 1023
1004 memcpy(map, &o2hb_live_node_bitmap, bytes); 1024 memcpy(map, &o2hb_live_node_bitmap, bytes);
1005 } 1025 }
1006 1026
1007 /* 1027 /*
1008 * get a map of all nodes that are heartbeating in any regions 1028 * get a map of all nodes that are heartbeating in any regions
1009 */ 1029 */
1010 void o2hb_fill_node_map(unsigned long *map, unsigned bytes) 1030 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1011 { 1031 {
1012 /* callers want to serialize this map and callbacks so that they 1032 /* callers want to serialize this map and callbacks so that they
1013 * can trust that they don't miss nodes coming to the party */ 1033 * can trust that they don't miss nodes coming to the party */
1014 down_read(&o2hb_callback_sem); 1034 down_read(&o2hb_callback_sem);
1015 spin_lock(&o2hb_live_lock); 1035 spin_lock(&o2hb_live_lock);
1016 o2hb_fill_node_map_from_callback(map, bytes); 1036 o2hb_fill_node_map_from_callback(map, bytes);
1017 spin_unlock(&o2hb_live_lock); 1037 spin_unlock(&o2hb_live_lock);
1018 up_read(&o2hb_callback_sem); 1038 up_read(&o2hb_callback_sem);
1019 } 1039 }
1020 EXPORT_SYMBOL_GPL(o2hb_fill_node_map); 1040 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1021 1041
1022 /* 1042 /*
1023 * heartbeat configfs bits. The heartbeat set is a default set under 1043 * heartbeat configfs bits. The heartbeat set is a default set under
1024 * the cluster set in nodemanager.c. 1044 * the cluster set in nodemanager.c.
1025 */ 1045 */
1026 1046
1027 static struct o2hb_region *to_o2hb_region(struct config_item *item) 1047 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1028 { 1048 {
1029 return item ? container_of(item, struct o2hb_region, hr_item) : NULL; 1049 return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1030 } 1050 }
1031 1051
1032 /* drop_item only drops its ref after killing the thread, nothing should 1052 /* drop_item only drops its ref after killing the thread, nothing should
1033 * be using the region anymore. this has to clean up any state that 1053 * be using the region anymore. this has to clean up any state that
1034 * attributes might have built up. */ 1054 * attributes might have built up. */
1035 static void o2hb_region_release(struct config_item *item) 1055 static void o2hb_region_release(struct config_item *item)
1036 { 1056 {
1037 int i; 1057 int i;
1038 struct page *page; 1058 struct page *page;
1039 struct o2hb_region *reg = to_o2hb_region(item); 1059 struct o2hb_region *reg = to_o2hb_region(item);
1040 1060
1041 if (reg->hr_tmp_block) 1061 if (reg->hr_tmp_block)
1042 kfree(reg->hr_tmp_block); 1062 kfree(reg->hr_tmp_block);
1043 1063
1044 if (reg->hr_slot_data) { 1064 if (reg->hr_slot_data) {
1045 for (i = 0; i < reg->hr_num_pages; i++) { 1065 for (i = 0; i < reg->hr_num_pages; i++) {
1046 page = reg->hr_slot_data[i]; 1066 page = reg->hr_slot_data[i];
1047 if (page) 1067 if (page)
1048 __free_page(page); 1068 __free_page(page);
1049 } 1069 }
1050 kfree(reg->hr_slot_data); 1070 kfree(reg->hr_slot_data);
1051 } 1071 }
1052 1072
1053 if (reg->hr_bdev) 1073 if (reg->hr_bdev)
1054 blkdev_put(reg->hr_bdev); 1074 blkdev_put(reg->hr_bdev);
1055 1075
1056 if (reg->hr_slots) 1076 if (reg->hr_slots)
1057 kfree(reg->hr_slots); 1077 kfree(reg->hr_slots);
1058 1078
1059 spin_lock(&o2hb_live_lock); 1079 spin_lock(&o2hb_live_lock);
1060 list_del(&reg->hr_all_item); 1080 list_del(&reg->hr_all_item);
1061 spin_unlock(&o2hb_live_lock); 1081 spin_unlock(&o2hb_live_lock);
1062 1082
1063 kfree(reg); 1083 kfree(reg);
1064 } 1084 }
1065 1085
1066 static int o2hb_read_block_input(struct o2hb_region *reg, 1086 static int o2hb_read_block_input(struct o2hb_region *reg,
1067 const char *page, 1087 const char *page,
1068 size_t count, 1088 size_t count,
1069 unsigned long *ret_bytes, 1089 unsigned long *ret_bytes,
1070 unsigned int *ret_bits) 1090 unsigned int *ret_bits)
1071 { 1091 {
1072 unsigned long bytes; 1092 unsigned long bytes;
1073 char *p = (char *)page; 1093 char *p = (char *)page;
1074 1094
1075 bytes = simple_strtoul(p, &p, 0); 1095 bytes = simple_strtoul(p, &p, 0);
1076 if (!p || (*p && (*p != '\n'))) 1096 if (!p || (*p && (*p != '\n')))
1077 return -EINVAL; 1097 return -EINVAL;
1078 1098
1079 /* Heartbeat and fs min / max block sizes are the same. */ 1099 /* Heartbeat and fs min / max block sizes are the same. */
1080 if (bytes > 4096 || bytes < 512) 1100 if (bytes > 4096 || bytes < 512)
1081 return -ERANGE; 1101 return -ERANGE;
1082 if (hweight16(bytes) != 1) 1102 if (hweight16(bytes) != 1)
1083 return -EINVAL; 1103 return -EINVAL;
1084 1104
1085 if (ret_bytes) 1105 if (ret_bytes)
1086 *ret_bytes = bytes; 1106 *ret_bytes = bytes;
1087 if (ret_bits) 1107 if (ret_bits)
1088 *ret_bits = ffs(bytes) - 1; 1108 *ret_bits = ffs(bytes) - 1;
1089 1109
1090 return 0; 1110 return 0;
1091 } 1111 }
1092 1112
1093 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg, 1113 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1094 char *page) 1114 char *page)
1095 { 1115 {
1096 return sprintf(page, "%u\n", reg->hr_block_bytes); 1116 return sprintf(page, "%u\n", reg->hr_block_bytes);
1097 } 1117 }
1098 1118
1099 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg, 1119 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1100 const char *page, 1120 const char *page,
1101 size_t count) 1121 size_t count)
1102 { 1122 {
1103 int status; 1123 int status;
1104 unsigned long block_bytes; 1124 unsigned long block_bytes;
1105 unsigned int block_bits; 1125 unsigned int block_bits;
1106 1126
1107 if (reg->hr_bdev) 1127 if (reg->hr_bdev)
1108 return -EINVAL; 1128 return -EINVAL;
1109 1129
1110 status = o2hb_read_block_input(reg, page, count, 1130 status = o2hb_read_block_input(reg, page, count,
1111 &block_bytes, &block_bits); 1131 &block_bytes, &block_bits);
1112 if (status) 1132 if (status)
1113 return status; 1133 return status;
1114 1134
1115 reg->hr_block_bytes = (unsigned int)block_bytes; 1135 reg->hr_block_bytes = (unsigned int)block_bytes;
1116 reg->hr_block_bits = block_bits; 1136 reg->hr_block_bits = block_bits;
1117 1137
1118 return count; 1138 return count;
1119 } 1139 }
1120 1140
1121 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg, 1141 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1122 char *page) 1142 char *page)
1123 { 1143 {
1124 return sprintf(page, "%llu\n", reg->hr_start_block); 1144 return sprintf(page, "%llu\n", reg->hr_start_block);
1125 } 1145 }
1126 1146
1127 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg, 1147 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1128 const char *page, 1148 const char *page,
1129 size_t count) 1149 size_t count)
1130 { 1150 {
1131 unsigned long long tmp; 1151 unsigned long long tmp;
1132 char *p = (char *)page; 1152 char *p = (char *)page;
1133 1153
1134 if (reg->hr_bdev) 1154 if (reg->hr_bdev)
1135 return -EINVAL; 1155 return -EINVAL;
1136 1156
1137 tmp = simple_strtoull(p, &p, 0); 1157 tmp = simple_strtoull(p, &p, 0);
1138 if (!p || (*p && (*p != '\n'))) 1158 if (!p || (*p && (*p != '\n')))
1139 return -EINVAL; 1159 return -EINVAL;
1140 1160
1141 reg->hr_start_block = tmp; 1161 reg->hr_start_block = tmp;
1142 1162
1143 return count; 1163 return count;
1144 } 1164 }
1145 1165
1146 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg, 1166 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1147 char *page) 1167 char *page)
1148 { 1168 {
1149 return sprintf(page, "%d\n", reg->hr_blocks); 1169 return sprintf(page, "%d\n", reg->hr_blocks);
1150 } 1170 }
1151 1171
1152 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg, 1172 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1153 const char *page, 1173 const char *page,
1154 size_t count) 1174 size_t count)
1155 { 1175 {
1156 unsigned long tmp; 1176 unsigned long tmp;
1157 char *p = (char *)page; 1177 char *p = (char *)page;
1158 1178
1159 if (reg->hr_bdev) 1179 if (reg->hr_bdev)
1160 return -EINVAL; 1180 return -EINVAL;
1161 1181
1162 tmp = simple_strtoul(p, &p, 0); 1182 tmp = simple_strtoul(p, &p, 0);
1163 if (!p || (*p && (*p != '\n'))) 1183 if (!p || (*p && (*p != '\n')))
1164 return -EINVAL; 1184 return -EINVAL;
1165 1185
1166 if (tmp > O2NM_MAX_NODES || tmp == 0) 1186 if (tmp > O2NM_MAX_NODES || tmp == 0)
1167 return -ERANGE; 1187 return -ERANGE;
1168 1188
1169 reg->hr_blocks = (unsigned int)tmp; 1189 reg->hr_blocks = (unsigned int)tmp;
1170 1190
1171 return count; 1191 return count;
1172 } 1192 }
1173 1193
1174 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg, 1194 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1175 char *page) 1195 char *page)
1176 { 1196 {
1177 unsigned int ret = 0; 1197 unsigned int ret = 0;
1178 1198
1179 if (reg->hr_bdev) 1199 if (reg->hr_bdev)
1180 ret = sprintf(page, "%s\n", reg->hr_dev_name); 1200 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1181 1201
1182 return ret; 1202 return ret;
1183 } 1203 }
1184 1204
1185 static void o2hb_init_region_params(struct o2hb_region *reg) 1205 static void o2hb_init_region_params(struct o2hb_region *reg)
1186 { 1206 {
1187 reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits; 1207 reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1188 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; 1208 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1189 1209
1190 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", 1210 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1191 reg->hr_start_block, reg->hr_blocks); 1211 reg->hr_start_block, reg->hr_blocks);
1192 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n", 1212 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1193 reg->hr_block_bytes, reg->hr_block_bits); 1213 reg->hr_block_bytes, reg->hr_block_bits);
1194 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms); 1214 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1195 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold); 1215 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1196 } 1216 }
1197 1217
1198 static int o2hb_map_slot_data(struct o2hb_region *reg) 1218 static int o2hb_map_slot_data(struct o2hb_region *reg)
1199 { 1219 {
1200 int i, j; 1220 int i, j;
1201 unsigned int last_slot; 1221 unsigned int last_slot;
1202 unsigned int spp = reg->hr_slots_per_page; 1222 unsigned int spp = reg->hr_slots_per_page;
1203 struct page *page; 1223 struct page *page;
1204 char *raw; 1224 char *raw;
1205 struct o2hb_disk_slot *slot; 1225 struct o2hb_disk_slot *slot;
1206 1226
1207 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); 1227 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1208 if (reg->hr_tmp_block == NULL) { 1228 if (reg->hr_tmp_block == NULL) {
1209 mlog_errno(-ENOMEM); 1229 mlog_errno(-ENOMEM);
1210 return -ENOMEM; 1230 return -ENOMEM;
1211 } 1231 }
1212 1232
1213 reg->hr_slots = kcalloc(reg->hr_blocks, 1233 reg->hr_slots = kcalloc(reg->hr_blocks,
1214 sizeof(struct o2hb_disk_slot), GFP_KERNEL); 1234 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1215 if (reg->hr_slots == NULL) { 1235 if (reg->hr_slots == NULL) {
1216 mlog_errno(-ENOMEM); 1236 mlog_errno(-ENOMEM);
1217 return -ENOMEM; 1237 return -ENOMEM;
1218 } 1238 }
1219 1239
1220 for(i = 0; i < reg->hr_blocks; i++) { 1240 for(i = 0; i < reg->hr_blocks; i++) {
1221 slot = &reg->hr_slots[i]; 1241 slot = &reg->hr_slots[i];
1222 slot->ds_node_num = i; 1242 slot->ds_node_num = i;
1223 INIT_LIST_HEAD(&slot->ds_live_item); 1243 INIT_LIST_HEAD(&slot->ds_live_item);
1224 slot->ds_raw_block = NULL; 1244 slot->ds_raw_block = NULL;
1225 } 1245 }
1226 1246
1227 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp; 1247 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1228 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks " 1248 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1229 "at %u blocks per page\n", 1249 "at %u blocks per page\n",
1230 reg->hr_num_pages, reg->hr_blocks, spp); 1250 reg->hr_num_pages, reg->hr_blocks, spp);
1231 1251
1232 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), 1252 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1233 GFP_KERNEL); 1253 GFP_KERNEL);
1234 if (!reg->hr_slot_data) { 1254 if (!reg->hr_slot_data) {
1235 mlog_errno(-ENOMEM); 1255 mlog_errno(-ENOMEM);
1236 return -ENOMEM; 1256 return -ENOMEM;
1237 } 1257 }
1238 1258
1239 for(i = 0; i < reg->hr_num_pages; i++) { 1259 for(i = 0; i < reg->hr_num_pages; i++) {
1240 page = alloc_page(GFP_KERNEL); 1260 page = alloc_page(GFP_KERNEL);
1241 if (!page) { 1261 if (!page) {
1242 mlog_errno(-ENOMEM); 1262 mlog_errno(-ENOMEM);
1243 return -ENOMEM; 1263 return -ENOMEM;
1244 } 1264 }
1245 1265
1246 reg->hr_slot_data[i] = page; 1266 reg->hr_slot_data[i] = page;
1247 1267
1248 last_slot = i * spp; 1268 last_slot = i * spp;
1249 raw = page_address(page); 1269 raw = page_address(page);
1250 for (j = 0; 1270 for (j = 0;
1251 (j < spp) && ((j + last_slot) < reg->hr_blocks); 1271 (j < spp) && ((j + last_slot) < reg->hr_blocks);
1252 j++) { 1272 j++) {
1253 BUG_ON((j + last_slot) >= reg->hr_blocks); 1273 BUG_ON((j + last_slot) >= reg->hr_blocks);
1254 1274
1255 slot = &reg->hr_slots[j + last_slot]; 1275 slot = &reg->hr_slots[j + last_slot];
1256 slot->ds_raw_block = 1276 slot->ds_raw_block =
1257 (struct o2hb_disk_heartbeat_block *) raw; 1277 (struct o2hb_disk_heartbeat_block *) raw;
1258 1278
1259 raw += reg->hr_block_bytes; 1279 raw += reg->hr_block_bytes;
1260 } 1280 }
1261 } 1281 }
1262 1282
1263 return 0; 1283 return 0;
1264 } 1284 }
1265 1285
1266 /* Read in all the slots available and populate the tracking 1286 /* Read in all the slots available and populate the tracking
1267 * structures so that we can start with a baseline idea of what's 1287 * structures so that we can start with a baseline idea of what's
1268 * there. */ 1288 * there. */
1269 static int o2hb_populate_slot_data(struct o2hb_region *reg) 1289 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1270 { 1290 {
1271 int ret, i; 1291 int ret, i;
1272 struct o2hb_disk_slot *slot; 1292 struct o2hb_disk_slot *slot;
1273 struct o2hb_disk_heartbeat_block *hb_block; 1293 struct o2hb_disk_heartbeat_block *hb_block;
1274 1294
1275 mlog_entry_void(); 1295 mlog_entry_void();
1276 1296
1277 ret = o2hb_read_slots(reg, reg->hr_blocks); 1297 ret = o2hb_read_slots(reg, reg->hr_blocks);
1278 if (ret) { 1298 if (ret) {
1279 mlog_errno(ret); 1299 mlog_errno(ret);
1280 goto out; 1300 goto out;
1281 } 1301 }
1282 1302
1283 /* We only want to get an idea of the values initially in each 1303 /* We only want to get an idea of the values initially in each
1284 * slot, so we do no verification - o2hb_check_slot will 1304 * slot, so we do no verification - o2hb_check_slot will
1285 * actually determine if each configured slot is valid and 1305 * actually determine if each configured slot is valid and
1286 * whether any values have changed. */ 1306 * whether any values have changed. */
1287 for(i = 0; i < reg->hr_blocks; i++) { 1307 for(i = 0; i < reg->hr_blocks; i++) {
1288 slot = &reg->hr_slots[i]; 1308 slot = &reg->hr_slots[i];
1289 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block; 1309 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1290 1310
1291 /* Only fill the values that o2hb_check_slot uses to 1311 /* Only fill the values that o2hb_check_slot uses to
1292 * determine changing slots */ 1312 * determine changing slots */
1293 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq); 1313 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1294 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 1314 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1295 } 1315 }
1296 1316
1297 out: 1317 out:
1298 mlog_exit(ret); 1318 mlog_exit(ret);
1299 return ret; 1319 return ret;
1300 } 1320 }
1301 1321
1302 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */ 1322 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1303 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, 1323 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1304 const char *page, 1324 const char *page,
1305 size_t count) 1325 size_t count)
1306 { 1326 {
1307 long fd; 1327 long fd;
1308 int sectsize; 1328 int sectsize;
1309 char *p = (char *)page; 1329 char *p = (char *)page;
1310 struct file *filp = NULL; 1330 struct file *filp = NULL;
1311 struct inode *inode = NULL; 1331 struct inode *inode = NULL;
1312 ssize_t ret = -EINVAL; 1332 ssize_t ret = -EINVAL;
1313 1333
1314 if (reg->hr_bdev) 1334 if (reg->hr_bdev)
1315 goto out; 1335 goto out;
1316 1336
1317 /* We can't heartbeat without having had our node number 1337 /* We can't heartbeat without having had our node number
1318 * configured yet. */ 1338 * configured yet. */
1319 if (o2nm_this_node() == O2NM_MAX_NODES) 1339 if (o2nm_this_node() == O2NM_MAX_NODES)
1320 goto out; 1340 goto out;
1321 1341
1322 fd = simple_strtol(p, &p, 0); 1342 fd = simple_strtol(p, &p, 0);
1323 if (!p || (*p && (*p != '\n'))) 1343 if (!p || (*p && (*p != '\n')))
1324 goto out; 1344 goto out;
1325 1345
1326 if (fd < 0 || fd >= INT_MAX) 1346 if (fd < 0 || fd >= INT_MAX)
1327 goto out; 1347 goto out;
1328 1348
1329 filp = fget(fd); 1349 filp = fget(fd);
1330 if (filp == NULL) 1350 if (filp == NULL)
1331 goto out; 1351 goto out;
1332 1352
1333 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || 1353 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1334 reg->hr_block_bytes == 0) 1354 reg->hr_block_bytes == 0)
1335 goto out; 1355 goto out;
1336 1356
1337 inode = igrab(filp->f_mapping->host); 1357 inode = igrab(filp->f_mapping->host);
1338 if (inode == NULL) 1358 if (inode == NULL)
1339 goto out; 1359 goto out;
1340 1360
1341 if (!S_ISBLK(inode->i_mode)) 1361 if (!S_ISBLK(inode->i_mode))
1342 goto out; 1362 goto out;
1343 1363
1344 reg->hr_bdev = I_BDEV(filp->f_mapping->host); 1364 reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1345 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0); 1365 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1346 if (ret) { 1366 if (ret) {
1347 reg->hr_bdev = NULL; 1367 reg->hr_bdev = NULL;
1348 goto out; 1368 goto out;
1349 } 1369 }
1350 inode = NULL; 1370 inode = NULL;
1351 1371
1352 bdevname(reg->hr_bdev, reg->hr_dev_name); 1372 bdevname(reg->hr_bdev, reg->hr_dev_name);
1353 1373
1354 sectsize = bdev_hardsect_size(reg->hr_bdev); 1374 sectsize = bdev_hardsect_size(reg->hr_bdev);
1355 if (sectsize != reg->hr_block_bytes) { 1375 if (sectsize != reg->hr_block_bytes) {
1356 mlog(ML_ERROR, 1376 mlog(ML_ERROR,
1357 "blocksize %u incorrect for device, expected %d", 1377 "blocksize %u incorrect for device, expected %d",
1358 reg->hr_block_bytes, sectsize); 1378 reg->hr_block_bytes, sectsize);
1359 ret = -EINVAL; 1379 ret = -EINVAL;
1360 goto out; 1380 goto out;
1361 } 1381 }
1362 1382
1363 o2hb_init_region_params(reg); 1383 o2hb_init_region_params(reg);
1364 1384
1365 /* Generation of zero is invalid */ 1385 /* Generation of zero is invalid */
1366 do { 1386 do {
1367 get_random_bytes(&reg->hr_generation, 1387 get_random_bytes(&reg->hr_generation,
1368 sizeof(reg->hr_generation)); 1388 sizeof(reg->hr_generation));
1369 } while (reg->hr_generation == 0); 1389 } while (reg->hr_generation == 0);
1370 1390
1371 ret = o2hb_map_slot_data(reg); 1391 ret = o2hb_map_slot_data(reg);
1372 if (ret) { 1392 if (ret) {
1373 mlog_errno(ret); 1393 mlog_errno(ret);
1374 goto out; 1394 goto out;
1375 } 1395 }
1376 1396
1377 ret = o2hb_populate_slot_data(reg); 1397 ret = o2hb_populate_slot_data(reg);
1378 if (ret) { 1398 if (ret) {
1379 mlog_errno(ret); 1399 mlog_errno(ret);
1380 goto out; 1400 goto out;
1381 } 1401 }
1382 1402
1383 INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg); 1403 INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);
1384 1404
1385 /* 1405 /*
1386 * A node is considered live after it has beat LIVE_THRESHOLD 1406 * A node is considered live after it has beat LIVE_THRESHOLD
1387 * times. We're not steady until we've given them a chance 1407 * times. We're not steady until we've given them a chance
1388 * _after_ our first read. 1408 * _after_ our first read.
1389 */ 1409 */
1390 atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1); 1410 atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1391 1411
1392 reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s", 1412 reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1393 reg->hr_item.ci_name); 1413 reg->hr_item.ci_name);
1394 if (IS_ERR(reg->hr_task)) { 1414 if (IS_ERR(reg->hr_task)) {
1395 ret = PTR_ERR(reg->hr_task); 1415 ret = PTR_ERR(reg->hr_task);
1396 mlog_errno(ret); 1416 mlog_errno(ret);
1397 reg->hr_task = NULL; 1417 reg->hr_task = NULL;
1398 goto out; 1418 goto out;
1399 } 1419 }
1400 1420
1401 ret = wait_event_interruptible(o2hb_steady_queue, 1421 ret = wait_event_interruptible(o2hb_steady_queue,
1402 atomic_read(&reg->hr_steady_iterations) == 0); 1422 atomic_read(&reg->hr_steady_iterations) == 0);
1403 if (ret) { 1423 if (ret) {
1404 kthread_stop(reg->hr_task); 1424 kthread_stop(reg->hr_task);
1405 reg->hr_task = NULL; 1425 reg->hr_task = NULL;
1406 goto out; 1426 goto out;
1407 } 1427 }
1408 1428
1409 ret = count; 1429 ret = count;
1410 out: 1430 out:
1411 if (filp) 1431 if (filp)
1412 fput(filp); 1432 fput(filp);
1413 if (inode) 1433 if (inode)
1414 iput(inode); 1434 iput(inode);
1415 if (ret < 0) { 1435 if (ret < 0) {
1416 if (reg->hr_bdev) { 1436 if (reg->hr_bdev) {
1417 blkdev_put(reg->hr_bdev); 1437 blkdev_put(reg->hr_bdev);
1418 reg->hr_bdev = NULL; 1438 reg->hr_bdev = NULL;
1419 } 1439 }
1420 } 1440 }
1421 return ret; 1441 return ret;
1422 } 1442 }
1423 1443
1424 struct o2hb_region_attribute { 1444 struct o2hb_region_attribute {
1425 struct configfs_attribute attr; 1445 struct configfs_attribute attr;
1426 ssize_t (*show)(struct o2hb_region *, char *); 1446 ssize_t (*show)(struct o2hb_region *, char *);
1427 ssize_t (*store)(struct o2hb_region *, const char *, size_t); 1447 ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1428 }; 1448 };
1429 1449
1430 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = { 1450 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1431 .attr = { .ca_owner = THIS_MODULE, 1451 .attr = { .ca_owner = THIS_MODULE,
1432 .ca_name = "block_bytes", 1452 .ca_name = "block_bytes",
1433 .ca_mode = S_IRUGO | S_IWUSR }, 1453 .ca_mode = S_IRUGO | S_IWUSR },
1434 .show = o2hb_region_block_bytes_read, 1454 .show = o2hb_region_block_bytes_read,
1435 .store = o2hb_region_block_bytes_write, 1455 .store = o2hb_region_block_bytes_write,
1436 }; 1456 };
1437 1457
1438 static struct o2hb_region_attribute o2hb_region_attr_start_block = { 1458 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1439 .attr = { .ca_owner = THIS_MODULE, 1459 .attr = { .ca_owner = THIS_MODULE,
1440 .ca_name = "start_block", 1460 .ca_name = "start_block",
1441 .ca_mode = S_IRUGO | S_IWUSR }, 1461 .ca_mode = S_IRUGO | S_IWUSR },
1442 .show = o2hb_region_start_block_read, 1462 .show = o2hb_region_start_block_read,
1443 .store = o2hb_region_start_block_write, 1463 .store = o2hb_region_start_block_write,
1444 }; 1464 };
1445 1465
1446 static struct o2hb_region_attribute o2hb_region_attr_blocks = { 1466 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1447 .attr = { .ca_owner = THIS_MODULE, 1467 .attr = { .ca_owner = THIS_MODULE,
1448 .ca_name = "blocks", 1468 .ca_name = "blocks",
1449 .ca_mode = S_IRUGO | S_IWUSR }, 1469 .ca_mode = S_IRUGO | S_IWUSR },
1450 .show = o2hb_region_blocks_read, 1470 .show = o2hb_region_blocks_read,
1451 .store = o2hb_region_blocks_write, 1471 .store = o2hb_region_blocks_write,
1452 }; 1472 };
1453 1473
1454 static struct o2hb_region_attribute o2hb_region_attr_dev = { 1474 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1455 .attr = { .ca_owner = THIS_MODULE, 1475 .attr = { .ca_owner = THIS_MODULE,
1456 .ca_name = "dev", 1476 .ca_name = "dev",
1457 .ca_mode = S_IRUGO | S_IWUSR }, 1477 .ca_mode = S_IRUGO | S_IWUSR },
1458 .show = o2hb_region_dev_read, 1478 .show = o2hb_region_dev_read,
1459 .store = o2hb_region_dev_write, 1479 .store = o2hb_region_dev_write,
1460 }; 1480 };
1461 1481
1462 static struct configfs_attribute *o2hb_region_attrs[] = { 1482 static struct configfs_attribute *o2hb_region_attrs[] = {
1463 &o2hb_region_attr_block_bytes.attr, 1483 &o2hb_region_attr_block_bytes.attr,
1464 &o2hb_region_attr_start_block.attr, 1484 &o2hb_region_attr_start_block.attr,
1465 &o2hb_region_attr_blocks.attr, 1485 &o2hb_region_attr_blocks.attr,
1466 &o2hb_region_attr_dev.attr, 1486 &o2hb_region_attr_dev.attr,
1467 NULL, 1487 NULL,
1468 }; 1488 };
1469 1489
1470 static ssize_t o2hb_region_show(struct config_item *item, 1490 static ssize_t o2hb_region_show(struct config_item *item,
1471 struct configfs_attribute *attr, 1491 struct configfs_attribute *attr,
1472 char *page) 1492 char *page)
1473 { 1493 {
1474 struct o2hb_region *reg = to_o2hb_region(item); 1494 struct o2hb_region *reg = to_o2hb_region(item);
1475 struct o2hb_region_attribute *o2hb_region_attr = 1495 struct o2hb_region_attribute *o2hb_region_attr =
1476 container_of(attr, struct o2hb_region_attribute, attr); 1496 container_of(attr, struct o2hb_region_attribute, attr);
1477 ssize_t ret = 0; 1497 ssize_t ret = 0;
1478 1498
1479 if (o2hb_region_attr->show) 1499 if (o2hb_region_attr->show)
1480 ret = o2hb_region_attr->show(reg, page); 1500 ret = o2hb_region_attr->show(reg, page);
1481 return ret; 1501 return ret;
1482 } 1502 }
1483 1503
1484 static ssize_t o2hb_region_store(struct config_item *item, 1504 static ssize_t o2hb_region_store(struct config_item *item,
1485 struct configfs_attribute *attr, 1505 struct configfs_attribute *attr,
1486 const char *page, size_t count) 1506 const char *page, size_t count)
1487 { 1507 {
1488 struct o2hb_region *reg = to_o2hb_region(item); 1508 struct o2hb_region *reg = to_o2hb_region(item);
1489 struct o2hb_region_attribute *o2hb_region_attr = 1509 struct o2hb_region_attribute *o2hb_region_attr =
1490 container_of(attr, struct o2hb_region_attribute, attr); 1510 container_of(attr, struct o2hb_region_attribute, attr);
1491 ssize_t ret = -EINVAL; 1511 ssize_t ret = -EINVAL;
1492 1512
1493 if (o2hb_region_attr->store) 1513 if (o2hb_region_attr->store)
1494 ret = o2hb_region_attr->store(reg, page, count); 1514 ret = o2hb_region_attr->store(reg, page, count);
1495 return ret; 1515 return ret;
1496 } 1516 }
1497 1517
1498 static struct configfs_item_operations o2hb_region_item_ops = { 1518 static struct configfs_item_operations o2hb_region_item_ops = {
1499 .release = o2hb_region_release, 1519 .release = o2hb_region_release,
1500 .show_attribute = o2hb_region_show, 1520 .show_attribute = o2hb_region_show,
1501 .store_attribute = o2hb_region_store, 1521 .store_attribute = o2hb_region_store,
1502 }; 1522 };
1503 1523
1504 static struct config_item_type o2hb_region_type = { 1524 static struct config_item_type o2hb_region_type = {
1505 .ct_item_ops = &o2hb_region_item_ops, 1525 .ct_item_ops = &o2hb_region_item_ops,
1506 .ct_attrs = o2hb_region_attrs, 1526 .ct_attrs = o2hb_region_attrs,
1507 .ct_owner = THIS_MODULE, 1527 .ct_owner = THIS_MODULE,
1508 }; 1528 };
1509 1529
/* heartbeat set */
1511 1531
1512 struct o2hb_heartbeat_group { 1532 struct o2hb_heartbeat_group {
1513 struct config_group hs_group; 1533 struct config_group hs_group;
1514 /* some stuff? */ 1534 /* some stuff? */
1515 }; 1535 };
1516 1536
1517 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group) 1537 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1518 { 1538 {
1519 return group ? 1539 return group ?
1520 container_of(group, struct o2hb_heartbeat_group, hs_group) 1540 container_of(group, struct o2hb_heartbeat_group, hs_group)
1521 : NULL; 1541 : NULL;
1522 } 1542 }
1523 1543
1524 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group, 1544 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1525 const char *name) 1545 const char *name)
1526 { 1546 {
1527 struct o2hb_region *reg = NULL; 1547 struct o2hb_region *reg = NULL;
1528 struct config_item *ret = NULL; 1548 struct config_item *ret = NULL;
1529 1549
1530 reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL); 1550 reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
1531 if (reg == NULL) 1551 if (reg == NULL)
1532 goto out; /* ENOMEM */ 1552 goto out; /* ENOMEM */
1533 1553
1534 config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type); 1554 config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1535 1555
1536 ret = &reg->hr_item; 1556 ret = &reg->hr_item;
1537 1557
1538 spin_lock(&o2hb_live_lock); 1558 spin_lock(&o2hb_live_lock);
1539 list_add_tail(&reg->hr_all_item, &o2hb_all_regions); 1559 list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1540 spin_unlock(&o2hb_live_lock); 1560 spin_unlock(&o2hb_live_lock);
1541 out: 1561 out:
1542 if (ret == NULL) 1562 if (ret == NULL)
1543 kfree(reg); 1563 kfree(reg);
1544 1564
1545 return ret; 1565 return ret;
1546 } 1566 }
1547 1567
1548 static void o2hb_heartbeat_group_drop_item(struct config_group *group, 1568 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1549 struct config_item *item) 1569 struct config_item *item)
1550 { 1570 {
1551 struct o2hb_region *reg = to_o2hb_region(item); 1571 struct o2hb_region *reg = to_o2hb_region(item);
1552 1572
1553 /* stop the thread when the user removes the region dir */ 1573 /* stop the thread when the user removes the region dir */
1554 if (reg->hr_task) { 1574 if (reg->hr_task) {
1555 kthread_stop(reg->hr_task); 1575 kthread_stop(reg->hr_task);
1556 reg->hr_task = NULL; 1576 reg->hr_task = NULL;
1557 } 1577 }
1558 1578
1559 config_item_put(item); 1579 config_item_put(item);
1560 } 1580 }
1561 1581
1562 struct o2hb_heartbeat_group_attribute { 1582 struct o2hb_heartbeat_group_attribute {
1563 struct configfs_attribute attr; 1583 struct configfs_attribute attr;
1564 ssize_t (*show)(struct o2hb_heartbeat_group *, char *); 1584 ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1565 ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t); 1585 ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1566 }; 1586 };
1567 1587
1568 static ssize_t o2hb_heartbeat_group_show(struct config_item *item, 1588 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1569 struct configfs_attribute *attr, 1589 struct configfs_attribute *attr,
1570 char *page) 1590 char *page)
1571 { 1591 {
1572 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); 1592 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1573 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = 1593 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1574 container_of(attr, struct o2hb_heartbeat_group_attribute, attr); 1594 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1575 ssize_t ret = 0; 1595 ssize_t ret = 0;
1576 1596
1577 if (o2hb_heartbeat_group_attr->show) 1597 if (o2hb_heartbeat_group_attr->show)
1578 ret = o2hb_heartbeat_group_attr->show(reg, page); 1598 ret = o2hb_heartbeat_group_attr->show(reg, page);
1579 return ret; 1599 return ret;
1580 } 1600 }
1581 1601
1582 static ssize_t o2hb_heartbeat_group_store(struct config_item *item, 1602 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1583 struct configfs_attribute *attr, 1603 struct configfs_attribute *attr,
1584 const char *page, size_t count) 1604 const char *page, size_t count)
1585 { 1605 {
1586 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); 1606 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1587 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = 1607 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1588 container_of(attr, struct o2hb_heartbeat_group_attribute, attr); 1608 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1589 ssize_t ret = -EINVAL; 1609 ssize_t ret = -EINVAL;
1590 1610
1591 if (o2hb_heartbeat_group_attr->store) 1611 if (o2hb_heartbeat_group_attr->store)
1592 ret = o2hb_heartbeat_group_attr->store(reg, page, count); 1612 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1593 return ret; 1613 return ret;
1594 } 1614 }
1595 1615
1596 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group, 1616 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1597 char *page) 1617 char *page)
1598 { 1618 {
1599 return sprintf(page, "%u\n", o2hb_dead_threshold); 1619 return sprintf(page, "%u\n", o2hb_dead_threshold);
1600 } 1620 }
1601 1621
1602 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group, 1622 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1603 const char *page, 1623 const char *page,
1604 size_t count) 1624 size_t count)
1605 { 1625 {
1606 unsigned long tmp; 1626 unsigned long tmp;
1607 char *p = (char *)page; 1627 char *p = (char *)page;
1608 1628
1609 tmp = simple_strtoul(p, &p, 10); 1629 tmp = simple_strtoul(p, &p, 10);
1610 if (!p || (*p && (*p != '\n'))) 1630 if (!p || (*p && (*p != '\n')))
1611 return -EINVAL; 1631 return -EINVAL;
1612 1632
1613 /* this will validate ranges for us. */ 1633 /* this will validate ranges for us. */
1614 o2hb_dead_threshold_set((unsigned int) tmp); 1634 o2hb_dead_threshold_set((unsigned int) tmp);
1615 1635
1616 return count; 1636 return count;
1617 } 1637 }
1618 1638
1619 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = { 1639 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1620 .attr = { .ca_owner = THIS_MODULE, 1640 .attr = { .ca_owner = THIS_MODULE,
1621 .ca_name = "dead_threshold", 1641 .ca_name = "dead_threshold",
1622 .ca_mode = S_IRUGO | S_IWUSR }, 1642 .ca_mode = S_IRUGO | S_IWUSR },
1623 .show = o2hb_heartbeat_group_threshold_show, 1643 .show = o2hb_heartbeat_group_threshold_show,
1624 .store = o2hb_heartbeat_group_threshold_store, 1644 .store = o2hb_heartbeat_group_threshold_store,
1625 }; 1645 };
1626 1646
1627 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { 1647 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1628 &o2hb_heartbeat_group_attr_threshold.attr, 1648 &o2hb_heartbeat_group_attr_threshold.attr,
1629 NULL, 1649 NULL,
1630 }; 1650 };
1631 1651
1632 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = { 1652 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1633 .show_attribute = o2hb_heartbeat_group_show, 1653 .show_attribute = o2hb_heartbeat_group_show,
1634 .store_attribute = o2hb_heartbeat_group_store, 1654 .store_attribute = o2hb_heartbeat_group_store,
1635 }; 1655 };
1636 1656
1637 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { 1657 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1638 .make_item = o2hb_heartbeat_group_make_item, 1658 .make_item = o2hb_heartbeat_group_make_item,
1639 .drop_item = o2hb_heartbeat_group_drop_item, 1659 .drop_item = o2hb_heartbeat_group_drop_item,
1640 }; 1660 };
1641 1661
1642 static struct config_item_type o2hb_heartbeat_group_type = { 1662 static struct config_item_type o2hb_heartbeat_group_type = {
1643 .ct_group_ops = &o2hb_heartbeat_group_group_ops, 1663 .ct_group_ops = &o2hb_heartbeat_group_group_ops,
1644 .ct_item_ops = &o2hb_hearbeat_group_item_ops, 1664 .ct_item_ops = &o2hb_hearbeat_group_item_ops,
1645 .ct_attrs = o2hb_heartbeat_group_attrs, 1665 .ct_attrs = o2hb_heartbeat_group_attrs,
1646 .ct_owner = THIS_MODULE, 1666 .ct_owner = THIS_MODULE,
1647 }; 1667 };
1648 1668
1649 /* this is just here to avoid touching group in heartbeat.h which the 1669 /* this is just here to avoid touching group in heartbeat.h which the
1650 * entire damn world #includes */ 1670 * entire damn world #includes */
1651 struct config_group *o2hb_alloc_hb_set(void) 1671 struct config_group *o2hb_alloc_hb_set(void)
1652 { 1672 {
1653 struct o2hb_heartbeat_group *hs = NULL; 1673 struct o2hb_heartbeat_group *hs = NULL;
1654 struct config_group *ret = NULL; 1674 struct config_group *ret = NULL;
1655 1675
1656 hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); 1676 hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1657 if (hs == NULL) 1677 if (hs == NULL)
1658 goto out; 1678 goto out;
1659 1679
1660 config_group_init_type_name(&hs->hs_group, "heartbeat", 1680 config_group_init_type_name(&hs->hs_group, "heartbeat",
1661 &o2hb_heartbeat_group_type); 1681 &o2hb_heartbeat_group_type);
1662 1682
1663 ret = &hs->hs_group; 1683 ret = &hs->hs_group;
1664 out: 1684 out:
1665 if (ret == NULL) 1685 if (ret == NULL)
1666 kfree(hs); 1686 kfree(hs);
1667 return ret; 1687 return ret;
1668 } 1688 }
1669 1689
1670 void o2hb_free_hb_set(struct config_group *group) 1690 void o2hb_free_hb_set(struct config_group *group)
1671 { 1691 {
1672 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group); 1692 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1673 kfree(hs); 1693 kfree(hs);
1674 } 1694 }
1675 1695
1676 /* hb callback registration and issuing */ 1696 /* hb callback registration and issuing */
1677 1697
1678 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type) 1698 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1679 { 1699 {
1680 if (type == O2HB_NUM_CB) 1700 if (type == O2HB_NUM_CB)
1681 return ERR_PTR(-EINVAL); 1701 return ERR_PTR(-EINVAL);
1682 1702
1683 return &o2hb_callbacks[type]; 1703 return &o2hb_callbacks[type];
1684 } 1704 }
1685 1705
1686 void o2hb_setup_callback(struct o2hb_callback_func *hc, 1706 void o2hb_setup_callback(struct o2hb_callback_func *hc,
1687 enum o2hb_callback_type type, 1707 enum o2hb_callback_type type,
1688 o2hb_cb_func *func, 1708 o2hb_cb_func *func,
1689 void *data, 1709 void *data,
1690 int priority) 1710 int priority)
1691 { 1711 {
1692 INIT_LIST_HEAD(&hc->hc_item); 1712 INIT_LIST_HEAD(&hc->hc_item);
1693 hc->hc_func = func; 1713 hc->hc_func = func;
1694 hc->hc_data = data; 1714 hc->hc_data = data;
1695 hc->hc_priority = priority; 1715 hc->hc_priority = priority;
1696 hc->hc_type = type; 1716 hc->hc_type = type;
1697 hc->hc_magic = O2HB_CB_MAGIC; 1717 hc->hc_magic = O2HB_CB_MAGIC;
1698 } 1718 }
1699 EXPORT_SYMBOL_GPL(o2hb_setup_callback); 1719 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1700 1720
1701 int o2hb_register_callback(struct o2hb_callback_func *hc) 1721 int o2hb_register_callback(struct o2hb_callback_func *hc)
1702 { 1722 {
1703 struct o2hb_callback_func *tmp; 1723 struct o2hb_callback_func *tmp;
1704 struct list_head *iter; 1724 struct list_head *iter;
1705 struct o2hb_callback *hbcall; 1725 struct o2hb_callback *hbcall;
1706 int ret; 1726 int ret;
1707 1727
1708 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 1728 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1709 BUG_ON(!list_empty(&hc->hc_item)); 1729 BUG_ON(!list_empty(&hc->hc_item));
1710 1730
1711 hbcall = hbcall_from_type(hc->hc_type); 1731 hbcall = hbcall_from_type(hc->hc_type);
1712 if (IS_ERR(hbcall)) { 1732 if (IS_ERR(hbcall)) {
1713 ret = PTR_ERR(hbcall); 1733 ret = PTR_ERR(hbcall);
1714 goto out; 1734 goto out;
1715 } 1735 }
1716 1736
1717 down_write(&o2hb_callback_sem); 1737 down_write(&o2hb_callback_sem);
1718 1738
1719 list_for_each(iter, &hbcall->list) { 1739 list_for_each(iter, &hbcall->list) {
1720 tmp = list_entry(iter, struct o2hb_callback_func, hc_item); 1740 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1721 if (hc->hc_priority < tmp->hc_priority) { 1741 if (hc->hc_priority < tmp->hc_priority) {
1722 list_add_tail(&hc->hc_item, iter); 1742 list_add_tail(&hc->hc_item, iter);
1723 break; 1743 break;
1724 } 1744 }
1725 } 1745 }
1726 if (list_empty(&hc->hc_item)) 1746 if (list_empty(&hc->hc_item))
1727 list_add_tail(&hc->hc_item, &hbcall->list); 1747 list_add_tail(&hc->hc_item, &hbcall->list);
1728 1748
1729 up_write(&o2hb_callback_sem); 1749 up_write(&o2hb_callback_sem);
1730 ret = 0; 1750 ret = 0;
1731 out: 1751 out:
1732 mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n", 1752 mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1733 ret, __builtin_return_address(0), hc); 1753 ret, __builtin_return_address(0), hc);
1734 return ret; 1754 return ret;
1735 } 1755 }
1736 EXPORT_SYMBOL_GPL(o2hb_register_callback); 1756 EXPORT_SYMBOL_GPL(o2hb_register_callback);
1737 1757
1738 int o2hb_unregister_callback(struct o2hb_callback_func *hc) 1758 int o2hb_unregister_callback(struct o2hb_callback_func *hc)
1739 { 1759 {
1740 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 1760 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1741 1761
1742 mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n", 1762 mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1743 __builtin_return_address(0), hc); 1763 __builtin_return_address(0), hc);
1744 1764
1745 if (list_empty(&hc->hc_item)) 1765 if (list_empty(&hc->hc_item))
1746 return 0; 1766 return 0;
1747 1767
1748 down_write(&o2hb_callback_sem); 1768 down_write(&o2hb_callback_sem);
1749 1769
1750 list_del_init(&hc->hc_item); 1770 list_del_init(&hc->hc_item);
1751 1771
1752 up_write(&o2hb_callback_sem); 1772 up_write(&o2hb_callback_sem);
1753 1773
1754 return 0; 1774 return 0;
1755 } 1775 }
1756 EXPORT_SYMBOL_GPL(o2hb_unregister_callback); 1776 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1757 1777
1758 int o2hb_check_node_heartbeating(u8 node_num) 1778 int o2hb_check_node_heartbeating(u8 node_num)
1759 { 1779 {
1760 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1780 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1761 1781
1762 o2hb_fill_node_map(testing_map, sizeof(testing_map)); 1782 o2hb_fill_node_map(testing_map, sizeof(testing_map));
1763 if (!test_bit(node_num, testing_map)) { 1783 if (!test_bit(node_num, testing_map)) {
1764 mlog(ML_HEARTBEAT, 1784 mlog(ML_HEARTBEAT,
1765 "node (%u) does not have heartbeating enabled.\n", 1785 "node (%u) does not have heartbeating enabled.\n",
1766 node_num); 1786 node_num);
1767 return 0; 1787 return 0;
1768 } 1788 }
1769 1789
1770 return 1; 1790 return 1;
1771 } 1791 }
1772 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating); 1792 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1773 1793
1774 int o2hb_check_node_heartbeating_from_callback(u8 node_num) 1794 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1775 { 1795 {
1776 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1796 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1777 1797
1778 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 1798 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1779 if (!test_bit(node_num, testing_map)) { 1799 if (!test_bit(node_num, testing_map)) {
1780 mlog(ML_HEARTBEAT, 1800 mlog(ML_HEARTBEAT,
1781 "node (%u) does not have heartbeating enabled.\n", 1801 "node (%u) does not have heartbeating enabled.\n",
1782 node_num); 1802 node_num);
1783 return 0; 1803 return 0;
1784 } 1804 }
1785 1805
1786 return 1; 1806 return 1;
1787 } 1807 }
1788 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback); 1808 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1789 1809
1790 /* Makes sure our local node is configured with a node number, and is 1810 /* Makes sure our local node is configured with a node number, and is
1791 * heartbeating. */ 1811 * heartbeating. */
1792 int o2hb_check_local_node_heartbeating(void) 1812 int o2hb_check_local_node_heartbeating(void)
1793 { 1813 {
1794 u8 node_num; 1814 u8 node_num;
1795 1815
1796 /* if this node was set then we have networking */ 1816 /* if this node was set then we have networking */
1797 node_num = o2nm_this_node(); 1817 node_num = o2nm_this_node();
1798 if (node_num == O2NM_MAX_NODES) { 1818 if (node_num == O2NM_MAX_NODES) {
1799 mlog(ML_HEARTBEAT, "this node has not been configured.\n"); 1819 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1800 return 0; 1820 return 0;
1801 } 1821 }
1802 1822
1803 return o2hb_check_node_heartbeating(node_num); 1823 return o2hb_check_node_heartbeating(node_num);
1804 } 1824 }
1805 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating); 1825 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1806 1826
1807 /* 1827 /*
1808 * this is just a hack until we get the plumbing which flips file systems 1828 * this is just a hack until we get the plumbing which flips file systems
1809 * read only and drops the hb ref instead of killing the node dead. 1829 * read only and drops the hb ref instead of killing the node dead.
1810 */ 1830 */
1811 void o2hb_stop_all_regions(void) 1831 void o2hb_stop_all_regions(void)
1812 { 1832 {
1813 struct o2hb_region *reg; 1833 struct o2hb_region *reg;
1814 1834
1815 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n"); 1835 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1816 1836
1817 spin_lock(&o2hb_live_lock); 1837 spin_lock(&o2hb_live_lock);
1818 1838
1819 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) 1839 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1820 reg->hr_unclean_stop = 1; 1840 reg->hr_unclean_stop = 1;
1821 1841
1822 spin_unlock(&o2hb_live_lock); 1842 spin_unlock(&o2hb_live_lock);
1823 } 1843 }
1824 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions); 1844 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
1825 1845
fs/ocfs2/cluster/ocfs2_heartbeat.h
1 /* -*- mode: c; c-basic-offset: 8; -*- 1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0: 2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 * 3 *
4 * ocfs2_heartbeat.h 4 * ocfs2_heartbeat.h
5 * 5 *
6 * On-disk structures for ocfs2_heartbeat 6 * On-disk structures for ocfs2_heartbeat
7 * 7 *
8 * Copyright (C) 2002, 2004 Oracle. All rights reserved. 8 * Copyright (C) 2002, 2004 Oracle. All rights reserved.
9 * 9 *
10 * This program is free software; you can redistribute it and/or 10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public 11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either 12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version. 13 * version 2 of the License, or (at your option) any later version.
14 * 14 *
15 * This program is distributed in the hope that it will be useful, 15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details. 18 * General Public License for more details.
19 * 19 *
20 * You should have received a copy of the GNU General Public 20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the 21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA. 23 * Boston, MA 02111-1307, USA.
24 */ 24 */
25 25
26 #ifndef _OCFS2_HEARTBEAT_H 26 #ifndef _OCFS2_HEARTBEAT_H
27 #define _OCFS2_HEARTBEAT_H 27 #define _OCFS2_HEARTBEAT_H
28 28
29 struct o2hb_disk_heartbeat_block { 29 struct o2hb_disk_heartbeat_block {
30 __le64 hb_seq; 30 __le64 hb_seq;
31 __u8 hb_node; 31 __u8 hb_node;
32 __u8 hb_pad1[3]; 32 __u8 hb_pad1[3];
33 __le32 hb_cksum; 33 __le32 hb_cksum;
34 __le64 hb_generation; 34 __le64 hb_generation;
35 __le32 hb_dead_ms;
35 }; 36 };
36 37
37 #endif /* _OCFS2_HEARTBEAT_H */ 38 #endif /* _OCFS2_HEARTBEAT_H */
38 39