import mars-103.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-04-29 10:36:10 +01:00
parent 05e63c109a
commit cabe994815
8 changed files with 626 additions and 346 deletions

View File

@ -4,14 +4,8 @@
#define LIB_QUEUE_H
#define QUEUE_ANCHOR(PREFIX,KEYTYPE,HEAPTYPE) \
spinlock_t q_lock; \
struct list_head q_anchor; \
struct pairing_heap_##HEAPTYPE *heap_high; \
struct pairing_heap_##HEAPTYPE *heap_low; \
long long q_last_insert; /* jiffies */ \
KEYTYPE heap_margin; \
KEYTYPE last_pos; \
/* parameters */ \
wait_queue_head_t *q_event; \
atomic_t *q_contention; \
struct PREFIX##_queue *q_dep; \
bool q_barrier; \
@ -28,11 +22,27 @@
int q_over_pressure; \
int q_io_prio; \
bool q_ordering; \
/* private */ \
spinlock_t q_lock; \
struct list_head q_anchor; \
struct pairing_heap_##HEAPTYPE *heap_high; \
struct pairing_heap_##HEAPTYPE *heap_low; \
long long q_last_insert; /* jiffies */ \
KEYTYPE heap_margin; \
KEYTYPE last_pos; \
#define QUEUE_FUNCTIONS(PREFIX,ELEM_TYPE,HEAD,KEYFN,KEYCMP,HEAPTYPE) \
\
/* Notify sleepers on the queue's optional event wait queue. \
 * Does nothing when no q_event has been configured. \
 */ \
static inline \
void q_##PREFIX##_trigger(struct PREFIX##_queue *q) \
{ \
	/* q_event is a pointer parameter and may be unset */ \
	if (!q->q_event) \
		return; \
	wake_up_interruptible(q->q_event); \
} \
\
static inline \
void q_##PREFIX##_init(struct PREFIX##_queue *q) \
{ \
INIT_LIST_HEAD(&q->q_anchor); \
@ -64,6 +74,8 @@ void q_##PREFIX##_insert(struct PREFIX##_queue *q, ELEM_TYPE *elem) \
q->q_last_insert = jiffies; \
\
traced_unlock(&q->q_lock, flags); \
\
q_##PREFIX##_trigger(q); \
} \
\
static inline \
@ -123,6 +135,8 @@ ELEM_TYPE *q_##PREFIX##_fetch(struct PREFIX##_queue *q) \
\
traced_unlock(&q->q_lock, flags); \
\
q_##PREFIX##_trigger(q); \
\
return elem; \
} \
\
@ -181,8 +195,9 @@ bool q_##PREFIX##_is_ready(struct logger_queue *q) \
* (measured in realtime) \
*/ \
if (q->q_max_jiffies > 0 && \
(long long)jiffies - q->q_last_insert >= q->q_max_jiffies) \
(long long)jiffies - q->q_last_insert >= q->q_max_jiffies) { \
goto limit; \
} \
\
/* 6) when no contention, start draining the queue. \
*/ \
@ -196,11 +211,28 @@ limit: \
/* Limit the number of flying requests (parallelism) \
*/ \
flying = atomic_read(&q->q_flying); \
if (q->q_max_flying > 0 && flying >= q->q_max_flying) \
if (q->q_max_flying > 0 && flying >= q->q_max_flying) { \
res = false; \
} \
\
always_done: \
return res; \
} \
\
/* Account for one more request in flight on this queue: \
 * atomically increments q_flying (the counter compared against \
 * q_max_flying in the is_ready check), then calls the trigger \
 * helper for this queue. \
 */ \
static inline \
void q_##PREFIX##_inc_flying(struct PREFIX##_queue *q) \
{ \
atomic_inc(&q->q_flying); \
q_##PREFIX##_trigger(q); \
} \
\
/* Account for one request in flight having finished on this queue: \
 * atomically decrements q_flying, then calls the trigger helper \
 * for this queue. \
 * NOTE(review): no underflow check here; presumably callers pair \
 * this with a preceding inc_flying -- confirm. \
 */ \
static inline \
void q_##PREFIX##_dec_flying(struct PREFIX##_queue *q) \
{ \
atomic_dec(&q->q_flying); \
q_##PREFIX##_trigger(q); \
} \
\
#endif

View File

