Blame view
fs/io-wq.c
28.3 KB
771b53d03 io-wq: small thre... |
1 2 3 4 5 6 7 8 9 10 11 12 |
// SPDX-License-Identifier: GPL-2.0 /* * Basic worker thread pool for io_uring * * Copyright (C) 2019 Jens Axboe * */ #include <linux/kernel.h> #include <linux/init.h> #include <linux/errno.h> #include <linux/sched/signal.h> #include <linux/mm.h> |
771b53d03 io-wq: small thre... |
13 14 15 16 17 |
#include <linux/sched/mm.h> #include <linux/percpu.h> #include <linux/slab.h> #include <linux/kthread.h> #include <linux/rculist_nulls.h> |
9392a27d8 io-wq: add suppor... |
18 |
#include <linux/fs_struct.h> |
aa96bf8a9 io_uring: use io-... |
19 |
#include <linux/task_work.h> |
91d8f5191 io_uring: add blk... |
20 |
#include <linux/blk-cgroup.h> |
771b53d03 io-wq: small thre... |
21 22 23 24 25 26 27 28 29 |
#include "io-wq.h"

/* idle (non-fixed) workers exit after this long without work */
#define WORKER_IDLE_TIMEOUT	(5 * HZ)

/* per-worker state bits, kept in io_worker->flags */
enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

/* whole-wq state bits, kept in io_wq->state */
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

/* per-wqe flags, kept in io_wqe->flags under wqe->lock */
enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;			/* pinned via io_worker_get(), dropped via io_worker_release() */
	unsigned flags;			/* IO_WORKER_F_* bits */
	struct hlist_nulls_node nulls_node;	/* link on wqe->free_list when idle */
	struct list_head all_list;	/* link on wqe->all_list, always */
	struct task_struct *task;	/* backing kthread */
	struct io_wqe *wqe;		/* pool this worker belongs to */

	struct io_wq_work *cur_work;	/* work currently assigned; protected by lock below */
	spinlock_t lock;		/* protects cur_work */

	struct rcu_head rcu;		/* for kfree_rcu() on exit */
	struct mm_struct *mm;		/* mm currently attached via kthread_use_mm() */
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;	/* blkcg attached via kthread_associate_blkcg() */
#endif
	const struct cred *cur_creds;	/* creds currently overridden for the work */
	const struct cred *saved_creds;	/* original creds to revert to when unused */
	struct files_struct *restore_files;	/* current->files snapshot from worker start */
	struct nsproxy *restore_nsproxy;	/* current->nsproxy snapshot from worker start */
	struct fs_struct *restore_fs;	/* current->fs snapshot from worker start */
};
771b53d03 io-wq: small thre... |
69 70 71 72 73 |
/*
 * Number of hash buckets used to serialize hashed work: 64 buckets on
 * 64-bit (hash_map is an unsigned long bitmap), 32 on 32-bit.
 */
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

/*
 * Per-wqe worker accounting, one instance each for bound and unbound
 * workers (indexed by IO_WQ_ACCT_{BOUND,UNBOUND} in io_wqe->acct[]).
 */
struct io_wqe_acct {
	unsigned nr_workers;	/* workers currently created; under wqe->lock */
	unsigned max_workers;	/* cap on nr_workers */
	atomic_t nr_running;	/* workers currently accounted as running */
};

/* indices into io_wqe->acct[] */
enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};
771b53d03 io-wq: small thre... |
85 86 87 88 89 |
/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;			/* protects work_list, hash_map, flags */
		struct io_wq_work_list work_list;	/* pending work items */
		unsigned long hash_map;			/* bit set per hash bucket in flight */
		unsigned flags;				/* IO_WQE_FLAG_* */
	} ____cacheline_aligned_in_smp;

	int node;				/* NUMA node this pool serves */
	struct io_wqe_acct acct[2];		/* bound/unbound worker accounting */

	struct hlist_nulls_head free_list;	/* idle workers, RCU-protected */
	struct list_head all_list;		/* every worker, RCU-protected */

	struct io_wq *wq;			/* owning io_wq */
	/* last queued work per hash bucket, so hashed items stay contiguous */
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;		/* one io_wqe per node */
	unsigned long state;		/* IO_WQ_BIT_* bits */

	free_work_fn *free_work;	/* callback to free a completed work item */
	io_wq_work_fn *do_work;		/* callback that executes a work item */

	struct task_struct *manager;	/* manager kthread (io_wq_manager) */
	struct user_struct *user;	/* user charged for unbound workers */
	refcount_t refs;		/* manager + workers; done completed at zero */
	struct completion done;		/* signalled on setup completion / teardown */

	refcount_t use_refs;		/* external get/put references on the wq */
};
771b53d03 io-wq: small thre... |
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
/* Pin a worker so it can't be freed under us; fails if it is already gone. */
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

