io_uring: io_uring: add support for async work inheriting files
authorJens Axboe <axboe@kernel.dk>
Thu, 24 Oct 2019 18:39:47 +0000 (12:39 -0600)
committerJens Axboe <axboe@kernel.dk>
Tue, 29 Oct 2019 18:43:06 +0000 (12:43 -0600)
This is in preparation for adding opcodes that need to add new files
in a process file table, system calls like open(2) or accept4(2).

If an opcode needs this, it must set IO_WQ_WORK_NEEDS_FILES in the work
item. If work that needs to get punted to async context have this
set, the async worker will assume the original task file table before
executing the work.

Note that opcodes that need access to the current files of an
application cannot be done through IORING_SETUP_SQPOLL.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io-wq.c
fs/io-wq.h
fs/io_uring.c

index 37863879e987ded686c869ff2d959b193ed05446..253c04a40db57f35e4bcf25b8c4193c1755851ac 100644 (file)
@@ -52,6 +52,7 @@ struct io_worker {
 
        struct rcu_head rcu;
        struct mm_struct *mm;
+       struct files_struct *restore_files;
 };
 
 struct io_wq_nulls_list {
@@ -126,22 +127,36 @@ static void io_worker_release(struct io_worker *worker)
  */
 static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
 {
+       bool dropped_lock = false;
+
+       if (current->files != worker->restore_files) {
+               __acquire(&wqe->lock);
+               spin_unlock_irq(&wqe->lock);
+               dropped_lock = true;
+
+               task_lock(current);
+               current->files = worker->restore_files;
+               task_unlock(current);
+       }
+
        /*
         * If we have an active mm, we need to drop the wq lock before unusing
         * it. If we do, return true and let the caller retry the idle loop.
         */
        if (worker->mm) {
-               __acquire(&wqe->lock);
-               spin_unlock_irq(&wqe->lock);
+               if (!dropped_lock) {
+                       __acquire(&wqe->lock);
+                       spin_unlock_irq(&wqe->lock);
+                       dropped_lock = true;
+               }
                __set_current_state(TASK_RUNNING);
                set_fs(KERNEL_DS);
                unuse_mm(worker->mm);
                mmput(worker->mm);
                worker->mm = NULL;
-               return true;
        }
 
-       return false;
+       return dropped_lock;
 }
 
 static void io_worker_exit(struct io_worker *worker)
@@ -189,6 +204,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
        current->flags |= PF_IO_WORKER;
 
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+       worker->restore_files = current->files;
        atomic_inc(&wqe->nr_running);
 }
 
@@ -291,6 +307,12 @@ static void io_worker_handle_work(struct io_worker *worker)
                if (!work)
                        break;
 next:
+               if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
+                   current->files != work->files) {
+                       task_lock(current);
+                       current->files = work->files;
+                       task_unlock(current);
+               }
                if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
                    wq->mm && mmget_not_zero(wq->mm)) {
                        use_mm(wq->mm);
index be8f22c8937b5e754cfe77d5f49aa6c349e01398..e93f764b1fa4ee23fd9e49d0b78507765788b425 100644 (file)
@@ -8,6 +8,7 @@ enum {
        IO_WQ_WORK_HAS_MM       = 2,
        IO_WQ_WORK_HASHED       = 4,
        IO_WQ_WORK_NEEDS_USER   = 8,
+       IO_WQ_WORK_NEEDS_FILES  = 16,
 
        IO_WQ_HASH_SHIFT        = 24,   /* upper 8 bits are used for hash key */
 };
@@ -22,12 +23,14 @@ struct io_wq_work {
        struct list_head list;
        void (*func)(struct io_wq_work **);
        unsigned flags;
+       struct files_struct *files;
 };
 
 #define INIT_IO_WORK(work, _func)                      \
        do {                                            \
                (work)->func = _func;                   \
                (work)->flags = 0;                      \
+               (work)->files = NULL;                   \
        } while (0)                                     \
 
 struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm);
