Commit b56c0d8937e665a27d90517ee7a746d0aa05af46

Authored by Tejun Heo
1 parent 53c5f5ba42

kthread: implement kthread_worker

Implement a simple work processor for kthreads.  This is to ease
using kthreads.  Single-threaded workqueues used to be used for things
like this, but workqueue no longer guarantees fixed kthread
association, so that workers can be shared.

This can be used in cases where a specific kthread association is
necessary, for example, when the kthread should have RT priority or be
assigned to a certain cgroup (see the usage sketch below).

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Andrew Morton <akpm@linux-foundation.org>
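
A minimal usage sketch (editor's illustration, not part of the patch; the my_* names are hypothetical) showing how a dedicated kthread is attached to a worker and given RT priority before work is queued:

        #include <linux/kthread.h>
        #include <linux/sched.h>
        #include <linux/err.h>

        static struct kthread_worker my_worker;
        static struct kthread_work my_work;
        static struct task_struct *my_task;

        static void my_work_fn(struct kthread_work *work)
        {
                /* always runs in the same dedicated kthread */
        }

        static int my_init(void)
        {
                struct sched_param param = { .sched_priority = MAX_RT_PRIO - 1 };

                init_kthread_worker(&my_worker);
                init_kthread_work(&my_work, my_work_fn);

                /* attach a dedicated kthread to the worker */
                my_task = kthread_run(kthread_worker_fn, &my_worker, "my_worker");
                if (IS_ERR(my_task))
                        return PTR_ERR(my_task);

                /* fixed kthread association makes per-thread attributes meaningful */
                sched_setscheduler(my_task, SCHED_FIFO, &param);

                queue_kthread_work(&my_worker, &my_work);
                return 0;
        }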

Showing 2 changed files with 213 additions and 0 deletions

include/linux/kthread.h
@@ -34,5 +34,69 @@
 int kthreadd(void *unused);
 extern struct task_struct *kthreadd_task;
 
+/*
+ * Simple work processor based on kthread.
+ *
+ * This provides an easier way to make use of kthreads.  A kthread_work
+ * can be queued and flushed using queue/flush_kthread_work()
+ * respectively.  Queued kthread_works are processed by a kthread
+ * running kthread_worker_fn().
+ *
+ * A kthread_work can't be freed while it is executing.
+ */
+struct kthread_work;
+typedef void (*kthread_work_func_t)(struct kthread_work *work);
+
+struct kthread_worker {
+        spinlock_t              lock;
+        struct list_head        work_list;
+        struct task_struct      *task;
+};
+
+struct kthread_work {
+        struct list_head        node;
+        kthread_work_func_t     func;
+        wait_queue_head_t       done;
+        atomic_t                flushing;
+        int                     queue_seq;
+        int                     done_seq;
+};
+
+#define KTHREAD_WORKER_INIT(worker) {                                  \
+        .lock = __SPIN_LOCK_UNLOCKED((worker).lock),                   \
+        .work_list = LIST_HEAD_INIT((worker).work_list),               \
+        }
+
+#define KTHREAD_WORK_INIT(work, fn) {                                  \
+        .node = LIST_HEAD_INIT((work).node),                           \
+        .func = (fn),                                                  \
+        .done = __WAIT_QUEUE_HEAD_INITIALIZER((work).done),            \
+        .flushing = ATOMIC_INIT(0),                                    \
+        }
+
+#define DEFINE_KTHREAD_WORKER(worker)                                  \
+        struct kthread_worker worker = KTHREAD_WORKER_INIT(worker)
+
+#define DEFINE_KTHREAD_WORK(work, fn)                                  \
+        struct kthread_work work = KTHREAD_WORK_INIT(work, fn)
+
+static inline void init_kthread_worker(struct kthread_worker *worker)
+{
+        *worker = (struct kthread_worker)KTHREAD_WORKER_INIT(*worker);
+}
+
+static inline void init_kthread_work(struct kthread_work *work,
+                                     kthread_work_func_t fn)
+{
+        *work = (struct kthread_work)KTHREAD_WORK_INIT(*work, fn);
+}
+
+int kthread_worker_fn(void *worker_ptr);
+
+bool queue_kthread_work(struct kthread_worker *worker,
+                        struct kthread_work *work);
+void flush_kthread_work(struct kthread_work *work);
+void flush_kthread_worker(struct kthread_worker *worker);
+
 #endif /* _LINUX_KTHREAD_H */
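
The DEFINE_* macros above also permit static definition.  A brief sketch (editor's illustration; the conv_* names are hypothetical) of the queue/flush pairing described in the header comment:

        static void conv_work_fn(struct kthread_work *work);

        static DEFINE_KTHREAD_WORKER(conv_worker);
        static DEFINE_KTHREAD_WORK(conv_work, conv_work_fn);

        static void conv_submit(void)
        {
                /* queue_kthread_work() uses spin_lock_irqsave() internally,
                 * so queueing is safe even from atomic context; a worker
                 * without an attached kthread simply collects the work */
                queue_kthread_work(&conv_worker, &conv_work);
        }

        static void conv_wait(void)
        {
                /* may sleep: waits for conv_work to finish executing */
                flush_kthread_work(&conv_work);
        }
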
kernel/kthread.c
@@ -14,6 +14,8 @@
 #include <linux/file.h>
 #include <linux/module.h>
 #include <linux/mutex.h>
+#include <linux/slab.h>
+#include <linux/freezer.h>
 #include <trace/events/sched.h>
 
 static DEFINE_SPINLOCK(kthread_create_lock);
@@ -247,4 +249,151 @@
 
         return 0;
 }
+
+/**
+ * kthread_worker_fn - kthread function to process kthread_worker
+ * @worker_ptr: pointer to initialized kthread_worker
+ *
+ * This function can be used as @threadfn to kthread_create() or
+ * kthread_run() with @worker_ptr argument pointing to an initialized
+ * kthread_worker.  The started kthread will process work_list until
+ * it is stopped with kthread_stop().  A kthread can also call this
+ * function directly after extra initialization.
+ *
+ * Different kthreads can be used for the same kthread_worker as long
+ * as there's only one kthread attached to it at any given time.  A
+ * kthread_worker without an attached kthread simply collects queued
+ * kthread_works.
+ */
+int kthread_worker_fn(void *worker_ptr)
+{
+        struct kthread_worker *worker = worker_ptr;
+        struct kthread_work *work;
+
+        WARN_ON(worker->task);
+        worker->task = current;
+repeat:
+        set_current_state(TASK_INTERRUPTIBLE);  /* mb paired w/ kthread_stop */
+
+        if (kthread_should_stop()) {
+                __set_current_state(TASK_RUNNING);
+                spin_lock_irq(&worker->lock);
+                worker->task = NULL;
+                spin_unlock_irq(&worker->lock);
+                return 0;
+        }
+
+        work = NULL;
+        spin_lock_irq(&worker->lock);
+        if (!list_empty(&worker->work_list)) {
+                work = list_first_entry(&worker->work_list,
+                                        struct kthread_work, node);
+                list_del_init(&work->node);
+        }
+        spin_unlock_irq(&worker->lock);
+
+        if (work) {
+                __set_current_state(TASK_RUNNING);
+                work->func(work);
+                smp_wmb();      /* wmb worker-b0 paired with flush-b1 */
+                work->done_seq = work->queue_seq;
+                smp_mb();       /* mb worker-b1 paired with flush-b0 */
+                if (atomic_read(&work->flushing))
+                        wake_up_all(&work->done);
+        } else if (!freezing(current))
+                schedule();
+
+        try_to_freeze();
+        goto repeat;
+}
+EXPORT_SYMBOL_GPL(kthread_worker_fn);
+
+/**
+ * queue_kthread_work - queue a kthread_work
+ * @worker: target kthread_worker
+ * @work: kthread_work to queue
+ *
+ * Queue @work to work processor @worker for async execution.  @worker
+ * must have been initialized, e.g. with init_kthread_worker().  Returns
+ * %true if @work was successfully queued, %false if it was already pending.
+ */
+bool queue_kthread_work(struct kthread_worker *worker,
+                        struct kthread_work *work)
+{
+        bool ret = false;
+        unsigned long flags;
+
+        spin_lock_irqsave(&worker->lock, flags);
+        if (list_empty(&work->node)) {
+                list_add_tail(&work->node, &worker->work_list);
+                work->queue_seq++;
+                if (likely(worker->task))
+                        wake_up_process(worker->task);
+                ret = true;
+        }
+        spin_unlock_irqrestore(&worker->lock, flags);
+        return ret;
+}
+EXPORT_SYMBOL_GPL(queue_kthread_work);
+
+/**
+ * flush_kthread_work - flush a kthread_work
+ * @work: work to flush
+ *
+ * If @work is queued or executing, wait for it to finish execution.
+ */
+void flush_kthread_work(struct kthread_work *work)
+{
+        int seq = work->queue_seq;
+
+        atomic_inc(&work->flushing);
+
+        /*
+         * mb flush-b0 paired with worker-b1, to make sure either
+         * worker sees the above increment or we see done_seq update.
+         */
+        smp_mb__after_atomic_inc();
+
+        /* A - B <= 0 tests whether B is in front of A regardless of overflow */
+        wait_event(work->done, seq - work->done_seq <= 0);
+        atomic_dec(&work->flushing);
+
+        /*
+         * rmb flush-b1 paired with worker-b0, to make sure our caller
+         * sees every change made by work->func().
+         */
+        smp_mb__after_atomic_dec();
+}
+EXPORT_SYMBOL_GPL(flush_kthread_work);
+
+struct kthread_flush_work {
+        struct kthread_work     work;
+        struct completion       done;
+};
+
+static void kthread_flush_work_fn(struct kthread_work *work)
+{
+        struct kthread_flush_work *fwork =
+                container_of(work, struct kthread_flush_work, work);
+        complete(&fwork->done);
+}
+
+/**
+ * flush_kthread_worker - flush all current works on a kthread_worker
+ * @worker: worker to flush
+ *
+ * Wait until all currently executing or pending works on @worker are
+ * finished.
+ */
+void flush_kthread_worker(struct kthread_worker *worker)
+{
+        struct kthread_flush_work fwork = {
+                KTHREAD_WORK_INIT(fwork.work, kthread_flush_work_fn),
+                COMPLETION_INITIALIZER_ONSTACK(fwork.done),
+        };
+
+        queue_kthread_work(worker, &fwork.work);
+        wait_for_completion(&fwork.done);
+}
+EXPORT_SYMBOL_GPL(flush_kthread_worker);
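
Continuing the hypothetical my_init() sketch above, a teardown path under these APIs might look like this (editor's illustration, not part of the patch):

        static void my_exit(void)
        {
                /* wait until everything queued so far has finished */
                flush_kthread_worker(&my_worker);

                /* kthread_worker_fn() returns once kthread_should_stop()
                 * becomes true, detaching the kthread from the worker */
                kthread_stop(my_task);
        }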