io_uring: enable req cache for IRQ driven IO
authorJens Axboe <axboe@kernel.dk>
Wed, 10 Feb 2021 02:53:37 +0000 (19:53 -0700)
committerJens Axboe <axboe@kernel.dk>
Wed, 10 Feb 2021 14:33:12 +0000 (07:33 -0700)
This is the last class of requests that cannot utilize the req alloc
cache. Add a per-ctx req cache that is protected by the completion_lock,
and refill our submit side cache when it gets over our batch count.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index e73ca37c6a3b0d42ea1446410f3d916a645b29ba..2c7ff0b1b086af819c2435a2ec49bf6c9e3c151a 100644 (file)
@@ -272,7 +272,11 @@ struct io_sq_data {
 struct io_comp_state {
        struct io_kiocb         *reqs[IO_COMPL_BATCH];
        unsigned int            nr;
+       unsigned int            locked_free_nr;
+       /* inline/task_work completion list, under ->uring_lock */
        struct list_head        free_list;
+       /* IRQ completion list, under ->completion_lock */
+       struct list_head        locked_free_list;
 };
 
 struct io_submit_state {
@@ -1033,6 +1037,9 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res);
 static void io_put_req(struct io_kiocb *req);
 static void io_put_req_deferred(struct io_kiocb *req, int nr);
 static void io_double_put_req(struct io_kiocb *req);
+static void io_dismantle_req(struct io_kiocb *req);
+static void io_put_task(struct task_struct *task, int nr);
+static void io_queue_next(struct io_kiocb *req);
 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
 static void __io_queue_linked_timeout(struct io_kiocb *req);
 static void io_queue_linked_timeout(struct io_kiocb *req);
@@ -1353,6 +1360,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
        init_llist_head(&ctx->rsrc_put_llist);
        INIT_LIST_HEAD(&ctx->submit_state.comp.free_list);
+       INIT_LIST_HEAD(&ctx->submit_state.comp.locked_free_list);
        return ctx;
 err:
        kfree(ctx->cancel_hash);
@@ -1908,8 +1916,8 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
        __io_cqring_fill_event(req, res, 0);
 }
 
-static void io_req_complete_post(struct io_kiocb *req, long res,
-                                unsigned int cflags)
+static inline void io_req_complete_post(struct io_kiocb *req, long res,
+                                       unsigned int cflags)
 {
        struct io_ring_ctx *ctx = req->ctx;
        unsigned long flags;
@@ -1917,16 +1925,26 @@ static void io_req_complete_post(struct io_kiocb *req, long res,
        spin_lock_irqsave(&ctx->completion_lock, flags);
        __io_cqring_fill_event(req, res, cflags);
        io_commit_cqring(ctx);
+       /*
+        * If we're the last reference to this request, add to our locked
+        * free_list cache.
+        */
+       if (refcount_dec_and_test(&req->refs)) {
+               struct io_comp_state *cs = &ctx->submit_state.comp;
+
+               io_dismantle_req(req);
+               io_put_task(req->task, 1);
+               list_add(&req->compl.list, &cs->locked_free_list);
+               cs->locked_free_nr++;
+       } else
+               req = NULL;
        spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
        io_cqring_ev_posted(ctx);
-}
-
-static inline void io_req_complete_nostate(struct io_kiocb *req, long res,
-                                          unsigned int cflags)
-{
-       io_req_complete_post(req, res, cflags);
-       io_put_req(req);
+       if (req) {
+               io_queue_next(req);
+               percpu_ref_put(&ctx->refs);
+       }
 }
 
 static void io_req_complete_state(struct io_kiocb *req, long res,
@@ -1944,7 +1962,7 @@ static inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags,
        if (issue_flags & IO_URING_F_COMPLETE_DEFER)
                io_req_complete_state(req, res, cflags);
        else
-               io_req_complete_nostate(req, res, cflags);
+               io_req_complete_post(req, res, cflags);
 }
 
 static inline void io_req_complete(struct io_kiocb *req, long res)
@@ -1952,12 +1970,26 @@ static inline void io_req_complete(struct io_kiocb *req, long res)
        __io_req_complete(req, 0, res, 0);
 }
 
-static bool io_flush_cached_reqs(struct io_submit_state *state)
+static bool io_flush_cached_reqs(struct io_ring_ctx *ctx)
 {
+       struct io_submit_state *state = &ctx->submit_state;
+       struct io_comp_state *cs = &state->comp;
        struct io_kiocb *req = NULL;
 
-       while (!list_empty(&state->comp.free_list)) {
-               req = list_first_entry(&state->comp.free_list, struct io_kiocb,
+       /*
+        * If we have more than a batch's worth of requests in our IRQ side
+        * locked cache, grab the lock and move them over to our submission
+        * side cache.
+        */
+       if (READ_ONCE(cs->locked_free_nr) > IO_COMPL_BATCH) {
+               spin_lock_irq(&ctx->completion_lock);
+               list_splice_init(&cs->locked_free_list, &cs->free_list);
+               cs->locked_free_nr = 0;
+               spin_unlock_irq(&ctx->completion_lock);
+       }
+
+       while (!list_empty(&cs->free_list)) {
+               req = list_first_entry(&cs->free_list, struct io_kiocb,
                                        compl.list);
                list_del(&req->compl.list);
                state->reqs[state->free_reqs++] = req;
@@ -1978,7 +2010,7 @@ static struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
                gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
                int ret;
 
-               if (io_flush_cached_reqs(state))
+               if (io_flush_cached_reqs(ctx))
                        goto got_req;
 
                ret = kmem_cache_alloc_bulk(req_cachep, gfp, IO_REQ_ALLOC_BATCH,
@@ -8748,14 +8780,12 @@ static void io_destroy_buffers(struct io_ring_ctx *ctx)
        idr_destroy(&ctx->io_buffer_idr);
 }
 
-static void io_req_cache_free(struct io_ring_ctx *ctx)
+static void io_req_cache_free(struct list_head *list)
 {
-       struct io_comp_state *cs = &ctx->submit_state.comp;
-
-       while (!list_empty(&cs->free_list)) {
+       while (!list_empty(list)) {
                struct io_kiocb *req;
 
-               req = list_first_entry(&cs->free_list, struct io_kiocb, compl.list);
+               req = list_first_entry(list, struct io_kiocb, compl.list);
                list_del(&req->compl.list);
                kmem_cache_free(req_cachep, req);
        }
@@ -8803,7 +8833,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
        free_uid(ctx->user);
        put_cred(ctx->creds);
        kfree(ctx->cancel_hash);
-       io_req_cache_free(ctx);
+       io_req_cache_free(&ctx->submit_state.comp.free_list);
+       io_req_cache_free(&ctx->submit_state.comp.locked_free_list);
        kfree(ctx);
 }