rcu/nocb: De-offloading CB kthread
author		Frederic Weisbecker <frederic@kernel.org>
		Fri, 13 Nov 2020 12:13:19 +0000 (13:13 +0100)
committer	Paul E. McKenney <paulmck@kernel.org>
		Thu, 7 Jan 2021 00:24:19 +0000 (16:24 -0800)
To de-offload callback processing back onto a CPU, it is necessary to
clear SEGCBLIST_OFFLOADED and notify the nocb CB kthread, which will
then clear its own bit flag (SEGCBLIST_KTHREAD_CB) and go to sleep to
stop handling callbacks.  This commit makes that change.  It will also
be necessary to notify the nocb GP kthread in this same way, which is
the subject of a follow-on commit.
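
For orientation, a minimal sketch of how the new interface might be
driven (the wrapper and its name are hypothetical and not part of this
commit; only rcu_nocb_cpu_deoffload() is introduced below):

	/* Hypothetical caller: move @cpu's callbacks back to softirq. */
	static int example_deoffload(int cpu)
	{
		int ret = rcu_nocb_cpu_deoffload(cpu);	/* 0 on success. */

		if (ret)
			pr_warn("rcu: de-offload of CPU %d failed: %d\n",
				cpu, ret);
		return ret;
	}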

Cc: Josh Triplett <josh@joshtriplett.org>
Cc: Steven Rostedt <rostedt@goodmis.org>
Cc: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
Cc: Joel Fernandes <joel@joelfernandes.org>
Cc: Neeraj Upadhyay <neeraju@codeaurora.org>
Cc: Thomas Gleixner <tglx@linutronix.de>
Inspired-by: Paul E. McKenney <paulmck@kernel.org>
Tested-by: Boqun Feng <boqun.feng@gmail.com>
Signed-off-by: Frederic Weisbecker <frederic@kernel.org>
[ paulmck: Add export per kernel test robot feedback. ]
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
include/linux/rcupdate.h
kernel/rcu/rcu_segcblist.c
kernel/rcu/rcu_segcblist.h
kernel/rcu/tree.h
kernel/rcu/tree_plugin.h

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index de08264113111e0b43c531838489a6e3085f5326..40266eb418b60d2809d1001ffb97b7575644e2c4 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -104,8 +104,10 @@ static inline void rcu_user_exit(void) { }
 
 #ifdef CONFIG_RCU_NOCB_CPU
 void rcu_init_nohz(void);
+int rcu_nocb_cpu_deoffload(int cpu);
 #else /* #ifdef CONFIG_RCU_NOCB_CPU */
 static inline void rcu_init_nohz(void) { }
+static inline int rcu_nocb_cpu_deoffload(int cpu) { return 0; }
 #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
 
 /**
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 7fc6362625b21d019242dd4c213e6bcba572456c..7f181c9675f761bd8fbc81294a665da0dd0ecb85 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -264,10 +264,14 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
  * Mark the specified rcu_segcblist structure as offloaded.  This
  * structure must be empty.
  */
-void rcu_segcblist_offload(struct rcu_segcblist *rsclp)
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
 {
-       rcu_segcblist_clear_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY);
-       rcu_segcblist_set_flags(rsclp, SEGCBLIST_OFFLOADED);
+       if (offload) {
+               rcu_segcblist_clear_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY);
+               rcu_segcblist_set_flags(rsclp, SEGCBLIST_OFFLOADED);
+       } else {
+               rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
+       }
 }
 
 /*
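Note the asymmetry above: the de-offload leg clears SEGCBLIST_OFFLOADED
but does not set SEGCBLIST_SOFTIRQ_ONLY, leaving the segcblist in a
transitional state while the kthreads wind down.  A sketch of the
intended transitions (flag names are from kernel/rcu/rcu_segcblist.h;
the summary itself is editorial):

	/*
	 * Offload:    clear SEGCBLIST_SOFTIRQ_ONLY, set SEGCBLIST_OFFLOADED.
	 * De-offload: clear SEGCBLIST_OFFLOADED only; SEGCBLIST_KTHREAD_CB
	 *             (this commit) and SEGCBLIST_KTHREAD_GP (follow-on
	 *             commit) are then cleared by the kthreads themselves
	 *             as they acknowledge.
	 */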
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index e05952ab9b877b27e8d239ee7ec39f4c5515664c..28c9a5225afc6ac421d0dfd735d52371a9ae49d3 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -109,7 +109,7 @@ void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
 void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v);
 void rcu_segcblist_init(struct rcu_segcblist *rsclp);
 void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
-void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload);
 bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp);
 bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp);
 struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 7708ed161f4a272c8b434946859977f79a528d11..e0deb482984795272b3e6f45f2c04586b25d9c44 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -201,6 +201,7 @@ struct rcu_data {
        /* 5) Callback offloading. */
 #ifdef CONFIG_RCU_NOCB_CPU
        struct swait_queue_head nocb_cb_wq; /* For nocb kthreads to sleep on. */
+       struct swait_queue_head nocb_state_wq; /* For offloading state changes */
        struct task_struct *nocb_gp_kthread;
        raw_spinlock_t nocb_lock;       /* Guard following pair of fields. */
        atomic_t nocb_lock_contended;   /* Contention experienced. */
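The new nocb_state_wq is the rendezvous point for offloading state
changes: the de-offload path sleeps on it until the CB kthread has
acknowledged.  Both sides of the handshake, excerpted from the
tree_plugin.h changes below:

	/* De-offload worker: wait for the CB kthread's acknowledgment. */
	swait_event_exclusive(rdp->nocb_state_wq,
			      !rcu_segcblist_test_flags(cblist,
							SEGCBLIST_KTHREAD_CB));

	/* CB kthread: clear its flag under nocb_lock, then wake the waiter. */
	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
	...
	swake_up_one(&rdp->nocb_state_wq);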
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 7e291ce0a1d6f14443e073456974b8d5a463f0cb..1b870d0d24451936a302a72d643a3fc89d1b2e5a 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -2081,16 +2081,29 @@ static int rcu_nocb_gp_kthread(void *arg)
        return 0;
 }
 
+static inline bool nocb_cb_can_run(struct rcu_data *rdp)
+{
+       u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
+       return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
+{
+       return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
+}
+
 /*
  * Invoke any ready callbacks from the corresponding no-CBs CPU,
  * then, if there are no more, wait for more to appear.
  */
 static void nocb_cb_wait(struct rcu_data *rdp)
 {
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       struct rcu_node *rnp = rdp->mynode;
+       bool needwake_state = false;
+       bool needwake_gp = false;
        unsigned long cur_gp_seq;
        unsigned long flags;
-       bool needwake_gp = false;
-       struct rcu_node *rnp = rdp->mynode;
 
        local_irq_save(flags);
        rcu_momentary_dyntick_idle();
@@ -2100,32 +2113,50 @@ static void nocb_cb_wait(struct rcu_data *rdp)
        local_bh_enable();
        lockdep_assert_irqs_enabled();
        rcu_nocb_lock_irqsave(rdp, flags);
-       if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+       if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
            rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
            raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
                needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
                raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
        }
-       if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
-               rcu_nocb_unlock_irqrestore(rdp, flags);
-               if (needwake_gp)
-                       rcu_gp_kthread_wake();
-               return;
-       }
 
-       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
        WRITE_ONCE(rdp->nocb_cb_sleep, true);
+
+       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+               if (rcu_segcblist_ready_cbs(cblist))
+                       WRITE_ONCE(rdp->nocb_cb_sleep, false);
+       } else {
+               /*
+                * De-offloading. Clear our flag and notify the de-offload worker.
+                * We will not touch the callbacks and will keep sleeping until
+                * we are eventually re-offloaded.
+                */
+               WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+               rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
+               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+                       needwake_state = true;
+       }
+
+       if (rdp->nocb_cb_sleep)
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+
        rcu_nocb_unlock_irqrestore(rdp, flags);
        if (needwake_gp)
                rcu_gp_kthread_wake();
-       swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
-                                !READ_ONCE(rdp->nocb_cb_sleep));
-       if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
+
+       if (needwake_state)
+               swake_up_one(&rdp->nocb_state_wq);
+
+       do {
+               swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+                                                   nocb_cb_wait_cond(rdp));
+
                /* ^^^ Ensure CB invocation follows _sleep test. */
-               return;
-       }
-       WARN_ON(signal_pending(current));
-       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+               if (smp_load_acquire(&rdp->nocb_cb_sleep)) {
+                       WARN_ON(signal_pending(current));
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+               }
+       } while (!nocb_cb_can_run(rdp));
 }
 
 /*
@@ -2187,6 +2218,67 @@ static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
                do_nocb_deferred_wakeup_common(rdp);
 }
 
+static int __rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
+{
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       bool wake_cb = false;
+       unsigned long flags;
+
+       pr_info("De-offloading %d\n", rdp->cpu);
+
+       rcu_nocb_lock_irqsave(rdp, flags);
+       rcu_segcblist_offload(cblist, false);
+
+       if (rdp->nocb_cb_sleep) {
+               rdp->nocb_cb_sleep = false;
+               wake_cb = true;
+       }
+       rcu_nocb_unlock_irqrestore(rdp, flags);
+
+       if (wake_cb)
+               swake_up_one(&rdp->nocb_cb_wq);
+
+       swait_event_exclusive(rdp->nocb_state_wq,
+                             !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+
+       return 0;
+}
+
+static long rcu_nocb_rdp_deoffload(void *arg)
+{
+       struct rcu_data *rdp = arg;
+
+       WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+       return __rcu_nocb_rdp_deoffload(rdp);
+}
+
+int rcu_nocb_cpu_deoffload(int cpu)
+{
+       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+       int ret = 0;
+
+       if (rdp == rdp->nocb_gp_rdp) {
+               pr_info("Can't deoffload an rdp GP leader (yet)\n");
+               return -EINVAL;
+       }
+       mutex_lock(&rcu_state.barrier_mutex);
+       cpus_read_lock();
+       if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
+               if (cpu_online(cpu)) {
+                       ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
+               } else {
+                       ret = __rcu_nocb_rdp_deoffload(rdp);
+               }
+               if (!ret)
+                       cpumask_clear_cpu(cpu, rcu_nocb_mask);
+       }
+       cpus_read_unlock();
+       mutex_unlock(&rcu_state.barrier_mutex);
+
+       return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
+
 void __init rcu_init_nohz(void)
 {
        int cpu;
@@ -2229,7 +2321,8 @@ void __init rcu_init_nohz(void)
                rdp = per_cpu_ptr(&rcu_data, cpu);
                if (rcu_segcblist_empty(&rdp->cblist))
                        rcu_segcblist_init(&rdp->cblist);
-               rcu_segcblist_offload(&rdp->cblist);
+               rcu_segcblist_offload(&rdp->cblist, true);
+               rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
        }
        rcu_organize_nocb_kthreads();
 }
@@ -2239,6 +2332,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 {
        init_swait_queue_head(&rdp->nocb_cb_wq);
        init_swait_queue_head(&rdp->nocb_gp_wq);
+       init_swait_queue_head(&rdp->nocb_state_wq);
        raw_spin_lock_init(&rdp->nocb_lock);
        raw_spin_lock_init(&rdp->nocb_bypass_lock);
        raw_spin_lock_init(&rdp->nocb_gp_lock);
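
Taken together, the handshake added by this commit (editorial summary,
not code from the patch):

	/*
	 * __rcu_nocb_rdp_deoffload()             nocb CB kthread
	 * --------------------------             ---------------
	 * clear SEGCBLIST_OFFLOADED
	 *   (under nocb_lock)
	 * wake nocb_cb_wq if sleeping  ------->  nocb_cb_wait() sees
	 *                                        OFFLOADED clear, clears
	 *                                        SEGCBLIST_KTHREAD_CB
	 * sleep on nocb_state_wq       <-------  wakes nocb_state_wq
	 *   until KTHREAD_CB is clear            parks until re-offloaded
	 * caller clears cpu in rcu_nocb_mask
	 */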