/*
 * Drop a reference taken by io_worker_get(). The last put wakes the
 * worker task, which waits for the refcount in io_worker_exit().
 */
static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Restore the worker's task context (creds, files, nsproxy, fs, mm,
 * blkcg) to what it was before it impersonated any work.
 *
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}
	if (current->files != worker->restore_files) {
		/* changing current->files needs task_lock(), which can't nest
		 * under the irq-disabled wqe->lock: drop it first */
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}
	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}
#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	return dropped_lock;
}
c5def4ab8 io-wq: add suppor... |
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, struct io_wq_work *work) { if (work->flags & IO_WQ_WORK_UNBOUND) return &wqe->acct[IO_WQ_ACCT_UNBOUND]; return &wqe->acct[IO_WQ_ACCT_BOUND]; } static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe, struct io_worker *worker) { if (worker->flags & IO_WORKER_F_BOUND) return &wqe->acct[IO_WQ_ACCT_BOUND]; return &wqe->acct[IO_WQ_ACCT_UNBOUND]; } |
771b53d03 io-wq: small thre... |
198 199 200 |
/*
 * Tear down an exiting worker: wait for transient references to drop,
 * undo running/unbound accounting, unlink it from the wqe lists, restore
 * the borrowed task context, and free it via RCU. The last worker out
 * completes wq->done for io_wq teardown.
 */
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();	/* io_worker_release() wakes us at zero */
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	/* __io_worker_unuse() may drop the lock; re-take it if so */
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}
c5def4ab8 io-wq: add suppor... |
234 235 236 |
/* True if there is runnable work: list non-empty and not stalled on hash. */
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	/* pin the worker across the wakeup so it can't be freed under us */
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

/* Account one more running worker in this worker's bound/unbound bucket. */
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

/*
 * Account one fewer running worker; if that leaves no runners while work
 * is pending, kick a free worker (or the manager) to pick it up.
 */
static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}
771b53d03 io-wq: small thre... |
303 304 305 306 307 308 309 |
/*
 * One-time setup on worker kthread entry: mark the task as an io worker,
 * snapshot the task context we will restore after impersonating work,
 * and account the worker as running.
 */
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		/* dec against the old bucket, inc against the new one */
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}
60cf46ae6 io-wq: hash depen... |
370 371 372 373 374 375 |
/* Hash bucket for a work item, encoded in the upper flag bits. */
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

/*
 * Pick the next runnable work item off the list. Unhashed work runs
 * anytime; hashed work runs only if no work with the same hash is
 * already in flight (hash_map bit clear). When a hashed item is taken,
 * the whole contiguous [work, hash_tail[hash]] run for that bucket is
 * spliced out so the worker processes it serially.
 */
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}
cccf0ee83 io_uring/io-wq: d... |
404 405 406 |
/*
 * Switch the worker kthread onto the mm the work item carries. Any
 * previously attached mm is released first. On success the worker takes
 * over the work's mm reference (work->mm is cleared); if the mm can't be
 * pinned the work is flagged for cancellation instead.
 */
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}
	if (!work->mm)
		return;

	if (mmget_not_zero(work->mm)) {
		kthread_use_mm(work->mm);
		worker->mm = work->mm;
		/* hang on to this mm */
		work->mm = NULL;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}
91d8f5191 io_uring: add blk... |
425 426 427 428 429 430 431 432 433 434 |
/*
 * Attach the worker kthread to the work item's blk-cgroup, if it differs
 * from the one currently associated. No-op without CONFIG_BLK_CGROUP.
 */
static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css == work->blkcg_css)
		return;

	kthread_associate_blkcg(work->blkcg_css);
	worker->blkcg_css = work->blkcg_css;
#endif
}
cccf0ee83 io_uring/io-wq: d... |
435 436 437 438 439 440 441 442 443 444 445 |
/*
 * Override the worker's credentials with the work item's. The first
 * switch stashes the worker's own creds in saved_creds (restored by
 * __io_worker_unuse()); later switches return creds we installed
 * ourselves, so those are dropped with put_cred() instead.
 */
