fs: dlm: relax sending to allow receiving
authorAlexander Aring <aahringo@redhat.com>
Thu, 27 Oct 2022 20:45:24 +0000 (16:45 -0400)
committerDavid Teigland <teigland@redhat.com>
Tue, 8 Nov 2022 18:59:41 +0000 (12:59 -0600)
This patch additionally drops the sock_mutex when there is a burst of
messages to send. Since we have acknowledgment handling, we free sending
buffers only when we receive an ack back; but if we are stuck looping in
send_to_sock() because dlm sends a lot of messages and we never leave
the loop, the sending buffers fill up very quickly. We can't receive
during this iteration because the sock_mutex is held. This patch unlocks
the sock_mutex so it is possible to receive messages while a burst of
sending messages happens. This allows memory to be freed up, because
acks which have already been received can be processed.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/lowcomms.c

index 871d4e9f49fb613276504579965f7f22858775c0..b05c6d9b5102c01ded06945d1d7ea7760c38fe25 100644 (file)
@@ -1418,7 +1418,10 @@ static void send_to_sock(struct connection *con)
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset, ret;
-       int count = 0;
+       int count;
+
+again:
+       count = 0;
 
        mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
@@ -1453,14 +1456,16 @@ static void send_to_sock(struct connection *con)
                } else if (ret < 0)
                        goto out;
 
+               spin_lock(&con->writequeue_lock);
+               writequeue_entry_complete(e, ret);
+
                /* Don't starve people filling buffers */
                if (++count >= MAX_SEND_MSG_COUNT) {
+                       spin_unlock(&con->writequeue_lock);
+                       mutex_unlock(&con->sock_mutex);
                        cond_resched();
-                       count = 0;
+                       goto again;
                }
-
-               spin_lock(&con->writequeue_lock);
-               writequeue_entry_complete(e, ret);
        }
        spin_unlock(&con->writequeue_lock);