@ -294,6 +294,7 @@ static int aio_submit_thread(void *data)
while (!kthread_should_stop()) {
struct aio_mref_aspect *mref_a;
struct mref_object *mref;
int sleeptime;
int err;
wait_event_interruptible_timeout(
@ -334,12 +335,29 @@ static int aio_submit_thread(void *data)
}
}
sleeptime = 1000 / HZ;
for (;;) {
if (mref->ref_rw != READ && output->brick->wait_during_fdsync) {
if (output->fdsync_active) {
atomic_inc(&output->total_fdsync_wait_count);
}
wait_event_interruptible_timeout(
output->fdsync_event,
!output->fdsync_active || kthread_should_stop(),
60 * HZ);
}
err = aio_submit(output, mref_a, false);
if (likely(err != -EAGAIN)) {
break;
}
msleep(1000 / HZ);
atomic_inc(&output->total_delay_count);
msleep(sleeptime);
if (sleeptime < 100) {
sleeptime += 1000 / HZ;
}
}
if (unlikely(err < 0)) {
_complete(output, mref, err);
@ -469,17 +487,20 @@ static int aio_sync_thread(void *data)
int i;
int err;
output->fdsync_active = false;
wait_event_interruptible_timeout(
tinfo->event,
kthread_should_stop() ||
_dequeue(tinfo, false),
HZ);
60 * HZ);
traced_lock(&tinfo->lock, flags);
for (i = MARS_PRIO_HIGH; i <= MARS_PRIO_LOW; i++) {
if (!list_empty(&tinfo->mref_list[i])) {
// move over the whole list
list_replace_init(&tinfo->mref_list[i], &tmp_list);
output->fdsync_active = true;
break;
}
}
@ -489,8 +510,11 @@ static int aio_sync_thread(void *data)
continue;
err = vfs_fsync(file, file->f_path.dentry, 1);
if (err < 0)
output->fdsync_active = false;
wake_up_interruptible(&output->fdsync_event);
if (err < 0) {
MARS_ERR("FDSYNC error %d\n", err);
}
/* Signal completion for the whole list.
* No locking needed, it's on the stack.
@ -532,8 +556,8 @@ char *aio_statistics(struct aio_brick *brick, int verbose)
// FIXME: check for allocation overflows
sprintf(res, "total reads=%d writes=%d allocs=%d | flying reads=%d writes=%d allocs=%d \n",
atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_alloc_count),
sprintf(res, "total reads = %d writes = %d allocs = %d delays = %d fdsync_waits = %d | flying reads = %d writes = %d allocs = %d \n",
atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_alloc_count), atomic_read(&output->total_delay_count), atomic_read(&output->total_fdsync_wait_count),
atomic_read(&output->read_count), atomic_read(&output->write_count), atomic_read(&output->alloc_count));
return res;
@ -546,6 +570,8 @@ void aio_reset_statistics(struct aio_brick *brick)
atomic_set(&output->total_read_count, 0);
atomic_set(&output->total_write_count, 0);
atomic_set(&output->total_alloc_count, 0);
atomic_set(&output->total_delay_count, 0);
atomic_set(&output->total_fdsync_wait_count, 0);
}
@ -700,6 +726,7 @@ cleanup:
/* Initializes the fdsync_event wait queue head of the given aio output.
 * Always returns 0.
 */
static int aio_output_construct(struct aio_output *output)
{
init_waitqueue_head(&output->fdsync_event);
return 0;
}

View File

