static void btree_node_will_make_reachable(struct btree_update *,
                                           struct btree *);
 static void btree_update_drop_new_node(struct bch_fs *, struct btree *);
-static void bch2_btree_set_root_ondisk(struct bch_fs *, struct btree *, int);
 
 /* Debug code: */
 
 }
 
 static void bch2_btree_node_free_ondisk(struct bch_fs *c,
-                                       struct pending_btree_node_free *pending)
+                       struct pending_btree_node_free *pending,
+                       u64 journal_seq)
 {
        BUG_ON(!pending->index_update_done);
 
        bch2_mark_key(c, bkey_i_to_s_c(&pending->key),
-                     0, 0, NULL, 0, BTREE_TRIGGER_OVERWRITE);
+                     0, 0, NULL, journal_seq, BTREE_TRIGGER_OVERWRITE);
 
        if (gc_visited(c, gc_phase(GC_PHASE_PENDING_DELETE)))
                bch2_mark_key(c, bkey_i_to_s_c(&pending->key),
-                             0, 0, NULL, 0,
+                             0, 0, NULL, journal_seq,
                              BTREE_TRIGGER_OVERWRITE|
                              BTREE_TRIGGER_GC);
 }
 {
        struct bch_fs *c = as->c;
 
+       bch2_journal_preres_put(&c->journal, &as->journal_preres);
+
+       bch2_journal_pin_drop(&c->journal, &as->journal);
        bch2_journal_pin_flush(&c->journal, &as->journal);
 
-       BUG_ON(as->nr_new_nodes);
-       BUG_ON(as->nr_pending);
+       BUG_ON((as->nr_new_nodes || as->nr_pending) &&
+              !bch2_journal_error(&c->journal));;
 
        if (as->reserve)
                bch2_btree_reserve_put(c, as->reserve);
        mutex_unlock(&c->btree_interior_update_lock);
 }
 
