diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 3d2d835e..58b6aeea 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -974,9 +974,12 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a) // am I the first member? (means "youngest" list entry) if (tmp == log_input->pos_list.next) { MARS_IO("first_finished = %lld\n", finished); - if (finished <= log_input->inf.inf_min_pos) { + if (unlikely(finished <= log_input->inf.inf_min_pos)) { MARS_ERR("backskip in log writeback: %lld -> %lld\n", log_input->inf.inf_min_pos, finished); } + if (unlikely(finished > log_input->inf.inf_max_pos)) { + MARS_ERR("min_pos > max_pos: %lld > %lld\n", finished, log_input->inf.inf_max_pos); + } log_input->inf.inf_min_pos = finished; get_lamport(&log_input->inf.inf_min_pos_stamp); _inf_callback(log_input, false); @@ -1078,6 +1081,7 @@ void wb_endio(struct generic_callback *cb) } else { MARS_ERR("internal: no endio defined\n"); } + done: wake_up_interruptible_all(&brick->worker_event); return; @@ -1093,7 +1097,7 @@ err: * point in time. */ static noinline -struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct timespec *elder, struct trans_logger_input *log_input) +struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct timespec *elder) { struct writeback_info *wb; struct trans_logger_input *read_input; @@ -1147,6 +1151,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p while (len > 0) { struct trans_logger_mref_aspect *sub_mref_a; struct mref_object *sub_mref; + struct trans_logger_input *log_input; int this_len; int status; @@ -1167,7 +1172,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p CHECK_ASPECT(sub_mref_a, sub_mref, err); sub_mref_a->my_input = read_input; + log_input = brick->inputs[brick->log_input_nr]; sub_mref_a->log_input = log_input; + atomic_inc(&log_input->log_ref_count); sub_mref_a->my_brick = brick; sub_mref_a->orig_rw = READ; sub_mref_a->wb = wb; @@ -1199,6 +1206,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p struct mref_object *sub_mref; struct trans_logger_mref_aspect *orig_mref_a; struct mref_object *orig_mref; + struct trans_logger_input *log_input; void *data; int this_len = len; int diff; @@ -1238,7 +1246,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p sub_mref_a->orig_mref_a = orig_mref_a; sub_mref_a->my_input = write_input; + log_input = orig_mref_a->log_input; sub_mref_a->log_input = log_input; + atomic_inc(&log_input->log_ref_count); sub_mref_a->my_brick = brick; sub_mref_a->orig_rw = WRITE; sub_mref_a->wb = wb; @@ -1275,7 +1285,6 @@ void _fire_one(struct list_head *tmp, bool do_update) struct trans_logger_mref_aspect *sub_mref_a; struct mref_object *sub_mref; struct trans_logger_input *sub_input; - struct trans_logger_input *log_input; sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head); sub_mref = sub_mref_a->object; @@ -1288,25 +1297,33 @@ void _fire_one(struct list_head *tmp, bool do_update) SETUP_CALLBACK(sub_mref, wb_endio, sub_mref_a); - sub_input = sub_mref_a->my_input; - log_input = sub_mref_a->log_input; - if (do_update) { struct trans_logger_mref_aspect *orig_mref_a = sub_mref_a->orig_mref_a; if (unlikely(!orig_mref_a)) { MARS_ERR("internal problem\n"); } else { loff_t max_pos = orig_mref_a->log_pos; - down(&log_input->inf_mutex); - if (log_input->inf.inf_max_pos < max_pos) { - log_input->inf.inf_max_pos = max_pos; - get_lamport(&log_input->inf.inf_max_pos_stamp); - _inf_callback(log_input, false); + struct trans_logger_input *log_input; + log_input = orig_mref_a->log_input; + if (unlikely(!log_input)) { + MARS_ERR("internal problem\n"); + } else { + down(&log_input->inf_mutex); + if (unlikely(max_pos < log_input->inf.inf_min_pos)) { + MARS_ERR("new max_pos < min_pos: %lld < %lld\n", max_pos, log_input->inf.inf_min_pos); + } + if (log_input->inf.inf_max_pos < max_pos) { + log_input->inf.inf_max_pos = max_pos; + get_lamport(&log_input->inf.inf_max_pos_stamp); + _inf_callback(log_input, false); + } + up(&log_input->inf_mutex); } - up(&log_input->inf_mutex); } } + sub_input = sub_mref_a->my_input; + #ifdef DO_WRITEBACK GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref); #else @@ -1453,9 +1470,8 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a) CHECK_PTR(orig_mref, err); brick = orig_mref_a->my_brick; CHECK_PTR(brick, err); - input = brick->inputs[brick->log_input_nr]; + input = orig_mref_a->log_input; CHECK_PTR(input, err); - orig_mref_a->log_input = input; logst = &input->logst; logst->do_crc = trans_logger_do_crc; @@ -1576,7 +1592,11 @@ bool prep_phase_startio(struct trans_logger_mref_aspect *mref_a) } #endif if (likely(!mref_a->is_hashed)) { + struct trans_logger_input *log_input; + log_input = brick->inputs[brick->log_input_nr]; MARS_IO("hashing %d at %lld\n", mref->ref_len, mref->ref_pos); + mref_a->log_input = log_input; + atomic_inc(&log_input->log_ref_count); hash_insert(brick, mref_a); } else { MARS_ERR("tried to hash twice\n"); @@ -1656,7 +1676,7 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a) goto done; } - wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, &orig_mref_a->stamp, orig_mref_a->log_input); + wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, &orig_mref_a->stamp); if (unlikely(!wb)) { MARS_ERR("no mem\n"); goto err; @@ -2872,6 +2892,8 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose) "hash_count=%d " "pos_count=%d " "balance=%d/%d/%d/%d " + "log_refs1=%d " + "log_refs2=%d " "fly=%d " "mref_flying1=%d " "mref_flying2=%d " @@ -2932,6 +2954,8 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose) atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), + atomic_read(&brick->inputs[TL_INPUT_LOG1]->log_ref_count), + atomic_read(&brick->inputs[TL_INPUT_LOG2]->log_ref_count), atomic_read(&brick->fly_count), atomic_read(&brick->inputs[TL_INPUT_LOG1]->logst.mref_flying), atomic_read(&brick->inputs[TL_INPUT_LOG2]->logst.mref_flying), @@ -3007,6 +3031,9 @@ void trans_logger_mref_aspect_exit_fn(struct generic_aspect *_ini) CHECK_HEAD_EMPTY(&ini->collect_head); CHECK_HEAD_EMPTY(&ini->sub_list); CHECK_HEAD_EMPTY(&ini->sub_head); + if (ini->log_input) { + atomic_dec(&ini->log_input->log_ref_count); + } } MARS_MAKE_STATICS(trans_logger); diff --git a/mars_trans_logger.h b/mars_trans_logger.h index 701cb5d5..6118a5ee 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -233,6 +233,7 @@ struct trans_logger_input { // informational struct trans_logger_info inf; // readonly from outside + atomic_t log_ref_count; bool is_operating; long long last_jiffies; diff --git a/sy_old/mars_light.c b/sy_old/mars_light.c index 7380ee65..4a61794d 100644 --- a/sy_old/mars_light.c +++ b/sy_old/mars_light.c @@ -2175,11 +2175,15 @@ void _rotate_trans(struct mars_rotate *rot) // try to cleanup old log if (log_nr != old_nr) { struct trans_logger_input *trans_input = trans_brick->inputs[old_nr]; + struct trans_logger_input *new_input = trans_brick->inputs[log_nr]; if (!trans_input->connect) { - MARS_DBG("ignoring unused input %d\n", old_nr); + MARS_DBG("ignoring unused old input %d\n", old_nr); + } else if (!new_input->is_operating) { + MARS_DBG("ignoring uninitialized new input %d\n", log_nr); } else if (trans_input->is_operating && trans_input->inf.inf_min_pos == trans_input->inf.inf_max_pos && - list_empty(&trans_input->pos_list)) { + list_empty(&trans_input->pos_list) && + atomic_read(&trans_input->log_ref_count) <= 0) { int status; MARS_INF("cleanup old transaction log (%d -> %d)\n", old_nr, log_nr); status = generic_disconnect((void*)trans_input);