Commit 553abd046af609191a91af7289d87d477adc659f

Authored by Joel Becker
Committed by Mark Fasheh
1 parent d85b20e4b3

ocfs2: Change the recovery map to an array of node numbers.

The old recovery map was a bitmap of node numbers.  This was sufficient
for the maximum node number of 254.  Going forward, we want node numbers
to be UINT32.  Thus, we need a new recovery map.

Note that we can't keep track of slots here.  We must write down the
node number to recover *before* we get the locks needed to convert a
node number into a slot number.

The recovery map is now an array of unsigned ints, max_slots in size.
It moves to journal.c with the rest of recovery.

Because it needs to be initialized, we move all of recovery initialization
into a new function, ocfs2_recovery_init().  This actually cleans up
ocfs2_initialize_super() a little as well.  Following on, recovery cleanup
becomes part of ocfs2_recovery_exit().

A number of node map functions are rendered obsolete and are removed.

Finally, waiting on recovery is wrapped in a function rather than naked
checks on the recovery_event.  This is a cleanup from Mark.

Signed-off-by: Joel Becker <joel.becker@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>

Showing 7 changed files with 182 additions and 170 deletions Side-by-side Diff

... ... @@ -1950,8 +1950,7 @@
1950 1950 goto local;
1951 1951  
1952 1952 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1953   - wait_event(osb->recovery_event,
1954   - ocfs2_node_map_is_empty(osb, &osb->recovery_map));
  1953 + ocfs2_wait_for_recovery(osb);
1955 1954  
1956 1955 lockres = &OCFS2_I(inode)->ip_inode_lockres;
1957 1956 level = ex ? LKM_EXMODE : LKM_PRMODE;
... ... @@ -1974,8 +1973,7 @@
1974 1973 * committed to owning this lock so we don't allow signals to
1975 1974 * abort the operation. */
1976 1975 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1977   - wait_event(osb->recovery_event,
1978   - ocfs2_node_map_is_empty(osb, &osb->recovery_map));
  1976 + ocfs2_wait_for_recovery(osb);
