diff --git a/mars.h b/mars.h
index 07d79fd0..99d27fa8 100644
--- a/mars.h
+++ b/mars.h
@@ -275,10 +275,10 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_MAX] = {
 
 #define _CHECK_ATOMIC(atom,OP,minval)					\
 	do {								\
-		int test = atomic_read(atom);				\
-		if (test OP (minval)) {					\
+		int __test = atomic_read(atom);				\
+		if (__test OP (minval)) {				\
 			atomic_set(atom, minval);			\
-			MARS_ERR("%d: atomic " #atom " " #OP " " #minval " (%d)\n", __LINE__, test); \
+			MARS_ERR("%d: atomic " #atom " " #OP " " #minval " (%d)\n", __LINE__, __test); \
 		}							\
 	} while (0)
 
@@ -287,7 +287,7 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_MAX] = {
 
 #define CHECK_HEAD_EMPTY(head)						\
 	if (unlikely(!list_empty(head))) {				\
-		INIT_LIST_HEAD(head);					\
+		list_del_init(head);					\
 		MARS_ERR("%d: list_head " #head " (%p) not empty\n", __LINE__, head); \
 	}								\
 
diff --git a/mars_aio.c b/mars_aio.c
index 7e232e86..9cf59da2 100644
--- a/mars_aio.c
+++ b/mars_aio.c
@@ -121,6 +121,8 @@ static int aio_ref_get(struct aio_output *output, struct mref_object *mref)
 		mref->ref_flags = 0;
 #endif
 		mref_a->do_dealloc = true;
+		atomic_inc(&output->total_alloc_count);
+		atomic_inc(&output->alloc_count);
 	}
 
 	atomic_inc(&mref->ref_count);
@@ -144,6 +146,7 @@ static void aio_ref_put(struct aio_output *output, struct mref_object *mref)
 	mref_a = aio_mref_get_aspect(output, mref);
 	if (mref_a && mref_a->do_dealloc) {
 		kfree(mref->ref_data);
+		atomic_dec(&output->alloc_count);
 	}
 	aio_free_mref(mref);
 done:;
@@ -163,7 +166,15 @@ void _complete(struct aio_output *output, struct mref_object *mref, int err)
 	} else {
 		mref->ref_flags |= MREF_UPTODATE;
 	}
+
 	cb->cb_fn(cb);
+
+	if (mref->ref_rw) {
+		atomic_dec(&output->write_count);
+	} else {
+		atomic_dec(&output->read_count);
+	}
+
 	aio_ref_put(output, mref);
 }
 
@@ -175,6 +186,15 @@ static void aio_ref_io(struct aio_output *output, struct mref_object *mref)
 
 	atomic_inc(&mref->ref_count);
 
+	// statistics
+	if (mref->ref_rw) {
+		atomic_inc(&output->total_write_count);
+		atomic_inc(&output->write_count);
+	} else {
+		atomic_inc(&output->total_read_count);
+		atomic_inc(&output->read_count);
+	}
+
 	if (unlikely(!output->filp)) {
 		goto done;
 	}
@@ -496,6 +516,35 @@ static int aio_get_info(struct aio_output *output, struct mars_info *info)
 	return 0;
 }
 
+//////////////// informational / statistics ///////////////
+
+static noinline
+char *aio_statistics(struct aio_brick *brick, int verbose)
+{
+	struct aio_output *output = brick->outputs[0];
+	char *res = kmalloc(256, GFP_MARS);
+	if (!res)
+		return NULL;
+
+	// FIXME: check for allocation overflows
+
+	sprintf(res, "total reads=%d writes=%d allocs=%d | flying reads=%d writes=%d allocs=%d \n",
+		atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_alloc_count),
+		atomic_read(&output->read_count), atomic_read(&output->write_count), atomic_read(&output->alloc_count));
+
+	return res;
+}
+
+static noinline
+void aio_reset_statistics(struct aio_brick *brick)
+{
+	struct aio_output *output = brick->outputs[0];
+	atomic_set(&output->total_read_count, 0);
+	atomic_set(&output->total_write_count, 0);
+	atomic_set(&output->total_alloc_count, 0);
+}
+
+
 //////////////// object / aspect constructors / destructors ///////////////
 
 static int aio_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
@@ -659,6 +708,8 @@ static int aio_output_destruct(struct aio_output *output)
 
 static struct aio_brick_ops aio_brick_ops = {
 	.brick_switch = aio_switch,
+	.brick_statistics = aio_statistics,
+	.reset_statistics = aio_reset_statistics,
 };
 
 static struct aio_output_ops aio_output_ops = {
diff --git a/mars_aio.h b/mars_aio.h
index 349accdf..7974f050 100644
--- a/mars_aio.h
+++ b/mars_aio.h
@@ -41,6 +41,13 @@ struct aio_output {
 	int fd; // FIXME: remove this!
 	struct aio_threadinfo tinfo[3];
 	aio_context_t ctxp;
+	// statistics
+	atomic_t total_read_count;
+	atomic_t total_write_count;
+	atomic_t total_alloc_count;
+	atomic_t read_count;
+	atomic_t write_count;
+	atomic_t alloc_count;
 };
 
 MARS_TYPES(aio);
diff --git a/mars_trans_logger.c b/mars_trans_logger.c
index a5f45d53..5dbe7eba 100644
--- a/mars_trans_logger.c
+++ b/mars_trans_logger.c
@@ -13,20 +13,14 @@
 #define USE_HIGHER_PHASES
 #define APPLY_DATA
 //#define DO_SKIP
-//#define DO_IGNORE // FIXME or DELETE
 #define DO_EXTEND
 #define NEW_CODE
-//#define KEEP_UNIQUE
-#define CLEAN_ALL
+#define KEEP_UNIQUE
 #define WB_COPY
-#define X1 // kein make_writeback() etc
-#define X2 // kein fire_writeback(), stattdessen sofort free_writeback()
-#define X3 // kein hashing in make_writeback
-#define X4 // erzeuge keine sub_mrefs in make_writeback
-#define X5 // keine IOs
 #define NOTRASH_DATA
-#define ATOMIC_FREE
+#define CLEAN_ALL
+#define WB_MEMLEAK
 
 #include 
 #include 
@@ -44,70 +38,6 @@
 #define inline noinline
 #endif
 
-#ifdef BITMAP_CHECKS // code stolen
-enum {
-	REG_OP_ISFREE,		/* true if region is all zero bits */
-	REG_OP_ALLOC,		/* set all bits in region */
-	REG_OP_RELEASE,		/* clear all bits in region */
-};
-
-static int bitmap_op(unsigned long *bitmap, int pos, int nbits_reg, int reg_op)
-{
-	int index;		/* index first long of region in bitmap */
-	int offset;		/* bit offset region in bitmap[index] */
-	int nlongs_reg;		/* num longs spanned by region in bitmap */
-	int nbitsinlong;	/* num bits of region in each spanned long */
-	unsigned long mask;	/* bitmask for one long of region */
-	int i;			/* scans bitmap by longs */
-	int ret = 0;		/* return value */
-
-	index = pos / BITS_PER_LONG;
-	offset = pos - (index * BITS_PER_LONG);
-	nlongs_reg = BITS_TO_LONGS(nbits_reg);
-	nbitsinlong = min(nbits_reg, BITS_PER_LONG);
-
-	/*
-	 * Can't do "mask = (1UL << nbitsinlong) - 1", as that
-	 * overflows if nbitsinlong == BITS_PER_LONG.
-	 */
-	mask = (1UL << (nbitsinlong - 1));
-	mask += mask - 1;
-	mask <<= offset;
-
-	switch (reg_op) {
-	case REG_OP_ISFREE:
-		for (i = 0; i < nlongs_reg; i++) {
-			if (bitmap[index + i] & mask)
-				goto done;
-		}
-		ret = 1;	/* all bits in region free (zero) */
-		break;
-
-	case REG_OP_ALLOC:
-		for (i = 0; i < nlongs_reg; i++)
-			bitmap[index + i] |= mask;
-		break;
-
-	case REG_OP_RELEASE:
-		for (i = 0; i < nlongs_reg; i++)
-			bitmap[index + i] &= ~mask;
-		break;
-	}
-done:
-	return ret;
-}
-
-#define CHECK_BITMAP(mref,mref_a)					\
-	{								\
-		int i;							\
-		for (i = 0; i < sizeof(mref_a->dirty_bitmap)/sizeof(long); i++) { \
-			if (mref_a->dirty_bitmap[i] != 0) {		\
-				MARS_ERR("bitmap %d: dirty = %8lx touched = %8lx slave = %8lx worked = %8lx at pos = %lld len = %d (writes=%d slave_writes=%d reads=%d, start_phase1=%d end_phase1=%d start_phase2=%d sub_count=%d)\n", i, mref_a->dirty_bitmap[i], mref_a->touch_bitmap[i], mref_a->slave_bitmap[i], mref_a->work_bitmap[i], mref->ref_pos, mref->ref_len, mref_a->bitmap_write, mref_a->bitmap_write_slave, mref_a->bitmap_read, mref_a->start_phase1, mref_a->end_phase1, mref_a->start_phase2, mref_a->sub_count); \
-			}						\
-		}							\
-	}
-#endif
-
 ////////////////////////////////////////////////////////////////////
 
 static inline
@@ -153,6 +83,9 @@ bool q_is_ready(struct logger_queue *q)
 
 	/* compute some characteristic measures */
 	contention = atomic_read(&q->q_output->fly_count);
+	if (q->q_dep_plus) {
+		contention += atomic_read(q->q_dep_plus);
+	}
 	dep = q->q_dep;
 	while (dep) {
 		contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying);
@@ -306,35 +239,28 @@ struct trans_logger_mref_aspect *q_fetch(struct logger_queue *q)
 
 
 static inline
-int hash_fn(loff_t base_index)
+int hash_fn(loff_t pos)
 {
 	// simple and stupid
-	loff_t tmp;
-	tmp = base_index ^ (base_index / TRANS_HASH_MAX);
-	return ((unsigned)tmp) % TRANS_HASH_MAX;
+	loff_t base_index = pos >> REGION_SIZE_BITS;
+	return base_index % TRANS_HASH_MAX;
 }
 
-static noinline
-struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, loff_t pos, int *max_len, bool do_ignore, struct trans_logger_mref_aspect *cond_insert_a)
+static inline
+struct trans_logger_mref_aspect *_hash_find(struct list_head *start, loff_t pos, int *max_len, bool use_collect_head)
 {
-	loff_t base_index = pos >> REGION_SIZE_BITS;
-	int hash = hash_fn(base_index);
-	struct hash_anchor *start = &output->hash_table[hash];
 	struct list_head *tmp;
 	struct trans_logger_mref_aspect *res = NULL;
 	int len = *max_len;
-	unsigned int flags;
 #ifdef STAT_DEBUGGING
 	int count = 0;
 #endif
-
-	traced_readlock(&start->hash_lock, flags);
-
+
 	/* The lists are always sorted according to age (newest first).
 	 * Caution: there may be duplicates in the list, some of them
 	 * overlapping with the search area in many different ways.
 	 */
-	for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
+	for (tmp = start->next; tmp != start; tmp = tmp->next) {
 		struct trans_logger_mref_aspect *test_a;
 		struct mref_object *test;
 		int diff;
@@ -343,18 +269,24 @@ struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, l
 		if (++count > max) {
 			max = count;
 			if (!(max % 10)) {
-				MARS_DBG("hash max=%d hash=%d base_index=%lld\n", max, hash, base_index);
+				MARS_DBG("hash max=%d hash=%d\n", max, hash);
 			}
 		}
 #endif
-		test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
+		if (use_collect_head) {
+			test_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
+		} else {
+			test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
+		}
 		test = test_a->object;
+
+		CHECK_ATOMIC(&test->ref_count, 1);
 
 		// are the regions overlapping?
-		if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos || (test_a->ignore_this & do_ignore)) {
+		if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
 			continue; // not relevant
 		}
-
+
 		diff = test->ref_pos - pos;
 		if (diff <= 0) {
 			int restlen = test->ref_len + diff;
@@ -369,26 +301,36 @@ struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, l
 		}
 	}
 
+	*max_len = len;
+	return res;
+}
+
+static noinline
+struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, loff_t pos, int *max_len)
+{
+	int hash = hash_fn(pos);
+	struct hash_anchor *start = &output->hash_table[hash];
+	struct trans_logger_mref_aspect *res;
+	unsigned int flags;
+
+	traced_readlock(&start->hash_lock, flags);
+
+	res = _hash_find(&start->hash_anchor, pos, max_len, false);
+
 	if (res) {
-		atomic_inc(&res->object->ref_count); // must be paired with _trans_logger_ref_put()
+		atomic_inc(&res->object->ref_count); // must be paired with __trans_logger_ref_put()
 		atomic_inc(&output->inner_balance_count);
-	} else if (cond_insert_a) {
-		atomic_inc(&output->hash_count);
-		list_add(&cond_insert_a->hash_head, &start->hash_anchor);
-		cond_insert_a->is_hashed = true;
 	}
 
 	traced_readunlock(&start->hash_lock, flags);
 
-	*max_len = len;
 	return res;
 }
 
 static noinline
 void hash_insert(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a)
 {
-	loff_t base_index = elem_a->object->ref_pos >> REGION_SIZE_BITS;
-	int hash = hash_fn(base_index);
+	int hash = hash_fn(elem_a->object->ref_pos);
 	struct hash_anchor *start = &output->hash_table[hash];
 	unsigned int flags;
@@ -415,9 +357,9 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st
 {
 	loff_t pos = *_pos;
 	int len = *_len;
-	loff_t base_index = pos >> REGION_SIZE_BITS;
-	int hash = hash_fn(base_index);
+	int hash = hash_fn(pos);
 	struct hash_anchor *start = &output->hash_table[hash];
+	struct list_head *tmp;
 	bool extended;
 	unsigned int flags;
@@ -428,83 +370,62 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st
 	traced_readlock(&start->hash_lock, flags);
 
 	do {
-		struct list_head *tmp;
 		extended = false;
 
 		for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
 			struct trans_logger_mref_aspect *test_a;
 			struct mref_object *test;
+			loff_t diff;
 
 			test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
 			test = test_a->object;
 
 			// are the regions overlapping?
-			if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos /*|| test_a->is_collected*/) {
+			if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
 				continue; // not relevant
 			}
-
-			// collect upon the first time
-			if (collect_list && !test_a->is_collected) {
-				CHECK_HEAD_EMPTY(&test_a->collect_head);
-				test_a->is_collected = true;
-				atomic_inc(&test->ref_count); // must be paired with _trans_logger_ref_put()
-				atomic_inc(&output->inner_balance_count);
-				list_add_tail(&test_a->collect_head, collect_list);
-			}
-
+
 			// extend the search region when necessary
-			if (test->ref_pos < pos) {
-				len += pos - test->ref_pos;
+			diff = pos - test->ref_pos;
+			if (diff > 0) {
+				len += diff;
 				pos = test->ref_pos;
 				extended = true;
 			}
-			if (test->ref_pos + test->ref_len > pos + len) {
-				len += (pos + len) - (test->ref_pos + test->ref_len);
+			diff = (test->ref_pos + test->ref_len) - (pos + len);
+			if (diff > 0) {
+				len += diff;
 				extended = true;
 			}
 		}
 	} while (extended); // start over for transitive closure
 
-	traced_readunlock(&start->hash_lock, flags);
-
 	*_pos = pos;
 	*_len = len;
-}
 
-static inline
-bool hash_put(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a)
-{
-	struct mref_object *elem = elem_a->object;
-	loff_t base_index = elem->ref_pos >> REGION_SIZE_BITS;
-	int hash = hash_fn(base_index);
-	struct hash_anchor *start = &output->hash_table[hash];
-	unsigned int flags;
-	bool res = true;
-
-	traced_writelock(&start->hash_lock, flags);
-
-	CHECK_ATOMIC(&elem->ref_count, 1);
-
-	if (!elem_a->is_hashed) {
-		goto done;
-	}
-#ifndef ATOMIC_FREE
-	res = atomic_dec_and_test(&elem->ref_count);
-	atomic_dec(&output->inner_balance_count);
-#endif
-
-	if (res) {
-		list_del_init(&elem_a->hash_head);
-		elem_a->is_hashed = false;
-		atomic_dec(&output->hash_count);
+	for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
+		struct trans_logger_mref_aspect *test_a;
+		struct mref_object *test;
+
+		test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
+		test = test_a->object;
+
+		// are the regions overlapping?
+		if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
+			continue; // not relevant
+		}
+
+		// collect
+		CHECK_HEAD_EMPTY(&test_a->collect_head);
+		test_a->is_collected = true;
+		atomic_inc(&test->ref_count); // must be paired with __trans_logger_ref_put()
+		atomic_inc(&output->inner_balance_count);
+		list_add_tail(&test_a->collect_head, collect_list);
 	}
 
-done:
-	traced_writeunlock(&start->hash_lock, flags);
-	return res;
+	traced_readunlock(&start->hash_lock, flags);
 }
 
-#ifdef ATOMIC_FREE
 /* Atomically put all elements from the list.
  * All elements must reside in the same collision list.
  */
@@ -513,21 +434,23 @@ void hash_put_all(struct trans_logger_output *output, struct list_head *list)
 {
 	struct list_head *tmp;
 	struct hash_anchor *start = NULL;
+	int first_hash = -1;
 	unsigned int flags;
 
 	for (tmp = list->next; tmp != list; tmp = tmp->next) {
 		struct trans_logger_mref_aspect *elem_a;
 		struct mref_object *elem;
-		loff_t base_index;
 		int hash;
 
-		elem_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
+		elem_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
 		elem = elem_a->object;
+		hash = hash_fn(elem->ref_pos);
 
 		if (!start) {
-			base_index = elem->ref_pos >> REGION_SIZE_BITS;
-			hash = hash_fn(base_index);
+			first_hash = hash;
 			start = &output->hash_table[hash];
 			traced_writelock(&start->hash_lock, flags);
+		} else if (unlikely(hash != first_hash)) {
+			MARS_ERR("oops, different hashes: %d != %d\n", hash, first_hash);
 		}
 
 		if (!elem_a->is_hashed) {
@@ -543,7 +466,6 @@ void hash_put_all(struct trans_logger_output *output, struct list_head *list)
 		traced_writeunlock(&start->hash_lock, flags);
 	}
 }
-#endif
 
 ////////////////// own brick / input / output operations //////////////////
 
@@ -556,9 +478,6 @@ int trans_logger_get_info(struct trans_logger_output *output, struct mars_info *
 	return GENERIC_INPUT_CALL(input, mars_get_info, info);
 }
 
-static noinline
-void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref);
-
 static noinline
 int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a, struct trans_logger_mref_aspect *mshadow_a)
 {
@@ -588,9 +507,6 @@ int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_a
 	/* Attach mref to the existing shadow ("slave shadow").
 	 */
 	mref_a->shadow_data = mshadow_a->shadow_data + diff;
-#ifdef BITMAP_CHECKS
-	mref_a->shadow_offset = diff;
-#endif
 	mref_a->do_dealloc = false;
 	if (!mref->ref_data) { // buffered IO
 		mref->ref_data = mref_a->shadow_data;
@@ -623,7 +539,7 @@ int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mref_a
 	 * the old one.
 	 * When a shadow is found, use it as buffer for the mref.
 	 */
-	mshadow_a = hash_find(output, mref->ref_pos, &mref->ref_len, true, NULL);
+	mshadow_a = hash_find(output, mref->ref_pos, &mref->ref_len);
 	if (!mshadow_a) {
 		return GENERIC_INPUT_CALL(input, mref_get, mref);
 	}
@@ -639,7 +555,7 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
 
 #ifdef KEEP_UNIQUE
 	struct trans_logger_mref_aspect *mshadow_a;
-	mshadow_a = hash_find(output, mref->ref_pos, &mref->ref_len, false, NULL);
+	mshadow_a = hash_find(output, mref->ref_pos, &mref->ref_len);
 	if (mshadow_a) {
 		return _make_sshadow(output, mref_a, mshadow_a);
 	}
@@ -683,27 +599,6 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
 		MARS_ERR("oops, len = %d\n", mref->ref_len);
 		return -EINVAL;
 	}
-#endif
-#ifdef KEEP_UNIQUE
-#ifdef DO_IGNORE
-	mref_a->ignore_this = true;
-	mshadow_a = hash_find(output, mref->ref_pos, &mref->ref_len, false, mref_a);
-	if (mshadow_a) {
-		MARS_INF("RACE DETECTED\n");
-		mref_a->ignore_this = false;
-		mref->ref_page = NULL;
-		mref_a->shadow_data = NULL;
-		mref_a->do_dealloc = false;
-		mref_a->ignore_this = false;
-		mref_a->shadow_ref = NULL;
-		if (mref_a->do_buffered) {
-			mref->ref_data = NULL;
-		}
-		atomic_dec(&mref->ref_count);
-		atomic_dec(&output->inner_balance_count);
-		return _make_sshadow(output, mref_a, mshadow_a);
-	}
-#endif
 #endif
 
 	atomic_inc(&output->mshadow_count);
@@ -735,9 +630,11 @@ int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object
 	CHECK_PTR(mref_a, err);
 	CHECK_PTR(mref_a->object, err);
 
+	// ensure that REGION_SIZE boundaries are obeyed by hashing
 	base_offset = mref->ref_pos & (loff_t)(REGION_SIZE - 1);
-	if (base_offset + mref->ref_len > REGION_SIZE)
+	if (mref->ref_len > REGION_SIZE - base_offset) {
 		mref->ref_len = REGION_SIZE - base_offset;
+	}
 
 	if (mref->ref_may_write == READ) {
 		return _read_ref_get(output, mref_a);
@@ -756,43 +653,43 @@ err:
 }
 
 static noinline
-void __trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
+void __trans_logger_ref_put(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a)
 {
-	struct trans_logger_mref_aspect *mref_a;
+	struct mref_object *mref;
 	struct trans_logger_mref_aspect *shadow_a;
 	struct trans_logger_input *input;
 
 	MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
 
restart:
+	mref = mref_a->object;
 	CHECK_ATOMIC(&mref->ref_count, 1);
 
 	CHECK_PTR(output, err);
 
-	mref_a = trans_logger_mref_get_aspect(output, mref);
-	CHECK_PTR(mref_a, err);
-	CHECK_PTR(mref_a->object, err);
-
 	// are we a shadow?
 	shadow_a = mref_a->shadow_ref;
 	if (shadow_a) {
 		unsigned long flags;
 		bool finished;
-#ifdef ATOMIC_FREE
-		hash_put(output, mref_a);
+
+		CHECK_ATOMIC(&mref->ref_count, 1);
+
 		finished = atomic_dec_and_test(&mref->ref_count);
 		atomic_dec(&output->inner_balance_count);
-#else
-		if (mref_a->is_hashed) {
-			finished = hash_put(output, mref_a);
-		} else {
-			atomic_dec(&output->inner_balance_count);
-			finished = atomic_dec_and_test(&mref->ref_count);
+		if (unlikely(finished && mref_a->is_hashed)) {
+			MARS_ERR("trying to put a hashed mref, pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
+			finished = false; // leaves a memleak
 		}
-#endif
+
 		if (!finished) {
 			return;
 		}
+
+		CHECK_HEAD_EMPTY(&mref_a->hash_head);
+		CHECK_HEAD_EMPTY(&mref_a->q_head);
+		CHECK_HEAD_EMPTY(&mref_a->replay_head);
+		CHECK_HEAD_EMPTY(&mref_a->collect_head);
+		CHECK_HEAD_EMPTY(&mref_a->sub_list);
+		CHECK_HEAD_EMPTY(&mref_a->sub_head);
+
 #if 1 // FIXME: do bookkeping here
 		traced_lock(&output->brick->pos_lock, flags);
 		list_del_init(&mref_a->pos_head);
@@ -804,14 +701,11 @@ restart:
 		CHECK_HEAD_EMPTY(&mref_a->hash_head);
 		trans_logger_free_mref(mref);
 		// now put the master shadow
-		mref = shadow_a->object;
+		mref_a = shadow_a;
 		goto restart;
 	}
 
 	// we are a master shadow
 	CHECK_PTR(mref_a->shadow_data, err);
-#ifdef BITMAP_CHECKS
-	CHECK_BITMAP(mref, mref_a);
-#endif
 	if (mref_a->do_dealloc) {
 #ifdef USE_KMALLOC
 		kfree(mref_a->shadow_data);
@@ -837,6 +731,21 @@ err:
 	MARS_FAT("oops\n");
 }
 
+static noinline
+void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
+{
+	struct trans_logger_mref_aspect *mref_a;
+
+	mref_a = trans_logger_mref_get_aspect(output, mref);
+	CHECK_PTR(mref_a, err);
+
+	__trans_logger_ref_put(output, mref_a);
+	return;
+
+err:
+	MARS_FAT("giving up...\n");
+}
+
 static noinline
 void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
 {
@@ -844,13 +753,6 @@ void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object
 	_trans_logger_ref_put(output, mref);
 }
 
-static noinline
-void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
-{
-	//atomic_dec(&output->inner_balance_count);
-	__trans_logger_ref_put(output, mref);
-}
-
 static noinline
 void _trans_logger_endio(struct generic_callback *cb)
 {
@@ -960,11 +862,15 @@ void free_writeback(struct writeback_info *wb)
 		orig_mref = orig_mref_a->object;
 
 		CHECK_ATOMIC(&orig_mref->ref_count, 1);
-		_trans_logger_ref_put(orig_mref_a->output, orig_mref);
+#ifdef CLEAN_ALL
+		__trans_logger_ref_put(orig_mref_a->output, orig_mref_a);
+#endif
 	}
 
 	//...
+#ifndef WB_MEMLEAK
 	//FIXME: introduce refcount
-	//kfree(wb);
+	kfree(wb);
+#endif
 }
 
 static noinline
@@ -974,7 +880,6 @@ void wb_endio(struct generic_callback *cb)
 	struct mref_object *sub_mref;
 	struct trans_logger_output *output;
 	struct writeback_info *wb;
-	struct trans_logger_mref_aspect *base_mref_a;
 	int rw;
 	atomic_t *dec;
 	void (*endio)(struct generic_callback *cb);
@@ -988,11 +893,6 @@ void wb_endio(struct generic_callback *cb)
 	wb = sub_mref_a->wb;
 	CHECK_PTR(wb, err);
 
-	base_mref_a = sub_mref_a->base_mref_a;
-	if (base_mref_a) {
-		_trans_logger_ref_put(output, base_mref_a->object);
-	}
-
 	atomic_dec(&output->wb_balance_count);
 
 	rw = sub_mref->ref_rw;
@@ -1018,7 +918,6 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 	struct trans_logger_brick *brick = output->brick;
 	struct trans_logger_input *sub_input = brick->inputs[0];
 	struct writeback_info *wb = kzalloc(sizeof(struct writeback_info), GFP_MARS);
-	struct trans_logger_mref_aspect *base_mref_a = NULL;
 	if (!wb) {
 		goto err;
 	}
@@ -1029,22 +928,30 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 	wb->w_pos = pos;
 	wb->w_len = len;
 
+	if (unlikely(len < 0)) {
+		MARS_ERR("len = %d\n", len);
+	}
+
 	hash_extend(output, &wb->w_pos, &wb->w_len, &wb->w_collect_list);
 
 	pos = wb->w_pos;
 	len = wb->w_len;
-#ifdef X3
+
+	if (unlikely(len < 0)) {
+		MARS_ERR("len = %d\n", len);
+	}
+
 	while (len > 0) {
 		struct trans_logger_mref_aspect *sub_mref_a;
 		struct mref_object *sub_mref;
+		struct trans_logger_mref_aspect *base_mref_a;
 		struct mref_object *base_mref;
 		void *data;
 		int this_len = len;
 		int diff;
 		int status;
 
-		base_mref_a = hash_find(output, pos, &this_len, true, NULL);
+		base_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true);
 		if (unlikely(!base_mref_a)) {
 			MARS_FAT("could not find data\n");
 			goto err;
@@ -1057,7 +964,6 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 		}
 
 		data = base_mref_a->shadow_data + diff;
-#ifdef X4
 		sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout);
 		if (unlikely(!sub_mref)) {
 			MARS_FAT("cannot alloc sub_mref\n");
@@ -1091,31 +997,21 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 #else
 		memset(sub_mref->ref_data, 0xff, sub_mref->ref_len);
 #endif
-		_trans_logger_ref_put(output, base_mref_a->object);
-#else
-		sub_mref_a->base_mref_a = base_mref_a;
 #endif
-		base_mref_a = NULL;
 
 		list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list);
 		atomic_inc(&wb->w_sub_write_count);
 		atomic_inc(&output->wb_balance_count);
 
 		this_len = sub_mref->ref_len;
-#else
-		_trans_logger_ref_put(output, base_mref_a->object);
-#endif
 		pos += this_len;
 		len -= this_len;
 	}
-#endif
+
 	return wb;
 
 err:
 	MARS_ERR("cleaning up...\n");
-	if (base_mref_a) {
-		_trans_logger_ref_put(output, base_mref_a->object);
-	}
 	if (wb) {
 		free_writeback(wb);
 	}
@@ -1146,11 +1042,7 @@ void fire_writeback(struct writeback_info *wb, struct list_head *start)
 		cb->cb_prev = NULL;
 		sub_mref->ref_cb = cb;
 
-#ifdef X5
 		GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
-#else
-		wb_endio(cb);
-#endif
 		GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
 	}
 }
@@ -1182,12 +1074,10 @@ void phase1_endio(void *private, int error)
 	orig_cb = orig_mref->ref_cb;
 	CHECK_PTR(orig_cb, err);
 
-#ifdef BITMAP_CHECKS
-	orig_mref_a->shadow_ref->end_phase1++;
-#endif
 	// error handling
-	if (error < 0)
+	if (error < 0) {
 		orig_cb->cb_error = error;
+	}
 
 	// signal completion to the upper layer, as early as possible
 	if (likely(orig_cb->cb_error >= 0)) {
@@ -1199,9 +1089,13 @@ void phase1_endio(void *private, int error)
 	orig_cb->cb_fn(orig_cb);
 
 	// queue up for the next phase
+	atomic_inc(&orig_mref->ref_count); // must be paired with __trans_logger_ref_put()
+	atomic_inc(&output->inner_balance_count);
 	q_insert(&output->q_phase2, orig_mref_a);
 	wake_up_interruptible(&output->event);
-err: ;
+	return;
+err:
+	MARS_ERR("giving up...\n");
 }
 
 static noinline
@@ -1236,9 +1130,6 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
 		goto err;
 	}
 
-#ifdef BITMAP_CHECKS
-	orig_mref_a->shadow_ref->start_phase1++;
-#endif
 	memcpy(data, orig_mref_a->shadow_data, orig_mref->ref_len);
 
 	ok = log_finalize(&brick->logst, orig_mref->ref_len, phase1_endio, orig_mref_a);
@@ -1294,7 +1185,7 @@ bool phase0_startio(struct trans_logger_mref_aspect *mref_a)
 		mref->ref_flags |= MREF_UPTODATE;
 		cb->cb_fn(cb);
 
-		_trans_logger_ref_put(output, mref);
+		__trans_logger_ref_put(output, mref_a);
 		return true;
 	}
 
@@ -1323,23 +1214,12 @@ bool phase0_startio(struct trans_logger_mref_aspect *mref_a)
 #endif
 	mref_a->is_dirty = true;
 	mref_a->shadow_ref->is_dirty = true;
-#ifdef BITMAP_CHECKS
-	mref_a->shadow_ref->bitmap_write++;
-	bitmap_op(mref_a->shadow_ref->dirty_bitmap, mref_a->shadow_offset/512, mref->ref_len/512, REG_OP_ALLOC);
-	bitmap_op(mref_a->shadow_ref->touch_bitmap, mref_a->shadow_offset/512, mref->ref_len/512, REG_OP_ALLOC);
-	bitmap_op(mref_a->shadow_ref->work_bitmap, mref_a->shadow_offset/512, mref->ref_len/512, REG_OP_ALLOC);
-	if (mref_a->shadow_ref != mref_a) {
-		mref_a->shadow_ref->bitmap_write_slave++;
-		bitmap_op(mref_a->shadow_ref->slave_bitmap, mref_a->shadow_offset/512, mref->ref_len/512, REG_OP_ALLOC);
-	}
-#endif
 #ifndef KEEP_UNIQUE
 	if (unlikely(mref_a->shadow_ref != mref_a)) {
 		MARS_ERR("something is wrong: %p != %p\n", mref_a->shadow_ref, mref_a);
 	}
 #endif
 	if (!mref_a->is_hashed) {
-		mref_a->ignore_this = false;
 		MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
 		hash_insert(output, mref_a);
 	}
@@ -1360,31 +1240,35 @@ err:
 * old version from disk somewhen later, e.g. when IO contention is low.
 */
 
-#ifdef NEW_CODE
+atomic_t provisionary_count = ATOMIC_INIT(0);
 
 static noinline
 void new_endio(struct generic_callback *cb)
 {
 	struct trans_logger_mref_aspect *sub_mref_a;
 	struct writeback_info *wb;
+	struct trans_logger_output *output;
 
 	CHECK_PTR(cb, err);
 	sub_mref_a = cb->cb_private;
 	CHECK_PTR(sub_mref_a, err);
 	wb = sub_mref_a->wb;
 	CHECK_PTR(wb, err);
+	output = wb->w_output;
+	CHECK_PTR(output, err);
 
 	if (unlikely(cb->cb_error < 0)) {
 		MARS_FAT("IO error %d\n", cb->cb_error);
 		goto err;
 	}
 
-#ifdef ATOMIC_FREE
 	hash_put_all(wb->w_output, &wb->w_collect_list);
-#endif
 
 	free_writeback(wb);
 
+	atomic_dec(&provisionary_count);
+
+	wake_up_interruptible(&output->event);
 
 	return;
 
err:
@@ -1408,7 +1292,6 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 		MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
 		goto done;
 	}
-#ifdef X1
 	if (!orig_mref_a->is_hashed) {
 		MARS_IO("AHA not hashed, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
 		goto done;
@@ -1417,27 +1300,26 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	if (unlikely(!wb)) {
 		goto err;
 	}
+
+	atomic_inc(&provisionary_count);
+
 	if (unlikely(list_empty(&wb->w_collect_list))) {
-		MARS_IO("collection list is empty\n");
+		MARS_ERR("collection list is empty, orig pos = %lld len = %d (collected=%d), extended pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len, (int)orig_mref_a->is_collected, wb->w_pos, wb->w_len);
 		free_writeback(wb);
 		goto done;
 	}
-#ifdef X2
-	wb->write_endio = new_endio;
-
-	if (list_empty(&wb->w_sub_write_list)) {
+	if (unlikely(list_empty(&wb->w_sub_write_list))) {
+		MARS_ERR("hmmm.... this should not happen\n");
 		free_writeback(wb);
-	} else {
-		fire_writeback(wb, &wb->w_sub_write_list);
+		goto done;
 	}
-#else
-	free_writeback(wb);
-#endif
-#endif
+
+	wb->write_endio = new_endio;
+	fire_writeback(wb, &wb->w_sub_write_list);
 
 done:
 #ifdef CLEAN_ALL
-	_trans_logger_ref_put(output, orig_mref);
+	__trans_logger_ref_put(orig_mref_a->output, orig_mref_a);
 #endif
 	return true;
 
@@ -1445,185 +1327,6 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	return false;
 }
 
-#else // NEW_CODE
-
-static noinline
-void _phase2_endio(struct trans_logger_mref_aspect *orig_mref_a)
-{
-	struct trans_logger_output *output;
-
-	output = orig_mref_a->output;
-	CHECK_PTR(output, err);
-
-	// queue up for the next phase
-	if (output->brick->log_reads) {
-		q_insert(&output->q_phase3, orig_mref_a);
-	} else {
-		q_insert(&output->q_phase4, orig_mref_a);
-	}
-	wake_up_interruptible(&output->event);
-	return;
-
-err:
-	MARS_FAT("hanging up....\n");
-}
-
-static noinline
-void phase2_endio(struct generic_callback *cb)
-{
-	struct trans_logger_mref_aspect *sub_mref_a;
-	struct trans_logger_output *output;
-	struct trans_logger_mref_aspect *orig_mref_a;
-
-	CHECK_PTR(cb, err);
-	sub_mref_a = cb->cb_private;
-	CHECK_PTR(sub_mref_a, err);
-	output = sub_mref_a->output;
-	CHECK_PTR(output, err);
-	orig_mref_a = sub_mref_a->orig_mref_a;
-	CHECK_PTR(orig_mref_a, err);
-
-	atomic_dec(&output->q_phase2.q_flying);
-
-	if (unlikely(cb->cb_error < 0)) {
-		MARS_FAT("IO error %d\n", cb->cb_error);
-		goto err;
-	}
-
-	if (atomic_dec_and_test(&orig_mref_a->current_sub_count)) {
-		_phase2_endio(orig_mref_a);
-	}
-	return;
-
-err:
-	MARS_FAT("hanging up....\n");
-}
-
-static noinline
-bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
-{
-	struct mref_object *orig_mref;
-	struct trans_logger_output *output;
-	struct trans_logger_input *sub_input;
-	struct trans_logger_brick *brick;
-	struct generic_callback *cb;
-	loff_t margin;
-	loff_t pos;
-	int len;
-	int status;
-
-	CHECK_PTR(orig_mref_a, err);
-	orig_mref = orig_mref_a->object;
-	CHECK_PTR(orig_mref, err);
-	output = orig_mref_a->output;
-	CHECK_PTR(output, err);
-	brick = output->brick;
-	CHECK_PTR(brick, err);
-	sub_input = brick->inputs[0];
-	CHECK_PTR(sub_input, err);
-
-	pos = orig_mref->ref_pos;
-	len = orig_mref->ref_len;
-
-#ifdef DO_EXTEND
-	hash_extend(output, &pos, &len, NULL);
-#endif
-
-#ifdef BITMAP_CHECKS
-	orig_mref_a->shadow_ref->start_phase2++;
-#endif
-	margin = orig_mref_a->fetch_margin;
-#ifdef DO_SKIP
-	if (pos < margin && margin >= brick->old_margin) {
-		int diff = margin - pos;
-		MARS_DBG("skip = %d margin = %lld at pos = %lld len = %d newlen = %d\n", diff, margin, pos, len, len - diff);
-		pos = margin;
-		len -= diff;
-	}
-#endif
-	brick->old_margin = margin;
-
-	/* allocate internal sub_mref for further work
-	 */
-	if (unlikely(!list_empty(&orig_mref_a->sub_list))) {
-		MARS_ERR("oops, list is not empty\n");
-	}
-	while (len > 0) {
-		struct mref_object *sub_mref;
-		struct trans_logger_mref_aspect *sub_mref_a;
-
-		sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout);
-		if (unlikely(!sub_mref)) {
-			MARS_FAT("cannot alloc sub_mref\n");
-			goto err;
-		}
-
-		sub_mref->ref_pos = pos;
-		sub_mref->ref_len = len;
-		sub_mref->ref_may_write = WRITE;
-		sub_mref->ref_rw = READ; // for now
-		sub_mref->ref_data = NULL; // for now use buffered IO. // TODO: use direct IO, circumvent memcpy()
-
-		sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref);
-		CHECK_PTR(sub_mref_a, err);
-		sub_mref_a->stamp = orig_mref_a->stamp;
-		sub_mref_a->orig_mref_a = orig_mref_a;
-		sub_mref_a->output = output;
-
-		status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref);
-		if (unlikely(status < 0)) {
-			MARS_FAT("cannot get sub_ref, status = %d\n", status);
-			goto err;
-		}
-
-#ifdef BITMAP_CHECKS
-		orig_mref_a->shadow_ref->sub_count++;
-		bitmap_op(orig_mref_a->shadow_ref->work_bitmap, (sub_mref->ref_pos - orig_mref->ref_pos + orig_mref_a->shadow_offset)/512, sub_mref->ref_len/512, REG_OP_RELEASE);
-#endif
-		mars_trace(sub_mref, "sub_create");
-
-		atomic_inc(&output->sub_balance_count);
-		pos += sub_mref->ref_len;
-		len -= sub_mref->ref_len;
-
-		CHECK_ATOMIC(&orig_mref->ref_count, 1);
-
-		cb = &sub_mref_a->cb;
-		cb->cb_fn = phase2_endio;
-		cb->cb_private = sub_mref_a;
-		cb->cb_error = 0;
-		cb->cb_prev = NULL;
-		sub_mref->ref_cb = cb;
-		sub_mref->ref_rw = READ;
-		sub_mref->ref_prio = output->q_phase2.q_io_prio;
-
-		list_add_tail(&sub_mref_a->sub_head, &orig_mref_a->sub_list);
-		orig_mref_a->total_sub_count++;
-	}
-
-	if (output->brick->log_reads && orig_mref_a->total_sub_count > 0) {
-		struct list_head *tmp;
-
-		atomic_set(&orig_mref_a->current_sub_count, orig_mref_a->total_sub_count);
-		for (tmp = orig_mref_a->sub_list.next; tmp != &orig_mref_a->sub_list; tmp = tmp->next) {
-			struct trans_logger_mref_aspect *sub_mref_a;
-			struct mref_object *sub_mref;
-			sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
-			sub_mref = sub_mref_a->object;
-			atomic_inc(&output->q_phase2.q_flying);
-			mars_trace(sub_mref, "sub_read");
-			GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
-		}
-		wake_up_interruptible(&output->event);
-	} else {
-		_phase2_endio(orig_mref_a);
-	}
-	return true;
-
-err:
-	return false;
-}
-
-#endif // NEW_CODE
 
 /*********************************************************************
 * Phase 3: log the old disk version.
@@ -1796,7 +1499,7 @@ void _phase4_endio(struct trans_logger_mref_aspect *orig_mref_a)
 		//...
 	} else {
 #ifdef CLEAN_ALL
-		_trans_logger_ref_put(orig_mref_a->output, orig_mref);
+		__trans_logger_ref_put(orig_mref_a->output, orig_mref_a);
 #endif
 	}
 	wake_up_interruptible(&output->event);
@@ -1863,7 +1566,7 @@ bool get_newest_data(struct trans_logger_output *output, void *buf, loff_t pos, 
 		int diff;
 		int this_len = len;
 
-		src_a = hash_find(output, pos, &this_len, true, NULL);
+		src_a = hash_find(output, pos, &this_len);
 		if (unlikely(!src_a)) {
 			MARS_ERR("data is GONE at pos = %lld len = %d\n", pos, len);
 			return false;
@@ -1895,16 +1598,12 @@ bool get_newest_data(struct trans_logger_output *output, void *buf, loff_t pos, 
 #else
 		memset(buf, 0xff, this_len);
 #endif
-#ifdef BITMAP_CHECKS
-		src_a->shadow_ref->bitmap_read++;
-		bitmap_op(src_a->shadow_ref->dirty_bitmap, (src_a->shadow_offset + diff)/512, this_len/512, REG_OP_RELEASE);
-#endif
 
 		len -= this_len;
 		pos += this_len;
 		buf += this_len;
 
-		_trans_logger_ref_put(output, src);
+		__trans_logger_ref_put(output, src_a);
 	}
 	return true;
 }
@@ -2480,6 +2179,7 @@ int trans_logger_switch(struct trans_logger_brick *brick)
 	}
 	return 0;
 }
+
 //////////////// informational / statistics ///////////////
 
 static noinline
@@ -2573,6 +2273,8 @@ int trans_logger_output_construct(struct trans_logger_output *output)
 	output->q_phase2.q_dep = &output->q_phase3;
 	output->q_phase3.q_dep = &output->q_phase4;
 	output->q_phase4.q_dep = &output->q_phase1;
+
+	output->q_phase2.q_dep_plus = &provisionary_count;
 #endif
 	output->q_phase1.q_insert_info = "q1_ins";
 	output->q_phase1.q_pushback_info = "q1_push";
diff --git a/mars_trans_logger.h b/mars_trans_logger.h
index 6055324d..e2353008 100644
--- a/mars_trans_logger.h
+++ b/mars_trans_logger.h
@@ -6,11 +6,6 @@
 #define REGION_SIZE (1 << REGION_SIZE_BITS)
 #define TRANS_HASH_MAX 512
 
-//#define BITMAP_CHECKS
-#ifdef BITMAP_CHECKS
-#include 
-#endif
-
 #include 
 #include "log_format.h"
 #include "pairing_heap.h"
@@ -21,6 +16,7 @@ _PAIRING_HEAP_TYPEDEF(mref,)
 
 struct logger_queue {
 	struct logger_queue *q_dep;
+	atomic_t *q_dep_plus;
 	struct trans_logger_output *q_output;
 	struct list_head q_anchor;
 	struct pairing_heap_mref *heap_high;
@@ -82,32 +78,17 @@ struct trans_logger_mref_aspect {
 	bool is_hashed;
 	bool is_dirty;
 	bool is_collected;
-	bool ignore_this;
 	struct timespec stamp;
 	loff_t log_pos;
 	loff_t fetch_margin;
 	struct generic_callback cb;
 	struct trans_logger_mref_aspect *orig_mref_a;
 	struct writeback_info *wb;
-	struct trans_logger_mref_aspect *base_mref_a;
+	//struct trans_logger_mref_aspect *base_mref_a;
 	struct list_head sub_list;
 	struct list_head sub_head;
 	int total_sub_count;
 	atomic_t current_sub_count;
-#ifdef BITMAP_CHECKS
-	int shadow_offset;
-	int bitmap_write;
-	int bitmap_write_slave;
-	int bitmap_read;
-	int start_phase1;
-	int end_phase1;
-	int start_phase2;
-	int sub_count;
-	unsigned long dirty_bitmap[4];
-	unsigned long touch_bitmap[4];
-	unsigned long slave_bitmap[4];
-	unsigned long work_bitmap[4];
-#endif
 };
 
 struct trans_logger_brick {
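For illustration, a standalone user-space sketch of the reworked hashing scheme in this patch: hash_fn() now buckets by REGION_SIZE-aligned windows instead of bit mixing, and trans_logger_ref_get() clamps ref_len so that no request crosses a region boundary. Together these guarantee that every mref maps to exactly one hash bucket, which is the invariant behind the "different hashes" check in hash_put_all() and the single-bucket collect list that _hash_find() re-scans in make_writeback(). This is a sketch only; REGION_SIZE_BITS = 22 is an assumed value for the example, the real one is defined in mars_trans_logger.h.

	#include <stdio.h>

	#define REGION_SIZE_BITS 22	/* assumed here; actually defined in mars_trans_logger.h */
	#define REGION_SIZE (1LL << REGION_SIZE_BITS)
	#define TRANS_HASH_MAX 512

	/* same logic as the patched hash_fn(): every byte offset inside
	 * one REGION_SIZE window lands in the same collision list */
	static int hash_fn(long long pos)
	{
		long long base_index = pos >> REGION_SIZE_BITS;
		return (int)(base_index % TRANS_HASH_MAX);
	}

	int main(void)
	{
		long long pos = 3 * REGION_SIZE + 4096;
		long long len = REGION_SIZE;	/* deliberately crosses a region boundary */
		long long base_offset;

		/* clamp as the patched trans_logger_ref_get() does */
		base_offset = pos & (REGION_SIZE - 1);
		if (len > REGION_SIZE - base_offset)
			len = REGION_SIZE - base_offset;

		/* start and (clamped) end now hash to the same bucket */
		printf("bucket(start)=%d bucket(end)=%d len=%lld\n",
		       hash_fn(pos), hash_fn(pos + len - 1), len);
		return 0;
	}

Because of this clamping, hash_extend() and hash_put_all() only ever have to take a single hash_lock, which is what allows the old per-element hash_put() path to be replaced by the one-shot hash_put_all() under a single write lock.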