Blame view

fs/io-wq.c 28.3 KB
771b53d03   Jens Axboe   io-wq: small thre...
1
2
3
4
5
6
7
8
9
10
11
12
  // SPDX-License-Identifier: GPL-2.0
  /*
   * Basic worker thread pool for io_uring
   *
   * Copyright (C) 2019 Jens Axboe
   *
   */
  #include <linux/kernel.h>
  #include <linux/init.h>
  #include <linux/errno.h>
  #include <linux/sched/signal.h>
  #include <linux/mm.h>
771b53d03   Jens Axboe   io-wq: small thre...
13
14
15
16
17
  #include <linux/sched/mm.h>
  #include <linux/percpu.h>
  #include <linux/slab.h>
  #include <linux/kthread.h>
  #include <linux/rculist_nulls.h>
9392a27d8   Jens Axboe   io-wq: add suppor...
18
  #include <linux/fs_struct.h>
aa96bf8a9   Jens Axboe   io_uring: use io-...
19
  #include <linux/task_work.h>
91d8f5191   Dennis Zhou   io_uring: add blk...
20
  #include <linux/blk-cgroup.h>
771b53d03   Jens Axboe   io-wq: small thre...
21
22
23
24
25
26
27
28
29
  
  #include "io-wq.h"
  
/* How long an idle worker sleeps waiting for work before it may exit. */
#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};
  
/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;			/* pins the worker while others poke at it */
	unsigned flags;			/* IO_WORKER_F_* */
	struct hlist_nulls_node nulls_node;	/* entry on wqe->free_list when idle */
	struct list_head all_list;	/* entry on wqe->all_list */
	struct task_struct *task;	/* kthread backing this worker */
	struct io_wqe *wqe;		/* pool this worker belongs to */

	struct io_wq_work *cur_work;	/* work item currently executing, or NULL */
	spinlock_t lock;		/* protects cur_work */

	struct rcu_head rcu;
	struct mm_struct *mm;		/* mm adopted via kthread_use_mm(), if any */
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;	/* blkcg currently associated with us */
#endif
	const struct cred *cur_creds;	/* creds of the work being executed */
	const struct cred *saved_creds;	/* worker's original creds, restored on unuse */
	struct files_struct *restore_files;	/* original ->files to restore */
	struct nsproxy *restore_nsproxy;	/* original ->nsproxy to restore */
	struct fs_struct *restore_fs;	/* original ->fs to restore */
};
771b53d03   Jens Axboe   io-wq: small thre...
69
70
71
72
73
/* Number of hash buckets used to serialise hashed work items. */
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

/* Per-wqe accounting for one class of workers (bound or unbound). */
struct io_wqe_acct {
	unsigned nr_workers;	/* workers currently created in this class */
	unsigned max_workers;	/* upper bound on workers in this class */
	atomic_t nr_running;	/* workers currently accounted as running */
};

/* Index into io_wqe->acct[] */
enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};
771b53d03   Jens Axboe   io-wq: small thre...
85
86
87
88
89
/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;		/* protects this whole struct group */
		struct io_wq_work_list work_list;	/* pending work items */
		unsigned long hash_map;		/* bit set while that hash bucket runs */
		unsigned flags;			/* IO_WQE_FLAG_* */
	} ____cacheline_aligned_in_smp;

	int node;			/* NUMA node this pool serves */
	struct io_wqe_acct acct[2];	/* bound/unbound worker accounting */

	struct hlist_nulls_head free_list;	/* idle workers, available for work */
	struct list_head all_list;	/* every worker, idle or busy */

	struct io_wq *wq;		/* owning io_wq */
	/* last queued item for each hash bucket's chain, NULL if none */
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};
  
/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;		/* one io_wqe per NUMA node */
	unsigned long state;		/* IO_WQ_BIT_* bits */

	free_work_fn *free_work;	/* callback to release a finished work item */
	io_wq_work_fn *do_work;		/* callback that executes a work item */

	struct task_struct *manager;	/* manager thread that creates workers */
	struct user_struct *user;	/* unbound workers charged to user->processes */
	refcount_t refs;
	struct completion done;		/* completed when refs drops to zero */

	refcount_t use_refs;
};
771b53d03   Jens Axboe   io-wq: small thre...
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
  static bool io_worker_get(struct io_worker *worker)
  {
  	return refcount_inc_not_zero(&worker->ref);
  }
  
  static void io_worker_release(struct io_worker *worker)
  {
  	if (refcount_dec_and_test(&worker->ref))
  		wake_up_process(worker->task);
  }
  
