diff --git a/lib_log.c b/lib_log.c index e3087e87..180b9acf 100644 --- a/lib_log.c +++ b/lib_log.c @@ -74,6 +74,10 @@ void log_write_endio(struct generic_callback *cb) mars_log_trace(cb_info->mref); } + logst = cb_info->logst; + CHECK_PTR(logst, done); + atomic_dec(&logst->mref_flying); + MARS_IO("nr_cb = %d\n", cb_info->nr_cb); down(&cb_info->mutex); @@ -85,9 +89,7 @@ void log_write_endio(struct generic_callback *cb) } cb_info->nr_cb = 0; // prevent late preio() callbacks up(&cb_info->mutex); - logst = cb_info->logst; - CHECK_PTR(logst, done); - atomic_dec(&logst->mref_flying); + done: put_log_cb_info(cb_info); return; @@ -210,11 +212,6 @@ void *log_reserve(struct log_status *logst, struct log_header *lh) mref->ref_len = chunk_rest; } - while (!is_log_ready(logst)) { - MARS_DBG("this should not happen! ensure that is_log_ready() is called beforhand.\n"); - msleep(10000); // punishment delay - } - for (;;) { status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); if (likely(status >= 0)) { diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 706eab32..ea22229b 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -14,6 +14,7 @@ #define KEEP_UNIQUE //#define WB_COPY #define LATER +#define DELAY_CALLERS // this is _needed_ // commenting this out is dangerous for data integrity! use only for testing! #define USE_MEMCPY @@ -536,8 +537,10 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_ } #endif +#ifdef DELAY_CALLERS // delay in case of too many master shadows / memory shortage wait_event_interruptible_timeout(brick->caller_event, !brick->delay_callers, HZ / 2); +#endif // create a new master shadow data = brick_block_alloc(mref->ref_pos, (mref_a->alloc_len = mref->ref_len)); @@ -950,7 +953,7 @@ void wb_endio(struct generic_callback *cb) dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count; CHECK_ATOMIC(dec, 1); if (!atomic_dec_and_test(dec)) { - return; + goto done; } _endio = rw ? &wb->write_endio : &wb->read_endio; @@ -961,6 +964,7 @@ void wb_endio(struct generic_callback *cb) } else { MARS_ERR("internal: no endio defined\n"); } +done: wake_up_interruptible_all(&brick->worker_event); return; @@ -1884,6 +1888,7 @@ struct condition_status { bool q3_ready; bool q4_ready; bool extra_ready; + bool some_ready; }; static noinline @@ -1895,7 +1900,7 @@ bool _condition(struct condition_status *st, struct trans_logger_brick *brick) st->q3_ready = qq_is_ready(&brick->q_phase3); st->q4_ready = qq_is_ready(&brick->q_phase4); st->extra_ready = (kthread_should_stop() && !_congested(brick)); - return st->q1_ready | st->q2_ready | st->q3_ready | st->q4_ready | st->extra_ready; + return (st->some_ready = st->q1_ready | st->q2_ready | st->q3_ready | st->q4_ready | st->extra_ready); } static @@ -1979,9 +1984,11 @@ void _exit_inputs(struct trans_logger_brick *brick, bool force) static noinline void trans_logger_log(struct trans_logger_brick *brick) { +#ifdef DELAY_CALLERS bool unlimited = false; bool old_unlimited = false; bool delay_callers; +#endif long wait_timeout = HZ; #ifdef STAT_DEBUGGING long long last_jiffies = jiffies; @@ -2005,7 +2012,6 @@ void trans_logger_log(struct trans_logger_brick *brick) long long j2; long long j3; long long j4; - bool orig; #endif MARS_IO("waiting for request\n"); @@ -2021,7 +2027,6 @@ void trans_logger_log(struct trans_logger_brick *brick) #if 1 j0 = jiffies; - orig = st.q1_ready | st.q2_ready | st.q3_ready | st.q4_ready | st.extra_ready; #endif //MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase1.q_queued)); @@ -2042,7 +2047,9 @@ void trans_logger_log(struct trans_logger_brick *brick) /* This is highest priority, do it first. */ - run_mref_queue(&brick->q_phase1, phase0_startio, brick->q_phase1.q_batchlen); + if (st.q1_ready) { + run_mref_queue(&brick->q_phase1, phase0_startio, brick->q_phase1.q_batchlen); + } j1 = jiffies; /* In order to speed up draining, check the other queues @@ -2106,7 +2113,7 @@ void trans_logger_log(struct trans_logger_brick *brick) } } - if (orig && !brick->did_work) { + if (st.some_ready && !brick->did_work) { char *txt; txt = brick->ops->brick_statistics(brick, 0); MARS_WRN("inconsistent work, pushback = %d q1 = %d q2 = %d q3 = %d q4 = %d extra = %d ====> %s\n", brick->did_pushback, st.q1_ready, st.q2_ready, st.q3_ready, st.q4_ready, st.extra_ready, txt ? txt : "(ERROR)"); @@ -2115,7 +2122,7 @@ void trans_logger_log(struct trans_logger_brick *brick) } } #endif -#if 1 // provisionary flood handling FIXME: do better +#ifdef DELAY_CALLERS // provisionary flood handling FIXME: do better #define LIMIT_FN(factor,divider) \ (atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit * (factor) / (divider) && brick->shadow_mem_limit > 16) || \ (atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit * (factor) / (divider) && brick_global_memlimit > PAGE_SIZE * 16) diff --git a/sy_old/mars_light.c b/sy_old/mars_light.c index a46e0442..4dbbc43f 100644 --- a/sy_old/mars_light.c +++ b/sy_old/mars_light.c @@ -67,8 +67,8 @@ struct light_class { #define CONF_TRANS_SHADOW_LIMIT (65536 * 1) // don't fill the hashtable too much #define CONF_TRANS_CHUNKSIZE (128 * 1024) -#define CONF_TRANS_MAX_MREF_SIZE 0 -//#define CONF_TRANS_MAX_MREF_SIZE PAGE_SIZE +//#define CONF_TRANS_MAX_MREF_SIZE 0 +#define CONF_TRANS_MAX_MREF_SIZE PAGE_SIZE //#define CONF_TRANS_ALIGN 512 #define CONF_TRANS_ALIGN 0 //#define FLUSH_DELAY (HZ / 100 + 1) @@ -77,9 +77,10 @@ struct light_class { //#define TRANS_FAKE #define CONF_TRANS_BATCHLEN 1024 -//#define CONF_TRANS_FLYING 4 -//#define CONF_TRANS_FLYING 128 -#define CONF_TRANS_FLYING 16 +//#define CONF_LOGST_FLYING 0 +#define CONF_LOGST_FLYING 16 +//#define CONF_TRANS_FLYING 16 +#define CONF_TRANS_FLYING 0 #define CONF_TRANS_PRIO MARS_PRIO_HIGH #define CONF_TRANS_LOG_READS false //#define CONF_TRANS_LOG_READS true @@ -180,6 +181,7 @@ int _set_trans_params(struct mars_brick *_brick, void *private) trans_brick->align_size = CONF_TRANS_ALIGN; trans_brick->chunk_size = CONF_TRANS_CHUNKSIZE; trans_brick->flush_delay = FLUSH_DELAY; + trans_brick->max_flying = CONF_LOGST_FLYING; if (!trans_brick->log_reads) { trans_brick->q_phase2.q_max_queued = 0;