From 4c6369d65dfdedffd9c84e11867b401e3001f92e Mon Sep 17 00:00:00 2001
From: Thomas Schoebel-Theuer
Date: Thu, 26 May 2011 15:32:32 +0100
Subject: [PATCH] import mars-107.tgz

---
 lib_log.c           |  61 ++++++++++---
 lib_log.h           |   2 +-
 mars_bio.c          |  90 ++++++++++++++------
 mars_bio.h          |   8 +-
 mars_client.c       |  17 +++-
 mars_client.h       |   1 +
 mars_copy.c         |  16 ++--
 mars_copy.h         |   8 +-
 mars_light.c        |  26 +++++-
 mars_server.c       | 202 +++++++++++++++++++++++++++++++++++---------
 mars_server.h       |   7 ++
 mars_trans_logger.c | 101 ++++++++++++++++------
 mars_trans_logger.h |   2 +
 13 files changed, 411 insertions(+), 130 deletions(-)

diff --git a/lib_log.c b/lib_log.c
index 266b8858..dfcda201 100644
--- a/lib_log.c
+++ b/lib_log.c
@@ -20,11 +20,22 @@ EXPORT_SYMBOL_GPL(init_logst);
 
 struct log_cb_info {
 	struct mref_object *mref;
-	int nr_endio;
+	struct semaphore mutex;
+	atomic_t refcount;
+	int nr_cb;
+	void (*preios[MARS_LOG_CB_MAX])(void *private);
 	void (*endios[MARS_LOG_CB_MAX])(void *private, int error);
 	void *privates[MARS_LOG_CB_MAX];
 };
 
+static
+void put_log_cb_info(struct log_cb_info *cb_info)
+{
+	if (atomic_dec_and_test(&cb_info->refcount)) {
+		kfree(cb_info);
+	}
+}
+
 static
 void log_write_endio(struct generic_callback *cb)
 {
@@ -38,12 +49,18 @@ void log_write_endio(struct generic_callback *cb)
 		mars_log_trace(cb_info->mref);
 	}
 
-	MARS_IO("nr_endio = %d\n", cb_info->nr_endio);
+	MARS_IO("nr_cb = %d\n", cb_info->nr_cb);
 
-	for (i = 0; i < cb_info->nr_endio; i++) {
-		cb_info->endios[i](cb_info->privates[i], cb->cb_error);
+	down(&cb_info->mutex);
+	for (i = 0; i < cb_info->nr_cb; i++) {
+		void (*cbf)(void *private, int error) = cb_info->endios[i];
+		if (cbf) {
+			cbf(cb_info->privates[i], cb->cb_error);
+		}
 	}
-	kfree(cb_info);
+	cb_info->nr_cb = 0; // prevent late preio() callbacks
+	up(&cb_info->mutex);
+	put_log_cb_info(cb_info);
 	return;
 
 err:
@@ -54,7 +71,9 @@ void log_flush(struct log_status *logst)
 {
 	struct mref_object *mref = logst->log_mref;
 	struct generic_callback *cb;
+	struct log_cb_info *cb_info;
 	int gap;
+	int i;
 
 	if (!mref || !logst->count)
 		return;
@@ -81,7 +100,8 @@ void log_flush(struct log_status *logst)
 
 	cb = &mref->_ref_cb;
 	cb->cb_fn = log_write_endio;
-	cb->cb_private = logst->private;
+	cb_info = logst->private;
+	cb->cb_private = cb_info;
 	logst->private = NULL;
 	cb->cb_error = 0;
 	cb->cb_prev = NULL;
@@ -96,6 +116,18 @@ void log_flush(struct log_status *logst)
 	logst->offset = 0;
 	logst->count = 0;
 	logst->log_mref = NULL;
+
+	if (cb_info->nr_cb > 0) {
+		down(&cb_info->mutex);
+		for (i = 0; i < cb_info->nr_cb; i++) {
+			void (*cbf)(void *private) = cb_info->preios[i];
+			if (cbf) {
+				cbf(cb_info->privates[i]);
+			}
+		}
+		up(&cb_info->mutex);
+	}
+	put_log_cb_info(cb_info);
 }
 EXPORT_SYMBOL_GPL(log_flush);
 
@@ -112,7 +144,7 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
 
 	mref = logst->log_mref;
 	if ((mref && total_len > mref->ref_len - logst->offset)
-	   || !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) {
+	   || !cb_info || cb_info->nr_cb >= MARS_LOG_CB_MAX) {
 		log_flush(logst);
 	}
 
@@ -128,6 +160,8 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
 		goto err;
 	}
 	cb_info = logst->private;
+	sema_init(&cb_info->mutex, 1);
+	atomic_set(&cb_info->refcount, 2);
 
 	mref = mars_alloc_mref(logst->output, &logst->ref_object_layout);
 	if (unlikely(!mref)) {
@@ -206,7 +240,7 @@ err:
 }
 EXPORT_SYMBOL_GPL(log_reserve);
 
-bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private)
+bool log_finalize(struct log_status *logst, int len, void (*preio)(void *private), void (*endio)(void *private, int error), void *private)
 {
 	struct mref_object *mref = logst->log_mref;
 	struct log_cb_info *cb_info = logst->private;
@@ -214,7 +248,7 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
 	void *data;
 	int offset;
 	int restlen;
-	int nr_endio;
+	int nr_cb;
 	bool ok = false;
 
 	CHECK_PTR(mref, err);
@@ -228,7 +262,7 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
 		MARS_ERR("trying to write more than available (%d > %d)\n", len, (int)(restlen - END_OVERHEAD));
 		goto err;
 	}
