/*
* Get us an open_bucket we can allocate from, return with it locked:
*/
-struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
- unsigned target,
- unsigned erasure_code,
- struct write_point_specifier write_point,
- struct bch_devs_list *devs_have,
- unsigned nr_replicas,
- unsigned nr_replicas_required,
- enum alloc_reserve reserve,
- unsigned flags,
- struct closure *cl)
+int bch2_alloc_sectors_start(struct bch_fs *c,
+ unsigned target,
+ unsigned erasure_code,
+ struct write_point_specifier write_point,
+ struct bch_devs_list *devs_have,
+ unsigned nr_replicas,
+ unsigned nr_replicas_required,
+ enum alloc_reserve reserve,
+ unsigned flags,
+ struct closure *cl,
+ struct write_point **wp_ret)
{
struct write_point *wp;
struct open_bucket *ob;
write_points_nr = c->write_points_nr;
have_cache = false;
- wp = writepoint_find(c, write_point.v);
+ *wp_ret = wp = writepoint_find(c, write_point.v);
if (wp->data_type == BCH_DATA_user)
ob_flags |= BUCKET_MAY_ALLOC_PARTIAL;
BUG_ON(!wp->sectors_free || wp->sectors_free == UINT_MAX);
- return wp;
+ return 0;
err:
open_bucket_for_each(c, &wp->ptrs, ob, i)
if (ptrs.nr < ARRAY_SIZE(ptrs.v))
switch (ret) {
case -OPEN_BUCKETS_EMPTY:
case -FREELIST_EMPTY:
- return cl ? ERR_PTR(-EAGAIN) : ERR_PTR(-ENOSPC);
+ return cl ? -EAGAIN : -ENOSPC;
case -INSUFFICIENT_DEVICES:
- return ERR_PTR(-EROFS);
+ return -EROFS;
default:
BUG();
}
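/*
 * A minimal caller-side sketch of the int-returning calling convention above
 * (illustrative only; it mirrors the call made in __bch2_write() below). The
 * write point is returned through the out parameter and errors through the
 * int return: -EAGAIN means "wait on the closure you passed and retry", while
 * -ENOSPC and -EROFS are terminal:
 *
 *	struct write_point *wp;
 *	int ret = bch2_alloc_sectors_start(c, target, erasure_code,
 *					   write_point, devs_have,
 *					   nr_replicas, nr_replicas_required,
 *					   reserve, flags, cl, &wp);
 *	if (!ret) {
 *		... allocate from wp, then ...
 *		bch2_alloc_sectors_done(c, wp);
 *	} else if (ret == -EAGAIN) {
 *		closure_sync(cl);
 *		... and retry ...
 *	}
 */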
void bch2_alloc_sectors_append_ptrs(struct bch_fs *c, struct write_point *wp,
struct bkey_i *k, unsigned sectors,
bool cached)
-
{
struct open_bucket *ob;
unsigned i;
BUG_ON(sectors > wp->sectors_free);
- wp->sectors_free -= sectors;
+ wp->sectors_free -= sectors;
+ wp->sectors_allocated += sectors;
open_bucket_for_each(c, &wp->ptrs, ob, i) {
struct bch_dev *ca = bch_dev_bkey_exists(c, ob->dev);
{
mutex_init(&wp->lock);
wp->data_type = type;
+
+ INIT_WORK(&wp->index_update_work, bch2_write_point_do_index_updates);
+ INIT_LIST_HEAD(&wp->writes);
+ spin_lock_init(&wp->writes_lock);
}
void bch2_fs_allocator_foreground_init(struct bch_fs *c)
}
}
+
+static const char * const bch2_write_point_states[] = {
+#define x(n) #n,
+ WRITE_POINT_STATES()
+#undef x
+ NULL
+};
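+
+/*
+ * A sketch of what the WRITE_POINT_STATES() x-macro is assumed to expand to,
+ * based on the state names used elsewhere in this patch (the real definition
+ * lives with struct write_point and is not shown here):
+ *
+ *	#define WRITE_POINT_STATES()		\
+ *		x(stopped)			\
+ *		x(waiting_io)			\
+ *		x(waiting_work)			\
+ *		x(running)
+ *
+ * with a matching enum write_point_state generated the same way and
+ * terminated by WRITE_POINT_STATE_NR.
+ */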
+
+void bch2_write_points_to_text(struct printbuf *out, struct bch_fs *c)
+{
+ struct write_point *wp;
+ unsigned i;
+
+ for (wp = c->write_points;
+ wp < c->write_points + ARRAY_SIZE(c->write_points);
+ wp++) {
+ pr_buf(out, "%lu: ", wp->write_point);
+ bch2_hprint(out, wp->sectors_allocated);
+
+ pr_buf(out, " last wrote: ");
+ bch2_pr_time_units(out, sched_clock() - wp->last_used);
+
+ for (i = 0; i < WRITE_POINT_STATE_NR; i++) {
+ pr_buf(out, " %s: ", bch2_write_point_states[i]);
+ bch2_pr_time_units(out, wp->time[i]);
+ }
+
+ pr_newline(out);
+ }
+}
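+
+/*
+ * The loop above emits one line per write point; values and spacing here are
+ * illustrative only: the write point id, sectors allocated in human-readable
+ * form, time since the last write, then time accumulated in each state:
+ *
+ *	12345: 1.5M last wrote: 30 ms stopped: 1 s waiting_io: 500 ms ...
+ */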
}
}
-static void __bch2_write(struct closure *);
+static void __bch2_write(struct bch_write_op *);
static void bch2_write_done(struct closure *cl)
{
goto out;
}
+static inline void __wp_update_state(struct write_point *wp, enum write_point_state state)
+{
+ if (state != wp->state) {
+ u64 now = ktime_get_ns();
+
+ if (wp->last_state_change &&
+ time_after64(now, wp->last_state_change))
+ wp->time[wp->state] += now - wp->last_state_change;
+ wp->state = state;
+ wp->last_state_change = now;
+ }
+}
+
+static inline void wp_update_state(struct write_point *wp, bool running)
+{
+ enum write_point_state state;
+
+ state = running ? WRITE_POINT_running :
+ !list_empty(&wp->writes) ? WRITE_POINT_waiting_io
+ : WRITE_POINT_stopped;
+
+ __wp_update_state(wp, state);
+}
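+
+/*
+ * Worked example of the accounting above (timestamps invented for
+ * illustration): if a write point enters WRITE_POINT_waiting_io at t0 and
+ * wp_update_state() later moves it to WRITE_POINT_waiting_work at t1,
+ * __wp_update_state() adds (t1 - t0) to wp->time[WRITE_POINT_waiting_io] and
+ * records t1 in last_state_change, so each slot of wp->time[] accumulates the
+ * total nanoseconds (ktime_get_ns()) spent in that state.
+ */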
+
static void bch2_write_index(struct closure *cl)
{
struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
- struct bch_fs *c = op->c;
+ struct write_point *wp = op->wp;
+ struct workqueue_struct *wq = index_update_wq(op);
- __bch2_write_index(op);
+ barrier();
- if (!(op->flags & BCH_WRITE_DONE)) {
- continue_at(cl, __bch2_write, index_update_wq(op));
- } else if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
- bch2_journal_flush_seq_async(&c->journal,
- *op_journal_seq(op),
- cl);
- continue_at(cl, bch2_write_done, index_update_wq(op));
- } else {
- continue_at_nobarrier(cl, bch2_write_done, NULL);
+ /*
+ * We're not using wp->writes_lock here, so this is racy: that's ok,
+ * because this is just for diagnostic purposes, and we may be running in
+ * interrupt context here, so if we were to take the lock we'd have to
+ * switch to spin_lock_irq()/irqsave(), which is not free:
+ */
+ if (wp->state == WRITE_POINT_waiting_io)
+ __wp_update_state(wp, WRITE_POINT_waiting_work);
+
+ op->btree_update_ready = true;
+ queue_work(wq, &wp->index_update_work);
+}
+
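+/*
+ * Worker for wp->index_update_work, run out of index_update_wq(): it drains
+ * btree updates for write ops parked on this write point. Each iteration
+ * takes the first op on wp->writes whose btree_update_ready flag was set by
+ * bch2_write_index(), removes it from the list, and drops writes_lock before
+ * doing the (potentially blocking) index update; when no ready op is left the
+ * write point is moved back to waiting_io or stopped via wp_update_state().
+ */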
+void bch2_write_point_do_index_updates(struct work_struct *work)
+{
+ struct write_point *wp =
+ container_of(work, struct write_point, index_update_work);
+ struct bch_write_op *op;
+
+ while (1) {
+ spin_lock(&wp->writes_lock);
+ list_for_each_entry(op, &wp->writes, wp_list)
+ if (op->btree_update_ready) {
+ list_del(&op->wp_list);
+ goto unlock;
+ }
+ op = NULL;
+unlock:
+ wp_update_state(wp, op != NULL);
+ spin_unlock(&wp->writes_lock);
+
+ if (!op)
+ break;
+
+ op->flags |= BCH_WRITE_IN_WORKER;
+
+ __bch2_write_index(op);
+
+ if (!(op->flags & BCH_WRITE_DONE)) {
+ __bch2_write(op);
+ } else if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
+ bch2_journal_flush_seq_async(&op->c->journal,
+ *op_journal_seq(op),
+ &op->cl);
+ continue_at(&op->cl, bch2_write_done, index_update_wq(op));
+ } else {
+ bch2_write_done(&op->cl);
+ }
}
}
if (parent)
bio_endio(&parent->bio);
- else if (!(op->flags & BCH_WRITE_SKIP_CLOSURE_PUT))
- closure_put(cl);
else
- continue_at_nobarrier(cl, bch2_write_index, index_update_wq(op));
+ closure_put(cl);
}
static void init_append_extent(struct bch_write_op *op,
return ret;
}
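/*
 * __bch2_write() takes the write op directly rather than its embedded
 * closure: it either finishes submitting the data synchronously (retrying via
 * the "again" label after waiting for in-flight IO), or, when it cannot
 * finish and is already running out of a worker, parks the op on wp->writes
 * and lets the write point's index_update_work complete it.
 */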
-static void __bch2_write(struct closure *cl)
+static void __bch2_write(struct bch_write_op *op)
{
- struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
struct bch_fs *c = op->c;
- struct write_point *wp;
+ struct write_point *wp = NULL;
struct bio *bio = NULL;
- bool skip_put = true;
unsigned nofs_flags;
int ret;
nofs_flags = memalloc_nofs_save();
again:
memset(&op->failed, 0, sizeof(op->failed));
+ op->btree_update_ready = false;
do {
struct bkey_i *key_to_write;
/* +1 for possible cache device: */
if (op->open_buckets.nr + op->nr_replicas + 1 >
ARRAY_SIZE(op->open_buckets.v))
- goto flush_io;
+ break;
if (bch2_keylist_realloc(&op->insert_keys,
op->inline_keys,
ARRAY_SIZE(op->inline_keys),
BKEY_EXTENT_U64s_MAX))
- goto flush_io;
+ break;
if ((op->flags & BCH_WRITE_FROM_INTERNAL) &&
percpu_ref_is_dying(&c->writes)) {
* freeing up space on specific disks, which means that
* allocations for specific disks may hang arbitrarily long:
*/
- wp = bch2_alloc_sectors_start(c,
+ ret = bch2_alloc_sectors_start(c,
op->target,
op->opts.erasure_code && !(op->flags & BCH_WRITE_CACHED),
op->write_point,
op->alloc_reserve,
op->flags,
(op->flags & (BCH_WRITE_ALLOC_NOWAIT|
- BCH_WRITE_ONLY_SPECIFIED_DEVS)) ? NULL : cl);
- EBUG_ON(!wp);
-
- if (unlikely(IS_ERR(wp))) {
- if (unlikely(PTR_ERR(wp) != -EAGAIN)) {
- ret = PTR_ERR(wp);
+ BCH_WRITE_ONLY_SPECIFIED_DEVS))
+ ? NULL : &op->cl,
+ &wp);
+ if (unlikely(ret)) {
+ if (unlikely(ret != -EAGAIN))
goto err;
- }
- goto flush_io;
+ break;
}
- /*
- * It's possible for the allocator to fail, put us on the
- * freelist waitlist, and then succeed in one of various retry
- * paths: if that happens, we need to disable the skip_put
- * optimization because otherwise there won't necessarily be a
- * barrier before we free the bch_write_op:
- */
- if (atomic_read(&cl->remaining) & CLOSURE_WAITING)
- skip_put = false;
+ EBUG_ON(!wp);
bch2_open_bucket_get(c, wp, &op->open_buckets);
ret = bch2_write_extent(op, wp, &bio);
+
bch2_alloc_sectors_done(c, wp);
if (ret < 0)
goto err;
- if (ret) {
- skip_put = false;
- } else {
- /*
- * for the skip_put optimization this has to be set
- * before we submit the bio:
- */
+ if (!ret)
op->flags |= BCH_WRITE_DONE;
- }
bio->bi_end_io = bch2_write_endio;
bio->bi_private = &op->cl;
bio->bi_opf |= REQ_OP_WRITE;
- if (!skip_put)
- closure_get(bio->bi_private);
- else
- op->flags |= BCH_WRITE_SKIP_CLOSURE_PUT;
+ closure_get(bio->bi_private);
key_to_write = (void *) (op->insert_keys.keys_p +
key_to_write_offset);
bch2_submit_wbio_replicas(to_wbio(bio), c, BCH_DATA_user,
key_to_write);
} while (ret);
-
- if (!skip_put)
- continue_at(cl, bch2_write_index, index_update_wq(op));
out:
- memalloc_nofs_restore(nofs_flags);
- return;
-err:
- op->error = ret;
- op->flags |= BCH_WRITE_DONE;
-
- continue_at(cl, bch2_write_index, index_update_wq(op));
- goto out;
-flush_io:
/*
* If the write can't all be submitted at once, we generally want to
* block synchronously as that signals backpressure to the caller.
- *
- * However, if we're running out of a workqueue, we can't block here
- * because we'll be blocking other work items from completing:
*/
- if (current->flags & PF_WQ_WORKER) {
- continue_at(cl, bch2_write_index, index_update_wq(op));
- goto out;
- }
-
- closure_sync(cl);
-
- if (!bch2_keylist_empty(&op->insert_keys)) {
+ if (!(op->flags & BCH_WRITE_DONE) &&
+ !(op->flags & BCH_WRITE_IN_WORKER)) {
+ closure_sync(&op->cl);
__bch2_write_index(op);
- if (op->error) {
- op->flags |= BCH_WRITE_DONE;
- continue_at_nobarrier(cl, bch2_write_done, NULL);
- goto out;
- }
+ if (!(op->flags & BCH_WRITE_DONE))
+ goto again;
+ bch2_write_done(&op->cl);
+ } else {
+ spin_lock(&wp->writes_lock);
+ op->wp = wp;
+ list_add_tail(&op->wp_list, &wp->writes);
+ if (wp->state == WRITE_POINT_stopped)
+ __wp_update_state(wp, WRITE_POINT_waiting_io);
+ spin_unlock(&wp->writes_lock);
+
+ continue_at(&op->cl, bch2_write_index, NULL);
}
- goto again;
+ memalloc_nofs_restore(nofs_flags);
+ return;
+err:
+ op->error = ret;
+ op->flags |= BCH_WRITE_DONE;
+ goto out;
}
static void bch2_write_data_inline(struct bch_write_op *op, unsigned data_len)
{
- struct closure *cl = &op->cl;
struct bio *bio = &op->wbio.bio;
struct bvec_iter iter;
struct bkey_i_inline_data *id;
unsigned sectors;
int ret;
+ op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
+ op->flags |= BCH_WRITE_DONE;
+
bch2_check_set_feature(op->c, BCH_FEATURE_inline_data);
ret = bch2_keylist_realloc(&op->insert_keys, op->inline_keys,
set_bkey_val_bytes(&id->k, data_len);
bch2_keylist_push(&op->insert_keys);
- op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
- op->flags |= BCH_WRITE_DONE;
-
- continue_at_nobarrier(cl, bch2_write_index, NULL);
- return;
+ __bch2_write_index(op);
err:
bch2_write_done(&op->cl);
}
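
/*
 * Inline data writes mark the op BCH_WRITE_WROTE_DATA_INLINE and
 * BCH_WRITE_DONE up front and insert the key via __bch2_write_index()
 * directly; completion then goes through bch2_write_done().
 */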
struct bch_fs *c = op->c;
unsigned data_len;
+ EBUG_ON(op->cl.parent);
BUG_ON(!op->nr_replicas);
BUG_ON(!op->write_point.v);
BUG_ON(!bkey_cmp(op->pos, POS_MAX));
return;
}
- continue_at_nobarrier(cl, __bch2_write, NULL);
+ __bch2_write(op);
return;
err:
bch2_disk_reservation_put(c, &op->res);
- if (op->end_io) {
- EBUG_ON(cl->parent);
- closure_debug_destroy(cl);
+ closure_debug_destroy(&op->cl);
+ if (op->end_io)
op->end_io(op);
- } else {
- closure_return(cl);
- }
}
/* Cache promotion on read */