/*
  * Get us an open_bucket we can allocate from, return with it locked:
  */
-struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
-                               unsigned target,
-                               unsigned erasure_code,
-                               struct write_point_specifier write_point,
-                               struct bch_devs_list *devs_have,
-                               unsigned nr_replicas,
-                               unsigned nr_replicas_required,
-                               enum alloc_reserve reserve,
-                               unsigned flags,
-                               struct closure *cl)
+int bch2_alloc_sectors_start(struct bch_fs *c,
+                            unsigned target,
+                            unsigned erasure_code,
+                            struct write_point_specifier write_point,
+                            struct bch_devs_list *devs_have,
+                            unsigned nr_replicas,
+                            unsigned nr_replicas_required,
+                            enum alloc_reserve reserve,
+                            unsigned flags,
+                            struct closure *cl,
+                            struct write_point **wp_ret)
 {
        struct write_point *wp;
        struct open_bucket *ob;
        write_points_nr = c->write_points_nr;
        have_cache      = false;
 
-       wp = writepoint_find(c, write_point.v);
+       *wp_ret = wp = writepoint_find(c, write_point.v);
 
        if (wp->data_type == BCH_DATA_user)
                ob_flags |= BUCKET_MAY_ALLOC_PARTIAL;
 
        BUG_ON(!wp->sectors_free || wp->sectors_free == UINT_MAX);
 
-       return wp;
+       return 0;
 err:
        open_bucket_for_each(c, &wp->ptrs, ob, i)
                if (ptrs.nr < ARRAY_SIZE(ptrs.v))
        switch (ret) {
        case -OPEN_BUCKETS_EMPTY:
        case -FREELIST_EMPTY:
-               return cl ? ERR_PTR(-EAGAIN) : ERR_PTR(-ENOSPC);
+               return cl ? -EAGAIN : -ENOSPC;
        case -INSUFFICIENT_DEVICES:
-               return ERR_PTR(-EROFS);
+               return -EROFS;
        default:
                BUG();
        }
 void bch2_alloc_sectors_append_ptrs(struct bch_fs *c, struct write_point *wp,
                                    struct bkey_i *k, unsigned sectors,
                                    bool cached)
-
 {
        struct open_bucket *ob;
        unsigned i;
 
        BUG_ON(sectors > wp->sectors_free);
-       wp->sectors_free -= sectors;
+       wp->sectors_free        -= sectors;
+       wp->sectors_allocated   += sectors;
 
        open_bucket_for_each(c, &wp->ptrs, ob, i) {
                struct bch_dev *ca = bch_dev_bkey_exists(c, ob->dev);
 {
        mutex_init(&wp->lock);
        wp->data_type = type;
+
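+       /*
+        * Each write point gets a worker and a list of in flight writes, for
+        * running btree index updates after data writes complete:
+        */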
+       INIT_WORK(&wp->index_update_work, bch2_write_point_do_index_updates);
+       INIT_LIST_HEAD(&wp->writes);
+       spin_lock_init(&wp->writes_lock);
 }
 
 void bch2_fs_allocator_foreground_init(struct bch_fs *c)
        }
 
 }
+
+static const char * const bch2_write_point_states[] = {
+#define x(n)   #n,
+       WRITE_POINT_STATES()
+#undef x
+       NULL
+};
+
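+/*
+ * Print per write point stats: sectors allocated, time since last use, and
+ * cumulative time spent in each write point state.
+ */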
+void bch2_write_points_to_text(struct printbuf *out, struct bch_fs *c)
+{
+       struct write_point *wp;
+       unsigned i;
+
+       for (wp = c->write_points;
+            wp < c->write_points + ARRAY_SIZE(c->write_points);
+            wp++) {
+               pr_buf(out, "%lu: ", wp->write_point);
+               bch2_hprint(out, wp->sectors_allocated);
+
+               pr_buf(out, " last wrote: ");
+               bch2_pr_time_units(out, sched_clock() - wp->last_used);
+
+               for (i = 0; i < WRITE_POINT_STATE_NR; i++) {
+                       pr_buf(out, " %s: ", bch2_write_point_states[i]);
+                       bch2_pr_time_units(out, wp->time[i]);
+               }
+
+               pr_newline(out);
+       }
+}
 
        }
 }
 
-static void __bch2_write(struct closure *);
+static void __bch2_write(struct bch_write_op *);
 
 static void bch2_write_done(struct closure *cl)
 {
        goto out;
 }
 
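+/*
+ * Write point state accounting: charge the time since the last transition to
+ * the state we're leaving, then record the new state:
+ */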
+static inline void __wp_update_state(struct write_point *wp, enum write_point_state state)
+{
+       if (state != wp->state) {
+               u64 now = ktime_get_ns();
+
+               if (wp->last_state_change &&
+                   time_after64(now, wp->last_state_change))
+                       wp->time[wp->state] += now - wp->last_state_change;
+               wp->state = state;
+               wp->last_state_change = now;
+       }
+}
+
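+/* Pick the state from whether a write is running and whether any are queued: */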
+static inline void wp_update_state(struct write_point *wp, bool running)
+{
+       enum write_point_state state;
+
+       state = running                  ? WRITE_POINT_running :
+               !list_empty(&wp->writes) ? WRITE_POINT_waiting_io
+                                        : WRITE_POINT_stopped;
+
+       __wp_update_state(wp, state);
+}
+
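+/*
+ * Called via the op's closure once its data write(s) have completed: mark the
+ * op ready for its btree update and kick the write point's worker.
+ */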
 static void bch2_write_index(struct closure *cl)
 {
        struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
-       struct bch_fs *c = op->c;
+       struct write_point *wp = op->wp;
+       struct workqueue_struct *wq = index_update_wq(op);
 
-       __bch2_write_index(op);
+       barrier();
 
-       if (!(op->flags & BCH_WRITE_DONE)) {
-               continue_at(cl, __bch2_write, index_update_wq(op));
-       } else if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
-               bch2_journal_flush_seq_async(&c->journal,
-                                            *op_journal_seq(op),
-                                            cl);
-               continue_at(cl, bch2_write_done, index_update_wq(op));
-       } else {
-               continue_at_nobarrier(cl, bch2_write_done, NULL);
+       /*
+        * We're not using wp->writes_lock here, so this is racey: that's ok,
+        * because this is just for diagnostic purposes, and we're running out
+        * of interrupt context here so if we were to take the lock we'd have to
+        * switch to spin_lock_irq()/irqsave(), which is not free:
+        */
+       if (wp->state == WRITE_POINT_waiting_io)
+               __wp_update_state(wp, WRITE_POINT_waiting_work);
+
+       op->btree_update_ready = true;
+       queue_work(wq, &wp->index_update_work);
+}
+
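+/*
+ * Per write point work item: pull ops whose data writes have completed off
+ * wp->writes, run their btree updates, and then resubmit, flush the journal,
+ * or complete them as appropriate.
+ */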
+void bch2_write_point_do_index_updates(struct work_struct *work)
+{
+       struct write_point *wp =
+               container_of(work, struct write_point, index_update_work);
+       struct bch_write_op *op;
+
+       while (1) {
+               spin_lock(&wp->writes_lock);
+               list_for_each_entry(op, &wp->writes, wp_list)
+                       if (op->btree_update_ready) {
+                               list_del(&op->wp_list);
+                               goto unlock;
+                       }
+               op = NULL;
+unlock:
+               wp_update_state(wp, op != NULL);
+               spin_unlock(&wp->writes_lock);
+
+               if (!op)
+                       break;
+
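+               /* we're running out of a workqueue, so __bch2_write() must not block: */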
+               op->flags |= BCH_WRITE_IN_WORKER;
+
+               __bch2_write_index(op);
+
+               if (!(op->flags & BCH_WRITE_DONE)) {
+                       __bch2_write(op);
+               } else if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
+                       bch2_journal_flush_seq_async(&op->c->journal,
+                                                    *op_journal_seq(op),
+                                                    &op->cl);
+                       continue_at(&op->cl, bch2_write_done, index_update_wq(op));
+               } else {
+                       bch2_write_done(&op->cl);
+               }
        }
 }
 
 
        if (parent)
                bio_endio(&parent->bio);
-       else if (!(op->flags & BCH_WRITE_SKIP_CLOSURE_PUT))
-               closure_put(cl);
        else
-               continue_at_nobarrier(cl, bch2_write_index, index_update_wq(op));
+               closure_put(cl);
 }
 
 static void init_append_extent(struct bch_write_op *op,
        return ret;
 }
 
-static void __bch2_write(struct closure *cl)
+static void __bch2_write(struct bch_write_op *op)
 {
-       struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
        struct bch_fs *c = op->c;
-       struct write_point *wp;
+       struct write_point *wp = NULL;
        struct bio *bio = NULL;
-       bool skip_put = true;
        unsigned nofs_flags;
        int ret;
 
        nofs_flags = memalloc_nofs_save();
 again:
        memset(&op->failed, 0, sizeof(op->failed));
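+       /* set by bch2_write_index() once this pass's data IO completes: */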
+       op->btree_update_ready = false;
 
        do {
                struct bkey_i *key_to_write;
                /* +1 for possible cache device: */
                if (op->open_buckets.nr + op->nr_replicas + 1 >
                    ARRAY_SIZE(op->open_buckets.v))
-                       goto flush_io;
+                       break;
 
                if (bch2_keylist_realloc(&op->insert_keys,
                                        op->inline_keys,
                                        ARRAY_SIZE(op->inline_keys),
                                        BKEY_EXTENT_U64s_MAX))
-                       goto flush_io;
+                       break;
 
                if ((op->flags & BCH_WRITE_FROM_INTERNAL) &&
                    percpu_ref_is_dying(&c->writes)) {
                 * freeing up space on specific disks, which means that
                 * allocations for specific disks may hang arbitrarily long:
                 */
-               wp = bch2_alloc_sectors_start(c,
+               ret = bch2_alloc_sectors_start(c,
                        op->target,
                        op->opts.erasure_code && !(op->flags & BCH_WRITE_CACHED),
                        op->write_point,
                        op->alloc_reserve,
                        op->flags,
                        (op->flags & (BCH_WRITE_ALLOC_NOWAIT|
-                                     BCH_WRITE_ONLY_SPECIFIED_DEVS)) ? NULL : cl);
-               EBUG_ON(!wp);
-
-               if (unlikely(IS_ERR(wp))) {
-                       if (unlikely(PTR_ERR(wp) != -EAGAIN)) {
-                               ret = PTR_ERR(wp);
+                                     BCH_WRITE_ONLY_SPECIFIED_DEVS))
+                       ? NULL : &op->cl,
+                       &wp);
+               if (unlikely(ret)) {
+                       if (unlikely(ret != -EAGAIN))
                                goto err;
-                       }
 
-                       goto flush_io;
+                       break;
                }
 
-               /*
-                * It's possible for the allocator to fail, put us on the
-                * freelist waitlist, and then succeed in one of various retry
-                * paths: if that happens, we need to disable the skip_put
-                * optimization because otherwise there won't necessarily be a
-                * barrier before we free the bch_write_op:
-                */
-               if (atomic_read(&cl->remaining) & CLOSURE_WAITING)
-                       skip_put = false;
+               EBUG_ON(!wp);
 
                bch2_open_bucket_get(c, wp, &op->open_buckets);
                ret = bch2_write_extent(op, wp, &bio);
+
                bch2_alloc_sectors_done(c, wp);
 
                if (ret < 0)
                        goto err;
 
-               if (ret) {
-                       skip_put = false;
-               } else {
-                       /*
-                        * for the skip_put optimization this has to be set
-                        * before we submit the bio:
-                        */
+               if (!ret)
                        op->flags |= BCH_WRITE_DONE;
-               }
 
                bio->bi_end_io  = bch2_write_endio;
                bio->bi_private = &op->cl;
                bio->bi_opf |= REQ_OP_WRITE;
 
-               if (!skip_put)
-                       closure_get(bio->bi_private);
-               else
-                       op->flags |= BCH_WRITE_SKIP_CLOSURE_PUT;
+               closure_get(bio->bi_private);
 
                key_to_write = (void *) (op->insert_keys.keys_p +
                                         key_to_write_offset);
                bch2_submit_wbio_replicas(to_wbio(bio), c, BCH_DATA_user,
                                          key_to_write);
        } while (ret);
-
-       if (!skip_put)
-               continue_at(cl, bch2_write_index, index_update_wq(op));
 out:
-       memalloc_nofs_restore(nofs_flags);
-       return;
-err:
-       op->error = ret;
-       op->flags |= BCH_WRITE_DONE;
-
-       continue_at(cl, bch2_write_index, index_update_wq(op));
-       goto out;
-flush_io:
        /*
         * If the write can't all be submitted at once, we generally want to
         * block synchronously as that signals backpressure to the caller.
-        *
-        * However, if we're running out of a workqueue, we can't block here
-        * because we'll be blocking other work items from completing:
         */
-       if (current->flags & PF_WQ_WORKER) {
-               continue_at(cl, bch2_write_index, index_update_wq(op));
-               goto out;
-       }
-
-       closure_sync(cl);
-
-       if (!bch2_keylist_empty(&op->insert_keys)) {
+       if (!(op->flags & BCH_WRITE_DONE) &&
+           !(op->flags & BCH_WRITE_IN_WORKER)) {
+               closure_sync(&op->cl);
                __bch2_write_index(op);
 
-               if (op->error) {
-                       op->flags |= BCH_WRITE_DONE;
-                       continue_at_nobarrier(cl, bch2_write_done, NULL);
-                       goto out;
-               }
+               if (!(op->flags & BCH_WRITE_DONE))
+                       goto again;
+               bch2_write_done(&op->cl);
+       } else {
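+               /*
+                * Async completion: park the op on the write point's list; when
+                * the data write(s) complete, bch2_write_index() hands the
+                * btree update off to the write point's worker.
+                */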
+               spin_lock(&wp->writes_lock);
+               op->wp = wp;
+               list_add_tail(&op->wp_list, &wp->writes);
+               if (wp->state == WRITE_POINT_stopped)
+                       __wp_update_state(wp, WRITE_POINT_waiting_io);
+               spin_unlock(&wp->writes_lock);
+
+               continue_at(&op->cl, bch2_write_index, NULL);
        }
 
-       goto again;
+       memalloc_nofs_restore(nofs_flags);
+       return;
+err:
+       op->error = ret;
+       op->flags |= BCH_WRITE_DONE;
+       goto out;
 }
 
 static void bch2_write_data_inline(struct bch_write_op *op, unsigned data_len)
 {
-       struct closure *cl = &op->cl;
        struct bio *bio = &op->wbio.bio;
        struct bvec_iter iter;
        struct bkey_i_inline_data *id;
        unsigned sectors;
        int ret;
 
+       op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
+       op->flags |= BCH_WRITE_DONE;
+
        bch2_check_set_feature(op->c, BCH_FEATURE_inline_data);
 
        ret = bch2_keylist_realloc(&op->insert_keys, op->inline_keys,
        set_bkey_val_bytes(&id->k, data_len);
        bch2_keylist_push(&op->insert_keys);
 
-       op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
-       op->flags |= BCH_WRITE_DONE;
-
-       continue_at_nobarrier(cl, bch2_write_index, NULL);
-       return;
+       __bch2_write_index(op);
 err:
        bch2_write_done(&op->cl);
 }
        struct bch_fs *c = op->c;
        unsigned data_len;
 
+       EBUG_ON(op->cl.parent);
        BUG_ON(!op->nr_replicas);
        BUG_ON(!op->write_point.v);
        BUG_ON(!bkey_cmp(op->pos, POS_MAX));
                return;
        }
 
-       continue_at_nobarrier(cl, __bch2_write, NULL);
+       __bch2_write(op);
        return;
 err:
        bch2_disk_reservation_put(c, &op->res);
 
-       if (op->end_io) {
-               EBUG_ON(cl->parent);
-               closure_debug_destroy(cl);
+       closure_debug_destroy(&op->cl);
+       if (op->end_io)
                op->end_io(op);
-       } else {
-               closure_return(cl);
-       }
 }
 
 /* Cache promotion on read */