/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	/* Put back any creds the previous work item made us assume. */
	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	/* Restore the worker's original files/nsproxy; needs the lock dropped. */
	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	/* Detach from any blkcg the previous work item associated us with. */
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	return dropped_lock;
}
c5def4ab8   Jens Axboe   io-wq: add suppor...
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
  static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
  						   struct io_wq_work *work)
  {
  	if (work->flags & IO_WQ_WORK_UNBOUND)
  		return &wqe->acct[IO_WQ_ACCT_UNBOUND];
  
  	return &wqe->acct[IO_WQ_ACCT_BOUND];
  }
  
  static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
  						  struct io_worker *worker)
  {
  	if (worker->flags & IO_WORKER_F_BOUND)
  		return &wqe->acct[IO_WQ_ACCT_BOUND];
  
  	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
  }
771b53d03   Jens Axboe   io-wq: small thre...
198
199
200
/*
 * Tear down an exiting worker: drop accounting, unlink it from the wqe
 * lists, restore borrowed state, and release the wq reference it held.
 */
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	/* __io_worker_unuse() may drop the lock; re-take it if so */
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	/* drop the wq reference taken in create_io_worker() */
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}
c5def4ab8   Jens Axboe   io-wq: add suppor...
234
235
236
  static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  	__must_hold(wqe->lock)
  {
6206f0e18   Jens Axboe   io-wq: shrink io_...
237
238
  	if (!wq_list_empty(&wqe->work_list) &&
  	    !(wqe->flags & IO_WQE_FLAG_STALLED))
c5def4ab8   Jens Axboe   io-wq: add suppor...
239
240
241
242
243
244
245
246
247
248
249
250
251
  		return true;
  	return false;
  }
  
  /*
   * Check head of free list for an available worker. If one isn't available,
   * caller must wake up the wq manager to create one.
   */
  static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
  	__must_hold(RCU)
  {
  	struct hlist_nulls_node *n;
  	struct io_worker *worker;
021d1cdda   Jens Axboe   io-wq: remove now...
252
  	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
c5def4ab8   Jens Axboe   io-wq: add suppor...
253
254
255
256
257
  	if (is_a_nulls(n))
  		return false;
  
  	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
  	if (io_worker_get(worker)) {
506d95ff5   Jens Axboe   io-wq: remove wor...
258
  		wake_up_process(worker->task);
c5def4ab8   Jens Axboe   io-wq: add suppor...
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
  		io_worker_release(worker);
  		return true;
  	}
  
  	return false;
  }
  
  /*
   * We need a worker. If we find a free one, we're good. If not, and we're
   * below the max number of workers, wake up the manager to create one.
   */
  static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
  {
  	bool ret;
  
  	/*
  	 * Most likely an attempt to queue unbounded work on an io_wq that
  	 * wasn't setup with any unbounded workers.
  	 */
  	WARN_ON_ONCE(!acct->max_workers);
  
  	rcu_read_lock();
  	ret = io_wqe_activate_free_worker(wqe);
  	rcu_read_unlock();
  
  	if (!ret && acct->nr_workers < acct->max_workers)
  		wake_up_process(wqe->wq->manager);
  }
  
/* Account one more running worker in this worker's (bound/unbound) class. */
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}
  
/*
 * Account one less running worker. If that was the last runner and work
 * is still queued, wake or create another worker to keep making progress.
 */
static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}
771b53d03   Jens Axboe   io-wq: small thre...
303
304
305
306
307
308
309
/*
 * One-time setup when a worker thread starts: mark it up and running,
 * and snapshot the task state we must restore when going idle/exiting.
 */
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	/* remember original task state so __io_worker_unuse() can restore it */
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}
  
/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		/* leave the old class' running count before switching */
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	 }
}
  
/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	/* returns true (and drops the lock) if borrowed state was released */
	return __io_worker_unuse(wqe, worker);
}
60cf46ae6   Pavel Begunkov   io-wq: hash depen...
370
371
372
373
374
375
  static inline unsigned int io_get_work_hash(struct io_wq_work *work)
  {
  	return work->flags >> IO_WQ_HASH_SHIFT;
  }
  
