io_uring: fix sequencing issues with linked timeouts
authorJens Axboe <axboe@kernel.dk>
Fri, 15 Nov 2019 02:39:52 +0000 (19:39 -0700)
committerJens Axboe <axboe@kernel.dk>
Tue, 26 Nov 2019 02:56:05 +0000 (19:56 -0700)
We have an issue with timeout links that are deeper in the submit chain,
because we only handle it upfront, not from later submissions. Move the
prep + issue of the timeout link to the async work prep handler, and do
it normally for non-async queue. If we validate and prepare the timeout
links upfront when we first see them, there's nothing stopping us from
supporting any sort of nesting.

Fixes: 2665abfd757f ("io_uring: add support for linked SQE timeouts")
Reported-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index 8cd3505d167bb57a472f0c201005c44c006688b5..759a30e52fdafb8c243bf301cb51c0aabbcebfef 100644 (file)
@@ -353,6 +353,7 @@ struct io_kiocb {
 #define REQ_F_TIMEOUT_NOSEQ    8192    /* no timeout sequence */
 #define REQ_F_INFLIGHT         16384   /* on inflight list */
 #define REQ_F_COMP_LOCKED      32768   /* completion under lock */
+#define REQ_F_FREE_SQE         65536   /* free sqe if not async queued */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -391,6 +392,8 @@ static void __io_free_req(struct io_kiocb *req);
 static void io_put_req(struct io_kiocb *req);
 static void io_double_put_req(struct io_kiocb *req);
 static void __io_double_put_req(struct io_kiocb *req);
+static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
+static void io_queue_linked_timeout(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -529,7 +532,8 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
                 opcode == IORING_OP_WRITE_FIXED);
 }
 
-static inline bool io_prep_async_work(struct io_kiocb *req)
+static inline bool io_prep_async_work(struct io_kiocb *req,
+                                     struct io_kiocb **link)
 {
        bool do_hashed = false;
 
@@ -558,13 +562,17 @@ static inline bool io_prep_async_work(struct io_kiocb *req)
                        req->work.flags |= IO_WQ_WORK_NEEDS_USER;
        }
 
+       *link = io_prep_linked_timeout(req);
        return do_hashed;
 }
 
 static inline void io_queue_async_work(struct io_kiocb *req)
 {
-       bool do_hashed = io_prep_async_work(req);
        struct io_ring_ctx *ctx = req->ctx;
+       struct io_kiocb *link;
+       bool do_hashed;
+
+       do_hashed = io_prep_async_work(req, &link);
 
        trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
                                        req->flags);
@@ -574,6 +582,9 @@ static inline void io_queue_async_work(struct io_kiocb *req)
                io_wq_enqueue_hashed(ctx->io_wq, &req->work,
                                        file_inode(req->file));
        }
+
+       if (link)
+               io_queue_linked_timeout(link);
 }
 
 static void io_kill_timeout(struct io_kiocb *req)