static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}
dc026a73c io-wq: shuffle io... |
446 447 448 449 450 451 |
/*
 * Make the worker kthread look like the task that submitted the work:
 * adopt its files/nsproxy, fs, mm, creds, RLIMIT_FSIZE and blkcg as
 * carried on the work item. Each piece is switched only if it differs
 * from what the worker currently has.
 */
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if (work->files && current->files != work->files) {
		task_lock(current);
		current->files = work->files;
		current->nsproxy = work->nsproxy;
		task_unlock(current);
	}
	if (work->fs && current->fs != work->fs)
		current->fs = work->fs;
	if (work->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if (worker->cur_creds != work->creds)
		io_wq_switch_creds(worker, work);
	current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->fsize;
	io_wq_switch_blkcg(worker, work);
}

/*
 * Publish the work item the worker is currently executing (read by
 * cancellation paths under worker->lock). Passing NULL clears it.
 */
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}
60cf46ae6 io-wq: hash depen... |
479 |
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

/*
 * Main work-execution loop for a worker. Called with wqe->lock held;
 * returns with it released. Pulls work off the wqe list and runs each
 * item plus its dependent link chain, impersonating the submitting task
 * for every item. Hashed work releases its hash bucket (and clears the
 * STALLED flag) once the spliced run is exhausted.
 */
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			/*
			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
			 * work, the worker function will do the right thing.
			 */
			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
				work->flags |= IO_WQ_WORK_CANCEL;

			old_work = work;
			linked = wq->do_work(work);

			/* prefer the next hashed item; otherwise run an
			 * unhashed linked item inline */
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				/*
				 * NOTE(review): BIT_ULL() here vs BIT() in
				 * io_get_next_work() — equivalent while
				 * hash < BITS_PER_LONG, but inconsistent;
				 * confirm and unify.
				 */
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
771b53d03 io-wq: small thre... |
547 548 549 550 551 |
/*
 * Worker kthread entry point: loop pulling work until the wq exits or,
 * for non-fixed workers, until WORKER_IDLE_TIMEOUT passes with nothing
 * to do. On exit, drain any remaining work before tearing down.
 */
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;	/* woken before the timeout hit */
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	/* wq is exiting: run any work still queued before we go */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}
b60fda600 io-wq: wait for i... |
627 |
/*
 * Allocate and start one worker kthread for the given wqe and accounting
 * index (bound/unbound). The new worker goes on the free list; the first
 * bound worker on a wqe becomes the "fixed" worker that never times out.
 * Returns false on allocation or kthread-creation failure.
 */
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	/* first bound worker on this wqe is the persistent one */
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}
c5def4ab8 io-wq: add suppor... |
663 |
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) |
771b53d03 io-wq: small thre... |
664 665 |
__must_hold(wqe->lock) { |
c5def4ab8 io-wq: add suppor... |
666 |
struct io_wqe_acct *acct = &wqe->acct[index]; |
771b53d03 io-wq: small thre... |
667 |
|
c5def4ab8 io-wq: add suppor... |
668 |
/* if we have available workers or no work, no need */ |
021d1cdda io-wq: remove now... |
669 |
if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe)) |
c5def4ab8 io-wq: add suppor... |
670 671 |
return false; return acct->nr_workers < acct->max_workers; |
771b53d03 io-wq: small thre... |
672 |
} |
c4068bf89 io-wq: fix use-af... |
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 |
/* Per-worker callback: deliver SIGINT; always continue the iteration. */
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		/* skip workers already past their final refcount drop */
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			/* a true return from func stops the iteration */
			if (ret)
				break;
		}
	}

	return ret;
}

/* Per-worker callback: wake the worker; always continue the iteration. */
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}
771b53d03 io-wq: small thre... |
709 710 711 712 713 714 |
/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		/* fixed worker creation failed: flag the error and bail */
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	/* setup succeeded; let the creator in io_wq_create() proceed */
	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			/* check under the lock, fork outside it */
			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}