/*
 * Pick the next runnable work item off the wqe list. Unhashed work is
 * always runnable; hashed work runs only if no other worker currently
 * holds that hash bucket, in which case the whole chain for that hash
 * is spliced out so it executes serially on this worker.
 */
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}
cccf0ee83   Jens Axboe   io_uring/io-wq: d...
404
405
406
/*
 * Switch the worker's address space to the one the work item needs.
 * Always drops the currently-used mm first; if the new mm can't be
 * pinned, the work is flagged for cancellation instead.
 */
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}
	if (!work->mm)
		return;

	if (mmget_not_zero(work->mm)) {
		kthread_use_mm(work->mm);
		worker->mm = work->mm;
		/* hang on to this mm */
		work->mm = NULL;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}
91d8f5191   Dennis Zhou   io_uring: add blk...
425
426
427
428
429
430
431
432
433
434
  static inline void io_wq_switch_blkcg(struct io_worker *worker,
  				      struct io_wq_work *work)
  {
  #ifdef CONFIG_BLK_CGROUP
  	if (work->blkcg_css != worker->blkcg_css) {
  		kthread_associate_blkcg(work->blkcg_css);
  		worker->blkcg_css = work->blkcg_css;
  	}
  #endif
  }
cccf0ee83   Jens Axboe   io_uring/io-wq: d...
435
436
437
438
439
440
441
442
443
444
445
/*
 * Assume the credentials of the work item. The worker's very first
 * override saves its original creds in saved_creds (restored later in
 * __io_worker_unuse()); creds from intermediate switches are dropped.
 */
static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}
dc026a73c   Pavel Begunkov   io-wq: shuffle io...
446
447
448
449
450
451
/*
 * Take on the task context the work item was submitted with: files,
 * nsproxy, fs, mm, creds, RLIMIT_FSIZE and blkcg, switching only what
 * actually differs from the worker's current state.
 */
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if (work->files && current->files != work->files) {
		task_lock(current);
		current->files = work->files;
		current->nsproxy = work->nsproxy;
		task_unlock(current);
	}
	if (work->fs && current->fs != work->fs)
		current->fs = work->fs;
	if (work->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if (worker->cur_creds != work->creds)
		io_wq_switch_creds(worker, work);
	current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->fsize;
	io_wq_switch_blkcg(worker, work);
}
  
/*
 * Publish @work as the worker's current work item (NULL to clear).
 * cur_work is read by cancellation paths, hence the worker->lock.
 */
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}
60cf46ae6   Pavel Begunkov   io-wq: hash depen...
479
  static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
771b53d03   Jens Axboe   io-wq: small thre...
480
481
482
/*
 * Main work-processing loop for a worker. Entered with wqe->lock held;
 * returns with it released. Pulls work off the wqe list and executes it,
 * following each item's dependent link chain before going back for more.
 */
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			/*
			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
			 * work, the worker function will do the right thing.
			 */
			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
				work->flags |= IO_WQ_WORK_CANCEL;

			old_work = work;
			linked = wq->do_work(work);

			/* prefer the spliced hash chain; else run the link inline */
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			/* chain for this hash is done: release the bucket */
			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
