static void btree_node_will_make_reachable(struct btree_update *,
struct btree *);
static void btree_update_drop_new_node(struct bch_fs *, struct btree *);
-static void bch2_btree_set_root_ondisk(struct bch_fs *, struct btree *, int);
/* Debug code: */
}
/*
 * Patch hunk: bch2_btree_node_free_ondisk() gains a journal_seq parameter.
 *
 * The new parameter replaces the literal 0 that was previously passed as
 * the sixth argument of both bch2_mark_key() calls below; nothing else in
 * the function changes.
 */
static void bch2_btree_node_free_ondisk(struct bch_fs *c,
- struct pending_btree_node_free *pending)
+ struct pending_btree_node_free *pending,
+ u64 journal_seq)
{
/* Asserts that the pending free has index_update_done set. */
BUG_ON(!pending->index_update_done);
/* First mark: BTREE_TRIGGER_OVERWRITE only. */
bch2_mark_key(c, bkey_i_to_s_c(&pending->key),
- 0, 0, NULL, 0, BTREE_TRIGGER_OVERWRITE);
+ 0, 0, NULL, journal_seq, BTREE_TRIGGER_OVERWRITE);
/*
 * Second mark, with BTREE_TRIGGER_GC added, is issued only when
 * gc_visited() reports GC_PHASE_PENDING_DELETE as already visited.
 */
if (gc_visited(c, gc_phase(GC_PHASE_PENDING_DELETE)))
bch2_mark_key(c, bkey_i_to_s_c(&pending->key),
- 0, 0, NULL, 0,
+ 0, 0, NULL, journal_seq,
BTREE_TRIGGER_OVERWRITE|
BTREE_TRIGGER_GC);
}
{
struct bch_fs *c = as->c;
+ bch2_journal_preres_put(&c->journal, &as->journal_preres);
+
+ bch2_journal_pin_drop(&c->journal, &as->journal);
bch2_journal_pin_flush(&c->journal, &as->journal);
- BUG_ON(as->nr_new_nodes);
- BUG_ON(as->nr_pending);
+ /*
+ * Leftover new nodes or pending frees are tolerated only when the
+ * journal is in an error state; otherwise they are a bug.
+ */
+ BUG_ON((as->nr_new_nodes || as->nr_pending) &&
+ !bch2_journal_error(&c->journal));
if (as->reserve)
bch2_btree_reserve_put(c, as->reserve);
mutex_unlock(&c->btree_interior_update_lock);
}
/*
 * Patch hunk: btree_update_nodes_reachable() stops being a closure
 * callback. It now receives the btree_update and a journal sequence
 * number directly, and forwards @seq to bch2_btree_node_free_ondisk() for
 * every pending free.
 *
 * Removed from this function: the bch2_journal_pin_drop() call, the
 * closure_wake_up(&as->wait) call and the bch2_btree_update_free() call.
 * The helper btree_update_wait_on_journal() that followed it is deleted
 * in its entirety.
 */
