diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 011ef262..d6dd499b 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -19,6 +19,8 @@ //#define DO_IGNORE // FIXME or DELETE #define DO_EXTEND +#define NEW_CODE + #include #include #include @@ -441,6 +443,7 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st // collect upon the first time if (collect_list && test_a->collect_generation != my_generation) { test_a->collect_generation = my_generation; + test_a->is_collected = true; atomic_inc(&test->ref_count); // must be paired with _trans_logger_ref_put() list_add_tail(&test_a->collect_head, collect_list); } @@ -882,6 +885,174 @@ err: MARS_FAT("cannot handle IO\n"); } +////////////////////////////// writeback info ////////////////////////////// + +static noinline +void free_writeback(struct writeback_info *wb) +{ + //... + kfree(wb); +} + +static noinline +void wb_endio(struct generic_callback *cb) +{ + struct trans_logger_mref_aspect *sub_mref_a; + struct mref_object *sub_mref; + struct trans_logger_output *output; + struct writeback_info *wb; + int rw; + atomic_t *dec; + void (*endio)(struct generic_callback *cb); + + sub_mref_a = cb->cb_private; + CHECK_PTR(sub_mref_a, err); + sub_mref = sub_mref_a->object; + CHECK_PTR(sub_mref, err); + output = sub_mref_a->output; + CHECK_PTR(output, err); + wb = sub_mref_a->wb; + CHECK_PTR(wb, err); + + atomic_dec(&output->wb_balance_count); + + rw = sub_mref->ref_rw; + dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count; + if (atomic_dec_and_test(dec)) { + return; + } + + endio = rw ? wb->write_endio : wb->read_endio; + if (endio) { + endio(cb); + } + return; + +err: + MARS_FAT("hanging up....\n"); +} + +static noinline +struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t pos, int len) +{ + struct trans_logger_brick *brick = output->brick; + struct trans_logger_input *sub_input = brick->inputs[0]; + struct writeback_info *wb = kzalloc(sizeof(struct writeback_info), GFP_MARS); + struct trans_logger_mref_aspect *base_mref_a = NULL; + if (!wb) { + goto err; + } + INIT_LIST_HEAD(&wb->w_collect_list); + INIT_LIST_HEAD(&wb->w_sub_read_list); + INIT_LIST_HEAD(&wb->w_sub_write_list); + wb->w_output = output; + + wb->w_pos = pos; + wb->w_len = len; + + hash_extend(output, &wb->w_pos, &wb->w_len, &wb->w_collect_list); + + pos = wb->w_pos; + len = wb->w_len; + + while (len > 0) { + struct trans_logger_mref_aspect *sub_mref_a; + struct mref_object *sub_mref; + struct mref_object *base_mref; + void *data; + int this_len = len; + int diff; + int status; + + base_mref_a = hash_find(output, pos, &this_len, true, NULL); + if (unlikely(!base_mref_a)) { + MARS_FAT("could not find data\n"); + goto err; + } + base_mref = base_mref_a->object; + diff = pos - base_mref->ref_pos; + if (unlikely(diff < 0)) { + MARS_FAT("bad diff %d\n", diff); + goto err; + } + data = base_mref_a->shadow_data + diff; + + sub_mref = trans_logger_alloc_mref((void*)output, &brick->logst.ref_object_layout); + if (unlikely(!sub_mref)) { + MARS_FAT("cannot alloc sub_mref\n"); + goto err; + } + + sub_mref->ref_pos = pos; + sub_mref->ref_len = this_len; + sub_mref->ref_may_write = WRITE; + sub_mref->ref_rw = WRITE; + sub_mref->ref_data = data; + + sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref); + CHECK_PTR(sub_mref_a, err); + + sub_mref_a->output = output; + sub_mref_a->wb = wb; + sub_mref_a->base_mref_a = base_mref_a; + base_mref_a = NULL; + + status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref); + if (unlikely(status < 0)) { + MARS_FAT("cannot get sub_ref, status = %d\n", status); + goto err; + } + + list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list); + atomic_inc(&wb->w_sub_write_count); + atomic_inc(&output->wb_balance_count); + + this_len = sub_mref->ref_len; + pos += this_len; + len -= this_len; + } + + return wb; + + err: + if (base_mref_a) { + _trans_logger_ref_put(output, base_mref_a->object); + } + if (wb) { + free_writeback(wb); + } + return NULL; +} + +static noinline +void fire_writeback(struct writeback_info *wb, struct list_head *start) +{ + struct trans_logger_output *output = wb->w_output; + struct trans_logger_brick *brick = output->brick; + struct trans_logger_input *sub_input = brick->inputs[0]; + struct list_head *tmp; + + while ((tmp = start->next) != start) { + struct trans_logger_mref_aspect *sub_mref_a; + struct mref_object *sub_mref; + struct generic_callback *cb; + + list_del_init(tmp); + sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head); + sub_mref = sub_mref_a->object; + + cb = &sub_mref_a->cb; + cb->cb_fn = wb_endio; + cb->cb_private = sub_mref_a; + cb->cb_error = 0; + cb->cb_prev = NULL; + sub_mref->ref_cb = cb; + + GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref); + GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref); + } +} + ////////////////////////////// worker thread ////////////////////////////// /********************************************************************* @@ -1087,6 +1258,85 @@ err: * old version from disk somewhen later, e.g. when IO contention is low. */ +#ifdef NEW_CODE + +static noinline +void new_endio(struct generic_callback *cb) +{ + struct trans_logger_mref_aspect *sub_mref_a; + struct trans_logger_output *output; + struct writeback_info *wb; + struct list_head *tmp; + + CHECK_PTR(cb, err); + sub_mref_a = cb->cb_private; + CHECK_PTR(sub_mref_a, err); + output = sub_mref_a->output; + CHECK_PTR(output, err); + wb = sub_mref_a->wb; + CHECK_PTR(wb, err); + + if (unlikely(cb->cb_error < 0)) { + MARS_FAT("IO error %d\n", cb->cb_error); + goto err; + } + + while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) { + struct trans_logger_mref_aspect *orig_mref_a; + struct mref_object *orig_mref; + + list_del_init(tmp); + orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head); + orig_mref = orig_mref_a->object; + + GENERIC_INPUT_CALL(output->brick->inputs[0], mref_put, orig_mref); + } + + return; + +err: + MARS_FAT("hanging up....\n"); +} + +static noinline +bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) +{ + struct mref_object *orig_mref; + struct trans_logger_output *output; + struct writeback_info *wb; + + CHECK_PTR(orig_mref_a, err); + orig_mref = orig_mref_a->object; + CHECK_PTR(orig_mref, err); + output = orig_mref_a->output; + CHECK_PTR(output, err); + + if (orig_mref_a->is_collected) { + MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len); + goto done; + } + + wb = make_writeback(output, orig_mref->ref_pos, orig_mref->ref_len); + if (!wb) { + goto err; + } + + wb->write_endio = new_endio; + + fire_writeback(wb, &wb->w_sub_write_list); + + done: +#ifdef CLEAN_ALL + _trans_logger_ref_put(output, orig_mref); +#endif + return true; + + err: + return false; +} + +#else // NEW_CODE + static noinline void _phase2_endio(struct trans_logger_mref_aspect *orig_mref_a) { @@ -1263,10 +1513,14 @@ err: return false; } +#endif // NEW_CODE + /********************************************************************* * Phase 3: log the old disk version. */ +#ifndef NEW_CODE + static noinline void _phase3_endio(struct trans_logger_mref_aspect *orig_mref_a) { @@ -1388,10 +1642,14 @@ err: return false; } +#endif // NEW_CODE + /********************************************************************* * Phase 4: overwrite old disk version with new version. */ +#ifndef NEW_CODE + static noinline void _phase4_endio(struct trans_logger_mref_aspect *orig_mref_a) { @@ -1650,6 +1908,7 @@ err: return false; } +#endif // NEW_CODE /********************************************************************* * The logger thread. @@ -1732,8 +1991,10 @@ void trans_logger_log(struct trans_logger_output *output) atomic_read(&output->q_phase1.q_queued) > 0 || #ifdef USE_HIGHER_PHASES q_is_ready(&output->q_phase2) || +#ifndef NEW_CODE q_is_ready(&output->q_phase3) || q_is_ready(&output->q_phase4) || +#endif #endif (kthread_should_stop() && !_congested(output)), wait_timeout); @@ -1788,17 +2049,21 @@ void trans_logger_log(struct trans_logger_output *output) log_jiffies = 0; } #ifdef USE_HIGHER_PHASES +#ifndef NEW_CODE if (q_is_ready(&output->q_phase4)) { (void)run_queue(output, &output->q_phase4, phase4_startio, output->q_phase4.q_batchlen); } +#endif if (q_is_ready(&output->q_phase2)) { (void)run_queue(output, &output->q_phase2, phase2_startio, output->q_phase2.q_batchlen); } +#ifndef NEW_CODE if (q_is_ready(&output->q_phase3)) { status = run_queue(output, &output->q_phase3, phase3_startio, output->q_phase3.q_batchlen); } +#endif #endif if (output->did_pushback) { #if 0 diff --git a/mars_trans_logger.h b/mars_trans_logger.h index aeef8b1c..3e34a0c6 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -54,11 +54,16 @@ struct hash_anchor { }; struct writeback_info { + struct trans_logger_output *w_output; loff_t w_pos; int w_len; struct list_head w_collect_list; // list of collected orig requests struct list_head w_sub_read_list; // for saving the old data before overwrite struct list_head w_sub_write_list; // for overwriting + atomic_t w_sub_read_count; + atomic_t w_sub_write_count; + void (*read_endio)(struct generic_callback *cb); + void (*write_endio)(struct generic_callback *cb); }; struct trans_logger_mref_aspect { @@ -76,12 +81,15 @@ struct trans_logger_mref_aspect { bool do_buffered; bool is_hashed; bool is_dirty; + bool is_collected; bool ignore_this; struct timespec stamp; loff_t log_pos; loff_t fetch_margin; struct generic_callback cb; struct trans_logger_mref_aspect *orig_mref_a; + struct writeback_info *wb; + struct trans_logger_mref_aspect *base_mref_a; struct list_head sub_list; struct list_head sub_head; int total_sub_count; @@ -141,6 +149,7 @@ struct trans_logger_output { atomic_t outer_balance_count; atomic_t inner_balance_count; atomic_t sub_balance_count; + atomic_t wb_balance_count; atomic_t total_read_count; atomic_t total_write_count; atomic_t total_writeback_count;