Mirror of https://github.com/schoebel/mars
logger: fix rare race on replay EOF
This led to annoying error messages such as checksum mismatches or record sequence number mismatches. AFAIK data integrity was not in danger, since the checks masked out any potentially harmful actions.
parent 80aec4506f
commit 21991f3cf3
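For context, the race being fixed: during replay the logger reads the logfile in fixed chunk_size steps, and near the end of the valid log data a full-chunk read could extend past the replay end position, so the checksum and sequence-number checks ran over bytes that were never part of the committed log and reported spurious mismatches. The fix threads the known end position (end_pos) through init_logst() and clamps every read to the remaining window. Below is a minimal userspace sketch of that clamping idea only; clamp_read_len() and the sample numbers are illustrative and do not appear in the patch.

#include <stdio.h>

/* Illustrative helper: bound the next read to the remaining valid log data.
 * Returns the read length, or a value <= 0 when the window is exhausted. */
static long long clamp_read_len(long long log_pos, long long end_pos,
                                long long chunk_size)
{
	long long this_len = end_pos - log_pos;   /* valid bytes left to replay */

	if (this_len > chunk_size)
		this_len = chunk_size;            /* never read more than one chunk */
	return this_len;
}

int main(void)
{
	/* near EOF: only 4096 valid bytes remain although chunk_size is 64 KiB */
	printf("%lld\n", clamp_read_len(1000000, 1004096, 65536));  /* -> 4096 */
	return 0;
}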
@@ -35,7 +35,7 @@ void exit_logst(struct log_status *logst)
 }
 EXPORT_SYMBOL_GPL(exit_logst);
 
-void init_logst(struct log_status *logst, struct mars_input *input, loff_t start_pos)
+void init_logst(struct log_status *logst, struct mars_input *input, loff_t start_pos, loff_t end_pos)
 {
 	exit_logst(logst);
 
@@ -43,7 +43,9 @@ void init_logst(struct log_status *logst, struct mars_input *input, loff_t start_pos)
 
 	logst->input = input;
 	logst->brick = input->brick;
+	logst->start_pos = start_pos;
 	logst->log_pos = start_pos;
+	logst->end_pos = end_pos;
 	init_waitqueue_head(&logst->event);
 }
 EXPORT_SYMBOL_GPL(init_logst);
@@ -396,6 +398,7 @@ restart:
 	status = 0;
 	mref = logst->read_mref;
 	if (!mref || logst->do_free) {
+		loff_t this_len;
 		if (mref) {
 			GENERIC_INPUT_CALL(logst->input, mref_put, mref);
 			logst->read_mref = NULL;
@@ -403,13 +406,22 @@ restart:
 			logst->offset = 0;
 		}
 
+		this_len = logst->end_pos - logst->log_pos;
+		if (this_len > logst->chunk_size) {
+			this_len = logst->chunk_size;
+		} else if (unlikely(this_len <= 0)) {
+			MARS_ERR("tried bad IO len %lld, start_pos = %lld log_pos = %lld end_pos = %lld\n", this_len, logst->start_pos, logst->log_pos, logst->end_pos);
+			status = -EOVERFLOW;
+			goto done;
+		}
+
 		mref = mars_alloc_mref(logst->brick);
 		if (unlikely(!mref)) {
			MARS_ERR("no mref\n");
 			goto done;
 		}
 		mref->ref_pos = logst->log_pos;
-		mref->ref_len = logst->chunk_size;
+		mref->ref_len = this_len;
 		mref->ref_prio = logst->io_prio;
 
 		status = GENERIC_INPUT_CALL(logst->input, mref_get, mref);
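The read path above now refuses to issue a read once log_pos has reached end_pos and reports -EOVERFLOW instead of requesting a full chunk past the end of the logfile. A hedged sketch of how a replay-style loop terminates under that contract; next_read() and the sizes are stand-ins, not MARS code.

#include <errno.h>
#include <stdio.h>

/* Stand-in for the bounded read step: consume up to one chunk of the
 * remaining window, or fail with -EOVERFLOW when nothing valid is left. */
static int next_read(long long *log_pos, long long end_pos, long long chunk_size)
{
	long long this_len = end_pos - *log_pos;

	if (this_len <= 0)
		return -EOVERFLOW;        /* window exhausted: stop replaying */
	if (this_len > chunk_size)
		this_len = chunk_size;
	*log_pos += this_len;             /* pretend the read succeeded */
	return (int)this_len;
}

int main(void)
{
	long long pos = 0;
	int len;

	while ((len = next_read(&pos, 150000, 65536)) > 0)
		printf("read %d bytes, pos now %lld\n", len, pos);
	return 0;
}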
@@ -232,6 +232,8 @@ struct log_status {
 	// interfacing
 	wait_queue_head_t *signal_event;
 	// tunables
+	loff_t start_pos;
+	loff_t end_pos;
 	int align_size; // alignment between requests
 	int chunk_size; // must be at least 8K (better 64k)
 	int max_size; // max payload length
@@ -262,7 +264,7 @@ struct log_status {
 	void *private;
 };
 
-void init_logst(struct log_status *logst, struct mars_input *input, loff_t start_pos);
+void init_logst(struct log_status *logst, struct mars_input *input, loff_t start_pos, loff_t end_pos);
 void exit_logst(struct log_status *logst);
 
 void log_flush(struct log_status *logst);
@@ -2361,12 +2361,12 @@ int _do_ranking(struct trans_logger_brick *brick, struct rank_data rkd[])
 }
 
 static
-void _init_input(struct trans_logger_input *input, loff_t start_pos)
+void _init_input(struct trans_logger_input *input, loff_t start_pos, loff_t end_pos)
 {
 	struct trans_logger_brick *brick = input->brick;
 	struct log_status *logst = &input->logst;
 
-	init_logst(logst, (void*)input, start_pos);
+	init_logst(logst, (void*)input, start_pos, end_pos);
 	logst->signal_event = &brick->worker_event;
 	logst->align_size = CONF_TRANS_ALIGN;
 	logst->chunk_size = CONF_TRANS_CHUNKSIZE;
@@ -2374,7 +2374,7 @@ void _init_input(struct trans_logger_input *input, loff_t start_pos)
 
 
 	input->inf.inf_min_pos = start_pos;
-	input->inf.inf_max_pos = start_pos; // ATTENTION: this remains correct as far as our replay code _never_ kicks off any requests in parallel (which is current state of the "art", relying on BBU caching for performance). WHENEVER YOU CHANGE THIS some day, you MUST maintain the correct end_pos here!
+	input->inf.inf_max_pos = end_pos;
 	get_lamport(&input->inf.inf_max_pos_stamp);
 	memcpy(&input->inf.inf_min_pos_stamp, &input->inf.inf_max_pos_stamp, sizeof(input->inf.inf_min_pos_stamp));
 
@@ -2416,7 +2416,7 @@ void _init_inputs(struct trans_logger_brick *brick, bool is_first)
 
 	down(&input->inf_mutex);
 
-	_init_input(input, 0);
+	_init_input(input, 0, 0);
 	input->inf.inf_is_logging = is_first;
 
 	// from now on, new requests should go to the new input
@@ -2847,6 +2847,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
 	struct trans_logger_input *input = brick->inputs[brick->log_input_nr];
 	struct log_header lh = {};
 	loff_t start_pos;
+	loff_t end_pos;
 	loff_t finished_pos = -1;
 	loff_t new_finished_pos = -1;
 	long long old_jiffies = jiffies;
@@ -2858,17 +2859,18 @@ void trans_logger_replay(struct trans_logger_brick *brick)
 	brick->disk_io_error = 0;
 
 	start_pos = brick->replay_start_pos;
+	end_pos = brick->replay_end_pos;
 	brick->replay_current_pos = start_pos;
 
-	_init_input(input, start_pos);
+	_init_input(input, start_pos, end_pos);
 
 	input->inf.inf_min_pos = start_pos;
-	input->inf.inf_max_pos = brick->replay_end_pos;
-	input->inf.inf_log_pos = brick->replay_end_pos;
+	input->inf.inf_max_pos = end_pos;
+	input->inf.inf_log_pos = end_pos;
 	input->inf.inf_is_replaying = true;
 	input->inf.inf_is_logging = false;
 
-	MARS_INF("starting replay from %lld to %lld\n", start_pos, brick->replay_end_pos);
+	MARS_INF("starting replay from %lld to %lld\n", start_pos, end_pos);
 
 	mars_power_led_on((void*)brick, true);
 
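The trans_logger changes only plumb the already-known replay window through: trans_logger_replay() resolves start_pos and end_pos from brick->replay_start_pos / brick->replay_end_pos once and passes both to _init_input(), which in turn hands them to init_logst() and publishes end_pos as inf_max_pos / inf_log_pos. Below is a minimal sketch of that propagation pattern with stand-in types; these are not the MARS structures.

#include <stdio.h>

struct log_state   { long long start_pos, log_pos, end_pos; };
struct input_state { struct log_state logst; long long min_pos, max_pos; };

/* Stand-in for init_logst(): remember both ends of the window so the
 * read path can clamp against end_pos later. */
static void init_log_state(struct log_state *l, long long start, long long end)
{
	l->start_pos = start;
	l->log_pos = start;
	l->end_pos = end;
}

/* Stand-in for _init_input(): forward the window and publish its end. */
static void init_input_state(struct input_state *in, long long start, long long end)
{
	init_log_state(&in->logst, start, end);
	in->min_pos = start;
	in->max_pos = end;
}

int main(void)
{
	struct input_state in;

	/* stand-in for trans_logger_replay(): resolve the window once, pass it down */
	init_input_state(&in, 4096, 150000);
	printf("replay window: [%lld, %lld)\n", in.logst.start_pos, in.logst.end_pos);
	return 0;
}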