diff --git a/mars_generic.c b/mars_generic.c
index f2befba9..eb198947 100644
--- a/mars_generic.c
+++ b/mars_generic.c
@@ -1222,7 +1222,7 @@ struct mars_brick *make_brick_all(
 	if (!brick && new_brick_type == _bio_brick_type && _aio_brick_type) {
 		struct kstat test = {};
 		int status = mars_stat(new_path, &test, false);
-		if (status < 0 || !S_ISBLK(test.mode)) {
+		if (true || status < 0 || !S_ISBLK(test.mode)) {
 			new_brick_type = _aio_brick_type;
 			MARS_DBG("substitute bio by aio\n");
 		}
diff --git a/mars_trans_logger.c b/mars_trans_logger.c
index d6dd499b..a5f45d53 100644
--- a/mars_trans_logger.c
+++ b/mars_trans_logger.c
@@ -12,14 +12,21 @@
 //#define USE_KMALLOC
 #define USE_HIGHER_PHASES
 #define APPLY_DATA
-#define CLEAN_ALL
-//#define KEEP_UNIQUE
-#define NOTRASH_DATA
 //#define DO_SKIP
 //#define DO_IGNORE // FIXME or DELETE
 #define DO_EXTEND
 #define NEW_CODE
+//#define KEEP_UNIQUE
+#define CLEAN_ALL
+#define WB_COPY
+#define X1 // no make_writeback() etc
+#define X2 // no fire_writeback(), instead immediately free_writeback()
+#define X3 // no hashing in make_writeback
+#define X4 // do not create sub_mrefs in make_writeback
+#define X5 // no IOs
+#define NOTRASH_DATA
+#define ATOMIC_FREE
 
 #include 
 #include 
 
@@ -406,13 +413,11 @@ void hash_insert(struct trans_logger_output *output, struct trans_logger_mref_as
 static noinline
 void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, struct list_head *collect_list)
 {
-	static atomic_t generation = ATOMIC_INIT(0);
 	loff_t pos = *_pos;
 	int len = *_len;
 	loff_t base_index = pos >> REGION_SIZE_BITS;
 	int hash = hash_fn(base_index);
 	struct hash_anchor *start = &output->hash_table[hash];
-	int my_generation;
 	bool extended;
 	unsigned int flags;
 
@@ -420,8 +425,6 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st
 		CHECK_HEAD_EMPTY(collect_list);
 	}
 
-	do { my_generation = atomic_add_return(1, &generation); } while (!my_generation); // avoid 0
-
 	traced_readlock(&start->hash_lock, flags);
 
 	do {
@@ -436,15 +439,16 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st
 			test = test_a->object;
 
 			// are the regions overlapping?
-			if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
+			if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos /*|| test_a->is_collected*/) {
 				continue; // not relevant
 			}
 
 			// collect upon the first time
-			if (collect_list && test_a->collect_generation != my_generation) {
-				test_a->collect_generation = my_generation;
+			if (collect_list && !test_a->is_collected) {
+				CHECK_HEAD_EMPTY(&test_a->collect_head);
 				test_a->is_collected = true;
 				atomic_inc(&test->ref_count); // must be paired with _trans_logger_ref_put()
+				atomic_inc(&output->inner_balance_count);
 				list_add_tail(&test_a->collect_head, collect_list);
 			}
@@ -475,13 +479,19 @@ bool hash_put(struct trans_logger_output *output, struct trans_logger_mref_aspec
 	int hash = hash_fn(base_index);
 	struct hash_anchor *start = &output->hash_table[hash];
 	unsigned int flags;
-	bool res;
+	bool res = true;
 
 	traced_writelock(&start->hash_lock, flags);
 
 	CHECK_ATOMIC(&elem->ref_count, 1);
+
+	if (!elem_a->is_hashed) {
+		goto done;
+	}
+#ifndef ATOMIC_FREE
 	res = atomic_dec_and_test(&elem->ref_count);
 	atomic_dec(&output->inner_balance_count);
+#endif
 
 	if (res) {
 		list_del_init(&elem_a->hash_head);
@@ -489,10 +499,52 @@ bool hash_put(struct trans_logger_output *output, struct trans_logger_mref_aspec
 		atomic_dec(&output->hash_count);
 	}
 
+done:
 	traced_writeunlock(&start->hash_lock, flags);
 
 	return res;
 }
 
+#ifdef ATOMIC_FREE
+/* Atomically put all elements from the list.
+ * All elements must reside in the same collision list.
+ */
+static inline
+void hash_put_all(struct trans_logger_output *output, struct list_head *list)
+{
+	struct list_head *tmp;
+	struct hash_anchor *start = NULL;
+	unsigned int flags;
+
+	for (tmp = list->next; tmp != list; tmp = tmp->next) {
+		struct trans_logger_mref_aspect *elem_a;
+		struct mref_object *elem;
+		loff_t base_index;
+		int hash;
+
+		elem_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
+		elem = elem_a->object;
+		if (!start) {
+			base_index = elem->ref_pos >> REGION_SIZE_BITS;
+			hash = hash_fn(base_index);
+			start = &output->hash_table[hash];
+			traced_writelock(&start->hash_lock, flags);
+		}
+
+		if (!elem_a->is_hashed) {
+			continue;
+		}
+
+		list_del_init(&elem_a->hash_head);
+		elem_a->is_hashed = false;
+		atomic_dec(&output->hash_count);
+	}
+
+	if (start) {
+		traced_writeunlock(&start->hash_lock, flags);
+	}
+}
+#endif
+
 ////////////////// own brick / input / output operations //////////////////
 
 static atomic_t global_mshadow_count = ATOMIC_INIT(0);
@@ -726,12 +778,18 @@ restart:
 	if (shadow_a) {
 		unsigned long flags;
 		bool finished;
+#ifdef ATOMIC_FREE
+		hash_put(output, mref_a);
+		finished = atomic_dec_and_test(&mref->ref_count);
+		atomic_dec(&output->inner_balance_count);
+#else
 		if (mref_a->is_hashed) {
 			finished = hash_put(output, mref_a);
 		} else {
-			finished = atomic_dec_and_test(&mref->ref_count);
 			atomic_dec(&output->inner_balance_count);
+			finished = atomic_dec_and_test(&mref->ref_count);
 		}
+#endif
 		if (!finished) {
 			return;
 		}
@@ -890,8 +948,23 @@ err:
 static noinline
 void free_writeback(struct writeback_info *wb)
 {
+	struct list_head *tmp;
+
+	while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) {
+		struct trans_logger_mref_aspect *orig_mref_a;
+		struct mref_object *orig_mref;
+
+		list_del_init(tmp);
+
+		orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
+		orig_mref = orig_mref_a->object;
+
+		CHECK_ATOMIC(&orig_mref->ref_count, 1);
+		_trans_logger_ref_put(orig_mref_a->output, orig_mref);
+	}
 	//...
-	kfree(wb);
+//FIXME: introduce refcount
+	//kfree(wb);
 }
 
 static noinline
@@ -901,6 +974,7 @@ void wb_endio(struct generic_callback *cb)
 	struct mref_object *sub_mref;
 	struct trans_logger_output *output;
 	struct writeback_info *wb;
+	struct trans_logger_mref_aspect *base_mref_a;
 	int rw;
 	atomic_t *dec;
 	void (*endio)(struct generic_callback *cb);
@@ -914,11 +988,17 @@ void wb_endio(struct generic_callback *cb)
 	wb = sub_mref_a->wb;
 	CHECK_PTR(wb, err);
 
+	base_mref_a = sub_mref_a->base_mref_a;
+	if (base_mref_a) {
+		_trans_logger_ref_put(output, base_mref_a->object);
+	}
+
 	atomic_dec(&output->wb_balance_count);
 
 	rw = sub_mref->ref_rw;
 	dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
-	if (atomic_dec_and_test(dec)) {
+	CHECK_ATOMIC(dec, 1);
+	if (!atomic_dec_and_test(dec)) {
 		return;
 	}
@@ -954,7 +1034,7 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 
 	pos = wb->w_pos;
 	len = wb->w_len;
-
+#ifdef X3
 	while (len > 0) {
 		struct trans_logger_mref_aspect *sub_mref_a;
 		struct mref_object *sub_mref;
@@ -977,7 +1057,8 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 		}
 
 		data = base_mref_a->shadow_data + diff;
-		sub_mref = trans_logger_alloc_mref((void*)output, &brick->logst.ref_object_layout);
+#ifdef X4
+		sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout);
 		if (unlikely(!sub_mref)) {
 			MARS_FAT("cannot alloc sub_mref\n");
 			goto err;
@@ -987,34 +1068,51 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 		sub_mref->ref_len = this_len;
 		sub_mref->ref_may_write = WRITE;
 		sub_mref->ref_rw = WRITE;
+#ifdef WB_COPY
+		sub_mref->ref_data = NULL;
+#else
 		sub_mref->ref_data = data;
+#endif
 
 		sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref);
 		CHECK_PTR(sub_mref_a, err);
 
 		sub_mref_a->output = output;
 		sub_mref_a->wb = wb;
-		sub_mref_a->base_mref_a = base_mref_a;
-		base_mref_a = NULL;
 
 		status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref);
 		if (unlikely(status < 0)) {
 			MARS_FAT("cannot get sub_ref, status = %d\n", status);
 			goto err;
 		}
+#ifdef WB_COPY
+#ifdef NOTRASH_DATA
+		memcpy(sub_mref->ref_data, data, sub_mref->ref_len);
+#else
+		memset(sub_mref->ref_data, 0xff, sub_mref->ref_len);
+#endif
+		_trans_logger_ref_put(output, base_mref_a->object);
+#else
+		sub_mref_a->base_mref_a = base_mref_a;
+#endif
+		base_mref_a = NULL;
 
 		list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list);
 		atomic_inc(&wb->w_sub_write_count);
 		atomic_inc(&output->wb_balance_count);
 
 		this_len = sub_mref->ref_len;
+#else
+		_trans_logger_ref_put(output, base_mref_a->object);
+#endif
 		pos += this_len;
 		len -= this_len;
 	}
-
+#endif
 	return wb;
 
 err:
+	MARS_ERR("cleaning up...\n");
 	if (base_mref_a) {
 		_trans_logger_ref_put(output, base_mref_a->object);
 	}
@@ -1047,12 +1145,17 @@ void fire_writeback(struct writeback_info *wb, struct list_head *start)
 		cb->cb_error = 0;
 		cb->cb_prev = NULL;
 		sub_mref->ref_cb = cb;
-
+
+#ifdef X5
 		GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
+#else
+		wb_endio(cb);
+#endif
 		GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
 	}
 }
 
+
 ////////////////////////////// worker thread //////////////////////////////
 
 /*********************************************************************
@@ -1236,7 +1339,6 @@ bool phase0_startio(struct trans_logger_mref_aspect *mref_a)
 	}
 #endif
 	if (!mref_a->is_hashed) {
-		mref_a->is_hashed = true;
 		mref_a->ignore_this = false;
 		MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
 		hash_insert(output, mref_a);
@@ -1264,15 +1366,11 @@ static
 noinline
 void new_endio(struct generic_callback *cb)
 {
 	struct trans_logger_mref_aspect *sub_mref_a;
-	struct trans_logger_output *output;
 	struct writeback_info *wb;
-	struct list_head *tmp;
 
 	CHECK_PTR(cb, err);
 	sub_mref_a = cb->cb_private;
 	CHECK_PTR(sub_mref_a, err);
-	output = sub_mref_a->output;
-	CHECK_PTR(output, err);
 	wb = sub_mref_a->wb;
 	CHECK_PTR(wb, err);
@@ -1281,17 +1379,12 @@ void new_endio(struct generic_callback *cb)
 		goto err;
 	}
 
-	while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) {
-		struct trans_logger_mref_aspect *orig_mref_a;
-		struct mref_object *orig_mref;
+#ifdef ATOMIC_FREE
+	hash_put_all(wb->w_output, &wb->w_collect_list);
+#endif
 
-		list_del_init(tmp);
-		orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
-		orig_mref = orig_mref_a->object;
+	free_writeback(wb);
 
-		GENERIC_INPUT_CALL(output->brick->inputs[0], mref_put, orig_mref);
-	}
-
 	return;
 
 err:
@@ -1315,15 +1408,32 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 		MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
 		goto done;
 	}
-
+#ifdef X1
+	if (!orig_mref_a->is_hashed) {
+		MARS_IO("AHA not hashed, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
+		goto done;
+	}
 	wb = make_writeback(output, orig_mref->ref_pos, orig_mref->ref_len);
-	if (!wb) {
+	if (unlikely(!wb)) {
 		goto err;
 	}
-
+	if (unlikely(list_empty(&wb->w_collect_list))) {
+		MARS_IO("collection list is empty\n");
+		free_writeback(wb);
+		goto done;
+	}
+#ifdef X2
 	wb->write_endio = new_endio;
-	fire_writeback(wb, &wb->w_sub_write_list);
+	if (list_empty(&wb->w_sub_write_list)) {
+		free_writeback(wb);
+	} else {
+		fire_writeback(wb, &wb->w_sub_write_list);
+	}
+#else
+	free_writeback(wb);
+#endif
+#endif
 
 done:
 #ifdef CLEAN_ALL
@@ -1441,7 +1551,7 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	while (len > 0) {
 		struct mref_object *sub_mref;
 		struct trans_logger_mref_aspect *sub_mref_a;
-		sub_mref = trans_logger_alloc_mref((void*)output, &brick->logst.ref_object_layout);
+		sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout);
 		if (unlikely(!sub_mref)) {
 			MARS_FAT("cannot alloc sub_mref\n");
 			goto err;
@@ -2382,9 +2492,9 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
 
 	// FIXME: check for allocation overflows
 
-	sprintf(res, "total reads=%d writes=%d writeback=%d shortcut=%d (%d%%) mshadow=%d sshadow=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
+	sprintf(res, "total reads=%d writes=%d writeback=%d shortcut=%d (%d%%) mshadow=%d sshadow=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
 		atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_writeback_count), atomic_read(&output->total_shortcut_count), atomic_read(&output->total_writeback_count) ? atomic_read(&output->total_shortcut_count) * 100 / atomic_read(&output->total_writeback_count) : 0, atomic_read(&output->total_mshadow_count), atomic_read(&output->total_sshadow_count), atomic_read(&output->q_phase1.q_total), atomic_read(&output->q_phase2.q_total), atomic_read(&output->q_phase3.q_total), atomic_read(&output->q_phase4.q_total),
-		atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
+		atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->wb_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
 
 	return res;
 }
diff --git a/mars_trans_logger.h b/mars_trans_logger.h
index 3e34a0c6..6055324d 100644
--- a/mars_trans_logger.h
+++ b/mars_trans_logger.h
@@ -94,7 +94,6 @@ struct trans_logger_mref_aspect {
 	struct list_head sub_head;
 	int total_sub_count;
 	atomic_t current_sub_count;
-	int collect_generation;
 #ifdef BITMAP_CHECKS
 	int shadow_offset;
 	int bitmap_write;
@@ -158,6 +157,7 @@ struct trans_logger_output {
 	atomic_t total_sshadow_count;
 	struct task_struct *thread;
 	wait_queue_head_t event;
+	struct generic_object_layout writeback_layout;
 	struct generic_object_layout replay_layout;
 	// queues
 	struct logger_queue q_phase1;
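For readers tracing the new ATOMIC_FREE path: hash_put_all() relies on the invariant that every mref collected by hash_extend() lives in the same collision list, which is what allows the whole batch to be unhashed under a single write lock, taken lazily on the first element. The following stand-alone user-space C sketch illustrates that pattern; all names in it (bucket, node, batch_unhash) are illustrative stand-ins, with a pthread mutex in place of traced_writelock(), not MARS identifiers.

/*
 * Sketch of the hash_put_all() locking pattern: unhash a whole batch
 * of nodes under one lock acquisition. By contract, every node in the
 * batch belongs to the same bucket.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct list_head { struct list_head *next, *prev; };

static void list_init(struct list_head *h) { h->next = h->prev = h; }

static void list_add_tail(struct list_head *n, struct list_head *h)
{
	n->prev = h->prev; n->next = h;
	h->prev->next = n; h->prev = n;
}

static void list_del_init(struct list_head *n)
{
	n->prev->next = n->next; n->next->prev = n->prev;
	list_init(n);
}

struct node {
	struct list_head hash_head;    /* membership in the bucket chain */
	struct list_head collect_head; /* membership in the collected batch */
	bool is_hashed;
};

struct bucket {
	pthread_mutex_t lock;
	struct list_head chain;
	int count;
};

/* Take the bucket lock lazily on the first element, unhash every
 * member of the batch, then unlock once at the end. */
static void batch_unhash(struct bucket *b, struct list_head *batch)
{
	struct list_head *tmp;
	bool locked = false;

	for (tmp = batch->next; tmp != batch; tmp = tmp->next) {
		struct node *n = container_of(tmp, struct node, collect_head);

		if (!locked) {
			pthread_mutex_lock(&b->lock);
			locked = true;
		}
		if (!n->is_hashed)
			continue; /* tolerate already-unhashed members */

		list_del_init(&n->hash_head);
		n->is_hashed = false;
		b->count--;
	}
	if (locked)
		pthread_mutex_unlock(&b->lock);
}

int main(void)
{
	struct bucket b;
	struct node n[3];
	struct list_head batch;
	int i;

	pthread_mutex_init(&b.lock, NULL);
	list_init(&b.chain);
	b.count = 0;
	for (i = 0; i < 3; i++) {
		list_add_tail(&n[i].hash_head, &b.chain);
		n[i].is_hashed = true;
		b.count++;
	}
	/* collect two of the three nodes, then unhash them in one go */
	list_add_tail(&n[0].collect_head, &batch);
	list_add_tail(&n[2].collect_head, &batch);
	batch_unhash(&b, &batch);

	printf("remaining in bucket: %d\n", b.count); /* prints 1 */
	return 0;
}

Taking the lock lazily keeps the empty-batch case lock-free, which is also why hash_put_all() in the patch only unlocks when start was actually set.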