1979 1977  
1980 1978 local:
1981 1979 /*
fs/ocfs2/heartbeat.c
... ... @@ -48,7 +48,6 @@
48 48 int bit);
49 49 static inline void __ocfs2_node_map_clear_bit(struct ocfs2_node_map *map,
50 50 int bit);
51   -static inline int __ocfs2_node_map_is_empty(struct ocfs2_node_map *map);
52 51  
53 52 /* special case -1 for now
54 53 * TODO: should *really* make sure the calling func never passes -1!! */
... ... @@ -62,7 +61,6 @@
62 61 void ocfs2_init_node_maps(struct ocfs2_super *osb)
63 62 {
64 63 spin_lock_init(&osb->node_map_lock);
65   - ocfs2_node_map_init(&osb->recovery_map);
66 64 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
67 65 }
68 66  
... ... @@ -190,115 +188,5 @@
190 188 ret = test_bit(bit, map->map);
191 189 spin_unlock(&osb->node_map_lock);
192 190 return ret;
193   -}
194   -
195   -static inline int __ocfs2_node_map_is_empty(struct ocfs2_node_map *map)
196   -{
197   - int bit;
198   - bit = find_next_bit(map->map, map->num_nodes, 0);
199   - if (bit < map->num_nodes)
200   - return 0;
201   - return 1;
202   -}
203   -
204   -int ocfs2_node_map_is_empty(struct ocfs2_super *osb,
205   - struct ocfs2_node_map *map)
206   -{
207   - int ret;
208   - BUG_ON(map->num_nodes == 0);
209   - spin_lock(&osb->node_map_lock);
210   - ret = __ocfs2_node_map_is_empty(map);
211   - spin_unlock(&osb->node_map_lock);
212   - return ret;
213   -}
214   -
215   -#if 0
216   -
217   -static void __ocfs2_node_map_dup(struct ocfs2_node_map *target,
218   - struct ocfs2_node_map *from)
219   -{
220   - BUG_ON(from->num_nodes == 0);
221   - ocfs2_node_map_init(target);
222   - __ocfs2_node_map_set(target, from);
223   -}
224   -
225   -/* returns 1 if bit is the only bit set in target, 0 otherwise */
226   -int ocfs2_node_map_is_only(struct ocfs2_super *osb,
227   - struct ocfs2_node_map *target,
228   - int bit)
229   -{
230   - struct ocfs2_node_map temp;
231   - int ret;
232   -
233   - spin_lock(&osb->node_map_lock);
234   - __ocfs2_node_map_dup(&temp, target);
235   - __ocfs2_node_map_clear_bit(&temp, bit);
236   - ret = __ocfs2_node_map_is_empty(&temp);
237   - spin_unlock(&osb->node_map_lock);
238   -
239   - return ret;
240   -}
241   -
242   -static void __ocfs2_node_map_set(struct ocfs2_node_map *target,
243   - struct ocfs2_node_map *from)
244   -{
245   - int num_longs, i;
246   -
247   - BUG_ON(target->num_nodes != from->num_nodes);
248   - BUG_ON(target->num_nodes == 0);
249   -
250   - num_longs = BITS_TO_LONGS(target->num_nodes);
251   - for (i = 0; i < num_longs; i++)
252   - target->map[i] = from->map[i];
253   -}
254   -
255   -#endif /* 0 */
256   -
257   -/* Returns whether the recovery bit was actually set - it may not be
258   - * if a node is still marked as needing recovery */
259   -int ocfs2_recovery_map_set(struct ocfs2_super *osb,
260   - int num)
261   -{
262   - int set = 0;
263   -
264   - spin_lock(&osb->node_map_lock);
265   -
266   - if (!test_bit(num, osb->recovery_map.map)) {
267   - __ocfs2_node_map_set_bit(&osb->recovery_map, num);
268   - set = 1;
269   - }
270   -
271   - spin_unlock(&osb->node_map_lock);
272   -
273   - return set;
274   -}
275   -
276   -void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
277   - int num)
278   -{
279   - ocfs2_node_map_clear_bit(osb, &osb->recovery_map, num);
280   -}
281   -
282   -int ocfs2_node_map_iterate(struct ocfs2_super *osb,
283   - struct ocfs2_node_map *map,
284   - int idx)
285   -{
286   - int i = idx;
287   -
288   - idx = O2NM_INVALID_NODE_NUM;
289   - spin_lock(&osb->node_map_lock);
290   - if ((i != O2NM_INVALID_NODE_NUM) &&
291   - (i >= 0) &&
292   - (i < map->num_nodes)) {
293   - while(i < map->num_nodes) {
294   - if (test_bit(i, map->map)) {
295   - idx = i;
296   - break;
297   - }
298   - i++;
299   - }
300   - }
301   - spin_unlock(&osb->node_map_lock);
302   - return idx;
303 191 }
fs/ocfs2/heartbeat.h
... ... @@ -33,8 +33,6 @@
33 33  
34 34 /* node map functions - used to keep track of mounted and in-recovery
35 35 * nodes. */
36   -int ocfs2_node_map_is_empty(struct ocfs2_super *osb,
37   - struct ocfs2_node_map *map);
38 36 void ocfs2_node_map_set_bit(struct ocfs2_super *osb,
39 37 struct ocfs2_node_map *map,
40 38 int bit);
... ... @@ -44,18 +42,6 @@
44 42 int ocfs2_node_map_test_bit(struct ocfs2_super *osb,
45 43 struct ocfs2_node_map *map,
46 44 int bit);
47   -int ocfs2_node_map_iterate(struct ocfs2_super *osb,
48   - struct ocfs2_node_map *map,
49   - int idx);
50   -static inline int ocfs2_node_map_first_set_bit(struct ocfs2_super *osb,
51   - struct ocfs2_node_map *map)
52   -{
53   - return ocfs2_node_map_iterate(osb, map, 0);
54   -}
55   -int ocfs2_recovery_map_set(struct ocfs2_super *osb,
56   - int num);
57   -void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
58   - int num);
59 45  
60 46 #endif /* OCFS2_HEARTBEAT_H */
... ... @@ -64,6 +64,137 @@
64 64 int slot);
65 65 static int ocfs2_commit_thread(void *arg);
66 66  
  67 +
  68 +/*
  69 + * The recovery_list is a simple linked list of node numbers to recover.
  70 + * It is protected by the recovery_lock.
  71 + */
  72 +
  73 +struct ocfs2_recovery_map {
  74 + int rm_used;
  75 + unsigned int *rm_entries;
  76 +};
  77 +
  78 +int ocfs2_recovery_init(struct ocfs2_super *osb)
  79 +{
  80 + struct ocfs2_recovery_map *rm;
  81 +
  82 + mutex_init(&osb->recovery_lock);
  83 + osb->disable_recovery = 0;
  84 + osb->recovery_thread_task = NULL;
  85 + init_waitqueue_head(&osb->recovery_event);
  86 +
  87 + rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
  88 + osb->max_slots * sizeof(unsigned int),
  89 + GFP_KERNEL);
  90 + if (!rm) {
  91 + mlog_errno(-ENOMEM);
  92 + return -ENOMEM;
  93 + }
  94 +
  95 + rm->rm_entries = (unsigned int *)((char *)rm +
  96 + sizeof(struct ocfs2_recovery_map));
  97 + osb->recovery_map = rm;
  98 +
  99 + return 0;
  100 +}
  101 +
  102 +/* we can't grab the goofy sem lock from inside wait_event, so we use
  103 + * memory barriers to make sure that we'll see the null task before
  104 + * being woken up */
  105 +static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
  106 +{
  107 + mb();
  108 + return osb->recovery_thread_task != NULL;
  109 +}
  110 +
  111 +void ocfs2_recovery_exit(struct ocfs2_super *osb)
  112 +{
  113 + struct ocfs2_recovery_map *rm;
  114 +
  115 + /* disable any new recovery threads and wait for any currently
  116 + * running ones to exit. Do this before setting the vol_state. */
  117 + mutex_lock(&osb->recovery_lock);
  118 + osb->disable_recovery = 1;
  119 + mutex_unlock(&osb->recovery_lock);
  120 + wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
  121 +
  122 + /* At this point, we know that no more recovery threads can be
  123 + * launched, so wait for any recovery completion work to
  124 + * complete. */
  125 + flush_workqueue(ocfs2_wq);
  126 +
  127 + /*
  128 + * Now that recovery is shut down, and the osb is about to be
  129 + * freed, the osb_lock is not taken here.
  130 + */
  131 + rm = osb->recovery_map;
  132 + /* XXX: Should we bug if there are dirty entries? */
  133 +
  134 + kfree(rm);
  135 +}
  136 +
  137 +static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
  138 + unsigned int node_num)
  139 +{
  140 + int i;
  141 + struct ocfs2_recovery_map *rm = osb->recovery_map;
  142 +
  143 + assert_spin_locked(&osb->osb_lock);
  144 +
  145 + for (i = 0; i < rm->rm_used; i++) {
  146 + if (rm->rm_entries[i] == node_num)
  147 + return 1;
  148 + }
  149 +
  150 + return 0;
  151 +}
  152 +
  153 +/* Behaves like test-and-set. Returns the previous value */
  154 +static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
  155 + unsigned int node_num)
  156 +{
  157 + struct ocfs2_recovery_map *rm = osb->recovery_map;
  158 +
  159 + spin_lock(&osb->osb_lock);
  160 + if (__ocfs2_recovery_map_test(osb, node_num)) {
  161 + spin_unlock(&osb->osb_lock);
  162 + return 1;
  163 + }
  164 +
  165 + /* XXX: Can this be exploited? Not from o2dlm... */
  166 + BUG_ON(rm->rm_used >= osb->max_slots);
  167 +
  168 + rm->rm_entries[rm->rm_used] = node_num;
  169 + rm->rm_used++;
  170 + spin_unlock(&osb->osb_lock);
  171 +
  172 + return 0;
  173 +}
  174 +
  175 +static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
  176 + unsigned int node_num)
  177 +{
  178 + int i;
  179 + struct ocfs2_recovery_map *rm = osb->recovery_map;
  180 +
  181 + spin_lock(&osb->osb_lock);
  182 +
  183 + for (i = 0; i < rm->rm_used; i++) {
  184 + if (rm->rm_entries[i] == node_num)
  185 + break;
  186 + }
  187 +
  188 + if (i < rm->rm_used) {
  189 + /* XXX: be careful with the pointer math */
  190 + memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
  191 + (rm->rm_used - i - 1) * sizeof(unsigned int));
  192 + rm->rm_used--;
  193 + }
  194 +
  195 + spin_unlock(&osb->osb_lock);
  196 +}
  197 +
