Commit 777c6c5f1f6e757ae49ecca2ed72d6b1f523c007

Authored by Johannes Weiner
Committed by Linus Torvalds
1 parent 40b0bb1e73

wait: prevent exclusive waiter starvation

With exclusive waiters, every process woken up through the wait queue must
ensure that the next waiter down the line is woken when it has finished.

Interruptible waiters don't do that when aborting due to a signal.  And if
an aborting waiter is concurrently woken up through the waitqueue, noone
will ever wake up the next waiter.

This has been observed with __wait_on_bit_lock() used by
lock_page_killable(): the first contender on the queue was aborting when
the actual lock holder woke it up concurrently.  The aborted contender
didn't acquire the lock and therefor never did an unlock followed by
waking up the next waiter.

Add abort_exclusive_wait() which removes the process' wait descriptor from
the waitqueue, iff still queued, or wakes up the next waiter otherwise.
It does so under the waitqueue lock.  Racing with a wake up means the
aborting process is either already woken (removed from the queue) and will
wake up the next waiter, or it will remove itself from the queue and the
concurrent wake up will apply to the next waiter after it.

Use abort_exclusive_wait() in __wait_event_interruptible_exclusive() and
__wait_on_bit_lock() when they were interrupted by other means than a wake
up through the queue.

[akpm@linux-foundation.org: coding-style fixes]
Reported-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Johannes Weiner <hannes@cmpxchg.org>
Mentored-by: Oleg Nesterov <oleg@redhat.com>
Cc: Peter Zijlstra <a.p.zijlstra@chello.nl>
Cc: Matthew Wilcox <matthew@wil.cx>
Cc: Chuck Lever <cel@citi.umich.edu>
Cc: Nick Piggin <nickpiggin@yahoo.com.au>
Cc: Ingo Molnar <mingo@elte.hu>
Cc: <stable@kernel.org>		["after some testing"]
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>

Showing 3 changed files with 63 additions and 11 deletions Side-by-side Diff

include/linux/wait.h
... ... @@ -132,6 +132,8 @@
132 132 list_del(&old->task_list);
133 133 }
134 134  
  135 +void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
  136 + int nr_exclusive, int sync, void *key);
135 137 void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr, void *key);
136 138 extern void __wake_up_locked(wait_queue_head_t *q, unsigned int mode);
137 139 extern void __wake_up_sync(wait_queue_head_t *q, unsigned int mode, int nr);
138 140  
139 141  
140 142  
... ... @@ -333,16 +335,19 @@
333 335 for (;;) { \
334 336 prepare_to_wait_exclusive(&wq, &__wait, \
335 337 TASK_INTERRUPTIBLE); \
336   - if (condition) \
  338 + if (condition) { \
  339 + finish_wait(&wq, &__wait); \
337 340 break; \
  341 + } \
338 342 if (!signal_pending(current)) { \
339 343 schedule(); \
340 344 continue; \
341 345 } \
342 346 ret = -ERESTARTSYS; \
  347 + abort_exclusive_wait(&wq, &__wait, \
  348 + TASK_INTERRUPTIBLE, NULL); \
343 349 break; \
344 350 } \
345   - finish_wait(&wq, &__wait); \
346 351 } while (0)
347 352  
348 353 #define wait_event_interruptible_exclusive(wq, condition) \
... ... @@ -431,6 +436,8 @@
431 436 void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state);
432 437 void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state);
433 438 void finish_wait(wait_queue_head_t *q, wait_queue_t *wait);
  439 +void abort_exclusive_wait(wait_queue_head_t *q, wait_queue_t *wait,
  440 + unsigned int mode, void *key);
434 441 int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key);
435 442 int wake_bit_function(wait_queue_t *wait, unsigned mode, int sync, void *key);
436 443  
... ... @@ -4697,8 +4697,8 @@
4697 4697 * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
4698 4698 * zero in this (rare) case, and we handle it by continuing to scan the queue.
4699 4699 */
4700   -static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
4701   - int nr_exclusive, int sync, void *key)
  4700 +void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
  4701 + int nr_exclusive, int sync, void *key)
4702 4702 {
4703 4703 wait_queue_t *curr, *next;
4704 4704  
... ... @@ -91,6 +91,15 @@
91 91 }
92 92 EXPORT_SYMBOL(prepare_to_wait_exclusive);
93 93  
  94 +/*
  95 + * finish_wait - clean up after waiting in a queue
  96 + * @q: waitqueue waited on
  97 + * @wait: wait descriptor
  98 + *
  99 + * Sets current thread back to running state and removes
  100 + * the wait descriptor from the given waitqueue if still
  101 + * queued.
  102 + */
94 103 void finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
95 104 {
96 105 unsigned long flags;
... ... @@ -117,6 +126,39 @@
117 126 }
118 127 EXPORT_SYMBOL(finish_wait);
119 128  
  129 +/*
  130 + * abort_exclusive_wait - abort exclusive waiting in a queue
  131 + * @q: waitqueue waited on
  132 + * @wait: wait descriptor
  133 + * @state: runstate of the waiter to be woken
  134 + * @key: key to identify a wait bit queue or %NULL
  135 + *
  136 + * Sets current thread back to running state and removes
  137 + * the wait descriptor from the given waitqueue if still
  138 + * queued.
  139 + *
  140 + * Wakes up the next waiter if the caller is concurrently
  141 + * woken up through the queue.
  142 + *
  143 + * This prevents waiter starvation where an exclusive waiter
  144 + * aborts and is woken up concurrently and noone wakes up
  145 + * the next waiter.
  146 + */
  147 +void abort_exclusive_wait(wait_queue_head_t *q, wait_queue_t *wait,
  148 + unsigned int mode, void *key)
  149 +{
  150 + unsigned long flags;
  151 +
  152 + __set_current_state(TASK_RUNNING);
  153 + spin_lock_irqsave(&q->lock, flags);
  154 + if (!list_empty(&wait->task_list))
  155 + list_del_init(&wait->task_list);
  156 + else if (waitqueue_active(q))
  157 + __wake_up_common(q, mode, 1, 0, key);
  158 + spin_unlock_irqrestore(&q->lock, flags);
  159 +}
  160 +EXPORT_SYMBOL(abort_exclusive_wait);
  161 +
120 162 int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
121 163 {
122 164 int ret = default_wake_function(wait, mode, sync, key);
123 165  
124 166  
125 167  
... ... @@ -177,17 +219,20 @@
177 219 __wait_on_bit_lock(wait_queue_head_t *wq, struct wait_bit_queue *q,
178 220 int (*action)(void *), unsigned mode)
179 221 {
180   - int ret = 0;
181   -
182 222 do {
  223 + int ret;
  224 +
183 225 prepare_to_wait_exclusive(wq, &q->wait, mode);
184   - if (test_bit(q->key.bit_nr, q->key.flags)) {
185   - if ((ret = (*action)(q->key.flags)))
186   - break;
187   - }
  226 + if (!test_bit(q->key.bit_nr, q->key.flags))
  227 + continue;
  228 + ret = action(q->key.flags);
  229 + if (!ret)
  230 + continue;
  231 + abort_exclusive_wait(wq, &q->wait, mode, &q->key);
  232 + return ret;
188 233 } while (test_and_set_bit(q->key.bit_nr, q->key.flags));
189 234 finish_wait(wq, &q->wait);
190   - return ret;
  235 + return 0;
191 236 }
192 237 EXPORT_SYMBOL(__wait_on_bit_lock);
193 238