-static void btree_update_nodes_reachable(struct closure *cl)
+static void btree_update_nodes_reachable(struct btree_update *as, u64 seq)
{
- struct btree_update *as = container_of(cl, struct btree_update, cl);
struct bch_fs *c = as->c;
- bch2_journal_pin_drop(&c->journal, &as->journal);
-
mutex_lock(&c->btree_interior_update_lock);
/* NOTE(review): the body of this loop is elided from the excerpt. */
while (as->nr_new_nodes) {
}
/* Drain the pending-free array from the top down. */
while (as->nr_pending)
- bch2_btree_node_free_ondisk(c, &as->pending[--as->nr_pending]);
+ bch2_btree_node_free_ondisk(c, &as->pending[--as->nr_pending],
+ seq);
mutex_unlock(&c->btree_interior_update_lock);
-
- closure_wake_up(&as->wait);
-
- bch2_btree_update_free(as);
-}
-
-static void btree_update_wait_on_journal(struct closure *cl)
-{
- struct btree_update *as = container_of(cl, struct btree_update, cl);
- struct bch_fs *c = as->c;
- int ret;
-
- ret = bch2_journal_open_seq_async(&c->journal, as->journal_seq, cl);
- if (ret == -EAGAIN) {
- continue_at(cl, btree_update_wait_on_journal, system_wq);
- return;
- }
- if (ret < 0)
- goto err;
-
- bch2_journal_flush_seq_async(&c->journal, as->journal_seq, cl);
-err:
- continue_at(cl, btree_update_nodes_reachable, system_wq);
}
static void btree_update_nodes_written(struct closure *cl)
{
struct btree_update *as = container_of(cl, struct btree_update, cl);
+ struct journal_res res = { 0 };
struct bch_fs *c = as->c;
struct btree *b;
+ struct bset *i;
+ struct bkey_i *k;
+ unsigned journal_u64s = 0;
+ int ret;
/*
* We did an update to a parent node where the pointers we added pointed
*/
mutex_lock(&c->btree_interior_update_lock);
as->nodes_written = true;
-retry:
+again:
as = list_first_entry_or_null(&c->btree_interior_updates_unwritten,
struct btree_update, unwritten_list);
if (!as || !as->nodes_written) {
return;
}
+ b = as->b;
+ if (b && !six_trylock_intent(&b->c.lock)) {
+ mutex_unlock(&c->btree_interior_update_lock);
+ btree_node_lock_type(c, b, SIX_LOCK_intent);
+ six_unlock_intent(&b->c.lock);
+ goto out;
+ }
+
+ journal_u64s = 0;
+
+ if (as->mode != BTREE_INTERIOR_UPDATING_ROOT)
+ for_each_keylist_key(&as->parent_keys, k)
+ journal_u64s += jset_u64s(k->k.u64s);
+
+ ret = bch2_journal_res_get(&c->journal, &res, journal_u64s,
+ JOURNAL_RES_GET_RESERVED);
+ if (ret) {
+ BUG_ON(!bch2_journal_error(&c->journal));
+ /* can't unblock btree writes */
+ goto free_update;
+ }
+
+ if (as->mode != BTREE_INTERIOR_UPDATING_ROOT)
+ for_each_keylist_key(&as->parent_keys, k)
+ bch2_journal_add_entry(&c->journal, &res,
+ BCH_JSET_ENTRY_btree_keys,
+ as->btree_id,
+ as->level,
+ k, k->k.u64s);
+
switch (as->mode) {
case BTREE_INTERIOR_NO_UPDATE:
BUG();
case BTREE_INTERIOR_UPDATING_NODE:
- /* The usual case: */
- b = READ_ONCE(as->b);
-
- if (!six_trylock_read(&b->c.lock)) {
- mutex_unlock(&c->btree_interior_update_lock);
- btree_node_lock_type(c, b, SIX_LOCK_read);
- six_unlock_read(&b->c.lock);
- mutex_lock(&c->btree_interior_update_lock);
- goto retry;
- }
-
- BUG_ON(!btree_node_dirty(b));
- closure_wait(&btree_current_write(b)->wait, &as->cl);
+ /* @b is the node we did the final insert into: */
+ BUG_ON(!res.ref);
+ six_lock_write(&b->c.lock, NULL, NULL);
list_del(&as->write_blocked_list);
- /*
- * for flush_held_btree_writes() waiting on updates to flush or
- * nodes to be writeable:
- */
- closure_wake_up(&c->btree_interior_update_wait);
+ i = btree_bset_last(b);
+ i->journal_seq = cpu_to_le64(
+ max(res.seq,
+ le64_to_cpu(i->journal_seq)));
+
+ bch2_btree_add_journal_pin(c, b, res.seq);
+ six_unlock_write(&b->c.lock);
list_del(&as->unwritten_list);
mutex_unlock(&c->btree_interior_update_lock);
* b->write_blocked prevented it from being written, so
* write it now if it needs to be written:
*/
- bch2_btree_node_write_cond(c, b, true);
- six_unlock_read(&b->c.lock);
- continue_at(&as->cl, btree_update_nodes_reachable, system_wq);
+ btree_node_write_if_need(c, b, SIX_LOCK_intent);
+ six_unlock_intent(&b->c.lock);
break;
case BTREE_INTERIOR_UPDATING_AS:
- /*
- * The btree node we originally updated has been freed and is
- * being rewritten - so we need to write anything here, we just
- * need to signal to that btree_update that it's ok to make the
- * new replacement node visible:
- */
- closure_put(&as->parent_as->cl);
-
- /*
- * and then we have to wait on that btree_update to finish:
- */
- closure_wait(&as->parent_as->wait, &as->cl);
+ BUG_ON(b);
list_del(&as->unwritten_list);
mutex_unlock(&c->btree_interior_update_lock);
-
- continue_at(&as->cl, btree_update_nodes_reachable, system_wq);
break;
- case BTREE_INTERIOR_UPDATING_ROOT:
- /* b is the new btree root: */
- b = READ_ONCE(as->b);
-
- if (!six_trylock_read(&b->c.lock)) {
- mutex_unlock(&c->btree_interior_update_lock);
- btree_node_lock_type(c, b, SIX_LOCK_read);
- six_unlock_read(&b->c.lock);
- mutex_lock(&c->btree_interior_update_lock);
- goto retry;
- }
-
- BUG_ON(c->btree_roots[b->c.btree_id].as != as);
- c->btree_roots[b->c.btree_id].as = NULL;
+ case BTREE_INTERIOR_UPDATING_ROOT: {
+ struct btree_root *r = &c->btree_roots[as->btree_id];
- bch2_btree_set_root_ondisk(c, b, WRITE);
+ BUG_ON(b);
- /*
- * We don't have to wait anything anything here (before
- * btree_update_nodes_reachable frees the old nodes
- * ondisk) - we've ensured that the very next journal write will
- * have the pointer to the new root, and before the allocator
- * can reuse the old nodes it'll have to do a journal commit:
- */
- six_unlock_read(&b->c.lock);
+ mutex_lock(&c->btree_root_lock);
+ bkey_copy(&r->key, as->parent_keys.keys);
+ r->level = as->level;
+ r->alive = true;
+ c->btree_roots_dirty = true;
+ mutex_unlock(&c->btree_root_lock);
list_del(&as->unwritten_list);
mutex_unlock(&c->btree_interior_update_lock);
-
- /*
- * Bit of funny circularity going on here we have to break:
- *
- * We have to drop our journal pin before writing the journal
- * entry that points to the new btree root: else, we could
- * deadlock if the journal currently happens to be full.
- *
- * This mean we're dropping the journal pin _before_ the new
- * nodes are technically reachable - but this is safe, because
- * after the bch2_btree_set_root_ondisk() call above they will
- * be reachable as of the very next journal write:
- */
- bch2_journal_pin_drop(&c->journal, &as->journal);
-
- as->journal_seq = bch2_journal_last_unwritten_seq(&c->journal);
-
- btree_update_wait_on_journal(&as->cl);
break;
}
+ }
+ bch2_journal_pin_drop(&c->journal, &as->journal);
+
+ bch2_journal_res_put(&c->journal, &res);
+ bch2_journal_preres_put(&c->journal, &as->journal_preres);
+
+ btree_update_nodes_reachable(as, res.seq);
+free_update:
+ bch2_btree_update_free(as);
+ /*
+ * for flush_held_btree_writes() waiting on updates to flush or
+ * nodes to be writeable:
+ */
+ closure_wake_up(&c->btree_interior_update_wait);
+out:
mutex_lock(&c->btree_interior_update_lock);
- goto retry;
+ goto again;
}
/*
BUG_ON(as->mode != BTREE_INTERIOR_NO_UPDATE);
BUG_ON(!btree_node_dirty(b));
- as->mode = BTREE_INTERIOR_UPDATING_NODE;
- as->b = b;
+ as->mode = BTREE_INTERIOR_UPDATING_NODE;
+ as->b = b;
+ as->level = b->c.level;
list_add(&as->write_blocked_list, &b->write_blocked);
mutex_unlock(&c->btree_interior_update_lock);
-
- /*
- * In general, when you're staging things in a journal that will later
- * be written elsewhere, and you also want to guarantee ordering: that
- * is, if you have updates a, b, c, after a crash you should never see c
- * and not a or b - there's a problem:
- *
- * If the final destination of the update(s) (i.e. btree node) can be
- * written/flushed _before_ the relevant journal entry - oops, that
- * breaks ordering, since the various leaf nodes can be written in any
- * order.
- *
- * Normally we use bset->journal_seq to deal with this - if during
- * recovery we find a btree node write that's newer than the newest
- * journal entry, we just ignore it - we don't need it, anything we're
- * supposed to have (that we reported as completed via fsync()) will
- * still be in the journal, and as far as the state of the journal is
- * concerned that btree node write never happened.
- *
- * That breaks when we're rewriting/splitting/merging nodes, since we're
- * mixing btree node writes that haven't happened yet with previously
- * written data that has been reported as completed to the journal.
- *
- * Thus, before making the new nodes reachable, we have to wait the
- * newest journal sequence number we have data for to be written (if it
- * hasn't been yet).
- */
- bch2_journal_wait_on_seq(&c->journal, as->journal_seq, &as->cl);
-}
-
-static void interior_update_flush(struct journal *j,
- struct journal_entry_pin *pin, u64 seq)
-{
- struct btree_update *as =
- container_of(pin, struct btree_update, journal);
-
- bch2_journal_flush_seq_async(j, as->journal_seq, NULL);
}
static void btree_update_reparent(struct btree_update *as,
{
struct bch_fs *c = as->c;
+ lockdep_assert_held(&c->btree_interior_update_lock);
+
child->b = NULL;
child->mode = BTREE_INTERIOR_UPDATING_AS;
- child->parent_as = as;
- closure_get(&as->cl);
/*
* When we write a new btree root, we have to drop our journal pin
* just transfer the journal pin to the new interior update so
* btree_update_nodes_written() can drop it.
*/
- bch2_journal_pin_copy(&c->journal, &as->journal,
- &child->journal, interior_update_flush);
+ bch2_journal_pin_copy(&c->journal, &as->journal, &child->journal, NULL);
bch2_journal_pin_drop(&c->journal, &child->journal);
-
- as->journal_seq = max(as->journal_seq, child->journal_seq);
}
/*
 * Patch hunk: btree_update_updated_root() now takes the new root node @b.
 *
 * Old behaviour (removed lines): looked up c->btree_roots[as->btree_id],
 * reparented any update still recorded in r->as, then stored r->b in
 * as->b and itself in r->as.
 *
 * New behaviour (added lines): asserts as->parent_keys is empty, then,
 * under btree_interior_update_lock, queues the update on
 * btree_interior_updates_unwritten, sets the mode to
 * BTREE_INTERIOR_UPDATING_ROOT, records b->c.level in as->level and adds
 * b->key to as->parent_keys.
 */
-static void btree_update_updated_root(struct btree_update *as)
+static void btree_update_updated_root(struct btree_update *as, struct btree *b)
{
struct bch_fs *c = as->c;
- struct btree_root *r = &c->btree_roots[as->btree_id];
-
- mutex_lock(&c->btree_interior_update_lock);
- list_add_tail(&as->unwritten_list, &c->btree_interior_updates_unwritten);
/* These assertions now run before the lock is taken. */
BUG_ON(as->mode != BTREE_INTERIOR_NO_UPDATE);
+ BUG_ON(!bch2_keylist_empty(&as->parent_keys));
- /*
- * Old root might not be persistent yet - if so, redirect its
- * btree_update operation to point to us:
- */
- if (r->as)
- btree_update_reparent(as, r->as);
-
- as->mode = BTREE_INTERIOR_UPDATING_ROOT;
- as->b = r->b;
- r->as = as;
+ mutex_lock(&c->btree_interior_update_lock);
+ list_add_tail(&as->unwritten_list, &c->btree_interior_updates_unwritten);
+ as->mode = BTREE_INTERIOR_UPDATING_ROOT;
+ as->level = b->c.level;
+ bch2_keylist_add(&as->parent_keys, &b->key);
mutex_unlock(&c->btree_interior_update_lock);
-
- /*
- * When we're rewriting nodes and updating interior nodes, there's an
- * issue with updates that haven't been written in the journal getting
- * mixed together with older data - see btree_update_updated_node()
- * for the explanation.
- *
- * However, this doesn't affect us when we're writing a new btree root -
- * because to make that new root reachable we have to write out a new
- * journal entry, which must necessarily be newer than as->journal_seq.
- */
}
static void btree_node_will_make_reachable(struct btree_update *as,
struct btree *b)
{
struct bch_fs *c = as->c;
- struct closure *cl, *cl_n;
struct btree_update *p, *n;
struct btree_write *w;
- struct bset_tree *t;
set_btree_node_dying(b);
btree_interior_update_add_node_reference(as, b);
- /*
- * Does this node have data that hasn't been written in the journal?
- *
- * If so, we have to wait for the corresponding journal entry to be
- * written before making the new nodes reachable - we can't just carry
- * over the bset->journal_seq tracking, since we'll be mixing those keys
- * in with keys that aren't in the journal anymore:
- */
- for_each_bset(b, t)
- as->journal_seq = max(as->journal_seq,
- le64_to_cpu(bset(b, t)->journal_seq));
-
mutex_lock(&c->btree_interior_update_lock);
/*
clear_btree_node_dirty(b);
clear_btree_node_need_write(b);
- w = btree_current_write(b);
-
- /*
- * Does this node have any btree_update operations waiting on this node
- * to be written?
- *
- * If so, wake them up when this btree_update operation is reachable:
- */
- llist_for_each_entry_safe(cl, cl_n, llist_del_all(&w->wait.list), list)
- llist_add(&cl->list, &as->wait.list);
/*
* Does this node have unwritten data that has a pin on the journal?
* oldest pin of any of the nodes we're freeing. We'll release the pin
* when the new nodes are persistent and reachable on disk:
*/
- bch2_journal_pin_copy(&c->journal, &as->journal,
- &w->journal, interior_update_flush);
+ w = btree_current_write(b);
+ bch2_journal_pin_copy(&c->journal, &as->journal, &w->journal, NULL);
bch2_journal_pin_drop(&c->journal, &w->journal);
w = btree_prev_write(b);
- bch2_journal_pin_copy(&c->journal, &as->journal,
- &w->journal, interior_update_flush);
+ bch2_journal_pin_copy(&c->journal, &as->journal, &w->journal, NULL);
bch2_journal_pin_drop(&c->journal, &w->journal);
mutex_unlock(&c->btree_interior_update_lock);
{
struct btree_reserve *reserve;
struct btree_update *as;
+ int ret;
reserve = bch2_btree_reserve_get(c, nr_nodes, flags, cl);
if (IS_ERR(reserve))
bch2_keylist_init(&as->parent_keys, as->inline_keys);
+ ret = bch2_journal_preres_get(&c->journal, &as->journal_preres,
+ jset_u64s(BKEY_BTREE_PTR_U64s_MAX) * 3, 0);
+ if (ret) {
+ bch2_btree_reserve_put(c, reserve);
+ closure_debug_destroy(&as->cl);
+ mempool_free(as, &c->btree_interior_update_pool);
+ return ERR_PTR(ret);
+ }
+
mutex_lock(&c->btree_interior_update_lock);
list_add_tail(&as->list, &c->btree_interior_update_list);
mutex_unlock(&c->btree_interior_update_lock);
mutex_unlock(&c->btree_interior_update_lock);
}
/*
 * Patch hunk: bch2_btree_set_root_ondisk() is deleted outright (its
 * forward declaration is removed near the top of this patch as well).
 *
 * A similar update of the btree_root entry (key, level, alive,
 * btree_roots_dirty) under btree_root_lock appears inline in the
 * BTREE_INTERIOR_UPDATING_ROOT case of btree_update_nodes_written() in
 * this same patch. There the key is copied from as->parent_keys.keys and
 * btree_roots_dirty is set unconditionally, rather than only for WRITE.
 */
-static void bch2_btree_set_root_ondisk(struct bch_fs *c, struct btree *b, int rw)
-{
- struct btree_root *r = &c->btree_roots[b->c.btree_id];
-
- mutex_lock(&c->btree_root_lock);
-
- BUG_ON(b != r->b);
- bkey_copy(&r->key, &b->key);
- r->level = b->c.level;
- r->alive = true;
- if (rw == WRITE)
- c->btree_roots_dirty = true;
-
- mutex_unlock(&c->btree_root_lock);
-}
-
/**
* bch_btree_set_root - update the root in memory and on disk
*
bch2_btree_set_root_inmem(as, b);
- btree_update_updated_root(as);
+ btree_update_updated_root(as, b);
/*
* Unlock old root after new root is visible:
bch2_btree_build_aux_trees(n1);
six_unlock_write(&n1->c.lock);
- bch2_keylist_add(&as->parent_keys, &n1->key);
+ if (parent)
+ bch2_keylist_add(&as->parent_keys, &n1->key);
}
bch2_btree_node_write(c, n1, SIX_LOCK_intent);
(bkey_cmp_packed(b, k, &insert->k) >= 0))
;
- while (!bch2_keylist_empty(keys)) {
- insert = bch2_keylist_front(keys);
-
+ for_each_keylist_key(keys, insert)
bch2_insert_fixup_btree_ptr(as, b, iter, insert, &node_iter);
- bch2_keylist_pop_front(keys);
- }
btree_update_updated_node(as, b);
bkey_copy(&b->key, new_key);
}
- btree_update_updated_root(as);
+ btree_update_updated_root(as, b);
bch2_btree_node_unlock_write(b, iter);
}