-	if (unlikely(!cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX)) {
+	if (unlikely(!cb_info || cb_info->nr_cb >= MARS_LOG_CB_MAX)) {
 		MARS_ERR("too many endio() calls\n");
 		goto err;
 	}
@@ -267,9 +301,10 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
 	offset = logst->validflag_offset;
 	DATA_PUT(data, offset, (char)1);
 
-	nr_endio = cb_info->nr_endio++;
-	cb_info->endios[nr_endio] = endio;
-	cb_info->privates[nr_endio] = private;
+	nr_cb = cb_info->nr_cb++;
+	cb_info->preios[nr_cb] = preio;
+	cb_info->endios[nr_cb] = endio;
+	cb_info->privates[nr_cb] = private;
 
 	logst->count++;
 	ok = true;
diff --git a/lib_log.h b/lib_log.h
index 219cb7d1..7224aece 100644
--- a/lib_log.h
+++ b/lib_log.h
@@ -120,7 +120,7 @@ void log_flush(struct log_status *logst);
 
 void *log_reserve(struct log_status *logst, struct log_header *lh);
 
-bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private);
+bool log_finalize(struct log_status *logst, int len, void (*preio)(void *private), void (*endio)(void *private, int error), void *private);
 
 int log_read(struct log_status *logst, struct log_header *lh, void **payload, int *payload_len);
 
diff --git a/mars_bio.c b/mars_bio.c
index 1d03ae94..d00c8ffd 100644
--- a/mars_bio.c
+++ b/mars_bio.c
@@ -254,6 +254,12 @@ void bio_ref_put(struct bio_output *output, struct mref_object *mref)
 	CHECK_PTR(mref_a, err);
 
 	if (likely(mref_a->bio)) {
+#ifdef MARS_DEBUGGING
+		int bi_cnt = atomic_read(&mref_a->bio->bi_cnt);
+		if (bi_cnt > 1) {
+			MARS_DBG("bi_cnt = %d\n", bi_cnt);
+		}
+#endif
 		bio_put(mref_a->bio);
 		mref_a->bio = NULL;
 	}
@@ -271,7 +277,8 @@ err:
 	MARS_FAT("cannot work\n");
 }
 
-static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
+static
+void _bio_ref_io(struct bio_output *output, struct mref_object *mref)
 {
 	struct bio_brick *brick = output->brick;
 	struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output, mref);
@@ -306,15 +313,6 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
 	MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&brick->fly_count));
 	mars_trace(mref, "bio_submit");
 
-#ifdef WAIT_CLASH
-	mref_a->hash_pos = (mref->ref_pos / PAGE_SIZE) % WAIT_CLASH;
-	if (mref->ref_rw) {
-		down_write(&brick->hashtable[mref_a->hash_pos]);
-	} else {
-		down_read(&brick->hashtable[mref_a->hash_pos]);
-	}
-#endif
-
 #ifdef FAKE_IO
 	bio->bi_end_io(bio, 0);
 #else
@@ -331,10 +329,9 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
 	if (likely(status >= 0))
 		goto done;
 
-#if 1
 	bio_put(bio);
 	atomic_dec(&brick->fly_count);
-#endif
+
 err:
 	MARS_ERR("IO error %d\n", status);
 	cb = mref->ref_cb;
@@ -345,6 +342,26 @@ err:
 done: ;
 }
 
+static
+void bio_ref_io(struct bio_output *output, struct mref_object *mref)
+{
+	if (mref->ref_prio == MARS_PRIO_LOW) { // queue for background IO
+		struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output, mref);
+		struct bio_brick *brick = output->brick;
+		unsigned long flags;
+
+		spin_lock_irqsave(&brick->lock, flags);
+		list_add_tail(&mref_a->io_head, &brick->background_list);
+		spin_unlock_irqrestore(&brick->lock, flags);
+		atomic_inc(&brick->background_count);
+		atomic_inc(&brick->total_background_count);
+		wake_up_interruptible(&brick->event);
+		return;
+	}
+	// foreground IO: start immediately
+	_bio_ref_io(output, mref);
+}
+
 static int bio_thread(void *data)
 {
 	struct bio_brick *brick = data;
@@ -355,7 +372,11 @@ static int bio_thread(void *data)
 		LIST_HEAD(tmp_list);
 		unsigned long flags;
 
-		wait_event_interruptible_timeout(brick->event, atomic_read(&brick->completed_count) > 0, 12 * HZ);
+		wait_event_interruptible_timeout(
+			brick->event,
+			atomic_read(&brick->completed_count) > 0 ||
+			(atomic_read(&brick->background_count) > 0 && !atomic_read(&brick->fly_count)),
+			12 * HZ);
 
 		spin_lock_irqsave(&brick->lock, flags);
 		list_replace_init(&brick->completed_list, &tmp_list);
@@ -384,13 +405,6 @@ static int bio_thread(void *data)
 
 			mref = mref_a->object;
 
-#ifdef WAIT_CLASH
-			if (mref_a->object->ref_rw) {
-				up_write(&brick->hashtable[mref_a->hash_pos]);
-			} else {
-				up_read(&brick->hashtable[mref_a->hash_pos]);
-			}
-#endif
 			mars_trace(mref, "bio_endio");
 
 			cb = mref->ref_cb;
@@ -406,8 +420,32 @@ static int bio_thread(void *data)
 			atomic_dec(&brick->fly_count);
 			atomic_inc(&brick->total_completed_count);
 			MARS_IO("fly = %d\n", atomic_read(&brick->fly_count));
+			if (likely(mref_a->bio)) {
+				bio_put(mref_a->bio);
+			}
 			bio_ref_put(mref_a->output, mref);
 		}
