trans_logger: replace single queues by array

This commit is contained in:
schoebel 2012-02-11 20:01:16 +01:00 committed by Thomas Schoebel-Theuer
parent 63c3bf0f49
commit 2c5fcccc4e
3 changed files with 96 additions and 98 deletions

View File

@ -821,7 +821,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
atomic_inc(&brick->inner_balance_count);
qq_mref_insert(&brick->q_phase0, mref_a);
qq_mref_insert(&brick->q_phase[0], mref_a);
wake_up_interruptible_all(&brick->worker_event);
return;
}
@ -1316,7 +1316,7 @@ void phase0_endio(void *private, int error)
orig_mref = orig_mref_a->object;
CHECK_PTR(orig_mref, err);
qq_dec_flying(&brick->q_phase0);
qq_dec_flying(&brick->q_phase[0]);
/* Pin mref->ref_count so it can't go away
* after _complete().
@ -1333,7 +1333,7 @@ void phase0_endio(void *private, int error)
/* Queue up for the next phase.
*/
qq_mref_insert(&brick->q_phase1, orig_mref_a);
qq_mref_insert(&brick->q_phase[1], orig_mref_a);
/* Undo the above pinning
*/
@ -1401,7 +1401,7 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
atomic_inc(&brick->pos_count);
traced_unlock(&input->pos_lock, flags);
qq_inc_flying(&brick->q_phase0);
qq_inc_flying(&brick->q_phase[0]);
return true;
err:
@ -1519,10 +1519,10 @@ void phase1_endio(struct generic_callback *cb)
goto err;
}
qq_dec_flying(&brick->q_phase1);
qq_dec_flying(&brick->q_phase[1]);
// queue up for the next phase
qq_wb_insert(&brick->q_phase2, wb);
qq_wb_insert(&brick->q_phase[2], wb);
wake_up_interruptible_all(&brick->worker_event);
return;
@ -1582,11 +1582,11 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
atomic_set(&wb->w_sub_log_count, atomic_read(&wb->w_sub_read_count));
if (brick->log_reads) {
qq_inc_flying(&brick->q_phase1);
qq_inc_flying(&brick->q_phase[1]);
fire_writeback(&wb->w_sub_read_list, false);
} else { // shortcut
#ifdef LATER
qq_wb_insert(&brick->q_phase3, wb);
qq_wb_insert(&brick->q_phase[3], wb);
wake_up_interruptible_all(&brick->worker_event);
#else
return phase3_startio(wb);
@ -1615,7 +1615,7 @@ void _phase2_endio(struct writeback_info *wb)
struct trans_logger_brick *brick = wb->w_brick;
// queue up for the next phase
qq_wb_insert(&brick->q_phase3, wb);
qq_wb_insert(&brick->q_phase[3], wb);
wake_up_interruptible_all(&brick->worker_event);
return;
}
@ -1634,7 +1634,7 @@ void phase2_endio(void *private, int error)
brick = wb->w_brick;
CHECK_PTR(brick, err);
qq_dec_flying(&brick->q_phase2);
qq_dec_flying(&brick->q_phase[2]);
if (unlikely(error < 0)) {
MARS_FAT("IO error %d\n", error);
@ -1694,7 +1694,7 @@ bool _phase2_startio(struct trans_logger_mref_aspect *sub_mref_a)
goto err;
}
qq_inc_flying(&brick->q_phase2);
qq_inc_flying(&brick->q_phase[2]);
return true;
@ -1766,7 +1766,7 @@ void phase3_endio(struct generic_callback *cb)
hash_put_all(brick, &wb->w_collect_list);
qq_dec_flying(&brick->q_phase3);
qq_dec_flying(&brick->q_phase[3]);
atomic_inc(&brick->total_writeback_cluster_count);
free_writeback(wb);
@ -1804,7 +1804,7 @@ bool phase3_startio(struct writeback_info *wb)
/* Start writeback IO
*/
qq_inc_flying(&wb->w_brick->q_phase3);
qq_inc_flying(&wb->w_brick->q_phase[3]);
fire_writeback(&wb->w_sub_write_list, true);
return true;
}
@ -1886,14 +1886,14 @@ done:
static inline
int _congested(struct trans_logger_brick *brick)
{
return atomic_read(&brick->q_phase0.q_queued)
|| atomic_read(&brick->q_phase0.q_flying)
|| atomic_read(&brick->q_phase1.q_queued)
|| atomic_read(&brick->q_phase1.q_flying)
|| atomic_read(&brick->q_phase2.q_queued)
|| atomic_read(&brick->q_phase2.q_flying)
|| atomic_read(&brick->q_phase3.q_queued)
|| atomic_read(&brick->q_phase3.q_flying);
return atomic_read(&brick->q_phase[0].q_queued)
|| atomic_read(&brick->q_phase[0].q_flying)
|| atomic_read(&brick->q_phase[1].q_queued)
|| atomic_read(&brick->q_phase[1].q_flying)
|| atomic_read(&brick->q_phase[2].q_queued)
|| atomic_read(&brick->q_phase[2].q_flying)
|| atomic_read(&brick->q_phase[3].q_queued)
|| atomic_read(&brick->q_phase[3].q_flying);
}
static inline
@ -1925,16 +1925,16 @@ static noinline
bool _condition(struct condition_status *st, struct trans_logger_brick *brick)
{
st->log_ready = logst_is_ready(brick);
st->q0_ready = atomic_read(&brick->q_phase0.q_queued) > 0 &&
st->q0_ready = atomic_read(&brick->q_phase[0].q_queued) > 0 &&
st->log_ready;
st->q1_ready = qq_is_ready(&brick->q_phase1);
st->q2_ready = qq_is_ready(&brick->q_phase2);
st->q3_ready = qq_is_ready(&brick->q_phase3);
st->q1_ready = qq_is_ready(&brick->q_phase[1]);
st->q2_ready = qq_is_ready(&brick->q_phase[2]);
st->q3_ready = qq_is_ready(&brick->q_phase[3]);
st->extra_ready = (kthread_should_stop() && !_congested(brick));
st->some_ready = st->q0_ready | st->q1_ready | st->q2_ready | st->q3_ready | st->extra_ready;
#if 0
if (!st->some_ready)
st->q0_ready = atomic_read(&brick->q_phase0.q_queued) > 0;
st->q0_ready = atomic_read(&brick->q_phase[0].q_queued) > 0;
#endif
return st->some_ready;
}
@ -2069,7 +2069,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
j0 = jiffies;
#endif
//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase0.q_queued));
//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase[0].q_queued));
#ifdef STAT_DEBUGGING
if (((long long)jiffies) - last_jiffies >= HZ * 5 && brick->power.button) {
@ -2088,7 +2088,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
/* This is highest priority, do it first.
*/
if (st.q0_ready) {
run_mref_queue(&brick->q_phase0, prep_phase_startio, brick->q_phase0.q_batchlen);
run_mref_queue(&brick->q_phase[0], prep_phase_startio, brick->q_phase[0].q_batchlen);
}
j1 = jiffies;
@ -2103,12 +2103,12 @@ void trans_logger_log(struct trans_logger_brick *brick)
* soft rate (or rate balance).
*/
if (true || st.q3_ready) {
run_wb_queue(&brick->q_phase3, phase3_startio, brick->q_phase3.q_batchlen);
run_wb_queue(&brick->q_phase[3], phase3_startio, brick->q_phase[3].q_batchlen);
}
j2 = jiffies;
if (true || st.q2_ready) {
run_wb_queue(&brick->q_phase2, phase2_startio, brick->q_phase2.q_batchlen);
run_wb_queue(&brick->q_phase[2], phase2_startio, brick->q_phase[2].q_batchlen);
}
j3 = jiffies;
@ -2117,7 +2117,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
* stopping individual queues!
*/
if (true || st.q1_ready) {
run_mref_queue(&brick->q_phase1, phase1_startio, brick->q_phase1.q_batchlen);
run_mref_queue(&brick->q_phase[1], phase1_startio, brick->q_phase[1].q_batchlen);
}
j4 = jiffies;
@ -2141,7 +2141,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
if (!do_flush) { // start over soon
wait_timeout = brick->flush_delay;
}
} else if (atomic_read(&brick->q_phase0.q_queued) <= 0 &&
} else if (atomic_read(&brick->q_phase[0].q_queued) <= 0 &&
(brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
do_flush = true;
}
@ -2193,9 +2193,9 @@ void trans_logger_log(struct trans_logger_brick *brick)
unlimited = LIMIT_FN(1, 2);
}
if (unlimited != old_unlimited) {
brick->q_phase1.q_unlimited = unlimited;
brick->q_phase2.q_unlimited = unlimited;
brick->q_phase3.q_unlimited = unlimited;
brick->q_phase[1].q_unlimited = unlimited;
brick->q_phase[2].q_unlimited = unlimited;
brick->q_phase[3].q_unlimited = unlimited;
MARS_DBG("mshadow_count = %d/%d global_mem = %ld/%lld unlimited %d -> %d\n", atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, old_unlimited, unlimited);
old_unlimited = unlimited;
wake_up_interruptible_all(&brick->worker_event);
@ -2644,10 +2644,10 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
atomic_read(&brick->total_round_count),
atomic_read(&brick->total_restart_count),
atomic_read(&brick->total_delay_count),
atomic_read(&brick->q_phase0.q_total),
atomic_read(&brick->q_phase1.q_total),
atomic_read(&brick->q_phase2.q_total),
atomic_read(&brick->q_phase3.q_total),
atomic_read(&brick->q_phase[0].q_total),
atomic_read(&brick->q_phase[1].q_total),
atomic_read(&brick->q_phase[2].q_total),
atomic_read(&brick->q_phase[3].q_total),
atomic_read(&brick->mref_object_layout.alloc_count),
atomic64_read(&brick->shadow_mem_used),
brick_global_memlimit,
@ -2662,14 +2662,14 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
atomic_read(&brick->outer_balance_count),
atomic_read(&brick->wb_balance_count),
atomic_read(&brick->fly_count),
atomic_read(&brick->q_phase0.q_queued),
atomic_read(&brick->q_phase0.q_flying),
atomic_read(&brick->q_phase1.q_queued),
atomic_read(&brick->q_phase1.q_flying),
atomic_read(&brick->q_phase2.q_queued),
atomic_read(&brick->q_phase2.q_flying),
atomic_read(&brick->q_phase3.q_queued),
atomic_read(&brick->q_phase3.q_flying));
atomic_read(&brick->q_phase[0].q_queued),
atomic_read(&brick->q_phase[0].q_flying),
atomic_read(&brick->q_phase[1].q_queued),
atomic_read(&brick->q_phase[1].q_flying),
atomic_read(&brick->q_phase[2].q_queued),
atomic_read(&brick->q_phase[2].q_flying),
atomic_read(&brick->q_phase[3].q_queued),
atomic_read(&brick->q_phase[3].q_flying));
return res;
}
@ -2746,29 +2746,29 @@ int trans_logger_brick_construct(struct trans_logger_brick *brick)
INIT_LIST_HEAD(&brick->replay_list);
init_waitqueue_head(&brick->worker_event);
init_waitqueue_head(&brick->caller_event);
qq_init(&brick->q_phase0, brick);
qq_init(&brick->q_phase1, brick);
qq_init(&brick->q_phase2, brick);
qq_init(&brick->q_phase3, brick);
qq_init(&brick->q_phase[0], brick);
qq_init(&brick->q_phase[1], brick);
qq_init(&brick->q_phase[2], brick);
qq_init(&brick->q_phase[3], brick);
#if 1
brick->q_phase1.q_dep = &brick->q_phase3;
brick->q_phase[1].q_dep = &brick->q_phase[3];
/* TODO: this is cyclic and therefore potentially dangerous.
* Find a better solution to the starvation problem!
*/
//brick->q_phase3.q_dep = &brick->q_phase0;
//brick->q_phase[3].q_dep = &brick->q_phase[0];
#endif
brick->q_phase0.q_insert_info = "q0_ins";
brick->q_phase0.q_pushback_info = "q0_push";
brick->q_phase0.q_fetch_info = "q0_fetch";
brick->q_phase1.q_insert_info = "q1_ins";
brick->q_phase1.q_pushback_info = "q1_push";
brick->q_phase1.q_fetch_info = "q1_fetch";
brick->q_phase2.q_insert_info = "q2_ins";
brick->q_phase2.q_pushback_info = "q2_push";
brick->q_phase2.q_fetch_info = "q2_fetch";
brick->q_phase3.q_insert_info = "q3_ins";
brick->q_phase3.q_pushback_info = "q3_push";
brick->q_phase3.q_fetch_info = "q3_fetch";
brick->q_phase[0].q_insert_info = "q0_ins";
brick->q_phase[0].q_pushback_info = "q0_push";
brick->q_phase[0].q_fetch_info = "q0_fetch";
brick->q_phase[1].q_insert_info = "q1_ins";
brick->q_phase[1].q_pushback_info = "q1_push";
brick->q_phase[1].q_fetch_info = "q1_fetch";
brick->q_phase[2].q_insert_info = "q2_ins";
brick->q_phase[2].q_pushback_info = "q2_push";
brick->q_phase[2].q_fetch_info = "q2_fetch";
brick->q_phase[3].q_insert_info = "q3_ins";
brick->q_phase[3].q_pushback_info = "q3_push";
brick->q_phase[3].q_fetch_info = "q3_fetch";
brick->new_input_nr = TL_INPUT_LOG1;
brick->log_input_nr = TL_INPUT_LOG1;
brick->old_input_nr = TL_INPUT_LOG1;

