fs/io-wq.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct nsproxy *restore_nsproxy;
	struct fs_struct *restore_fs;
};
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	struct hlist_node cpuhp_node;

	refcount_t use_refs;
};

static enum cpuhp_state io_wq_online;
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
}
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}
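
/*
 * A worker is exiting: wait for any short-lived references to drop, undo
 * the running/bounded accounting, unlink the worker from the free and all
 * lists under wqe->lock, and free it via RCU. The last worker to drop the
 * wq reference completes wq->done for the manager/destroy path.
 */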
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}
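
/*
 * Pick the next runnable work item. Unhashed work can be taken from
 * anywhere in the list; hashed work only runs if no other worker is
 * currently executing work with the same hash, in which case the whole
 * [work, tail] chain for that hash is spliced out in one go.
 */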
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}
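
/*
 * Switch the worker to the mm of the task that queued this work item,
 * dropping any previously attached mm first. If the target mm is already
 * gone, flag the work for cancellation instead.
 */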
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	}
#endif
}
static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);

	worker->cur_creds = work->identity->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}
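
/*
 * Take on the identity (files, fs, mm, creds, rlimits, blkcg, audit state)
 * of the task that originally queued this work item, as indicated by the
 * IO_WQ_WORK_* flags.
 */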
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
		if (!work->identity->files) {
			/* failed grabbing files, ensure work gets cancelled */
			work->flags |= IO_WQ_WORK_CANCEL;
		}
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
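
/*
 * Main work processing loop: called with wqe->lock held, repeatedly pulls
 * the next item (or marks the wqe stalled on hashed work), then runs the
 * item and its whole chain of dependent/linked work before looking again.
 */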
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			/*
			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
			 * work, the worker function will do the right thing.
			 */
			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
				work->flags |= IO_WQ_WORK_CANCEL;

			old_work = work;
			linked = wq->do_work(work);

			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
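
/*
 * Thread function for an individual worker: process work while there is
 * any, otherwise idle on the free list until woken or the idle timeout
 * expires. Only the fixed worker survives the timeout.
 */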
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}
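
/*
 * Allocate and start a new worker kthread for the given accounting class
 * (bound or unbound), bind it to the wqe's NUMA node and add it to the
 * free list. Returns false if allocation or thread creation fails.
 */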
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}
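
/*
 * Check whether this work item may be queued. Bounded work always can;
 * unbound work is refused once the per-user process count hits the
 * unbound worker limit, unless an unbound worker is already running or
 * free, or the caller has CAP_SYS_RESOURCE or CAP_SYS_ADMIN.
 */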
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work = wq->do_work(work);
		wq->free_work(old_work);
	} while (work);
}
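
/*
 * Insert work into the pending list. Unhashed work goes to the tail;
 * hashed work is chained directly after the last queued item with the
 * same hash (tracked via wqe->hash_tail) so the chain can be spliced
 * out as a unit.
 */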
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}
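
/*
 * Remove a pending work item from the list, fixing up hash_tail if the
 * item being removed is currently the tail of its hash chain.
 */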
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
{
	return work == data;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
}
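
/*
 * Set up a new io_wq: allocate the per-node wqe structures, register for
 * CPU hotplug notifications, and start the manager thread. The manager
 * in turn creates the fixed bounded worker on each online node before
 * signalling completion.
 */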
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}
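
/*
 * Tear down the io_wq: stop the manager, wake all workers so they notice
 * IO_WQ_BIT_EXIT and exit, then wait for the last worker to complete
 * wq->done before freeing everything.
 */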
static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}
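
/*
 * CPU hotplug handling: when a CPU comes online, re-apply each worker's
 * affinity to its wqe's NUMA node and mark it PF_NO_SETAFFINITY.
 */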
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;

	io_wq_online = ret;
	return 0;
}

subsys_initcall(io_wq_init);