trans_logger: fix emergency mode (cease_logging)

This commit is contained in:
Thomas Schoebel-Theuer 2013-04-22 09:06:27 +02:00
parent 0c7bb9d00f
commit 1202f2ae8e
4 changed files with 51 additions and 18 deletions

View File

@ -80,6 +80,9 @@ EXPORT_SYMBOL_GPL(trans_logger_mem_usage);
/* Exported tunable; registered in mars_table as sysctl entry
 * "logger_max_interleave".
 */
int trans_logger_max_interleave = -1;
EXPORT_SYMBOL_GPL(trans_logger_max_interleave);
/* Exported tunable; registered in mars_table as sysctl entry
 * "logger_resume". While it is zero, make_log_finalize() only
 * reports that emergency mode could be turned off; when non-zero,
 * it clears cease_logging again once the conditions are met.
 */
int trans_logger_resume = 0;
EXPORT_SYMBOL_GPL(trans_logger_resume);
struct writeback_group global_writeback = {
.lock = __RW_LOCK_UNLOCKED(global_writeback.lock),
.group_anchor = LIST_HEAD_INIT(global_writeback.group_anchor),
@ -575,6 +578,19 @@ void _inf_callback(struct trans_logger_input *input, bool force)
}
}
/* Tell whether any of the four q_phase[] queues still has requests.
 *
 * The queues are inspected in index order; for each one q_queued is
 * read before q_flying. Returns 1 at the first non-zero counter,
 * 0 when all eight counters are zero.
 */
static inline
int _congested(struct trans_logger_brick *brick)
{
	int phase;

	for (phase = 0; phase < 4; phase++) {
		if (atomic_read(&brick->q_phase[phase].q_queued)) {
			return 1;
		}
		if (atomic_read(&brick->q_phase[phase].q_flying)) {
			return 1;
		}
	}
	return 0;
}
////////////////// own brick / input / output operations //////////////////
atomic_t global_mshadow_count = ATOMIC_INIT(0);
@ -773,7 +789,19 @@ int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object
mref->ref_len = REGION_SIZE - base_offset;
}
if (mref->ref_may_write == READ || unlikely(brick->cease_logging)) {
if (mref->ref_may_write == READ) {
return _read_ref_get(output, mref_a);
}
if (unlikely(brick->stopped_logging)) { // only in EMERGENCY mode
/* Wait until writeback has finished.
* We have to do this because writeback is out-of-order.
* Otherwise consistency could be violated for some time.
*/
while (_congested(brick)) {
// in case of emergency, busy-wait should be acceptable
brick_msleep(HZ / 10);
}
return _read_ref_get(output, mref_a);
}
@ -973,7 +1001,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
}
// only READ is allowed on non-shadow buffers
if (unlikely(mref->ref_rw != READ && !brick->cease_logging)) {
if (unlikely(mref->ref_rw != READ)) {
MARS_FAT("bad operation %d on non-shadow\n", mref->ref_rw);
}
@ -2074,19 +2102,6 @@ done:
return res;
}
/* Tell whether any of the four q_phase[] queues still has requests.
 *
 * The queues are inspected in index order; for each one q_queued is
 * read before q_flying. Returns 1 at the first non-zero counter,
 * 0 when all eight counters are zero.
 */
static inline
int _congested(struct trans_logger_brick *brick)
{
	int phase;

	for (phase = 0; phase < 4; phase++) {
		if (atomic_read(&brick->q_phase[phase].q_queued)) {
			return 1;
		}
		if (atomic_read(&brick->q_phase[phase].q_flying)) {
			return 1;
		}
	}
	return 0;
}
/* Ranking tables.
*/
static
@ -2560,6 +2575,12 @@ void trans_logger_log(struct trans_logger_brick *brick)
atomic_inc(&brick->total_round_count);
if (brick->cease_logging) {
brick->stopped_logging = true;
} else if (brick->stopped_logging && !_congested(brick)) {
brick->stopped_logging = false;
}
_init_inputs(brick, false);
switch (winner) {

View File

@ -24,6 +24,7 @@ extern int trans_logger_completion_semantics;
extern int trans_logger_do_crc;
extern int trans_logger_mem_usage; // in KB
extern int trans_logger_max_interleave;
/* Non-zero permits make_log_finalize() to switch emergency mode off
 * again; exposed as sysctl entry "logger_resume".
 */
extern int trans_logger_resume;
extern atomic_t global_mshadow_count;
extern atomic64_t global_mshadow_used;
@ -152,6 +153,7 @@ struct trans_logger_brick {
int log_input_nr; // where we are currently logging to
int old_input_nr; // where old IO requests may be on the fly
int replay_code; // replay errors (if any)
bool stopped_logging; // direct IO without logging (only in case of EMERGENCY)
// private
struct trans_logger_hash_anchor **hash_table;
struct list_head group_head;

View File

@ -271,6 +271,7 @@ struct mars_rotate {
bool is_primary;
bool old_is_primary;
bool copy_is_done;
bool created_hole;
spinlock_t inf_lock;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
@ -2805,14 +2806,21 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
*/
if (IS_JAMMED()) {
//brick_say_logging = 0;
MARS_ERR_TO(rot->log_say, "DISK SPACE IS EXTREMELY LOW on %s\n", rot->parent_path);
if (rot->todo_primary || rot->is_primary) {
trans_brick->cease_logging = true;
rot->inf_prev_sequence = 0; // disable checking
}
} else if (!rot->todo_primary && !rot->is_primary) {
trans_brick->cease_logging = false;
} else if ((trans_brick->cease_logging | trans_brick->stopped_logging) && rot->created_hole && !IS_EXHAUSTED()) {
if (!trans_logger_resume) {
MARS_INF_TO(rot->log_say, "emergency mode on %s could be turned off now, but /proc/sys/mars/logger_resume inhibits it.\n", rot->parent_path);
} else {
trans_brick->cease_logging = false;
rot->created_hole = false;
MARS_INF_TO(rot->log_say, "emergency mode on %s will be turned off again\n", rot->parent_path);
}
}
if (trans_brick->cease_logging) {
if (trans_brick->cease_logging | trans_brick->stopped_logging) {
MARS_ERR_TO(rot->log_say, "EMERGENCY MODE on %s: stopped transaction logging, and created a hole in the logfile sequence nubers.\n", rot->parent_path);
/* Create a hole in the sequence of logfile numbers.
* The secondaries will later stumble over it.
@ -2822,6 +2830,7 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
if (likely(new_path && !mars_find_dent(global, new_path))) {
MARS_INF_TO(rot->log_say, "EMERGENCY: creating new logfile '%s'\n", new_path);
_create_new_logfile(new_path);
rot->created_hole = true;
}
brick_string_free(new_path);
}

View File

@ -214,6 +214,7 @@ ctl_table mars_table[] = {
INT_ENTRY("delay_say_on_overflow",delay_say_on_overflow, 0600),
INT_ENTRY("mapfree_period_sec", mapfree_period_sec, 0600),
INT_ENTRY("logger_max_interleave", trans_logger_max_interleave, 0600),
INT_ENTRY("logger_resume", trans_logger_resume, 0600),
INT_ENTRY("mem_limit_percent", mars_mem_percent, 0600),
INT_ENTRY("logger_mem_used_kb", trans_logger_mem_usage, 0400),
INT_ENTRY("mem_used_raw_kb", brick_global_block_used,0400),