fix trans_logger writeback (timestamp ordering)

This commit is contained in:
Thomas Schoebel-Theuer 2012-02-07 09:22:51 +01:00 committed by Thomas Schoebel-Theuer
parent cb3dbc0e74
commit a596ee5a4d
1 changed files with 24 additions and 8 deletions

View File

@ -179,7 +179,7 @@ int hash_fn(loff_t pos)
}
static inline
struct trans_logger_mref_aspect *_hash_find(struct list_head *start, loff_t pos, int *max_len, bool use_collect_head)
struct trans_logger_mref_aspect *_hash_find(struct list_head *start, loff_t pos, int *max_len, struct timespec *elder, bool use_collect_head)
{
struct list_head *tmp;
struct trans_logger_mref_aspect *res = NULL;
@ -214,6 +214,11 @@ struct trans_logger_mref_aspect *_hash_find(struct list_head *start, loff_t pos,
CHECK_ATOMIC(&test->ref_count, 1);
// timestamp handling
if (elder && timespec_compare(&test_a->stamp, elder) > 0) {
continue; // not relevant
}
// are the regions overlapping?
if (pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
continue; // not relevant
@ -249,7 +254,7 @@ struct trans_logger_mref_aspect *hash_find(struct trans_logger_brick *brick, lof
//traced_readlock(&start->hash_lock, flags);
down_read(&start->hash_mutex);
res = _hash_find(&start->hash_anchor, pos, max_len, false);
res = _hash_find(&start->hash_anchor, pos, max_len, NULL, false);
//traced_readunlock(&start->hash_lock, flags);
up_read(&start->hash_mutex);
@ -286,7 +291,7 @@ void hash_insert(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
* and collect them into a list.
*/
static noinline
void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, struct list_head *collect_list)
void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, struct timespec *elder, struct list_head *collect_list)
{
loff_t pos = *_pos;
int len = *_len;
@ -322,6 +327,11 @@ void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, stru
CHECK_ATOMIC(&test->ref_count, 1);
// timestamp handling
if (elder && timespec_compare(&test_a->stamp, elder) > 0) {
continue; // not relevant
}
// are the regions overlapping?
if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
continue; // not relevant
@ -368,6 +378,11 @@ void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, stru
test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
test = test_a->object;
// timestamp handling
if (elder && timespec_compare(&test_a->stamp, elder) > 0) {
continue; // not relevant
}
// are the regions overlapping?
if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
continue; // not relevant
@ -561,7 +576,6 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
mref->ref_flags = 0;
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
get_lamport(&mref_a->stamp);
#if 1
if (unlikely(mref->ref_len <= 0)) {
MARS_ERR("oops, len = %d\n", mref->ref_len);
@ -605,6 +619,8 @@ int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object
CHECK_PTR(mref_a, err);
CHECK_PTR(mref_a->object, err);
get_lamport(&mref_a->stamp);
// ensure that REGION_SIZE boundaries are obeyed by hashing
base_offset = mref->ref_pos & (loff_t)(REGION_SIZE - 1);
if (mref->ref_len > REGION_SIZE - base_offset) {
@ -983,7 +999,7 @@ err:
* point in time.
*/
static noinline
struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct trans_logger_input *log_input)
struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct timespec *elder, struct trans_logger_input *log_input)
{
struct writeback_info *wb;
struct trans_logger_input *read_input;
@ -1012,7 +1028,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
/* Atomically fetch transitive closure on all requests
* overlapping with the current search region.
*/
hash_extend(brick, &wb->w_pos, &wb->w_len, &wb->w_collect_list);
hash_extend(brick, &wb->w_pos, &wb->w_len, elder, &wb->w_collect_list);
pos = wb->w_pos;
len = wb->w_len;
@ -1092,7 +1108,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
int diff;
int status;
orig_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true);
orig_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, elder, true);
if (unlikely(!orig_mref_a)) {
MARS_FAT("could not find data\n");
goto err;
@ -1540,7 +1556,7 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
MARS_IO("AHA not hashed, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
goto done;
}
wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, orig_mref_a->log_input);
wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, &orig_mref_a->stamp, orig_mref_a->log_input);
if (unlikely(!wb)) {
goto err;
}