-static void btree_update_nodes_reachable(struct closure *cl)
+static void btree_update_nodes_reachable(struct btree_update *as, u64 seq)
 {
-       struct btree_update *as = container_of(cl, struct btree_update, cl);
        struct bch_fs *c = as->c;
 
-       bch2_journal_pin_drop(&c->journal, &as->journal);
-
        mutex_lock(&c->btree_interior_update_lock);
 
        while (as->nr_new_nodes) {
        }
 
        while (as->nr_pending)
-               bch2_btree_node_free_ondisk(c, &as->pending[--as->nr_pending]);
+               bch2_btree_node_free_ondisk(c, &as->pending[--as->nr_pending],
+                                           seq);
 
        mutex_unlock(&c->btree_interior_update_lock);
-
-       closure_wake_up(&as->wait);
-
-       bch2_btree_update_free(as);
-}
-
-static void btree_update_wait_on_journal(struct closure *cl)
-{
-       struct btree_update *as = container_of(cl, struct btree_update, cl);
-       struct bch_fs *c = as->c;
-       int ret;
-
-       ret = bch2_journal_open_seq_async(&c->journal, as->journal_seq, cl);
-       if (ret == -EAGAIN) {
-               continue_at(cl, btree_update_wait_on_journal, system_wq);
-               return;
-       }
-       if (ret < 0)
-               goto err;
-
-       bch2_journal_flush_seq_async(&c->journal, as->journal_seq, cl);
-err:
-       continue_at(cl, btree_update_nodes_reachable, system_wq);
 }
 
 static void btree_update_nodes_written(struct closure *cl)
 {
        struct btree_update *as = container_of(cl, struct btree_update, cl);
+       struct journal_res res = { 0 };
        struct bch_fs *c = as->c;
        struct btree *b;
+       struct bset *i;
+       struct bkey_i *k;
+       unsigned journal_u64s = 0;
+       int ret;
 
        /*
         * We did an update to a parent node where the pointers we added pointed
         */
        mutex_lock(&c->btree_interior_update_lock);
        as->nodes_written = true;
-retry:
+again:
        as = list_first_entry_or_null(&c->btree_interior_updates_unwritten,
                                      struct btree_update, unwritten_list);
        if (!as || !as->nodes_written) {
                return;
        }
 
+       b = as->b;
+       if (b && !six_trylock_intent(&b->c.lock)) {
+               mutex_unlock(&c->btree_interior_update_lock);
+               btree_node_lock_type(c, b, SIX_LOCK_intent);
+               six_unlock_intent(&b->c.lock);
+               goto out;
+       }
+
+       journal_u64s = 0;
+
+       if (as->mode != BTREE_INTERIOR_UPDATING_ROOT)
+               for_each_keylist_key(&as->parent_keys, k)
+                       journal_u64s += jset_u64s(k->k.u64s);
+
+       ret = bch2_journal_res_get(&c->journal, &res, journal_u64s,
+                                  JOURNAL_RES_GET_RESERVED);
+       if (ret) {
+               BUG_ON(!bch2_journal_error(&c->journal));
+               /* can't unblock btree writes */
+               goto free_update;
+       }
+
+       if (as->mode != BTREE_INTERIOR_UPDATING_ROOT)
+               for_each_keylist_key(&as->parent_keys, k)
+                       bch2_journal_add_entry(&c->journal, &res,
+                                              BCH_JSET_ENTRY_btree_keys,
+                                              as->btree_id,
+                                              as->level,
+                                              k, k->k.u64s);
+
        switch (as->mode) {
        case BTREE_INTERIOR_NO_UPDATE:
                BUG();
        case BTREE_INTERIOR_UPDATING_NODE:
-               /* The usual case: */
-               b = READ_ONCE(as->b);
-
-               if (!six_trylock_read(&b->c.lock)) {
-                       mutex_unlock(&c->btree_interior_update_lock);
-                       btree_node_lock_type(c, b, SIX_LOCK_read);
-                       six_unlock_read(&b->c.lock);
-                       mutex_lock(&c->btree_interior_update_lock);
-                       goto retry;
-               }
-
-               BUG_ON(!btree_node_dirty(b));
-               closure_wait(&btree_current_write(b)->wait, &as->cl);
+               /* @b is the node we did the final insert into: */
+               BUG_ON(!res.ref);
 
+               six_lock_write(&b->c.lock, NULL, NULL);
                list_del(&as->write_blocked_list);
 
-               /*
-                * for flush_held_btree_writes() waiting on updates to flush or
-                * nodes to be writeable:
-                */
-               closure_wake_up(&c->btree_interior_update_wait);
+               i = btree_bset_last(b);
+               i->journal_seq = cpu_to_le64(
+                       max(res.seq,
+                           le64_to_cpu(i->journal_seq)));
+
+               bch2_btree_add_journal_pin(c, b, res.seq);
+               six_unlock_write(&b->c.lock);
 
                list_del(&as->unwritten_list);
                mutex_unlock(&c->btree_interior_update_lock);
                 * b->write_blocked prevented it from being written, so
                 * write it now if it needs to be written:
                 */
-               bch2_btree_node_write_cond(c, b, true);
-               six_unlock_read(&b->c.lock);
-               continue_at(&as->cl, btree_update_nodes_reachable, system_wq);
+               btree_node_write_if_need(c, b, SIX_LOCK_intent);
+               six_unlock_intent(&b->c.lock);
                break;
 
        case BTREE_INTERIOR_UPDATING_AS:
-               /*
-                * The btree node we originally updated has been freed and is
-                * being rewritten - so we need to write anything here, we just
-                * need to signal to that btree_update that it's ok to make the
-                * new replacement node visible:
-                */
-               closure_put(&as->parent_as->cl);
-
-               /*
-                * and then we have to wait on that btree_update to finish:
-                */
-               closure_wait(&as->parent_as->wait, &as->cl);
+               BUG_ON(b);
 
                list_del(&as->unwritten_list);
                mutex_unlock(&c->btree_interior_update_lock);
-
-               continue_at(&as->cl, btree_update_nodes_reachable, system_wq);
                break;
 
-       case BTREE_INTERIOR_UPDATING_ROOT:
-               /* b is the new btree root: */
-               b = READ_ONCE(as->b);
-
-               if (!six_trylock_read(&b->c.lock)) {
-                       mutex_unlock(&c->btree_interior_update_lock);
-                       btree_node_lock_type(c, b, SIX_LOCK_read);
-                       six_unlock_read(&b->c.lock);
-                       mutex_lock(&c->btree_interior_update_lock);
-                       goto retry;
-               }
-
-               BUG_ON(c->btree_roots[b->c.btree_id].as != as);
-               c->btree_roots[b->c.btree_id].as = NULL;
+       case BTREE_INTERIOR_UPDATING_ROOT: {
+               struct btree_root *r = &c->btree_roots[as->btree_id];
 
-               bch2_btree_set_root_ondisk(c, b, WRITE);
+               BUG_ON(b);
 
-               /*
-                * We don't have to wait anything anything here (before
-                * btree_update_nodes_reachable frees the old nodes
-                * ondisk) - we've ensured that the very next journal write will
-                * have the pointer to the new root, and before the allocator
-                * can reuse the old nodes it'll have to do a journal commit:
-                */
-               six_unlock_read(&b->c.lock);
+               mutex_lock(&c->btree_root_lock);
+               bkey_copy(&r->key, as->parent_keys.keys);
+               r->level = as->level;
+               r->alive = true;
+               c->btree_roots_dirty = true;
+               mutex_unlock(&c->btree_root_lock);
 
                list_del(&as->unwritten_list);
                mutex_unlock(&c->btree_interior_update_lock);
-
-               /*
-                * Bit of funny circularity going on here we have to break:
-                *
-                * We have to drop our journal pin before writing the journal
-                * entry that points to the new btree root: else, we could
-                * deadlock if the journal currently happens to be full.
-                *
-                * This mean we're dropping the journal pin _before_ the new
-                * nodes are technically reachable - but this is safe, because
-                * after the bch2_btree_set_root_ondisk() call above they will
-                * be reachable as of the very next journal write:
-                */
-               bch2_journal_pin_drop(&c->journal, &as->journal);
-
-               as->journal_seq = bch2_journal_last_unwritten_seq(&c->journal);
-
-               btree_update_wait_on_journal(&as->cl);
                break;
        }
+       }
 
+       bch2_journal_pin_drop(&c->journal, &as->journal);
+
+       bch2_journal_res_put(&c->journal, &res);
+       bch2_journal_preres_put(&c->journal, &as->journal_preres);
+
+       btree_update_nodes_reachable(as, res.seq);
+free_update:
+       bch2_btree_update_free(as);
+       /*
+        * for flush_held_btree_writes() waiting on updates to flush or
+        * nodes to be writeable:
+        */
+       closure_wake_up(&c->btree_interior_update_wait);
+out:
        mutex_lock(&c->btree_interior_update_lock);
-       goto retry;
+       goto again;
 }
 
 /*
        BUG_ON(as->mode != BTREE_INTERIOR_NO_UPDATE);
        BUG_ON(!btree_node_dirty(b));
 
-       as->mode = BTREE_INTERIOR_UPDATING_NODE;
-       as->b = b;
+       as->mode        = BTREE_INTERIOR_UPDATING_NODE;
+       as->b           = b;
+       as->level       = b->c.level;
        list_add(&as->write_blocked_list, &b->write_blocked);
 
        mutex_unlock(&c->btree_interior_update_lock);
-
-       /*
-        * In general, when you're staging things in a journal that will later
-        * be written elsewhere, and you also want to guarantee ordering: that
-        * is, if you have updates a, b, c, after a crash you should never see c
-        * and not a or b - there's a problem:
-        *
-        * If the final destination of the update(s) (i.e. btree node) can be
-        * written/flushed _before_ the relevant journal entry - oops, that
-        * breaks ordering, since the various leaf nodes can be written in any
-        * order.
-        *
-        * Normally we use bset->journal_seq to deal with this - if during
-        * recovery we find a btree node write that's newer than the newest
-        * journal entry, we just ignore it - we don't need it, anything we're
-        * supposed to have (that we reported as completed via fsync()) will
-        * still be in the journal, and as far as the state of the journal is
-        * concerned that btree node write never happened.
-        *
-        * That breaks when we're rewriting/splitting/merging nodes, since we're
-        * mixing btree node writes that haven't happened yet with previously
-        * written data that has been reported as completed to the journal.
-        *
-        * Thus, before making the new nodes reachable, we have to wait the
-        * newest journal sequence number we have data for to be written (if it
-        * hasn't been yet).
-        */
-       bch2_journal_wait_on_seq(&c->journal, as->journal_seq, &as->cl);
-}
-
-static void interior_update_flush(struct journal *j,
-                       struct journal_entry_pin *pin, u64 seq)
-{
-       struct btree_update *as =
-               container_of(pin, struct btree_update, journal);
-
-       bch2_journal_flush_seq_async(j, as->journal_seq, NULL);
 }
 
 static void btree_update_reparent(struct btree_update *as,
 {
        struct bch_fs *c = as->c;
 
+       lockdep_assert_held(&c->btree_interior_update_lock);
+
        child->b = NULL;
        child->mode = BTREE_INTERIOR_UPDATING_AS;
-       child->parent_as = as;
-       closure_get(&as->cl);
 
        /*
         * When we write a new btree root, we have to drop our journal pin
         * just transfer the journal pin to the new interior update so
         * btree_update_nodes_written() can drop it.
         */
-       bch2_journal_pin_copy(&c->journal, &as->journal,
-                             &child->journal, interior_update_flush);
+       bch2_journal_pin_copy(&c->journal, &as->journal, &child->journal, NULL);
        bch2_journal_pin_drop(&c->journal, &child->journal);
-
-       as->journal_seq = max(as->journal_seq, child->journal_seq);
 }
 
-static void btree_update_updated_root(struct btree_update *as)
+static void btree_update_updated_root(struct btree_update *as, struct btree *b)
 {
        struct bch_fs *c = as->c;
-       struct btree_root *r = &c->btree_roots[as->btree_id];
-
-       mutex_lock(&c->btree_interior_update_lock);
-       list_add_tail(&as->unwritten_list, &c->btree_interior_updates_unwritten);
 
        BUG_ON(as->mode != BTREE_INTERIOR_NO_UPDATE);
+       BUG_ON(!bch2_keylist_empty(&as->parent_keys));
 
-       /*
-        * Old root might not be persistent yet - if so, redirect its
-        * btree_update operation to point to us:
-        */
-       if (r->as)
-               btree_update_reparent(as, r->as);
-
-       as->mode = BTREE_INTERIOR_UPDATING_ROOT;
-       as->b = r->b;
-       r->as = as;
+       mutex_lock(&c->btree_interior_update_lock);
+       list_add_tail(&as->unwritten_list, &c->btree_interior_updates_unwritten);
 
+       as->mode        = BTREE_INTERIOR_UPDATING_ROOT;
+       as->level       = b->c.level;
+       bch2_keylist_add(&as->parent_keys, &b->key);
        mutex_unlock(&c->btree_interior_update_lock);
-
-       /*
-        * When we're rewriting nodes and updating interior nodes, there's an
-        * issue with updates that haven't been written in the journal getting
-        * mixed together with older data - see btree_update_updated_node()
-        * for the explanation.
-        *
-        * However, this doesn't affect us when we're writing a new btree root -
-        * because to make that new root reachable we have to write out a new
-        * journal entry, which must necessarily be newer than as->journal_seq.
-        */
 }
 
 static void btree_node_will_make_reachable(struct btree_update *as,
                                               struct btree *b)
 {
        struct bch_fs *c = as->c;
-       struct closure *cl, *cl_n;
        struct btree_update *p, *n;
        struct btree_write *w;
-       struct bset_tree *t;
 
        set_btree_node_dying(b);
 
 
        btree_interior_update_add_node_reference(as, b);
 
-       /*
-        * Does this node have data that hasn't been written in the journal?
-        *
-        * If so, we have to wait for the corresponding journal entry to be
-        * written before making the new nodes reachable - we can't just carry
-        * over the bset->journal_seq tracking, since we'll be mixing those keys
-        * in with keys that aren't in the journal anymore:
-        */
-       for_each_bset(b, t)
-               as->journal_seq = max(as->journal_seq,
-                                     le64_to_cpu(bset(b, t)->journal_seq));
-
        mutex_lock(&c->btree_interior_update_lock);
 
        /*
 
        clear_btree_node_dirty(b);
        clear_btree_node_need_write(b);
-       w = btree_current_write(b);
-
-       /*
-        * Does this node have any btree_update operations waiting on this node
-        * to be written?
-        *
-        * If so, wake them up when this btree_update operation is reachable:
-        */
-       llist_for_each_entry_safe(cl, cl_n, llist_del_all(&w->wait.list), list)
-               llist_add(&cl->list, &as->wait.list);
 
        /*
         * Does this node have unwritten data that has a pin on the journal?
         * oldest pin of any of the nodes we're freeing. We'll release the pin
         * when the new nodes are persistent and reachable on disk:
         */
-       bch2_journal_pin_copy(&c->journal, &as->journal,
-                             &w->journal, interior_update_flush);
+       w = btree_current_write(b);
+       bch2_journal_pin_copy(&c->journal, &as->journal, &w->journal, NULL);
        bch2_journal_pin_drop(&c->journal, &w->journal);
 
        w = btree_prev_write(b);
-       bch2_journal_pin_copy(&c->journal, &as->journal,
-                             &w->journal, interior_update_flush);
+       bch2_journal_pin_copy(&c->journal, &as->journal, &w->journal, NULL);
        bch2_journal_pin_drop(&c->journal, &w->journal);
 
        mutex_unlock(&c->btree_interior_update_lock);
 {
        struct btree_reserve *reserve;
        struct btree_update *as;
+       int ret;
 
        reserve = bch2_btree_reserve_get(c, nr_nodes, flags, cl);
        if (IS_ERR(reserve))
 
        bch2_keylist_init(&as->parent_keys, as->inline_keys);
 
+       ret = bch2_journal_preres_get(&c->journal, &as->journal_preres,
+                                jset_u64s(BKEY_BTREE_PTR_U64s_MAX) * 3, 0);
+       if (ret) {
+               bch2_btree_reserve_put(c, reserve);
+               closure_debug_destroy(&as->cl);
+               mempool_free(as, &c->btree_interior_update_pool);
+               return ERR_PTR(ret);
+       }
+
        mutex_lock(&c->btree_interior_update_lock);
        list_add_tail(&as->list, &c->btree_interior_update_list);
        mutex_unlock(&c->btree_interior_update_lock);
        mutex_unlock(&c->btree_interior_update_lock);
 }
 
-static void bch2_btree_set_root_ondisk(struct bch_fs *c, struct btree *b, int rw)
-{
-       struct btree_root *r = &c->btree_roots[b->c.btree_id];
-
-       mutex_lock(&c->btree_root_lock);
-
-       BUG_ON(b != r->b);
-       bkey_copy(&r->key, &b->key);
-       r->level = b->c.level;
-       r->alive = true;
-       if (rw == WRITE)
-               c->btree_roots_dirty = true;
-
-       mutex_unlock(&c->btree_root_lock);
-}
-
 /**
  * bch_btree_set_root - update the root in memory and on disk
  *
 
        bch2_btree_set_root_inmem(as, b);
 
-       btree_update_updated_root(as);
+       btree_update_updated_root(as, b);
 
        /*
         * Unlock old root after new root is visible:
                bch2_btree_build_aux_trees(n1);
                six_unlock_write(&n1->c.lock);
 
-               bch2_keylist_add(&as->parent_keys, &n1->key);
+               if (parent)
+                       bch2_keylist_add(&as->parent_keys, &n1->key);
        }
 
        bch2_btree_node_write(c, n1, SIX_LOCK_intent);
               (bkey_cmp_packed(b, k, &insert->k) >= 0))
                ;
 
-       while (!bch2_keylist_empty(keys)) {
-               insert = bch2_keylist_front(keys);
-
+       for_each_keylist_key(keys, insert)
                bch2_insert_fixup_btree_ptr(as, b, iter, insert, &node_iter);
-               bch2_keylist_pop_front(keys);
-       }
 
        btree_update_updated_node(as, b);
 
                        bkey_copy(&b->key, new_key);
                }
 
-               btree_update_updated_root(as);
+               btree_update_updated_root(as, b);
                bch2_btree_node_unlock_write(b, iter);
        }