diff --git a/mars_trans_logger.c b/mars_trans_logger.c index a4a34b14..2ea47b0d 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -659,31 +659,35 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_ static noinline int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object *mref) { - struct trans_logger_brick *brick = output->brick; + struct trans_logger_brick *brick; struct trans_logger_mref_aspect *mref_a; loff_t base_offset; CHECK_PTR(output, err); + brick = output->brick; + CHECK_PTR(brick, err); + CHECK_PTR(mref, err); MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len); - if (mref->ref_len > brick->max_mref_size && brick->max_mref_size > 0) - mref->ref_len = brick->max_mref_size; + mref_a = trans_logger_mref_get_aspect(brick, mref); + CHECK_PTR(mref_a, err); + CHECK_ASPECT(mref_a, mref, err); atomic_inc(&brick->outer_balance_count); - if (atomic_read(&mref->ref_count) > 0) { // setup already performed + if (mref_a->stamp.tv_sec) { // setup already performed MARS_IO("again %d\n", atomic_read(&mref->ref_count)); - atomic_inc(&mref->ref_count); + CHECK_ATOMIC(&mref->ref_count, 1); + atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put() return mref->ref_len; } - mref_a = trans_logger_mref_get_aspect(brick, mref); - CHECK_PTR(mref_a, err); - CHECK_PTR(mref_a->object, err); - get_lamport(&mref_a->stamp); + if (mref->ref_len > brick->max_mref_size && brick->max_mref_size > 0) + mref->ref_len = brick->max_mref_size; + // ensure that REGION_SIZE boundaries are obeyed by hashing base_offset = mref->ref_pos & (loff_t)(REGION_SIZE - 1); if (mref->ref_len > REGION_SIZE - base_offset) {