Commit e15bacbebb9dcc95f148f28dfc83a6d5e48b60b8

Authored by Dan Kruchinin
Committed by Herbert Xu
1 parent 2197f9a16d

padata: Make two separate cpumasks

The aim of this patch is to make two separate cpumasks
for padata parallel and serial workers respectively.
This allows users to create finer and more sophisticated
configurations of the padata framework. For example, a user may bind
parallel and serial workers to non-intersecting groups of CPUs to
gain better performance. Each padata instance now also carries a
notifier chain for its cpumasks: if the parallel mask, the serial
mask, or both are changed, all interested subsystems are notified.
This is especially useful when a padata user selects the callback
CPU according to the serial cpumask.

Signed-off-by: Dan Kruchinin <dkruchinin@acm.org>
Signed-off-by: Herbert Xu <herbert@gondor.apana.org.au>

Showing 3 changed files with 564 additions and 214 deletions
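
For illustration, a minimal sketch of how a padata user could drive the
new interface; all my_* names are hypothetical and do not appear in this
patch:

#include <linux/cpumask.h>
#include <linux/notifier.h>
#include <linux/padata.h>
#include <linux/printk.h>
#include <linux/workqueue.h>

/*
 * Runs through the instance's blocking notifier chain whenever one or
 * both cpumasks are replaced; val carries PADATA_CPU_* bits.
 */
static int my_cpumask_notify(struct notifier_block *self,
			     unsigned long val, void *data)
{
	if (val & PADATA_CPU_PARALLEL)
		pr_info("parallel cpumask changed\n");
	if (val & PADATA_CPU_SERIAL)
		pr_info("serial cpumask changed\n");
	return 0;
}

static struct notifier_block my_nblock = {
	.notifier_call = my_cpumask_notify,
};

static struct padata_instance *my_setup(struct workqueue_struct *wq,
					const struct cpumask *par_cpus,
					const struct cpumask *ser_cpus)
{
	struct padata_instance *pinst;

	/* Bind parallel and serial workers to disjoint CPU groups. */
	pinst = __padata_alloc(wq, par_cpus, ser_cpus);
	if (!pinst)
		return NULL;

	if (padata_register_cpumask_notifier(pinst, &my_nblock)) {
		padata_free(pinst);
		return NULL;
	}

	padata_start(pinst);
	return pinst;
}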

crypto/pcrypt.c
... ... @@ -24,13 +24,39 @@
24 24 #include <linux/init.h>
25 25 #include <linux/module.h>
26 26 #include <linux/slab.h>
  27 +#include <linux/notifier.h>
27 28 #include <crypto/pcrypt.h>
28 29  
29   -static struct padata_instance *pcrypt_enc_padata;
30   -static struct padata_instance *pcrypt_dec_padata;
31   -static struct workqueue_struct *encwq;
32   -static struct workqueue_struct *decwq;
  30 +struct pcrypt_instance {
  31 + struct padata_instance *pinst;
  32 + struct workqueue_struct *wq;
33 33  
  34 + /*
  35 + * Cpumask for callback CPUs. It should be equal to the serial
  36 + * cpumask of the corresponding padata instance, so it is updated
  37 + * when padata notifies us about a serial cpumask change.
  38 + *
  39 + * cb_cpumask is protected by RCU. This prevents us from using
  40 + * cpumask_var_t directly because the actual type of
  41 + * cpumask_var_t depends on the kernel configuration (in
  42 + * particular on the CONFIG_CPUMASK_OFFSTACK option). Depending
  43 + * on the configuration, cpumask_var_t may be either a pointer
  44 + * to a struct cpumask or a variable allocated on the stack.
  45 + * Thus we cannot safely use cpumask_var_t with RCU operations
  46 + * such as rcu_assign_pointer or rcu_dereference. Instead,
  47 + * cpumask_var_t is wrapped in struct pcrypt_cpumask, which
  48 + * makes it possible to use it with RCU.
  49 + */
  50 + struct pcrypt_cpumask {
  51 + cpumask_var_t mask;
  52 + } *cb_cpumask;
  53 + struct notifier_block nblock;
  54 +};
  55 +
  56 +static struct pcrypt_instance pencrypt;
  57 +static struct pcrypt_instance pdecrypt;
  58 +
  59 +
34 60 struct pcrypt_instance_ctx {
35 61 struct crypto_spawn spawn;
36 62 unsigned int tfm_count;
... ... @@ -42,25 +68,29 @@
42 68 };
43 69  
44 70 static int pcrypt_do_parallel(struct padata_priv *padata, unsigned int *cb_cpu,
45   - struct padata_instance *pinst)
  71 + struct pcrypt_instance *pcrypt)
46 72 {
47 73 unsigned int cpu_index, cpu, i;
  74 + struct pcrypt_cpumask *cpumask;
48 75  
49 76 cpu = *cb_cpu;
50 77  
51   - if (cpumask_test_cpu(cpu, cpu_active_mask))
  78 + rcu_read_lock_bh();
  79 + cpumask = rcu_dereference(pcrypt->cb_cpumask);
  80 + if (cpumask_test_cpu(cpu, cpumask->mask))
52 81 goto out;
53 82  
54   - cpu_index = cpu % cpumask_weight(cpu_active_mask);
  83 + cpu_index = cpu % cpumask_weight(cpumask->mask);
55 84  
56   - cpu = cpumask_first(cpu_active_mask);
  85 + cpu = cpumask_first(cpumask->mask);
57 86 for (i = 0; i < cpu_index; i++)
58   - cpu = cpumask_next(cpu, cpu_active_mask);
  87 + cpu = cpumask_next(cpu, cpumask->mask);
59 88  
60 89 *cb_cpu = cpu;
61 90  
62 91 out:
63   - return padata_do_parallel(pinst, padata, cpu);
  92 + rcu_read_unlock_bh();
  93 + return padata_do_parallel(pcrypt->pinst, padata, cpu);
64 94 }
65 95  
66 96 static int pcrypt_aead_setkey(struct crypto_aead *parent,
... ... @@ -142,7 +172,7 @@
142 172 req->cryptlen, req->iv);
143 173 aead_request_set_assoc(creq, req->assoc, req->assoclen);
144 174  
145   - err = pcrypt_do_parallel(padata, &ctx->cb_cpu, pcrypt_enc_padata);
  175 + err = pcrypt_do_parallel(padata, &ctx->cb_cpu, &pencrypt);
146 176 if (!err)
147 177 return -EINPROGRESS;
148 178  
... ... @@ -184,7 +214,7 @@
184 214 req->cryptlen, req->iv);
185 215 aead_request_set_assoc(creq, req->assoc, req->assoclen);
186 216  
187   - err = pcrypt_do_parallel(padata, &ctx->cb_cpu, pcrypt_dec_padata);
  217 + err = pcrypt_do_parallel(padata, &ctx->cb_cpu, &pdecrypt);
188 218 if (!err)
189 219 return -EINPROGRESS;
190 220  
... ... @@ -228,7 +258,7 @@
228 258 aead_givcrypt_set_assoc(creq, areq->assoc, areq->assoclen);
229 259 aead_givcrypt_set_giv(creq, req->giv, req->seq);
230 260  
231   - err = pcrypt_do_parallel(padata, &ctx->cb_cpu, pcrypt_enc_padata);
  261 + err = pcrypt_do_parallel(padata, &ctx->cb_cpu, &pencrypt);
232 262 if (!err)
233 263 return -EINPROGRESS;
234 264  
... ... @@ -370,6 +400,88 @@
370 400 kfree(inst);
371 401 }
372 402  
  403 +static int pcrypt_cpumask_change_notify(struct notifier_block *self,
  404 + unsigned long val, void *data)
  405 +{
  406 + struct pcrypt_instance *pcrypt;
  407 + struct pcrypt_cpumask *new_mask, *old_mask;
  408 +
  409 + if (!(val & PADATA_CPU_SERIAL))
  410 + return 0;
  411 +
  412 + pcrypt = container_of(self, struct pcrypt_instance, nblock);
  413 + new_mask = kmalloc(sizeof(*new_mask), GFP_KERNEL);
  414 + if (!new_mask)
  415 + return -ENOMEM;
  416 + if (!alloc_cpumask_var(&new_mask->mask, GFP_KERNEL)) {
  417 + kfree(new_mask);
  418 + return -ENOMEM;
  419 + }
  420 +
  421 + old_mask = pcrypt->cb_cpumask;
  422 +
  423 + padata_get_cpumask(pcrypt->pinst, PADATA_CPU_SERIAL, new_mask->mask);
  424 + rcu_assign_pointer(pcrypt->cb_cpumask, new_mask);
  425 + synchronize_rcu_bh();
  426 +
  427 + free_cpumask_var(old_mask->mask);
  428 + kfree(old_mask);
  429 + return 0;
  430 +}
  431 +
  432 +static int __pcrypt_init_instance(struct pcrypt_instance *pcrypt,
  433 + const char *name)
  434 +{
  435 + int ret = -ENOMEM;
  436 + struct pcrypt_cpumask *mask;
  437 +
  438 + pcrypt->wq = create_workqueue(name);
  439 + if (!pcrypt->wq)
  440 + goto err;
  441 +
  442 + pcrypt->pinst = padata_alloc(pcrypt->wq);
  443 + if (!pcrypt->pinst)
  444 + goto err_destroy_workqueue;
  445 +
  446 + mask = kmalloc(sizeof(*mask), GFP_KERNEL);
  447 + if (!mask)
  448 + goto err_free_padata;
  449 + if (!alloc_cpumask_var(&mask->mask, GFP_KERNEL)) {
  450 + kfree(mask);
  451 + goto err_free_padata;
  452 + }
  453 +
  454 + padata_get_cpumask(pcrypt->pinst, PADATA_CPU_SERIAL, mask->mask);
  455 + rcu_assign_pointer(pcrypt->cb_cpumask, mask);
  456 +
  457 + pcrypt->nblock.notifier_call = pcrypt_cpumask_change_notify;
  458 + ret = padata_register_cpumask_notifier(pcrypt->pinst, &pcrypt->nblock);
  459 + if (ret)
  460 + goto err_free_cpumask;
  461 +
  462 + return ret;
  463 +err_free_cpumask:
  464 + free_cpumask_var(mask->mask);
  465 + kfree(mask);
  466 +err_free_padata:
  467 + padata_free(pcrypt->pinst);
  468 +err_destroy_workqueue:
  469 + destroy_workqueue(pcrypt->wq);
  470 +err:
  471 + return ret;
  472 +}
  473 +
  474 +static void __pcrypt_deinit_instance(struct pcrypt_instance *pcrypt)
  475 +{
  476 + free_cpumask_var(pcrypt->cb_cpumask->mask);
  477 + kfree(pcrypt->cb_cpumask);
  478 +
  479 + padata_stop(pcrypt->pinst);
  480 + padata_unregister_cpumask_notifier(pcrypt->pinst, &pcrypt->nblock);
  481 + destroy_workqueue(pcrypt->wq);
  482 + padata_free(pcrypt->pinst);
  483 +}
  484 +
373 485 static struct crypto_template pcrypt_tmpl = {
374 486 .name = "pcrypt",
375 487 .alloc = pcrypt_alloc,
... ... @@ -379,60 +491,31 @@
379 491  
380 492 static int __init pcrypt_init(void)
381 493 {
382   - int err = -ENOMEM;
383   - encwq = create_workqueue("pencrypt");
384   - if (!encwq)
385   - goto err;
  494 + int err;
386 495  
387   - decwq = create_workqueue("pdecrypt");
388   - if (!decwq)
389   - goto err_destroy_encwq;
390   -
391   -
392   - pcrypt_enc_padata = padata_alloc(cpu_possible_mask, encwq);
393   - if (!pcrypt_enc_padata)
394   - goto err_destroy_decwq;
395   -
396   - pcrypt_dec_padata = padata_alloc(cpu_possible_mask, decwq);
397   - if (!pcrypt_dec_padata)
398   - goto err_free_enc_padata;
399   -
400   - err = padata_start(pcrypt_enc_padata);
  496 + err = __pcrypt_init_instance(&pencrypt, "pencrypt");
401 497 if (err)
402   - goto err_free_dec_padata;
  498 + goto err;
403 499  
404   - err = padata_start(pcrypt_dec_padata);
  500 + err = __pcrypt_init_instance(&pdecrypt, "pdecrypt");
405 501 if (err)
406   - goto err_free_dec_padata;
  502 + goto err_deinit_pencrypt;
407 503  
  504 + padata_start(pencrypt.pinst);
  505 + padata_start(pdecrypt.pinst);
  506 +
408 507 return crypto_register_template(&pcrypt_tmpl);
409 508  
410   -err_free_dec_padata:
411   - padata_free(pcrypt_dec_padata);
412   -
413   -err_free_enc_padata:
414   - padata_free(pcrypt_enc_padata);
415   -
416   -err_destroy_decwq:
417   - destroy_workqueue(decwq);
418   -
419   -err_destroy_encwq:
420   - destroy_workqueue(encwq);
421   -
  509 +err_deinit_pencrypt:
  510 + __pcrypt_deinit_instance(&pencrypt);
422 511 err:
423 512 return err;
424 513 }
425 514  
426 515 static void __exit pcrypt_exit(void)
427 516 {
428   - padata_stop(pcrypt_enc_padata);
429   - padata_stop(pcrypt_dec_padata);
430   -
431   - destroy_workqueue(encwq);
432   - destroy_workqueue(decwq);
433   -
434   - padata_free(pcrypt_enc_padata);
435   - padata_free(pcrypt_dec_padata);
  517 + __pcrypt_deinit_instance(&pencrypt);
  518 + __pcrypt_deinit_instance(&pdecrypt);
436 519  
437 520 crypto_unregister_template(&pcrypt_tmpl);
438 521 }
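
The wrapper used above is needed because of how cpumask_var_t is
defined; simplified from <linux/cpumask.h> (shown for reference, not
part of this patch):

#ifdef CONFIG_CPUMASK_OFFSTACK
typedef struct cpumask *cpumask_var_t;	 /* heap-allocated: a real pointer */
#else
typedef struct cpumask cpumask_var_t[1]; /* on-stack array: not a pointer */
#endif

In the array case there is no single pointer that rcu_assign_pointer()
could publish atomically, so pcrypt heap-allocates a struct
pcrypt_cpumask wrapper and RCU-swaps the wrapper pointer instead, as
pcrypt_cpumask_change_notify() does above.
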
include/linux/padata.h
... ... @@ -25,7 +25,11 @@
25 25 #include <linux/spinlock.h>
26 26 #include <linux/list.h>
27 27 #include <linux/timer.h>
  28 +#include <linux/notifier.h>
28 29  
  30 +#define PADATA_CPU_SERIAL 0x01
  31 +#define PADATA_CPU_PARALLEL 0x02
  32 +
29 33 /**
30 34 * struct padata_priv - Embedded to the users data structure.
31 35 *
... ... @@ -59,7 +63,20 @@
59 63 };
60 64  
61 65 /**
62   - * struct padata_queue - The percpu padata queues.
  66 + * struct padata_serial_queue - The percpu padata serial queue
  67 + *
  68 + * @serial: List to wait for serialization after reordering.
  69 + * @work: work struct for serialization.
  70 + * @pd: Backpointer to the internal control structure.
  71 + */
  72 +struct padata_serial_queue {
  73 + struct padata_list serial;
  74 + struct work_struct work;
  75 + struct parallel_data *pd;
  76 +};
  77 +
  78 +/**
  79 + * struct padata_parallel_queue - The percpu padata parallel queue
63 80 *
64 81 * @parallel: List to wait for parallelization.
65 82 * @reorder: List to wait for reordering after parallel processing.
... ... @@ -67,44 +84,52 @@
67 84 * @pwork: work struct for parallelization.
68 85 * @swork: work struct for serialization.
69 86 * @pd: Backpointer to the internal control structure.
  87 + * @work: work struct for parallelization.
  88 + * @num_obj: Number of objects that are processed by this cpu.
70 89 * @cpu_index: Index of the cpu.
71 90 */
72   -struct padata_queue {
73   - struct padata_list parallel;
74   - struct padata_list reorder;
75   - struct padata_list serial;
76   - struct work_struct pwork;
77   - struct work_struct swork;
78   - struct parallel_data *pd;
79   - int cpu_index;
  91 +struct padata_parallel_queue {
  92 + struct padata_list parallel;
  93 + struct padata_list reorder;
  94 + struct parallel_data *pd;
  95 + struct work_struct work;
  96 + atomic_t num_obj;
  97 + int cpu_index;
80 98 };
81 99  
  100 +
82 101 /**
83 102 * struct parallel_data - Internal control structure, covers everything
84 103 * that depends on the cpumask in use.
85 104 *
86 105 * @pinst: padata instance.
87   - * @queue: percpu padata queues.
  106 + * @pqueue: percpu padata queues used for parallelization.
  107 + * @squeue: percpu padata queues used for serialization.
88 108 * @seq_nr: The sequence number that will be attached to the next object.
89 109 * @reorder_objects: Number of objects waiting in the reorder queues.
90 110 * @refcnt: Number of objects holding a reference on this parallel_data.
91 111 * @max_seq_nr: Maximal used sequence number.
92   - * @cpumask: cpumask in use.
  112 + * @cpumask: Contains two cpumasks: pcpu and cbcpu for
  113 + * parallel and serial workers respectively.
93 114 * @lock: Reorder lock.
94 115 * @processed: Number of already processed objects.
95 116 * @timer: Reorder timer.
96 117 */
97 118 struct parallel_data {
98   - struct padata_instance *pinst;
99   - struct padata_queue *queue;
100   - atomic_t seq_nr;
101   - atomic_t reorder_objects;
102   - atomic_t refcnt;
103   - unsigned int max_seq_nr;
104   - cpumask_var_t cpumask;
105   - spinlock_t lock ____cacheline_aligned;
106   - unsigned int processed;
107   - struct timer_list timer;
  119 + struct padata_instance *pinst;
  120 + struct padata_parallel_queue *pqueue;
  121 + struct padata_serial_queue *squeue;
  122 + atomic_t seq_nr;
  123 + atomic_t reorder_objects;
  124 + atomic_t refcnt;
  125 + unsigned int max_seq_nr;
  126 + struct {
  127 + cpumask_var_t pcpu;
  128 + cpumask_var_t cbcpu;
  129 + } cpumask;
  130 + spinlock_t lock ____cacheline_aligned;
  131 + unsigned int processed;
  132 + struct timer_list timer;
108 133 };
109 134  
110 135 /**
... ... @@ -113,33 +138,52 @@
113 138 * @cpu_notifier: cpu hotplug notifier.
114 139 * @wq: The workqueue in use.
115 140 * @pd: The internal control structure.
116   - * @cpumask: User supplied cpumask.
  141 + * @cpumask: User supplied cpumask. Contains two cpumasks: pcpu and
  142 + * cbcpu for parallel and serial workers respectively.
  143 + * @cpumask_change_notifier: Notifiers chain for user-defined notify
  144 + * callbacks that will be called when either @pcpu or @cbcpu
  145 + * or both cpumasks change.
117 146 * @lock: padata instance lock.
118 147 * @flags: padata flags.
119 148 */
120 149 struct padata_instance {
121   - struct notifier_block cpu_notifier;
122   - struct workqueue_struct *wq;
123   - struct parallel_data *pd;
124   - cpumask_var_t cpumask;
125   - struct mutex lock;
126   - u8 flags;
127   -#define PADATA_INIT 1
128   -#define PADATA_RESET 2
129   -#define PADATA_INVALID 4
  150 + struct notifier_block cpu_notifier;
  151 + struct workqueue_struct *wq;
  152 + struct parallel_data *pd;
  153 + struct {
  154 + cpumask_var_t pcpu;
  155 + cpumask_var_t cbcpu;
  156 + } cpumask;
  157 + struct blocking_notifier_head cpumask_change_notifier;
  158 + struct mutex lock;
  159 + u8 flags;
  160 +#define PADATA_INIT 1
  161 +#define PADATA_RESET 2
  162 +#define PADATA_INVALID 4
130 163 };
131 164  
132   -extern struct padata_instance *padata_alloc(const struct cpumask *cpumask,
133   - struct workqueue_struct *wq);
  165 +extern struct padata_instance *padata_alloc(struct workqueue_struct *wq);
  166 +extern struct padata_instance *__padata_alloc(struct workqueue_struct *wq,
  167 + const struct cpumask *pcpumask,
  168 + const struct cpumask *cbcpumask);
134 169 extern void padata_free(struct padata_instance *pinst);
135 170 extern int padata_do_parallel(struct padata_instance *pinst,
136 171 struct padata_priv *padata, int cb_cpu);
137 172 extern void padata_do_serial(struct padata_priv *padata);
138   -extern int padata_set_cpumask(struct padata_instance *pinst,
  173 +extern int padata_get_cpumask(struct padata_instance *pinst,
  174 + int cpumask_type, struct cpumask *out_mask);
  175 +extern int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
139 176 cpumask_var_t cpumask);
140   -extern int padata_add_cpu(struct padata_instance *pinst, int cpu);
141   -extern int padata_remove_cpu(struct padata_instance *pinst, int cpu);
  177 +extern int __padata_set_cpumasks(struct padata_instance *pinst,
  178 + cpumask_var_t pcpumask,
  179 + cpumask_var_t cbcpumask);
  180 +extern int padata_add_cpu(struct padata_instance *pinst, int cpu, int mask);
  181 +extern int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask);
142 182 extern int padata_start(struct padata_instance *pinst);
143 183 extern void padata_stop(struct padata_instance *pinst);
  184 +extern int padata_register_cpumask_notifier(struct padata_instance *pinst,
  185 + struct notifier_block *nblock);
  186 +extern int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
  187 + struct notifier_block *nblock);
144 188 #endif
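
A sketch of how the reworked interface above could be used; illustrative
only, with error handling abbreviated and my_shrink_serial_mask() being
a hypothetical caller:

#include <linux/cpumask.h>
#include <linux/gfp.h>
#include <linux/padata.h>

static int my_shrink_serial_mask(struct padata_instance *pinst)
{
	cpumask_var_t mask;
	int err;

	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;

	/*
	 * Run serialization callbacks on CPUs 0 and 1 only; the
	 * parallel cpumask is left untouched.
	 */
	cpumask_clear(mask);
	cpumask_set_cpu(0, mask);
	cpumask_set_cpu(1, mask);
	err = padata_set_cpumask(pinst, PADATA_CPU_SERIAL, mask);
	if (err)
		goto out;

	/* Read the effective serial cpumask back into mask. */
	err = padata_get_cpumask(pinst, PADATA_CPU_SERIAL, mask);
out:
	free_cpumask_var(mask);
	return err;
}
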
kernel/padata.c
... ... @@ -35,9 +35,9 @@
35 35 {
36 36 int cpu, target_cpu;
37 37  
38   - target_cpu = cpumask_first(pd->cpumask);
  38 + target_cpu = cpumask_first(pd->cpumask.pcpu);
39 39 for (cpu = 0; cpu < cpu_index; cpu++)
40   - target_cpu = cpumask_next(target_cpu, pd->cpumask);
  40 + target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);
41 41  
42 42 return target_cpu;
43 43 }
... ... @@ -53,26 +53,27 @@
53 53 * Hash the sequence numbers to the cpus by taking
54 54 * seq_nr mod. number of cpus in use.
55 55 */
56   - cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);
  56 + cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask.pcpu);
57 57  
58 58 return padata_index_to_cpu(pd, cpu_index);
59 59 }
60 60  
61   -static void padata_parallel_worker(struct work_struct *work)
  61 +static void padata_parallel_worker(struct work_struct *parallel_work)
62 62 {
63   - struct padata_queue *queue;
  63 + struct padata_parallel_queue *pqueue;
64 64 struct parallel_data *pd;
65 65 struct padata_instance *pinst;
66 66 LIST_HEAD(local_list);
67 67  
68 68 local_bh_disable();
69   - queue = container_of(work, struct padata_queue, pwork);
70   - pd = queue->pd;
  69 + pqueue = container_of(parallel_work,
  70 + struct padata_parallel_queue, work);
  71 + pd = pqueue->pd;
71 72 pinst = pd->pinst;
72 73  
73   - spin_lock(&queue->parallel.lock);
74   - list_replace_init(&queue->parallel.list, &local_list);
75   - spin_unlock(&queue->parallel.lock);
  74 + spin_lock(&pqueue->parallel.lock);
  75 + list_replace_init(&pqueue->parallel.list, &local_list);
  76 + spin_unlock(&pqueue->parallel.lock);
76 77  
77 78 while (!list_empty(&local_list)) {
78 79 struct padata_priv *padata;
... ... @@ -94,7 +95,7 @@
94 95 * @pinst: padata instance
95 96 * @padata: object to be parallelized
96 97 * @cb_cpu: cpu the serialization callback function will run on,
97   - * must be in the cpumask of padata.
  98 + * must be in the serial cpumask of padata (i.e. cpumask.cbcpu).
98 99 *
99 100 * The parallelization callback function will run with BHs off.
100 101 * Note: Every object which is parallelized by padata_do_parallel
... ... @@ -104,7 +105,7 @@
104 105 struct padata_priv *padata, int cb_cpu)
105 106 {
106 107 int target_cpu, err;
107   - struct padata_queue *queue;
  108 + struct padata_parallel_queue *queue;
108 109 struct parallel_data *pd;
109 110  
110 111 rcu_read_lock_bh();
... ... @@ -115,7 +116,7 @@
115 116 if (!(pinst->flags & PADATA_INIT))
116 117 goto out;
117 118  
118   - if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
  119 + if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
119 120 goto out;
120 121  
121 122 err = -EBUSY;
... ... @@ -136,13 +137,13 @@
136 137 padata->seq_nr = atomic_inc_return(&pd->seq_nr);
137 138  
138 139 target_cpu = padata_cpu_hash(padata);
139   - queue = per_cpu_ptr(pd->queue, target_cpu);
  140 + queue = per_cpu_ptr(pd->pqueue, target_cpu);
140 141  
141 142 spin_lock(&queue->parallel.lock);
142 143 list_add_tail(&padata->list, &queue->parallel.list);
143 144 spin_unlock(&queue->parallel.lock);
144 145  
145   - queue_work_on(target_cpu, pinst->wq, &queue->pwork);
  146 + queue_work_on(target_cpu, pinst->wq, &queue->work);
146 147  
147 148 out:
148 149 rcu_read_unlock_bh();
... ... @@ -172,11 +173,11 @@
172 173 {
173 174 int cpu, num_cpus;
174 175 int next_nr, next_index;
175   - struct padata_queue *queue, *next_queue;
  176 + struct padata_parallel_queue *queue, *next_queue;
176 177 struct padata_priv *padata;
177 178 struct padata_list *reorder;
178 179  
179   - num_cpus = cpumask_weight(pd->cpumask);
  180 + num_cpus = cpumask_weight(pd->cpumask.pcpu);
180 181  
181 182 /*
182 183 * Calculate the percpu reorder queue and the sequence
... ... @@ -185,13 +186,13 @@
185 186 next_nr = pd->processed;
186 187 next_index = next_nr % num_cpus;
187 188 cpu = padata_index_to_cpu(pd, next_index);
188   - next_queue = per_cpu_ptr(pd->queue, cpu);
  189 + next_queue = per_cpu_ptr(pd->pqueue, cpu);
189 190  
190 191 if (unlikely(next_nr > pd->max_seq_nr)) {
191 192 next_nr = next_nr - pd->max_seq_nr - 1;
192 193 next_index = next_nr % num_cpus;
193 194 cpu = padata_index_to_cpu(pd, next_index);
194   - next_queue = per_cpu_ptr(pd->queue, cpu);
  195 + next_queue = per_cpu_ptr(pd->pqueue, cpu);
195 196 pd->processed = 0;
196 197 }
197 198  
... ... @@ -215,7 +216,7 @@
215 216 goto out;
216 217 }
217 218  
218   - queue = per_cpu_ptr(pd->queue, smp_processor_id());
  219 + queue = per_cpu_ptr(pd->pqueue, smp_processor_id());
219 220 if (queue->cpu_index == next_queue->cpu_index) {
220 221 padata = ERR_PTR(-ENODATA);
221 222 goto out;
... ... @@ -229,7 +230,7 @@
229 230 static void padata_reorder(struct parallel_data *pd)
230 231 {
231 232 struct padata_priv *padata;
232   - struct padata_queue *queue;
  233 + struct padata_serial_queue *squeue;
233 234 struct padata_instance *pinst = pd->pinst;
234 235  
235 236 /*
... ... @@ -268,13 +269,13 @@
268 269 return;
269 270 }
270 271  
271   - queue = per_cpu_ptr(pd->queue, padata->cb_cpu);
  272 + squeue = per_cpu_ptr(pd->squeue, padata->cb_cpu);
272 273  
273   - spin_lock(&queue->serial.lock);
274   - list_add_tail(&padata->list, &queue->serial.list);
275   - spin_unlock(&queue->serial.lock);
  274 + spin_lock(&squeue->serial.lock);
  275 + list_add_tail(&padata->list, &squeue->serial.list);
  276 + spin_unlock(&squeue->serial.lock);
276 277  
277   - queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
  278 + queue_work_on(padata->cb_cpu, pinst->wq, &squeue->work);
278 279 }
279 280  
280 281 spin_unlock_bh(&pd->lock);
... ... @@ -300,19 +301,19 @@
300 301 padata_reorder(pd);
301 302 }
302 303  
303   -static void padata_serial_worker(struct work_struct *work)
  304 +static void padata_serial_worker(struct work_struct *serial_work)
304 305 {
305   - struct padata_queue *queue;
  306 + struct padata_serial_queue *squeue;
306 307 struct parallel_data *pd;
307 308 LIST_HEAD(local_list);
308 309  
309 310 local_bh_disable();
310   - queue = container_of(work, struct padata_queue, swork);
311   - pd = queue->pd;
  311 + squeue = container_of(serial_work, struct padata_serial_queue, work);
  312 + pd = squeue->pd;
312 313  
313   - spin_lock(&queue->serial.lock);
314   - list_replace_init(&queue->serial.list, &local_list);
315   - spin_unlock(&queue->serial.lock);
  314 + spin_lock(&squeue->serial.lock);
  315 + list_replace_init(&squeue->serial.list, &local_list);
  316 + spin_unlock(&squeue->serial.lock);
316 317  
317 318 while (!list_empty(&local_list)) {
318 319 struct padata_priv *padata;
... ... @@ -339,18 +340,18 @@
339 340 void padata_do_serial(struct padata_priv *padata)
340 341 {
341 342 int cpu;
342   - struct padata_queue *queue;
  343 + struct padata_parallel_queue *pqueue;
343 344 struct parallel_data *pd;
344 345  
345 346 pd = padata->pd;
346 347  
347 348 cpu = get_cpu();
348   - queue = per_cpu_ptr(pd->queue, cpu);
  349 + pqueue = per_cpu_ptr(pd->pqueue, cpu);
349 350  
350   - spin_lock(&queue->reorder.lock);
  351 + spin_lock(&pqueue->reorder.lock);
351 352 atomic_inc(&pd->reorder_objects);
352   - list_add_tail(&padata->list, &queue->reorder.list);
353   - spin_unlock(&queue->reorder.lock);
  353 + list_add_tail(&padata->list, &pqueue->reorder.list);
  354 + spin_unlock(&pqueue->reorder.lock);
354 355  
355 356 put_cpu();
356 357  
... ... @@ -358,51 +359,88 @@
358 359 }
359 360 EXPORT_SYMBOL(padata_do_serial);
360 361  
361   -/* Allocate and initialize the internal cpumask dependend resources. */
362   -static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
363   - const struct cpumask *cpumask)
  362 +static int padata_setup_cpumasks(struct parallel_data *pd,
  363 + const struct cpumask *pcpumask,
  364 + const struct cpumask *cbcpumask)
364 365 {
365   - int cpu, cpu_index, num_cpus;
366   - struct padata_queue *queue;
367   - struct parallel_data *pd;
  366 + if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
  367 + return -ENOMEM;
368 368  
369   - cpu_index = 0;
  369 + cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_active_mask);
  370 + if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
  371 + free_cpumask_var(pd->cpumask.pcpu);
  372 + return -ENOMEM;
  373 + }
370 374  
371   - pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
372   - if (!pd)
373   - goto err;
  375 + cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_active_mask);
  376 + return 0;
  377 +}
374 378  
375   - pd->queue = alloc_percpu(struct padata_queue);
376   - if (!pd->queue)
377   - goto err_free_pd;
  379 +static void __padata_list_init(struct padata_list *pd_list)
  380 +{
  381 + INIT_LIST_HEAD(&pd_list->list);
  382 + spin_lock_init(&pd_list->lock);
  383 +}
378 384  
379   - if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
380   - goto err_free_queue;
  385 +/* Initialize all percpu queues used by serial workers */
  386 +static void padata_init_squeues(struct parallel_data *pd)
  387 +{
  388 + int cpu;
  389 + struct padata_serial_queue *squeue;
381 390  
382   - cpumask_and(pd->cpumask, cpumask, cpu_active_mask);
  391 + for_each_cpu(cpu, pd->cpumask.cbcpu) {
  392 + squeue = per_cpu_ptr(pd->squeue, cpu);
  393 + squeue->pd = pd;
  394 + __padata_list_init(&squeue->serial);
  395 + INIT_WORK(&squeue->work, padata_serial_worker);
  396 + }
  397 +}
383 398  
384   - for_each_cpu(cpu, pd->cpumask) {
385   - queue = per_cpu_ptr(pd->queue, cpu);
  399 +/* Initialize all percpu queues used by parallel workers */
  400 +static void padata_init_pqueues(struct parallel_data *pd)
  401 +{
  402 + int cpu_index, num_cpus, cpu;
  403 + struct padata_parallel_queue *pqueue;
386 404  
387   - queue->pd = pd;
  405 + cpu_index = 0;
  406 + for_each_cpu(cpu, pd->cpumask.pcpu) {
  407 + pqueue = per_cpu_ptr(pd->pqueue, cpu);
  408 + pqueue->pd = pd;
  409 + pqueue->cpu_index = cpu_index;
388 410  
389   - queue->cpu_index = cpu_index;
390   - cpu_index++;
391   -
392   - INIT_LIST_HEAD(&queue->reorder.list);
393   - INIT_LIST_HEAD(&queue->parallel.list);
394   - INIT_LIST_HEAD(&queue->serial.list);
395   - spin_lock_init(&queue->reorder.lock);
396   - spin_lock_init(&queue->parallel.lock);
397   - spin_lock_init(&queue->serial.lock);
398   -
399   - INIT_WORK(&queue->pwork, padata_parallel_worker);
400   - INIT_WORK(&queue->swork, padata_serial_worker);
  411 + __padata_list_init(&pqueue->reorder);
  412 + __padata_list_init(&pqueue->parallel);
  413 + INIT_WORK(&pqueue->work, padata_parallel_worker);
  414 + atomic_set(&pqueue->num_obj, 0);
401 415 }
402 416  
403   - num_cpus = cpumask_weight(pd->cpumask);
  417 + num_cpus = cpumask_weight(pd->cpumask.pcpu);
404 418 pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;
  419 +}
405 420  
  421 +/* Allocate and initialize the internal cpumask dependent resources. */
  422 +static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
  423 + const struct cpumask *pcpumask,
  424 + const struct cpumask *cbcpumask)
  425 +{
  426 + struct parallel_data *pd;
  427 +
  428 + pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
  429 + if (!pd)
  430 + goto err;
  431 +
  432 + pd->pqueue = alloc_percpu(struct padata_parallel_queue);
  433 + if (!pd->pqueue)
  434 + goto err_free_pd;
  435 +
  436 + pd->squeue = alloc_percpu(struct padata_serial_queue);
  437 + if (!pd->squeue)
  438 + goto err_free_pqueue;
  439 + if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
  440 + goto err_free_squeue;
  441 +
  442 + padata_init_pqueues(pd);
  443 + padata_init_squeues(pd);
406 444 setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
407 445 atomic_set(&pd->seq_nr, -1);
408 446 atomic_set(&pd->reorder_objects, 0);
... ... @@ -412,8 +450,10 @@
412 450  
413 451 return pd;
414 452  
415   -err_free_queue:
416   - free_percpu(pd->queue);
  453 +err_free_squeue:
  454 + free_percpu(pd->squeue);
  455 +err_free_pqueue:
  456 + free_percpu(pd->pqueue);
417 457 err_free_pd:
418 458 kfree(pd);
419 459 err:
... ... @@ -422,8 +462,10 @@
422 462  
423 463 static void padata_free_pd(struct parallel_data *pd)
424 464 {
425   - free_cpumask_var(pd->cpumask);
426   - free_percpu(pd->queue);
  465 + free_cpumask_var(pd->cpumask.pcpu);
  466 + free_cpumask_var(pd->cpumask.cbcpu);
  467 + free_percpu(pd->pqueue);
  468 + free_percpu(pd->squeue);
427 469 kfree(pd);
428 470 }
429 471  
... ... @@ -431,11 +473,12 @@
431 473 static void padata_flush_queues(struct parallel_data *pd)
432 474 {
433 475 int cpu;
434   - struct padata_queue *queue;
  476 + struct padata_parallel_queue *pqueue;
  477 + struct padata_serial_queue *squeue;
435 478  
436   - for_each_cpu(cpu, pd->cpumask) {
437   - queue = per_cpu_ptr(pd->queue, cpu);
438   - flush_work(&queue->pwork);
  479 + for_each_cpu(cpu, pd->cpumask.pcpu) {
  480 + pqueue = per_cpu_ptr(pd->pqueue, cpu);
  481 + flush_work(&pqueue->work);
439 482 }
440 483  
441 484 del_timer_sync(&pd->timer);
... ... @@ -443,9 +486,9 @@
443 486 if (atomic_read(&pd->reorder_objects))
444 487 padata_reorder(pd);
445 488  
446   - for_each_cpu(cpu, pd->cpumask) {
447   - queue = per_cpu_ptr(pd->queue, cpu);
448   - flush_work(&queue->swork);
  489 + for_each_cpu(cpu, pd->cpumask.cbcpu) {
  490 + squeue = per_cpu_ptr(pd->squeue, cpu);
  491 + flush_work(&squeue->work);
449 492 }
450 493  
451 494 BUG_ON(atomic_read(&pd->refcnt) != 0);
... ... @@ -475,21 +518,63 @@
475 518 struct parallel_data *pd_new)
476 519 {
477 520 struct parallel_data *pd_old = pinst->pd;
  521 + int notification_mask = 0;
478 522  
479 523 pinst->flags |= PADATA_RESET;
480 524  
481 525 rcu_assign_pointer(pinst->pd, pd_new);
482 526  
483 527 synchronize_rcu();
  528 + if (!pd_old)
  529 + goto out;
484 530  
485   - if (pd_old) {
486   - padata_flush_queues(pd_old);
487   - padata_free_pd(pd_old);
488   - }
  531 + padata_flush_queues(pd_old);
  532 + if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu))
  533 + notification_mask |= PADATA_CPU_PARALLEL;
  534 + if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
  535 + notification_mask |= PADATA_CPU_SERIAL;
489 536  
  537 + padata_free_pd(pd_old);
  538 + if (notification_mask)
  539 + blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
  540 + notification_mask, pinst);
  541 +
  542 +out:
490 543 pinst->flags &= ~PADATA_RESET;
491 544 }
492 545  
  546 +/**
  547 + * padata_register_cpumask_notifier - Registers a notifier that will be called
  548 + * if either pcpu or cbcpu or both cpumasks change.
  549 + *
  550 + * @pinst: A pointer to padata instance
  551 + * @nblock: A pointer to notifier block.
  552 + */
  553 +int padata_register_cpumask_notifier(struct padata_instance *pinst,
  554 + struct notifier_block *nblock)
  555 +{
  556 + return blocking_notifier_chain_register(&pinst->cpumask_change_notifier,
  557 + nblock);
  558 +}
  559 +EXPORT_SYMBOL(padata_register_cpumask_notifier);
  560 +
  561 +/**
  562 + * padata_unregister_cpumask_notifier - Unregisters cpumask notifier
  563 + * registered earlier using padata_register_cpumask_notifier
  564 + *
  565 + * @pinst: A pointer to padata instance.
  566 + * @nblock: A pointer to notifier block.
  567 + */
  568 +int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
  569 + struct notifier_block *nblock)
  570 +{
  571 + return blocking_notifier_chain_unregister(
  572 + &pinst->cpumask_change_notifier,
  573 + nblock);
  574 +}
  575 +EXPORT_SYMBOL(padata_unregister_cpumask_notifier);
  576 +
  577 +
493 578 /* If cpumask contains no active cpu, we mark the instance as invalid. */
494 579 static bool padata_validate_cpumask(struct padata_instance *pinst,
495 580 const struct cpumask *cpumask)
... ... @@ -504,36 +589,112 @@
504 589 }
505 590  
506 591 /**
507   - * padata_set_cpumask - set the cpumask that padata should use
  592 + * padata_get_cpumask: Fetch serial or parallel cpumask from the
  593 + * given padata instance and copy it to @out_mask
508 594 *
  595 + * @pinst: A pointer to padata instance
  596 + * @cpumask_type: Specifies which cpumask will be copied.
  597 + * Possible values are PADATA_CPU_SERIAL *or* PADATA_CPU_PARALLEL
  598 + * corresponding to serial and parallel cpumask respectively.
  599 + * @out_mask: A pointer to cpumask structure where selected
  600 + * cpumask will be copied.
  601 + */
  602 +int padata_get_cpumask(struct padata_instance *pinst,
  603 + int cpumask_type, struct cpumask *out_mask)
  604 +{
  605 + struct parallel_data *pd;
  606 + int ret = 0;
  607 +
  608 + rcu_read_lock_bh();
  609 + pd = rcu_dereference(pinst->pd);
  610 + switch (cpumask_type) {
  611 + case PADATA_CPU_SERIAL:
  612 + cpumask_copy(out_mask, pd->cpumask.cbcpu);
  613 + break;
  614 + case PADATA_CPU_PARALLEL:
  615 + cpumask_copy(out_mask, pd->cpumask.pcpu);
  616 + break;
  617 + default:
  618 + ret = -EINVAL;
  619 + }
  620 +
  621 + rcu_read_unlock_bh();
  622 + return ret;
  623 +}
  624 +EXPORT_SYMBOL(padata_get_cpumask);
  625 +
  626 +/**
  627 + * padata_set_cpumask: Set the cpumask indicated by @cpumask_type to
  628 + * the value of @cpumask.
  629 + *
509 630 * @pinst: padata instance
  631 + * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL, corresponding
  632 + * to serial and parallel cpumasks respectively.
510 633 * @cpumask: the cpumask to use
511 634 */
512   -int padata_set_cpumask(struct padata_instance *pinst,
513   - cpumask_var_t cpumask)
  635 +int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
  636 + cpumask_var_t cpumask)
514 637 {
  638 + struct cpumask *serial_mask, *parallel_mask;
  639 +
  640 + switch (cpumask_type) {
  641 + case PADATA_CPU_PARALLEL:
  642 + serial_mask = pinst->cpumask.cbcpu;
  643 + parallel_mask = cpumask;
  644 + break;
  645 + case PADATA_CPU_SERIAL:
  646 + parallel_mask = pinst->cpumask.pcpu;
  647 + serial_mask = cpumask;
  648 + break;
  649 + default:
  650 + return -EINVAL;
  651 + }
  652 +
  653 + return __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
  654 +}
  655 +EXPORT_SYMBOL(padata_set_cpumask);
  656 +
  657 +/**
  658 + * __padata_set_cpumasks - Set both parallel and serial cpumasks. The first
  659 + * one is used by parallel workers and the second one
  660 + * by the workers doing serialization.
  661 + *
  662 + * @pinst: padata instance
  663 + * @pcpumask: the cpumask to use for parallel workers
  664 + * @cbcpumask: the cpumask to use for serial workers
  665 + */
  666 +int __padata_set_cpumasks(struct padata_instance *pinst,
  667 + cpumask_var_t pcpumask, cpumask_var_t cbcpumask)
  668 +{
515 669 int valid;
516 670 int err = 0;
517 671 struct parallel_data *pd = NULL;
518 672  
519 673 mutex_lock(&pinst->lock);
520 674  
521   - valid = padata_validate_cpumask(pinst, cpumask);
  675 + valid = padata_validate_cpumask(pinst, pcpumask);
522 676 if (!valid) {
523 677 __padata_stop(pinst);
524 678 goto out_replace;
525 679 }
526 680  
  681 + valid = padata_validate_cpumask(pinst, cbcpumask);
  682 + if (!valid) {
  683 + __padata_stop(pinst);
  684 + goto out_replace;
  685 + }
  686 +
527 687 get_online_cpus();
528 688  
529   - pd = padata_alloc_pd(pinst, cpumask);
  689 + pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
530 690 if (!pd) {
531 691 err = -ENOMEM;
532 692 goto out;
533 693 }
534 694  
535 695 out_replace:
536   - cpumask_copy(pinst->cpumask, cpumask);
  696 + cpumask_copy(pinst->cpumask.pcpu, pcpumask);
  697 + cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
537 698  
538 699 padata_replace(pinst, pd);
539 700  
... ... @@ -546,41 +707,57 @@
546 707 mutex_unlock(&pinst->lock);
547 708  
548 709 return err;
  710 +
549 711 }
550   -EXPORT_SYMBOL(padata_set_cpumask);
  712 +EXPORT_SYMBOL(__padata_set_cpumasks);
551 713  
552 714 static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
553 715 {
554 716 struct parallel_data *pd;
555 717  
556 718 if (cpumask_test_cpu(cpu, cpu_active_mask)) {
557   - pd = padata_alloc_pd(pinst, pinst->cpumask);
  719 + pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
  720 + pinst->cpumask.cbcpu);
558 721 if (!pd)
559 722 return -ENOMEM;
560 723  
561 724 padata_replace(pinst, pd);
562 725  
563   - if (padata_validate_cpumask(pinst, pinst->cpumask))
  726 + if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
  727 + padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
564 728 __padata_start(pinst);
565 729 }
566 730  
567 731 return 0;
568 732 }
569 733  
570   -/**
571   - * padata_add_cpu - add a cpu to the padata cpumask
  734 +/**
  735 + * padata_add_cpu - add a cpu to one or both (parallel and serial)
  736 + * padata cpumasks.
572 737 *
573 738 * @pinst: padata instance
574 739 * @cpu: cpu to add
  740 + * @mask: bitmask of flags specifying to which cpumask @cpu should be added.
  741 + * The @mask may be any combination of the following flags:
  742 + * PADATA_CPU_SERIAL - serial cpumask
  743 + * PADATA_CPU_PARALLEL - parallel cpumask
575 744 */
576   -int padata_add_cpu(struct padata_instance *pinst, int cpu)
  745 +
  746 +int padata_add_cpu(struct padata_instance *pinst, int cpu, int mask)
577 747 {
578 748 int err;
579 749  
  750 + if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
  751 + return -EINVAL;
  752 +
580 753 mutex_lock(&pinst->lock);
581 754  
582 755 get_online_cpus();
583   - cpumask_set_cpu(cpu, pinst->cpumask);
  756 + if (mask & PADATA_CPU_SERIAL)
  757 + cpumask_set_cpu(cpu, pinst->cpumask.cbcpu);
  758 + if (mask & PADATA_CPU_PARALLEL)
  759 + cpumask_set_cpu(cpu, pinst->cpumask.pcpu);
  760 +
584 761 err = __padata_add_cpu(pinst, cpu);
585 762 put_online_cpus();
586 763  
... ... @@ -596,13 +773,15 @@
596 773  
597 774 if (cpumask_test_cpu(cpu, cpu_online_mask)) {
598 775  
599   - if (!padata_validate_cpumask(pinst, pinst->cpumask)) {
  776 + if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
  777 + !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu)) {
600 778 __padata_stop(pinst);
601 779 padata_replace(pinst, pd);
602 780 goto out;
603 781 }
604 782  
605   - pd = padata_alloc_pd(pinst, pinst->cpumask);
  783 + pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
  784 + pinst->cpumask.cbcpu);
606 785 if (!pd)
607 786 return -ENOMEM;
608 787  
... ... @@ -613,20 +792,32 @@
613 792 return 0;
614 793 }
615 794  
616   -/**
617   - * padata_remove_cpu - remove a cpu from the padata cpumask
  795 +/**
  796 + * padata_remove_cpu - remove a cpu from one or both (serial and parallel)
  797 + * padata cpumasks.
618 798 *
619 799 * @pinst: padata instance
620 800 * @cpu: cpu to remove
  801 + * @mask: bitmask specifying from which cpumask @cpu should be removed
  802 + * The @mask may be any combination of the following flags:
  803 + * PADATA_CPU_SERIAL - serial cpumask
  804 + * PADATA_CPU_PARALLEL - parallel cpumask
621 805 */
622   -int padata_remove_cpu(struct padata_instance *pinst, int cpu)
  806 +int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask)
623 807 {
624 808 int err;
625 809  
  810 + if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
  811 + return -EINVAL;
  812 +
626 813 mutex_lock(&pinst->lock);
627 814  
628 815 get_online_cpus();
629   - cpumask_clear_cpu(cpu, pinst->cpumask);
  816 + if (mask & PADATA_CPU_SERIAL)
  817 + cpumask_clear_cpu(cpu, pinst->cpumask.cbcpu);
  818 + if (mask & PADATA_CPU_PARALLEL)
  819 + cpumask_clear_cpu(cpu, pinst->cpumask.pcpu);
  820 +
630 821 err = __padata_remove_cpu(pinst, cpu);
631 822 put_online_cpus();
632 823  
... ... @@ -672,6 +863,14 @@
672 863 EXPORT_SYMBOL(padata_stop);
673 864  
674 865 #ifdef CONFIG_HOTPLUG_CPU
  866 +
  867 +static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
  868 +{
  869 + return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
  870 + cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
  871 +}
  872 +
  873 +
675 874 static int padata_cpu_callback(struct notifier_block *nfb,
676 875 unsigned long action, void *hcpu)
677 876 {
... ... @@ -684,7 +883,7 @@
684 883 switch (action) {
685 884 case CPU_ONLINE:
686 885 case CPU_ONLINE_FROZEN:
687   - if (!cpumask_test_cpu(cpu, pinst->cpumask))
  886 + if (!pinst_has_cpu(pinst, cpu))
688 887 break;
689 888 mutex_lock(&pinst->lock);
690 889 err = __padata_add_cpu(pinst, cpu);
... ... @@ -695,7 +894,7 @@
695 894  
696 895 case CPU_DOWN_PREPARE:
697 896 case CPU_DOWN_PREPARE_FROZEN:
698   - if (!cpumask_test_cpu(cpu, pinst->cpumask))
  897 + if (!pinst_has_cpu(pinst, cpu))
699 898 break;
700 899 mutex_lock(&pinst->lock);
701 900 err = __padata_remove_cpu(pinst, cpu);
... ... @@ -706,7 +905,7 @@
706 905  
707 906 case CPU_UP_CANCELED:
708 907 case CPU_UP_CANCELED_FROZEN:
709   - if (!cpumask_test_cpu(cpu, pinst->cpumask))
  908 + if (!pinst_has_cpu(pinst, cpu))
710 909 break;
711 910 mutex_lock(&pinst->lock);
712 911 __padata_remove_cpu(pinst, cpu);
... ... @@ -714,7 +913,7 @@
714 913  
715 914 case CPU_DOWN_FAILED:
716 915 case CPU_DOWN_FAILED_FROZEN:
717   - if (!cpumask_test_cpu(cpu, pinst->cpumask))
  916 + if (!pinst_has_cpu(pinst, cpu))
718 917 break;
719 918 mutex_lock(&pinst->lock);
720 919 __padata_add_cpu(pinst, cpu);
... ... @@ -726,14 +925,30 @@
726 925 #endif
727 926  
728 927 /**
729   - * padata_alloc - allocate and initialize a padata instance
  928 + * padata_alloc - Allocate and initialize padata instance.
  929 + * Use the default cpumask (cpu_possible_mask)
  930 + * for serial and parallel workers.
730 931 *
731   - * @cpumask: cpumask that padata uses for parallelization
732 932 * @wq: workqueue to use for the allocated padata instance
733 933 */
734   -struct padata_instance *padata_alloc(const struct cpumask *cpumask,
735   - struct workqueue_struct *wq)
  934 +struct padata_instance *padata_alloc(struct workqueue_struct *wq)
736 935 {
  936 + return __padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
  937 +}
  938 +EXPORT_SYMBOL(padata_alloc);
  939 +
  940 +/**
  941 + * __padata_alloc - allocate and initialize a padata instance
  942 + * and specify cpumasks for serial and parallel workers.
  943 + *
  944 + * @wq: workqueue to use for the allocated padata instance
  945 + * @pcpumask: cpumask that will be used for padata parallelization
  946 + * @cbcpumask: cpumask that will be used for padata serialization
  947 + */
  948 +struct padata_instance *__padata_alloc(struct workqueue_struct *wq,
  949 + const struct cpumask *pcpumask,
  950 + const struct cpumask *cbcpumask)
  951 +{
737 952 struct padata_instance *pinst;
738 953 struct parallel_data *pd = NULL;
739 954  
... ... @@ -742,21 +957,26 @@
742 957 goto err;
743 958  
744 959 get_online_cpus();
745   -
746   - if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
  960 + if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
747 961 goto err_free_inst;
748   -
749   - if (padata_validate_cpumask(pinst, cpumask)) {
750   - pd = padata_alloc_pd(pinst, cpumask);
751   - if (!pd)
752   - goto err_free_mask;
  962 + if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
  963 + free_cpumask_var(pinst->cpumask.pcpu);
  964 + goto err_free_inst;
753 965 }
  966 + if (!padata_validate_cpumask(pinst, pcpumask) ||
  967 + !padata_validate_cpumask(pinst, cbcpumask))
  968 + goto err_free_masks;
754 969  
  970 + pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
  971 + if (!pd)
  972 + goto err_free_masks;
  973 +
755 974 rcu_assign_pointer(pinst->pd, pd);
756 975  
757 976 pinst->wq = wq;
758 977  
759   - cpumask_copy(pinst->cpumask, cpumask);
  978 + cpumask_copy(pinst->cpumask.pcpu, pcpumask);
  979 + cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
760 980  
761 981 pinst->flags = 0;
762 982  
... ... @@ -768,19 +988,21 @@
768 988  
769 989 put_online_cpus();
770 990  
  991 + BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier);
771 992 mutex_init(&pinst->lock);
772 993  
773 994 return pinst;
774 995  
775   -err_free_mask:
776   - free_cpumask_var(pinst->cpumask);
  996 +err_free_masks:
  997 + free_cpumask_var(pinst->cpumask.pcpu);
  998 + free_cpumask_var(pinst->cpumask.cbcpu);
777 999 err_free_inst:
778 1000 kfree(pinst);
779 1001 put_online_cpus();
780 1002 err:
781 1003 return NULL;
782 1004 }
783   -EXPORT_SYMBOL(padata_alloc);
  1005 +EXPORT_SYMBOL(__padata_alloc);
784 1006  
785 1007 /**
786 1008 * padata_free - free a padata instance
... ... @@ -795,7 +1017,8 @@
795 1017  
796 1018 padata_stop(pinst);
797 1019 padata_free_pd(pinst->pd);
798   - free_cpumask_var(pinst->cpumask);
  1020 + free_cpumask_var(pinst->cpumask.pcpu);
  1021 + free_cpumask_var(pinst->cpumask.cbcpu);
799 1022 kfree(pinst);
800 1023 }
801 1024 EXPORT_SYMBOL(padata_free);
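
To round the section off, the new @mask argument of padata_add_cpu()
and padata_remove_cpu() accepts any combination of the two flags; a
hypothetical caller migrating CPU 3 between the masks (error handling
elided):

static void my_migrate_cpu3(struct padata_instance *pinst)
{
	/* Move CPU 3 from the serial mask to the parallel mask. */
	padata_remove_cpu(pinst, 3, PADATA_CPU_SERIAL);
	padata_add_cpu(pinst, 3, PADATA_CPU_PARALLEL);

	/* Drop CPU 3 from both masks in a single call. */
	padata_remove_cpu(pinst, 3, PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL);
}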