limit memory consumption in logst

This commit is contained in:
Thomas Schoebel-Theuer 2011-11-14 15:21:15 +01:00 committed by Thomas Schoebel-Theuer
parent 6089a77d84
commit eb8c9ddac4
3 changed files with 26 additions and 20 deletions

View File

@ -74,6 +74,10 @@ void log_write_endio(struct generic_callback *cb)
mars_log_trace(cb_info->mref);
}
logst = cb_info->logst;
CHECK_PTR(logst, done);
atomic_dec(&logst->mref_flying);
MARS_IO("nr_cb = %d\n", cb_info->nr_cb);
down(&cb_info->mutex);
@ -85,9 +89,7 @@ void log_write_endio(struct generic_callback *cb)
}
cb_info->nr_cb = 0; // prevent late preio() callbacks
up(&cb_info->mutex);
logst = cb_info->logst;
CHECK_PTR(logst, done);
atomic_dec(&logst->mref_flying);
done:
put_log_cb_info(cb_info);
return;
@ -210,11 +212,6 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
mref->ref_len = chunk_rest;
}
while (!is_log_ready(logst)) {
MARS_DBG("this should not happen! ensure that is_log_ready() is called beforhand.\n");
msleep(10000); // punishment delay
}
for (;;) {
status = GENERIC_INPUT_CALL(logst->input, mref_get, mref);
if (likely(status >= 0)) {

View File

@ -14,6 +14,7 @@
#define KEEP_UNIQUE
//#define WB_COPY
#define LATER
#define DELAY_CALLERS // this is _needed_
// commenting this out is dangerous for data integrity! use only for testing!
#define USE_MEMCPY
@ -536,8 +537,10 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
}
#endif
#ifdef DELAY_CALLERS
// delay in case of too many master shadows / memory shortage
wait_event_interruptible_timeout(brick->caller_event, !brick->delay_callers, HZ / 2);
#endif
// create a new master shadow
data = brick_block_alloc(mref->ref_pos, (mref_a->alloc_len = mref->ref_len));
@ -950,7 +953,7 @@ void wb_endio(struct generic_callback *cb)
dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
CHECK_ATOMIC(dec, 1);
if (!atomic_dec_and_test(dec)) {
return;
goto done;
}
_endio = rw ? &wb->write_endio : &wb->read_endio;
@ -961,6 +964,7 @@ void wb_endio(struct generic_callback *cb)
} else {
MARS_ERR("internal: no endio defined\n");
}
done:
wake_up_interruptible_all(&brick->worker_event);
return;
@ -1884,6 +1888,7 @@ struct condition_status {
bool q3_ready;
bool q4_ready;
bool extra_ready;
bool some_ready;
};
static noinline
@ -1895,7 +1900,7 @@ bool _condition(struct condition_status *st, struct trans_logger_brick *brick)
st->q3_ready = qq_is_ready(&brick->q_phase3);
st->q4_ready = qq_is_ready(&brick->q_phase4);
st->extra_ready = (kthread_should_stop() && !_congested(brick));
return st->q1_ready | st->q2_ready | st->q3_ready | st->q4_ready | st->extra_ready;
return (st->some_ready = st->q1_ready | st->q2_ready | st->q3_ready | st->q4_ready | st->extra_ready);
}
static
@ -1979,9 +1984,11 @@ void _exit_inputs(struct trans_logger_brick *brick, bool force)
static noinline
void trans_logger_log(struct trans_logger_brick *brick)
{
#ifdef DELAY_CALLERS
bool unlimited = false;
bool old_unlimited = false;
bool delay_callers;
#endif
long wait_timeout = HZ;
#ifdef STAT_DEBUGGING
long long last_jiffies = jiffies;
@ -2005,7 +2012,6 @@ void trans_logger_log(struct trans_logger_brick *brick)
long long j2;
long long j3;
long long j4;
bool orig;
#endif
MARS_IO("waiting for request\n");
@ -2021,7 +2027,6 @@ void trans_logger_log(struct trans_logger_brick *brick)
#if 1
j0 = jiffies;
orig = st.q1_ready | st.q2_ready | st.q3_ready | st.q4_ready | st.extra_ready;
#endif
//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase1.q_queued));
@ -2042,7 +2047,9 @@ void trans_logger_log(struct trans_logger_brick *brick)
/* This is highest priority, do it first.
*/
run_mref_queue(&brick->q_phase1, phase0_startio, brick->q_phase1.q_batchlen);
if (st.q1_ready) {
run_mref_queue(&brick->q_phase1, phase0_startio, brick->q_phase1.q_batchlen);
}
j1 = jiffies;
/* In order to speed up draining, check the other queues
@ -2106,7 +2113,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
}
}
if (orig && !brick->did_work) {
if (st.some_ready && !brick->did_work) {
char *txt;
txt = brick->ops->brick_statistics(brick, 0);
MARS_WRN("inconsistent work, pushback = %d q1 = %d q2 = %d q3 = %d q4 = %d extra = %d ====> %s\n", brick->did_pushback, st.q1_ready, st.q2_ready, st.q3_ready, st.q4_ready, st.extra_ready, txt ? txt : "(ERROR)");
@ -2115,7 +2122,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
}
}
#endif
#if 1 // provisionary flood handling FIXME: do better
#ifdef DELAY_CALLERS // provisionary flood handling FIXME: do better
#define LIMIT_FN(factor,divider) \
(atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit * (factor) / (divider) && brick->shadow_mem_limit > 16) || \
(atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit * (factor) / (divider) && brick_global_memlimit > PAGE_SIZE * 16)

View File

@ -67,8 +67,8 @@ struct light_class {
#define CONF_TRANS_SHADOW_LIMIT (65536 * 1) // don't fill the hashtable too much
#define CONF_TRANS_CHUNKSIZE (128 * 1024)
#define CONF_TRANS_MAX_MREF_SIZE 0
//#define CONF_TRANS_MAX_MREF_SIZE PAGE_SIZE
//#define CONF_TRANS_MAX_MREF_SIZE 0
#define CONF_TRANS_MAX_MREF_SIZE PAGE_SIZE
//#define CONF_TRANS_ALIGN 512
#define CONF_TRANS_ALIGN 0
//#define FLUSH_DELAY (HZ / 100 + 1)
@ -77,9 +77,10 @@ struct light_class {
//#define TRANS_FAKE
#define CONF_TRANS_BATCHLEN 1024
//#define CONF_TRANS_FLYING 4
//#define CONF_TRANS_FLYING 128
#define CONF_TRANS_FLYING 16
//#define CONF_LOGST_FLYING 0
#define CONF_LOGST_FLYING 16
//#define CONF_TRANS_FLYING 16
#define CONF_TRANS_FLYING 0
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
#define CONF_TRANS_LOG_READS false
//#define CONF_TRANS_LOG_READS true
@ -180,6 +181,7 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
trans_brick->align_size = CONF_TRANS_ALIGN;
trans_brick->chunk_size = CONF_TRANS_CHUNKSIZE;
trans_brick->flush_delay = FLUSH_DELAY;
trans_brick->max_flying = CONF_LOGST_FLYING;
if (!trans_brick->log_reads) {
trans_brick->q_phase2.q_max_queued = 0;