View File

@ -6,6 +6,7 @@
#define REGION_SIZE (1 << REGION_SIZE_BITS)
#define TRANS_HASH_MAX 8192
//#define TRANS_HASH_MAX 16384
#define LOGGER_QUEUES 4
#include <linux/time.h>
@ -170,10 +171,7 @@ struct trans_logger_brick {
atomic_t total_restart_count;
atomic_t total_delay_count;
// queues
struct logger_queue q_phase0;
struct logger_queue q_phase1;
struct logger_queue q_phase2;
struct logger_queue q_phase3;
struct logger_queue q_phase[LOGGER_QUEUES];
bool did_pushback;
bool did_work;
bool delay_callers;

View File

@ -141,40 +141,40 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
MARS_ERR("bad brick type\n");
return -EINVAL;
}
if (!trans_brick->q_phase1.q_ordering) {
trans_brick->q_phase0.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->q_phase1.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase2.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase3.q_batchlen = CONF_ALL_BATCHLEN;
if (!trans_brick->q_phase[1].q_ordering) {
trans_brick->q_phase[0].q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->q_phase[1].q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase[2].q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase[3].q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase0.q_max_flying = CONF_TRANS_FLYING;
trans_brick->q_phase1.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase2.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase3.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase[0].q_max_flying = CONF_TRANS_FLYING;
trans_brick->q_phase[1].q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase[2].q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase[3].q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase0.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase1.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase2.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase3.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase[0].q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase[1].q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase[2].q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase[3].q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase0.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase1.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase2.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase3.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase[0].q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase[1].q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase[2].q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase[3].q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase0.q_io_prio = CONF_TRANS_PRIO;
trans_brick->q_phase1.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase2.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase3.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase[0].q_io_prio = CONF_TRANS_PRIO;
trans_brick->q_phase[1].q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase[2].q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase[3].q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase1.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase3.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase[1].q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase[3].q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase1.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase3.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase[1].q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase[3].q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase1.q_ordering = true;
trans_brick->q_phase3.q_ordering = true;
trans_brick->q_phase[1].q_ordering = true;
trans_brick->q_phase[3].q_ordering = true;
trans_brick->shadow_mem_limit = CONF_TRANS_SHADOW_LIMIT;
trans_brick->log_reads = CONF_TRANS_LOG_READS;
@ -191,8 +191,8 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
trans_brick->max_flying = CONF_LOGST_FLYING;
if (!trans_brick->log_reads) {
trans_brick->q_phase1.q_max_queued = 0;
trans_brick->q_phase3.q_max_queued *= 2;
trans_brick->q_phase[1].q_max_queued = 0;
trans_brick->q_phase[3].q_max_queued *= 2;
}
}
MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);