return ret;
 }
 
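+/*
+ * Drop a reference on a dio_private. The final put completes the whole
+ * direct IO: for writes the ordered extent is updated, for reads the
+ * extent range is unlocked, and in both cases the original dio_bio is
+ * ended.
+ */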
+static void btrfs_dio_private_put(struct btrfs_dio_private *dip)
+{
+       /*
+        * This implies a barrier so that stores to dio_bio->bi_status before
+        * this and loads of dio_bio->bi_status after this are fully ordered.
+        */
+       if (!refcount_dec_and_test(&dip->refs))
+               return;
+
+       if (bio_op(dip->dio_bio) == REQ_OP_WRITE) {
+               __endio_write_update_ordered(dip->inode, dip->logical_offset,
+                                            dip->bytes,
+                                            !dip->dio_bio->bi_status);
+       } else {
+               unlock_extent(&BTRFS_I(dip->inode)->io_tree,
+                             dip->logical_offset,
+                             dip->logical_offset + dip->bytes - 1);
+       }
+
+       dio_end_io(dip->dio_bio);
+       kfree(dip);
+}
+
 static inline blk_status_t submit_dio_repair_bio(struct inode *inode,
                                                 struct bio *bio,
                                                 int mirror_num)
        return err;
 }
 
-static blk_status_t btrfs_subio_endio_read(struct inode *inode,
-               struct btrfs_io_bio *io_bio, blk_status_t err)
+static blk_status_t btrfs_check_read_dio_bio(struct inode *inode,
+                                            struct btrfs_io_bio *io_bio,
+                                            blk_status_t err)
 {
        bool skip_csum = BTRFS_I(inode)->flags & BTRFS_INODE_NODATASUM;
 
        }
 }
 
-static void btrfs_endio_direct_read(struct bio *bio)
-{
-       struct btrfs_dio_private *dip = bio->bi_private;
-       struct inode *inode = dip->inode;
-       struct bio *dio_bio;
-       struct btrfs_io_bio *io_bio = btrfs_io_bio(bio);
-       blk_status_t err = bio->bi_status;
-
-       if (dip->flags & BTRFS_DIO_ORIG_BIO_SUBMITTED)
-               err = btrfs_subio_endio_read(inode, io_bio, err);
-
-       unlock_extent(&BTRFS_I(inode)->io_tree, dip->logical_offset,
-                     dip->logical_offset + dip->bytes - 1);
-       dio_bio = dip->dio_bio;
-
-       kfree(dip);
-
-       dio_bio->bi_status = err;
-       dio_end_io(dio_bio);
-       bio_put(bio);
-}
-
 static void __endio_write_update_ordered(struct inode *inode,
                                         const u64 offset, const u64 bytes,
                                         const bool uptodate)
        }
 }
 
-static void btrfs_endio_direct_write(struct bio *bio)
-{
-       struct btrfs_dio_private *dip = bio->bi_private;
-       struct bio *dio_bio = dip->dio_bio;
-
-       __endio_write_update_ordered(dip->inode, dip->logical_offset,
-                                    dip->bytes, !bio->bi_status);
-
-       kfree(dip);
-
-       dio_bio->bi_status = bio->bi_status;
-       dio_end_io(dio_bio);
-       bio_put(bio);
-}
-
 static blk_status_t btrfs_submit_bio_start_direct_io(void *private_data,
                                    struct bio *bio, u64 offset)
 {
                           (unsigned long long)bio->bi_iter.bi_sector,
                           bio->bi_iter.bi_size, err);
 
-       if (dip->subio_endio)
-               err = dip->subio_endio(dip->inode, btrfs_io_bio(bio), err);
-
-       if (err) {
-               /*
-                * We want to perceive the errors flag being set before
-                * decrementing the reference count. We don't need a barrier
-                * since atomic operations with a return value are fully
-                * ordered as per atomic_t.txt
-                */
-               dip->errors = 1;
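+       /*
+        * Read bios may need checksum verification (and possibly repair
+        * from another mirror) before the error status is final.
+        */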
+       if (bio_op(bio) == REQ_OP_READ) {
+               err = btrfs_check_read_dio_bio(dip->inode, btrfs_io_bio(bio),
+                                              err);
        }
 
-       /* if there are more bios still pending for this dio, just exit */
-       if (!refcount_dec_and_test(&dip->refs))
-               goto out;
+       if (err)
+               dip->dio_bio->bi_status = err;
 
-       if (dip->errors) {
-               bio_io_error(dip->orig_bio);
-       } else {
-               dip->dio_bio->bi_status = BLK_STS_OK;
-               bio_endio(dip->orig_bio);
-       }
-out:
        bio_put(bio);
+       btrfs_dio_private_put(dip);
 }
 
 static inline blk_status_t btrfs_submit_dio_bio(struct bio *bio,
        const bool csum = !(BTRFS_I(inode)->flags & BTRFS_INODE_NODATASUM);
        size_t dip_size;
        struct btrfs_dio_private *dip;
-       struct bio *bio;
 
        dip_size = sizeof(*dip);
        if (!write && csum) {
        if (!dip)
                return NULL;
 
-       bio = btrfs_bio_clone(dio_bio);
-       bio->bi_private = dip;
-       btrfs_io_bio(bio)->logical = file_offset;
-
        dip->inode = inode;
        dip->logical_offset = file_offset;
        dip->bytes = dio_bio->bi_iter.bi_size;
        dip->disk_bytenr = (u64)dio_bio->bi_iter.bi_sector << 9;
-       dip->orig_bio = bio;
        dip->dio_bio = dio_bio;
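+       /*
+        * The submission loop in the caller transfers this initial
+        * reference to the last bio it submits.
+        */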
        refcount_set(&dip->refs, 1);
 
                        dip->bytes;
                dio_data->unsubmitted_oe_range_start =
                        dio_data->unsubmitted_oe_range_end;
-
-               bio->bi_end_io = btrfs_endio_direct_write;
-       } else {
-               bio->bi_end_io = btrfs_endio_direct_read;
-               dip->subio_endio = btrfs_subio_endio_read;
        }
        return dip;
 }
        const bool write = (bio_op(dio_bio) == REQ_OP_WRITE);
        const bool csum = !(BTRFS_I(inode)->flags & BTRFS_INODE_NODATASUM);
        struct btrfs_fs_info *fs_info = btrfs_sb(inode->i_sb);
+       const bool raid56 = (btrfs_data_alloc_profile(fs_info) &
+                            BTRFS_BLOCK_GROUP_RAID56_MASK);
        struct btrfs_dio_private *dip;
        struct bio *bio;
-       struct bio *orig_bio;
        u64 start_sector;
        int async_submit = 0;
        u64 submit_len;
                        goto out_err;
        }
 
-       orig_bio = dip->orig_bio;
-       start_sector = orig_bio->bi_iter.bi_sector;
-       submit_len = orig_bio->bi_iter.bi_size;
-       ret = btrfs_get_io_geometry(fs_info, btrfs_op(orig_bio),
-                                   start_sector << 9, submit_len, &geom);
-       if (ret)
-               goto out_err;
+       start_sector = dio_bio->bi_iter.bi_sector;
+       submit_len = dio_bio->bi_iter.bi_size;
 
-       if (geom.len >= submit_len) {
-               bio = orig_bio;
-               dip->flags |= BTRFS_DIO_ORIG_BIO_SUBMITTED;
-               goto submit;
-       }
-
-       /* async crcs make it difficult to collect full stripe writes. */
-       if (btrfs_data_alloc_profile(fs_info) & BTRFS_BLOCK_GROUP_RAID56_MASK)
-               async_submit = 0;
-       else
-               async_submit = 1;
-
-       /* bio split */
-       ASSERT(geom.len <= INT_MAX);
        do {
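+               /*
+                * Query the mapping geometry for the remaining range to
+                * find out how much of it fits into a single bio.
+                */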
+               ret = btrfs_get_io_geometry(fs_info, btrfs_op(dio_bio),
+                                           start_sector << 9, submit_len,
+                                           &geom);
+               if (ret) {
+                       status = errno_to_blk_status(ret);
+                       goto out_err;
+               }
+               ASSERT(geom.len <= INT_MAX);
+
                clone_len = min_t(int, submit_len, geom.len);
 
                /*
                 * This will never fail as it's passing GFP_NOFS and
                 * the allocation is backed by btrfs_bioset.
                 */
-               bio = btrfs_bio_clone_partial(orig_bio, clone_offset,
-                                             clone_len);
+               bio = btrfs_bio_clone_partial(dio_bio, clone_offset, clone_len);
                bio->bi_private = dip;
                bio->bi_end_io = btrfs_end_dio_bio;
                btrfs_io_bio(bio)->logical = file_offset;
 
                ASSERT(submit_len >= clone_len);
                submit_len -= clone_len;
-               if (submit_len == 0)
-                       break;
 
                /*
                 * Increase the count before we submit the bio so we know
                 * the end IO handler won't happen before we increase the
                 * count. Otherwise, the dip might get freed before we're
                 * done setting it up.
+                *
+                * We transfer the initial reference to the last bio, so we
+                * don't need to increment the reference count for the last one.
                 */
-               refcount_inc(&dip->refs);
+               if (submit_len > 0) {
+                       refcount_inc(&dip->refs);
+                       /*
+                        * If we are submitting more than one bio, submit them
+                        * all asynchronously. The exception is RAID 5 or 6, as
+                        * asynchronous checksums make it difficult to collect
+                        * full stripe writes.
+                        */
+                       if (!raid56)
+                               async_submit = 1;
+               }
 
                status = btrfs_submit_dio_bio(bio, inode, file_offset,
                                                async_submit);
                if (status) {
                        bio_put(bio);
-                       refcount_dec(&dip->refs);
+                       if (submit_len > 0)
+                               refcount_dec(&dip->refs);
                        goto out_err;
                }
 
                clone_offset += clone_len;
                start_sector += clone_len >> 9;
                file_offset += clone_len;
-
-               ret = btrfs_get_io_geometry(fs_info, btrfs_op(orig_bio),
-                                     start_sector << 9, submit_len, &geom);
-               if (ret)
-                       goto out_err;
        } while (submit_len > 0);
+       return;
 
-submit:
-       status = btrfs_submit_dio_bio(bio, inode, file_offset, async_submit);
-       if (!status)
-               return;
-
-       if (bio != orig_bio)
-               bio_put(bio);
 out_err:
-       dip->errors = 1;
-       /*
-        * Before atomic variable goto zero, we must  make sure dip->errors is
-        * perceived to be set. This ordering is ensured by the fact that an
-        * atomic operations with a return value are fully ordered as per
-        * atomic_t.txt
-        */
-       if (refcount_dec_and_test(&dip->refs))
-               bio_io_error(dip->orig_bio);
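+       /*
+        * Record the error and drop our reference; if every bio submitted
+        * so far has already completed, this put ends the dio_bio.
+        */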
+       dip->dio_bio->bi_status = status;
+       btrfs_dio_private_put(dip);
 }
 
 static ssize_t check_direct_IO(struct btrfs_fs_info *fs_info,