c5def4ab8 io-wq: add suppor... |
770 771 772 773 774 775 776 777 778 779 780 |
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, struct io_wq_work *work) { bool free_worker; if (!(work->flags & IO_WQ_WORK_UNBOUND)) return true; if (atomic_read(&acct->nr_running)) return true; rcu_read_lock(); |
021d1cdda io-wq: remove now... |
781 |
free_worker = !hlist_nulls_empty(&wqe->free_list); |
c5def4ab8 io-wq: add suppor... |
782 783 784 785 786 787 788 789 790 791 |
rcu_read_unlock(); if (free_worker) return true; if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers && !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN))) return false; return true; } |
e9fd93965 io_uring/io-wq: f... |
792 |
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) |
fc04c39ba io-wq: fix IO_WQ_... |
793 |
{ |
e9fd93965 io_uring/io-wq: f... |
794 |
struct io_wq *wq = wqe->wq; |
fc04c39ba io-wq: fix IO_WQ_... |
795 796 797 798 |
do { struct io_wq_work *old_work = work; work->flags |= IO_WQ_WORK_CANCEL; |
f4db7182e io-wq: return nex... |
799 |
work = wq->do_work(work); |
e9fd93965 io_uring/io-wq: f... |
800 |
wq->free_work(old_work); |
fc04c39ba io-wq: fix IO_WQ_... |
801 802 |
} while (work); } |
86f3cd1b5 io-wq: handle has... |
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 |
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) { unsigned int hash; struct io_wq_work *tail; if (!io_wq_is_hashed(work)) { append: wq_list_add_tail(&work->list, &wqe->work_list); return; } hash = io_get_work_hash(work); tail = wqe->hash_tail[hash]; wqe->hash_tail[hash] = work; if (!tail) goto append; wq_list_add_after(&work->list, &tail->list, &wqe->work_list); } |
771b53d03 io-wq: small thre... |
822 823 |
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { |
c5def4ab8 io-wq: add suppor... |
824 |
struct io_wqe_acct *acct = io_work_get_acct(wqe, work); |
895e2ca0f io-wq: support co... |
825 |
int work_flags; |
771b53d03 io-wq: small thre... |
826 |
unsigned long flags; |
c5def4ab8 io-wq: add suppor... |
827 828 829 830 831 832 833 |
/* * Do early check to see if we need a new unbound worker, and if we do, * if we're allowed to do so. This isn't 100% accurate as there's a * gap between this check and incrementing the value, but that's OK. * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { |
e9fd93965 io_uring/io-wq: f... |
834 |
io_run_cancel(work, wqe); |
c5def4ab8 io-wq: add suppor... |
835 836 |
return; } |
895e2ca0f io-wq: support co... |
837 |
work_flags = work->flags; |
95da84659 io_wq: Make io_wq... |
838 |
raw_spin_lock_irqsave(&wqe->lock, flags); |
86f3cd1b5 io-wq: handle has... |
839 |
io_wqe_insert_work(wqe, work); |
771b53d03 io-wq: small thre... |
840 |
wqe->flags &= ~IO_WQE_FLAG_STALLED; |
95da84659 io_wq: Make io_wq... |
841 |
raw_spin_unlock_irqrestore(&wqe->lock, flags); |
771b53d03 io-wq: small thre... |
842 |
|
895e2ca0f io-wq: support co... |
843 844 |
if ((work_flags & IO_WQ_WORK_CONCURRENT) || !atomic_read(&acct->nr_running)) |
c5def4ab8 io-wq: add suppor... |
845 |
io_wqe_wake_worker(wqe, acct); |
771b53d03 io-wq: small thre... |
846 847 848 849 850 851 852 853 854 855 |
} void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { struct io_wqe *wqe = wq->wqes[numa_node_id()]; io_wqe_enqueue(wqe, work); } /* |
8766dd516 io-wq: split hash... |
856 857 |
* Work items that hash to the same value will not be done in parallel. * Used to limit concurrent writes, generally hashed by inode. |
771b53d03 io-wq: small thre... |
858 |
*/ |
8766dd516 io-wq: split hash... |
859 |
void io_wq_hash_work(struct io_wq_work *work, void *val) |
771b53d03 io-wq: small thre... |
860 |
{ |
8766dd516 io-wq: split hash... |
861 |
unsigned int bit; |
771b53d03 io-wq: small thre... |
862 863 864 |
bit = hash_ptr(val, IO_WQ_HASH_ORDER); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); |
771b53d03 io-wq: small thre... |
865 |
} |
771b53d03 io-wq: small thre... |
866 867 |
void io_wq_cancel_all(struct io_wq *wq) { |
3fc50ab55 io-wq: fix handli... |
868 |
int node; |
771b53d03 io-wq: small thre... |
869 870 |
set_bit(IO_WQ_BIT_CANCEL, &wq->state); |
771b53d03 io-wq: small thre... |
871 |
rcu_read_lock(); |
3fc50ab55 io-wq: fix handli... |
872 873 |
for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; |
771b53d03 io-wq: small thre... |
874 |
|
e61df66c6 io-wq: ensure fre... |
875 |
io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL); |
771b53d03 io-wq: small thre... |
876 877 878 |
} rcu_read_unlock(); } |
62755e35d io_uring: support... |
879 |
struct io_cb_cancel_data { |
2293b4195 io-wq: remove dup... |
880 881 |
work_cancel_fn *fn; void *data; |
4f26bda15 io-wq: add an opt... |
882 883 884 |
int nr_running; int nr_pending; bool cancel_all; |
62755e35d io_uring: support... |
885 |
}; |
2293b4195 io-wq: remove dup... |
886 |
static bool io_wq_worker_cancel(struct io_worker *worker, void *data) |
62755e35d io_uring: support... |
887 |
{ |
2293b4195 io-wq: remove dup... |
888 |
struct io_cb_cancel_data *match = data; |
6f72653e7 io-wq: use proper... |
889 |
unsigned long flags; |
62755e35d io_uring: support... |
890 891 892 |
/* * Hold the lock to avoid ->cur_work going out of scope, caller |
36c2f9223 io-wq: ensure we ... |
893 |
* may dereference the passed in work. |
62755e35d io_uring: support... |
894 |
*/ |
36c2f9223 io-wq: ensure we ... |
895 |
spin_lock_irqsave(&worker->lock, flags); |
62755e35d io_uring: support... |
896 |
if (worker->cur_work && |
0c9d5ccd2 io-wq: add suppor... |
897 |
!(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && |
2293b4195 io-wq: remove dup... |
898 |
match->fn(worker->cur_work, match->data)) { |
771b53d03 io-wq: small thre... |
899 |
send_sig(SIGINT, worker->task, 1); |
4f26bda15 io-wq: add an opt... |
900 |
match->nr_running++; |
771b53d03 io-wq: small thre... |
901 |
} |
36c2f9223 io-wq: ensure we ... |
902 |
spin_unlock_irqrestore(&worker->lock, flags); |
771b53d03 io-wq: small thre... |
903 |
|
4f26bda15 io-wq: add an opt... |
904 |
return match->nr_running && !match->cancel_all; |
771b53d03 io-wq: small thre... |
905 |
} |
204361a77 io-wq: fix hang a... |
906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 |
static inline void io_wqe_remove_pending(struct io_wqe *wqe, struct io_wq_work *work, struct io_wq_work_node *prev) { unsigned int hash = io_get_work_hash(work); struct io_wq_work *prev_work = NULL; if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) { if (prev) prev_work = container_of(prev, struct io_wq_work, list); if (prev_work && io_get_work_hash(prev_work) == hash) wqe->hash_tail[hash] = prev_work; else wqe->hash_tail[hash] = NULL; } wq_list_del(&wqe->work_list, &work->list, prev); } |
/*
 * Cancel pending (not yet running) work items on @wqe that match->fn
 * accepts.  Each match is unlinked under wqe->lock, then completed via
 * io_run_cancel() with the lock dropped; the list walk restarts from
 * scratch after every drop since the list may have changed.
 */
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		/* unlink (and fix up hash_tail) before dropping the lock */
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}
/*
 * Signal cancellation for currently-running work on @wqe that matches
 * @match.  The worker walk is RCU-protected; io_wq_worker_cancel()
 * takes each worker's lock before inspecting its ->cur_work.
 */
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
2293b4195 io-wq: remove dup... |
955 |
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, |
4f26bda15 io-wq: add an opt... |
956 |
void *data, bool cancel_all) |
771b53d03 io-wq: small thre... |
957 |
{ |
2293b4195 io-wq: remove dup... |
958 |
struct io_cb_cancel_data match = { |
4f26bda15 io-wq: add an opt... |
959 960 961 |
.fn = cancel, .data = data, .cancel_all = cancel_all, |
00bcda13d io-wq: make io_wq... |
962 |
}; |
3fc50ab55 io-wq: fix handli... |
963 |
int node; |
771b53d03 io-wq: small thre... |
964 |
|
f4c2665e3 io-wq: reorder ca... |
965 966 967 968 969 |
/* * First check pending list, if we're lucky we can just remove it * from there. CANCEL_OK means that the work is returned as-new, * no completion will be posted for it. */ |
3fc50ab55 io-wq: fix handli... |
970 971 |
for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; |
771b53d03 io-wq: small thre... |
972 |
|
4f26bda15 io-wq: add an opt... |
973 974 |
io_wqe_cancel_pending_work(wqe, &match); if (match.nr_pending && !match.cancel_all) |
f4c2665e3 io-wq: reorder ca... |
975 |
return IO_WQ_CANCEL_OK; |
771b53d03 io-wq: small thre... |
976 |
} |
f4c2665e3 io-wq: reorder ca... |
977 978 979 980 981 982 983 984 |
/* * Now check if a free (going busy) or busy worker has the work * currently running. If we find it there, we'll return CANCEL_RUNNING * as an indication that we attempt to signal cancellation. The * completion will run normally in this case. */ for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; |
4f26bda15 io-wq: add an opt... |
985 986 |
io_wqe_cancel_running_work(wqe, &match); if (match.nr_running && !match.cancel_all) |
f4c2665e3 io-wq: reorder ca... |
987 988 |
return IO_WQ_CANCEL_RUNNING; } |
4f26bda15 io-wq: add an opt... |
989 990 991 992 |
if (match.nr_running) return IO_WQ_CANCEL_RUNNING; if (match.nr_pending) return IO_WQ_CANCEL_OK; |
f4c2665e3 io-wq: reorder ca... |
993 |
return IO_WQ_CANCEL_NOTFOUND; |
771b53d03 io-wq: small thre... |
994 |
} |
2293b4195 io-wq: remove dup... |
995 996 997 998 999 1000 1001 |
static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data) { return work == data; } enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) { |
4f26bda15 io-wq: add an opt... |
1002 |
return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false); |
2293b4195 io-wq: remove dup... |
1003 |
} |
/*
 * Create an io-wq instance with @bounded max bounded workers per node.
 * On success the manager kthread is running and has signalled a good
 * setup through wq->done; on failure an ERR_PTR is returned and all
 * partial allocations are unwound.
 */
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	/* both work callbacks are mandatory */
	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		/* offline node: let the allocator pick any node */
		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			/* unbound workers capped by the task's NPROC rlimit */
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		/* wait for the manager to report setup success or error */
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		/* wq->done is reused for the teardown handshake */
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}
eba6f5a33 io-wq: allow grab... |
1072 1073 |
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) { |
f5fa38c59 io_wq: add per-wq... |
1074 |
if (data->free_work != wq->free_work || data->do_work != wq->do_work) |
eba6f5a33 io-wq: allow grab... |
1075 1076 1077 1078 |
return false; return refcount_inc_not_zero(&wq->use_refs); } |
848f7e188 io-wq: make the i... |
1079 |
static void __io_wq_destroy(struct io_wq *wq) |
771b53d03 io-wq: small thre... |
1080 |
{ |
3fc50ab55 io-wq: fix handli... |
1081 |
int node; |
771b53d03 io-wq: small thre... |
1082 |
|
b60fda600 io-wq: wait for i... |
1083 1084 |
set_bit(IO_WQ_BIT_EXIT, &wq->state); if (wq->manager) |
771b53d03 io-wq: small thre... |
1085 |
kthread_stop(wq->manager); |
771b53d03 io-wq: small thre... |
1086 1087 |
rcu_read_lock(); |
3fc50ab55 io-wq: fix handli... |
1088 1089 |
for_each_node(node) io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); |
771b53d03 io-wq: small thre... |
1090 1091 1092 |
rcu_read_unlock(); wait_for_completion(&wq->done); |
3fc50ab55 io-wq: fix handli... |
1093 1094 |
for_each_node(node) kfree(wq->wqes[node]); |
771b53d03 io-wq: small thre... |
1095 1096 1097 |
kfree(wq->wqes); kfree(wq); } |
848f7e188 io-wq: make the i... |
1098 1099 1100 1101 1102 1103 |
void io_wq_destroy(struct io_wq *wq) { if (refcount_dec_and_test(&wq->use_refs)) __io_wq_destroy(wq); } |
/* Return the manager kthread backing this io-wq instance. */
struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}