67 198 static int ocfs2_commit_cache(struct ocfs2_super *osb)
68 199 {
69 200 int status = 0;
... ... @@ -650,6 +781,23 @@
650 781 return status;
651 782 }
652 783  
  784 +static int ocfs2_recovery_completed(struct ocfs2_super *osb)
  785 +{
  786 + int empty;
  787 + struct ocfs2_recovery_map *rm = osb->recovery_map;
  788 +
  789 + spin_lock(&osb->osb_lock);
  790 + empty = (rm->rm_used == 0);
  791 + spin_unlock(&osb->osb_lock);
  792 +
  793 + return empty;
  794 +}
  795 +
  796 +void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
  797 +{
  798 + wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
  799 +}
  800 +
653 801 /*
654 802 * JBD Might read a cached version of another nodes journal file. We
655 803 * don't want this as this file changes often and we get no
... ... @@ -848,6 +996,7 @@
848 996 {
849 997 int status, node_num;
850 998 struct ocfs2_super *osb = arg;
  999 + struct ocfs2_recovery_map *rm = osb->recovery_map;
851 1000  
852 1001 mlog_entry_void();
853 1002  
854 1003  
855 1004  
856 1005  
857 1006  
... ... @@ -863,26 +1012,29 @@
863 1012 goto bail;
864 1013 }
865 1014  
866   - while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
867   - node_num = ocfs2_node_map_first_set_bit(osb,
868   - &osb->recovery_map);
869   - if (node_num == O2NM_INVALID_NODE_NUM) {
870   - mlog(0, "Out of nodes to recover.\n");
871   - break;
872   - }
  1015 + spin_lock(&osb->osb_lock);
  1016 + while (rm->rm_used) {
  1017 + /* It's always safe to remove entry zero, as we won't
  1018 + * clear it until ocfs2_recover_node() has succeeded. */
  1019 + node_num = rm->rm_entries[0];
  1020 + spin_unlock(&osb->osb_lock);