@ -19,6 +19,7 @@ struct aio_brick {
int readahead;
bool o_direct;
bool o_fdsync;
bool wait_during_fdsync;
};
struct aio_input {
@ -41,10 +42,14 @@ struct aio_output {
int fd; // FIXME: remove this!
struct aio_threadinfo tinfo[3];
aio_context_t ctxp;
wait_queue_head_t fdsync_event;
volatile bool fdsync_active;
// statistics
atomic_t total_read_count;
atomic_t total_write_count;
atomic_t total_alloc_count;
atomic_t total_delay_count;
atomic_t total_fdsync_wait_count;
atomic_t read_count;
atomic_t write_count;
atomic_t alloc_count;

View File

@ -43,6 +43,7 @@ void bio_callback(struct bio *bio, int code)
spin_lock_irqsave(&brick->lock, flags);
if (list_empty(&mref_a->io_head)) {
list_add_tail(&mref_a->io_head, &brick->completed_list);
atomic_inc(&brick->completed_count);
}
spin_unlock_irqrestore(&brick->lock, flags);
@ -272,6 +273,7 @@ err:
static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
{
struct bio_brick *brick = output->brick;
struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output, mref);
struct bio *bio;
struct generic_callback *cb;
@ -284,20 +286,20 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
CHECK_ATOMIC(&mref->ref_count, 1);
atomic_inc(&mref->ref_count);
atomic_inc(&output->brick->fly_count);
atomic_inc(&brick->fly_count);
bio_get(bio);
rw = mref->ref_rw & 1;
MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&output->brick->fly_count));
MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&brick->fly_count));
mars_trace(mref, "bio_submit");
#ifdef WAIT_CLASH
mref_a->hash_pos = (mref->ref_pos / PAGE_SIZE) % WAIT_CLASH;
if (mref->ref_rw) {
down_write(&output->brick->hashtable[mref_a->hash_pos]);
down_write(&brick->hashtable[mref_a->hash_pos]);
} else {
down_read(&output->brick->hashtable[mref_a->hash_pos]);
down_read(&brick->hashtable[mref_a->hash_pos]);
}
#endif
@ -318,7 +320,7 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
#if 1
bio_put(bio);
atomic_dec(&output->brick->fly_count);
atomic_dec(&brick->fly_count);
#endif
err:
MARS_ERR("IO error %d\n", status);
@ -327,7 +329,7 @@ err:
cb->cb_error = status;
cb->cb_fn(cb);
}
done:;
done: ;
}
static int bio_thread(void *data)
@ -340,7 +342,7 @@ static int bio_thread(void *data)
LIST_HEAD(tmp_list);
unsigned long flags;
wait_event_interruptible_timeout(brick->event, !list_empty(&brick->completed_list), 10 * HZ);
wait_event_interruptible_timeout(brick->event, atomic_read(&brick->completed_count) > 0, 12 * HZ);
spin_lock_irqsave(&brick->lock, flags);
list_replace_init(&brick->completed_list, &tmp_list);
@ -361,6 +363,7 @@ static int bio_thread(void *data)
tmp = tmp_list.next;
list_del_init(tmp);
atomic_dec(&brick->completed_count);
mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
code = mref_a->status_code;
@ -388,6 +391,7 @@ static int bio_thread(void *data)
cb->cb_fn(cb);
atomic_dec(&brick->fly_count);
atomic_inc(&brick->total_completed_count);
MARS_IO("fly = %d\n", atomic_read(&brick->fly_count));
bio_ref_put(mref_a->output, mref);
}
@ -482,6 +486,29 @@ done:
}
//////////////// informational / statistics ///////////////
/* Format a one-line statistics summary for a bio brick.
 *
 * Reports the cumulative number of completed requests plus the current
 * fly_count and completed_count counters.
 *
 * @brick:   brick whose counters are read
 * @verbose: currently unused
 *
 * Returns a newly kmalloc()ed NUL-terminated string, or NULL when the
 * allocation fails.
 * NOTE(review): presumably the caller is responsible for kfree() -- confirm.
 */
static noinline
char *bio_statistics(struct bio_brick *brick, int verbose)
{
	const int maxlen = 128;
	char *res = kmalloc(maxlen, GFP_MARS);

	if (!res)
		return NULL;

	/* Bounded formatting: output can never run past the allocation,
	 * even if the format string grows later.
	 */
	snprintf(res, maxlen,
		 "total completed = %d | flying = %d completing = %d\n",
		 atomic_read(&brick->total_completed_count),
		 atomic_read(&brick->fly_count),
		 atomic_read(&brick->completed_count));

	return res;
}
/* Resets the cumulative statistics of a bio brick.
 * Only total_completed_count is set back to 0 here; the fly_count and
 * completed_count counters are not touched.
 */
static noinline
void bio_reset_statistics(struct bio_brick *brick)
{
atomic_set(&brick->total_completed_count, 0);
}
//////////////// object / aspect constructors / destructors ///////////////
static int bio_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
@ -534,6 +561,8 @@ static int bio_output_destruct(struct bio_output *output)
/* Brick-level operations table of the bio brick: wires the switch
 * handler and the two statistics callbacks.
 */
