Commit 1537663f5763892cacf1409ac0efef1b4f332d1e

Authored by Tejun Heo
1 parent 6416669975

workqueue: kill cpu_populated_map

Worker management is about to be overhauled.  Simplify things by
removing cpu_populated_map, creating workers for all possible cpus and
making single-threaded workqueues behave more like multi-threaded
ones.

After this patch, all cwqs are always initialized, all workqueues are
linked on the workqueues list and workers for all possible cpus
always exist.  This also makes CPU hotplug support simpler - checking
->cpus_allowed before processing work items in worker_thread() and flushing
cwqs on CPU_POST_DEAD are enough.
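
As a simplified illustration of the first half of that hotplug scheme: the check added to worker_thread() in the diff below compares the worker's affinity against its home CPU and re-binds it when they diverge. Below is a minimal userspace analogue of that pattern using pthread affinity calls; rebind_if_needed() and the main() demo are illustrative assumptions, not the kernel code, which uses cpumask_equal() and set_cpus_allowed_ptr().

/*
 * Userspace sketch of "re-bind the worker if its affinity drifted":
 * compare the current mask with the desired single-CPU mask and pin
 * the thread back when they differ.
 */
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

static void rebind_if_needed(int cpu)
{
	cpu_set_t want, have;

	CPU_ZERO(&want);
	CPU_SET(cpu, &want);

	/* Current affinity of this thread vs. the mask we want. */
	pthread_getaffinity_np(pthread_self(), sizeof(have), &have);
	if (!CPU_EQUAL(&want, &have))
		pthread_setaffinity_np(pthread_self(), sizeof(want), &want);
}

int main(void)
{
	rebind_if_needed(0);
	printf("worker now pinned to CPU 0\n");
	return 0;
}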

While at it, make get_cwq() always return the cwq for the specified
cpu, add target_cwq() for cases where single thread distinction is
necessary and drop all direct usage of per_cpu_ptr() on wq->cpu_wq.
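
A simplified userspace sketch of that split, to be read alongside the diff below: get_cwq() becomes an unconditional per-cpu lookup, and the single-thread redirection lives only in target_cwq(). The array-based "per-cpu" storage, the NR_CPUS value and the main() demo here are assumptions for illustration; the kernel uses per_cpu_ptr() on wq->cpu_wq.

#include <stdio.h>

#define NR_CPUS          4
#define WQ_SINGLE_THREAD 0x1

struct cpu_workqueue { unsigned int cpu; };

struct workqueue {
	unsigned int flags;
	struct cpu_workqueue cpu_wq[NR_CPUS];	/* stand-in for per-cpu data */
};

static unsigned int singlethread_cpu;		/* chosen once at init time */

/* Always return the cwq slot for the requested cpu. */
static struct cpu_workqueue *get_cwq(unsigned int cpu, struct workqueue *wq)
{
	return &wq->cpu_wq[cpu];
}

/* Apply the single-thread redirection, then fall back to get_cwq(). */
static struct cpu_workqueue *target_cwq(unsigned int cpu, struct workqueue *wq)
{
	if (wq->flags & WQ_SINGLE_THREAD)
		cpu = singlethread_cpu;
	return get_cwq(cpu, wq);
}

int main(void)
{
	struct workqueue st = { .flags = WQ_SINGLE_THREAD };
	struct workqueue mt = { .flags = 0 };

	/* Queueing on CPU 3: the single-threaded wq is redirected to CPU 0. */
	printf("single-threaded -> slot %ld\n", (long)(target_cwq(3, &st) - st.cpu_wq));
	printf("multi-threaded  -> slot %ld\n", (long)(target_cwq(3, &mt) - mt.cpu_wq));
	return 0;
}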

Signed-off-by: Tejun Heo <tj@kernel.org>

Showing 1 changed file with 59 additions and 114 deletions

... ... @@ -55,6 +55,7 @@
55 55 struct list_head worklist;
56 56 wait_queue_head_t more_work;
57 57 struct work_struct *current_work;
  58 + unsigned int cpu;
58 59  
59 60 struct workqueue_struct *wq; /* I: the owning workqueue */
60 61 struct task_struct *thread;
... ... @@ -189,34 +190,19 @@
189 190 static LIST_HEAD(workqueues);
190 191  
191 192 static int singlethread_cpu __read_mostly;
192   -static const struct cpumask *cpu_singlethread_map __read_mostly;
193   -/*
194   - * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
195   - * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
196   - * which comes in between can't use for_each_online_cpu(). We could
197   - * use cpu_possible_map, the cpumask below is more a documentation
198   - * than optimization.
199   - */
200   -static cpumask_var_t cpu_populated_map __read_mostly;
201 193  
202   -/* If it's single threaded, it isn't in the list of workqueues. */
203   -static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
  194 +static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
  195 + struct workqueue_struct *wq)
204 196 {
205   - return wq->flags & WQ_SINGLE_THREAD;
  197 + return per_cpu_ptr(wq->cpu_wq, cpu);
206 198 }
207 199  
208   -static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
  200 +static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
  201 + struct workqueue_struct *wq)
209 202 {
210   - return is_wq_single_threaded(wq)
211   - ? cpu_singlethread_map : cpu_populated_map;
212   -}
213   -
214   -static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
215   - struct workqueue_struct *wq)
216   -{
217   - if (unlikely(is_wq_single_threaded(wq)))
  203 + if (unlikely(wq->flags & WQ_SINGLE_THREAD))
218 204 cpu = singlethread_cpu;
219   - return per_cpu_ptr(wq->cpu_wq, cpu);
  205 + return get_cwq(cpu, wq);
220 206 }
221 207  
222 208 /*
... ... @@ -279,7 +265,7 @@
279 265 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
280 266 struct work_struct *work)
281 267 {
282   - struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
  268 + struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
283 269 unsigned long flags;
284 270  
285 271 debug_work_activate(work);
... ... @@ -383,7 +369,7 @@
383 369 timer_stats_timer_set_start_info(&dwork->timer);
384 370  
385 371 /* This stores cwq for the moment, for the timer_fn */
386   - set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
  372 + set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
387 373 timer->expires = jiffies + delay;
388 374 timer->data = (unsigned long)dwork;
389 375 timer->function = delayed_work_timer_fn;
... ... @@ -495,6 +481,10 @@
495 481 if (kthread_should_stop())
496 482 break;
497 483  
  484 + if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
  485 + get_cpu_mask(cwq->cpu))))
  486 + set_cpus_allowed_ptr(cwq->thread,
  487 + get_cpu_mask(cwq->cpu));
498 488 run_workqueue(cwq);
499 489 }
500 490  
... ... @@ -574,14 +564,13 @@
574 564 */
575 565 void flush_workqueue(struct workqueue_struct *wq)
576 566 {
577   - const struct cpumask *cpu_map = wq_cpu_map(wq);
578 567 int cpu;
579 568  
580 569 might_sleep();
581 570 lock_map_acquire(&wq->lockdep_map);
582 571 lock_map_release(&wq->lockdep_map);
583   - for_each_cpu(cpu, cpu_map)
584   - flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
  572 + for_each_possible_cpu(cpu)
  573 + flush_cpu_workqueue(get_cwq(cpu, wq));
585 574 }
586 575 EXPORT_SYMBOL_GPL(flush_workqueue);
587 576  
... ... @@ -699,7 +688,6 @@
699 688 {
700 689 struct cpu_workqueue_struct *cwq;
701 690 struct workqueue_struct *wq;
702   - const struct cpumask *cpu_map;
703 691 int cpu;
704 692  
705 693 might_sleep();
... ... @@ -712,9 +700,8 @@
712 700 return;
713 701  
714 702 wq = cwq->wq;
715   - cpu_map = wq_cpu_map(wq);
716 703  
717   - for_each_cpu(cpu, cpu_map)
  704 + for_each_possible_cpu(cpu)
718 705 wait_on_cpu_work(get_cwq(cpu, wq), work);
719 706 }
720 707  
... ... @@ -972,7 +959,7 @@
972 959  
973 960 BUG_ON(!keventd_wq);
974 961  
975   - cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
  962 + cwq = get_cwq(cpu, keventd_wq);
976 963 if (current == cwq->thread)
977 964 ret = 1;
978 965  
... ... @@ -980,26 +967,12 @@
980 967  
981 968 }
982 969  
983   -static struct cpu_workqueue_struct *
984   -init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
985   -{
986   - struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
987   -
988   - cwq->wq = wq;
989   - spin_lock_init(&cwq->lock);
990   - INIT_LIST_HEAD(&cwq->worklist);
991   - init_waitqueue_head(&cwq->more_work);
992   -
993   - return cwq;
994   -}
995   -
996 970 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
997 971 {
998 972 struct workqueue_struct *wq = cwq->wq;
999   - const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
1000 973 struct task_struct *p;
1001 974  
1002   - p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
  975 + p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
1003 976 /*
1004 977 * Nobody can add the work_struct to this cwq,
1005 978 * if (caller is __create_workqueue)
... ... @@ -1031,8 +1004,8 @@
1031 1004 struct lock_class_key *key,
1032 1005 const char *lock_name)
1033 1006 {
  1007 + bool singlethread = flags & WQ_SINGLE_THREAD;
1034 1008 struct workqueue_struct *wq;
1035   - struct cpu_workqueue_struct *cwq;
1036 1009 int err = 0, cpu;
1037 1010  
1038 1011 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
... ... @@ -1048,37 +1021,37 @@
1048 1021 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1049 1022 INIT_LIST_HEAD(&wq->list);
1050 1023  
1051   - if (flags & WQ_SINGLE_THREAD) {
1052   - cwq = init_cpu_workqueue(wq, singlethread_cpu);
1053   - err = create_workqueue_thread(cwq, singlethread_cpu);
1054   - start_workqueue_thread(cwq, -1);
1055   - } else {
1056   - cpu_maps_update_begin();
1057   - /*
1058   - * We must place this wq on list even if the code below fails.
1059   - * cpu_down(cpu) can remove cpu from cpu_populated_map before
1060   - * destroy_workqueue() takes the lock, in that case we leak
1061   - * cwq[cpu]->thread.
1062   - */
1063   - spin_lock(&workqueue_lock);
1064   - list_add(&wq->list, &workqueues);
1065   - spin_unlock(&workqueue_lock);
1066   - /*
1067   - * We must initialize cwqs for each possible cpu even if we
1068   - * are going to call destroy_workqueue() finally. Otherwise
1069   - * cpu_up() can hit the uninitialized cwq once we drop the
1070   - * lock.
1071   - */
1072   - for_each_possible_cpu(cpu) {
1073   - cwq = init_cpu_workqueue(wq, cpu);
1074   - if (err || !cpu_online(cpu))
1075   - continue;
1076   - err = create_workqueue_thread(cwq, cpu);
  1024 + cpu_maps_update_begin();
  1025 + /*
  1026 + * We must initialize cwqs for each possible cpu even if we
  1027 + * are going to call destroy_workqueue() finally. Otherwise
  1028 + * cpu_up() can hit the uninitialized cwq once we drop the
  1029 + * lock.
  1030 + */
  1031 + for_each_possible_cpu(cpu) {
  1032 + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
  1033 +
  1034 + cwq->wq = wq;
  1035 + cwq->cpu = cpu;
  1036 + spin_lock_init(&cwq->lock);
  1037 + INIT_LIST_HEAD(&cwq->worklist);
  1038 + init_waitqueue_head(&cwq->more_work);
  1039 +
  1040 + if (err)
  1041 + continue;
  1042 + err = create_workqueue_thread(cwq, cpu);
  1043 + if (cpu_online(cpu) && !singlethread)
1077 1044 start_workqueue_thread(cwq, cpu);
1078   - }
1079   - cpu_maps_update_done();
  1045 + else
  1046 + start_workqueue_thread(cwq, -1);
1080 1047 }
1081 1048  
  1049 + spin_lock(&workqueue_lock);
  1050 + list_add(&wq->list, &workqueues);
  1051 + spin_unlock(&workqueue_lock);
  1052 +
  1053 + cpu_maps_update_done();
  1054 +
1082 1055 if (err) {
1083 1056 destroy_workqueue(wq);
1084 1057 wq = NULL;
... ... @@ -1128,17 +1101,16 @@
1128 1101 */
1129 1102 void destroy_workqueue(struct workqueue_struct *wq)
1130 1103 {
1131   - const struct cpumask *cpu_map = wq_cpu_map(wq);
1132 1104 int cpu;
1133 1105  
1134 1106 cpu_maps_update_begin();
1135 1107 spin_lock(&workqueue_lock);
1136 1108 list_del(&wq->list);
1137 1109 spin_unlock(&workqueue_lock);
  1110 + cpu_maps_update_done();
1138 1111  
1139   - for_each_cpu(cpu, cpu_map)
1140   - cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
1141   - cpu_maps_update_done();
  1112 + for_each_possible_cpu(cpu)
  1113 + cleanup_workqueue_thread(get_cwq(cpu, wq));
1142 1114  
1143 1115 free_percpu(wq->cpu_wq);
1144 1116 kfree(wq);
... ... @@ -1152,48 +1124,25 @@
1152 1124 unsigned int cpu = (unsigned long)hcpu;
1153 1125 struct cpu_workqueue_struct *cwq;
1154 1126 struct workqueue_struct *wq;
1155   - int err = 0;
1156 1127  
1157 1128 action &= ~CPU_TASKS_FROZEN;
1158 1129  
1159   - switch (action) {
1160   - case CPU_UP_PREPARE:
1161   - cpumask_set_cpu(cpu, cpu_populated_map);
1162   - }
1163   -undo:
1164 1130 list_for_each_entry(wq, &workqueues, list) {
1165   - cwq = per_cpu_ptr(wq->cpu_wq, cpu);
  1131 + if (wq->flags & WQ_SINGLE_THREAD)
  1132 + continue;
1166 1133  
1167   - switch (action) {
1168   - case CPU_UP_PREPARE:
1169   - err = create_workqueue_thread(cwq, cpu);
1170   - if (!err)
1171   - break;
1172   - printk(KERN_ERR "workqueue [%s] for %i failed\n",
1173   - wq->name, cpu);
1174   - action = CPU_UP_CANCELED;
1175   - err = -ENOMEM;
1176   - goto undo;
  1134 + cwq = get_cwq(cpu, wq);
1177 1135  
1178   - case CPU_ONLINE:
1179   - start_workqueue_thread(cwq, cpu);
1180   - break;
1181   -
1182   - case CPU_UP_CANCELED:
1183   - start_workqueue_thread(cwq, -1);
  1136 + switch (action) {
1184 1137 case CPU_POST_DEAD:
1185   - cleanup_workqueue_thread(cwq);
  1138 + lock_map_acquire(&cwq->wq->lockdep_map);
  1139 + lock_map_release(&cwq->wq->lockdep_map);
  1140 + flush_cpu_workqueue(cwq);
1186 1141 break;
1187 1142 }
1188 1143 }
1189 1144  
1190   - switch (action) {
1191   - case CPU_UP_CANCELED:
1192   - case CPU_POST_DEAD:
1193   - cpumask_clear_cpu(cpu, cpu_populated_map);
1194   - }
1195   -
1196   - return notifier_from_errno(err);
  1145 + return notifier_from_errno(0);
1197 1146 }
1198 1147  
1199 1148 #ifdef CONFIG_SMP
... ... @@ -1245,11 +1194,7 @@
1245 1194  
1246 1195 void __init init_workqueues(void)
1247 1196 {
1248   - alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
1249   -
1250   - cpumask_copy(cpu_populated_map, cpu_online_mask);
1251 1197 singlethread_cpu = cpumask_first(cpu_possible_mask);
1252   - cpu_singlethread_map = cpumask_of(singlethread_cpu);
1253 1198 hotcpu_notifier(workqueue_cpu_callback, 0);
1254 1199 keventd_wq = create_workqueue("events");
1255 1200 BUG_ON(!keventd_wq);