Commit 65a64464349883891e21e74af16c05d6e1eeb4e9

Authored by Andi Kleen
1 parent 5d5429af06

HWPOISON: Allow schedule_on_each_cpu() from keventd

Right now, calling schedule_on_each_cpu() from keventd deadlocks, because
it also queues a work item on the current CPU and then flushes it; that
item can never run while keventd is busy executing the caller. This
happens via lru_add_drain_all() in hwpoison.

Just call the function directly for the current CPU in this case. This
is actually faster, too.
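
To make the failure mode concrete, here is a minimal userspace sketch. It is an
analogy of mine using pthreads, not the kernel implementation, and every name in
it (run_on_worker_sync, worker_fn, and so on) is made up for illustration. A
single worker thread stands in for keventd; the synchronous helper either queues
a task and waits for it, or, when it is already running on the worker, calls the
function directly, which is the same idea as the current_is_keventd() check in
the patch below. Without that check the nested call waits forever, because the
worker is busy executing the caller.

/*
 * Hypothetical userspace sketch, not kernel code.
 * Build with: cc -pthread demo.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct task {
        void (*fn)(void);
        bool done;
        struct task *next;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static struct task *queue_head;
static pthread_t worker;

static void *worker_fn(void *unused)
{
        (void)unused;
        for (;;) {
                struct task *t;

                pthread_mutex_lock(&lock);
                while (!queue_head)
                        pthread_cond_wait(&cond, &lock);
                t = queue_head;
                queue_head = t->next;
                pthread_mutex_unlock(&lock);

                t->fn();                        /* run one work item at a time */

                pthread_mutex_lock(&lock);
                t->done = true;                 /* wake anyone flushing this item */
                pthread_cond_broadcast(&cond);
                pthread_mutex_unlock(&lock);
        }
        return NULL;                            /* never reached; demo only */
}

/* Queue @fn on the worker and wait for it, or call it directly if we
 * already are the worker (the analogue of the keventd check). */
static void run_on_worker_sync(void (*fn)(void))
{
        struct task t = { .fn = fn, .done = false, .next = NULL };

        if (pthread_equal(pthread_self(), worker)) {
                fn();                           /* direct call, no self-deadlock */
                return;
        }

        pthread_mutex_lock(&lock);
        t.next = queue_head;                    /* LIFO is fine for this demo */
        queue_head = &t;
        pthread_cond_broadcast(&cond);
        while (!t.done)                         /* the "flush_work" on our item */
                pthread_cond_wait(&cond, &lock);
        pthread_mutex_unlock(&lock);
}

static void inner(void) { puts("inner ran"); }

/* A work item that, like the hwpoison path, re-enters the synchronous
 * helper from inside the worker itself. */
static void outer(void)
{
        run_on_worker_sync(inner);              /* would hang without the check */
        puts("outer done");
}

int main(void)
{
        pthread_create(&worker, NULL, worker_fn, NULL);
        run_on_worker_sync(outer);              /* from main: queue and wait */
        return 0;                               /* worker left running; demo only */
}

The direct call is also cheaper than a queue-and-flush round trip, which is the
"actually faster" point above.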

Debugged with Fengguang Wu & Max Asbock

Signed-off-by: Andi Kleen <ak@linux.intel.com>

Showing 1 changed file with 19 additions and 2 deletions

1 /* 1 /*
2 * linux/kernel/workqueue.c 2 * linux/kernel/workqueue.c
3 * 3 *
4 * Generic mechanism for defining kernel helper threads for running 4 * Generic mechanism for defining kernel helper threads for running
5 * arbitrary tasks in process context. 5 * arbitrary tasks in process context.
6 * 6 *
7 * Started by Ingo Molnar, Copyright (C) 2002 7 * Started by Ingo Molnar, Copyright (C) 2002
8 * 8 *
9 * Derived from the taskqueue/keventd code by: 9 * Derived from the taskqueue/keventd code by:
10 * 10 *
11 * David Woodhouse <dwmw2@infradead.org> 11 * David Woodhouse <dwmw2@infradead.org>
12 * Andrew Morton 12 * Andrew Morton
13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de>
14 * Theodore Ts'o <tytso@mit.edu> 14 * Theodore Ts'o <tytso@mit.edu>
15 * 15 *
16 * Made to use alloc_percpu by Christoph Lameter. 16 * Made to use alloc_percpu by Christoph Lameter.
17 */ 17 */
18 18
19 #include <linux/module.h> 19 #include <linux/module.h>
20 #include <linux/kernel.h> 20 #include <linux/kernel.h>
21 #include <linux/sched.h> 21 #include <linux/sched.h>
22 #include <linux/init.h> 22 #include <linux/init.h>
23 #include <linux/signal.h> 23 #include <linux/signal.h>
24 #include <linux/completion.h> 24 #include <linux/completion.h>
25 #include <linux/workqueue.h> 25 #include <linux/workqueue.h>
26 #include <linux/slab.h> 26 #include <linux/slab.h>
27 #include <linux/cpu.h> 27 #include <linux/cpu.h>
28 #include <linux/notifier.h> 28 #include <linux/notifier.h>
29 #include <linux/kthread.h> 29 #include <linux/kthread.h>
30 #include <linux/hardirq.h> 30 #include <linux/hardirq.h>
31 #include <linux/mempolicy.h> 31 #include <linux/mempolicy.h>
32 #include <linux/freezer.h> 32 #include <linux/freezer.h>
33 #include <linux/kallsyms.h> 33 #include <linux/kallsyms.h>
34 #include <linux/debug_locks.h> 34 #include <linux/debug_locks.h>
35 #include <linux/lockdep.h> 35 #include <linux/lockdep.h>
36 #define CREATE_TRACE_POINTS 36 #define CREATE_TRACE_POINTS
37 #include <trace/events/workqueue.h> 37 #include <trace/events/workqueue.h>
38 38
39 /* 39 /*
40 * The per-CPU workqueue (if single thread, we always use the first 40 * The per-CPU workqueue (if single thread, we always use the first
41 * possible cpu). 41 * possible cpu).
42 */ 42 */
43 struct cpu_workqueue_struct { 43 struct cpu_workqueue_struct {
44 44
45 spinlock_t lock; 45 spinlock_t lock;
46 46
47 struct list_head worklist; 47 struct list_head worklist;
48 wait_queue_head_t more_work; 48 wait_queue_head_t more_work;
49 struct work_struct *current_work; 49 struct work_struct *current_work;
50 50
51 struct workqueue_struct *wq; 51 struct workqueue_struct *wq;
52 struct task_struct *thread; 52 struct task_struct *thread;
53 } ____cacheline_aligned; 53 } ____cacheline_aligned;
54 54
55 /* 55 /*
56 * The externally visible workqueue abstraction is an array of 56 * The externally visible workqueue abstraction is an array of
57 * per-CPU workqueues: 57 * per-CPU workqueues:
58 */ 58 */
59 struct workqueue_struct { 59 struct workqueue_struct {
60 struct cpu_workqueue_struct *cpu_wq; 60 struct cpu_workqueue_struct *cpu_wq;
61 struct list_head list; 61 struct list_head list;
62 const char *name; 62 const char *name;
63 int singlethread; 63 int singlethread;
64 int freezeable; /* Freeze threads during suspend */ 64 int freezeable; /* Freeze threads during suspend */
65 int rt; 65 int rt;
66 #ifdef CONFIG_LOCKDEP 66 #ifdef CONFIG_LOCKDEP
67 struct lockdep_map lockdep_map; 67 struct lockdep_map lockdep_map;
68 #endif 68 #endif
69 }; 69 };
70 70
71 /* Serializes the accesses to the list of workqueues. */ 71 /* Serializes the accesses to the list of workqueues. */
72 static DEFINE_SPINLOCK(workqueue_lock); 72 static DEFINE_SPINLOCK(workqueue_lock);
73 static LIST_HEAD(workqueues); 73 static LIST_HEAD(workqueues);
74 74
75 static int singlethread_cpu __read_mostly; 75 static int singlethread_cpu __read_mostly;
76 static const struct cpumask *cpu_singlethread_map __read_mostly; 76 static const struct cpumask *cpu_singlethread_map __read_mostly;
77 /* 77 /*
78 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD 78 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
79 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work 79 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
80 * which comes in between can't use for_each_online_cpu(). We could 80 * which comes in between can't use for_each_online_cpu(). We could
81 * use cpu_possible_map, the cpumask below is more a documentation 81 * use cpu_possible_map, the cpumask below is more a documentation
82 * than optimization. 82 * than optimization.
83 */ 83 */
84 static cpumask_var_t cpu_populated_map __read_mostly; 84 static cpumask_var_t cpu_populated_map __read_mostly;
85 85
86 /* If it's single threaded, it isn't in the list of workqueues. */ 86 /* If it's single threaded, it isn't in the list of workqueues. */
87 static inline int is_wq_single_threaded(struct workqueue_struct *wq) 87 static inline int is_wq_single_threaded(struct workqueue_struct *wq)
88 { 88 {
89 return wq->singlethread; 89 return wq->singlethread;
90 } 90 }
91 91
92 static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) 92 static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
93 { 93 {
94 return is_wq_single_threaded(wq) 94 return is_wq_single_threaded(wq)
95 ? cpu_singlethread_map : cpu_populated_map; 95 ? cpu_singlethread_map : cpu_populated_map;
96 } 96 }
97 97
98 static 98 static
99 struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu) 99 struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
100 { 100 {
101 if (unlikely(is_wq_single_threaded(wq))) 101 if (unlikely(is_wq_single_threaded(wq)))
102 cpu = singlethread_cpu; 102 cpu = singlethread_cpu;
103 return per_cpu_ptr(wq->cpu_wq, cpu); 103 return per_cpu_ptr(wq->cpu_wq, cpu);
104 } 104 }
105 105
106 /* 106 /*
107 * Set the workqueue on which a work item is to be run 107 * Set the workqueue on which a work item is to be run
108 * - Must *only* be called if the pending flag is set 108 * - Must *only* be called if the pending flag is set
109 */ 109 */
110 static inline void set_wq_data(struct work_struct *work, 110 static inline void set_wq_data(struct work_struct *work,
111 struct cpu_workqueue_struct *cwq) 111 struct cpu_workqueue_struct *cwq)
112 { 112 {
113 unsigned long new; 113 unsigned long new;
114 114
115 BUG_ON(!work_pending(work)); 115 BUG_ON(!work_pending(work));
116 116
117 new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING); 117 new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
118 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); 118 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
119 atomic_long_set(&work->data, new); 119 atomic_long_set(&work->data, new);
120 } 120 }
121 121
122 static inline 122 static inline
123 struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) 123 struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
124 { 124 {
125 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); 125 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
126 } 126 }
127 127
128 static void insert_work(struct cpu_workqueue_struct *cwq, 128 static void insert_work(struct cpu_workqueue_struct *cwq,
129 struct work_struct *work, struct list_head *head) 129 struct work_struct *work, struct list_head *head)
130 { 130 {
131 trace_workqueue_insertion(cwq->thread, work); 131 trace_workqueue_insertion(cwq->thread, work);
132 132
133 set_wq_data(work, cwq); 133 set_wq_data(work, cwq);
134 /* 134 /*
135 * Ensure that we get the right work->data if we see the 135 * Ensure that we get the right work->data if we see the
136 * result of list_add() below, see try_to_grab_pending(). 136 * result of list_add() below, see try_to_grab_pending().
137 */ 137 */
138 smp_wmb(); 138 smp_wmb();
139 list_add_tail(&work->entry, head); 139 list_add_tail(&work->entry, head);
140 wake_up(&cwq->more_work); 140 wake_up(&cwq->more_work);
141 } 141 }
142 142
143 static void __queue_work(struct cpu_workqueue_struct *cwq, 143 static void __queue_work(struct cpu_workqueue_struct *cwq,
144 struct work_struct *work) 144 struct work_struct *work)
145 { 145 {
146 unsigned long flags; 146 unsigned long flags;
147 147
148 spin_lock_irqsave(&cwq->lock, flags); 148 spin_lock_irqsave(&cwq->lock, flags);
149 insert_work(cwq, work, &cwq->worklist); 149 insert_work(cwq, work, &cwq->worklist);
150 spin_unlock_irqrestore(&cwq->lock, flags); 150 spin_unlock_irqrestore(&cwq->lock, flags);
151 } 151 }
152 152
153 /** 153 /**
154 * queue_work - queue work on a workqueue 154 * queue_work - queue work on a workqueue
155 * @wq: workqueue to use 155 * @wq: workqueue to use
156 * @work: work to queue 156 * @work: work to queue
157 * 157 *
158 * Returns 0 if @work was already on a queue, non-zero otherwise. 158 * Returns 0 if @work was already on a queue, non-zero otherwise.
159 * 159 *
160 * We queue the work to the CPU on which it was submitted, but if the CPU dies 160 * We queue the work to the CPU on which it was submitted, but if the CPU dies
161 * it can be processed by another CPU. 161 * it can be processed by another CPU.
162 */ 162 */
163 int queue_work(struct workqueue_struct *wq, struct work_struct *work) 163 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
164 { 164 {
165 int ret; 165 int ret;
166 166
167 ret = queue_work_on(get_cpu(), wq, work); 167 ret = queue_work_on(get_cpu(), wq, work);
168 put_cpu(); 168 put_cpu();
169 169
170 return ret; 170 return ret;
171 } 171 }
172 EXPORT_SYMBOL_GPL(queue_work); 172 EXPORT_SYMBOL_GPL(queue_work);
173 173
174 /** 174 /**
175 * queue_work_on - queue work on specific cpu 175 * queue_work_on - queue work on specific cpu
176 * @cpu: CPU number to execute work on 176 * @cpu: CPU number to execute work on
177 * @wq: workqueue to use 177 * @wq: workqueue to use
178 * @work: work to queue 178 * @work: work to queue
179 * 179 *
180 * Returns 0 if @work was already on a queue, non-zero otherwise. 180 * Returns 0 if @work was already on a queue, non-zero otherwise.
181 * 181 *
182 * We queue the work to a specific CPU, the caller must ensure it 182 * We queue the work to a specific CPU, the caller must ensure it
183 * can't go away. 183 * can't go away.
184 */ 184 */
185 int 185 int
186 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) 186 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
187 { 187 {
188 int ret = 0; 188 int ret = 0;
189 189
190 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 190 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
191 BUG_ON(!list_empty(&work->entry)); 191 BUG_ON(!list_empty(&work->entry));
192 __queue_work(wq_per_cpu(wq, cpu), work); 192 __queue_work(wq_per_cpu(wq, cpu), work);
193 ret = 1; 193 ret = 1;
194 } 194 }
195 return ret; 195 return ret;
196 } 196 }
197 EXPORT_SYMBOL_GPL(queue_work_on); 197 EXPORT_SYMBOL_GPL(queue_work_on);
198 198
199 static void delayed_work_timer_fn(unsigned long __data) 199 static void delayed_work_timer_fn(unsigned long __data)
200 { 200 {
201 struct delayed_work *dwork = (struct delayed_work *)__data; 201 struct delayed_work *dwork = (struct delayed_work *)__data;
202 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); 202 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
203 struct workqueue_struct *wq = cwq->wq; 203 struct workqueue_struct *wq = cwq->wq;
204 204
205 __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work); 205 __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
206 } 206 }
207 207
208 /** 208 /**
209 * queue_delayed_work - queue work on a workqueue after delay 209 * queue_delayed_work - queue work on a workqueue after delay
210 * @wq: workqueue to use 210 * @wq: workqueue to use
211 * @dwork: delayable work to queue 211 * @dwork: delayable work to queue
212 * @delay: number of jiffies to wait before queueing 212 * @delay: number of jiffies to wait before queueing
213 * 213 *
214 * Returns 0 if @work was already on a queue, non-zero otherwise. 214 * Returns 0 if @work was already on a queue, non-zero otherwise.
215 */ 215 */
216 int queue_delayed_work(struct workqueue_struct *wq, 216 int queue_delayed_work(struct workqueue_struct *wq,
217 struct delayed_work *dwork, unsigned long delay) 217 struct delayed_work *dwork, unsigned long delay)
218 { 218 {
219 if (delay == 0) 219 if (delay == 0)
220 return queue_work(wq, &dwork->work); 220 return queue_work(wq, &dwork->work);
221 221
222 return queue_delayed_work_on(-1, wq, dwork, delay); 222 return queue_delayed_work_on(-1, wq, dwork, delay);
223 } 223 }
224 EXPORT_SYMBOL_GPL(queue_delayed_work); 224 EXPORT_SYMBOL_GPL(queue_delayed_work);
225 225
226 /** 226 /**
227 * queue_delayed_work_on - queue work on specific CPU after delay 227 * queue_delayed_work_on - queue work on specific CPU after delay
228 * @cpu: CPU number to execute work on 228 * @cpu: CPU number to execute work on
229 * @wq: workqueue to use 229 * @wq: workqueue to use
230 * @dwork: work to queue 230 * @dwork: work to queue
231 * @delay: number of jiffies to wait before queueing 231 * @delay: number of jiffies to wait before queueing
232 * 232 *
233 * Returns 0 if @work was already on a queue, non-zero otherwise. 233 * Returns 0 if @work was already on a queue, non-zero otherwise.
234 */ 234 */
235 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 235 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
236 struct delayed_work *dwork, unsigned long delay) 236 struct delayed_work *dwork, unsigned long delay)
237 { 237 {
238 int ret = 0; 238 int ret = 0;
239 struct timer_list *timer = &dwork->timer; 239 struct timer_list *timer = &dwork->timer;
240 struct work_struct *work = &dwork->work; 240 struct work_struct *work = &dwork->work;
241 241
242 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 242 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
243 BUG_ON(timer_pending(timer)); 243 BUG_ON(timer_pending(timer));
244 BUG_ON(!list_empty(&work->entry)); 244 BUG_ON(!list_empty(&work->entry));
245 245
246 timer_stats_timer_set_start_info(&dwork->timer); 246 timer_stats_timer_set_start_info(&dwork->timer);
247 247
248 /* This stores cwq for the moment, for the timer_fn */ 248 /* This stores cwq for the moment, for the timer_fn */
249 set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id())); 249 set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
250 timer->expires = jiffies + delay; 250 timer->expires = jiffies + delay;
251 timer->data = (unsigned long)dwork; 251 timer->data = (unsigned long)dwork;
252 timer->function = delayed_work_timer_fn; 252 timer->function = delayed_work_timer_fn;
253 253
254 if (unlikely(cpu >= 0)) 254 if (unlikely(cpu >= 0))
255 add_timer_on(timer, cpu); 255 add_timer_on(timer, cpu);
256 else 256 else
257 add_timer(timer); 257 add_timer(timer);
258 ret = 1; 258 ret = 1;
259 } 259 }
260 return ret; 260 return ret;
261 } 261 }
262 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 262 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
263 263
264 static void run_workqueue(struct cpu_workqueue_struct *cwq) 264 static void run_workqueue(struct cpu_workqueue_struct *cwq)
265 { 265 {
266 spin_lock_irq(&cwq->lock); 266 spin_lock_irq(&cwq->lock);
267 while (!list_empty(&cwq->worklist)) { 267 while (!list_empty(&cwq->worklist)) {
268 struct work_struct *work = list_entry(cwq->worklist.next, 268 struct work_struct *work = list_entry(cwq->worklist.next,
269 struct work_struct, entry); 269 struct work_struct, entry);
270 work_func_t f = work->func; 270 work_func_t f = work->func;
271 #ifdef CONFIG_LOCKDEP 271 #ifdef CONFIG_LOCKDEP
272 /* 272 /*
273 * It is permissible to free the struct work_struct 273 * It is permissible to free the struct work_struct
274 * from inside the function that is called from it, 274 * from inside the function that is called from it,
275 * this we need to take into account for lockdep too. 275 * this we need to take into account for lockdep too.
276 * To avoid bogus "held lock freed" warnings as well 276 * To avoid bogus "held lock freed" warnings as well
277 * as problems when looking into work->lockdep_map, 277 * as problems when looking into work->lockdep_map,
278 * make a copy and use that here. 278 * make a copy and use that here.
279 */ 279 */
280 struct lockdep_map lockdep_map = work->lockdep_map; 280 struct lockdep_map lockdep_map = work->lockdep_map;
281 #endif 281 #endif
282 trace_workqueue_execution(cwq->thread, work); 282 trace_workqueue_execution(cwq->thread, work);
283 cwq->current_work = work; 283 cwq->current_work = work;
284 list_del_init(cwq->worklist.next); 284 list_del_init(cwq->worklist.next);
285 spin_unlock_irq(&cwq->lock); 285 spin_unlock_irq(&cwq->lock);
286 286
287 BUG_ON(get_wq_data(work) != cwq); 287 BUG_ON(get_wq_data(work) != cwq);
288 work_clear_pending(work); 288 work_clear_pending(work);
289 lock_map_acquire(&cwq->wq->lockdep_map); 289 lock_map_acquire(&cwq->wq->lockdep_map);
290 lock_map_acquire(&lockdep_map); 290 lock_map_acquire(&lockdep_map);
291 f(work); 291 f(work);
292 lock_map_release(&lockdep_map); 292 lock_map_release(&lockdep_map);
293 lock_map_release(&cwq->wq->lockdep_map); 293 lock_map_release(&cwq->wq->lockdep_map);
294 294
295 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 295 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
296 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 296 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
297 "%s/0x%08x/%d\n", 297 "%s/0x%08x/%d\n",
298 current->comm, preempt_count(), 298 current->comm, preempt_count(),
299 task_pid_nr(current)); 299 task_pid_nr(current));
300 printk(KERN_ERR " last function: "); 300 printk(KERN_ERR " last function: ");
301 print_symbol("%s\n", (unsigned long)f); 301 print_symbol("%s\n", (unsigned long)f);
302 debug_show_held_locks(current); 302 debug_show_held_locks(current);
303 dump_stack(); 303 dump_stack();
304 } 304 }
305 305
306 spin_lock_irq(&cwq->lock); 306 spin_lock_irq(&cwq->lock);
307 cwq->current_work = NULL; 307 cwq->current_work = NULL;
308 } 308 }
309 spin_unlock_irq(&cwq->lock); 309 spin_unlock_irq(&cwq->lock);
310 } 310 }
311 311
312 static int worker_thread(void *__cwq) 312 static int worker_thread(void *__cwq)
313 { 313 {
314 struct cpu_workqueue_struct *cwq = __cwq; 314 struct cpu_workqueue_struct *cwq = __cwq;
315 DEFINE_WAIT(wait); 315 DEFINE_WAIT(wait);
316 316
317 if (cwq->wq->freezeable) 317 if (cwq->wq->freezeable)
318 set_freezable(); 318 set_freezable();
319 319
320 for (;;) { 320 for (;;) {
321 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); 321 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
322 if (!freezing(current) && 322 if (!freezing(current) &&
323 !kthread_should_stop() && 323 !kthread_should_stop() &&
324 list_empty(&cwq->worklist)) 324 list_empty(&cwq->worklist))
325 schedule(); 325 schedule();
326 finish_wait(&cwq->more_work, &wait); 326 finish_wait(&cwq->more_work, &wait);
327 327
328 try_to_freeze(); 328 try_to_freeze();
329 329
330 if (kthread_should_stop()) 330 if (kthread_should_stop())
331 break; 331 break;
332 332
333 run_workqueue(cwq); 333 run_workqueue(cwq);
334 } 334 }
335 335
336 return 0; 336 return 0;
337 } 337 }
338 338
339 struct wq_barrier { 339 struct wq_barrier {
340 struct work_struct work; 340 struct work_struct work;
341 struct completion done; 341 struct completion done;
342 }; 342 };
343 343
344 static void wq_barrier_func(struct work_struct *work) 344 static void wq_barrier_func(struct work_struct *work)
345 { 345 {
346 struct wq_barrier *barr = container_of(work, struct wq_barrier, work); 346 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
347 complete(&barr->done); 347 complete(&barr->done);
348 } 348 }
349 349
350 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, 350 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
351 struct wq_barrier *barr, struct list_head *head) 351 struct wq_barrier *barr, struct list_head *head)
352 { 352 {
353 INIT_WORK(&barr->work, wq_barrier_func); 353 INIT_WORK(&barr->work, wq_barrier_func);
354 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); 354 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
355 355
356 init_completion(&barr->done); 356 init_completion(&barr->done);
357 357
358 insert_work(cwq, &barr->work, head); 358 insert_work(cwq, &barr->work, head);
359 } 359 }
360 360
361 static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 361 static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
362 { 362 {
363 int active = 0; 363 int active = 0;
364 struct wq_barrier barr; 364 struct wq_barrier barr;
365 365
366 WARN_ON(cwq->thread == current); 366 WARN_ON(cwq->thread == current);
367 367
368 spin_lock_irq(&cwq->lock); 368 spin_lock_irq(&cwq->lock);
369 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { 369 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
370 insert_wq_barrier(cwq, &barr, &cwq->worklist); 370 insert_wq_barrier(cwq, &barr, &cwq->worklist);
371 active = 1; 371 active = 1;
372 } 372 }
373 spin_unlock_irq(&cwq->lock); 373 spin_unlock_irq(&cwq->lock);
374 374
375 if (active) 375 if (active)
376 wait_for_completion(&barr.done); 376 wait_for_completion(&barr.done);
377 377
378 return active; 378 return active;
379 } 379 }
380 380
381 /** 381 /**
382 * flush_workqueue - ensure that any scheduled work has run to completion. 382 * flush_workqueue - ensure that any scheduled work has run to completion.
383 * @wq: workqueue to flush 383 * @wq: workqueue to flush
384 * 384 *
385 * Forces execution of the workqueue and blocks until its completion. 385 * Forces execution of the workqueue and blocks until its completion.
386 * This is typically used in driver shutdown handlers. 386 * This is typically used in driver shutdown handlers.
387 * 387 *
388 * We sleep until all works which were queued on entry have been handled, 388 * We sleep until all works which were queued on entry have been handled,
389 * but we are not livelocked by new incoming ones. 389 * but we are not livelocked by new incoming ones.
390 * 390 *
391 * This function used to run the workqueues itself. Now we just wait for the 391 * This function used to run the workqueues itself. Now we just wait for the
392 * helper threads to do it. 392 * helper threads to do it.
393 */ 393 */
394 void flush_workqueue(struct workqueue_struct *wq) 394 void flush_workqueue(struct workqueue_struct *wq)
395 { 395 {
396 const struct cpumask *cpu_map = wq_cpu_map(wq); 396 const struct cpumask *cpu_map = wq_cpu_map(wq);
397 int cpu; 397 int cpu;
398 398
399 might_sleep(); 399 might_sleep();
400 lock_map_acquire(&wq->lockdep_map); 400 lock_map_acquire(&wq->lockdep_map);
401 lock_map_release(&wq->lockdep_map); 401 lock_map_release(&wq->lockdep_map);
402 for_each_cpu(cpu, cpu_map) 402 for_each_cpu(cpu, cpu_map)
403 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 403 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
404 } 404 }
405 EXPORT_SYMBOL_GPL(flush_workqueue); 405 EXPORT_SYMBOL_GPL(flush_workqueue);
406 406
407 /** 407 /**
408 * flush_work - block until a work_struct's callback has terminated 408 * flush_work - block until a work_struct's callback has terminated
409 * @work: the work which is to be flushed 409 * @work: the work which is to be flushed
410 * 410 *
411 * Returns false if @work has already terminated. 411 * Returns false if @work has already terminated.
412 * 412 *
413 * It is expected that, prior to calling flush_work(), the caller has 413 * It is expected that, prior to calling flush_work(), the caller has
414 * arranged for the work to not be requeued, otherwise it doesn't make 414 * arranged for the work to not be requeued, otherwise it doesn't make
415 * sense to use this function. 415 * sense to use this function.
416 */ 416 */
417 int flush_work(struct work_struct *work) 417 int flush_work(struct work_struct *work)
418 { 418 {
419 struct cpu_workqueue_struct *cwq; 419 struct cpu_workqueue_struct *cwq;
420 struct list_head *prev; 420 struct list_head *prev;
421 struct wq_barrier barr; 421 struct wq_barrier barr;
422 422
423 might_sleep(); 423 might_sleep();
424 cwq = get_wq_data(work); 424 cwq = get_wq_data(work);
425 if (!cwq) 425 if (!cwq)
426 return 0; 426 return 0;
427 427
428 lock_map_acquire(&cwq->wq->lockdep_map); 428 lock_map_acquire(&cwq->wq->lockdep_map);
429 lock_map_release(&cwq->wq->lockdep_map); 429 lock_map_release(&cwq->wq->lockdep_map);
430 430
431 prev = NULL; 431 prev = NULL;
432 spin_lock_irq(&cwq->lock); 432 spin_lock_irq(&cwq->lock);
433 if (!list_empty(&work->entry)) { 433 if (!list_empty(&work->entry)) {
434 /* 434 /*
435 * See the comment near try_to_grab_pending()->smp_rmb(). 435 * See the comment near try_to_grab_pending()->smp_rmb().
436 * If it was re-queued under us we are not going to wait. 436 * If it was re-queued under us we are not going to wait.
437 */ 437 */
438 smp_rmb(); 438 smp_rmb();
439 if (unlikely(cwq != get_wq_data(work))) 439 if (unlikely(cwq != get_wq_data(work)))
440 goto out; 440 goto out;
441 prev = &work->entry; 441 prev = &work->entry;
442 } else { 442 } else {
443 if (cwq->current_work != work) 443 if (cwq->current_work != work)
444 goto out; 444 goto out;
445 prev = &cwq->worklist; 445 prev = &cwq->worklist;
446 } 446 }
447 insert_wq_barrier(cwq, &barr, prev->next); 447 insert_wq_barrier(cwq, &barr, prev->next);
448 out: 448 out:
449 spin_unlock_irq(&cwq->lock); 449 spin_unlock_irq(&cwq->lock);
450 if (!prev) 450 if (!prev)
451 return 0; 451 return 0;
452 452
453 wait_for_completion(&barr.done); 453 wait_for_completion(&barr.done);
454 return 1; 454 return 1;
455 } 455 }
456 EXPORT_SYMBOL_GPL(flush_work); 456 EXPORT_SYMBOL_GPL(flush_work);
457 457
458 /* 458 /*
459 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit, 459 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
460 * so this work can't be re-armed in any way. 460 * so this work can't be re-armed in any way.
461 */ 461 */
462 static int try_to_grab_pending(struct work_struct *work) 462 static int try_to_grab_pending(struct work_struct *work)
463 { 463 {
464 struct cpu_workqueue_struct *cwq; 464 struct cpu_workqueue_struct *cwq;
465 int ret = -1; 465 int ret = -1;
466 466
467 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) 467 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
468 return 0; 468 return 0;
469 469
470 /* 470 /*
471 * The queueing is in progress, or it is already queued. Try to 471 * The queueing is in progress, or it is already queued. Try to
472 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 472 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
473 */ 473 */
474 474
475 cwq = get_wq_data(work); 475 cwq = get_wq_data(work);
476 if (!cwq) 476 if (!cwq)
477 return ret; 477 return ret;
478 478
479 spin_lock_irq(&cwq->lock); 479 spin_lock_irq(&cwq->lock);
480 if (!list_empty(&work->entry)) { 480 if (!list_empty(&work->entry)) {
481 /* 481 /*
482 * This work is queued, but perhaps we locked the wrong cwq. 482 * This work is queued, but perhaps we locked the wrong cwq.
483 * In that case we must see the new value after rmb(), see 483 * In that case we must see the new value after rmb(), see
484 * insert_work()->wmb(). 484 * insert_work()->wmb().
485 */ 485 */
486 smp_rmb(); 486 smp_rmb();
487 if (cwq == get_wq_data(work)) { 487 if (cwq == get_wq_data(work)) {
488 list_del_init(&work->entry); 488 list_del_init(&work->entry);
489 ret = 1; 489 ret = 1;
490 } 490 }
491 } 491 }
492 spin_unlock_irq(&cwq->lock); 492 spin_unlock_irq(&cwq->lock);
493 493
494 return ret; 494 return ret;
495 } 495 }
496 496
497 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, 497 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
498 struct work_struct *work) 498 struct work_struct *work)
499 { 499 {
500 struct wq_barrier barr; 500 struct wq_barrier barr;
501 int running = 0; 501 int running = 0;
502 502
503 spin_lock_irq(&cwq->lock); 503 spin_lock_irq(&cwq->lock);
504 if (unlikely(cwq->current_work == work)) { 504 if (unlikely(cwq->current_work == work)) {
505 insert_wq_barrier(cwq, &barr, cwq->worklist.next); 505 insert_wq_barrier(cwq, &barr, cwq->worklist.next);
506 running = 1; 506 running = 1;
507 } 507 }
508 spin_unlock_irq(&cwq->lock); 508 spin_unlock_irq(&cwq->lock);
509 509
510 if (unlikely(running)) 510 if (unlikely(running))
511 wait_for_completion(&barr.done); 511 wait_for_completion(&barr.done);
512 } 512 }
513 513
514 static void wait_on_work(struct work_struct *work) 514 static void wait_on_work(struct work_struct *work)
515 { 515 {
516 struct cpu_workqueue_struct *cwq; 516 struct cpu_workqueue_struct *cwq;
517 struct workqueue_struct *wq; 517 struct workqueue_struct *wq;
518 const struct cpumask *cpu_map; 518 const struct cpumask *cpu_map;
519 int cpu; 519 int cpu;
520 520
521 might_sleep(); 521 might_sleep();
522 522
523 lock_map_acquire(&work->lockdep_map); 523 lock_map_acquire(&work->lockdep_map);
524 lock_map_release(&work->lockdep_map); 524 lock_map_release(&work->lockdep_map);
525 525
526 cwq = get_wq_data(work); 526 cwq = get_wq_data(work);
527 if (!cwq) 527 if (!cwq)
528 return; 528 return;
529 529
530 wq = cwq->wq; 530 wq = cwq->wq;
531 cpu_map = wq_cpu_map(wq); 531 cpu_map = wq_cpu_map(wq);
532 532
533 for_each_cpu(cpu, cpu_map) 533 for_each_cpu(cpu, cpu_map)
534 wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 534 wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
535 } 535 }
536 536
537 static int __cancel_work_timer(struct work_struct *work, 537 static int __cancel_work_timer(struct work_struct *work,
538 struct timer_list* timer) 538 struct timer_list* timer)
539 { 539 {
540 int ret; 540 int ret;
541 541
542 do { 542 do {
543 ret = (timer && likely(del_timer(timer))); 543 ret = (timer && likely(del_timer(timer)));
544 if (!ret) 544 if (!ret)
545 ret = try_to_grab_pending(work); 545 ret = try_to_grab_pending(work);
546 wait_on_work(work); 546 wait_on_work(work);
547 } while (unlikely(ret < 0)); 547 } while (unlikely(ret < 0));
548 548
549 work_clear_pending(work); 549 work_clear_pending(work);
550 return ret; 550 return ret;
551 } 551 }
552 552
553 /** 553 /**
554 * cancel_work_sync - block until a work_struct's callback has terminated 554 * cancel_work_sync - block until a work_struct's callback has terminated
555 * @work: the work which is to be flushed 555 * @work: the work which is to be flushed
556 * 556 *
557 * Returns true if @work was pending. 557 * Returns true if @work was pending.
558 * 558 *
559 * cancel_work_sync() will cancel the work if it is queued. If the work's 559 * cancel_work_sync() will cancel the work if it is queued. If the work's
560 * callback appears to be running, cancel_work_sync() will block until it 560 * callback appears to be running, cancel_work_sync() will block until it
561 * has completed. 561 * has completed.
562 * 562 *
563 * It is possible to use this function if the work re-queues itself. It can 563 * It is possible to use this function if the work re-queues itself. It can
564 * cancel the work even if it migrates to another workqueue, however in that 564 * cancel the work even if it migrates to another workqueue, however in that
565 * case it only guarantees that work->func() has completed on the last queued 565 * case it only guarantees that work->func() has completed on the last queued
566 * workqueue. 566 * workqueue.
567 * 567 *
568 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not 568 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
569 * pending, otherwise it goes into a busy-wait loop until the timer expires. 569 * pending, otherwise it goes into a busy-wait loop until the timer expires.
570 * 570 *
571 * The caller must ensure that workqueue_struct on which this work was last 571 * The caller must ensure that workqueue_struct on which this work was last
572 * queued can't be destroyed before this function returns. 572 * queued can't be destroyed before this function returns.
573 */ 573 */
574 int cancel_work_sync(struct work_struct *work) 574 int cancel_work_sync(struct work_struct *work)
575 { 575 {
576 return __cancel_work_timer(work, NULL); 576 return __cancel_work_timer(work, NULL);
577 } 577 }
578 EXPORT_SYMBOL_GPL(cancel_work_sync); 578 EXPORT_SYMBOL_GPL(cancel_work_sync);
579 579
580 /** 580 /**
581 * cancel_delayed_work_sync - reliably kill off a delayed work. 581 * cancel_delayed_work_sync - reliably kill off a delayed work.
582 * @dwork: the delayed work struct 582 * @dwork: the delayed work struct
583 * 583 *
584 * Returns true if @dwork was pending. 584 * Returns true if @dwork was pending.
585 * 585 *
586 * It is possible to use this function if @dwork rearms itself via queue_work() 586 * It is possible to use this function if @dwork rearms itself via queue_work()
587 * or queue_delayed_work(). See also the comment for cancel_work_sync(). 587 * or queue_delayed_work(). See also the comment for cancel_work_sync().
588 */ 588 */
589 int cancel_delayed_work_sync(struct delayed_work *dwork) 589 int cancel_delayed_work_sync(struct delayed_work *dwork)
590 { 590 {
591 return __cancel_work_timer(&dwork->work, &dwork->timer); 591 return __cancel_work_timer(&dwork->work, &dwork->timer);
592 } 592 }
593 EXPORT_SYMBOL(cancel_delayed_work_sync); 593 EXPORT_SYMBOL(cancel_delayed_work_sync);
594 594
595 static struct workqueue_struct *keventd_wq __read_mostly; 595 static struct workqueue_struct *keventd_wq __read_mostly;
596 596
597 /** 597 /**
598 * schedule_work - put work task in global workqueue 598 * schedule_work - put work task in global workqueue
599 * @work: job to be done 599 * @work: job to be done
600 * 600 *
601 * Returns zero if @work was already on the kernel-global workqueue and 601 * Returns zero if @work was already on the kernel-global workqueue and
602 * non-zero otherwise. 602 * non-zero otherwise.
603 * 603 *
604 * This puts a job in the kernel-global workqueue if it was not already 604 * This puts a job in the kernel-global workqueue if it was not already
605 * queued and leaves it in the same position on the kernel-global 605 * queued and leaves it in the same position on the kernel-global
606 * workqueue otherwise. 606 * workqueue otherwise.
607 */ 607 */
608 int schedule_work(struct work_struct *work) 608 int schedule_work(struct work_struct *work)
609 { 609 {
610 return queue_work(keventd_wq, work); 610 return queue_work(keventd_wq, work);
611 } 611 }
612 EXPORT_SYMBOL(schedule_work); 612 EXPORT_SYMBOL(schedule_work);
613 613
614 /* 614 /*
615 * schedule_work_on - put work task on a specific cpu 615 * schedule_work_on - put work task on a specific cpu
616 * @cpu: cpu to put the work task on 616 * @cpu: cpu to put the work task on
617 * @work: job to be done 617 * @work: job to be done
618 * 618 *
619 * This puts a job on a specific cpu 619 * This puts a job on a specific cpu
620 */ 620 */
621 int schedule_work_on(int cpu, struct work_struct *work) 621 int schedule_work_on(int cpu, struct work_struct *work)
622 { 622 {
623 return queue_work_on(cpu, keventd_wq, work); 623 return queue_work_on(cpu, keventd_wq, work);
624 } 624 }
625 EXPORT_SYMBOL(schedule_work_on); 625 EXPORT_SYMBOL(schedule_work_on);
626 626
627 /** 627 /**
628 * schedule_delayed_work - put work task in global workqueue after delay 628 * schedule_delayed_work - put work task in global workqueue after delay
629 * @dwork: job to be done 629 * @dwork: job to be done
630 * @delay: number of jiffies to wait or 0 for immediate execution 630 * @delay: number of jiffies to wait or 0 for immediate execution
631 * 631 *
632 * After waiting for a given time this puts a job in the kernel-global 632 * After waiting for a given time this puts a job in the kernel-global
633 * workqueue. 633 * workqueue.
634 */ 634 */
635 int schedule_delayed_work(struct delayed_work *dwork, 635 int schedule_delayed_work(struct delayed_work *dwork,
636 unsigned long delay) 636 unsigned long delay)
637 { 637 {
638 return queue_delayed_work(keventd_wq, dwork, delay); 638 return queue_delayed_work(keventd_wq, dwork, delay);
639 } 639 }
640 EXPORT_SYMBOL(schedule_delayed_work); 640 EXPORT_SYMBOL(schedule_delayed_work);
641 641
642 /** 642 /**
643 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 643 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
644 * @cpu: cpu to use 644 * @cpu: cpu to use
645 * @dwork: job to be done 645 * @dwork: job to be done
646 * @delay: number of jiffies to wait 646 * @delay: number of jiffies to wait
647 * 647 *
648 * After waiting for a given time this puts a job in the kernel-global 648 * After waiting for a given time this puts a job in the kernel-global
649 * workqueue on the specified CPU. 649 * workqueue on the specified CPU.
650 */ 650 */
651 int schedule_delayed_work_on(int cpu, 651 int schedule_delayed_work_on(int cpu,
652 struct delayed_work *dwork, unsigned long delay) 652 struct delayed_work *dwork, unsigned long delay)
653 { 653 {
654 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); 654 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
655 } 655 }
656 EXPORT_SYMBOL(schedule_delayed_work_on); 656 EXPORT_SYMBOL(schedule_delayed_work_on);
657 657
/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
+	int orig = -1;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

+	/*
+	 * when running in keventd don't schedule a work item on itself.
+	 * Can just call directly because the work queue is already bound.
+	 * This also is faster.
+	 * Make this a generic parameter for other workqueues?
+	 */
+	if (current_is_keventd()) {
+		orig = raw_smp_processor_id();
+		INIT_WORK(per_cpu_ptr(works, orig), func);
+		func(per_cpu_ptr(works, orig));
+	}
+
	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

+		if (cpu == orig)
+			continue;
		INIT_WORK(work, func);
		schedule_work_on(cpu, work);
	}
-	for_each_online_cpu(cpu)
-		flush_work(per_cpu_ptr(works, cpu));
+	for_each_online_cpu(cpu) {
+		if (cpu != orig)
+			flush_work(per_cpu_ptr(works, cpu));
+	}
	put_online_cpus();
	free_percpu(works);
	return 0;
}
689 706
690 void flush_scheduled_work(void) 707 void flush_scheduled_work(void)
691 { 708 {
692 flush_workqueue(keventd_wq); 709 flush_workqueue(keventd_wq);
693 } 710 }
694 EXPORT_SYMBOL(flush_scheduled_work); 711 EXPORT_SYMBOL(flush_scheduled_work);
695 712
696 /** 713 /**
697 * execute_in_process_context - reliably execute the routine with user context 714 * execute_in_process_context - reliably execute the routine with user context
698 * @fn: the function to execute 715 * @fn: the function to execute
699 * @ew: guaranteed storage for the execute work structure (must 716 * @ew: guaranteed storage for the execute work structure (must
700 * be available when the work executes) 717 * be available when the work executes)
701 * 718 *
702 * Executes the function immediately if process context is available, 719 * Executes the function immediately if process context is available,
703 * otherwise schedules the function for delayed execution. 720 * otherwise schedules the function for delayed execution.
704 * 721 *
705 * Returns: 0 - function was executed 722 * Returns: 0 - function was executed
706 * 1 - function was scheduled for execution 723 * 1 - function was scheduled for execution
707 */ 724 */
708 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 725 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
709 { 726 {
710 if (!in_interrupt()) { 727 if (!in_interrupt()) {
711 fn(&ew->work); 728 fn(&ew->work);
712 return 0; 729 return 0;
713 } 730 }
714 731
715 INIT_WORK(&ew->work, fn); 732 INIT_WORK(&ew->work, fn);
716 schedule_work(&ew->work); 733 schedule_work(&ew->work);
717 734
718 return 1; 735 return 1;
719 } 736 }
720 EXPORT_SYMBOL_GPL(execute_in_process_context); 737 EXPORT_SYMBOL_GPL(execute_in_process_context);
721 738
722 int keventd_up(void) 739 int keventd_up(void)
723 { 740 {
724 return keventd_wq != NULL; 741 return keventd_wq != NULL;
725 } 742 }
726 743
727 int current_is_keventd(void) 744 int current_is_keventd(void)
728 { 745 {
729 struct cpu_workqueue_struct *cwq; 746 struct cpu_workqueue_struct *cwq;
730 int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 747 int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
731 int ret = 0; 748 int ret = 0;
732 749
733 BUG_ON(!keventd_wq); 750 BUG_ON(!keventd_wq);
734 751
735 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 752 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
736 if (current == cwq->thread) 753 if (current == cwq->thread)
737 ret = 1; 754 ret = 1;
738 755
739 return ret; 756 return ret;
740 757
741 } 758 }
742 759
743 static struct cpu_workqueue_struct * 760 static struct cpu_workqueue_struct *
744 init_cpu_workqueue(struct workqueue_struct *wq, int cpu) 761 init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
745 { 762 {
746 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 763 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
747 764
748 cwq->wq = wq; 765 cwq->wq = wq;
749 spin_lock_init(&cwq->lock); 766 spin_lock_init(&cwq->lock);
750 INIT_LIST_HEAD(&cwq->worklist); 767 INIT_LIST_HEAD(&cwq->worklist);
751 init_waitqueue_head(&cwq->more_work); 768 init_waitqueue_head(&cwq->more_work);
752 769
753 return cwq; 770 return cwq;
754 } 771 }
755 772
756 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 773 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
757 { 774 {
758 struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 }; 775 struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
759 struct workqueue_struct *wq = cwq->wq; 776 struct workqueue_struct *wq = cwq->wq;
760 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d"; 777 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
761 struct task_struct *p; 778 struct task_struct *p;
762 779
763 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 780 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
764 /* 781 /*
765 * Nobody can add the work_struct to this cwq, 782 * Nobody can add the work_struct to this cwq,
766 * if (caller is __create_workqueue) 783 * if (caller is __create_workqueue)
767 * nobody should see this wq 784 * nobody should see this wq
768 * else // caller is CPU_UP_PREPARE 785 * else // caller is CPU_UP_PREPARE
769 * cpu is not on cpu_online_map 786 * cpu is not on cpu_online_map
770 * so we can abort safely. 787 * so we can abort safely.
771 */ 788 */
772 if (IS_ERR(p)) 789 if (IS_ERR(p))
773 return PTR_ERR(p); 790 return PTR_ERR(p);
774 if (cwq->wq->rt) 791 if (cwq->wq->rt)
775 sched_setscheduler_nocheck(p, SCHED_FIFO, &param); 792 sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
776 cwq->thread = p; 793 cwq->thread = p;
777 794
778 trace_workqueue_creation(cwq->thread, cpu); 795 trace_workqueue_creation(cwq->thread, cpu);
779 796
780 return 0; 797 return 0;
781 } 798 }
782 799
783 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 800 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
784 { 801 {
785 struct task_struct *p = cwq->thread; 802 struct task_struct *p = cwq->thread;
786 803
787 if (p != NULL) { 804 if (p != NULL) {
788 if (cpu >= 0) 805 if (cpu >= 0)
789 kthread_bind(p, cpu); 806 kthread_bind(p, cpu);
790 wake_up_process(p); 807 wake_up_process(p);
791 } 808 }
792 } 809 }
793 810
794 struct workqueue_struct *__create_workqueue_key(const char *name, 811 struct workqueue_struct *__create_workqueue_key(const char *name,
795 int singlethread, 812 int singlethread,
796 int freezeable, 813 int freezeable,
797 int rt, 814 int rt,
798 struct lock_class_key *key, 815 struct lock_class_key *key,
799 const char *lock_name) 816 const char *lock_name)
800 { 817 {
801 struct workqueue_struct *wq; 818 struct workqueue_struct *wq;
802 struct cpu_workqueue_struct *cwq; 819 struct cpu_workqueue_struct *cwq;
803 int err = 0, cpu; 820 int err = 0, cpu;
804 821
805 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 822 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
806 if (!wq) 823 if (!wq)
807 return NULL; 824 return NULL;
808 825
809 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 826 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
810 if (!wq->cpu_wq) { 827 if (!wq->cpu_wq) {
811 kfree(wq); 828 kfree(wq);
812 return NULL; 829 return NULL;
813 } 830 }
814 831
815 wq->name = name; 832 wq->name = name;
816 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 833 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
817 wq->singlethread = singlethread; 834 wq->singlethread = singlethread;
818 wq->freezeable = freezeable; 835 wq->freezeable = freezeable;
819 wq->rt = rt; 836 wq->rt = rt;
820 INIT_LIST_HEAD(&wq->list); 837 INIT_LIST_HEAD(&wq->list);
821 838
822 if (singlethread) { 839 if (singlethread) {
823 cwq = init_cpu_workqueue(wq, singlethread_cpu); 840 cwq = init_cpu_workqueue(wq, singlethread_cpu);
824 err = create_workqueue_thread(cwq, singlethread_cpu); 841 err = create_workqueue_thread(cwq, singlethread_cpu);
825 start_workqueue_thread(cwq, -1); 842 start_workqueue_thread(cwq, -1);
826 } else { 843 } else {
827 cpu_maps_update_begin(); 844 cpu_maps_update_begin();
828 /* 845 /*
829 * We must place this wq on list even if the code below fails. 846 * We must place this wq on list even if the code below fails.
830 * cpu_down(cpu) can remove cpu from cpu_populated_map before 847 * cpu_down(cpu) can remove cpu from cpu_populated_map before
831 * destroy_workqueue() takes the lock, in that case we leak 848 * destroy_workqueue() takes the lock, in that case we leak
832 * cwq[cpu]->thread. 849 * cwq[cpu]->thread.
833 */ 850 */
834 spin_lock(&workqueue_lock); 851 spin_lock(&workqueue_lock);
835 list_add(&wq->list, &workqueues); 852 list_add(&wq->list, &workqueues);
836 spin_unlock(&workqueue_lock); 853 spin_unlock(&workqueue_lock);
837 /* 854 /*
838 * We must initialize cwqs for each possible cpu even if we 855 * We must initialize cwqs for each possible cpu even if we
839 * are going to call destroy_workqueue() finally. Otherwise 856 * are going to call destroy_workqueue() finally. Otherwise
840 * cpu_up() can hit the uninitialized cwq once we drop the 857 * cpu_up() can hit the uninitialized cwq once we drop the
841 * lock. 858 * lock.
842 */ 859 */
843 for_each_possible_cpu(cpu) { 860 for_each_possible_cpu(cpu) {
844 cwq = init_cpu_workqueue(wq, cpu); 861 cwq = init_cpu_workqueue(wq, cpu);
845 if (err || !cpu_online(cpu)) 862 if (err || !cpu_online(cpu))
846 continue; 863 continue;
847 err = create_workqueue_thread(cwq, cpu); 864 err = create_workqueue_thread(cwq, cpu);
848 start_workqueue_thread(cwq, cpu); 865 start_workqueue_thread(cwq, cpu);
849 } 866 }
850 cpu_maps_update_done(); 867 cpu_maps_update_done();
851 } 868 }
852 869
853 if (err) { 870 if (err) {
854 destroy_workqueue(wq); 871 destroy_workqueue(wq);
855 wq = NULL; 872 wq = NULL;
856 } 873 }
857 return wq; 874 return wq;
858 } 875 }
859 EXPORT_SYMBOL_GPL(__create_workqueue_key); 876 EXPORT_SYMBOL_GPL(__create_workqueue_key);
860 877
861 static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) 878 static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
862 { 879 {
863 /* 880 /*
864 * Our caller is either destroy_workqueue() or CPU_POST_DEAD, 881 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
865 * cpu_add_remove_lock protects cwq->thread. 882 * cpu_add_remove_lock protects cwq->thread.
866 */ 883 */
867 if (cwq->thread == NULL) 884 if (cwq->thread == NULL)
868 return; 885 return;
869 886
870 lock_map_acquire(&cwq->wq->lockdep_map); 887 lock_map_acquire(&cwq->wq->lockdep_map);
871 lock_map_release(&cwq->wq->lockdep_map); 888 lock_map_release(&cwq->wq->lockdep_map);
872 889
873 flush_cpu_workqueue(cwq); 890 flush_cpu_workqueue(cwq);
874 /* 891 /*
875 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty, 892 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
876 * a concurrent flush_workqueue() can insert a barrier after us. 893 * a concurrent flush_workqueue() can insert a barrier after us.
877 * However, in that case run_workqueue() won't return and check 894 * However, in that case run_workqueue() won't return and check
878 * kthread_should_stop() until it flushes all work_struct's. 895 * kthread_should_stop() until it flushes all work_struct's.
879 * When ->worklist becomes empty it is safe to exit because no 896 * When ->worklist becomes empty it is safe to exit because no
880 * more work_structs can be queued on this cwq: flush_workqueue 897 * more work_structs can be queued on this cwq: flush_workqueue
881 * checks list_empty(), and a "normal" queue_work() can't use 898 * checks list_empty(), and a "normal" queue_work() can't use
882 * a dead CPU. 899 * a dead CPU.
883 */ 900 */
884 trace_workqueue_destruction(cwq->thread); 901 trace_workqueue_destruction(cwq->thread);
885 kthread_stop(cwq->thread); 902 kthread_stop(cwq->thread);
886 cwq->thread = NULL; 903 cwq->thread = NULL;
887 } 904 }
888 905
889 /** 906 /**
890 * destroy_workqueue - safely terminate a workqueue 907 * destroy_workqueue - safely terminate a workqueue
891 * @wq: target workqueue 908 * @wq: target workqueue
892 * 909 *
893 * Safely destroy a workqueue. All work currently pending will be done first. 910 * Safely destroy a workqueue. All work currently pending will be done first.
894 */ 911 */
895 void destroy_workqueue(struct workqueue_struct *wq) 912 void destroy_workqueue(struct workqueue_struct *wq)
896 { 913 {
897 const struct cpumask *cpu_map = wq_cpu_map(wq); 914 const struct cpumask *cpu_map = wq_cpu_map(wq);
898 int cpu; 915 int cpu;
899 916
900 cpu_maps_update_begin(); 917 cpu_maps_update_begin();
901 spin_lock(&workqueue_lock); 918 spin_lock(&workqueue_lock);
902 list_del(&wq->list); 919 list_del(&wq->list);
903 spin_unlock(&workqueue_lock); 920 spin_unlock(&workqueue_lock);
904 921
905 for_each_cpu(cpu, cpu_map) 922 for_each_cpu(cpu, cpu_map)
906 cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); 923 cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
907 cpu_maps_update_done(); 924 cpu_maps_update_done();
908 925
909 free_percpu(wq->cpu_wq); 926 free_percpu(wq->cpu_wq);
910 kfree(wq); 927 kfree(wq);
911 } 928 }
912 EXPORT_SYMBOL_GPL(destroy_workqueue); 929 EXPORT_SYMBOL_GPL(destroy_workqueue);
913 930
914 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 931 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
915 unsigned long action, 932 unsigned long action,
916 void *hcpu) 933 void *hcpu)
917 { 934 {
918 unsigned int cpu = (unsigned long)hcpu; 935 unsigned int cpu = (unsigned long)hcpu;
919 struct cpu_workqueue_struct *cwq; 936 struct cpu_workqueue_struct *cwq;
920 struct workqueue_struct *wq; 937 struct workqueue_struct *wq;
921 int ret = NOTIFY_OK; 938 int ret = NOTIFY_OK;
922 939
923 action &= ~CPU_TASKS_FROZEN; 940 action &= ~CPU_TASKS_FROZEN;
924 941
925 switch (action) { 942 switch (action) {
926 case CPU_UP_PREPARE: 943 case CPU_UP_PREPARE:
927 cpumask_set_cpu(cpu, cpu_populated_map); 944 cpumask_set_cpu(cpu, cpu_populated_map);
928 } 945 }
929 undo: 946 undo:
930 list_for_each_entry(wq, &workqueues, list) { 947 list_for_each_entry(wq, &workqueues, list) {
931 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 948 cwq = per_cpu_ptr(wq->cpu_wq, cpu);
932 949
933 switch (action) { 950 switch (action) {
934 case CPU_UP_PREPARE: 951 case CPU_UP_PREPARE:
935 if (!create_workqueue_thread(cwq, cpu)) 952 if (!create_workqueue_thread(cwq, cpu))
936 break; 953 break;
937 printk(KERN_ERR "workqueue [%s] for %i failed\n", 954 printk(KERN_ERR "workqueue [%s] for %i failed\n",
938 wq->name, cpu); 955 wq->name, cpu);
939 action = CPU_UP_CANCELED; 956 action = CPU_UP_CANCELED;
940 ret = NOTIFY_BAD; 957 ret = NOTIFY_BAD;
941 goto undo; 958 goto undo;
942 959
943 case CPU_ONLINE: 960 case CPU_ONLINE:
944 start_workqueue_thread(cwq, cpu); 961 start_workqueue_thread(cwq, cpu);
945 break; 962 break;
946 963
947 case CPU_UP_CANCELED: 964 case CPU_UP_CANCELED:
948 start_workqueue_thread(cwq, -1); 965 start_workqueue_thread(cwq, -1);
949 case CPU_POST_DEAD: 966 case CPU_POST_DEAD:
950 cleanup_workqueue_thread(cwq); 967 cleanup_workqueue_thread(cwq);
951 break; 968 break;
952 } 969 }
953 } 970 }
954 971
955 switch (action) { 972 switch (action) {
956 case CPU_UP_CANCELED: 973 case CPU_UP_CANCELED:
957 case CPU_POST_DEAD: 974 case CPU_POST_DEAD:
958 cpumask_clear_cpu(cpu, cpu_populated_map); 975 cpumask_clear_cpu(cpu, cpu_populated_map);
959 } 976 }
960 977
961 return ret; 978 return ret;
962 } 979 }
963 980
964 #ifdef CONFIG_SMP 981 #ifdef CONFIG_SMP
965 982
966 struct work_for_cpu { 983 struct work_for_cpu {
967 struct completion completion; 984 struct completion completion;
968 long (*fn)(void *); 985 long (*fn)(void *);
969 void *arg; 986 void *arg;
970 long ret; 987 long ret;
971 }; 988 };
972 989
973 static int do_work_for_cpu(void *_wfc) 990 static int do_work_for_cpu(void *_wfc)
974 { 991 {
975 struct work_for_cpu *wfc = _wfc; 992 struct work_for_cpu *wfc = _wfc;
976 wfc->ret = wfc->fn(wfc->arg); 993 wfc->ret = wfc->fn(wfc->arg);
977 complete(&wfc->completion); 994 complete(&wfc->completion);
978 return 0; 995 return 0;
979 } 996 }
980 997
981 /** 998 /**
982 * work_on_cpu - run a function in user context on a particular cpu 999 * work_on_cpu - run a function in user context on a particular cpu
983 * @cpu: the cpu to run on 1000 * @cpu: the cpu to run on
984 * @fn: the function to run 1001 * @fn: the function to run
985 * @arg: the function arg 1002 * @arg: the function arg
986 * 1003 *
987 * This will return the value @fn returns. 1004 * This will return the value @fn returns.
988 * It is up to the caller to ensure that the cpu doesn't go offline. 1005 * It is up to the caller to ensure that the cpu doesn't go offline.
989 * The caller must not hold any locks which would prevent @fn from completing. 1006 * The caller must not hold any locks which would prevent @fn from completing.
990 */ 1007 */
991 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg) 1008 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
992 { 1009 {
993 struct task_struct *sub_thread; 1010 struct task_struct *sub_thread;
994 struct work_for_cpu wfc = { 1011 struct work_for_cpu wfc = {
995 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion), 1012 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
996 .fn = fn, 1013 .fn = fn,
997 .arg = arg, 1014 .arg = arg,
998 }; 1015 };
999 1016
1000 sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu"); 1017 sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
1001 if (IS_ERR(sub_thread)) 1018 if (IS_ERR(sub_thread))
1002 return PTR_ERR(sub_thread); 1019 return PTR_ERR(sub_thread);
1003 kthread_bind(sub_thread, cpu); 1020 kthread_bind(sub_thread, cpu);
1004 wake_up_process(sub_thread); 1021 wake_up_process(sub_thread);
1005 wait_for_completion(&wfc.completion); 1022 wait_for_completion(&wfc.completion);
1006 return wfc.ret; 1023 return wfc.ret;
1007 } 1024 }
1008 EXPORT_SYMBOL_GPL(work_on_cpu); 1025 EXPORT_SYMBOL_GPL(work_on_cpu);
1009 #endif /* CONFIG_SMP */ 1026 #endif /* CONFIG_SMP */
1010 1027
1011 void __init init_workqueues(void) 1028 void __init init_workqueues(void)
1012 { 1029 {
1013 alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL); 1030 alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
1014 1031
1015 cpumask_copy(cpu_populated_map, cpu_online_mask); 1032 cpumask_copy(cpu_populated_map, cpu_online_mask);
1016 singlethread_cpu = cpumask_first(cpu_possible_mask); 1033 singlethread_cpu = cpumask_first(cpu_possible_mask);
1017 cpu_singlethread_map = cpumask_of(singlethread_cpu); 1034 cpu_singlethread_map = cpumask_of(singlethread_cpu);
1018 hotcpu_notifier(workqueue_cpu_callback, 0); 1035 hotcpu_notifier(workqueue_cpu_callback, 0);
1019 keventd_wq = create_workqueue("events"); 1036 keventd_wq = create_workqueue("events");
1020 BUG_ON(!keventd_wq); 1037 BUG_ON(!keventd_wq);
1021 } 1038 }
1022 1039