mars_bio: queue PRIO_NORMAL writes for less contention

This commit is contained in:
Thomas Schoebel-Theuer 2012-10-12 09:50:37 +02:00 committed by Thomas Schoebel-Theuer
parent 1b016c796a
commit 9b4ed5e0b5
2 changed files with 136 additions and 78 deletions

View File

@ -49,7 +49,7 @@ void bio_callback(struct bio *bio, int code)
}
spin_unlock_irqrestore(&brick->lock, flags);
wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->response_event);
return;
err:
@ -364,58 +364,54 @@ done: ;
static
void bio_ref_io(struct bio_output *output, struct mref_object *mref)
{
if (mref->ref_prio == MARS_PRIO_LOW) { // queue for background IO
if (mref->ref_prio == MARS_PRIO_LOW ||
(mref->ref_prio == MARS_PRIO_NORMAL && mref->ref_rw)) {
struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output->brick, mref);
struct bio_brick *brick = output->brick;
unsigned long flags;
atomic_inc(&mref->ref_count);
spin_lock_irqsave(&brick->lock, flags);
list_add_tail(&mref_a->io_head, &brick->background_list);
list_add_tail(&mref_a->io_head, &brick->queue_list[PRIO_INDEX(mref)]);
atomic_inc(&brick->queue_count[PRIO_INDEX(mref)]);
spin_unlock_irqrestore(&brick->lock, flags);
atomic_inc(&brick->background_count);
atomic_inc(&brick->total_background_count);
wake_up_interruptible(&brick->event);
brick->submitted = true;
wake_up_interruptible(&brick->submit_event);
return;
}
// foreground IO: start immediately
// realtime IO: start immediately
_bio_ref_io(output, mref, false);
}
static
bool _bg_should_run(struct bio_brick *brick)
{
return (atomic_read(&brick->background_count) > 0 &&
atomic_read(&brick->fly_count[0]) + atomic_read(&brick->fly_count[1]) <= brick->bg_threshold &&
(brick->bg_maxfly <= 0 || atomic_read(&brick->fly_count[2]) < brick->bg_maxfly));
}
static int bio_thread(void *data)
int bio_response_thread(void *data)
{
struct bio_brick *brick = data;
#ifdef IO_DEBUGGING
int round = 0;
#endif
MARS_INF("bio kthread has started on '%s'.\n", brick->brick_path);
MARS_INF("bio response thread has started on '%s'.\n", brick->brick_path);
for (;;) {
LIST_HEAD(tmp_list);
unsigned long flags;
int count;
#ifdef IO_DEBUGGING
round++;
MARS_IO("%d sleeping...\n", round);
#endif
wait_event_interruptible_timeout(
brick->event,
atomic_read(&brick->completed_count) > 0 ||
_bg_should_run(brick),
12 * HZ);
brick->response_event,
atomic_read(&brick->completed_count) > 0,
HZ);
MARS_IO("%d woken up, completed_count = %d background_count = %d fly_count[0] = %d fly_count[1] = %d fly_count[2] = %d\n",
MARS_IO("%d woken up, completed_count = %d fly_count[0] = %d fly_count[1] = %d fly_count[2] = %d\n",
round,
atomic_read(&brick->completed_count),
atomic_read(&brick->background_count),
atomic_read(&brick->fly_count[0]),
atomic_read(&brick->fly_count[1]),
atomic_read(&brick->fly_count[2]));
@ -424,6 +420,7 @@ static int bio_thread(void *data)
list_replace_init(&brick->completed_list, &tmp_list);
spin_unlock_irqrestore(&brick->lock, flags);
count = 0;
for (;;) {
struct list_head *tmp;
struct bio_mref_aspect *mref_a;
@ -431,7 +428,7 @@ static int bio_thread(void *data)
int code;
if (list_empty(&tmp_list)) {
if (kthread_should_stop() && atomic_read(&brick->background_count) <= 0)
if (kthread_should_stop())
goto done;
break;
}
@ -464,42 +461,103 @@ static int bio_thread(void *data)
atomic_dec(&brick->fly_count[PRIO_INDEX(mref)]);
atomic_inc(&brick->total_completed_count[PRIO_INDEX(mref)]);
count++;
MARS_IO("%d completed_count = %d background_count = %d fly_count = %d\n", round, atomic_read(&brick->completed_count), atomic_read(&brick->background_count), atomic_read(&brick->fly_count[PRIO_INDEX(mref)]));
MARS_IO("%d completed_count = %d fly_count = %d\n", round, atomic_read(&brick->completed_count), atomic_read(&brick->fly_count[PRIO_INDEX(mref)]));
if (likely(mref_a->bio)) {
bio_put(mref_a->bio);
}
bio_ref_put(mref_a->output, mref);
}
while (_bg_should_run(brick)) {
struct list_head *tmp;
struct bio_mref_aspect *mref_a;
struct mref_object *mref;
bool cork;
MARS_IO("%d pushing background to foreground, completed_count = %d background_count = %d\n", round, atomic_read(&brick->completed_count), atomic_read(&brick->background_count));
atomic_dec(&brick->background_count);
spin_lock_irqsave(&brick->lock, flags);
tmp = brick->background_list.next;
list_del_init(tmp);
spin_unlock_irqrestore(&brick->lock, flags);
mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_ERR("invalid mref\n");
continue;
}
cork = atomic_read(&brick->background_count) > 0;
_bio_ref_io(mref_a->output, mref, cork);
if (count) {
brick->submitted = true;
wake_up_interruptible(&brick->submit_event);
}
}
done:
MARS_INF("bio kthread has stopped.\n");
MARS_INF("bio response thread has stopped.\n");
return 0;
}
static
bool _bg_should_run(struct bio_brick *brick)
{
return (atomic_read(&brick->queue_count[2]) > 0 &&
atomic_read(&brick->fly_count[0]) + atomic_read(&brick->fly_count[1]) <= brick->bg_threshold &&
(brick->bg_maxfly <= 0 || atomic_read(&brick->fly_count[2]) < brick->bg_maxfly));
}
static
int bio_submit_thread(void *data)
{
struct bio_brick *brick = data;
#ifdef IO_DEBUGGING
int round = 0;
#endif
MARS_INF("bio submit thread has started on '%s'.\n", brick->brick_path);
while (!kthread_should_stop()) {
int prio;
#ifdef IO_DEBUGGING
round++;
MARS_IO("%d sleeping...\n", round);
#endif
wait_event_interruptible_timeout(
brick->submit_event,
brick->submitted,
HZ);
brick->submitted = false;
MARS_IO("%d woken up, completed_count = %d fly_count[0] = %d fly_count[1] = %d fly_count[2] = %d\n",
round,
atomic_read(&brick->completed_count),
atomic_read(&brick->fly_count[0]),
atomic_read(&brick->fly_count[1]),
atomic_read(&brick->fly_count[2]));
for (prio = 0; prio < MARS_PRIO_NR; prio++) {
LIST_HEAD(tmp_list);
unsigned long flags;
if (prio == MARS_PRIO_NR-1 && !_bg_should_run(brick)) {
break;
}
MARS_IO("%d pushing prio %d to foreground, completed_count = %d\n", round, prio, atomic_read(&brick->completed_count));
spin_lock_irqsave(&brick->lock, flags);
list_replace_init(&brick->queue_list[prio], &tmp_list);
spin_unlock_irqrestore(&brick->lock, flags);
while (!list_empty(&tmp_list)) {
struct list_head *tmp = tmp_list.next;
struct bio_mref_aspect *mref_a;
struct mref_object *mref;
bool cork;
list_del_init(tmp);
mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_ERR("invalid mref\n");
continue;
}
atomic_dec(&brick->queue_count[PRIO_INDEX(mref)]);
cork = atomic_read(&brick->queue_count[PRIO_INDEX(mref)]) > 0;
_bio_ref_io(mref_a->output, mref, cork);
bio_ref_put(mref_a->output, mref);
}
}
}
MARS_INF("bio submit thread has stopped.\n");
return 0;
}
@ -547,15 +605,12 @@ static int bio_switch(struct bio_brick *brick)
#endif
brick->bvec_max = queue_max_hw_sectors(q) >> (PAGE_SHIFT - 9);
brick->total_size = inode->i_size;
brick->thread = kthread_create(bio_thread, brick, "mars_bio%d", index++);
if (IS_ERR(brick->thread)) {
status = PTR_ERR(brick->thread);
MARS_ERR("cannot create thread\n");
brick->thread = NULL;
}
if (brick->thread) {
brick->response_thread = brick_thread_create(bio_response_thread, brick, "mars_bio_r%d", index);
brick->submit_thread = brick_thread_create(bio_submit_thread, brick, "mars_bio_s%d", index);
index++;
if (likely(brick->submit_thread && brick->response_thread)) {
brick->bdev = inode->i_bdev;
wake_up_process(brick->thread);
status = 0;
}
}
@ -571,11 +626,8 @@ static int bio_switch(struct bio_brick *brick)
filp_close(brick->filp, NULL);
brick->filp = NULL;
}
if (brick->thread) {
MARS_INF("stopping thread...\n");
kthread_stop(brick->thread);
brick->thread = NULL;
}
brick_thread_stop(brick->submit_thread);
brick_thread_stop(brick->response_thread);
brick->bdev = NULL;
brick->total_size = 0;
@ -606,22 +658,24 @@ char *bio_statistics(struct bio_brick *brick, int verbose)
"total "
"completed[0] = %d "
"completed[1] = %d "
"completed[2] = %d "
"background = %d | "
"completed[2] = %d | "
"queued[0] = %d "
"queued[1] = %d "
"queued[2] = %d "
"flying[0] = %d "
"flying[1] = %d "
"flying[2] = %d "
"completing = %d "
"background = %d\n",
"completing = %d\n",
atomic_read(&brick->total_completed_count[0]),
atomic_read(&brick->total_completed_count[1]),
atomic_read(&brick->total_completed_count[2]),
atomic_read(&brick->total_background_count),
atomic_read(&brick->fly_count[0]),
atomic_read(&brick->queue_count[0]),
atomic_read(&brick->queue_count[1]),
atomic_read(&brick->queue_count[2]),
atomic_read(&brick->fly_count[1]),
atomic_read(&brick->fly_count[2]),
atomic_read(&brick->completed_count),
atomic_read(&brick->background_count));
atomic_read(&brick->completed_count));
return res;
}
@ -632,7 +686,6 @@ void bio_reset_statistics(struct bio_brick *brick)
atomic_set(&brick->total_completed_count[0], 0);
atomic_set(&brick->total_completed_count[1], 0);
atomic_set(&brick->total_completed_count[2], 0);
atomic_set(&brick->total_background_count, 0);
}
@ -658,9 +711,12 @@ MARS_MAKE_STATICS(bio);
static int bio_brick_construct(struct bio_brick *brick)
{
spin_lock_init(&brick->lock);
INIT_LIST_HEAD(&brick->background_list);
INIT_LIST_HEAD(&brick->queue_list[0]);
INIT_LIST_HEAD(&brick->queue_list[1]);
INIT_LIST_HEAD(&brick->queue_list[2]);
INIT_LIST_HEAD(&brick->completed_list);
init_waitqueue_head(&brick->event);
init_waitqueue_head(&brick->submit_event);
init_waitqueue_head(&brick->response_event);
return 0;
}

View File

@ -27,19 +27,21 @@ struct bio_brick {
// readonly
loff_t total_size;
atomic_t fly_count[MARS_PRIO_NR];
atomic_t background_count;
atomic_t queue_count[MARS_PRIO_NR];
atomic_t completed_count;
atomic_t total_completed_count[MARS_PRIO_NR];
atomic_t total_background_count;
// private
spinlock_t lock;
struct list_head background_list;
struct list_head queue_list[MARS_PRIO_NR];
struct list_head completed_list;
wait_queue_head_t event;
wait_queue_head_t submit_event;
wait_queue_head_t response_event;
struct file *filp;
struct block_device *bdev;
struct task_struct *thread;
brick_thread_t *submit_thread;
brick_thread_t *response_thread;
int bvec_max;
bool submitted;
};
struct bio_input {