rcu: Add sysfs to provide throttled access to rcu_barrier()
authorPaul E. McKenney <paulmck@kernel.org>
Wed, 2 Aug 2023 00:15:25 +0000 (17:15 -0700)
committerFrederic Weisbecker <frederic@kernel.org>
Wed, 13 Sep 2023 20:28:49 +0000 (22:28 +0200)
When running a series of stress tests all making heavy use of RCU,
it is all too possible to OOM the system when the prior test's RCU
callbacks don't get invoked until after the subsequent test starts.
One way of handling this is just a timed wait, but this fails when a
given CPU has so many callbacks queued that they take longer to invoke
than allowed for by that timed wait.

This commit therefore adds an rcutree.do_rcu_barrier module parameter that
is accessible from sysfs.  Writing one of the many synonyms for boolean
"true" will cause an rcu_barrier() to be invoked, but will guarantee that
no more than one rcu_barrier() will be invoked per sixteenth of a second
via this mechanism.  The flip side is that a given request might wait a
second or three longer than absolutely necessary, but only when there are
multiple uses of rcutree.do_rcu_barrier within a one-second time interval.

This commit unnecessarily serializes the rcu_barrier() machinery, given
that serialization is already provided by procfs.  This has the advantage
of allowing throttled rcu_barrier() from other sources within the kernel.

Reported-by: Johannes Weiner <hannes@cmpxchg.org>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
Signed-off-by: Frederic Weisbecker <frederic@kernel.org>
Documentation/admin-guide/kernel-parameters.txt
kernel/rcu/tree.c

index 0a1731a0f0ef373421c0594886c065461ade1064..7ec8a406d4198ee078cd600eed32687a011b9a8e 100644 (file)
                        Set maximum number of finished RCU callbacks to
                        process in one batch.
 
+       rcutree.do_rcu_barrier= [KNL]
+                       Request a call to rcu_barrier().  This is
+                       throttled so that userspace tests can safely
+                       hammer on the sysfs variable if they so choose.
+                       If triggered before the RCU grace-period machinery
+                       is fully active, this will error out with EAGAIN.
+
        rcutree.dump_tree=      [KNL]
                        Dump the structure of the rcu_node combining tree
                        out at early boot.  This is used for diagnostic
index 7c79480bfaa04e4bb11b5a0ca6e9f6acf614a61b..3c7281fc25a795bfdb3482eac6f10801d842347b 100644 (file)
@@ -4083,6 +4083,82 @@ retry:
 }
 EXPORT_SYMBOL_GPL(rcu_barrier);
 
+static unsigned long rcu_barrier_last_throttle;
+
+/**
+ * rcu_barrier_throttled - Do rcu_barrier(), but limit to one per second
+ *
+ * This can be thought of as guard rails around rcu_barrier() that
+ * permits unrestricted userspace use, at least assuming the hardware's
+ * try_cmpxchg() is robust.  There will be at most one call per second to
+ * rcu_barrier() system-wide from use of this function, which means that
+ * callers might needlessly wait a second or three.
+ *
+ * This is intended for use by test suites to avoid OOM by flushing RCU
+ * callbacks from the previous test before starting the next.  See the
+ * rcutree.do_rcu_barrier module parameter for more information.
+ *
+ * Why not simply make rcu_barrier() more scalable?  That might be
+ * the eventual endpoint, but let's keep it simple for the time being.
+ * Note that the module parameter infrastructure serializes calls to a
+ * given .set() function, but should concurrent .set() invocation ever be
+ * possible, we are ready!
+ */
+static void rcu_barrier_throttled(void)
+{
+       unsigned long j = jiffies;
+       unsigned long old = READ_ONCE(rcu_barrier_last_throttle);
+       unsigned long s = rcu_seq_snap(&rcu_state.barrier_sequence);
+
+       while (time_in_range(j, old, old + HZ / 16) ||
+              !try_cmpxchg(&rcu_barrier_last_throttle, &old, j)) {
+               schedule_timeout_idle(HZ / 16);
+               if (rcu_seq_done(&rcu_state.barrier_sequence, s)) {
+                       smp_mb(); /* caller's subsequent code after above check. */
+                       return;
+               }
+               j = jiffies;
+               old = READ_ONCE(rcu_barrier_last_throttle);
+       }
+       rcu_barrier();
+}
+
+/*
+ * Invoke rcu_barrier_throttled() when a rcutree.do_rcu_barrier
+ * request arrives.  We insist on a true value to allow for possible
+ * future expansion.
+ */
+static int param_set_do_rcu_barrier(const char *val, const struct kernel_param *kp)
+{
+       bool b;
+       int ret;
+
+       if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING)
+               return -EAGAIN;
+       ret = kstrtobool(val, &b);
+       if (!ret && b) {
+               atomic_inc((atomic_t *)kp->arg);
+               rcu_barrier_throttled();
+               atomic_dec((atomic_t *)kp->arg);
+       }
+       return ret;
+}
+
+/*
+ * Output the number of outstanding rcutree.do_rcu_barrier requests.
+ */
+static int param_get_do_rcu_barrier(char *buffer, const struct kernel_param *kp)
+{
+       return sprintf(buffer, "%d\n", atomic_read((atomic_t *)kp->arg));
+}
+
+static const struct kernel_param_ops do_rcu_barrier_ops = {
+       .set = param_set_do_rcu_barrier,
+       .get = param_get_do_rcu_barrier,
+};
+static atomic_t do_rcu_barrier;
+module_param_cb(do_rcu_barrier, &do_rcu_barrier_ops, &do_rcu_barrier, 0644);
+
 /*
  * Compute the mask of online CPUs for the specified rcu_node structure.
  * This will not be stable unless the rcu_node structure's ->lock is