From ceb8529521579a75ed4e586664d347d2a180d03b Mon Sep 17 00:00:00 2001 From: Thomas Schoebel-Theuer Date: Thu, 10 Mar 2011 12:40:06 +0100 Subject: [PATCH] import mars-76.tgz --- Makefile | 2 +- log_format.c | 181 ++++++++++++++++++++++++++++++++++++++++++++ log_format.h | 177 ++++++------------------------------------- mars.h | 6 +- mars_aio.c | 149 ++++++++++++++++-------------------- mars_aio.h | 5 +- mars_copy.c | 1 + mars_generic.c | 1 + mars_light.c | 44 +++++++++++ mars_trans_logger.c | 139 ++++++++++++++++++++++------------ mars_trans_logger.h | 9 ++- 11 files changed, 424 insertions(+), 290 deletions(-) create mode 100644 log_format.c diff --git a/Makefile b/Makefile index a9a2792a..1cc344a3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # Makefile for MARS # -obj-$(CONFIG_MARS) += brick.o mars_generic.o mars_net.o mars_proc.o +obj-$(CONFIG_MARS) += brick.o log_format.o mars_generic.o mars_net.o mars_proc.o obj-$(CONFIG_MARS_DUMMY) += mars_dummy.o obj-$(CONFIG_MARS_CHECK) += mars_check.o obj-$(CONFIG_MARS_IF) += mars_if.o diff --git a/log_format.c b/log_format.c new file mode 100644 index 00000000..5da816d5 --- /dev/null +++ b/log_format.c @@ -0,0 +1,181 @@ +// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG + +#include +#include +#include + +#include "log_format.h" + +void init_logst(struct log_status *logst, struct mars_input *input, struct mars_output *output) +{ + memset(logst, sizeof(*logst), 0); + logst->input = input; + logst->output = output; +} +EXPORT_SYMBOL_GPL(init_logst); + +void log_skip(struct log_status *logst) +{ + int bits; + if (!logst->info.transfer_size) { + int status = GENERIC_INPUT_CALL(logst->input, mars_get_info, &logst->info); + if (status < 0) { + MARS_FAT("cannot get transfer log info (code=%d)\n", status); + } + } + bits = logst->info.transfer_order + PAGE_SHIFT; + logst->log_pos = ((logst->log_pos >> bits) + 1) << bits; +} +EXPORT_SYMBOL_GPL(log_skip); + +void *log_reserve(struct log_status *logst, struct log_header *lh) +{ + struct mref_object *mref; + void *data; + int total_len; + int status; + int offset; + + MARS_DBG("reserving %d bytes at %lld\n", lh->l_len, logst->log_pos); + + if (unlikely(logst->log_mref)) { + MARS_ERR("mref already existing\n"); + goto err; + } + + mref = mars_alloc_mref(logst->output, &logst->ref_object_layout); + if (unlikely(!mref)) + goto err; + + mref->ref_pos = logst->log_pos; + total_len = lh->l_len + OVERHEAD; + mref->ref_len = total_len; + mref->ref_may_write = WRITE; +#if 1 + mref->ref_prio = MARS_PRIO_LOW; +#endif + + status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); + if (unlikely(status < 0)) { + goto err_free; + } + if (unlikely(mref->ref_len < total_len)) { + goto put; + } + + logst->log_mref = mref; + data = mref->ref_data; + offset = 0; + DATA_PUT(data, offset, START_MAGIC); + DATA_PUT(data, offset, (char)FORMAT_VERSION); + logst->validflag_offset = offset; + DATA_PUT(data, offset, (char)0); // valid_flag + DATA_PUT(data, offset, (short)0); // spare + DATA_PUT(data, offset, total_len); // start of next header + DATA_PUT(data, offset, lh->l_stamp.tv_sec); + DATA_PUT(data, offset, lh->l_stamp.tv_nsec); + DATA_PUT(data, offset, lh->l_pos); + logst->reallen_offset = offset; + DATA_PUT(data, offset, lh->l_len); + DATA_PUT(data, offset, lh->l_code); + + logst->payload_offset = offset; + logst->payload_len = lh->l_len; + + return data + offset; + +put: + GENERIC_INPUT_CALL(logst->input, mref_put, mref); + return NULL; + +err_free: + mars_free_mref(mref); +err: + return NULL; +} +EXPORT_SYMBOL_GPL(log_reserve); + +bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private) +{ + struct mref_object *mref = logst->log_mref; + struct generic_callback *cb; + struct timespec now; + void *data; + int offset; + bool ok = false; + + CHECK_PTR(mref, err); + + logst->log_mref = NULL; + if (unlikely(len > logst->payload_len)) { + MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len); + goto put; + } + + data = mref->ref_data; + + /* Correct the length in the header. + */ + offset = logst->reallen_offset; + DATA_PUT(data, offset, len); + + /* Write the trailer. + */ + offset = logst->payload_offset + len; + DATA_PUT(data, offset, END_MAGIC); + DATA_PUT(data, offset, (char)1); // valid_flag copy + DATA_PUT(data, offset, (char)0); // spare + DATA_PUT(data, offset, (short)0); // spare + DATA_PUT(data, offset, (int)0); // spare + get_lamport(&now); // when the log entry was ready. + DATA_PUT(data, offset, now.tv_sec); + DATA_PUT(data, offset, now.tv_nsec); + + logst->log_pos += offset; + + /* This must come last. In case of incomplete + * or even operlapping disk transfers, this indicates + * the completeness / integrity of the payload at + * the time of starting the transfer. + */ + offset = logst->validflag_offset; + DATA_PUT(data, offset, (char)1); + + cb = &mref->_ref_cb; + cb->cb_fn = endio; + cb->cb_error = 0; + cb->cb_prev = NULL; + cb->cb_private = private; + mref->ref_cb = cb; + mref->ref_rw = 1; + + GENERIC_INPUT_CALL(logst->input, mref_io, mref); + + ok = true; +put: + GENERIC_INPUT_CALL(logst->input, mref_put, mref); + +err: + return ok; +} +EXPORT_SYMBOL_GPL(log_finalize); + +////////////////// module init stuff ///////////////////////// + +static int __init init_log_format(void) +{ + MARS_INF("init_log_format()\n"); + return 0; +} + +static void __exit exit_log_format(void) +{ + MARS_INF("exit_log_format()\n"); +} + +MODULE_DESCRIPTION("MARS log_format infrastucture"); +MODULE_AUTHOR("Thomas Schoebel-Theuer "); +MODULE_LICENSE("GPL"); + +module_init(init_log_format); +module_exit(exit_log_format); diff --git a/log_format.h b/log_format.h index 97df4bd1..1e7d0dd7 100644 --- a/log_format.h +++ b/log_format.h @@ -19,29 +19,15 @@ * by old code (of course, not all information / features will be * available then). */ -struct log_header { +#define log_header log_header_v1 + +struct log_header_v1 { struct timespec l_stamp; loff_t l_pos; int l_len; int l_code; }; -/* Bookkeeping status between calls - */ -struct log_status { - struct mars_input *input; - struct mars_output hidden_output; - struct generic_object_layout ref_object_layout; - - struct mars_info info; - loff_t log_pos; - int validflag_offset; - int reallen_offset; - int payload_offset; - int payload_len; - struct mref_object *log_mref; -}; - #define FORMAT_VERSION 1 // version of disk format, currently there is no other one #define CODE_UNKNOWN 0 @@ -79,148 +65,33 @@ struct log_status { offset += sizeof(val); \ } while (0) -static inline -void log_skip(struct log_status *logst) -{ - int bits; - if (!logst->info.transfer_size) { - int status = GENERIC_INPUT_CALL(logst->input, mars_get_info, &logst->info); - if (status < 0) { - MARS_FAT("cannot get transfer log info (code=%d)\n", status); - } - } - bits = logst->info.transfer_order + PAGE_SHIFT; - logst->log_pos = ((logst->log_pos >> bits) + 1) << bits; -} +//////////////////////////////////////////////////////////////////////////// -static inline -void *log_reserve(struct log_status *logst, struct log_header *l) -{ - struct mref_object *mref; - void *data; - int total_len; - int status; - int offset; +#ifdef __KERNEL__ - MARS_DBG("reserving %d bytes at %lld\n", l->l_len, logst->log_pos); +/* Bookkeeping status between calls + */ +struct log_status { + struct mars_input *input; + struct mars_output *output; + struct generic_object_layout ref_object_layout; - if (unlikely(logst->log_mref)) { - MARS_ERR("mref already existing\n"); - goto err; - } + struct mars_info info; + loff_t log_pos; + int validflag_offset; + int reallen_offset; + int payload_offset; + int payload_len; + struct mref_object *log_mref; +}; - mref = mars_alloc_mref(&logst->hidden_output, &logst->ref_object_layout); - if (unlikely(!mref)) - goto err; +void init_logst(struct log_status *logst, struct mars_input *input, struct mars_output *output); - mref->ref_pos = logst->log_pos; - total_len = l->l_len + OVERHEAD; - mref->ref_len = total_len; - mref->ref_may_write = WRITE; +void log_skip(struct log_status *logst); - status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); - if (unlikely(status < 0)) { - goto err_free; - } - if (unlikely(mref->ref_len < total_len)) { - goto put; - } - - logst->log_mref = mref; - data = mref->ref_data; - offset = 0; - DATA_PUT(data, offset, START_MAGIC); - DATA_PUT(data, offset, (char)FORMAT_VERSION); - logst->validflag_offset = offset; - DATA_PUT(data, offset, (char)0); // valid_flag - DATA_PUT(data, offset, (short)0); // spare - DATA_PUT(data, offset, total_len); // start of next header - DATA_PUT(data, offset, l->l_stamp.tv_sec); - DATA_PUT(data, offset, l->l_stamp.tv_nsec); - DATA_PUT(data, offset, l->l_pos); - logst->reallen_offset = offset; - DATA_PUT(data, offset, l->l_len); - DATA_PUT(data, offset, l->l_code); - - logst->payload_offset = offset; - logst->payload_len = l->l_len; - - return data + offset; - -put: - GENERIC_INPUT_CALL(logst->input, mref_put, mref); - return NULL; - -err_free: - mars_free_mref(mref); -err: - return NULL; -} - -static inline -bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private) -{ - struct mref_object *mref = logst->log_mref; - struct generic_callback *cb; - struct timespec now; - void *data; - int offset; - bool ok = false; - - CHECK_PTR(mref, err); - - logst->log_mref = NULL; - if (unlikely(len > logst->payload_len)) { - MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len); - goto put; - } - - data = mref->ref_data; - - /* Correct the length in the header. - */ - offset = logst->reallen_offset; - DATA_PUT(data, offset, len); - - /* Write the trailer. - */ - offset = logst->payload_offset + len; - DATA_PUT(data, offset, END_MAGIC); - DATA_PUT(data, offset, (char)1); // valid_flag copy - DATA_PUT(data, offset, (char)0); // spare - DATA_PUT(data, offset, (short)0); // spare - DATA_PUT(data, offset, (int)0); // spare - get_lamport(&now); // when the log entry was ready. - DATA_PUT(data, offset, now.tv_sec); - DATA_PUT(data, offset, now.tv_nsec); - - logst->log_pos += offset; - - /* This must come last. In case of incomplete - * or even operlapping disk transfers, this indicates - * the completeness / integrity of the payload at - * the time of starting the transfer. - */ - offset = logst->validflag_offset; - DATA_PUT(data, offset, (char)1); - - cb = &mref->_ref_cb; - cb->cb_fn = endio; - cb->cb_error = 0; - cb->cb_prev = NULL; - cb->cb_private = private; - mref->ref_cb = cb; - mref->ref_rw = 1; - - GENERIC_INPUT_CALL(logst->input, mref_io, mref); - - ok = true; -put: - GENERIC_INPUT_CALL(logst->input, mref_put, mref); - -err: - return ok; -} +void *log_reserve(struct log_status *logst, struct log_header *lh); +bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private); #endif +#endif diff --git a/mars.h b/mars.h index 9c52d998..c1c5f848 100644 --- a/mars.h +++ b/mars.h @@ -48,6 +48,9 @@ // MARS-specific definitions +#define MARS_PRIO_HIGH 0 +#define MARS_PRIO_LOW 1 + // object stuff /* mref */ @@ -73,10 +76,11 @@ struct mref_object_layout { #define MREF_OBJECT(PREFIX) \ GENERIC_OBJECT(PREFIX); \ /* supplied by caller */ \ + void *ref_data; /* preset to NULL for buffered IO */ \ loff_t ref_pos; \ int ref_len; \ int ref_may_write; \ - void *ref_data; /* preset to NULL for buffered IO */ \ + int ref_prio; \ int ref_timeout; \ /* maintained by the ref implementation, readable for callers */ \ int ref_flags; \ diff --git a/mars_aio.c b/mars_aio.c index d03fd687..5a6ddd18 100644 --- a/mars_aio.c +++ b/mars_aio.c @@ -28,68 +28,52 @@ ////////////////// some helpers ////////////////// -static -void _queue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a) +static inline +void _enqueue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int prio, bool at_end) { unsigned long flags; + if (prio > MARS_PRIO_LOW) + prio = MARS_PRIO_LOW; + if (prio < MARS_PRIO_HIGH) + prio = MARS_PRIO_HIGH; traced_lock(&tinfo->lock, flags); - list_add_tail(&mref_a->io_head, &tinfo->mref_list); - - traced_unlock(&tinfo->lock, flags); -} - -static -void _delay(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int delta_jiffies) -{ - long long timeout = (long long)jiffies + delta_jiffies; - unsigned long flags; - - mref_a->timeout = timeout; - - traced_lock(&tinfo->lock, flags); - - list_add_tail(&mref_a->io_head, &tinfo->delay_list); - - traced_unlock(&tinfo->lock, flags); -} - -static -struct aio_mref_aspect *__get_delayed(struct aio_threadinfo *tinfo, bool remove) -{ - struct list_head *tmp; - struct aio_mref_aspect *mref_a; - - if (list_empty(&tinfo->delay_list)) - return NULL; - - tmp = tinfo->delay_list.next; - mref_a = container_of(tmp, struct aio_mref_aspect, io_head); - if (mref_a->timeout < (long long)jiffies) { - mref_a = NULL; - } else if (remove) { - list_del_init(tmp); + if (at_end) { + list_add_tail(&mref_a->io_head, &tinfo->mref_list[prio]); + } else { + list_add(&mref_a->io_head, &tinfo->mref_list[prio]); } - return mref_a; -} - -static -struct aio_mref_aspect *_get_delayed(struct aio_threadinfo *tinfo, bool remove) -{ - struct aio_mref_aspect *mref_a; - unsigned long flags; - - traced_lock(&tinfo->lock, flags); - - mref_a = __get_delayed(tinfo, remove); - traced_unlock(&tinfo->lock, flags); - - return mref_a; } +static inline +struct aio_mref_aspect *_dequeue(struct aio_threadinfo *tinfo, bool do_remove) +{ + struct aio_mref_aspect *mref_a = NULL; + int prio; + unsigned long flags = 0; + + if (do_remove) + traced_lock(&tinfo->lock, flags); + + for (prio = MARS_PRIO_HIGH; prio <= MARS_PRIO_LOW; prio++) { + struct list_head *tmp = tinfo->mref_list[prio].next; + if (tmp != &tinfo->mref_list[prio]) { + if (do_remove) { + list_del_init(tmp); + } + mref_a = container_of(tmp, struct aio_mref_aspect, io_head); + goto done; + } + } + +done: + if (do_remove) + traced_unlock(&tinfo->lock, flags); + return mref_a; +} ////////////////// own brick / input / output operations ////////////////// @@ -184,7 +168,7 @@ static void aio_ref_io(struct aio_output *output, struct mref_object *mref) goto done; } - _queue(tinfo, mref_a); + _enqueue(tinfo, mref_a, mref->ref_prio, true); wake_up_interruptible(&tinfo->event); return; @@ -263,49 +247,40 @@ static int aio_submit_thread(void *data) return -ENOMEM; while (!kthread_should_stop()) { - struct list_head *tmp = NULL; struct aio_mref_aspect *mref_a; struct mref_object *mref; - unsigned long flags; int err; wait_event_interruptible_timeout( tinfo->event, - !list_empty(&tinfo->mref_list) || - _get_delayed(tinfo, false) || - kthread_should_stop(), + kthread_should_stop() || + _dequeue(tinfo, false), HZ); - traced_lock(&tinfo->lock, flags); - - if (!list_empty(&tinfo->mref_list)) { - tmp = tinfo->mref_list.next; - list_del_init(tmp); - mref_a = container_of(tmp, struct aio_mref_aspect, io_head); - } else { - mref_a = __get_delayed(tinfo, true); - } - - traced_unlock(&tinfo->lock, flags); - - if (!mref_a) + mref_a = _dequeue(tinfo, true); + if (!mref_a) { continue; + } // check for reads behind EOF mref = mref_a->object; if (!mref->ref_rw && mref->ref_pos + mref->ref_len > i_size_read(file->f_mapping->host)) { - if (!mref->ref_timeout || mref->ref_timeout < (long long)jiffies) { - _complete(output, mref, -ENODATA); + if (mref->ref_timeout > 0 && + ((!mref_a->start_jiffies && (mref_a->start_jiffies = jiffies, true)) || + mref_a->start_jiffies + mref->ref_timeout >= (long long)jiffies)) { + msleep(50); + _enqueue(tinfo, mref_a, mref->ref_prio, true); continue; } - _delay(tinfo, mref_a, HZ/2); + _complete(output, mref, -ENODATA); continue; } err = aio_submit(output, mref_a, false); if (err == -EAGAIN) { - _delay(tinfo, mref_a, (HZ/100)+1); + _enqueue(tinfo, mref_a, mref->ref_prio, false); + msleep(20); continue; } if (unlikely(err < 0)) { @@ -387,10 +362,11 @@ static int aio_event_thread(void *data) if (output->o_fdsync && err >= 0 - && mref->ref_rw != 0 + && mref->ref_rw != READ && !mref_a->resubmit++) { + // workaround for non-implemented AIO FSYNC operation if (!output->filp->f_op->aio_fsync) { - _queue(other, mref_a); + _enqueue(other, mref_a, mref->ref_prio, true); bounced++; continue; } @@ -430,17 +406,22 @@ static int aio_sync_thread(void *data) while (!kthread_should_stop()) { LIST_HEAD(tmp_list); unsigned long flags; + int i; int err; wait_event_interruptible_timeout( tinfo->event, - !list_empty(&tinfo->mref_list) || kthread_should_stop(), + kthread_should_stop() || + _dequeue(tinfo, false), HZ); traced_lock(&tinfo->lock, flags); - if (!list_empty(&tinfo->mref_list)) { - // move over the whole list - list_replace_init(&tinfo->mref_list, &tmp_list); + for (i = MARS_PRIO_HIGH; i <= MARS_PRIO_LOW; i++) { + if (!list_empty(&tinfo->mref_list[i])) { + // move over the whole list + list_replace_init(&tinfo->mref_list[i], &tmp_list); + break; + } } traced_unlock(&tinfo->lock, flags); @@ -564,8 +545,10 @@ static int aio_switch(struct aio_brick *brick) aio_sync_thread, }; struct aio_threadinfo *tinfo = &output->tinfo[i]; - INIT_LIST_HEAD(&tinfo->mref_list); - INIT_LIST_HEAD(&tinfo->delay_list); + int j; + for (j = MARS_PRIO_HIGH; j <= MARS_PRIO_LOW; j++) { + INIT_LIST_HEAD(&tinfo->mref_list[j]); + } tinfo->output = output; spin_lock_init(&tinfo->lock); init_waitqueue_head(&tinfo->event); diff --git a/mars_aio.h b/mars_aio.h index 3f06f9a2..4cb89416 100644 --- a/mars_aio.h +++ b/mars_aio.h @@ -8,7 +8,7 @@ struct aio_mref_aspect { GENERIC_ASPECT(mref); struct list_head io_head; - long long timeout; + long long start_jiffies; int resubmit; bool do_dealloc; }; @@ -22,8 +22,7 @@ struct aio_input { }; struct aio_threadinfo { - struct list_head mref_list; - struct list_head delay_list; + struct list_head mref_list[MARS_PRIO_LOW+1]; struct aio_output *output; struct task_struct *thread; wait_queue_head_t event; diff --git a/mars_copy.c b/mars_copy.c index 862864d0..f1da1650 100644 --- a/mars_copy.c +++ b/mars_copy.c @@ -178,6 +178,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_ len = tmp_pos - pos; } mref->ref_len = len; + mref->ref_prio = MARS_PRIO_LOW; mref->_ref_cb.cb_private = mref_a; mref->_ref_cb.cb_fn = copy_endio; mref->ref_cb = &mref->_ref_cb; diff --git a/mars_generic.c b/mars_generic.c index 44eecaeb..8641260e 100644 --- a/mars_generic.c +++ b/mars_generic.c @@ -36,6 +36,7 @@ const struct meta mars_mref_meta[] = { META_INI(ref_pos, struct mref_object, FIELD_INT), META_INI(ref_len, struct mref_object, FIELD_INT), META_INI(ref_may_write, struct mref_object, FIELD_INT), + META_INI(ref_prio, struct mref_object, FIELD_INT), META_INI(ref_timeout, struct mref_object, FIELD_INT), META_INI(ref_flags, struct mref_object, FIELD_INT), META_INI(ref_rw, struct mref_object, FIELD_INT), diff --git a/mars_light.c b/mars_light.c index 845b0aec..e318b9d0 100644 --- a/mars_light.c +++ b/mars_light.c @@ -28,6 +28,7 @@ #include "mars_trans_logger.h" #include "mars_if.h" + static struct task_struct *main_thread = NULL; typedef int (*light_worker_fn)(void *buf, struct mars_dent *dent); @@ -43,6 +44,47 @@ struct light_class { light_worker_fn cl_backward; }; +/////////////////////////////////////////////////////////////////////// + +// TUNING + +//#define CONF_TRANS_LOGLEN 1000 +#define CONF_TRANS_LOGLEN 8 +#define CONF_TRANS_BATCHLEN 4 +#define CONF_TRANS_FLYING 4 +//#define CONF_TRANS_MAX_QUEUE 1000 +#define CONF_TRANS_MAX_QUEUE 5000 +#define CONF_TRANS_MAX_JIFFIES (30 * HZ) + +static +void _set_trans_params(struct trans_logger_brick *trans_brick) +{ + if (!trans_brick->outputs[0]->q_phase2.q_ordering) { + trans_brick->outputs[0]->q_phase1.q_batchlen = CONF_TRANS_LOGLEN; + trans_brick->outputs[0]->q_phase2.q_batchlen = CONF_TRANS_BATCHLEN; + trans_brick->outputs[0]->q_phase3.q_batchlen = CONF_TRANS_BATCHLEN; + trans_brick->outputs[0]->q_phase4.q_batchlen = CONF_TRANS_BATCHLEN; + + trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_TRANS_MAX_QUEUE; + trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_TRANS_MAX_QUEUE; + + trans_brick->outputs[0]->q_phase2.q_max_jiffies = CONF_TRANS_MAX_JIFFIES; + trans_brick->outputs[0]->q_phase4.q_max_jiffies = CONF_TRANS_MAX_JIFFIES; + + trans_brick->outputs[0]->q_phase2.q_max_flying = CONF_TRANS_FLYING; + trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_TRANS_FLYING; + + trans_brick->outputs[0]->q_phase2.q_ordering = true; + trans_brick->outputs[0]->q_phase4.q_ordering = true; + trans_brick->log_reads = false; + if (!trans_brick->log_reads) { + trans_brick->outputs[0]->q_phase2.q_max_queued = 0; + trans_brick->outputs[0]->q_phase4.q_max_queued *= 2; + } + } +} + + /////////////////////////////////////////////////////////////////////// // internal helpers @@ -867,6 +909,8 @@ int make_log_init(void *buf, struct mars_dent *parent) */ rot->do_replay = true; + _set_trans_params((void*)trans_brick); + status = 0; done: diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 3dca7f69..99a48861 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -47,25 +47,33 @@ static inline void q_init(struct logger_queue *q) } static -bool q_is_ready(struct logger_queue *q) +bool q_is_ready(struct logger_queue *q, bool do_drain) { int queued = atomic_read(&q->q_queued); int flying; bool res = false; + if (queued <= 0) goto always_done; + res = true; - if (queued >= q->q_max_queued) + if (do_drain || queued >= q->q_max_queued) goto done; + if (q->q_max_jiffies > 0 && (long long)jiffies - q->q_last_action >= q->q_max_jiffies) goto done; + res = false; goto always_done; + done: + /* Limit the number of flying requests (parallelism) + */ flying = atomic_read(&q->q_flying); if (q->q_max_flying > 0 && flying >= q->q_max_flying) res = false; + always_done: return res; } @@ -167,11 +175,11 @@ static inline int hash_fn(loff_t base_index) return ((unsigned)tmp) % TRANS_HASH_MAX; } -static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, loff_t pos, int len) +static struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, loff_t pos, int len) { loff_t base_index = pos >> REGION_SIZE_BITS; int hash = hash_fn(base_index); - struct hash_anchor *start = &table[hash]; + struct hash_anchor *start = &output->hash_table[hash]; struct list_head *tmp; struct trans_logger_mref_aspect *res = NULL; struct trans_logger_mref_aspect *test_a; @@ -216,6 +224,7 @@ static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, lof if (res) { atomic_inc(&res->object->ref_count); + atomic_inc(&output->inner_balance_count); } traced_readunlock(&start->hash_lock, flags); @@ -223,33 +232,36 @@ static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, lof return res; } -static inline -void hash_insert(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt) +static +void hash_insert(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a) { loff_t base_index = elem_a->object->ref_pos >> REGION_SIZE_BITS; int hash = hash_fn(base_index); - struct hash_anchor *start = &table[hash]; + struct hash_anchor *start = &output->hash_table[hash]; unsigned int flags; - traced_writelock(&start->hash_lock, flags); - #if 1 CHECK_HEAD_EMPTY(&elem_a->hash_head); #endif + atomic_inc(&elem_a->object->ref_count); // must be paired with hash_put() + // only for statistics: + atomic_inc(&output->inner_balance_count); + atomic_inc(&output->hash_count); + + traced_writelock(&start->hash_lock, flags); + list_add(&elem_a->hash_head, &start->hash_anchor); - atomic_inc(&elem_a->object->ref_count); // paired with hash_put() - atomic_inc(cnt); // only for statistics traced_writeunlock(&start->hash_lock, flags); } -static inline bool hash_put(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt) +static inline bool hash_put(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a) { struct mref_object *elem = elem_a->object; loff_t base_index = elem->ref_pos >> REGION_SIZE_BITS; int hash = hash_fn(base_index); - struct hash_anchor *start = &table[hash]; + struct hash_anchor *start = &output->hash_table[hash]; unsigned int flags; bool res; @@ -257,10 +269,11 @@ static inline bool hash_put(struct hash_anchor *table, struct trans_logger_mref_ CHECK_ATOMIC(&elem->ref_count, 1); res = atomic_dec_and_test(&elem->ref_count); + atomic_dec(&output->inner_balance_count); if (res) { list_del_init(&elem_a->hash_head); - atomic_dec(cnt); + atomic_dec(&output->hash_count); } traced_writeunlock(&start->hash_lock, flags); @@ -275,7 +288,7 @@ static int trans_logger_get_info(struct trans_logger_output *output, struct mars return GENERIC_INPUT_CALL(input, mars_get_info, info); } -static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref); +static void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref); static int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a) { @@ -287,7 +300,7 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger * the old one. * When a shadow is found, use it as buffer for the mref. */ - shadow_a = hash_find(output->hash_table, mref->ref_pos, mref->ref_len); + shadow_a = hash_find(output, mref->ref_pos, mref->ref_len); if (shadow_a) { struct mref_object *shadow = shadow_a->object; int diff = shadow->ref_pos - mref->ref_pos; @@ -304,7 +317,7 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger * region. */ mref->ref_len = diff; - trans_logger_ref_put(output, shadow); + _trans_logger_ref_put(output, shadow); goto call_through; } /* Attach mref to the existing shadow ("slave shadow"). @@ -315,7 +328,8 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger mref->ref_data = shadow->ref_data - diff; mref->ref_flags = shadow->ref_flags; mref_a->shadow_ref = shadow_a; - atomic_set(&mref->ref_count, 1); + atomic_inc(&mref->ref_count); + atomic_inc(&output->inner_balance_count); atomic_inc(&output->sshadow_count); #ifdef USE_MEMCPY if (mref_a->orig_data) { @@ -333,7 +347,7 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge { struct mref_object *mref = mref_a->object; - // unconditionally create a new shadow buffer + // unconditionally create a new master shadow buffer mref->ref_data = kmalloc(mref->ref_len, GFP_MARS); if (unlikely(!mref->ref_data)) { return -ENOMEM; @@ -347,7 +361,8 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge mref_a->output = output; mref->ref_flags = MREF_UPTODATE; mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow - atomic_set(&mref->ref_count, 1); + atomic_inc(&mref->ref_count); + atomic_inc(&output->inner_balance_count); get_lamport(&mref_a->stamp); return mref->ref_len; } @@ -359,13 +374,13 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mref_ CHECK_PTR(output, err); -#if 1 // xxx + atomic_inc(&output->outer_balance_count); + if (atomic_read(&mref->ref_count) > 0) { // setup already performed - MARS_INF("aha %d\n", atomic_read(&mref->ref_count)); + MARS_DBG("aha %d\n", atomic_read(&mref->ref_count)); atomic_inc(&mref->ref_count); return mref->ref_len; } -#endif mref_a = trans_logger_mref_get_aspect(output, mref); CHECK_PTR(mref_a, err); @@ -393,7 +408,8 @@ err: return -EINVAL; } -static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref) +static +void __trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref) { struct trans_logger_mref_aspect *mref_a; struct trans_logger_mref_aspect *shadow_a; @@ -413,9 +429,10 @@ restart: if (shadow_a) { bool finished; if (mref_a->is_hashed) { - finished = hash_put(output->hash_table, mref_a, &output->hash_count); + finished = hash_put(output, mref_a); } else { finished = atomic_dec_and_test(&mref->ref_count); + atomic_dec(&output->inner_balance_count); } if (!finished) { return; @@ -445,6 +462,18 @@ err: MARS_FAT("oops\n"); } +static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref) +{ + atomic_dec(&output->outer_balance_count); + _trans_logger_ref_put(output, mref); +} + +static void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref) +{ + //atomic_dec(&output->inner_balance_count); + __trans_logger_ref_put(output, mref); +} + static void _trans_logger_endio(struct generic_callback *cb) { struct trans_logger_mref_aspect *mref_a; @@ -462,8 +491,6 @@ static void _trans_logger_endio(struct generic_callback *cb) output = mref_a->output; CHECK_PTR(output, err); - atomic_dec(&output->fly_count); - prev_cb = cb->cb_prev; CHECK_PTR(prev_cb, err); mref = mref_a->object; @@ -471,6 +498,9 @@ static void _trans_logger_endio(struct generic_callback *cb) mref->ref_cb = prev_cb; prev_cb->cb_fn(prev_cb); + + atomic_dec(&output->fly_count); + err: ; } @@ -514,7 +544,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_ if (!mref_a->is_hashed) { mref_a->is_hashed = true; MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos); - hash_insert(output->hash_table, mref_a, &output->hash_count); + hash_insert(output, mref_a); } q_insert(&output->q_phase1, mref_a); wake_up_interruptible(&output->event); @@ -528,6 +558,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_ } atomic_inc(&output->fly_count); + mref_a->output = output; cb = &mref_a->cb; cb->cb_fn = _trans_logger_endio; @@ -579,7 +610,11 @@ static void phase1_endio(struct generic_callback *cb) orig_cb->cb_fn(orig_cb); // queue up for the next phase - q_insert(&output->q_phase2, orig_mref_a); + if (output->brick->log_reads) { + q_insert(&output->q_phase2, orig_mref_a); + } else { + q_insert(&output->q_phase2, orig_mref_a); + } wake_up_interruptible(&output->event); err: ; } @@ -696,7 +731,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) /* allocate internal sub_mref for further work */ while (len > 0) { - sub_mref = mars_alloc_mref(&brick->logst.hidden_output, &brick->logst.ref_object_layout); + sub_mref = trans_logger_alloc_mref((void*)brick->logst.output, &brick->logst.ref_object_layout); if (unlikely(!sub_mref)) { MARS_FAT("cannot alloc sub_mref\n"); goto err; @@ -706,7 +741,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) sub_mref->ref_len = len; sub_mref->ref_may_write = WRITE; - sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)&brick->logst.hidden_output, sub_mref); + sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)brick->logst.output, sub_mref); CHECK_PTR(sub_mref_a, err); sub_mref_a->stamp = orig_mref_a->stamp; sub_mref_a->orig_mref_a = orig_mref_a; @@ -717,6 +752,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) MARS_FAT("cannot get sub_ref, status = %d\n", status); goto err; } + atomic_inc(&output->sub_balance_count); pos += sub_mref->ref_len; len -= sub_mref->ref_len; @@ -725,6 +761,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) */ CHECK_ATOMIC(&orig_mref->ref_count, 1); atomic_inc(&orig_mref->ref_count); + atomic_inc(&output->inner_balance_count); cb = &sub_mref_a->cb; cb->cb_fn = phase2_endio; @@ -733,6 +770,9 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) cb->cb_prev = NULL; sub_mref->ref_cb = cb; sub_mref->ref_rw = 0; +#if 1 + sub_mref->ref_prio = MARS_PRIO_LOW; +#endif atomic_inc(&output->q_phase2.q_flying); if (output->brick->log_reads) { @@ -746,7 +786,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) * _replace_ the original reference by the sub_mref counts * from above). */ - trans_logger_ref_put(output, orig_mref); + _trans_logger_ref_put(output, orig_mref); return true; err: @@ -855,7 +895,7 @@ static void phase4_endio(struct generic_callback *cb) put: //MARS_INF("put ORIGREF.\n"); CHECK_ATOMIC(&orig_mref->ref_count, 1); - trans_logger_ref_put(orig_mref_a->output, orig_mref); + _trans_logger_ref_put(orig_mref_a->output, orig_mref); wake_up_interruptible(&output->event); err: ; } @@ -896,6 +936,7 @@ static bool phase4_startio(struct trans_logger_mref_aspect *sub_mref_a) //MARS_INF("put SUBREF.\n"); GENERIC_INPUT_CALL(input, mref_put, sub_mref); + atomic_dec(&output->sub_balance_count); return true; err: @@ -939,6 +980,9 @@ static inline int _congested(struct trans_logger_output *output) || atomic_read(&output->q_phase4.q_flying); } +#define DO_DRAIN(output) \ + ((output)->old_fly_count + atomic_read(&(output)->fly_count) <= 0) + static void trans_logger_log(struct trans_logger_output *output) { @@ -949,23 +993,25 @@ void trans_logger_log(struct trans_logger_output *output) while (!kthread_should_stop() || _congested(output)) { int status; + output->old_fly_count = atomic_read(&output->fly_count); + wait_event_interruptible_timeout( output->event, - q_is_ready(&output->q_phase1) || - q_is_ready(&output->q_phase2) || - q_is_ready(&output->q_phase3) || - q_is_ready(&output->q_phase4) || + q_is_ready(&output->q_phase1, DO_DRAIN(output)) || + q_is_ready(&output->q_phase2, DO_DRAIN(output)) || + q_is_ready(&output->q_phase3, DO_DRAIN(output)) || + q_is_ready(&output->q_phase4, DO_DRAIN(output)) || (kthread_should_stop() && !_congested(output)), wait_jiffies); #if 1 if (((int)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) { last_jiffies = jiffies; - MARS_INF("LOGGER: mshadow=%d sshadow=%d hash_count=%d fly=%d phase1=%d/%d phase2=%d/%d phase3=%d/%d phase4=%d/%d\n", atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying)); + MARS_INF("LOGGER: mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying)); } #endif wait_jiffies = HZ; - status = run_queue(&output->q_phase1, phase1_startio, 1000); + status = run_queue(&output->q_phase1, phase1_startio, output->q_phase1.q_batchlen); if (unlikely(status > 0)) { (void)run_queue(&output->q_phase3, phase3_startio, 1); log_skip(&brick->logst); @@ -976,20 +1022,20 @@ void trans_logger_log(struct trans_logger_output *output) /* Strategy / performance: * run higher phases only when IO contention is "low". */ - if (q_is_ready(&output->q_phase2)) { - (void)run_queue(&output->q_phase2, phase2_startio, 64); + if (q_is_ready(&output->q_phase2, DO_DRAIN(output))) { + (void)run_queue(&output->q_phase2, phase2_startio, output->q_phase2.q_batchlen); } - if (q_is_ready(&output->q_phase3)) { - status = run_queue(&output->q_phase3, phase3_startio, 64); + if (q_is_ready(&output->q_phase3, DO_DRAIN(output))) { + status = run_queue(&output->q_phase3, phase3_startio, output->q_phase3.q_batchlen); if (unlikely(status > 0)) { log_skip(&brick->logst); wait_jiffies = 5; continue; } } - if (q_is_ready(&output->q_phase4)) { - (void)run_queue(&output->q_phase4, phase4_startio, 64); + if (q_is_ready(&output->q_phase4, DO_DRAIN(output))) { + (void)run_queue(&output->q_phase4, phase4_startio, output->q_phase4.q_batchlen); } } } @@ -1085,8 +1131,7 @@ MARS_MAKE_STATICS(trans_logger); static int trans_logger_brick_construct(struct trans_logger_brick *brick) { - _generic_output_init((struct generic_brick*)brick, (struct generic_output_type*)&trans_logger_output_type, (struct generic_output*)&brick->logst.hidden_output, "internal"); - brick->logst.input = (void*)brick->inputs[1]; + init_logst(&brick->logst, (void*)brick->inputs[1], (void*)brick->outputs[0]); return 0; } diff --git a/mars_trans_logger.h b/mars_trans_logger.h index 9e48abd1..66598228 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -24,6 +24,7 @@ struct logger_queue { atomic_t q_flying; long long q_last_action; // jiffies // tunables + int q_batchlen; int q_max_queued; int q_max_flying; int q_max_jiffies; @@ -67,11 +68,14 @@ struct trans_logger_brick { struct trans_logger_output { MARS_OUTPUT(trans_logger); - struct hash_anchor hash_table[TRANS_HASH_MAX]; - atomic_t hash_count; atomic_t fly_count; + atomic_t hash_count; atomic_t mshadow_count; atomic_t sshadow_count; + atomic_t outer_balance_count; + atomic_t inner_balance_count; + atomic_t sub_balance_count; + int old_fly_count; struct task_struct *thread; wait_queue_head_t event; // queues @@ -79,6 +83,7 @@ struct trans_logger_output { struct logger_queue q_phase2; struct logger_queue q_phase3; struct logger_queue q_phase4; + struct hash_anchor hash_table[TRANS_HASH_MAX]; }; struct trans_logger_input {