Commit 34ed62461ae4970695974afb9a60ac3df0086830

Authored by Paul E. McKenney
Committed by Paul E. McKenney
1 parent f6161aa153

rcu: Remove restrictions on no-CBs CPUs

Currently, CPU 0 is constrained to not be a no-CBs CPU, and furthermore
at least one non-no-CBs CPU must remain online at any given time.  These
restrictions are problematic in some situations, such as cases where
all CPUs must run a real-time workload that needs to be insulated from
OS jitter and latencies due to RCU callback invocation.  This commit
therefore provides no-CBs CPUs a (very crude and energy-inefficient)
way to start and to wait for grace periods independently of the normal
RCU callback mechanisms.  This approach allows any or all of the CPUs to
be designated as no-CBs CPUs, and allows any proper subset of the CPUs
(whether no-CBs CPUs or not) to be offlined.
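
For reference, the new mechanism boils down to the polling loop below, a
condensed sketch of the rcu_nocb_wait_gp() function this patch adds to
kernel/rcutree_plugin.h.  The field and function names follow the patch
itself; the explanatory comments are the editor's reading of the code, not
part of the change.

    static void rcu_nocb_wait_gp(struct rcu_data *rdp)
    {
        unsigned long c, flags, j;
        struct rcu_node *rnp = rdp->mynode;

        /*
         * Snapshot the grace-period number to wait for.  A grace period
         * already in flight (number rnp->completed + 1) may have started
         * before this CPU's callbacks were queued, so wait for the one
         * after it, which will complete as rnp->completed + 2.
         */
        raw_spin_lock_irqsave(&rnp->lock, flags);
        c = rnp->completed + 2;
        /*
         * Setting ->nocb_needs_gp makes cpu_needs_another_gp() return
         * true, so the core RCU machinery starts a grace period on this
         * CPU's behalf even though, being a no-CBs CPU, it has no
         * callbacks visible to that machinery.
         */
        rdp->nocb_needs_gp = true;
        raw_spin_unlock_irqrestore(&rnp->lock, flags);

        /* Poll, sleeping briefly each pass, until that grace period ends. */
        for (;;) {
            j = jiffies;
            schedule_timeout_interruptible(2);
            raw_spin_lock_irqsave(&rnp->lock, flags);
            if (ULONG_CMP_GE(rnp->completed, c)) {
                rdp->nocb_needs_gp = false;
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
                break;
            }
            /* Woken with no time elapsed?  Likely a signal; flush it. */
            if (j == jiffies)
                flush_signals(current);
            raw_spin_unlock_irqrestore(&rnp->lock, flags);
        }
        smp_mb(); /* Callback invocation must follow grace-period end. */
    }

In practice the no-CBs set is chosen at boot time (for example via the
rcu_nocbs= kernel parameter, which is outside this diff, with rcu_nocb_poll
optionally making the kthreads poll); with this patch that set may now
include CPU 0 and may even cover every CPU.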

This commit also provides a fix for a locking bug spotted by Xie
ChanglongX <changlongx.xie@intel.com>.

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>

Showing 4 changed files with 57 additions and 129 deletions

init/Kconfig
... ... @@ -655,7 +655,7 @@
655 655 Accept the default if unsure.
656 656  
657 657 config RCU_NOCB_CPU
658   - bool "Offload RCU callback processing from boot-selected CPUs"
  658 + bool "Offload RCU callback processing from boot-selected CPUs (EXPERIMENTAL)"
659 659 depends on TREE_RCU || TREE_PREEMPT_RCU
660 660 default n
661 661 help
... ... @@ -673,7 +673,7 @@
673 673 callback, and (2) affinity or cgroups can be used to force
674 674 the kthreads to run on whatever set of CPUs is desired.
675 675  
676   - Say Y here if you want reduced OS jitter on selected CPUs.
  676 + Say Y here if you want to help debug reduced OS jitter.
677 677 Say N here if you are unsure.
678 678  
679 679 endmenu # "RCU Subsystem"
kernel/rcutree.c
... ... @@ -310,6 +310,8 @@
310 310  
311 311 if (rcu_gp_in_progress(rsp))
312 312 return 0; /* No, a grace period is already in progress. */
  313 + if (rcu_nocb_needs_gp(rdp))
  314 + return 1; /* Yes, a no-CBs CPU needs one. */
313 315 if (!rdp->nxttail[RCU_NEXT_TAIL])
314 316 return 0; /* No, this is a no-CBs (or offline) CPU. */
315 317 if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
... ... @@ -1035,10 +1037,11 @@
1035 1037 {
1036 1038 int i;
1037 1039  
  1040 + if (init_nocb_callback_list(rdp))
  1041 + return;
1038 1042 rdp->nxtlist = NULL;
1039 1043 for (i = 0; i < RCU_NEXT_SIZE; i++)
1040 1044 rdp->nxttail[i] = &rdp->nxtlist;
1041   - init_nocb_callback_list(rdp);
1042 1045 }
1043 1046  
1044 1047 /*
... ... @@ -2909,7 +2912,6 @@
2909 2912 struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
2910 2913 struct rcu_node *rnp = rdp->mynode;
2911 2914 struct rcu_state *rsp;
2912   - int ret = NOTIFY_OK;
2913 2915  
2914 2916 trace_rcu_utilization("Start CPU hotplug");
2915 2917 switch (action) {
... ... @@ -2923,10 +2925,7 @@
2923 2925 rcu_boost_kthread_setaffinity(rnp, -1);
2924 2926 break;
2925 2927 case CPU_DOWN_PREPARE:
2926   - if (nocb_cpu_expendable(cpu))
2927   - rcu_boost_kthread_setaffinity(rnp, cpu);
2928   - else
2929   - ret = NOTIFY_BAD;
  2928 + rcu_boost_kthread_setaffinity(rnp, cpu);
2930 2929 break;
2931 2930 case CPU_DYING:
2932 2931 case CPU_DYING_FROZEN:
... ... @@ -2950,7 +2949,7 @@
2950 2949 break;
2951 2950 }
2952 2951 trace_rcu_utilization("End CPU hotplug");
2953   - return ret;
  2952 + return NOTIFY_OK;
2954 2953 }
2955 2954  
2956 2955 /*
... ... @@ -3170,7 +3169,6 @@
3170 3169 rcu_init_one(&rcu_sched_state, &rcu_sched_data);
3171 3170 rcu_init_one(&rcu_bh_state, &rcu_bh_data);
3172 3171 __rcu_init_preempt();
3173   - rcu_init_nocb();
3174 3172 open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
3175 3173  
3176 3174 /*
kernel/rcutree.h
... ... @@ -326,6 +326,7 @@
326 326 int nocb_p_count_lazy; /* (approximate). */
327 327 wait_queue_head_t nocb_wq; /* For nocb kthreads to sleep on. */
328 328 struct task_struct *nocb_kthread;
  329 + bool nocb_needs_gp;
329 330 #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
330 331  
331 332 int cpu;
... ... @@ -375,12 +376,6 @@
375 376 struct rcu_data __percpu *rda; /* pointer to per-CPU rcu_data. */
376 377 void (*call)(struct rcu_head *head, /* call_rcu() flavor. */
377 378 void (*func)(struct rcu_head *head));
378   -#ifdef CONFIG_RCU_NOCB_CPU
379   - void (*call_remote)(struct rcu_head *head,
380   - void (*func)(struct rcu_head *head));
381   - /* call_rcu() flavor, but for */
382   - /* placing on remote CPU. */
383   -#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
384 379  
385 380 /* The following fields are guarded by the root rcu_node's lock. */
386 381  
... ... @@ -529,16 +524,15 @@
529 524 static void print_cpu_stall_info_end(void);
530 525 static void zero_cpu_stall_ticks(struct rcu_data *rdp);
531 526 static void increment_cpu_stall_ticks(void);
  527 +static int rcu_nocb_needs_gp(struct rcu_data *rdp);
532 528 static bool is_nocb_cpu(int cpu);
533 529 static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
534 530 bool lazy);
535 531 static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
536 532 struct rcu_data *rdp);
537   -static bool nocb_cpu_expendable(int cpu);
538 533 static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
539 534 static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
540   -static void init_nocb_callback_list(struct rcu_data *rdp);
541   -static void __init rcu_init_nocb(void);
  535 +static bool init_nocb_callback_list(struct rcu_data *rdp);
542 536  
543 537 #endif /* #ifndef RCU_TREE_NONCORE */
544 538  
kernel/rcutree_plugin.h
... ... @@ -86,10 +86,6 @@
86 86 printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
87 87 #ifdef CONFIG_RCU_NOCB_CPU
88 88 if (have_rcu_nocb_mask) {
89   - if (cpumask_test_cpu(0, rcu_nocb_mask)) {
90   - cpumask_clear_cpu(0, rcu_nocb_mask);
91   - pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
92   - }
93 89 cpulist_scnprintf(nocb_buf, sizeof(nocb_buf), rcu_nocb_mask);
94 90 pr_info("\tExperimental no-CBs CPUs: %s.\n", nocb_buf);
95 91 if (rcu_nocb_poll)
... ... @@ -2165,6 +2161,14 @@
2165 2161 }
2166 2162 early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
2167 2163  
  2164 +/*
  2165 + * Does this CPU need a grace period due to offloaded callbacks?
  2166 + */
  2167 +static int rcu_nocb_needs_gp(struct rcu_data *rdp)
  2168 +{
  2169 + return rdp->nocb_needs_gp;
  2170 +}
  2171 +
2168 2172 /* Is the specified CPU a no-CBs CPU? */
2169 2173 static bool is_nocb_cpu(int cpu)
2170 2174 {
... ... @@ -2265,98 +2269,42 @@
2265 2269 }
2266 2270  
2267 2271 /*
2268   - * There must be at least one non-no-CBs CPU in operation at any given
2269   - * time, because no-CBs CPUs are not capable of initiating grace periods
2270   - * independently. This function therefore complains if the specified
2271   - * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
2272   - * avoid offlining the last such CPU. (Recursion is a wonderful thing,
2273   - * but you have to have a base case!)
  2272 + * If necessary, kick off a new grace period, and either way wait
  2273 + * for a subsequent grace period to complete.
2274 2274 */
2275   -static bool nocb_cpu_expendable(int cpu)
  2275 +static void rcu_nocb_wait_gp(struct rcu_data *rdp)
2276 2276 {
2277   - cpumask_var_t non_nocb_cpus;
2278   - int ret;
  2277 + unsigned long c;
  2278 + unsigned long flags;
  2279 + unsigned long j;
  2280 + struct rcu_node *rnp = rdp->mynode;
2279 2281  
  2282 + raw_spin_lock_irqsave(&rnp->lock, flags);
  2283 + c = rnp->completed + 2;
  2284 + rdp->nocb_needs_gp = true;
  2285 + raw_spin_unlock_irqrestore(&rnp->lock, flags);
  2286 +
2280 2287 /*
2281   - * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
2282   - * then offlining this CPU is harmless. Let it happen.
  2288 + * Wait for the grace period. Do so interruptibly to avoid messing
  2289 + * up the load average.
2283 2290 */
2284   - if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
2285   - return 1;
2286   -
2287   - /* If no memory, play it safe and keep the CPU around. */
2288   - if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
2289   - return 0;
2290   - cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
2291   - cpumask_clear_cpu(cpu, non_nocb_cpus);
2292   - ret = !cpumask_empty(non_nocb_cpus);
2293   - free_cpumask_var(non_nocb_cpus);
2294   - return ret;
  2291 + for (;;) {
  2292 + j = jiffies;
  2293 + schedule_timeout_interruptible(2);
  2294 + raw_spin_lock_irqsave(&rnp->lock, flags);
  2295 + if (ULONG_CMP_GE(rnp->completed, c)) {
  2296 + rdp->nocb_needs_gp = false;
  2297 + raw_spin_unlock_irqrestore(&rnp->lock, flags);
  2298 + break;
  2299 + }
  2300 + if (j == jiffies)
  2301 + flush_signals(current);
  2302 + raw_spin_unlock_irqrestore(&rnp->lock, flags);
  2303 + }
  2304 + smp_mb(); /* Ensure that CB invocation happens after GP end. */
2295 2305 }
2296 2306  
2297 2307 /*
2298   - * Helper structure for remote registry of RCU callbacks.
2299   - * This is needed for when a no-CBs CPU needs to start a grace period.
2300   - * If it just invokes call_rcu(), the resulting callback will be queued,
2301   - * which can result in deadlock.
2302   - */
2303   -struct rcu_head_remote {
2304   - struct rcu_head *rhp;
2305   - call_rcu_func_t *crf;
2306   - void (*func)(struct rcu_head *rhp);
2307   -};
2308   -
2309   -/*
2310   - * Register a callback as specified by the rcu_head_remote struct.
2311   - * This function is intended to be invoked via smp_call_function_single().
2312   - */
2313   -static void call_rcu_local(void *arg)
2314   -{
2315   - struct rcu_head_remote *rhrp =
2316   - container_of(arg, struct rcu_head_remote, rhp);
2317   -
2318   - rhrp->crf(rhrp->rhp, rhrp->func);
2319   -}
2320   -
2321   -/*
2322   - * Set up an rcu_head_remote structure and the invoke call_rcu_local()
2323   - * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
2324   - * smp_call_function_single().
2325   - */
2326   -static void invoke_crf_remote(struct rcu_head *rhp,
2327   - void (*func)(struct rcu_head *rhp),
2328   - call_rcu_func_t crf)
2329   -{
2330   - struct rcu_head_remote rhr;
2331   -
2332   - rhr.rhp = rhp;
2333   - rhr.crf = crf;
2334   - rhr.func = func;
2335   - smp_call_function_single(0, call_rcu_local, &rhr, 1);
2336   -}
2337   -
2338   -/*
2339   - * Helper functions to be passed to wait_rcu_gp(), each of which
2340   - * invokes invoke_crf_remote() to register a callback appropriately.
2341   - */
2342   -static void __maybe_unused
2343   -call_rcu_preempt_remote(struct rcu_head *rhp,
2344   - void (*func)(struct rcu_head *rhp))
2345   -{
2346   - invoke_crf_remote(rhp, func, call_rcu);
2347   -}
2348   -static void call_rcu_bh_remote(struct rcu_head *rhp,
2349   - void (*func)(struct rcu_head *rhp))
2350   -{
2351   - invoke_crf_remote(rhp, func, call_rcu_bh);
2352   -}
2353   -static void call_rcu_sched_remote(struct rcu_head *rhp,
2354   - void (*func)(struct rcu_head *rhp))
2355   -{
2356   - invoke_crf_remote(rhp, func, call_rcu_sched);
2357   -}
2358   -
2359   -/*
2360 2308 * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes
2361 2309 * callbacks queued by the corresponding no-CBs CPU.
2362 2310 */
... ... @@ -2390,7 +2338,7 @@
2390 2338 cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
2391 2339 ACCESS_ONCE(rdp->nocb_p_count) += c;
2392 2340 ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
2393   - wait_rcu_gp(rdp->rsp->call_remote);
  2341 + rcu_nocb_wait_gp(rdp);
2394 2342  
2395 2343 /* Each pass through the following loop invokes a callback. */
2396 2344 trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
... ... @@ -2443,26 +2391,22 @@
2443 2391 }
2444 2392  
2445 2393 /* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
2446   -static void init_nocb_callback_list(struct rcu_data *rdp)
  2394 +static bool init_nocb_callback_list(struct rcu_data *rdp)
2447 2395 {
2448 2396 if (rcu_nocb_mask == NULL ||
2449 2397 !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
2450   - return;
  2398 + return false;
2451 2399 rdp->nxttail[RCU_NEXT_TAIL] = NULL;
  2400 + return true;
2452 2401 }
2453 2402  
2454   -/* Initialize the ->call_remote fields in the rcu_state structures. */
2455   -static void __init rcu_init_nocb(void)
  2403 +#else /* #ifdef CONFIG_RCU_NOCB_CPU */
  2404 +
  2405 +static int rcu_nocb_needs_gp(struct rcu_data *rdp)
2456 2406 {
2457   -#ifdef CONFIG_PREEMPT_RCU
2458   - rcu_preempt_state.call_remote = call_rcu_preempt_remote;
2459   -#endif /* #ifdef CONFIG_PREEMPT_RCU */
2460   - rcu_bh_state.call_remote = call_rcu_bh_remote;
2461   - rcu_sched_state.call_remote = call_rcu_sched_remote;
  2407 + return 0;
2462 2408 }
2463 2409  
2464   -#else /* #ifdef CONFIG_RCU_NOCB_CPU */
2465   -
2466 2410 static bool is_nocb_cpu(int cpu)
2467 2411 {
2468 2412 return false;
... ... @@ -2480,11 +2424,6 @@
2480 2424 return 0;
2481 2425 }
2482 2426  
2483   -static bool nocb_cpu_expendable(int cpu)
2484   -{
2485   - return 1;
2486   -}
2487   -
2488 2427 static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
2489 2428 {
2490 2429 }
... ... @@ -2493,12 +2432,9 @@
2493 2432 {
2494 2433 }
2495 2434  
2496   -static void init_nocb_callback_list(struct rcu_data *rdp)
  2435 +static bool init_nocb_callback_list(struct rcu_data *rdp)
2497 2436 {
2498   -}
2499   -
2500   -static void __init rcu_init_nocb(void)
2501   -{
  2437 + return false;
2502 2438 }
2503 2439  
2504 2440 #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */