From: Linus Torvalds Date: Mon, 3 Aug 2020 20:01:22 +0000 (-0700) Subject: Merge tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block X-Git-Url: http://git.maquefel.me/?a=commitdiff_plain;h=cdc8fcb49905c0b67e355e027cb462ee168ffaa3;p=linux.git Merge tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block Pull io_uring updates from Jens Axboe: "Lots of cleanups in here, hardening the code and/or making it easier to read and fixing bugs, but a core feature/change too adding support for real async buffered reads. With the latter in place, we just need buffered write async support and we're done relying on kthreads for the fast path. In detail: - Cleanup how memory accounting is done on ring setup/free (Bijan) - sq array offset calculation fixup (Dmitry) - Consistently handle blocking off O_DIRECT submission path (me) - Support proper async buffered reads, instead of relying on kthread offload for that. This uses the page waitqueue to drive retries from task_work, like we handle poll based retry. (me) - IO completion optimizations (me) - Fix race with accounting and ring fd install (me) - Support EPOLLEXCLUSIVE (Jiufei) - Get rid of the io_kiocb unionizing, made possible by shrinking other bits (Pavel) - Completion side cleanups (Pavel) - Cleanup REQ_F_ flags handling, and kill off many of them (Pavel) - Request environment grabbing cleanups (Pavel) - File and socket read/write cleanups (Pavel) - Improve kiocb_set_rw_flags() (Pavel) - Tons of fixes and cleanups (Pavel) - IORING_SQ_NEED_WAKEUP clear fix (Xiaoguang)" * tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block: (127 commits) io_uring: flip if handling after io_setup_async_rw fs: optimise kiocb_set_rw_flags() io_uring: don't touch 'ctx' after installing file descriptor io_uring: get rid of atomic FAA for cq_timeouts io_uring: consolidate *_check_overflow accounting io_uring: fix stalled deferred requests io_uring: fix racy overflow count reporting io_uring: deduplicate __io_complete_rw() io_uring: de-unionise io_kiocb io-wq: update hash bits io_uring: fix missing io_queue_linked_timeout() io_uring: mark ->work uninitialised after cleanup io_uring: deduplicate io_grab_files() calls io_uring: don't do opcode prep twice io_uring: clear IORING_SQ_NEED_WAKEUP after executing task works io_uring: batch put_task_struct() tasks: add put_task_struct_many() io_uring: return locked and pinned page accounting io_uring: don't miscount pinned memory io_uring: don't open-code recv kbuf managment ... --- cdc8fcb49905c0b67e355e027cb462ee168ffaa3 diff --cc block/blk-core.c index 93104c7470e8a,62a4904db921c..d9d632639bd18 --- a/block/blk-core.c +++ b/block/blk-core.c @@@ -956,13 -952,30 +956,18 @@@ static inline blk_status_t blk_check_zo return BLK_STS_OK; } -static noinline_for_stack bool -generic_make_request_checks(struct bio *bio) +static noinline_for_stack bool submit_bio_checks(struct bio *bio) { - struct request_queue *q; - int nr_sectors = bio_sectors(bio); + struct request_queue *q = bio->bi_disk->queue; blk_status_t status = BLK_STS_IOERR; + struct blk_plug *plug; - char b[BDEVNAME_SIZE]; might_sleep(); - q = bio->bi_disk->queue; - if (unlikely(!q)) { - printk(KERN_ERR - "generic_make_request: Trying to access " - "nonexistent block-device %s (%Lu)\n", - bio_devname(bio, b), (long long)bio->bi_iter.bi_sector); - goto end_io; - } - + plug = blk_mq_plug(q, bio); + if (plug && plug->nowait) + bio->bi_opf |= REQ_NOWAIT; + /* * For a REQ_NOWAIT based request, return -EOPNOTSUPP * if queue is not a request based queue. diff --cc fs/io_uring.c index 493e5047e67c9,6fd0b0f5df68b..2a3af95be4cab --- a/fs/io_uring.c +++ b/fs/io_uring.c @@@ -2593,40 -2913,122 +2913,147 @@@ static int io_setup_async_rw(struct io_ io_req_map_rw(req, io_size, iovec, fast_iov, iter); } - return 0; + return 0; + } + + static inline int io_rw_prep_async(struct io_kiocb *req, int rw, + bool force_nonblock) + { + struct io_async_ctx *io = req->io; + struct iov_iter iter; + ssize_t ret; + + io->rw.iov = io->rw.fast_iov; + req->io = NULL; + ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock); + req->io = io; + if (unlikely(ret < 0)) + return ret; + + io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter); + return 0; + } + + static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, + bool force_nonblock) + { + ssize_t ret; + + ret = io_prep_rw(req, sqe, force_nonblock); + if (ret) + return ret; + + if (unlikely(!(req->file->f_mode & FMODE_READ))) + return -EBADF; + + /* either don't need iovec imported or already have it */ + if (!req->io || req->flags & REQ_F_NEED_CLEANUP) + return 0; + return io_rw_prep_async(req, READ, force_nonblock); + } + + static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, + int sync, void *arg) + { + struct wait_page_queue *wpq; + struct io_kiocb *req = wait->private; + struct wait_page_key *key = arg; + int ret; + + wpq = container_of(wait, struct wait_page_queue, wait); + - ret = wake_page_match(wpq, key); - if (ret != 1) - return ret; ++ if (!wake_page_match(wpq, key)) ++ return 0; ++ ++ /* Stop waking things up if the page is locked again */ ++ if (test_bit(key->bit_nr, &key->page->flags)) ++ return -1; + + list_del_init(&wait->entry); + + init_task_work(&req->task_work, io_req_task_submit); + /* submit ref gets dropped, acquire a new one */ + refcount_inc(&req->refs); + ret = io_req_task_work_add(req, &req->task_work); + if (unlikely(ret)) { + struct task_struct *tsk; + + /* queue just for cancelation */ + init_task_work(&req->task_work, io_req_task_cancel); + tsk = io_wq_get_task(req->ctx->io_wq); + task_work_add(tsk, &req->task_work, 0); + wake_up_process(tsk); + } + return 1; + } + ++static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb, ++ struct wait_page_queue *wait, ++ wait_queue_func_t func, ++ void *data) ++{ ++ /* Can't support async wakeup with polled IO */ ++ if (kiocb->ki_flags & IOCB_HIPRI) ++ return -EINVAL; ++ if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) { ++ wait->wait.func = func; ++ wait->wait.private = data; ++ wait->wait.flags = 0; ++ INIT_LIST_HEAD(&wait->wait.entry); ++ kiocb->ki_flags |= IOCB_WAITQ; ++ kiocb->ki_waitq = wait; ++ return 0; ++ } ++ ++ return -EOPNOTSUPP; +} + - static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, - bool force_nonblock) ++ + static bool io_rw_should_retry(struct io_kiocb *req) { - struct io_async_ctx *io; - struct iov_iter iter; - ssize_t ret; + struct kiocb *kiocb = &req->rw.kiocb; + int ret; - ret = io_prep_rw(req, sqe, force_nonblock); - if (ret) - return ret; + /* never retry for NOWAIT, we just complete with -EAGAIN */ + if (req->flags & REQ_F_NOWAIT) + return false; - if (unlikely(!(req->file->f_mode & FMODE_READ))) - return -EBADF; + /* already tried, or we're doing O_DIRECT */ + if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ)) + return false; + /* + * just use poll if we can, and don't attempt if the fs doesn't + * support callback based unlocks + */ + if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC)) + return false; - /* either don't need iovec imported or already have it */ - if (!req->io || req->flags & REQ_F_NEED_CLEANUP) - return 0; + /* + * If request type doesn't require req->io to defer in general, + * we need to allocate it here + */ + if (!req->io && __io_alloc_async_ctx(req)) + return false; - io = req->io; - io->rw.iov = io->rw.fast_iov; - req->io = NULL; - ret = io_import_iovec(READ, req, &io->rw.iov, &iter, !force_nonblock); - req->io = io; - if (ret < 0) - return ret; + ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq, + io_async_buf_func, req); + if (!ret) { + io_get_req_task(req); + return true; + } - io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter); - return 0; + return false; + } + + static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter) + { + if (req->file->f_op->read_iter) + return call_read_iter(req->file, &req->rw.kiocb, iter); + return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter); } - static int io_read(struct io_kiocb *req, bool force_nonblock) + static int io_read(struct io_kiocb *req, bool force_nonblock, + struct io_comp_state *cs) { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; diff --cc include/linux/fs.h index cdfed8c997506,e535543d31d97..bd7ec3eaeed0c --- a/include/linux/fs.h +++ b/include/linux/fs.h @@@ -315,7 -318,8 +318,9 @@@ enum rw_hint #define IOCB_SYNC (1 << 5) #define IOCB_WRITE (1 << 6) #define IOCB_NOWAIT (1 << 7) + /* iocb->ki_waitq is valid */ + #define IOCB_WAITQ (1 << 8) +#define IOCB_NOIO (1 << 9) struct kiocb { struct file *ki_filp; diff --cc include/linux/pagemap.h index cf2468da68e91,7386bc67cc5a7..d1f4eff605ad9 --- a/include/linux/pagemap.h +++ b/include/linux/pagemap.h @@@ -496,8 -496,67 +496,35 @@@ static inline pgoff_t linear_page_index return pgoff; } + /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */ + struct wait_page_key { + struct page *page; + int bit_nr; + int page_match; + }; + + struct wait_page_queue { + struct page *page; + int bit_nr; + wait_queue_entry_t wait; + }; + -static inline int wake_page_match(struct wait_page_queue *wait_page, ++static inline bool wake_page_match(struct wait_page_queue *wait_page, + struct wait_page_key *key) + { + if (wait_page->page != key->page) - return 0; ++ return false; + key->page_match = 1; + + if (wait_page->bit_nr != key->bit_nr) - return 0; - - /* - * Stop walking if it's locked. - * Is this safe if put_and_wait_on_page_locked() is in use? - * Yes: the waker must hold a reference to this page, and if PG_locked - * has now already been set by another task, that task must also hold - * a reference to the *same usage* of this page; so there is no need - * to walk on to wake even the put_and_wait_on_page_locked() callers. - */ - if (test_bit(key->bit_nr, &key->page->flags)) - return -1; - - return 1; -} - -static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb, - struct wait_page_queue *wait, - wait_queue_func_t func, - void *data) -{ - /* Can't support async wakeup with polled IO */ - if (kiocb->ki_flags & IOCB_HIPRI) - return -EINVAL; - if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) { - wait->wait.func = func; - wait->wait.private = data; - wait->wait.flags = 0; - INIT_LIST_HEAD(&wait->wait.entry); - kiocb->ki_flags |= IOCB_WAITQ; - kiocb->ki_waitq = wait; - return 0; - } ++ return false; + - return -EOPNOTSUPP; ++ return true; + } + extern void __lock_page(struct page *page); extern int __lock_page_killable(struct page *page); + extern int __lock_page_async(struct page *page, struct wait_page_queue *wait); extern int __lock_page_or_retry(struct page *page, struct mm_struct *mm, unsigned int flags); extern void unlock_page(struct page *page); diff --cc mm/filemap.c index 991503bbf922c,a5b1fa8f7ce47..9f131f1cfde3f --- a/mm/filemap.c +++ b/mm/filemap.c @@@ -987,63 -987,17 +987,46 @@@ void __init pagecache_init(void page_writeback_init(); } - /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */ - struct wait_page_key { - struct page *page; - int bit_nr; - int page_match; - }; - - struct wait_page_queue { - struct page *page; - int bit_nr; - wait_queue_entry_t wait; - }; - static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg) { + int ret; struct wait_page_key *key = arg; struct wait_page_queue *wait_page = container_of(wait, struct wait_page_queue, wait); - int ret; - if (wait_page->page != key->page) - return 0; - key->page_match = 1; - - if (wait_page->bit_nr != key->bit_nr) - ret = wake_page_match(wait_page, key); - if (ret != 1) - return ret; - return autoremove_wake_function(wait, mode, sync, key); ++ if (!wake_page_match(wait_page, key)) + return 0; + + /* + * If it's an exclusive wait, we get the bit for it, and + * stop walking if we can't. + * + * If it's a non-exclusive wait, then the fact that this + * wake function was called means that the bit already + * was cleared, and we don't care if somebody then + * re-took it. + */ + ret = 0; + if (wait->flags & WQ_FLAG_EXCLUSIVE) { + if (test_and_set_bit(key->bit_nr, &key->page->flags)) + return -1; + ret = 1; + } + wait->flags |= WQ_FLAG_WOKEN; + + wake_up_state(wait->private, mode); + + /* + * Ok, we have successfully done what we're waiting for, + * and we can unconditionally remove the wait entry. + * + * Note that this has to be the absolute last thing we do, + * since after list_del_init(&wait->entry) the wait entry + * might be de-allocated and the process might even have + * exited. + */ + list_del_init_careful(&wait->entry); + return ret; } static void wake_up_page_bit(struct page *page, int bit_nr) @@@ -2061,8 -2044,6 +2087,8 @@@ find_page page = find_get_page(mapping, index); if (!page) { - if (iocb->ki_flags & (IOCB_NOWAIT | IOCB_NOIO)) ++ if (iocb->ki_flags & IOCB_NOIO) + goto would_block; page_cache_sync_readahead(mapping, ra, filp, index, last_index - index); @@@ -2197,7 -2185,7 +2234,7 @@@ page_not_up_to_date_locked } readpage: - if (iocb->ki_flags & IOCB_NOIO) { - if (iocb->ki_flags & IOCB_NOWAIT) { ++ if (iocb->ki_flags & (IOCB_NOIO | IOCB_NOWAIT)) { unlock_page(page); put_page(page); goto would_block;