771b53d03   Jens Axboe   io-wq: small thre...
547
548
549
550
551
/*
 * Worker kthread entry point: process work when available, otherwise
 * idle with a timeout. Non-fixed workers exit after an idle timeout;
 * on wq exit, drain any remaining queued work before tearing down.
 */
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		/* set sleepy before checking the queue to avoid lost wakeups */
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}
  
  /*
771b53d03   Jens Axboe   io-wq: small thre...
592
593
594
595
596
597
598
599
600
601
602
603
   * Called when a worker is scheduled in. Mark us as currently running.
   */
  void io_wq_worker_running(struct task_struct *tsk)
  {
  	struct io_worker *worker = kthread_data(tsk);
  	struct io_wqe *wqe = worker->wqe;
  
  	if (!(worker->flags & IO_WORKER_F_UP))
  		return;
  	if (worker->flags & IO_WORKER_F_RUNNING)
  		return;
  	worker->flags |= IO_WORKER_F_RUNNING;
c5def4ab8   Jens Axboe   io-wq: add suppor...
604
  	io_wqe_inc_running(wqe, worker);
771b53d03   Jens Axboe   io-wq: small thre...
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
  }
  
  /*
   * Called when worker is going to sleep. If there are no workers currently
   * running and we have work pending, wake up a free one or have the manager
   * set one up.
   */
  void io_wq_worker_sleeping(struct task_struct *tsk)
  {
  	struct io_worker *worker = kthread_data(tsk);
  	struct io_wqe *wqe = worker->wqe;
  
  	if (!(worker->flags & IO_WORKER_F_UP))
  		return;
  	if (!(worker->flags & IO_WORKER_F_RUNNING))
  		return;
  
  	worker->flags &= ~IO_WORKER_F_RUNNING;
95da84659   Sebastian Andrzej Siewior   io_wq: Make io_wq...
623
  	raw_spin_lock_irq(&wqe->lock);
c5def4ab8   Jens Axboe   io-wq: add suppor...
624
  	io_wqe_dec_running(wqe, worker);
95da84659   Sebastian Andrzej Siewior   io_wq: Make io_wq...
625
  	raw_spin_unlock_irq(&wqe->lock);
771b53d03   Jens Axboe   io-wq: small thre...
626
  }
b60fda600   Jens Axboe   io-wq: wait for i...
627
/*
 * Allocate and start one worker kthread for @wqe in the given accounting
 * class (@index is IO_WQ_ACCT_BOUND or IO_WQ_ACCT_UNBOUND). Returns true
 * on success.
 */
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	/* the first bound worker is the fixed one that never times out */
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	/* unbound workers count against the submitting user's process limit */
	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);
	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}
c5def4ab8   Jens Axboe   io-wq: add suppor...
663
  static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
771b53d03   Jens Axboe   io-wq: small thre...
664
665
  	__must_hold(wqe->lock)
  {
c5def4ab8   Jens Axboe   io-wq: add suppor...
666
  	struct io_wqe_acct *acct = &wqe->acct[index];
771b53d03   Jens Axboe   io-wq: small thre...
667

c5def4ab8   Jens Axboe   io-wq: add suppor...
668
  	/* if we have available workers or no work, no need */
021d1cdda   Jens Axboe   io-wq: remove now...
669
  	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
c5def4ab8   Jens Axboe   io-wq: add suppor...
670
671
  		return false;
  	return acct->nr_workers < acct->max_workers;
771b53d03   Jens Axboe   io-wq: small thre...
672
  }
c4068bf89   Hillf Danton   io-wq: fix use-af...
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
/*
 * Per-worker callback for io_wq_for_each_worker(): deliver SIGINT to the
 * worker task so it breaks out of whatever it is doing. Always returns
 * false so iteration continues over every worker. @data is unused.
 */
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}
  
  /*
   * Iterate the passed in list and call the specific function for each
   * worker that isn't exiting
   */
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	/*
	 * RCU-safe walk of wqe->all_list; io_worker_get() takes a reference
	 * (and fails for an exiting worker), which is dropped right after
	 * the callback runs. Iteration stops early when func returns true.
	 * NOTE(review): callers appear responsible for rcu_read_lock() —
	 * confirm against call sites.
	 */
	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}
  
/*
 * Per-worker callback for io_wq_for_each_worker(): wake the worker's task.
 * Always returns false so every worker on the list gets woken. @data is
 * unused.
 */
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}
771b53d03   Jens Axboe   io-wq: small thre...
709
710
711
712
713
714
/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		/*
		 * Could not create the fixed bound worker for this node:
		 * flag setup failure and exit. ERROR is observed by
		 * io_wq_create() after wq->done completes below.
		 */
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	/* signal io_wq_create() that startup finished (successfully) */
	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		/*
		 * Periodic scan: under each wqe's lock decide whether a new
		 * bound/unbound worker is needed, then fork outside the lock
		 * since create_io_worker() may sleep.
		 */
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;
			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	/*
	 * Drop the manager's reference; last dropper completes wq->done so
	 * teardown (or the failed-create path) can proceed.
	 */
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}
c5def4ab8   Jens Axboe   io-wq: add suppor...
770
771
772
773
774
775
776
777
778
779
780
/*
 * Decide whether @work may be queued on @wqe. Bound work is always
 * accepted. Unbound work is accepted when an unbound worker is already
 * running, an idle worker is available, or the submitting user is still
 * under its worker limit (or privileged). Returns false only when
 * accepting the work would exceed the unbound limit.
 */
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	/* free_list is RCU-protected; a racy read here is acceptable */
	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	/* over the per-user process limit and not privileged: reject */
	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}
