fs/io-wq.c

  // SPDX-License-Identifier: GPL-2.0
  /*
   * Basic worker thread pool for io_uring
   *
   * Copyright (C) 2019 Jens Axboe
   *
   */
  #include <linux/kernel.h>
  #include <linux/init.h>
  #include <linux/errno.h>
  #include <linux/sched/signal.h>
  #include <linux/mm.h>
  #include <linux/mmu_context.h>
  #include <linux/sched/mm.h>
  #include <linux/percpu.h>
  #include <linux/slab.h>
  #include <linux/kthread.h>
  #include <linux/rculist_nulls.h>
  #include <linux/fs_struct.h>
  
  #include "io-wq.h"
  
  #define WORKER_IDLE_TIMEOUT	(5 * HZ)
  
  enum {
  	IO_WORKER_F_UP		= 1,	/* up and active */
  	IO_WORKER_F_RUNNING	= 2,	/* account as running */
  	IO_WORKER_F_FREE	= 4,	/* worker on free list */
  	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
  	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
  	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
  };
  
  enum {
  	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
  	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
  	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
  };
  
  enum {
  	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
  };
  
  /*
   * One for each thread in a wqe pool
   */
  struct io_worker {
  	refcount_t ref;
  	unsigned flags;
  	struct hlist_nulls_node nulls_node;
  	struct list_head all_list;
  	struct task_struct *task;
  	struct io_wqe *wqe;

  	struct io_wq_work *cur_work;
  	spinlock_t lock;
  
  	struct rcu_head rcu;
  	struct mm_struct *mm;
  	const struct cred *cur_creds;
  	const struct cred *saved_creds;
  	struct files_struct *restore_files;
  	struct fs_struct *restore_fs;
  };

  #if BITS_PER_LONG == 64
  #define IO_WQ_HASH_ORDER	6
  #else
  #define IO_WQ_HASH_ORDER	5
  #endif

  #define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

  struct io_wqe_acct {
  	unsigned nr_workers;
  	unsigned max_workers;
  	atomic_t nr_running;
  };
  
  enum {
  	IO_WQ_ACCT_BOUND,
  	IO_WQ_ACCT_UNBOUND,
  };

  /*
   * Per-node worker thread pool
   */
  struct io_wqe {
  	struct {
  		spinlock_t lock;
  		struct io_wq_work_list work_list;
  		unsigned long hash_map;
  		unsigned flags;
  	} ____cacheline_aligned_in_smp;
  
  	int node;
  	struct io_wqe_acct acct[2];

  	struct hlist_nulls_head free_list;
  	struct list_head all_list;
  
  	struct io_wq *wq;
  	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
  };
  
  /*
   * Per io_wq state
   */
  struct io_wq {
  	struct io_wqe **wqes;
  	unsigned long state;

  	free_work_fn *free_work;

  	struct task_struct *manager;
  	struct user_struct *user;
  	refcount_t refs;
  	struct completion done;
  
  	refcount_t use_refs;
  };

  static bool io_worker_get(struct io_worker *worker)
  {
  	return refcount_inc_not_zero(&worker->ref);
  }
  
  static void io_worker_release(struct io_worker *worker)
  {
  	if (refcount_dec_and_test(&worker->ref))
  		wake_up_process(worker->task);
  }
  
  /*
   * Note: drops the wqe->lock if returning true! The caller must re-acquire
   * the lock in that case. Some callers need to restart handling if this
   * happens, so we can't just re-acquire the lock on behalf of the caller.
   */
  static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
  {
  	bool dropped_lock = false;

  	if (worker->saved_creds) {
  		revert_creds(worker->saved_creds);
  		worker->cur_creds = worker->saved_creds = NULL;
  	}
  	if (current->files != worker->restore_files) {
  		__acquire(&wqe->lock);
  		spin_unlock_irq(&wqe->lock);
  		dropped_lock = true;
  
  		task_lock(current);
  		current->files = worker->restore_files;
  		task_unlock(current);
  	}
  	if (current->fs != worker->restore_fs)
  		current->fs = worker->restore_fs;

  	/*
  	 * If we have an active mm, we need to drop the wq lock before unusing
  	 * it. If we do, return true and let the caller retry the idle loop.
  	 */
  	if (worker->mm) {
  		if (!dropped_lock) {
  			__acquire(&wqe->lock);
  			spin_unlock_irq(&wqe->lock);
  			dropped_lock = true;
  		}
  		__set_current_state(TASK_RUNNING);
  		set_fs(KERNEL_DS);
  		unuse_mm(worker->mm);
  		mmput(worker->mm);
  		worker->mm = NULL;
  	}

  	return dropped_lock;
  }
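
  /*
   * Map a work item or a worker to the bounded/unbounded accounting
   * struct, based on IO_WQ_WORK_UNBOUND and IO_WORKER_F_BOUND respectively.
   */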
  static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
  						   struct io_wq_work *work)
  {
  	if (work->flags & IO_WQ_WORK_UNBOUND)
  		return &wqe->acct[IO_WQ_ACCT_UNBOUND];
  
  	return &wqe->acct[IO_WQ_ACCT_BOUND];
  }
  
  static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
  						  struct io_worker *worker)
  {
  	if (worker->flags & IO_WORKER_F_BOUND)
  		return &wqe->acct[IO_WQ_ACCT_BOUND];
  
  	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
  }
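
  /*
   * A worker is exiting: wait for the last reference to drop, undo the
   * accounting, unlink the worker from the wqe lists and free it.
   */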
  static void io_worker_exit(struct io_worker *worker)
  {
  	struct io_wqe *wqe = worker->wqe;
  	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
  	unsigned nr_workers;
  
  	/*
  	 * If we're not at zero, someone else is holding a brief reference
  	 * to the worker. Wait for that to go away.
  	 */
  	set_current_state(TASK_INTERRUPTIBLE);
  	if (!refcount_dec_and_test(&worker->ref))
  		schedule();
  	__set_current_state(TASK_RUNNING);
  
  	preempt_disable();
  	current->flags &= ~PF_IO_WORKER;
  	if (worker->flags & IO_WORKER_F_RUNNING)
  		atomic_dec(&acct->nr_running);
  	if (!(worker->flags & IO_WORKER_F_BOUND))
  		atomic_dec(&wqe->wq->user->processes);
  	worker->flags = 0;
  	preempt_enable();
  
  	spin_lock_irq(&wqe->lock);
  	hlist_nulls_del_rcu(&worker->nulls_node);
  	list_del_rcu(&worker->all_list);
  	if (__io_worker_unuse(wqe, worker)) {
  		__release(&wqe->lock);
  		spin_lock_irq(&wqe->lock);
  	}
  	acct->nr_workers--;
  	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
  			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
  	spin_unlock_irq(&wqe->lock);
  
  	/* all workers gone, wq exit can proceed */
  	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
  		complete(&wqe->wq->done);
  	kfree_rcu(worker, rcu);
  }
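
  /* true if pending work exists and we aren't stalled on hashed work */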
  static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  	__must_hold(wqe->lock)
  {
  	if (!wq_list_empty(&wqe->work_list) &&
  	    !(wqe->flags & IO_WQE_FLAG_STALLED))
  		return true;
  	return false;
  }
  
  /*
   * Check head of free list for an available worker. If one isn't available,
   * caller must wake up the wq manager to create one.
   */
  static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
  	__must_hold(RCU)
  {
  	struct hlist_nulls_node *n;
  	struct io_worker *worker;
  	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
  	if (is_a_nulls(n))
  		return false;
  
  	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
  	if (io_worker_get(worker)) {
  		wake_up_process(worker->task);
  		io_worker_release(worker);
  		return true;
  	}
  
  	return false;
  }
  
  /*
   * We need a worker. If we find a free one, we're good. If not, and we're
   * below the max number of workers, wake up the manager to create one.
   */
  static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
  {
  	bool ret;
  
  	/*
  	 * Most likely an attempt to queue unbounded work on an io_wq that
  	 * wasn't setup with any unbounded workers.
  	 */
  	WARN_ON_ONCE(!acct->max_workers);
  
  	rcu_read_lock();
  	ret = io_wqe_activate_free_worker(wqe);
  	rcu_read_unlock();
  
  	if (!ret && acct->nr_workers < acct->max_workers)
  		wake_up_process(wqe->wq->manager);
  }
  
  static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
  {
  	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
  
  	atomic_inc(&acct->nr_running);
  }
  
  static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
  	__must_hold(wqe->lock)
  {
  	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
  
  	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
  		io_wqe_wake_worker(wqe, acct);
  }

  static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
  {
  	allow_kernel_signal(SIGINT);
  
  	current->flags |= PF_IO_WORKER;
  
  	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
  	worker->restore_files = current->files;
  	worker->restore_fs = current->fs;
  	io_wqe_inc_running(wqe, worker);
  }
  
  /*
   * Worker will start processing some work. Move it to the busy list, if
   * it's currently on the freelist
   */
  static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
  			     struct io_wq_work *work)
  	__must_hold(wqe->lock)
  {
  	bool worker_bound, work_bound;

  	if (worker->flags & IO_WORKER_F_FREE) {
  		worker->flags &= ~IO_WORKER_F_FREE;
  		hlist_nulls_del_init_rcu(&worker->nulls_node);
  	}
  
  	/*
  	 * If worker is moving from bound to unbound (or vice versa), then
  	 * ensure we update the running accounting.
  	 */
  	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
  	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
  	if (worker_bound != work_bound) {
  		io_wqe_dec_running(wqe, worker);
  		if (work_bound) {
  			worker->flags |= IO_WORKER_F_BOUND;
  			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
  			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
  			atomic_dec(&wqe->wq->user->processes);
  		} else {
  			worker->flags &= ~IO_WORKER_F_BOUND;
  			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
  			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
  			atomic_inc(&wqe->wq->user->processes);
  		}
  		io_wqe_inc_running(wqe, worker);
  	}
  }
  
  /*
   * No work, worker going to sleep. Move to freelist, and unuse mm if we
   * have one attached. Dropping the mm may potentially sleep, so we drop
   * the lock in that case and return success. Since the caller has to
   * retry the loop in that case (we changed task state), we don't regrab
   * the lock if we return success.
   */
  static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
  	__must_hold(wqe->lock)
  {
  	if (!(worker->flags & IO_WORKER_F_FREE)) {
  		worker->flags |= IO_WORKER_F_FREE;
  		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
  	}
  
  	return __io_worker_unuse(wqe, worker);
  }

  static inline unsigned int io_get_work_hash(struct io_wq_work *work)
  {
  	return work->flags >> IO_WQ_HASH_SHIFT;
  }
  
  static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
  	__must_hold(wqe->lock)
  {
  	struct io_wq_work_node *node, *prev;
  	struct io_wq_work *work, *tail;
  	unsigned int hash;

  	wq_list_for_each(node, prev, &wqe->work_list) {
  		work = container_of(node, struct io_wq_work, list);

  		/* not hashed, can run anytime */
  		if (!io_wq_is_hashed(work)) {
  			wq_list_del(&wqe->work_list, node, prev);
  			return work;
  		}
  
  		/* hashed, can run if not already running */
  		hash = io_get_work_hash(work);
  		if (!(wqe->hash_map & BIT(hash))) {
  			wqe->hash_map |= BIT(hash);
  			/* all items with this hash lie in [work, tail] */
  			tail = wqe->hash_tail[hash];
  			wqe->hash_tail[hash] = NULL;
  			wq_list_cut(&wqe->work_list, &tail->list, prev);
  			return work;
  		}
  	}
  
  	return NULL;
  }
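
  /*
   * Attach to the mm the work item was queued with, dropping whatever mm
   * we currently hold. With no mm (or if grabbing it fails) we stay in
   * the kernel address space; a failed grab also cancels the work.
   */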
  static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
  {
  	if (worker->mm) {
  		unuse_mm(worker->mm);
  		mmput(worker->mm);
  		worker->mm = NULL;
  	}
  	if (!work->mm) {
  		set_fs(KERNEL_DS);
  		return;
  	}
  	if (mmget_not_zero(work->mm)) {
  		use_mm(work->mm);
  		if (!worker->mm)
  			set_fs(USER_DS);
  		worker->mm = work->mm;
  		/* hang on to this mm */
  		work->mm = NULL;
  		return;
  	}
  
  	/* failed grabbing mm, ensure work gets cancelled */
  	work->flags |= IO_WQ_WORK_CANCEL;
  }
  
  static void io_wq_switch_creds(struct io_worker *worker,
  			       struct io_wq_work *work)
  {
  	const struct cred *old_creds = override_creds(work->creds);
  
  	worker->cur_creds = work->creds;
  	if (worker->saved_creds)
  		put_cred(old_creds); /* creds set by previous switch */
  	else
  		worker->saved_creds = old_creds;
  }
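
  /*
   * Take on the identity the work was queued with: files, fs, mm and
   * creds, so it executes in the context of the submitting task.
   */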
  static void io_impersonate_work(struct io_worker *worker,
  				struct io_wq_work *work)
  {
  	if (work->files && current->files != work->files) {
  		task_lock(current);
  		current->files = work->files;
  		task_unlock(current);
  	}
  	if (work->fs && current->fs != work->fs)
  		current->fs = work->fs;
  	if (work->mm != worker->mm)
  		io_wq_switch_mm(worker, work);
  	if (worker->cur_creds != work->creds)
  		io_wq_switch_creds(worker, work);
  }
  
  static void io_assign_current_work(struct io_worker *worker,
  				   struct io_wq_work *work)
  {
  	if (work) {
  		/* flush pending signals before assigning new work */
  		if (signal_pending(current))
  			flush_signals(current);
  		cond_resched();
  	}
  
  	spin_lock_irq(&worker->lock);
  	worker->cur_work = work;
  	spin_unlock_irq(&worker->lock);
  }

  static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
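
  /*
   * Main work loop: grab the next runnable item off the wqe list, run it
   * and any dependent (linked) work it spawns, re-enqueueing linked work
   * and dropping the hash bit once a hashed chain is done.
   */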
  static void io_worker_handle_work(struct io_worker *worker)
  	__releases(wqe->lock)
  {
  	struct io_wqe *wqe = worker->wqe;
  	struct io_wq *wq = wqe->wq;
  
  	do {
  		struct io_wq_work *work;
  		unsigned int hash;
  get_next:
  		/*
  		 * If we got some work, mark us as busy. If we didn't, but
  		 * the list isn't empty, it means we stalled on hashed work.
  		 * Mark us stalled so we don't keep looking for work when we
  		 * can't make progress, any work completion or insertion will
  		 * clear the stalled flag.
  		 */
  		work = io_get_next_work(wqe);
  		if (work)
  			__io_worker_busy(wqe, worker, work);
  		else if (!wq_list_empty(&wqe->work_list))
  			wqe->flags |= IO_WQE_FLAG_STALLED;
  
  		spin_unlock_irq(&wqe->lock);
  		if (!work)
  			break;
  		io_assign_current_work(worker, work);

  		/* handle a whole dependent link */
  		do {
  			struct io_wq_work *old_work, *next_hashed, *linked;

  			next_hashed = wq_next_work(work);
  			io_impersonate_work(worker, work);
  			/*
  			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
  			 * work, the worker function will do the right thing.
  			 */
  			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
  				work->flags |= IO_WQ_WORK_CANCEL;
  			hash = io_get_work_hash(work);
  			linked = old_work = work;
  			linked->func(&linked);
  			linked = (old_work == linked) ? NULL : linked;
  
  			work = next_hashed;
  			if (!work && linked && !io_wq_is_hashed(linked)) {
  				work = linked;
  				linked = NULL;
  			}
  			io_assign_current_work(worker, work);
  			wq->free_work(old_work);

  			if (linked)
  				io_wqe_enqueue(wqe, linked);
  
  			if (hash != -1U && !next_hashed) {
  				spin_lock_irq(&wqe->lock);
  				wqe->hash_map &= ~BIT_ULL(hash);
  				wqe->flags &= ~IO_WQE_FLAG_STALLED;
  				/* dependent work is not hashed */
  				hash = -1U;
  				/* skip unnecessary unlock-lock wqe->lock */
  				if (!work)
  					goto get_next;
  				spin_unlock_irq(&wqe->lock);
  			}
  		} while (work);

  		spin_lock_irq(&wqe->lock);
  	} while (1);
  }
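
  /*
   * Worker thread body: handle work while it's available, otherwise idle
   * for up to WORKER_IDLE_TIMEOUT. Only the fixed worker survives idling
   * out; the others exit.
   */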
  static int io_wqe_worker(void *data)
  {
  	struct io_worker *worker = data;
  	struct io_wqe *wqe = worker->wqe;
  	struct io_wq *wq = wqe->wq;
  
  	io_worker_start(wqe, worker);
  
  	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
  		set_current_state(TASK_INTERRUPTIBLE);
  loop:
  		spin_lock_irq(&wqe->lock);
  		if (io_wqe_run_queue(wqe)) {
  			__set_current_state(TASK_RUNNING);
  			io_worker_handle_work(worker);
  			goto loop;
  		}
  		/* drops the lock on success, retry */
  		if (__io_worker_idle(wqe, worker)) {
  			__release(&wqe->lock);
  			goto loop;
  		}
  		spin_unlock_irq(&wqe->lock);
  		if (signal_pending(current))
  			flush_signals(current);
  		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
  			continue;
  		/* timed out, exit unless we're the fixed worker */
  		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
  		    !(worker->flags & IO_WORKER_F_FIXED))
  			break;
  	}

  	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
  		spin_lock_irq(&wqe->lock);
  		if (!wq_list_empty(&wqe->work_list))
  			io_worker_handle_work(worker);
  		else
  			spin_unlock_irq(&wqe->lock);
  	}
  
  	io_worker_exit(worker);
  	return 0;
  }
  
  /*
   * Called when a worker is scheduled in. Mark us as currently running.
   */
  void io_wq_worker_running(struct task_struct *tsk)
  {
  	struct io_worker *worker = kthread_data(tsk);
  	struct io_wqe *wqe = worker->wqe;
  
  	if (!(worker->flags & IO_WORKER_F_UP))
  		return;
  	if (worker->flags & IO_WORKER_F_RUNNING)
  		return;
  	worker->flags |= IO_WORKER_F_RUNNING;
  	io_wqe_inc_running(wqe, worker);
  }
  
  /*
   * Called when worker is going to sleep. If there are no workers currently
   * running and we have work pending, wake up a free one or have the manager
   * set one up.
   */
  void io_wq_worker_sleeping(struct task_struct *tsk)
  {
  	struct io_worker *worker = kthread_data(tsk);
  	struct io_wqe *wqe = worker->wqe;
  
  	if (!(worker->flags & IO_WORKER_F_UP))
  		return;
  	if (!(worker->flags & IO_WORKER_F_RUNNING))
  		return;
  
  	worker->flags &= ~IO_WORKER_F_RUNNING;
  
  	spin_lock_irq(&wqe->lock);
  	io_wqe_dec_running(wqe, worker);
  	spin_unlock_irq(&wqe->lock);
  }
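
  /*
   * Create a worker kthread for the given accounting index, add it to the
   * free list and wake it up. The first bounded worker on a wqe becomes
   * the fixed worker that never exits on idle timeout.
   */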
  static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
  {
  	struct io_wqe_acct *acct = &wqe->acct[index];
  	struct io_worker *worker;

  	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
  	if (!worker)
  		return false;
  
  	refcount_set(&worker->ref, 1);
  	worker->nulls_node.pprev = NULL;
  	worker->wqe = wqe;
  	spin_lock_init(&worker->lock);
  
  	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
  				"io_wqe_worker-%d/%d", index, wqe->node);
  	if (IS_ERR(worker->task)) {
  		kfree(worker);
  		return false;
  	}
  
  	spin_lock_irq(&wqe->lock);
  	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
  	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
  	worker->flags |= IO_WORKER_F_FREE;
  	if (index == IO_WQ_ACCT_BOUND)
  		worker->flags |= IO_WORKER_F_BOUND;
  	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
  		worker->flags |= IO_WORKER_F_FIXED;
  	acct->nr_workers++;
  	spin_unlock_irq(&wqe->lock);

  	if (index == IO_WQ_ACCT_UNBOUND)
  		atomic_inc(&wq->user->processes);

  	wake_up_process(worker->task);
  	return true;
  }

  static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
  	__must_hold(wqe->lock)
  {
  	struct io_wqe_acct *acct = &wqe->acct[index];

  	/* if we have available workers or no work, no need */
  	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
  		return false;
  	return acct->nr_workers < acct->max_workers;
  }
  
  /*
   * Manager thread. Tasked with creating new workers, if we need them.
   */
  static int io_wq_manager(void *data)
  {
  	struct io_wq *wq = data;
  	int workers_to_create = num_possible_nodes();
  	int node;

  	/* create fixed workers */
  	refcount_set(&wq->refs, workers_to_create);
  	for_each_node(node) {
  		if (!node_online(node))
  			continue;
  		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
  			goto err;
  		workers_to_create--;
  	}

  	while (workers_to_create--)
  		refcount_dec(&wq->refs);

  	complete(&wq->done);
  
  	while (!kthread_should_stop()) {
  		for_each_node(node) {
  			struct io_wqe *wqe = wq->wqes[node];
  			bool fork_worker[2] = { false, false };

  			if (!node_online(node))
  				continue;
  			spin_lock_irq(&wqe->lock);
  			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
  				fork_worker[IO_WQ_ACCT_BOUND] = true;
  			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
  				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
  			spin_unlock_irq(&wqe->lock);
  			if (fork_worker[IO_WQ_ACCT_BOUND])
  				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
  			if (fork_worker[IO_WQ_ACCT_UNBOUND])
  				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
  		}
  		set_current_state(TASK_INTERRUPTIBLE);
  		schedule_timeout(HZ);
  	}
  
  	return 0;
  err:
  	set_bit(IO_WQ_BIT_ERROR, &wq->state);
  	set_bit(IO_WQ_BIT_EXIT, &wq->state);
  	if (refcount_sub_and_test(workers_to_create, &wq->refs))
  		complete(&wq->done);
  	return 0;
  }
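
  /*
   * Bounded work can always be queued. Unbound work is accepted while
   * something is running or free, or while the per-user worker limit
   * isn't exceeded (CAP_SYS_RESOURCE/CAP_SYS_ADMIN bypass that limit).
   */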
  static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
  			    struct io_wq_work *work)
  {
  	bool free_worker;
  
  	if (!(work->flags & IO_WQ_WORK_UNBOUND))
  		return true;
  	if (atomic_read(&acct->nr_running))
  		return true;
  
  	rcu_read_lock();
  	free_worker = !hlist_nulls_empty(&wqe->free_list);
  	rcu_read_unlock();
  	if (free_worker)
  		return true;
  
  	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
  	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
  		return false;
  
  	return true;
  }
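
  /*
   * Run the work and its whole dependent link with IO_WQ_WORK_CANCEL set,
   * so a completion is still posted for each cancelled item.
   */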
  static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
  {
  	struct io_wq *wq = wqe->wq;

  	do {
  		struct io_wq_work *old_work = work;
  
  		work->flags |= IO_WQ_WORK_CANCEL;
  		work->func(&work);
  		work = (work == old_work) ? NULL : work;
  		wq->free_work(old_work);
  	} while (work);
  }
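
  /*
   * Append work to the wqe list; hashed work is chained behind the
   * current tail for its hash so each chain executes serially.
   */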
  static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
  {
  	unsigned int hash;
  	struct io_wq_work *tail;
  
  	if (!io_wq_is_hashed(work)) {
  append:
  		wq_list_add_tail(&work->list, &wqe->work_list);
  		return;
  	}
  
  	hash = io_get_work_hash(work);
  	tail = wqe->hash_tail[hash];
  	wqe->hash_tail[hash] = work;
  	if (!tail)
  		goto append;
  
  	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
  }

  static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
  {
  	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
  	int work_flags;
  	unsigned long flags;

  	/*
  	 * Do early check to see if we need a new unbound worker, and if we do,
  	 * if we're allowed to do so. This isn't 100% accurate as there's a
  	 * gap between this check and incrementing the value, but that's OK.
  	 * It's close enough to not be an issue, fork() has the same delay.
  	 */
  	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
  		io_run_cancel(work, wqe);
  		return;
  	}

  	work_flags = work->flags;
  	spin_lock_irqsave(&wqe->lock, flags);
  	io_wqe_insert_work(wqe, work);
  	wqe->flags &= ~IO_WQE_FLAG_STALLED;
  	spin_unlock_irqrestore(&wqe->lock, flags);

  	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
  	    !atomic_read(&acct->nr_running))
  		io_wqe_wake_worker(wqe, acct);
  }
  
  void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
  {
  	struct io_wqe *wqe = wq->wqes[numa_node_id()];
  
  	io_wqe_enqueue(wqe, work);
  }
  
  /*
   * Work items that hash to the same value will not be done in parallel.
   * Used to limit concurrent writes, generally hashed by inode.
   */
  void io_wq_hash_work(struct io_wq_work *work, void *val)
  {
  	unsigned int bit;

  	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
  	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
  }
  
  static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
  {
  	send_sig(SIGINT, worker->task, 1);
  	return false;
  }
  
  /*
   * Iterate the passed in list and call the specific function for each
   * worker that isn't exiting
   */
  static bool io_wq_for_each_worker(struct io_wqe *wqe,
  				  bool (*func)(struct io_worker *, void *),
  				  void *data)
  {
  	struct io_worker *worker;
  	bool ret = false;

  	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
  		if (io_worker_get(worker)) {
  			/* no task if node is/was offline */
  			if (worker->task)
  				ret = func(worker, data);
  			io_worker_release(worker);
  			if (ret)
  				break;
  		}
  	}

  	return ret;
  }
  
  void io_wq_cancel_all(struct io_wq *wq)
  {
  	int node;
  
  	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

  	rcu_read_lock();
  	for_each_node(node) {
  		struct io_wqe *wqe = wq->wqes[node];

  		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
  	}
  	rcu_read_unlock();
  }

  struct io_cb_cancel_data {
  	work_cancel_fn *fn;
  	void *data;
  };

  static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
  {
  	struct io_cb_cancel_data *match = data;
  	unsigned long flags;
  	bool ret = false;
  
  	/*
  	 * Hold the lock to avoid ->cur_work going out of scope, caller
  	 * may dereference the passed in work.
  	 */
  	spin_lock_irqsave(&worker->lock, flags);
  	if (worker->cur_work &&
  	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
  	    match->fn(worker->cur_work, match->data)) {
  		send_sig(SIGINT, worker->task, 1);
  		ret = true;
  	}
  	spin_unlock_irqrestore(&worker->lock, flags);

  	return ret;
  }
  
  static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
  					    struct io_cb_cancel_data *match)
  {
  	struct io_wq_work_node *node, *prev;
  	struct io_wq_work *work;
  	unsigned long flags;
  	bool found = false;
  	/*
  	 * First check pending list, if we're lucky we can just remove it
  	 * from there. CANCEL_OK means that the work is returned as-new,
  	 * no completion will be posted for it.
  	 */
  	 */
  	spin_lock_irqsave(&wqe->lock, flags);
  	wq_list_for_each(node, prev, &wqe->work_list) {
  		work = container_of(node, struct io_wq_work, list);
  		if (match->fn(work, match->data)) {
  			wq_list_del(&wqe->work_list, node, prev);
  			found = true;
  			break;
  		}
  	}
  	spin_unlock_irqrestore(&wqe->lock, flags);
  
  	if (found) {
  		io_run_cancel(work, wqe);
  		return IO_WQ_CANCEL_OK;
  	}
  
  	/*
  	 * Now check if a free (going busy) or busy worker has the work
  	 * currently running. If we find it there, we'll return CANCEL_RUNNING
  	 * as an indication that we attempt to signal cancellation. The
  	 * completion will run normally in this case.
  	 */
  	rcu_read_lock();
  	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
  	rcu_read_unlock();
  	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
  }

  enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
  				  void *data)
  {
  	struct io_cb_cancel_data match = {
  		.fn	= cancel,
  		.data	= data,
  	};
  	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
  	int node;

  	for_each_node(node) {
  		struct io_wqe *wqe = wq->wqes[node];

  		ret = io_wqe_cancel_work(wqe, &match);
  		if (ret != IO_WQ_CANCEL_NOTFOUND)
  			break;
  	}
  
  	return ret;
  }

  static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
  {
  	return work == data;
  }
  
  enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
  {
  	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork);
  }

  static bool io_wq_pid_match(struct io_wq_work *work, void *data)
  {
  	pid_t pid = (pid_t) (unsigned long) data;

  	return work->task_pid == pid;
  }
  
  enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
  {
  	void *data = (void *) (unsigned long) pid;

  	return io_wq_cancel_cb(wq, io_wq_pid_match, data);
  }

  struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
  {
  	int ret = -ENOMEM, node;
  	struct io_wq *wq;

  	if (WARN_ON_ONCE(!data->free_work))
  		return ERR_PTR(-EINVAL);

  	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
  	if (!wq)
  		return ERR_PTR(-ENOMEM);

  	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
  	if (!wq->wqes) {
  		kfree(wq);
  		return ERR_PTR(-ENOMEM);
  	}

  	wq->free_work = data->free_work;

  	/* caller must already hold a reference to this */
  	wq->user = data->user;

  	for_each_node(node) {
  		struct io_wqe *wqe;
  		int alloc_node = node;

  		if (!node_online(alloc_node))
  			alloc_node = NUMA_NO_NODE;
  		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
  		if (!wqe)
  			goto err;
  		wq->wqes[node] = wqe;
  		wqe->node = alloc_node;
  		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
  		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
  		if (wq->user) {
  			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
  					task_rlimit(current, RLIMIT_NPROC);
  		}
  		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
  		wqe->wq = wq;
  		spin_lock_init(&wqe->lock);
  		INIT_WQ_LIST(&wqe->work_list);
  		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
  		INIT_LIST_HEAD(&wqe->all_list);
  	}
  
  	init_completion(&wq->done);

  	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
  	if (!IS_ERR(wq->manager)) {
  		wake_up_process(wq->manager);
  		wait_for_completion(&wq->done);
  		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
  			ret = -ENOMEM;
  			goto err;
  		}
  		refcount_set(&wq->use_refs, 1);
  		reinit_completion(&wq->done);
  		return wq;
  	}
  
  	ret = PTR_ERR(wq->manager);
  	complete(&wq->done);
  err:
  	for_each_node(node)
  		kfree(wq->wqes[node]);
  	kfree(wq->wqes);
  	kfree(wq);
  	return ERR_PTR(ret);
  }

  bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
  {
  	if (data->free_work != wq->free_work)
  		return false;
  
  	return refcount_inc_not_zero(&wq->use_refs);
  }

  static bool io_wq_worker_wake(struct io_worker *worker, void *data)
  {
  	wake_up_process(worker->task);
  	return false;
  }
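
  /*
   * Stop the manager, wake all workers so they observe IO_WQ_BIT_EXIT and
   * wait for them to exit before freeing the wq.
   */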
  static void __io_wq_destroy(struct io_wq *wq)
  {
  	int node;

  	set_bit(IO_WQ_BIT_EXIT, &wq->state);
  	if (wq->manager)
  		kthread_stop(wq->manager);
  
  	rcu_read_lock();
  	for_each_node(node)
  		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
  	rcu_read_unlock();
  
  	wait_for_completion(&wq->done);

  	for_each_node(node)
  		kfree(wq->wqes[node]);
  	kfree(wq->wqes);
  	kfree(wq);
  }
  
  void io_wq_destroy(struct io_wq *wq)
  {
  	if (refcount_dec_and_test(&wq->use_refs))
  		__io_wq_destroy(wq);
  }