+
+		if (!atomic_read(&brick->fly_count) && atomic_read(&brick->background_count) > 0) {
+			struct list_head *tmp;
+			struct bio_mref_aspect *mref_a;
+			struct mref_object *mref;
+
+			atomic_dec(&brick->background_count);
+			spin_lock_irqsave(&brick->lock, flags);
+			tmp = brick->background_list.next;
+			list_del_init(tmp);
+			spin_unlock_irqrestore(&brick->lock, flags);
+
+			mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
+			mref = mref_a->object;
+			if (unlikely(!mref)) {
+				MARS_ERR("invalid mref\n");
+				continue;
+			}
+
+			_bio_ref_io(mref_a->output, mref);
+		}
 	}
 done:
 	MARS_INF("bio kthread has stopped.\n");
@@ -504,13 +542,13 @@ done:
 static noinline
 char *bio_statistics(struct bio_brick *brick, int verbose)
 {
-	char *res = kmalloc(128, GFP_MARS);
+	char *res = kmalloc(256, GFP_MARS);
 	if (!res)
 		return NULL;
 
 	// FIXME: check for allocation overflows
 
-	sprintf(res, "total completed = %d | flying = %d completing = %d\n", atomic_read(&brick->total_completed_count), atomic_read(&brick->fly_count), atomic_read(&brick->completed_count));
+	sprintf(res, "total completed = %d background = %d | flying = %d completing = %d background = %d\n", atomic_read(&brick->total_completed_count), atomic_read(&brick->total_background_count), atomic_read(&brick->fly_count), atomic_read(&brick->completed_count), atomic_read(&brick->background_count));
 
 	return res;
 }
@@ -519,6 +557,7 @@ static noinline
 void bio_reset_statistics(struct bio_brick *brick)
 {
 	atomic_set(&brick->total_completed_count, 0);
+	atomic_set(&brick->total_background_count, 0);
 }
 
 