static struct bio_brick_ops bio_brick_ops = {
.brick_switch = bio_switch,
.brick_statistics = bio_statistics,
.reset_statistics = bio_reset_statistics,
};
static struct bio_output_ops bio_output_ops = {

View File

@ -24,6 +24,8 @@ struct bio_brick {
// readonly
loff_t total_size;
atomic_t fly_count;
atomic_t completed_count;
atomic_t total_completed_count;
// private
spinlock_t lock;
struct list_head completed_list;

View File

@ -61,10 +61,12 @@ struct light_class {
//#define TRANS_FAKE
#define CONF_TRANS_BATCHLEN 32
#define CONF_TRANS_FLYING 4
//#define CONF_TRANS_FLYING 4
#define CONF_TRANS_FLYING 128
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
//#define CONF_TRANS_LOG_READS false
#define CONF_TRANS_LOG_READS true
#define CONF_TRANS_MINIMIZE_LATENCY true
//#define CONF_ALL_BATCHLEN 2
#define CONF_ALL_BATCHLEN 1
@ -84,6 +86,7 @@ struct light_class {
//#define IF_READAHEAD 0
#define BIO_READAHEAD 1
#define AIO_READAHEAD 1
#define AIO_WAIT_DURING_FDSYNC true
static
void _set_trans_params(struct mars_brick *_brick, void *private)
@ -93,41 +96,42 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
MARS_ERR("bad brick type\n");
return;
}
if (!trans_brick->outputs[0]->q_phase2.q_ordering) {
trans_brick->outputs[0]->q_phase1.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->outputs[0]->q_phase2.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->outputs[0]->q_phase3.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->outputs[0]->q_phase4.q_batchlen = CONF_ALL_BATCHLEN;
if (!trans_brick->q_phase2.q_ordering) {
trans_brick->q_phase1.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->q_phase2.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase3.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->q_phase4.q_batchlen = CONF_ALL_BATCHLEN;
trans_brick->outputs[0]->q_phase1.q_max_flying = CONF_TRANS_FLYING;
trans_brick->outputs[0]->q_phase2.q_max_flying = CONF_ALL_FLYING;
trans_brick->outputs[0]->q_phase3.q_max_flying = CONF_ALL_FLYING;
trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase1.q_max_flying = CONF_TRANS_FLYING;
trans_brick->q_phase2.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase3.q_max_flying = CONF_ALL_FLYING;
trans_brick->q_phase4.q_max_flying = CONF_ALL_FLYING;
trans_brick->outputs[0]->q_phase1.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase2.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase3.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase4.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase1.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase2.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase3.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->q_phase4.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase1.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase2.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase3.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase4.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase1.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase2.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase3.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->q_phase4.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase1.q_io_prio = CONF_TRANS_PRIO;
trans_brick->outputs[0]->q_phase2.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase3.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase4.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase1.q_io_prio = CONF_TRANS_PRIO;
trans_brick->q_phase2.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase3.q_io_prio = CONF_ALL_PRIO;
trans_brick->q_phase4.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase2.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->q_phase4.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->outputs[0]->q_phase2.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->outputs[0]->q_phase4.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase2.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->q_phase4.q_max_jiffies = CONF_ALL_MAX_JIFFIES;
trans_brick->outputs[0]->q_phase2.q_ordering = true;
trans_brick->outputs[0]->q_phase4.q_ordering = true;
trans_brick->q_phase2.q_ordering = true;
trans_brick->q_phase4.q_ordering = true;
trans_brick->log_reads = CONF_TRANS_LOG_READS;
trans_brick->minimize_latency = CONF_TRANS_MINIMIZE_LATENCY;
#ifdef TRANS_FAKE
trans_brick->debug_shortcut = true;
#endif
@ -137,8 +141,8 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
trans_brick->flush_delay = FLUSH_DELAY;
if (!trans_brick->log_reads) {
trans_brick->outputs[0]->q_phase2.q_max_queued = 0;
trans_brick->outputs[0]->q_phase4.q_max_queued *= 2;
trans_brick->q_phase2.q_max_queued = 0;
trans_brick->q_phase4.q_max_queued *= 2;
}
}
}
@ -164,6 +168,7 @@ void _set_aio_params(struct mars_brick *_brick, void *private)
aio_brick->readahead = AIO_READAHEAD;
aio_brick->o_direct = false; // important!
aio_brick->o_fdsync = true;
aio_brick->wait_during_fdsync = AIO_WAIT_DURING_FDSYNC;
}
static

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,7 @@ _PAIRING_HEAP_TYPEDEF(logger,)
struct logger_queue {
QUEUE_ANCHOR(logger,loff_t,logger);
struct trans_logger_output *q_output;
struct trans_logger_brick *q_brick;
const char *q_insert_info;
const char *q_pushback_info;
const char *q_fetch_info;
@ -33,6 +33,27 @@ struct logger_head {
////////////////////////////////////////////////////////////////////
/* Numbering of the TL_INPUT_* roles.
 * NOTE(review): presumably these index the inputs of the trans_logger
 * brick -- confirm against the users of these constants.
 *
 * The first variant gives every role its own index (6 in total) but is
 * compiled out by the "#if 0". The active variant in the #else branch
 * uses only 2 indices.
 */
#if 0
#define TL_INPUT_READ 0
#define TL_INPUT_WRITEBACK 1
#define TL_INPUT_FW_LOG1 2
#define TL_INPUT_FW_LOG2 3
#define TL_INPUT_BW_LOG1 4
#define TL_INPUT_BW_LOG2 5
#define TL_INPUT_COUNT 6
#else
// active variant: READ and WRITEBACK share index 0,
// all four *_LOG* roles share index 1
#define TL_INPUT_READ 0
#define TL_INPUT_WRITEBACK 0
#define TL_INPUT_FW_LOG1 1
#define TL_INPUT_FW_LOG2 1
#define TL_INPUT_BW_LOG1 1
#define TL_INPUT_BW_LOG2 1
#define TL_INPUT_COUNT 2
#endif
struct hash_anchor {
rwlock_t hash_lock;
struct list_head hash_anchor;
@ -56,7 +77,8 @@ struct writeback_info {
struct trans_logger_mref_aspect {
GENERIC_ASPECT(mref);
struct trans_logger_output *output;
struct trans_logger_output *my_output;
struct trans_logger_input *my_input;
struct logger_head lh;
struct list_head hash_head;
//struct list_head q_head;
@ -92,6 +114,7 @@ struct trans_logger_brick {
bool do_replay; // mode of operation
bool do_continuous_replay; // mode of operation
bool log_reads; // additionally log pre-images
bool minimize_latency; // ... at the cost of throughput
bool debug_shortcut; // only for testing! never use in production!
loff_t replay_start_pos; // where to start replay
loff_t replay_end_pos; // end of replay
@ -102,15 +125,13 @@ struct trans_logger_brick {
int replay_code; // replay errors (if any)
// private
loff_t old_margin;
struct log_status logst;
spinlock_t pos_lock;
spinlock_t replay_lock;
struct list_head pos_list;
struct list_head replay_list;
};
struct trans_logger_output {
MARS_OUTPUT(trans_logger);
struct task_struct *thread;
wait_queue_head_t event;
// statistics
atomic_t replay_count;
atomic_t fly_count;
atomic_t hash_count;
@ -120,27 +141,32 @@ struct trans_logger_output {
atomic_t inner_balance_count;
atomic_t sub_balance_count;
atomic_t wb_balance_count;
atomic_t total_cb_count;
atomic_t total_read_count;
atomic_t total_write_count;
atomic_t total_writeback_count;
atomic_t total_shortcut_count;
atomic_t total_mshadow_count;
atomic_t total_sshadow_count;
struct task_struct *thread;
wait_queue_head_t event;
struct generic_object_layout writeback_layout;
struct generic_object_layout replay_layout;
// queues
struct logger_queue q_phase1;
struct logger_queue q_phase2;
struct logger_queue q_phase3;
struct logger_queue q_phase4;
bool did_pushback;
bool did_work;
struct hash_anchor hash_table[TRANS_HASH_MAX];
};
struct trans_logger_output {
MARS_OUTPUT(trans_logger);
};
struct trans_logger_input {
MARS_INPUT(trans_logger);
struct generic_object_layout sub_layout;
struct trans_logger_output hidden_output;
struct log_status logst;
};
MARS_TYPES(trans_logger);