e9fd93965   Pavel Begunkov   io_uring/io-wq: f...
792
  static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
fc04c39ba   Pavel Begunkov   io-wq: fix IO_WQ_...
793
  {
e9fd93965   Pavel Begunkov   io_uring/io-wq: f...
794
  	struct io_wq *wq = wqe->wq;
fc04c39ba   Pavel Begunkov   io-wq: fix IO_WQ_...
795
796
797
798
  	do {
  		struct io_wq_work *old_work = work;
  
  		work->flags |= IO_WQ_WORK_CANCEL;
f4db7182e   Pavel Begunkov   io-wq: return nex...
799
  		work = wq->do_work(work);
e9fd93965   Pavel Begunkov   io_uring/io-wq: f...
800
  		wq->free_work(old_work);
fc04c39ba   Pavel Begunkov   io-wq: fix IO_WQ_...
801
802
  	} while (work);
  }
86f3cd1b5   Pavel Begunkov   io-wq: handle has...
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
  static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
  {
  	unsigned int hash;
  	struct io_wq_work *tail;
  
  	if (!io_wq_is_hashed(work)) {
  append:
  		wq_list_add_tail(&work->list, &wqe->work_list);
  		return;
  	}
  
  	hash = io_get_work_hash(work);
  	tail = wqe->hash_tail[hash];
  	wqe->hash_tail[hash] = work;
  	if (!tail)
  		goto append;
  
  	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
  }
771b53d03   Jens Axboe   io-wq: small thre...
822
823
/*
 * Queue @work on @wqe and wake a worker if needed. If the unbound limit
 * would be exceeded, the work is instead run immediately in cancelled
 * state via io_run_cancel().
 */
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	/*
	 * Snapshot flags before inserting: once on the list a worker may
	 * pick up and free the work, so it must not be touched afterwards.
	 */
	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	/* wake a worker for concurrent work, or when none is running */
	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}
  
  void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
  {
  	struct io_wqe *wqe = wq->wqes[numa_node_id()];
  
  	io_wqe_enqueue(wqe, work);
  }
  
  /*
8766dd516   Pavel Begunkov   io-wq: split hash...
856
857
   * Work items that hash to the same value will not be done in parallel.
   * Used to limit concurrent writes, generally hashed by inode.
771b53d03   Jens Axboe   io-wq: small thre...
858
   */
8766dd516   Pavel Begunkov   io-wq: split hash...
859
  void io_wq_hash_work(struct io_wq_work *work, void *val)
771b53d03   Jens Axboe   io-wq: small thre...
860
  {
8766dd516   Pavel Begunkov   io-wq: split hash...
861
  	unsigned int bit;
771b53d03   Jens Axboe   io-wq: small thre...
862
863
864
  
  	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
  	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
771b53d03   Jens Axboe   io-wq: small thre...
865
  }
771b53d03   Jens Axboe   io-wq: small thre...
866
867
  void io_wq_cancel_all(struct io_wq *wq)
  {
3fc50ab55   Jann Horn   io-wq: fix handli...
868
  	int node;
771b53d03   Jens Axboe   io-wq: small thre...
869
870
  
  	set_bit(IO_WQ_BIT_CANCEL, &wq->state);
771b53d03   Jens Axboe   io-wq: small thre...
871
  	rcu_read_lock();
3fc50ab55   Jann Horn   io-wq: fix handli...
872
873
  	for_each_node(node) {
  		struct io_wqe *wqe = wq->wqes[node];
771b53d03   Jens Axboe   io-wq: small thre...
874

e61df66c6   Jens Axboe   io-wq: ensure fre...
875
  		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
771b53d03   Jens Axboe   io-wq: small thre...
876
877
878
  	}
  	rcu_read_unlock();
  }
62755e35d   Jens Axboe   io_uring: support...
879
/*
 * Shared state for a cancellation pass over pending and running work.
 */
struct io_cb_cancel_data {
	work_cancel_fn *fn;	/* predicate: does this work match? */
	void *data;		/* opaque argument passed to fn */

