Commit 3d7a641e544e428191667e8b1f83f96fa46dbd65

Authored by David Howells
1 parent 66b00a7c93

SLOW_WORK: Wait for outstanding work items belonging to a module to clear

Wait for outstanding slow work items belonging to a module to clear when
unregistering that module as a user of the facility.  This prevents the put_ref
code of a work item from being taken away before it returns.

Signed-off-by: David Howells <dhowells@redhat.com>

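For orientation before the diff: with this change, a slow_work_ops table is expected to name the module that owns it, so the facility can tell whose put_ref() code a queued item will eventually call. A minimal sketch of such a table, mirroring the .owner additions made to the fscache and GFS2 operation tables below (the my_* names are illustrative, not part of this commit):

    #include <linux/module.h>
    #include <linux/slow-work.h>

    static int my_work_get_ref(struct slow_work *work) { return 0; }
    static void my_work_put_ref(struct slow_work *work) { }
    static void my_work_execute(struct slow_work *work) { }

    static const struct slow_work_ops my_work_ops = {
            .owner   = THIS_MODULE, /* module whose code put_ref() runs in */
            .get_ref = my_work_get_ref,
            .put_ref = my_work_put_ref,
            .execute = my_work_execute,
    };
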
Showing 8 changed files with 150 additions and 16 deletions

Documentation/slow-work.txt
... ... @@ -64,9 +64,11 @@
64 64 Firstly, a module or subsystem wanting to make use of slow work items must
65 65 register its interest:
66 66  
67   - int ret = slow_work_register_user();
  67 + int ret = slow_work_register_user(struct module *module);
68 68  
69   -This will return 0 if successful, or a -ve error upon failure.
  69 +This will return 0 if successful, or a -ve error upon failure. The module
  70 +pointer should be the module interested in using this facility (almost
  71 +certainly THIS_MODULE).
70 72  
71 73  
72 74 Slow work items may then be set up by:
... ... @@ -110,7 +112,12 @@
110 112 module has no further interest in the facility, it should unregister its
111 113 interest:
112 114  
113   - slow_work_unregister_user();
  115 + slow_work_unregister_user(struct module *module);
  116 +
  117 +The module pointer is used to wait for all outstanding work items for that
  118 +module before completing the unregistration. This prevents the put_ref() code
  119 +from being taken away before it completes. module should almost certainly be
  120 +THIS_MODULE.
114 121  
115 122  
116 123 ===============
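As a concrete rendering of the two documented calls (illustrative only; the fscache and GFS2 conversions below are the in-tree equivalents, and the my_* names continue the sketch above), a module's init and exit paths would now pass THIS_MODULE:

    #include <linux/init.h>

    static int __init my_module_init(void)
    {
            int ret;

            ret = slow_work_register_user(THIS_MODULE);
            if (ret < 0)
                    return ret;
            /* set up and enqueue slow work items as before */
            return 0;
    }

    static void __exit my_module_exit(void)
    {
            /* waits for this module's outstanding items before shutting down */
            slow_work_unregister_user(THIS_MODULE);
    }

    module_init(my_module_init);
    module_exit(my_module_exit);
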
fs/fscache/main.c
... ... @@ -48,7 +48,7 @@
48 48 {
49 49 int ret;
50 50  
51   - ret = slow_work_register_user();
  51 + ret = slow_work_register_user(THIS_MODULE);
52 52 if (ret < 0)
53 53 goto error_slow_work;
54 54  
... ... @@ -80,7 +80,7 @@
80 80 error_cookie_jar:
81 81 fscache_proc_cleanup();
82 82 error_proc:
83   - slow_work_unregister_user();
  83 + slow_work_unregister_user(THIS_MODULE);
84 84 error_slow_work:
85 85 return ret;
86 86 }
... ... @@ -97,7 +97,7 @@
97 97 kobject_put(fscache_root);
98 98 kmem_cache_destroy(fscache_cookie_jar);
99 99 fscache_proc_cleanup();
100   - slow_work_unregister_user();
  100 + slow_work_unregister_user(THIS_MODULE);
101 101 printk(KERN_NOTICE "FS-Cache: Unloaded\n");
102 102 }
103 103  
fs/fscache/object.c
... ... @@ -45,6 +45,7 @@
45 45 static void fscache_dequeue_object(struct fscache_object *);
46 46  
47 47 const struct slow_work_ops fscache_object_slow_work_ops = {
  48 + .owner = THIS_MODULE,
48 49 .get_ref = fscache_object_slow_work_get_ref,
49 50 .put_ref = fscache_object_slow_work_put_ref,
50 51 .execute = fscache_object_slow_work_execute,
fs/fscache/operation.c
... ... @@ -453,6 +453,7 @@
453 453 }
454 454  
455 455 const struct slow_work_ops fscache_op_slow_work_ops = {
  456 + .owner = THIS_MODULE,
456 457 .get_ref = fscache_op_get_ref,
457 458 .put_ref = fscache_op_put_ref,
458 459 .execute = fscache_op_execute,
fs/gfs2/main.c
... ... @@ -114,7 +114,7 @@
114 114 if (error)
115 115 goto fail_unregister;
116 116  
117   - error = slow_work_register_user();
  117 + error = slow_work_register_user(THIS_MODULE);
118 118 if (error)
119 119 goto fail_slow;
120 120  
... ... @@ -163,7 +163,7 @@
163 163 gfs2_unregister_debugfs();
164 164 unregister_filesystem(&gfs2_fs_type);
165 165 unregister_filesystem(&gfs2meta_fs_type);
166   - slow_work_unregister_user();
  166 + slow_work_unregister_user(THIS_MODULE);
167 167  
168 168 kmem_cache_destroy(gfs2_quotad_cachep);
169 169 kmem_cache_destroy(gfs2_rgrpd_cachep);
fs/gfs2/recovery.c
... ... @@ -593,6 +593,7 @@
593 593 }
594 594  
595 595 struct slow_work_ops gfs2_recover_ops = {
  596 + .owner = THIS_MODULE,
596 597 .get_ref = gfs2_recover_get_ref,
597 598 .put_ref = gfs2_recover_put_ref,
598 599 .execute = gfs2_recover_work,
include/linux/slow-work.h
... ... @@ -24,6 +24,9 @@
24 24 * The operations used to support slow work items
25 25 */
26 26 struct slow_work_ops {
  27 + /* owner */
  28 + struct module *owner;
  29 +
27 30 /* get a ref on a work item
28 31 * - return 0 if successful, -ve if not
29 32 */
... ... @@ -42,6 +45,7 @@
42 45 * queued
43 46 */
44 47 struct slow_work {
  48 + struct module *owner; /* the owning module */
45 49 unsigned long flags;
46 50 #define SLOW_WORK_PENDING 0 /* item pending (further) execution */
47 51 #define SLOW_WORK_EXECUTING 1 /* item currently executing */
... ... @@ -84,8 +88,8 @@
84 88 }
85 89  
86 90 extern int slow_work_enqueue(struct slow_work *work);
87   -extern int slow_work_register_user(void);
88   -extern void slow_work_unregister_user(void);
  91 +extern int slow_work_register_user(struct module *owner);
  92 +extern void slow_work_unregister_user(struct module *owner);
89 93  
90 94 #ifdef CONFIG_SYSCTL
91 95 extern ctl_table slow_work_sysctls[];
kernel/slow-work.c
... ... @@ -22,6 +22,8 @@
22 22 #define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
23 23 * OOM */
24 24  
  25 +#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */
  26 +
25 27 static void slow_work_cull_timeout(unsigned long);
26 28 static void slow_work_oom_timeout(unsigned long);
27 29  
... ... @@ -46,7 +48,7 @@
46 48  
47 49 #ifdef CONFIG_SYSCTL
48 50 static const int slow_work_min_min_threads = 2;
49   -static int slow_work_max_max_threads = 255;
  51 +static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
50 52 static const int slow_work_min_vslow = 1;
51 53 static const int slow_work_max_vslow = 99;
52 54  
... ... @@ -98,6 +100,23 @@
98 100 static struct slow_work slow_work_new_thread; /* new thread starter */
99 101  
100 102 /*
  103 + * slow work ID allocation (use slow_work_queue_lock)
  104 + */
  105 +static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
  106 +
  107 +/*
  108 + * Unregistration tracking to prevent put_ref() from disappearing during module
  109 + * unload
  110 + */
  111 +#ifdef CONFIG_MODULES
  112 +static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
  113 +static struct module *slow_work_unreg_module;
  114 +static struct slow_work *slow_work_unreg_work_item;
  115 +static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
  116 +static DEFINE_MUTEX(slow_work_unreg_sync_lock);
  117 +#endif
  118 +
  119 +/*
101 120 * The queues of work items and the lock governing access to them. These are
102 121 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
103 122 * as the number of threads bears no relation to the number of CPUs.
104 123  
... ... @@ -149,8 +168,11 @@
149 168 * Attempt to execute stuff queued on a slow thread. Return true if we managed
150 169 * it, false if there was nothing to do.
151 170 */
152   -static bool slow_work_execute(void)
  171 +static bool slow_work_execute(int id)
153 172 {
  173 +#ifdef CONFIG_MODULES
  174 + struct module *module;
  175 +#endif
154 176 struct slow_work *work = NULL;
155 177 unsigned vsmax;
156 178 bool very_slow;
... ... @@ -186,6 +208,12 @@
186 208 } else {
187 209 very_slow = false; /* avoid the compiler warning */
188 210 }
  211 +
  212 +#ifdef CONFIG_MODULES
  213 + if (work)
  214 + slow_work_thread_processing[id] = work->owner;
  215 +#endif
  216 +
189 217 spin_unlock_irq(&slow_work_queue_lock);
190 218  
191 219 if (!work)
192 220  
... ... @@ -219,7 +247,18 @@
219 247 spin_unlock_irq(&slow_work_queue_lock);
220 248 }
221 249  
  250 + /* sort out the race between module unloading and put_ref() */
222 251 work->ops->put_ref(work);
  252 +
  253 +#ifdef CONFIG_MODULES
  254 + module = slow_work_thread_processing[id];
  255 + slow_work_thread_processing[id] = NULL;
  256 + smp_mb();
  257 + if (slow_work_unreg_work_item == work ||
  258 + slow_work_unreg_module == module)
  259 + wake_up_all(&slow_work_unreg_wq);
  260 +#endif
  261 +
223 262 return true;
224 263  
225 264 auto_requeue:
... ... @@ -232,6 +271,7 @@
232 271 else
233 272 list_add_tail(&work->link, &slow_work_queue);
234 273 spin_unlock_irq(&slow_work_queue_lock);
  274 + slow_work_thread_processing[id] = NULL;
235 275 return true;
236 276 }
237 277  
238 278  
... ... @@ -368,13 +408,22 @@
368 408 */
369 409 static int slow_work_thread(void *_data)
370 410 {
371   - int vsmax;
  411 + int vsmax, id;
372 412  
373 413 DEFINE_WAIT(wait);
374 414  
375 415 set_freezable();
376 416 set_user_nice(current, -5);
377 417  
  418 + /* allocate ourselves an ID */
  419 + spin_lock_irq(&slow_work_queue_lock);
  420 + id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
  421 + BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
  422 + __set_bit(id, slow_work_ids);
  423 + spin_unlock_irq(&slow_work_queue_lock);
  424 +
  425 + sprintf(current->comm, "kslowd%03u", id);
  426 +
378 427 for (;;) {
379 428 vsmax = vslow_work_proportion;
380 429 vsmax *= atomic_read(&slow_work_thread_count);
... ... @@ -395,7 +444,7 @@
395 444 vsmax *= atomic_read(&slow_work_thread_count);
396 445 vsmax /= 100;
397 446  
398   - if (slow_work_available(vsmax) && slow_work_execute()) {
  447 + if (slow_work_available(vsmax) && slow_work_execute(id)) {
399 448 cond_resched();
400 449 if (list_empty(&slow_work_queue) &&
401 450 list_empty(&vslow_work_queue) &&
... ... @@ -412,6 +461,10 @@
412 461 break;
413 462 }
414 463  
  464 + spin_lock_irq(&slow_work_queue_lock);
  465 + __clear_bit(id, slow_work_ids);
  466 + spin_unlock_irq(&slow_work_queue_lock);
  467 +
415 468 if (atomic_dec_and_test(&slow_work_thread_count))
416 469 complete_and_exit(&slow_work_last_thread_exited, 0);
417 470 return 0;
... ... @@ -475,6 +528,7 @@
475 528 }
476 529  
477 530 static const struct slow_work_ops slow_work_new_thread_ops = {
  531 + .owner = THIS_MODULE,
478 532 .get_ref = slow_work_new_thread_get_ref,
479 533 .put_ref = slow_work_new_thread_put_ref,
480 534 .execute = slow_work_new_thread_execute,
481 535  
... ... @@ -546,12 +600,13 @@
546 600  
547 601 /**
548 602 * slow_work_register_user - Register a user of the facility
  603 + * @module: The module about to make use of the facility
549 604 *
550 605 * Register a user of the facility, starting up the initial threads if there
551 606 * aren't any other users at this point. This will return 0 if successful, or
552 607 * an error if not.
553 608 */
554   -int slow_work_register_user(void)
  609 +int slow_work_register_user(struct module *module)
555 610 {
556 611 struct task_struct *p;
557 612 int loop;
558 613  
559 614  
560 615  
561 616  
... ... @@ -598,14 +653,79 @@
598 653 }
599 654 EXPORT_SYMBOL(slow_work_register_user);
600 655  
  656 +/*
  657 + * wait for all outstanding items from the calling module to complete
  658 + * - note that more items may be queued whilst we're waiting
  659 + */
  660 +static void slow_work_wait_for_items(struct module *module)
  661 +{
  662 + DECLARE_WAITQUEUE(myself, current);
  663 + struct slow_work *work;
  664 + int loop;
  665 +
  666 + mutex_lock(&slow_work_unreg_sync_lock);
  667 + add_wait_queue(&slow_work_unreg_wq, &myself);
  668 +
  669 + for (;;) {
  670 + spin_lock_irq(&slow_work_queue_lock);
  671 +
  672 + /* first of all, we wait for the last queued item in each list
  673 + * to be processed */
  674 + list_for_each_entry_reverse(work, &vslow_work_queue, link) {
  675 + if (work->owner == module) {
  676 + set_current_state(TASK_UNINTERRUPTIBLE);
  677 + slow_work_unreg_work_item = work;
  678 + goto do_wait;
  679 + }
  680 + }
  681 + list_for_each_entry_reverse(work, &slow_work_queue, link) {
  682 + if (work->owner == module) {
  683 + set_current_state(TASK_UNINTERRUPTIBLE);
  684 + slow_work_unreg_work_item = work;
  685 + goto do_wait;
  686 + }
  687 + }
  688 +
  689 + /* then we wait for the items being processed to finish */
  690 + slow_work_unreg_module = module;
  691 + smp_mb();
  692 + for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
  693 + if (slow_work_thread_processing[loop] == module)
  694 + goto do_wait;
  695 + }
  696 + spin_unlock_irq(&slow_work_queue_lock);
  697 + break; /* okay, we're done */
  698 +
  699 + do_wait:
  700 + spin_unlock_irq(&slow_work_queue_lock);
  701 + schedule();
  702 + slow_work_unreg_work_item = NULL;
  703 + slow_work_unreg_module = NULL;
  704 + }
  705 +
  706 + remove_wait_queue(&slow_work_unreg_wq, &myself);
  707 + mutex_unlock(&slow_work_unreg_sync_lock);
  708 +}
  709 +
601 710 /**
602 711 * slow_work_unregister_user - Unregister a user of the facility
  712 + * @module: The module whose items should be cleared
603 713 *
604 714 * Unregister a user of the facility, killing all the threads if this was the
605 715 * last one.
  716 + *
  717 + * This waits for all the work items belonging to the nominated module to go
  718 + * away before proceeding.
606 719 */
607   -void slow_work_unregister_user(void)
  720 +void slow_work_unregister_user(struct module *module)
608 721 {
  722 + /* first of all, wait for all outstanding items from the calling module
  723 + * to complete */
  724 + if (module)
  725 + slow_work_wait_for_items(module);
  726 +
  727 + /* then we can actually go about shutting down the facility if need
  728 + * be */
609 729 mutex_lock(&slow_work_user_lock);
610 730  
611 731 BUG_ON(slow_work_user_count <= 0);