index d94bd4e3a60eb675fac9f447fd9eb5e28505058e..6e1523567920c0bfe0914e3c28076c1569276aad 100644 (file)
@@ -196,6 +196,8 @@ struct io_ring_ctx {
 
                struct list_head        defer_list;
                struct list_head        timeout_list;
+
+               wait_queue_head_t       inflight_wait;
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
@@ -250,6 +252,9 @@ struct io_ring_ctx {
                 */
                struct list_head        poll_list;
                struct list_head        cancel_list;
+
+               spinlock_t              inflight_lock;
+               struct list_head        inflight_list;
        } ____cacheline_aligned_in_smp;
 
 #if defined(CONFIG_UNIX)
@@ -259,6 +264,8 @@ struct io_ring_ctx {
 
 struct sqe_submit {
        const struct io_uring_sqe       *sqe;
+       struct file                     *ring_file;
+       int                             ring_fd;
        u32                             sequence;
        bool                            has_user;
        bool                            in_async;
@@ -317,10 +324,13 @@ struct io_kiocb {
 #define REQ_F_TIMEOUT          1024    /* timeout request */
 #define REQ_F_ISREG            2048    /* regular file */
 #define REQ_F_MUST_PUNT                4096    /* must be punted even for NONBLOCK */
+#define REQ_F_INFLIGHT         8192    /* on inflight list */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
 
+       struct list_head        inflight_entry;
+
        struct io_wq_work       work;
 };
 
@@ -401,6 +411,9 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        INIT_LIST_HEAD(&ctx->cancel_list);
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
+       init_waitqueue_head(&ctx->inflight_wait);
+       spin_lock_init(&ctx->inflight_lock);
+       INIT_LIST_HEAD(&ctx->inflight_list);
        return ctx;
 }
 
@@ -670,9 +683,20 @@ static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
 
 static void __io_free_req(struct io_kiocb *req)
 {
+       struct io_ring_ctx *ctx = req->ctx;
+
        if (req->file && !(req->flags & REQ_F_FIXED_FILE))
                fput(req->file);
-       percpu_ref_put(&req->ctx->refs);
+       if (req->flags & REQ_F_INFLIGHT) {
+               unsigned long flags;
+
+               spin_lock_irqsave(&ctx->inflight_lock, flags);
+               list_del(&req->inflight_entry);
+               if (waitqueue_active(&ctx->inflight_wait))
+                       wake_up(&ctx->inflight_wait);
+               spin_unlock_irqrestore(&ctx->inflight_lock, flags);
+       }
+       percpu_ref_put(&ctx->refs);
        kmem_cache_free(req_cachep, req);
 }
 
@@ -2276,6 +2300,30 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        return 0;
 }
 
+static int io_grab_files(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+       int ret = -EBADF;
+
+       rcu_read_lock();
+       spin_lock_irq(&ctx->inflight_lock);
+       /*
+        * We use the f_ops->flush() handler to ensure that we can flush
+        * out work accessing these files if the fd is closed. Check if
+        * the fd has changed since we started down this path, and disallow
+        * this operation if it has.
+        */
+       if (fcheck(req->submit.ring_fd) == req->submit.ring_file) {
+               list_add(&req->inflight_entry, &ctx->inflight_list);
+               req->flags |= REQ_F_INFLIGHT;
+               req->work.files = current->files;
+               ret = 0;
+       }
+       spin_unlock_irq(&ctx->inflight_lock);
+       rcu_read_unlock();
+
+       return ret;
+}
+
 static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                        struct sqe_submit *s)
 {
@@ -2295,17 +2343,25 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                if (sqe_copy) {
                        s->sqe = sqe_copy;
                        memcpy(&req->submit, s, sizeof(*s));
-                       io_queue_async_work(ctx, req);
+                       if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
+                               ret = io_grab_files(ctx, req);
+                               if (ret) {
+                                       kfree(sqe_copy);
+                                       goto err;
+                               }
+                       }
 
                        /*
                         * Queued up for async execution, worker will release
                         * submit reference when the iocb is actually submitted.
                         */
+                       io_queue_async_work(ctx, req);
                        return 0;
                }
        }
 
        /* drop submission reference */
+err:
        io_put_req(req, NULL);
 
        /* and drop final reference, if we failed */
@@ -2509,6 +2565,7 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 
        head = READ_ONCE(sq_array[head & ctx->sq_mask]);
        if (head < ctx->sq_entries) {
+               s->ring_file = NULL;
                s->sqe = &ctx->sq_sqes[head];
                s->sequence = ctx->cached_sq_head;
                ctx->cached_sq_head++;
@@ -2708,7 +2765,8 @@ static int io_sq_thread(void *data)
        return 0;
 }
 
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+                         struct file *ring_file, int ring_fd)
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
@@ -2750,9 +2808,11 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                }
 
 out:
+               s.ring_file = ring_file;
                s.has_user = true;
                s.in_async = false;
                s.needs_fixed_file = false;
+               s.ring_fd = ring_fd;
                submit++;
                trace_io_uring_submit_sqe(ctx, true, false);
                io_submit_sqe(ctx, &s, statep, &link);
@@ -3714,6 +3774,53 @@ static int io_uring_release(struct inode *inode, struct file *file)
        return 0;
 }
 
+static void io_uring_cancel_files(struct io_ring_ctx *ctx,
+                                 struct files_struct *files)
+{
+       struct io_kiocb *req;
+       DEFINE_WAIT(wait);
+
+       while (!list_empty_careful(&ctx->inflight_list)) {
+               enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
+
+               spin_lock_irq(&ctx->inflight_lock);
+               list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
+                       if (req->work.files == files) {
+                               ret = io_wq_cancel_work(ctx->io_wq, &req->work);
+                               break;
+                       }
+               }
+               if (ret == IO_WQ_CANCEL_RUNNING)
+                       prepare_to_wait(&ctx->inflight_wait, &wait,
+                                       TASK_UNINTERRUPTIBLE);
+
+               spin_unlock_irq(&ctx->inflight_lock);
+
+               /*
+                * We need to keep going until we get NOTFOUND. We only cancel
+                * one work at the time.
+                *
+                * If we get CANCEL_RUNNING, then wait for a work to complete
+                * before continuing.
+                */
+               if (ret == IO_WQ_CANCEL_OK)
+                       continue;
+               else if (ret != IO_WQ_CANCEL_RUNNING)
+                       break;
+               schedule();
+       }
+}
+
+static int io_uring_flush(struct file *file, void *data)
+{
+       struct io_ring_ctx *ctx = file->private_data;
+
+       io_uring_cancel_files(ctx, data);
+       if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
+               io_wq_cancel_all(ctx->io_wq);
+       return 0;
+}
+
 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
 {
        loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
@@ -3782,7 +3889,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                to_submit = min(to_submit, ctx->sq_entries);
 
                mutex_lock(&ctx->uring_lock);
-               submitted = io_ring_submit(ctx, to_submit);
+               submitted = io_ring_submit(ctx, to_submit, f.file, fd);
                mutex_unlock(&ctx->uring_lock);
        }
        if (flags & IORING_ENTER_GETEVENTS) {
@@ -3805,6 +3912,7 @@ out_fput:
 
 static const struct file_operations io_uring_fops = {
        .release        = io_uring_release,
+       .flush          = io_uring_flush,
        .mmap           = io_uring_mmap,
        .poll           = io_uring_poll,
        .fasync         = io_uring_fasync,