	int nr_running;		/* matched items found running on a worker */
	int nr_pending;		/* matched items removed from pending lists */
	bool cancel_all;	/* cancel every match, not just the first */
};
2293b4195   Pavel Begunkov   io-wq: remove dup...
886
/*
 * Per-worker callback for a cancellation pass: if the worker's current
 * work matches (and is cancellable), signal the worker with SIGINT and
 * count it. Returns true to stop iterating once a match was found,
 * unless cancel_all is set.
 */
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}
204361a77   Pavel Begunkov   io-wq: fix hang a...
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
/*
 * Unlink @work from wqe->work_list (@prev is its predecessor node, or
 * NULL if it is first). For hashed work that is the current tail of its
 * hash chain, hash_tail must be repointed: to the predecessor if that
 * shares the same hash, otherwise cleared — leaving it stale would make
 * io_wqe_insert_work() chain after freed work.
 */
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}
4f26bda15   Pavel Begunkov   io-wq: add an opt...
923
/*
 * Scan @wqe's pending list and cancel every work item matching
 * match->fn. Matched items are unlinked under the lock, then run in
 * cancelled state (io_run_cancel) with the lock dropped; the count is
 * recorded in match->nr_pending.
 */
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		/* must drop the lock before running the cancelled work */
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}
4f26bda15   Pavel Begunkov   io-wq: add an opt...
948
/*
 * Signal cancellation to workers on @wqe whose currently running work
 * matches match->fn. Counts matches in match->nr_running; the worker
 * walk is RCU-protected.
 */
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
2293b4195   Pavel Begunkov   io-wq: remove dup...
955
/*
 * Cancel work matching @cancel(@data). With @cancel_all, every match on
 * every node is cancelled; otherwise the first match ends the search.
 * Returns IO_WQ_CANCEL_OK if (any) match was still pending,
 * IO_WQ_CANCEL_RUNNING if one had to be signalled while running, or
 * IO_WQ_CANCEL_NOTFOUND if nothing matched.
 */
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	/* cancel_all path: report the strongest state encountered */
	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
2293b4195   Pavel Begunkov   io-wq: remove dup...
995
996
997
998
999
1000
1001
  static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
  {
  	return work == data;
  }
  
/*
 * Cancel a single specific work item @cwork by pointer identity.
 * Thin wrapper around io_wq_cancel_cb() with cancel_all=false.
 */
enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
}
576a347b7   Jens Axboe   io-wq: have io_wq...
1004
/*
 * Allocate and start an io-wq instance: one io_wqe per NUMA node, plus
 * the manager kthread which in turn spawns the fixed workers. @bounded
 * caps bound workers per node; @data supplies the work/free callbacks
 * and (optionally) the user for unbound accounting. Returns the new wq
 * or an ERR_PTR; on any failure everything allocated here is freed.
 */
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		/* offline node: fall back to any-node allocation */
		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			/* unbound workers limited by the user's NPROC rlimit */
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		/* wait for the manager to finish creating fixed workers */
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		/* reuse wq->done later as the teardown completion */
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}
eba6f5a33   Pavel Begunkov   io-wq: allow grab...
1072
1073
  bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
  {
f5fa38c59   Pavel Begunkov   io_wq: add per-wq...
1074
  	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
eba6f5a33   Pavel Begunkov   io-wq: allow grab...
1075
1076
1077
1078
  		return false;
  
  	return refcount_inc_not_zero(&wq->use_refs);
  }
848f7e188   Jens Axboe   io-wq: make the i...
1079
/*
 * Tear down @wq: stop the manager, wake all workers so they observe
 * IO_WQ_BIT_EXIT and exit, wait for the last reference holder to
 * complete wq->done, then free all per-node state. Called only once
 * the final use_ref has been dropped.
 */
static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	/* wait until every worker has exited and dropped its reference */
	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}
848f7e188   Jens Axboe   io-wq: make the i...
1098
1099
1100
1101
1102
1103
  
  void io_wq_destroy(struct io_wq *wq)
  {
  	if (refcount_dec_and_test(&wq->use_refs))
  		__io_wq_destroy(wq);
  }
aa96bf8a9   Jens Axboe   io_uring: use io-...
1104
1105
1106
1107
1108
  
/*
 * Return the wq's manager kthread, e.g. for callers that need a task
 * to target with task_work.
 */
struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}