873 1021  
874 1022 status = ocfs2_recover_node(osb, node_num);
875   - if (status < 0) {
  1023 + if (!status) {
  1024 + ocfs2_recovery_map_clear(osb, node_num);
  1025 + } else {
876 1026 mlog(ML_ERROR,
877 1027 "Error %d recovering node %d on device (%u,%u)!\n",
878 1028 status, node_num,
879 1029 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
880 1030 mlog(ML_ERROR, "Volume requires unmount.\n");
881   - continue;
882 1031 }
883 1032  
884   - ocfs2_recovery_map_clear(osb, node_num);
  1033 + spin_lock(&osb->osb_lock);
885 1034 }
  1035 + spin_unlock(&osb->osb_lock);
  1036 + mlog(0, "All nodes recovered\n");
  1037 +
886 1038 ocfs2_super_unlock(osb, 1);
887 1039  
888 1040 /* We always run recovery on our own orphan dir - the dead
... ... @@ -893,8 +1045,7 @@
893 1045  
894 1046 bail:
895 1047 mutex_lock(&osb->recovery_lock);
896   - if (!status &&
897   - !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
  1048 + if (!status && !ocfs2_recovery_completed(osb)) {
898 1049 mutex_unlock(&osb->recovery_lock);
899 1050 goto restart;
900 1051 }
... ... @@ -924,8 +1075,8 @@
924 1075  
925 1076 /* People waiting on recovery will wait on
926 1077 * the recovery map to empty. */
927   - if (!ocfs2_recovery_map_set(osb, node_num))
928   - mlog(0, "node %d already be in recovery.\n", node_num);
  1078 + if (ocfs2_recovery_map_set(osb, node_num))
  1079 + mlog(0, "node %d already in recovery map.\n", node_num);
929 1080  
930 1081 mlog(0, "starting recovery thread...\n");
931 1082  
... ... @@ -1197,7 +1348,7 @@
1197 1348 if (status == -ENOENT)
1198 1349 continue;
1199 1350  
1200   - if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
  1351 + if (__ocfs2_recovery_map_test(osb, node_num))
1201 1352 continue;
1202 1353 spin_unlock(&osb->osb_lock);
1203 1354  
... ... @@ -134,6 +134,10 @@
134 134  
135 135 /* Exported only for the journal struct init code in super.c. Do not call. */
136 136 void ocfs2_complete_recovery(struct work_struct *work);
  137 +void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
  138 +
  139 +int ocfs2_recovery_init(struct ocfs2_super *osb);
  140 +void ocfs2_recovery_exit(struct ocfs2_super *osb);
137 141  
138 142 /*
139 143 * Journal Control:
... ... @@ -180,6 +180,7 @@
180 180  
181 181 struct ocfs2_journal;
182 182 struct ocfs2_slot_info;
  183 +struct ocfs2_recovery_map;
183 184 struct ocfs2_super
184 185 {
185 186 struct task_struct *commit_task;
... ... @@ -191,7 +192,6 @@
191 192 struct ocfs2_slot_info *slot_info;
192 193  
193 194 spinlock_t node_map_lock;
194   - struct ocfs2_node_map recovery_map;
195 195  
196 196 u64 root_blkno;
197 197 u64 system_dir_blkno;
... ... @@ -226,6 +226,7 @@
226 226  
227 227 atomic_t vol_state;
228 228 struct mutex recovery_lock;
  229 + struct ocfs2_recovery_map *recovery_map;
229 230 struct task_struct *recovery_thread_task;
230 231 int disable_recovery;
231 232 wait_queue_head_t checkpoint_event;
... ... @@ -1224,15 +1224,6 @@
1224 1224 return status;
1225 1225 }
1226 1226  
1227   -/* we can't grab the goofy sem lock from inside wait_event, so we use
1228   - * memory barriers to make sure that we'll see the null task before
1229   - * being woken up */
1230   -static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
1231   -{
1232   - mb();
1233   - return osb->recovery_thread_task != NULL;
1234   -}
1235   -
1236 1227 static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1237 1228 {
1238 1229 int tmp;
1239 1230  
... ... @@ -1249,18 +1240,9 @@
1249 1240  
1250 1241 ocfs2_truncate_log_shutdown(osb);
1251 1242  
1252   - /* disable any new recovery threads and wait for any currently
1253   - * running ones to exit. Do this before setting the vol_state. */
1254   - mutex_lock(&osb->recovery_lock);
1255   - osb->disable_recovery = 1;
1256   - mutex_unlock(&osb->recovery_lock);
1257   - wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
  1243 + /* This will disable recovery and flush any recovery work. */
  1244 + ocfs2_recovery_exit(osb);
1258 1245  
1259   - /* At this point, we know that no more recovery threads can be
1260   - * launched, so wait for any recovery completion work to
1261   - * complete. */
1262   - flush_workqueue(ocfs2_wq);
1263   -
1264 1246 ocfs2_journal_shutdown(osb);
1265 1247  
1266 1248 ocfs2_sync_blockdev(sb);
... ... @@ -1368,7 +1350,6 @@
1368 1350 osb->s_sectsize_bits = blksize_bits(sector_size);
1369 1351 BUG_ON(!osb->s_sectsize_bits);
1370 1352  
1371   - init_waitqueue_head(&osb->recovery_event);
1372 1353 spin_lock_init(&osb->dc_task_lock);
1373 1354 init_waitqueue_head(&osb->dc_event);
1374 1355 osb->dc_work_sequence = 0;
... ... @@ -1388,10 +1369,12 @@
1388 1369 snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
1389 1370 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
1390 1371  
1391   - mutex_init(&osb->recovery_lock);
1392   -
1393   - osb->disable_recovery = 0;
1394   - osb->recovery_thread_task = NULL;
  1372 + status = ocfs2_recovery_init(osb);
  1373 + if (status) {
  1374 + mlog(ML_ERROR, "Unable to initialize recovery state\n");
  1375 + mlog_errno(status);
  1376 + goto bail;
  1377 + }
1395 1378  
1396 1379 init_waitqueue_head(&osb->checkpoint_event);
1397 1380 atomic_set(&osb->needs_checkpoint, 0);