diff --git a/mars_trans_logger.c b/mars_trans_logger.c
index a23a4693..0cf21420 100644
--- a/mars_trans_logger.c
+++ b/mars_trans_logger.c
@@ -821,7 +821,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
 		atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
 		atomic_inc(&brick->inner_balance_count);
 
-		qq_mref_insert(&brick->q_phase0, mref_a);
+		qq_mref_insert(&brick->q_phase[0], mref_a);
 		wake_up_interruptible_all(&brick->worker_event);
 		return;
 	}
@@ -1316,7 +1316,7 @@ void phase0_endio(void *private, int error)
 	orig_mref = orig_mref_a->object;
 	CHECK_PTR(orig_mref, err);
 
-	qq_dec_flying(&brick->q_phase0);
+	qq_dec_flying(&brick->q_phase[0]);
 
 	/* Pin mref->ref_count so it can't go away
 	 * after _complete().
@@ -1333,7 +1333,7 @@ void phase0_endio(void *private, int error)
 
 	/* Queue up for the next phase.
 	 */
-	qq_mref_insert(&brick->q_phase1, orig_mref_a);
+	qq_mref_insert(&brick->q_phase[1], orig_mref_a);
 
 	/* Undo the above pinning
 	 */
@@ -1401,7 +1401,7 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	atomic_inc(&brick->pos_count);
 	traced_unlock(&input->pos_lock, flags);
 
-	qq_inc_flying(&brick->q_phase0);
+	qq_inc_flying(&brick->q_phase[0]);
 
 	return true;
 err:
@@ -1519,10 +1519,10 @@ void phase1_endio(struct generic_callback *cb)
 		goto err;
 	}
 
-	qq_dec_flying(&brick->q_phase1);
+	qq_dec_flying(&brick->q_phase[1]);
 
 	// queue up for the next phase
-	qq_wb_insert(&brick->q_phase2, wb);
+	qq_wb_insert(&brick->q_phase[2], wb);
 	wake_up_interruptible_all(&brick->worker_event);
 	return;
 
@@ -1582,11 +1582,11 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	atomic_set(&wb->w_sub_log_count, atomic_read(&wb->w_sub_read_count));
 
 	if (brick->log_reads) {
-		qq_inc_flying(&brick->q_phase1);
+		qq_inc_flying(&brick->q_phase[1]);
 		fire_writeback(&wb->w_sub_read_list, false);
 	} else { // shortcut
 #ifdef LATER
-		qq_wb_insert(&brick->q_phase3, wb);
+		qq_wb_insert(&brick->q_phase[3], wb);
 		wake_up_interruptible_all(&brick->worker_event);
 #else
 		return phase3_startio(wb);
@@ -1615,7 +1615,7 @@ void _phase2_endio(struct writeback_info *wb)
 	struct trans_logger_brick *brick = wb->w_brick;
 
 	// queue up for the next phase
-	qq_wb_insert(&brick->q_phase3, wb);
+	qq_wb_insert(&brick->q_phase[3], wb);
 	wake_up_interruptible_all(&brick->worker_event);
 	return;
 }
@@ -1634,7 +1634,7 @@ void phase2_endio(void *private, int error)
 	brick = wb->w_brick;
 	CHECK_PTR(brick, err);
 
-	qq_dec_flying(&brick->q_phase2);
+	qq_dec_flying(&brick->q_phase[2]);
 
 	if (unlikely(error < 0)) {
 		MARS_FAT("IO error %d\n", error);
@@ -1694,7 +1694,7 @@ bool _phase2_startio(struct trans_logger_mref_aspect *sub_mref_a)
 		goto err;
 	}
 
-	qq_inc_flying(&brick->q_phase2);
+	qq_inc_flying(&brick->q_phase[2]);
 
 	return true;
 
@@ -1766,7 +1766,7 @@ void phase3_endio(struct generic_callback *cb)
 	hash_put_all(brick, &wb->w_collect_list);
 
-	qq_dec_flying(&brick->q_phase3);
+	qq_dec_flying(&brick->q_phase[3]);
 	atomic_inc(&brick->total_writeback_cluster_count);
 
 	free_writeback(wb);
 
@@ -1804,7 +1804,7 @@ bool phase3_startio(struct writeback_info *wb)
 
 	/* Start writeback IO
 	 */
-	qq_inc_flying(&wb->w_brick->q_phase3);
+	qq_inc_flying(&wb->w_brick->q_phase[3]);
 	fire_writeback(&wb->w_sub_write_list, true);
 	return true;
 }
@@ -1886,14 +1886,14 @@ done:
 static inline
 int _congested(struct trans_logger_brick *brick)
 {
-	return atomic_read(&brick->q_phase0.q_queued)
-		|| atomic_read(&brick->q_phase0.q_flying)
-		|| atomic_read(&brick->q_phase1.q_queued)
-		|| atomic_read(&brick->q_phase1.q_flying)
-		|| atomic_read(&brick->q_phase2.q_queued)
-		|| atomic_read(&brick->q_phase2.q_flying)
-		|| atomic_read(&brick->q_phase3.q_queued)
-		|| atomic_read(&brick->q_phase3.q_flying);
+	return atomic_read(&brick->q_phase[0].q_queued)
+		|| atomic_read(&brick->q_phase[0].q_flying)
+		|| atomic_read(&brick->q_phase[1].q_queued)
+		|| atomic_read(&brick->q_phase[1].q_flying)
+		|| atomic_read(&brick->q_phase[2].q_queued)
+		|| atomic_read(&brick->q_phase[2].q_flying)
+		|| atomic_read(&brick->q_phase[3].q_queued)
+		|| atomic_read(&brick->q_phase[3].q_flying);
 }
 
 static inline
@@ -1925,16 +1925,16 @@ static noinline
 bool _condition(struct condition_status *st, struct trans_logger_brick *brick)
 {
 	st->log_ready = logst_is_ready(brick);
-	st->q0_ready = atomic_read(&brick->q_phase0.q_queued) > 0 &&
+	st->q0_ready = atomic_read(&brick->q_phase[0].q_queued) > 0 &&
 		st->log_ready;
-	st->q1_ready = qq_is_ready(&brick->q_phase1);
-	st->q2_ready = qq_is_ready(&brick->q_phase2);
-	st->q3_ready = qq_is_ready(&brick->q_phase3);
+	st->q1_ready = qq_is_ready(&brick->q_phase[1]);
+	st->q2_ready = qq_is_ready(&brick->q_phase[2]);
+	st->q3_ready = qq_is_ready(&brick->q_phase[3]);
 	st->extra_ready = (kthread_should_stop() && !_congested(brick));
 	st->some_ready = st->q0_ready | st->q1_ready | st->q2_ready | st->q3_ready | st->extra_ready;
 #if 0
 	if (!st->some_ready)
-		st->q0_ready = atomic_read(&brick->q_phase0.q_queued) > 0;
+		st->q0_ready = atomic_read(&brick->q_phase[0].q_queued) > 0;
 #endif
 	return st->some_ready;
 }
@@ -2069,7 +2069,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
 		j0 = jiffies;
 #endif
 
-		//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase0.q_queued));
+		//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase[0].q_queued));
 
 #ifdef STAT_DEBUGGING
 		if (((long long)jiffies) - last_jiffies >= HZ * 5 && brick->power.button) {
@@ -2088,7 +2088,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
 
 		/* This is highest priority, do it first.
 		 */
 		if (st.q0_ready) {
-			run_mref_queue(&brick->q_phase0, prep_phase_startio, brick->q_phase0.q_batchlen);
+			run_mref_queue(&brick->q_phase[0], prep_phase_startio, brick->q_phase[0].q_batchlen);
 		}
 		j1 = jiffies;
@@ -2103,12 +2103,12 @@ void trans_logger_log(struct trans_logger_brick *brick)
 		 * soft rate (or rate balance).
 		 */
 		if (true || st.q3_ready) {
-			run_wb_queue(&brick->q_phase3, phase3_startio, brick->q_phase3.q_batchlen);
+			run_wb_queue(&brick->q_phase[3], phase3_startio, brick->q_phase[3].q_batchlen);
 		}
 		j2 = jiffies;
 
 		if (true || st.q2_ready) {
-			run_wb_queue(&brick->q_phase2, phase2_startio, brick->q_phase2.q_batchlen);
+			run_wb_queue(&brick->q_phase[2], phase2_startio, brick->q_phase[2].q_batchlen);
 		}
 		j3 = jiffies;
 
@@ -2117,7 +2117,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
 		 * stopping individual queues!
 		 */
 		if (true || st.q1_ready) {
-			run_mref_queue(&brick->q_phase1, phase1_startio, brick->q_phase1.q_batchlen);
+			run_mref_queue(&brick->q_phase[1], phase1_startio, brick->q_phase[1].q_batchlen);
 		}
 		j4 = jiffies;
 
@@ -2141,7 +2141,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
 			if (!do_flush) { // start over soon
 				wait_timeout = brick->flush_delay;
 			}
-		} else if (atomic_read(&brick->q_phase0.q_queued) <= 0 &&
+		} else if (atomic_read(&brick->q_phase[0].q_queued) <= 0 &&
 			   (brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
 			do_flush = true;
 		}
@@ -2193,9 +2193,9 @@ void trans_logger_log(struct trans_logger_brick *brick)
 				unlimited = LIMIT_FN(1, 2);
 			}
 			if (unlimited != old_unlimited) {
-				brick->q_phase1.q_unlimited = unlimited;
-				brick->q_phase2.q_unlimited = unlimited;
-				brick->q_phase3.q_unlimited = unlimited;
+				brick->q_phase[1].q_unlimited = unlimited;
+				brick->q_phase[2].q_unlimited = unlimited;
+				brick->q_phase[3].q_unlimited = unlimited;
 				MARS_DBG("mshadow_count = %d/%d global_mem = %ld/%lld unlimited %d -> %d\n", atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, old_unlimited, unlimited);
 				old_unlimited = unlimited;
 				wake_up_interruptible_all(&brick->worker_event);
@@ -2644,10 +2644,10 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
 		atomic_read(&brick->total_round_count),
 		atomic_read(&brick->total_restart_count),
 		atomic_read(&brick->total_delay_count),
-		atomic_read(&brick->q_phase0.q_total),
-		atomic_read(&brick->q_phase1.q_total),
-		atomic_read(&brick->q_phase2.q_total),
-		atomic_read(&brick->q_phase3.q_total),
+		atomic_read(&brick->q_phase[0].q_total),
+		atomic_read(&brick->q_phase[1].q_total),
+		atomic_read(&brick->q_phase[2].q_total),
+		atomic_read(&brick->q_phase[3].q_total),
 		atomic_read(&brick->mref_object_layout.alloc_count),
 		atomic64_read(&brick->shadow_mem_used),
 		brick_global_memlimit,
@@ -2662,14 +2662,14 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
 		atomic_read(&brick->outer_balance_count),
 		atomic_read(&brick->wb_balance_count),
 		atomic_read(&brick->fly_count),
-		atomic_read(&brick->q_phase0.q_queued),
-		atomic_read(&brick->q_phase0.q_flying),
-		atomic_read(&brick->q_phase1.q_queued),
-		atomic_read(&brick->q_phase1.q_flying),
-		atomic_read(&brick->q_phase2.q_queued),
-		atomic_read(&brick->q_phase2.q_flying),
-		atomic_read(&brick->q_phase3.q_queued),
-		atomic_read(&brick->q_phase3.q_flying));
+		atomic_read(&brick->q_phase[0].q_queued),
+		atomic_read(&brick->q_phase[0].q_flying),
+		atomic_read(&brick->q_phase[1].q_queued),
+		atomic_read(&brick->q_phase[1].q_flying),
+		atomic_read(&brick->q_phase[2].q_queued),
+		atomic_read(&brick->q_phase[2].q_flying),
+		atomic_read(&brick->q_phase[3].q_queued),
+		atomic_read(&brick->q_phase[3].q_flying));
 
 	return res;
 }
@@ -2746,29 +2746,29 @@ int trans_logger_brick_construct(struct trans_logger_brick *brick)
 	INIT_LIST_HEAD(&brick->replay_list);
 	init_waitqueue_head(&brick->worker_event);
 	init_waitqueue_head(&brick->caller_event);
-	qq_init(&brick->q_phase0, brick);
-	qq_init(&brick->q_phase1, brick);
-	qq_init(&brick->q_phase2, brick);
-	qq_init(&brick->q_phase3, brick);
+	qq_init(&brick->q_phase[0], brick);
+	qq_init(&brick->q_phase[1], brick);
+	qq_init(&brick->q_phase[2], brick);
+	qq_init(&brick->q_phase[3], brick);
 #if 1
-	brick->q_phase1.q_dep = &brick->q_phase3;
+	brick->q_phase[1].q_dep = &brick->q_phase[3];
 	/* TODO: this is cyclic and therefore potentially dangerous.
 	 * Find a better solution to the starvation problem!
 	 */
-	//brick->q_phase3.q_dep = &brick->q_phase0;
+	//brick->q_phase[3].q_dep = &brick->q_phase[0];
 #endif
-	brick->q_phase0.q_insert_info = "q0_ins";
-	brick->q_phase0.q_pushback_info = "q0_push";
-	brick->q_phase0.q_fetch_info = "q0_fetch";
-	brick->q_phase1.q_insert_info = "q1_ins";
-	brick->q_phase1.q_pushback_info = "q1_push";
-	brick->q_phase1.q_fetch_info = "q1_fetch";
-	brick->q_phase2.q_insert_info = "q2_ins";
-	brick->q_phase2.q_pushback_info = "q2_push";
-	brick->q_phase2.q_fetch_info = "q2_fetch";
-	brick->q_phase3.q_insert_info = "q3_ins";
-	brick->q_phase3.q_pushback_info = "q3_push";
-	brick->q_phase3.q_fetch_info = "q3_fetch";
+	brick->q_phase[0].q_insert_info = "q0_ins";
+	brick->q_phase[0].q_pushback_info = "q0_push";
+	brick->q_phase[0].q_fetch_info = "q0_fetch";
+	brick->q_phase[1].q_insert_info = "q1_ins";
+	brick->q_phase[1].q_pushback_info = "q1_push";
+	brick->q_phase[1].q_fetch_info = "q1_fetch";
+	brick->q_phase[2].q_insert_info = "q2_ins";
+	brick->q_phase[2].q_pushback_info = "q2_push";
+	brick->q_phase[2].q_fetch_info = "q2_fetch";
+	brick->q_phase[3].q_insert_info = "q3_ins";
+	brick->q_phase[3].q_pushback_info = "q3_push";
+	brick->q_phase[3].q_fetch_info = "q3_fetch";
 	brick->new_input_nr = TL_INPUT_LOG1;
 	brick->log_input_nr = TL_INPUT_LOG1;
 	brick->old_input_nr = TL_INPUT_LOG1;
diff --git a/mars_trans_logger.h b/mars_trans_logger.h
index 2c5b5b34..1f0d5066 100644
--- a/mars_trans_logger.h
+++ b/mars_trans_logger.h
@@ -6,6 +6,7 @@
 #define REGION_SIZE (1 << REGION_SIZE_BITS)
 #define TRANS_HASH_MAX 8192
 //#define TRANS_HASH_MAX 16384
+#define LOGGER_QUEUES 4
 
 #include 
 
@@ -170,10 +171,7 @@ struct trans_logger_brick {
 	atomic_t total_restart_count;
 	atomic_t total_delay_count;
 	// queues
-	struct logger_queue q_phase0;
-	struct logger_queue q_phase1;
-	struct logger_queue q_phase2;
-	struct logger_queue q_phase3;
+	struct logger_queue q_phase[LOGGER_QUEUES];
 	bool did_pushback;
 	bool did_work;
 	bool delay_callers;
diff --git a/sy_old/mars_light.c b/sy_old/mars_light.c
index 24a14d49..5557877f 100644
--- a/sy_old/mars_light.c
+++ b/sy_old/mars_light.c
@@ -141,40 +141,40 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
 		MARS_ERR("bad brick type\n");
 		return -EINVAL;
 	}
-	if (!trans_brick->q_phase1.q_ordering) {
-		trans_brick->q_phase0.q_batchlen = CONF_TRANS_BATCHLEN;
-		trans_brick->q_phase1.q_batchlen = CONF_ALL_BATCHLEN;
-		trans_brick->q_phase2.q_batchlen = CONF_ALL_BATCHLEN;
-		trans_brick->q_phase3.q_batchlen = CONF_ALL_BATCHLEN;
+	if (!trans_brick->q_phase[1].q_ordering) {
+		trans_brick->q_phase[0].q_batchlen = CONF_TRANS_BATCHLEN;
+		trans_brick->q_phase[1].q_batchlen = CONF_ALL_BATCHLEN;
+		trans_brick->q_phase[2].q_batchlen = CONF_ALL_BATCHLEN;
+		trans_brick->q_phase[3].q_batchlen = CONF_ALL_BATCHLEN;
 
-		trans_brick->q_phase0.q_max_flying = CONF_TRANS_FLYING;
-		trans_brick->q_phase1.q_max_flying = CONF_ALL_FLYING;
-		trans_brick->q_phase2.q_max_flying = CONF_ALL_FLYING;
-		trans_brick->q_phase3.q_max_flying = CONF_ALL_FLYING;
+		trans_brick->q_phase[0].q_max_flying = CONF_TRANS_FLYING;
+		trans_brick->q_phase[1].q_max_flying = CONF_ALL_FLYING;
+		trans_brick->q_phase[2].q_max_flying = CONF_ALL_FLYING;
+		trans_brick->q_phase[3].q_max_flying = CONF_ALL_FLYING;
 
-		trans_brick->q_phase0.q_max_contention = CONF_ALL_CONTENTION;
-		trans_brick->q_phase1.q_max_contention = CONF_ALL_CONTENTION;
-		trans_brick->q_phase2.q_max_contention = CONF_ALL_CONTENTION;
-		trans_brick->q_phase3.q_max_contention = CONF_ALL_CONTENTION;
+		trans_brick->q_phase[0].q_max_contention = CONF_ALL_CONTENTION;
+		trans_brick->q_phase[1].q_max_contention = CONF_ALL_CONTENTION;
+		trans_brick->q_phase[2].q_max_contention = CONF_ALL_CONTENTION;
+		trans_brick->q_phase[3].q_max_contention = CONF_ALL_CONTENTION;
 
-		trans_brick->q_phase0.q_over_pressure = CONF_ALL_PRESSURE;
-		trans_brick->q_phase1.q_over_pressure = CONF_ALL_PRESSURE;
-		trans_brick->q_phase2.q_over_pressure = CONF_ALL_PRESSURE;
-		trans_brick->q_phase3.q_over_pressure = CONF_ALL_PRESSURE;
+		trans_brick->q_phase[0].q_over_pressure = CONF_ALL_PRESSURE;
+		trans_brick->q_phase[1].q_over_pressure = CONF_ALL_PRESSURE;
+		trans_brick->q_phase[2].q_over_pressure = CONF_ALL_PRESSURE;
+		trans_brick->q_phase[3].q_over_pressure = CONF_ALL_PRESSURE;
 
-		trans_brick->q_phase0.q_io_prio = CONF_TRANS_PRIO;
-		trans_brick->q_phase1.q_io_prio = CONF_ALL_PRIO;
-		trans_brick->q_phase2.q_io_prio = CONF_ALL_PRIO;
-		trans_brick->q_phase3.q_io_prio = CONF_ALL_PRIO;
+		trans_brick->q_phase[0].q_io_prio = CONF_TRANS_PRIO;
+		trans_brick->q_phase[1].q_io_prio = CONF_ALL_PRIO;
+		trans_brick->q_phase[2].q_io_prio = CONF_ALL_PRIO;
+		trans_brick->q_phase[3].q_io_prio = CONF_ALL_PRIO;
 
-		trans_brick->q_phase1.q_max_queued = CONF_ALL_MAX_QUEUE;
-		trans_brick->q_phase3.q_max_queued = CONF_ALL_MAX_QUEUE;
+		trans_brick->q_phase[1].q_max_queued = CONF_ALL_MAX_QUEUE;
+		trans_brick->q_phase[3].q_max_queued = CONF_ALL_MAX_QUEUE;
 
-		trans_brick->q_phase1.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
-		trans_brick->q_phase3.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
+		trans_brick->q_phase[1].q_max_jiffies = CONF_ALL_MAX_JIFFIES;
+		trans_brick->q_phase[3].q_max_jiffies = CONF_ALL_MAX_JIFFIES;
 
-		trans_brick->q_phase1.q_ordering = true;
-		trans_brick->q_phase3.q_ordering = true;
+		trans_brick->q_phase[1].q_ordering = true;
+		trans_brick->q_phase[3].q_ordering = true;
 
 		trans_brick->shadow_mem_limit = CONF_TRANS_SHADOW_LIMIT;
 		trans_brick->log_reads = CONF_TRANS_LOG_READS;
@@ -191,8 +191,8 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
 		trans_brick->max_flying = CONF_LOGST_FLYING;
 
 		if (!trans_brick->log_reads) {
-			trans_brick->q_phase1.q_max_queued = 0;
-			trans_brick->q_phase3.q_max_queued *= 2;
+			trans_brick->q_phase[1].q_max_queued = 0;
+			trans_brick->q_phase[3].q_max_queued *= 2;
 		}
 	}
 	MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
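
Note (not part of the patch): once the four queues live in a single q_phase[] array indexed up to LOGGER_QUEUES, the repetitive per-queue code becomes loopable in a follow-up cleanup. A minimal sketch of what, for example, _congested() could look like afterwards; the field names q_queued and q_flying are taken from the diff above, everything else is illustrative:

/* Hypothetical follow-up, not included in this patch: iterate over all
 * phase queues instead of naming each member explicitly. The brick is
 * congested as long as any queue still has requests queued or in flight.
 */
static inline
int _congested(struct trans_logger_brick *brick)
{
	int i;

	for (i = 0; i < LOGGER_QUEUES; i++) {
		if (atomic_read(&brick->q_phase[i].q_queued) ||
		    atomic_read(&brick->q_phase[i].q_flying))
			return 1;
	}
	return 0;
}

The same pattern would apply to the four qq_init() calls in trans_logger_brick_construct().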