fs/pipe: use spinlock in pipe_read() only if there is a watch_queue
author     Max Kellermann <max.kellermann@ionos.com>
           Thu, 21 Sep 2023 07:57:55 +0000 (09:57 +0200)
committer  Christian Brauner <brauner@kernel.org>
           Thu, 19 Oct 2023 09:02:48 +0000 (11:02 +0200)
If there is no watch_queue, holding the pipe mutex is enough to
prevent concurrent writes, and we can avoid the spinlock.
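
As a minimal sketch of the idea (simplified from the patch below, not
verbatim kernel code; post_one_notification() in kernel/watch_queue.c
is the writer that holds only rd_wait.lock):

	if (pipe_has_watch_queue(pipe)) {
		/* a notification may be posted concurrently under
		 * rd_wait.lock, so the reader must take it too */
		spin_lock_irq(&pipe->rd_wait.lock);
		pipe->tail = ++tail;
		spin_unlock_irq(&pipe->rd_wait.lock);
	} else {
		/* pipe->mutex is held; no writer can race with us */
		pipe->tail = ++tail;
	}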

O_NOTIFICATION_PIPE is an exotic and rarely used feature, and of all
the pipes that exist at any given time, only very few actually have a
watch_queue, so it appears worthwhile to optimize the common case.
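
For illustration, a small userspace sketch of the rare pipe kind
versus the common one (assumes a kernel built with CONFIG_WATCH_QUEUE;
pipe2() and the O_NOTIFICATION_PIPE flag come from <unistd.h> and
<linux/watch_queue.h>):

	#define _GNU_SOURCE
	#include <fcntl.h>
	#include <stdio.h>
	#include <unistd.h>
	#include <linux/watch_queue.h>	/* O_NOTIFICATION_PIPE */

	int main(void)
	{
		int plain[2], notif[2];

		/* common case: an ordinary pipe, no watch_queue */
		if (pipe2(plain, O_CLOEXEC) < 0)
			perror("pipe2");

		/* rare case: a notification pipe with a watch_queue,
		 * which still needs the rd_wait.lock path */
		if (pipe2(notif, O_NOTIFICATION_PIPE) < 0)
			perror("pipe2(O_NOTIFICATION_PIPE)");

		printf("plain=%d/%d notification=%d/%d\n",
		       plain[0], plain[1], notif[0], notif[1]);
		return 0;
	}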

This patch does not optimize pipe_resize_ring(), where the spinlocks
could be avoided as well; that does not seem like a worthwhile
optimization because this function is rarely called.

Related commits:

- commit 8df441294dd3 ("pipe: Check for ring full inside of the
  spinlock in pipe_write()")
- commit b667b8673443 ("pipe: Advance tail pointer inside of wait
  spinlock in pipe_read()")
- commit 189b0ddc2451 ("pipe: Fix missing lock in pipe_resize_ring()")

Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
Message-Id: <20230921075755.1378787-4-max.kellermann@ionos.com>
Reviewed-by: David Howells <dhowells@redhat.com>
Signed-off-by: Christian Brauner <brauner@kernel.org>
fs/pipe.c

index 939def02c18cc88c9a7c311ca7161861b33d7f0a..1362e2ec2211588313c39c5845487dfee8cae156 100644 (file)
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -227,6 +227,36 @@ static inline bool pipe_readable(const struct pipe_inode_info *pipe)
        return !pipe_empty(head, tail) || !writers;
 }
 
+static inline unsigned int pipe_update_tail(struct pipe_inode_info *pipe,
+                                           struct pipe_buffer *buf,
+                                           unsigned int tail)
+{
+       pipe_buf_release(pipe, buf);
+
+       /*
+        * If the pipe has a watch_queue, we need the additional
+        * protection of the spinlock, because notifications get
+        * posted with only the spinlock held, not the mutex.
+        */
+       if (pipe_has_watch_queue(pipe)) {
+               spin_lock_irq(&pipe->rd_wait.lock);
+#ifdef CONFIG_WATCH_QUEUE
+               if (buf->flags & PIPE_BUF_FLAG_LOSS)
+                       pipe->note_loss = true;
+#endif
+               pipe->tail = ++tail;
+               spin_unlock_irq(&pipe->rd_wait.lock);
+               return tail;
+       }
+
+       /*
+        * Without a watch_queue, we can simply increment the tail
+        * without the spinlock - the mutex is enough.
+        */
+       pipe->tail = ++tail;
+       return tail;
+}
+
 static ssize_t
 pipe_read(struct kiocb *iocb, struct iov_iter *to)
 {
@@ -320,17 +350,8 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
                                buf->len = 0;
                        }
 
-                       if (!buf->len) {
-                               pipe_buf_release(pipe, buf);
-                               spin_lock_irq(&pipe->rd_wait.lock);
-#ifdef CONFIG_WATCH_QUEUE
-                               if (buf->flags & PIPE_BUF_FLAG_LOSS)
-                                       pipe->note_loss = true;
-#endif
-                               tail++;
-                               pipe->tail = tail;
-                               spin_unlock_irq(&pipe->rd_wait.lock);
-                       }
+                       if (!buf->len)
+                               tail = pipe_update_tail(pipe, buf, tail);
                        total_len -= chars;
                        if (!total_len)
                                break;  /* common path: read succeeded */