@@ -543,13 +582,8 @@ MARS_MAKE_STATICS(bio);
 
 static int bio_brick_construct(struct bio_brick *brick)
 {
-#ifdef WAIT_CLASH
-	int i;
-	for (i = 0; i < WAIT_CLASH; i++) {
-		init_rwsem(&brick->hashtable[i]);
-	}
-#endif
 	spin_lock_init(&brick->lock);
+	INIT_LIST_HEAD(&brick->background_list);
 	INIT_LIST_HEAD(&brick->completed_list);
 	init_waitqueue_head(&brick->event);
 	return 0;
diff --git a/mars_bio.h b/mars_bio.h
index 4d7f2fdc..ab8146a9 100644
--- a/mars_bio.h
+++ b/mars_bio.h
@@ -5,8 +5,6 @@
 #include
 #include
 
-//#define WAIT_CLASH 1024
-
 struct bio_mref_aspect {
 	GENERIC_ASPECT(mref);
 	struct list_head io_head;
@@ -27,18 +25,18 @@ struct bio_brick {
 	// readonly
 	loff_t total_size;
 	atomic_t fly_count;
+	atomic_t background_count;
 	atomic_t completed_count;
 	atomic_t total_completed_count;
+	atomic_t total_background_count;
 	// private
 	spinlock_t lock;
+	struct list_head background_list;
 	struct list_head completed_list;
 	wait_queue_head_t event;
 	struct file *filp;
 	struct block_device *bdev;
 	struct task_struct *thread;
-#ifdef WAIT_CLASH
-	struct rw_semaphore hashtable[WAIT_CLASH];
-#endif
 	int bvec_max;
 };
 
diff --git a/mars_client.c b/mars_client.c
index 839b3c4a..62ece717 100644
--- a/mars_client.c
+++ b/mars_client.c
@@ -33,8 +33,9 @@ static void _kill_socket(struct client_output *output)
 
 static void _kill_thread(struct client_threadinfo *ti)
 {
-	if (ti->thread) {
+	if (ti->thread && !ti->terminated) {
 		kthread_stop(ti->thread);
+		ti->thread = NULL;
 	}
 }
 
@@ -231,7 +232,8 @@ error:
 	client_ref_put(output, mref);
 }
 
-static int receiver_thread(void *data)
+static
+int receiver_thread(void *data)
 {
 	struct client_output *output = data;
 	int status = 0;
@@ -340,6 +342,8 @@ static int sender_thread(void *data)
 	struct client_brick *brick = output->brick;
 	int status = 0;
 
+	output->receiver.restart_count = 0;
+
 	while (!kthread_should_stop()) {
 		struct list_head *tmp;
 		struct client_mref_aspect *mref_a;
@@ -375,9 +379,16 @@ static int sender_thread(void *data)
 			traced_unlock(&output->lock, flags);
 		}
 
-		wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 1 * HZ);
+		wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ);
 
 		if (unlikely(output->receiver.terminated)) {
+#if 1
+			if (unlikely(output->receiver.restart_count++ > 3)) { // don't restart too often
+				MARS_ERR("receiver failed too often, giving up\n");
+				status = -ECOMM;
+				break;
+			}
+#endif
 			output->receiver.terminated = false;
 			output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++);
 			if (unlikely(IS_ERR(output->receiver.thread))) {
diff --git a/mars_client.h b/mars_client.h
index 170ec18b..6b19907f 100644
--- a/mars_client.h
+++ b/mars_client.h
@@ -26,6 +26,7 @@ struct client_input {
 struct client_threadinfo {
 	struct task_struct *thread;
 	wait_queue_head_t run_event;
+	int restart_count;
 	bool terminated;
 };
 
diff --git a/mars_copy.c b/mars_copy.c
index 26d4afa2..6ac13d9e 100644
--- a/mars_copy.c
+++ b/mars_copy.c
@@ -53,7 +53,7 @@ int _clear_clash(struct copy_brick *brick)
  * [If you had no writes on A at all during the copy, of course
  * this is not necessary]
  *
- * When optimize_mode is on, reads can utilize the already copied
+ * When utilize_mode is on, reads can utilize the already copied
  * region from B, but only as long as this region has not been
  * invalidated by writes (indicated by low_dirty).
  *
@@ -69,7 +69,7 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref)
 	int behind;
 	loff_t ref_end;
 
-	if (!brick->optimize_mode || brick->low_dirty)
+	if (!brick->utilize_mode || brick->low_dirty)
 		return INPUT_A_IO;
 
 	ref_end = mref->ref_pos + mref->ref_len;
@@ -178,7 +178,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
 		len = tmp_pos - pos;
 	}
 	mref->ref_len = len;
-	mref->ref_prio = MARS_PRIO_LOW;
+	mref->ref_prio = brick->io_prio;
 	mref->_ref_cb.cb_private = mref_a;
 	mref->_ref_cb.cb_fn = copy_endio;
 	mref->ref_cb = &mref->_ref_cb;
@@ -279,7 +279,10 @@ int _next_state(struct copy_brick *brick, loff_t pos)
 
 		if (!mref1) {
 			goto done;
 		}
-		if (mref2) {
+		if (brick->append_mode > 0 && mref1->ref_total_size && mref1->ref_total_size > brick->copy_end) {
+			brick->copy_end = mref1->ref_total_size;
+		}
+		if (mref2) { // do the verify
 			int len = mref1->ref_len;
 			if (len == mref2->ref_len && !memcmp(mref1->ref_data, mref2->ref_data, len)) {
@@ -359,10 +362,11 @@ void _run_copy(struct copy_brick *brick)
 	max = MAX_COPY_PARA - atomic_read(&brick->io_flight) * 2;
 	MARS_IO("max = %d\n", max);
 
-	for (pos = brick->copy_start; pos < brick->copy_end; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
+	for (pos = brick->copy_start; pos < brick->copy_end || brick->append_mode > 1; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
 		//MARS_IO("pos = %lld\n", pos);
-		if (brick->clash || max-- <= 0)
+		if (brick->clash || max-- <= 0 || kthread_should_stop()) {
 			break;
+		}
 		status = _next_state(brick, pos);
 	}
 }
diff --git a/mars_copy.h b/mars_copy.h
index 643e4744..0d3319c2 100644
--- a/mars_copy.h
+++ b/mars_copy.h
@@ -33,10 +33,12 @@ struct copy_brick {
 	// parameters
 	volatile loff_t copy_start;
 	volatile loff_t copy_end; // stop working if == 0
-	loff_t copy_last;
+	int io_prio;
+	int append_mode; // 1 = passively, 2 = actively
 	bool verify_mode;
-	bool optimize_mode;
-	bool permanent_update; // NYI
+	bool utilize_mode; // utilize already copied data
+	// readonly from outside
+	loff_t copy_last;
 	bool low_dirty;
 	// internal
 	volatile bool trigger;
diff --git a/mars_light.c b/mars_light.c
index 1d687ee7..86c417cf 100644
--- a/mars_light.c
+++ b/mars_light.c
@@ -60,7 +60,7 @@ struct light_class {
 
 //#define TRANS_FAKE
 
-#define CONF_TRANS_BATCHLEN 32
+#define CONF_TRANS_BATCHLEN 1024
 //#define CONF_TRANS_FLYING 4
 #define CONF_TRANS_FLYING 128
 #define CONF_TRANS_PRIO MARS_PRIO_HIGH
@@ -68,6 +68,8 @@ struct light_class {
 //#define CONF_TRANS_LOG_READS true
 #define CONF_TRANS_MINIMIZE_LATENCY false
 //#define CONF_TRANS_MINIMIZE_LATENCY true
+//#define CONF_TRANS_COMPLETION_SEMANTICS 2
+#define CONF_TRANS_COMPLETION_SEMANTICS 0
 
 //#define CONF_ALL_BATCHLEN 2
 #define CONF_ALL_BATCHLEN 1
@@ -94,6 +96,9 @@ struct light_class {
 #define AIO_READAHEAD 1
 #define AIO_WAIT_DURING_FDSYNC false
 
+#define COPY_APPEND_MODE 1
+#define COPY_PRIO MARS_PRIO_LOW
+
 static
 void _set_trans_params(struct mars_brick *_brick, void *private)
 {
@@ -136,7 +141,9 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
 		trans_brick->q_phase2.q_ordering = true;
 		trans_brick->q_phase4.q_ordering = true;
+
 		trans_brick->log_reads = CONF_TRANS_LOG_READS;
+		trans_brick->completion_semantics = CONF_TRANS_COMPLETION_SEMANTICS;
 		trans_brick->minimize_latency = CONF_TRANS_MINIMIZE_LATENCY;
 #ifdef TRANS_FAKE
 		trans_brick->debug_shortcut = true;
@@ -219,6 +226,19 @@ void _set_if_params(struct mars_brick *_brick, void *private)
 	MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
 }
 
+static
+void _set_copy_params(struct mars_brick *_brick, void *private)
+{
+	struct copy_brick *copy_brick = (void*)_brick;
+	if (_brick->type != (void*)&copy_brick_type) {
+		MARS_ERR("bad brick type\n");
+		return;
+	}
+	copy_brick->append_mode = COPY_APPEND_MODE;
+	copy_brick->io_prio = COPY_PRIO;
+	MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
+}
+
 ///////////////////////////////////////////////////////////////////////
 
 // internal helpers
@@ -336,7 +356,7 @@ int __make_copy(
 	copy =
 		make_brick_all(global,
 			       belongs,
-			       NULL,
+			       _set_copy_params,
 			       NULL,
 			       0,
 			       fullpath[1],
@@ -469,7 +489,7 @@ int _update_file(struct mars_global *global, const char *switch_path, const char
 
 	MARS_DBG("src = '%s' dst = '%s'\n", tmp, file);
 	status = __make_copy(global, NULL, switch_path, copy_path, NULL, argv, -1, &copy);
-	if (status >= 0 && copy && !copy->permanent_update) {
+	if (status >= 0 && copy && !copy->append_mode) {
 		if (end_pos > copy->copy_end) {
 			MARS_DBG("appending to '%s' %lld => %lld\n", copy_path, copy->copy_end, end_pos);
 			copy->copy_end = end_pos;
diff --git a/mars_server.c b/mars_server.c
index 717536af..a4b4ad30 100644
--- a/mars_server.c
+++ b/mars_server.c
@@ -40,45 +40,120 @@ static int server_worker(struct mars_global *global, struct mars_dent *dent, boo
 	return 0;
 }
 
-static void server_endio(struct generic_callback *cb)
+static
+int cb_thread(void *data)
+{
+	struct server_brick *brick = data;
+	int status = -EINVAL;
+
+	brick->cb_running = true;
+	wake_up_interruptible(&brick->startup_event);
+
+	MARS_DBG("--------------- cb_thread starting on socket %p\n", brick->handler_socket);
+
+	while (!kthread_should_stop()) {
+		struct server_mref_aspect *mref_a;
+		struct mref_object *mref;
+		struct list_head *tmp;
+		struct socket **sock;
+		unsigned long flags;
+
+		status = -EINVAL;
+		if (!brick->handler_socket)
+			break;
+
+		wait_event_interruptible_timeout(
+			brick->cb_event,
+			!list_empty(&brick->cb_read_list) ||
+			!list_empty(&brick->cb_write_list) ||
+			kthread_should_stop(),
+			3 * HZ);
+
+		if (!brick->handler_socket)
+			break;
+
+		traced_lock(&brick->cb_lock, flags);
+		tmp = brick->cb_write_list.next;
+		if (tmp == &brick->cb_write_list) {
+			tmp = brick->cb_read_list.next;
+			if (tmp == &brick->cb_read_list) {
+				traced_unlock(&brick->cb_lock, flags);
+				continue;
+			}
+		}
+		list_del_init(tmp);
+		traced_unlock(&brick->cb_lock, flags);
+
+		mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
+		mref = mref_a->object;
+		CHECK_PTR(mref, err);
+		status = -ENOTSOCK;
+		sock = mref_a->sock;
+		CHECK_PTR(sock, err);
+		CHECK_PTR(*sock, err);
+
+		down(&brick->socket_sem);
+		status = mars_send_cb(sock, mref);
+		up(&brick->socket_sem);
+
+		atomic_dec(&brick->in_flight);
+		GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
+
+		if (unlikely(status < 0)) {
+			MARS_ERR("cannot send response, status = %d\n", status);
+			kernel_sock_shutdown(*sock, SHUT_WR);
+			break;
+		}
+	}
+
+err:
+	brick->cb_running = false;
+	MARS_DBG("---------- cb_thread terminating, status = %d\n", status);
+	wake_up_interruptible(&brick->startup_event);
+	return status;
+}
+
+static
+void server_endio(struct generic_callback *cb)
 {
 	struct server_mref_aspect *mref_a;
 	struct mref_object *mref;
 	struct server_brick *brick;
-	struct socket **sock;
-	int status;
+	int rw;
+	unsigned long flags;
 
 	mref_a = cb->cb_private;
 	CHECK_PTR(mref_a, err);
-	mref = mref_a->object;
-	CHECK_PTR(mref, err);
 	brick = mref_a->brick;
 	CHECK_PTR(brick, err);
-	sock = mref_a->sock;
-	CHECK_PTR(sock, err);
-	CHECK_PTR(*sock, err);
+	mref = mref_a->object;
+	CHECK_PTR(mref, err);
 
-	down(&brick->socket_sem);
-	status = mars_send_cb(sock, mref);
-	up(&brick->socket_sem);
+	rw = mref->ref_rw;
 
-	if (status < 0) {
-		MARS_ERR("cannot send response, status = %d\n", status);
-		kernel_sock_shutdown(*sock, SHUT_WR);
+	traced_lock(&brick->cb_lock, flags);
+	if (rw) {
+		list_add_tail(&mref_a->cb_head, &brick->cb_write_list);
+	} else {
+		list_add_tail(&mref_a->cb_head, &brick->cb_read_list);
 	}
-	atomic_dec(&brick->in_flight);
+	traced_unlock(&brick->cb_lock, flags);
+
+	wake_up_interruptible(&brick->cb_event);
 	return;
 err:
 	MARS_FAT("cannot handle callback - giving up\n");
 }
-
 int server_io(struct server_brick *brick, struct socket **sock)
 {
 	struct mref_object *mref;
 	struct server_mref_aspect *mref_a;
-	int status;
-
+	int status = -ENOTRECOVERABLE;
+
+	if (!brick->cb_running)
+		goto done;
+
 	mref = server_alloc_mref(&brick->hidden_output, &brick->mref_object_layout);
 	status = -ENOMEM;
 	if (!mref)
@@ -108,25 +183,47 @@ int server_io(struct server_brick *brick, struct socket **sock)
 		MARS_INF("mref_get execution error = %d\n", status);
 		mref->_ref_cb.cb_error = status;
 		server_endio(&mref->_ref_cb);
-		mars_free_mref(mref);
 		status = 0; // continue serving requests
 		goto done;
 	}
 	GENERIC_INPUT_CALL(brick->inputs[0], mref_io, mref);
-	GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
 
 done:
 	return status;
 }
 
-static int handler_thread(void *data)
+static
+void _clean_list(struct server_brick *brick, struct list_head *start)
+{
+	for (;;) {
+		struct server_mref_aspect *mref_a;
+		struct mref_object *mref;
+		struct list_head *tmp = start->next;
+		if (tmp == start)
+			break;
+
+		list_del_init(tmp);
+
+		mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
+		mref = mref_a->object;
+		if (!mref)
+			continue;
+
+		GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
+	}
+}
+
+static
+int handler_thread(void *data)
 {
 	struct server_brick *brick = data;
 	struct socket **sock = &brick->handler_socket;
+	struct task_struct *cb_thread = brick->cb_thread;
 	int max_round = 300;
 	int status = 0;
 
+	brick->cb_thread = NULL;
 	brick->handler_thread = NULL;
 	wake_up_interruptible(&brick->startup_event);
 	MARS_DBG("--------------- handler_thread starting on socket %p\n", *sock);
@@ -135,7 +232,7 @@ static int handler_thread(void *data)
 
 	//fake_mm();
 
-	while (!kthread_should_stop()) {
+	while (brick->cb_running && !kthread_should_stop()) {
 		struct mars_cmd cmd = {};
 
 		status = mars_recv_struct(sock, &cmd, mars_cmd_meta);
@@ -209,19 +306,19 @@ static int handler_thread(void *data)
 			CHECK_PTR(_bio_brick_type, err);
 
 			//prev = mars_find_brick(mars_global, NULL, cmd.cmd_str1);
-			prev =
-				make_brick_all(mars_global,
-					       NULL,
-					       NULL,
-					       NULL,
-					       10 * HZ,
-					       path,
-					       (const struct generic_brick_type*)_bio_brick_type,
-					       (const struct generic_brick_type*[]){},
-					       NULL,
-					       path,
-					       (const char *[]){},
-					       0);
+			prev = make_brick_all(
+				mars_global,
+				NULL,
+				NULL,
+				NULL,
+				10 * HZ,
+				path,
+				(const struct generic_brick_type*)_bio_brick_type,
+				(const struct generic_brick_type*[]){},
+				NULL,
+				path,
+				(const char *[]){},
+				0);
 			if (likely(prev)) {
 				status = generic_connect((void*)brick->inputs[0], (void*)prev->outputs[0]);
 			} else {
@@ -252,6 +349,15 @@ static int handler_thread(void *data)
 
 done:
 	MARS_DBG("handler_thread terminating, status = %d\n", status);
+	kthread_stop(cb_thread);
+	wait_event_interruptible_timeout(
+		brick->startup_event,
+		!brick->cb_running,
+		10 * HZ);
+
+	_clean_list(brick, &brick->cb_read_list);
+	_clean_list(brick, &brick->cb_write_list);
+
 	do {
 		int status = mars_kill_brick((void*)brick);
 		if (status >= 0) {
@@ -264,7 +370,7 @@ done:
 			break;
 		}
 		msleep(1000);
-	} while (!brick->power.led_off);
+	} while (brick->cb_running && !brick->power.led_off);
 
 	MARS_DBG("done\n");
 	return 0;
@@ -316,14 +422,14 @@ static int server_switch(struct server_brick *brick)
 static int server_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
 {
 	struct server_mref_aspect *ini = (void*)_ini;
-	(void)ini;
+	INIT_LIST_HEAD(&ini->cb_head);
 	return 0;
 }
 
 static void server_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
 {
 	struct server_mref_aspect *ini = (void*)_ini;
-	(void)ini;
+	CHECK_HEAD_EMPTY(&ini->cb_head);
 }
 
 MARS_MAKE_STATICS(server);
@@ -335,7 +441,11 @@ static int server_brick_construct(struct server_brick *brick)
 	struct server_output *hidden = &brick->hidden_output;
 	_server_output_init(brick, hidden, "internal");
 	init_waitqueue_head(&brick->startup_event);
+	init_waitqueue_head(&brick->cb_event);
 	sema_init(&brick->socket_sem, 1);
+	spin_lock_init(&brick->cb_lock);
+	INIT_LIST_HEAD(&brick->cb_read_list);
+	INIT_LIST_HEAD(&brick->cb_write_list);
 	return 0;
 }
 
@@ -448,15 +558,25 @@ static int _server_thread(void *data)
 			goto err;
 		}
 
+		brick->handler_socket = new_socket;
+
+		thread = kthread_create(cb_thread, brick, "mars_cb%d", version);
+		if (IS_ERR(thread)) {
+			MARS_ERR("cannot create cb thread, status = %ld\n", PTR_ERR(thread));
+			goto err;
+		}
+		brick->cb_thread = thread;
+		wake_up_process(thread);
+
 		thread = kthread_create(handler_thread, brick, "mars_handler%d", version++);
 		if (IS_ERR(thread)) {
-			MARS_ERR("cannot create thread, status = %ld\n", PTR_ERR(thread));
+			MARS_ERR("cannot create handler thread, status = %ld\n", PTR_ERR(thread));
 			goto err;
 		}
 		brick->handler_thread = thread;
-		brick->handler_socket = new_socket;
 		wake_up_process(thread);
-		wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL);
+
+		wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL && brick->cb_thread == NULL);
 
 		continue;
 err:
diff --git a/mars_server.h b/mars_server.h
index 942beab9..d4ef3933 100644
--- a/mars_server.h
+++ b/mars_server.h
@@ -14,6 +14,7 @@ struct server_mref_aspect {
 	GENERIC_ASPECT(mref);
 	struct server_brick *brick;
 	struct socket **sock;
+	struct list_head cb_head;
 };
 
 struct server_output {
@@ -26,9 +27,15 @@ struct server_brick {
 	struct socket *handler_socket;
 	struct semaphore socket_sem;
 	struct task_struct *handler_thread;
+	struct task_struct *cb_thread;
 	wait_queue_head_t startup_event;
+	wait_queue_head_t cb_event;
 	struct generic_object_layout mref_object_layout;
 	struct server_output hidden_output;
+	spinlock_t cb_lock;
+	struct list_head cb_read_list;
+	struct list_head cb_write_list;
+	bool cb_running;
 };
 
 struct server_input {
diff --git a/mars_trans_logger.c b/mars_trans_logger.c
index da271a88..8306d9da 100644
--- a/mars_trans_logger.c
+++ b/mars_trans_logger.c
@@ -750,7 +750,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
 		atomic_inc(&brick->inner_balance_count);
 
 		qq_mref_insert(&brick->q_phase1, mref_a);
-		//wake_up_interruptible(&brick->event);
+		wake_up_interruptible(&brick->event);
 		return;
 	}
 
@@ -1228,14 +1228,73 @@ void put_list(struct writeback_info *wb, struct list_head *start)
 /*********************************************************************
  * Phase 1: write transaction log entry for the original write request.
  */
+
 static noinline
-void phase1_endio(void *private, int error)
+void _complete(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *orig_mref_a, int error, bool pre_io)
+{
+	struct mref_object *orig_mref;
+	struct generic_callback *orig_cb;
+
+	orig_mref = orig_mref_a->object;
+	CHECK_PTR(orig_mref, err);
+
+	if (orig_mref_a->is_completed ||
+	    (pre_io &&
+	     (brick->completion_semantics >= 2 ||
+	      (brick->completion_semantics >= 1 && !orig_mref->ref_skip_sync)))) {
+		goto done;
+	}
+
+	orig_cb = orig_mref->ref_cb;
+	CHECK_PTR(orig_cb, err);
+	CHECK_PTR(orig_cb->cb_fn, err);
+
+	if (unlikely(error < 0)) {
+		orig_cb->cb_error = error;
+	}
+	if (likely(orig_cb->cb_error >= 0)) {
+		orig_mref->ref_flags &= ~MREF_WRITING;
+		orig_mref->ref_flags |= MREF_UPTODATE;
+	}
+
+	orig_mref_a->is_completed = true;
+	orig_cb->cb_fn(orig_cb);
+
+done:
+	return;
+
+err:
+	MARS_ERR("giving up...\n");
+}
+
+static noinline
+void phase1_preio(void *private)
+{
+	struct trans_logger_mref_aspect *orig_mref_a;
+	struct trans_logger_output *output;
+	struct trans_logger_brick *brick;
+
+	orig_mref_a = private;
+	CHECK_PTR(orig_mref_a, err);
+	output = orig_mref_a->my_output;
+	CHECK_PTR(output, err);
+	brick = output->brick;
+	CHECK_PTR(brick, err);
+
+	// signal completion to the upper layer
+	// FIXME: immediate error signalling is impossible here, but some delayed signalling should be possible as a workaround. Think!
+	_complete(brick, orig_mref_a, 0, true);
+	return;
+err:
+	MARS_ERR("giving up...\n");
+}
+
+static noinline
+void phase1_endio(void *private, int error)
 {
 	struct trans_logger_mref_aspect *orig_mref_a;
-	struct mref_object *orig_mref;
 	struct trans_logger_output *output;
 	struct trans_logger_brick *brick;
-	struct generic_callback *orig_cb;
 
 	orig_mref_a = private;
 	CHECK_PTR(orig_mref_a, err);
@@ -1243,30 +1302,15 @@ void phase1_endio(void *private, int error)
 	CHECK_PTR(output, err);
 	brick = output->brick;
 	CHECK_PTR(brick, err);
-	orig_mref = orig_mref_a->object;
-	CHECK_PTR(orig_mref, err);
-	orig_cb = orig_mref->ref_cb;
-	CHECK_PTR(orig_cb, err);
 
 	qq_dec_flying(&brick->q_phase1);
 
-	// error handling
-	if (error < 0) {
-		orig_cb->cb_error = error;
-	}
-
-	// signal completion to the upper layer, as early as possible
-	if (likely(orig_cb->cb_error >= 0)) {
-		orig_mref->ref_flags &= ~MREF_WRITING;
-		orig_mref->ref_flags |= MREF_UPTODATE;
-	}
-
-	CHECK_PTR(orig_cb->cb_fn, err);
-	orig_cb->cb_fn(orig_cb);
+	// signal completion to the upper layer
+	_complete(brick, orig_mref_a, error, false);
 
 	// queue up for the next phase
 	qq_mref_insert(&brick->q_phase2, orig_mref_a);
-	//wake_up_interruptible(&brick->event);
+	wake_up_interruptible(&brick->event);
 	return;
 err:
 	MARS_ERR("giving up...\n");
@@ -1308,7 +1352,7 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
 
 	memcpy(data, orig_mref_a->shadow_data, orig_mref->ref_len);
 
-	ok = log_finalize(logst, orig_mref->ref_len, phase1_endio, orig_mref_a);
+	ok = log_finalize(logst, orig_mref->ref_len, phase1_preio, phase1_endio, orig_mref_a);
 	if (unlikely(!ok)) {
 		goto err;
 	}
@@ -1455,7 +1499,7 @@ void phase2_endio(struct generic_callback *cb)
 
 	// queue up for the next phase
 	qq_wb_insert(&brick->q_phase3, wb);
-	//wake_up_interruptible(&brick->event);
+	wake_up_interruptible(&brick->event);
 	return;
 
 err:
@@ -1517,7 +1561,7 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 	} else { // shortcut
 #ifdef LATER
 		qq_wb_insert(&brick->q_phase4, wb);
-		//wake_up_interruptible(&brick->event);
+		wake_up_interruptible(&brick->event);
 #else
 		return phase4_startio(wb);
 #endif
@@ -1543,7 +1587,7 @@ void _phase3_endio(struct writeback_info *wb)
 
 	// queue up for the next phase
 	qq_wb_insert(&brick->q_phase4, wb);
-	//wake_up_interruptible(&brick->event);
+	wake_up_interruptible(&brick->event);
 	return;
 }
 
@@ -1624,7 +1668,7 @@ bool _phase3_startio(struct trans_logger_mref_aspect *sub_mref_a)
 
 	memcpy(data, sub_mref->ref_data, sub_mref->ref_len);
 
-	ok = log_finalize(logst, sub_mref->ref_len, phase3_endio, sub_mref_a);
+	ok = log_finalize(logst, sub_mref->ref_len, NULL, phase3_endio, sub_mref_a);
 	if (unlikely(!ok)) {
 		goto err;
 	}
@@ -1996,6 +2040,9 @@ void trans_logger_log(struct trans_logger_output *output)
 		    (brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
 			do_flush = true;
 		}
+#if 1
+		do_flush = true;
+#endif
 		if (do_flush && (fw_logst->count > 0 || bw_logst->count > 0)) {
 			atomic_inc(&brick->total_flush_count);
 			log_flush(fw_logst);
diff --git a/mars_trans_logger.h b/mars_trans_logger.h
index 8e3146eb..d9226efa 100644
--- a/mars_trans_logger.h
+++ b/mars_trans_logger.h
@@ -96,6 +96,7 @@ struct trans_logger_mref_aspect {
 	bool is_hashed;
 	bool is_dirty;
 	bool is_collected;
+	bool is_completed;
 	struct timespec stamp;
 	loff_t log_pos;
 	struct generic_callback cb;
@@ -113,6 +114,7 @@ struct trans_logger_brick {
 	int align_size;   // alignment between requests
 	int chunk_size;   // must be at least 8K (better 64k)
 	int flush_delay;  // delayed firing of incomplete chunks
+	int completion_semantics; // 0 = early completion of all writes, 1 = early completion of non-sync, 2 = late completion
 	bool do_replay;   // mode of operation
 	bool do_continuous_replay; // mode of operation
 	bool log_reads;   // additionally log pre-images