bch_keylist_push(&op->keys);
op->journal = i->pin;
- atomic_inc(op->journal);
ret = bch_btree_insert(op, s, &op->keys);
if (ret)
* Try to find the btree node with that references the oldest journal
* entry, best is our current candidate and is locked if non NULL:
*/
- struct btree *b, *best = NULL;
- unsigned iter;
+ struct btree *b, *best;
+ unsigned i;
+retry:
+ best = NULL;
+
+ for_each_cached_btree(b, c, i)
+ if (btree_current_write(b)->journal) {
+ if (!best)
+ best = b;
+ else if (journal_pin_cmp(c,
+ btree_current_write(best),
+ btree_current_write(b))) {
+ best = b;
+ }
+ }
- for_each_cached_btree(b, c, iter) {
- if (!down_write_trylock(&b->lock))
- continue;
+ b = best;
+ if (b) {
+ rw_lock(true, b, b->level);
- if (!btree_node_dirty(b) ||
- !btree_current_write(b)->journal) {
+ if (!btree_current_write(b)->journal) {
rw_unlock(true, b);
- continue;
+ /* We raced */
+ goto retry;
}
- if (!best)
- best = b;
- else if (journal_pin_cmp(c,
- btree_current_write(best),
- btree_current_write(b))) {
- rw_unlock(true, best);
- best = b;
- } else
- rw_unlock(true, b);
+ bch_btree_node_write(b, NULL);
+ rw_unlock(true, b);
}
-
- if (best)
- goto out;
-
- /* We can't find the best btree node, just pick the first */
- list_for_each_entry(b, &c->btree_cache, list)
- if (!b->level && btree_node_dirty(b)) {
- best = b;
- rw_lock(true, best, best->level);
- goto found;
- }
-
-out:
- if (!best)
- return;
-found:
- if (btree_node_dirty(best))
- bch_btree_node_write(best, NULL);
- rw_unlock(true, best);
}
#define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1)
do_journal_discard(ca);
if (c->journal.blocks_free)
- return;
+ goto out;
/*
* Allocate:
if (n)
c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
if (!journal_full(&c->journal))
__closure_wake_up(&c->journal.wait);
}
journal_write_unlocked(cl);
}
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
__releases(c->journal.lock)
{
struct closure *cl = &c->journal.io;
w->need_write = true;
- if (!closure_trylock(cl, &c->cl))
- spin_unlock(&c->journal.lock);
- else if (noflush && journal_full(&c->journal)) {
- spin_unlock(&c->journal.lock);
- continue_at(cl, journal_write, system_wq);
- } else
+ if (closure_trylock(cl, &c->cl))
journal_write_unlocked(cl);
+ else
+ spin_unlock(&c->journal.lock);
}
-#define journal_try_write(c) __journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+ unsigned nkeys)
{
- struct journal_write *w;
+ size_t sectors;
+ struct closure cl;
- if (CACHE_SYNC(&c->sb)) {
- spin_lock(&c->journal.lock);
- w = c->journal.cur;
+ closure_init_stack(&cl);
+
+ spin_lock(&c->journal.lock);
+
+ while (1) {
+ struct journal_write *w = c->journal.cur;
+
+ sectors = __set_blocks(w->data, w->data->keys + nkeys,
+ c) * c->sb.block_size;
+
+ if (sectors <= min_t(size_t,
+ c->journal.blocks_free * c->sb.block_size,
+ PAGE_SECTORS << JSET_BITS))
+ return w;
+
+ /* XXX: tracepoint */
+ if (!journal_full(&c->journal)) {
+ trace_bcache_journal_entry_full(c);
+
+ /*
+ * XXX: If we were inserting so many keys that they
+ * won't fit in an _empty_ journal write, we'll
+ * deadlock. For now, handle this in
+ * bch_keylist_realloc() - but something to think about.
+ */
+ BUG_ON(!w->data->keys);
+
+ closure_wait(&w->wait, &cl);
+ journal_try_write(c); /* unlocks */
+ } else {
+ trace_bcache_journal_full(c);
+
+ closure_wait(&c->journal.wait, &cl);
+ journal_reclaim(c);
+ spin_unlock(&c->journal.lock);
- if (cl)
- BUG_ON(!closure_wait(&w->wait, cl));
+ btree_flush_write(c);
+ }
- __journal_try_write(c, true);
+ closure_sync(&cl);
+ spin_lock(&c->journal.lock);
}
}
* bch_journal() hands those same keys off to btree_insert_async()
*/
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+ struct keylist *keys,
+ struct closure *parent)
{
- struct btree_op *op = container_of(cl, struct btree_op, cl);
- struct cache_set *c = op->c;
struct journal_write *w;
- size_t sectors, nkeys;
-
- if (op->type != BTREE_INSERT ||
- !CACHE_SYNC(&c->sb))
- goto out;
-
- /*
- * If we're looping because we errored, might already be waiting on
- * another journal write:
- */
- while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
- closure_sync(cl->parent);
-
- spin_lock(&c->journal.lock);
-
- if (journal_full(&c->journal)) {
- trace_bcache_journal_full(c);
-
- closure_wait(&c->journal.wait, cl);
-
- journal_reclaim(c);
- spin_unlock(&c->journal.lock);
-
- btree_flush_write(c);
- continue_at(cl, bch_journal, bcache_wq);
- }
+ atomic_t *ret;
- w = c->journal.cur;
- nkeys = w->data->keys + bch_keylist_nkeys(&op->keys);
- sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
+ if (!CACHE_SYNC(&c->sb))
+ return NULL;
- if (sectors > min_t(size_t,
- c->journal.blocks_free * c->sb.block_size,
- PAGE_SECTORS << JSET_BITS)) {
- trace_bcache_journal_entry_full(c);
+ w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
- /*
- * XXX: If we were inserting so many keys that they won't fit in
- * an _empty_ journal write, we'll deadlock. For now, handle
- * this in bch_keylist_realloc() - but something to think about.
- */
- BUG_ON(!w->data->keys);
+ memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+ w->data->keys += bch_keylist_nkeys(keys);
- BUG_ON(!closure_wait(&w->wait, cl));
+ ret = &fifo_back(&c->journal.pin);
+ atomic_inc(ret);
- journal_try_write(c);
- continue_at(cl, bch_journal, bcache_wq);
- }
-
- memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
- w->data->keys += bch_keylist_nkeys(&op->keys);
-
- op->journal = &fifo_back(&c->journal.pin);
- atomic_inc(op->journal);
-
- if (op->flush_journal) {
- closure_wait(&w->wait, cl->parent);
+ if (parent) {
+ closure_wait(&w->wait, parent);
journal_try_write(c);
} else if (!w->need_write) {
schedule_delayed_work(&c->journal.work,
} else {
spin_unlock(&c->journal.lock);
}
-out:
- bch_btree_insert_async(cl);
+
+
+ return ret;
+}
+
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+ struct keylist keys;
+ atomic_t *ref;
+
+ bch_keylist_init(&keys);
+
+ ref = bch_journal(c, &keys, cl);
+ if (ref)
+ atomic_dec_bug(ref);
}
void bch_journal_free(struct cache_set *c)
struct kmem_cache *bch_search_cache;
+static void bch_data_insert_start(struct closure *);
+
/* Cgroup interface */
#ifdef CONFIG_CGROUP_BCACHE
/* Insert data into cache */
-static void bio_invalidate(struct closure *cl)
+static void bch_data_insert_keys(struct closure *cl)
{
struct btree_op *op = container_of(cl, struct btree_op, cl);
- struct bio *bio = op->cache_bio;
+ struct search *s = container_of(op, struct search, op);
- pr_debug("invalidating %i sectors from %llu",
- bio_sectors(bio), (uint64_t) bio->bi_sector);
+ /*
+ * If we're looping, might already be waiting on
+ * another journal write - can't wait on more than one journal write at
+ * a time
+ *
+ * XXX: this looks wrong
+ */
+#if 0
+ while (atomic_read(&s->cl.remaining) & CLOSURE_WAITING)
+ closure_sync(&s->cl);
+#endif
- while (bio_sectors(bio)) {
- unsigned len = min(bio_sectors(bio), 1U << 14);
+ if (s->write)
+ op->journal = bch_journal(op->c, &op->keys,
+ op->flush_journal
+ ? &s->cl : NULL);
- if (bch_keylist_realloc(&op->keys, 0, op->c))
- goto out;
+ if (bch_btree_insert(op, op->c, &op->keys)) {
+ s->error = -ENOMEM;
+ op->insert_data_done = true;
+ }
- bio->bi_sector += len;
- bio->bi_size -= len << 9;
+ if (op->journal)
+ atomic_dec_bug(op->journal);
+ op->journal = NULL;
- bch_keylist_add(&op->keys,
- &KEY(op->inode, bio->bi_sector, len));
- }
+ if (!op->insert_data_done)
+ continue_at(cl, bch_data_insert_start, bcache_wq);
- op->insert_data_done = true;
- bio_put(bio);
-out:
- continue_at(cl, bch_journal, bcache_wq);
+ bch_keylist_free(&op->keys);
+ closure_return(cl);
}
struct open_bucket {
return true;
}
-static void bch_insert_data_error(struct closure *cl)
+static void bch_data_invalidate(struct closure *cl)
+{
+ struct btree_op *op = container_of(cl, struct btree_op, cl);
+ struct bio *bio = op->cache_bio;
+
+ pr_debug("invalidating %i sectors from %llu",
+ bio_sectors(bio), (uint64_t) bio->bi_sector);
+
+ while (bio_sectors(bio)) {
+ unsigned len = min(bio_sectors(bio), 1U << 14);
+
+ if (bch_keylist_realloc(&op->keys, 0, op->c))
+ goto out;
+
+ bio->bi_sector += len;
+ bio->bi_size -= len << 9;
+
+ bch_keylist_add(&op->keys, &KEY(op->inode,
+ bio->bi_sector, len));
+ }
+
+ op->insert_data_done = true;
+ bio_put(bio);
+out:
+ continue_at(cl, bch_data_insert_keys, bcache_wq);
+}
+
+static void bch_data_insert_error(struct closure *cl)
{
struct btree_op *op = container_of(cl, struct btree_op, cl);
op->keys.top = dst;
- bch_journal(cl);
+ bch_data_insert_keys(cl);
}
-static void bch_insert_data_endio(struct bio *bio, int error)
+static void bch_data_insert_endio(struct bio *bio, int error)
{
struct closure *cl = bio->bi_private;
struct btree_op *op = container_of(cl, struct btree_op, cl);
if (s->writeback)
s->error = error;
else if (s->write)
- set_closure_fn(cl, bch_insert_data_error, bcache_wq);
+ set_closure_fn(cl, bch_data_insert_error, bcache_wq);
else
set_closure_fn(cl, NULL, NULL);
}
bch_bbio_endio(op->c, bio, error, "writing data to cache");
}
-static void bch_insert_data_loop(struct closure *cl)
+static void bch_data_insert_start(struct closure *cl)
{
struct btree_op *op = container_of(cl, struct btree_op, cl);
struct search *s = container_of(op, struct search, op);
struct bio *bio = op->cache_bio, *n;
if (op->bypass)
- return bio_invalidate(cl);
+ return bch_data_invalidate(cl);
if (atomic_sub_return(bio_sectors(bio), &op->c->sectors_to_gc) < 0) {
set_gc_sectors(op->c);
if (bch_keylist_realloc(&op->keys,
1 + (op->csum ? 1 : 0),
op->c))
- continue_at(cl, bch_journal, bcache_wq);
+ continue_at(cl, bch_data_insert_keys, bcache_wq);
k = op->keys.top;
bkey_init(k);
n = bch_bio_split(bio, KEY_SIZE(k), GFP_NOIO, split);
- n->bi_end_io = bch_insert_data_endio;
+ n->bi_end_io = bch_data_insert_endio;
n->bi_private = cl;
if (s->writeback) {
} while (n != bio);
op->insert_data_done = true;
- continue_at(cl, bch_journal, bcache_wq);
+ continue_at(cl, bch_data_insert_keys, bcache_wq);
err:
/* bch_alloc_sectors() blocks if s->writeback = true */
BUG_ON(s->writeback);
* rest of the write.
*/
op->bypass = true;
- return bio_invalidate(cl);
+ return bch_data_invalidate(cl);
} else {
/*
* From a cache miss, we can just insert the keys for the data
bio_put(bio);
if (!bch_keylist_empty(&op->keys))
- continue_at(cl, bch_journal, bcache_wq);
+ continue_at(cl, bch_data_insert_keys, bcache_wq);
else
closure_return(cl);
}
}
/**
- * bch_insert_data - stick some data in the cache
+ * bch_data_insert - stick some data in the cache
*
* This is the starting point for any data to end up in a cache device; it could
* be from a normal write, or a writeback write, or a write to a flash only
* If op->bypass is true, instead of inserting the data it invalidates the
* region of the cache represented by op->cache_bio and op->inode.
*/
-void bch_insert_data(struct closure *cl)
+void bch_data_insert(struct closure *cl)
{
struct btree_op *op = container_of(cl, struct btree_op, cl);
bch_keylist_init(&op->keys);
bio_get(op->cache_bio);
- bch_insert_data_loop(cl);
-}
-
-void bch_btree_insert_async(struct closure *cl)
-{
- struct btree_op *op = container_of(cl, struct btree_op, cl);
- struct search *s = container_of(op, struct search, op);
-
- if (bch_btree_insert(op, op->c, &op->keys)) {
- s->error = -ENOMEM;
- op->insert_data_done = true;
- }
-
- if (op->insert_data_done) {
- bch_keylist_free(&op->keys);
- closure_return(cl);
- } else
- continue_at(cl, bch_insert_data_loop, bcache_wq);
+ bch_data_insert_start(cl);
}
/* Common code for the make_request functions */
if (s->op.cache_bio &&
!test_bit(CACHE_SET_STOPPING, &s->op.c->flags)) {
s->op.type = BTREE_REPLACE;
- closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+ closure_call(&s->op.cl, bch_data_insert, NULL, cl);
}
continue_at(cl, cached_dev_cache_miss_done, NULL);
closure_bio_submit(bio, cl, s->d);
}
- closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+ closure_call(&s->op.cl, bch_data_insert, NULL, cl);
continue_at(cl, cached_dev_write_complete, NULL);
}
-static void cached_dev_nodata(struct cached_dev *dc, struct search *s)
+static void cached_dev_nodata(struct closure *cl)
{
- struct closure *cl = &s->cl;
+ struct search *s = container_of(cl, struct search, cl);
struct bio *bio = &s->bio.bio;
if (s->op.flush_journal)
s = search_alloc(bio, d);
trace_bcache_request_start(s, bio);
- if (!bio->bi_size)
- cached_dev_nodata(dc, s);
- else {
+ if (!bio->bi_size) {
+ /*
+ * can't call bch_journal_meta from under
+ * generic_make_request
+ */
+ continue_at_nobarrier(&s->cl,
+ cached_dev_nodata,
+ bcache_wq);
+ } else {
s->op.bypass = check_should_bypass(dc, s);
if (rw)
return 0;
}
+static void flash_dev_nodata(struct closure *cl)
+{
+ struct search *s = container_of(cl, struct search, cl);
+
+ if (s->op.flush_journal)
+ bch_journal_meta(s->op.c, cl);
+
+ continue_at(cl, search_free, NULL);
+}
+
static void flash_dev_make_request(struct request_queue *q, struct bio *bio)
{
struct search *s;
trace_bcache_request_start(s, bio);
if (!bio->bi_size) {
- if (s->op.flush_journal)
- bch_journal_meta(s->op.c, cl);
+ /*
+ * can't call bch_journal_meta from under
+ * generic_make_request
+ */
+ continue_at_nobarrier(&s->cl,
+ flash_dev_nodata,
+ bcache_wq);
} else if (rw) {
bch_keybuf_check_overlapping(&s->op.c->moving_gc_keys,
&KEY(d->id, bio->bi_sector, 0),
s->writeback = true;
s->op.cache_bio = bio;
- closure_call(&s->op.cl, bch_insert_data, NULL, cl);
+ closure_call(&s->op.cl, bch_data_insert, NULL, cl);
} else {
closure_call(&s->op.cl, btree_read_async, NULL, cl);
}