SUNRPC: use lwq for sp_sockets - renamed to sp_xprts
authorNeilBrown <neilb@suse.de>
Mon, 11 Sep 2023 14:40:02 +0000 (10:40 -0400)
committerChuck Lever <chuck.lever@oracle.com>
Mon, 16 Oct 2023 16:44:07 +0000 (12:44 -0400)
lwq avoids using back pointers in lists, and uses less locking.
This introduces a new spinlock, but the other one will be removed in a
future patch.

For svc_clean_up_xprts(), we now dequeue the entire queue, walk it to
remove and process the xprts that need cleaning up, then re-enqueue the
remaining queue.

Signed-off-by: NeilBrown <neilb@suse.de>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/svc.h
include/linux/sunrpc/svc_xprt.h
net/sunrpc/svc.c
net/sunrpc/svc_xprt.c

index dafa362b4fddb582eaa3afd4ec51eddab55194b5..7ff9fe785e49464b999384fb7878d237caf395d2 100644 (file)
@@ -17,6 +17,7 @@
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/auth.h>
 #include <linux/sunrpc/svcauth.h>
+#include <linux/lwq.h>
 #include <linux/wait.h>
 #include <linux/mm.h>
 #include <linux/pagevec.h>
@@ -34,7 +35,7 @@
 struct svc_pool {
        unsigned int            sp_id;          /* pool id; also node id on NUMA */
        spinlock_t              sp_lock;        /* protects all fields */
-       struct list_head        sp_sockets;     /* pending sockets */
+       struct lwq              sp_xprts;       /* pending transports */
        unsigned int            sp_nrthreads;   /* # of threads in pool */
        struct list_head        sp_all_threads; /* all server threads */
        struct llist_head       sp_idle_threads; /* idle server threads */
index fa55d12dc76512a2ef8584a068e8b1ef7d4d444c..8e20cd60e2e712afd9be8b30b82eae200795aef3 100644 (file)
@@ -54,7 +54,7 @@ struct svc_xprt {
        const struct svc_xprt_ops *xpt_ops;
        struct kref             xpt_ref;
        struct list_head        xpt_list;
-       struct list_head        xpt_ready;
+       struct lwq_node         xpt_ready;
        unsigned long           xpt_flags;
 
        struct svc_serv         *xpt_server;    /* service for transport */
index 326592162af12c8eba17e2094a627d6121a422cb..244b5b9eba4d635ff991fdfa46dc65dacee1b757 100644 (file)
@@ -508,7 +508,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
                                i, serv->sv_name);
 
                pool->sp_id = i;
-               INIT_LIST_HEAD(&pool->sp_sockets);
+               lwq_init(&pool->sp_xprts);
                INIT_LIST_HEAD(&pool->sp_all_threads);
                init_llist_head(&pool->sp_idle_threads);
                spin_lock_init(&pool->sp_lock);
index 75f66714e3a7fe17aa298d6a83518f59321264f3..28ca7db55da1b2bdfb01ea2869633eab7f5b05d8 100644 (file)
@@ -201,7 +201,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
        kref_init(&xprt->xpt_ref);
        xprt->xpt_server = serv;
        INIT_LIST_HEAD(&xprt->xpt_list);
-       INIT_LIST_HEAD(&xprt->xpt_ready);
        INIT_LIST_HEAD(&xprt->xpt_deferred);
        INIT_LIST_HEAD(&xprt->xpt_users);
        mutex_init(&xprt->xpt_mutex);
@@ -472,9 +471,7 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
        pool = svc_pool_for_cpu(xprt->xpt_server);
 
        percpu_counter_inc(&pool->sp_sockets_queued);
-       spin_lock_bh(&pool->sp_lock);
-       list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
-       spin_unlock_bh(&pool->sp_lock);
+       lwq_enqueue(&xprt->xpt_ready, &pool->sp_xprts);
 
        svc_pool_wake_idle_thread(pool);
 }
@@ -487,18 +484,9 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
        struct svc_xprt *xprt = NULL;
 
-       if (list_empty(&pool->sp_sockets))
-               goto out;
-
-       spin_lock_bh(&pool->sp_lock);
-       if (likely(!list_empty(&pool->sp_sockets))) {
-               xprt = list_first_entry(&pool->sp_sockets,
-                                       struct svc_xprt, xpt_ready);
-               list_del_init(&xprt->xpt_ready);
+       xprt = lwq_dequeue(&pool->sp_xprts, struct svc_xprt, xpt_ready);
+       if (xprt)
                svc_xprt_get(xprt);
-       }
-       spin_unlock_bh(&pool->sp_lock);
-out:
        return xprt;
 }
 
@@ -708,7 +696,7 @@ svc_thread_should_sleep(struct svc_rqst *rqstp)
                return false;
 
        /* was a socket queued? */
-       if (!list_empty(&pool->sp_sockets))
+       if (!lwq_empty(&pool->sp_xprts))
                return false;
 
        /* are we shutting down? */
@@ -1050,7 +1038,6 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
 
        spin_lock_bh(&serv->sv_lock);
        list_del_init(&xprt->xpt_list);
-       WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
        if (test_bit(XPT_TEMP, &xprt->xpt_flags))
                serv->sv_tmpcnt--;
        spin_unlock_bh(&serv->sv_lock);
@@ -1101,36 +1088,26 @@ static int svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, st
        return ret;
 }
 
-static struct svc_xprt *svc_dequeue_net(struct svc_serv *serv, struct net *net)
+static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
 {
-       struct svc_pool *pool;
        struct svc_xprt *xprt;
-       struct svc_xprt *tmp;
        int i;
 
        for (i = 0; i < serv->sv_nrpools; i++) {
-               pool = &serv->sv_pools[i];
-
-               spin_lock_bh(&pool->sp_lock);
-               list_for_each_entry_safe(xprt, tmp, &pool->sp_sockets, xpt_ready) {
-                       if (xprt->xpt_net != net)
-                               continue;
-                       list_del_init(&xprt->xpt_ready);
-                       spin_unlock_bh(&pool->sp_lock);
-                       return xprt;
+               struct svc_pool *pool = &serv->sv_pools[i];
+               struct llist_node *q, **t1, *t2;
+
+               q = lwq_dequeue_all(&pool->sp_xprts);
+               lwq_for_each_safe(xprt, t1, t2, &q, xpt_ready) {
+                       if (xprt->xpt_net == net) {
+                               set_bit(XPT_CLOSE, &xprt->xpt_flags);
+                               svc_delete_xprt(xprt);
+                               xprt = NULL;
+                       }
                }
-               spin_unlock_bh(&pool->sp_lock);
-       }
-       return NULL;
-}
 
-static void svc_clean_up_xprts(struct svc_serv *serv, struct net *net)
-{
-       struct svc_xprt *xprt;
-
-       while ((xprt = svc_dequeue_net(serv, net))) {
-               set_bit(XPT_CLOSE, &xprt->xpt_flags);
-               svc_delete_xprt(xprt);
+               if (q)
+                       lwq_enqueue_batch(q, &pool->sp_xprts);
        }
 }