@@ -875,6 +886,15 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
        nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
        while (nxt) {
                list_del_init(&nxt->list);
+
+               if ((req->flags & REQ_F_LINK_TIMEOUT) &&
+                   (nxt->flags & REQ_F_TIMEOUT)) {
+                       wake_ev |= io_link_cancel_timeout(nxt);
+                       nxt = list_first_entry_or_null(&req->link_list,
+                                                       struct io_kiocb, list);
+                       req->flags &= ~REQ_F_LINK_TIMEOUT;
+                       continue;
+               }
                if (!list_empty(&req->link_list)) {
                        INIT_LIST_HEAD(&nxt->link_list);
                        list_splice(&req->link_list, &nxt->link_list);
@@ -885,19 +905,13 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
                 * If we're in async work, we can continue processing the chain
                 * in this context instead of having to queue up new async work.
                 */
-               if (req->flags & REQ_F_LINK_TIMEOUT) {
-                       wake_ev = io_link_cancel_timeout(nxt);
-
-                       /* we dropped this link, get next */
-                       nxt = list_first_entry_or_null(&req->link_list,
-                                                       struct io_kiocb, list);
-               } else if (nxtptr && io_wq_current_is_worker()) {
-                       *nxtptr = nxt;
-                       break;
-               } else {
-                       io_queue_async_work(nxt);
-                       break;
+               if (nxt) {
+                       if (nxtptr && io_wq_current_is_worker())
+                               *nxtptr = nxt;
+                       else
+                               io_queue_async_work(nxt);
                }
+               break;
        }
 
        if (wake_ev)
@@ -916,11 +930,16 @@ static void io_fail_links(struct io_kiocb *req)
        spin_lock_irqsave(&ctx->completion_lock, flags);
 
        while (!list_empty(&req->link_list)) {
+               const struct io_uring_sqe *sqe_to_free = NULL;
+
                link = list_first_entry(&req->link_list, struct io_kiocb, list);
                list_del_init(&link->list);
 
                trace_io_uring_fail_link(req, link);
 
+               if (link->flags & REQ_F_FREE_SQE)
+                       sqe_to_free = link->submit.sqe;
+
                if ((req->flags & REQ_F_LINK_TIMEOUT) &&
                    link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
                        io_link_cancel_timeout(link);
@@ -928,6 +947,7 @@ static void io_fail_links(struct io_kiocb *req)
                        io_cqring_fill_event(link, -ECANCELED);
                        __io_double_put_req(link);
                }
+               kfree(sqe_to_free);
        }
 
        io_commit_cqring(ctx);
@@ -2683,8 +2703,12 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
 
        /* if a dependent link is ready, pass it back */
        if (!ret && nxt) {
-               io_prep_async_work(nxt);
+               struct io_kiocb *link;
+
+               io_prep_async_work(nxt, &link);
                *workptr = &nxt->work;
+               if (link)
+                       io_queue_linked_timeout(link);
        }
 }
 
@@ -2819,7 +2843,6 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
 
 static void io_queue_linked_timeout(struct io_kiocb *req)
 {
-       struct io_timeout_data *data = req->timeout.data;
        struct io_ring_ctx *ctx = req->ctx;
 
        /*
@@ -2828,6 +2851,8 @@ static void io_queue_linked_timeout(struct io_kiocb *req)
         */
        spin_lock_irq(&ctx->completion_lock);
        if (!list_empty(&req->list)) {
+               struct io_timeout_data *data = req->timeout.data;
+
                data->timer.function = io_link_timeout_fn;
                hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
                                data->mode);
@@ -2841,7 +2866,6 @@ static void io_queue_linked_timeout(struct io_kiocb *req)
 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
 {
        struct io_kiocb *nxt;
-       int ret;
 
        if (!(req->flags & REQ_F_LINK))
                return NULL;
@@ -2850,33 +2874,15 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
        if (!nxt || nxt->submit.sqe->opcode != IORING_OP_LINK_TIMEOUT)
                return NULL;
 
-       ret = io_timeout_setup(nxt);
-       /* common setup allows offset being set, we don't */
-       if (!ret && nxt->submit.sqe->off)
-               ret = -EINVAL;
-       if (ret) {
-               list_del_init(&nxt->list);
-               io_cqring_add_event(nxt, ret);
-               io_double_put_req(nxt);
-               return ERR_PTR(-ECANCELED);
-       }
-
        req->flags |= REQ_F_LINK_TIMEOUT;
        return nxt;
 }
 
 static void __io_queue_sqe(struct io_kiocb *req)
 {
-       struct io_kiocb *nxt;
+       struct io_kiocb *nxt = io_prep_linked_timeout(req);
        int ret;
 
-       nxt = io_prep_linked_timeout(req);
-       if (IS_ERR(nxt)) {
-               ret = PTR_ERR(nxt);
-               nxt = NULL;
-               goto err;
-       }
-
        ret = __io_submit_sqe(req, NULL, true);
 
        /*
@@ -2904,10 +2910,6 @@ static void __io_queue_sqe(struct io_kiocb *req)
                         * submit reference when the iocb is actually submitted.
                         */
                        io_queue_async_work(req);
-
-                       if (nxt)
-                               io_queue_linked_timeout(nxt);
-
                        return;
                }
        }
@@ -2952,6 +2954,10 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow)
        int need_submit = false;
        struct io_ring_ctx *ctx = req->ctx;
 
+       if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
+               ret = -ECANCELED;
+               goto err;
+       }
        if (!shadow) {
                io_queue_sqe(req);
                return;
@@ -2966,9 +2972,11 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow)
        ret = io_req_defer(req);
        if (ret) {
                if (ret != -EIOCBQUEUED) {
+err:
                        io_cqring_add_event(req, ret);
                        io_double_put_req(req);
-                       __io_free_req(shadow);
+                       if (shadow)
+                               __io_free_req(shadow);
                        return;
                }
        } else {
@@ -3025,6 +3033,17 @@ err_req:
        if (*link) {
                struct io_kiocb *prev = *link;
 
+               if (READ_ONCE(s->sqe->opcode) == IORING_OP_LINK_TIMEOUT) {
+                       ret = io_timeout_setup(req);
+                       /* common setup allows offset being set, we don't */
+                       if (!ret && s->sqe->off)
+                               ret = -EINVAL;
+                       if (ret) {
+                               prev->flags |= REQ_F_FAIL_LINK;
+                               goto err_req;
+                       }
+               }
+
                sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
                if (!sqe_copy) {
                        ret = -EAGAIN;
@@ -3032,6 +3051,7 @@ err_req:
                }
 
                s->sqe = sqe_copy;
+               req->flags |= REQ_F_FREE_SQE;
                trace_io_uring_link(ctx, req, prev);
                list_add_tail(&req->list, &prev->link_list);
        } else if (s->sqe->flags & IOSQE_IO_LINK) {