diff --git a/Kconfig b/Kconfig index 886d6c06..287797b6 100644 --- a/Kconfig +++ b/Kconfig @@ -7,12 +7,12 @@ config MARS ---help--- Experimental storage System. -#config MARS_HUNG -# tristate "hangup on kernel stacktrace (only for debugging)" -# depends on MARS -# default n -# ---help--- -# Experimental storage System. +config MARS_HUNG + tristate "hangup on kernel stacktrace (only for debugging)" + depends on MARS + default n + ---help--- + Experimental storage System. config MARS_DUMMY tristate "MARS dummy brick" diff --git a/lib_log.c b/lib_log.c index ef5b77a4..bac531f0 100644 --- a/lib_log.c +++ b/lib_log.c @@ -471,7 +471,7 @@ restart: mref->ref_pos = logst->log_pos; chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); chunk_rest = logst->chunk_size - chunk_offset; - mref->ref_len = chunk_rest + logst->chunk_size * 8; + mref->ref_len = chunk_rest + logst->chunk_size * 4; mref->ref_prio = logst->io_prio; status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); diff --git a/mars_aio.c b/mars_aio.c index 49aacd3b..ec955e6f 100644 --- a/mars_aio.c +++ b/mars_aio.c @@ -158,7 +158,7 @@ static int aio_ref_get(struct aio_output *output, struct mref_object *mref) return -EILSEQ; mref->ref_data = mars_alloc(mref->ref_pos, (mref_a->alloc_len = mref->ref_len)); if (!mref->ref_data) { - MARS_DBG("ENOMEM %d\n", mref->ref_len); + MARS_ERR("ENOMEM %d bytes\n", mref->ref_len); return -ENOMEM; } #if 0 // ??? @@ -208,7 +208,7 @@ void _complete(struct aio_output *output, struct mref_object *mref, int err) cb->cb_error = err; if (err < 0) { - MARS_ERR("IO error %d\n", err); + MARS_ERR("IO error %d at pos=%lld len=%d (mref=%p ref_data=%p)\n", err, mref->ref_pos, mref->ref_len, mref, mref->ref_data); } else { mref->ref_flags |= MREF_UPTODATE; } diff --git a/mars_generic.c b/mars_generic.c index aa38f3c5..1f114405 100644 --- a/mars_generic.c +++ b/mars_generic.c @@ -26,7 +26,7 @@ // MARS-specific memory allocation -#define PERFORMANCE_WORKAROUND +#define USE_KERNEL_PAGES #define MARS_MAX_ORDER 8 //#define USE_OFFSET //#define USE_INTERNAL_FREELIST @@ -40,13 +40,17 @@ void *mars_alloc(loff_t pos, int len) { int offset = 0; void *data; -#ifdef PERFORMANCE_WORKAROUND +#ifdef USE_KERNEL_PAGES int order = MARS_MAX_ORDER; + if (unlikely(len > (PAGE_SIZE << order) || len <=0)) { + MARS_ERR("trying to allocate %d bytes (max = %d)\n", len, (PAGE_SIZE << order)); + return NULL; + } #endif #ifdef USE_OFFSET offset = pos & (PAGE_SIZE-1); #endif -#ifdef PERFORMANCE_WORKAROUND +#ifdef USE_KERNEL_PAGES len += offset; while (order > 0 && (PAGE_SIZE << (order-1)) >= len) { order--; @@ -72,7 +76,7 @@ EXPORT_SYMBOL_GPL(mars_alloc); void mars_free(void *data, int len) { int offset = 0; -#ifdef PERFORMANCE_WORKAROUND +#ifdef USE_KERNEL_PAGES int order = MARS_MAX_ORDER; #endif if (!data) { @@ -82,7 +86,7 @@ void mars_free(void *data, int len) offset = ((unsigned long)data) & (PAGE_SIZE-1); #endif data -= offset; -#ifdef PERFORMANCE_WORKAROUND +#ifdef USE_KERNEL_PAGES len += offset; while (order > 0 && (PAGE_SIZE << (order-1)) >= len) { order--; diff --git a/mars_if.c b/mars_if.c index 66c3038e..394a27de 100644 --- a/mars_if.c +++ b/mars_if.c @@ -9,6 +9,8 @@ //#define IO_DEBUGGING #define REQUEST_MERGING +//#define ALWAYS_UNPLUG false +#define ALWAYS_UNPLUG true #define PREFETCH_LEN PAGE_SIZE //#define FRONT_MERGE // FIXME: this does not work. @@ -114,7 +116,8 @@ void if_endio(struct generic_callback *cb) /* Kick off plugged mrefs */ -static void _if_unplug(struct if_input *input) +static +void _if_unplug(struct if_input *input) { //struct if_brick *brick = input->brick; LIST_HEAD(tmp_list); @@ -124,6 +127,9 @@ static void _if_unplug(struct if_input *input) down(&input->kick_sem); traced_lock(&input->req_lock, flags); +#ifdef USE_TIMER + del_timer(&input->timer); +#endif if (!list_empty(&input->plug_anchor)) { // move over the whole list list_replace_init(&input->plug_anchor, &tmp_list); @@ -167,6 +173,14 @@ static void _if_unplug(struct if_input *input) } } +#ifdef USE_TIMER +static +void if_timer(unsigned long data) +{ + _if_unplug((void*)data); +} +#endif + /* accept a linux bio, convert to mref and call buf_io() on it. */ static int if_make_request(struct request_queue *q, struct bio *bio) @@ -193,6 +207,20 @@ static int if_make_request(struct request_queue *q, struct bio *bio) input = q->queuedata; if (unlikely(!input)) goto err; + + if (unlikely(!bio_sectors(bio))) { + atomic_inc(&input->total_empty_count); + _if_unplug(input); + /* THINK: usually this happens only at write barriers. + * We have no "barrier" operation in MARS, since + * callback semantics should always denote + * "writethrough accomplished". + * In case of exceptional semantics, we need to do + * something here. For now, we do just nothing. + */ + bio_endio(bio, 0); + return 0; + } if (rw) { atomic_inc(&input->total_write_count); @@ -421,10 +449,24 @@ err: } } - if (unplug || + if (ALWAYS_UNPLUG || unplug || (brick && brick->max_plugged > 0 && atomic_read(&input->plugged_count) > brick->max_plugged)) { _if_unplug(input); } +#ifdef USE_TIMER + else { + unsigned long flags; + traced_lock(&input->req_lock, flags); + if (timer_pending(&input->timer)) { + del_timer(&input->timer); + } + input->timer.function = if_timer; + input->timer.data = (unsigned long)input; + input->timer.expires = jiffies + HZ/10; + add_timer(&input->timer); + traced_unlock(&input->req_lock, flags); + } +#endif return error; } @@ -639,13 +681,14 @@ char *if_statistics(struct if_brick *brick, int verbose) int tmp4 = atomic_read(&input->total_mref_write_count); if (!res) return NULL; - sprintf(res, "reads = %d mref_reads = %d (%d%%) writes = %d mref_writes = %d (%d%%) | plugged = %d flying = %d (reads = %d writes = %d)\n", + sprintf(res, "total reads = %d mref_reads = %d (%d%%) writes = %d mref_writes = %d (%d%%) empty = %d | plugged = %d flying = %d (reads = %d writes = %d)\n", tmp1, tmp2, tmp1 ? tmp2 * 100 / tmp1 : 0, tmp3, tmp4, tmp3 ? tmp4 * 100 / tmp3 : 0, + atomic_read(&input->total_empty_count), atomic_read(&input->plugged_count), atomic_read(&input->flying_count), atomic_read(&input->read_flying_count), @@ -659,6 +702,7 @@ void if_reset_statistics(struct if_brick *brick) struct if_input *input = brick->inputs[0]; atomic_set(&input->total_read_count, 0); atomic_set(&input->total_write_count, 0); + atomic_set(&input->total_empty_count, 0); atomic_set(&input->total_mref_read_count, 0); atomic_set(&input->total_mref_write_count, 0); } @@ -686,7 +730,7 @@ static void if_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data MARS_MAKE_STATICS(if); -//////////////////////// contructors / destructors //////////////////////// +//////////////////////// constructors / destructors //////////////////////// static int if_brick_construct(struct if_brick *brick) { @@ -713,6 +757,9 @@ static int if_input_construct(struct if_input *input) atomic_set(&input->open_count, 0); atomic_set(&input->flying_count, 0); atomic_set(&input->plugged_count, 0); +#ifdef USE_TIMER + init_timer(&input->timer); +#endif return 0; } diff --git a/mars_if.h b/mars_if.h index ae63b6c0..9c6007df 100644 --- a/mars_if.h +++ b/mars_if.h @@ -12,6 +12,8 @@ #define IF_HASH_MAX 2048 #define IF_HASH_CHUNK (PAGE_SIZE * 32) +//#define USE_TIMER + struct if_mref_aspect { GENERIC_ASPECT(mref); struct list_head plug_head; @@ -34,6 +36,9 @@ struct if_input { struct request_queue *q; struct gendisk *disk; struct block_device *bdev; +#ifdef USE_TIMER + struct timer_list timer; +#endif atomic_t open_count; atomic_t plugged_count; atomic_t flying_count; @@ -42,6 +47,7 @@ struct if_input { atomic_t write_flying_count; atomic_t total_read_count; atomic_t total_write_count; + atomic_t total_empty_count; atomic_t total_mref_read_count; atomic_t total_mref_write_count; spinlock_t req_lock; diff --git a/mars_light.c b/mars_light.c index 182120ea..48bd7aec 100644 --- a/mars_light.c +++ b/mars_light.c @@ -1053,7 +1053,7 @@ out_old: } static -int _update_versionlink(struct mars_global *global, struct mars_dent *parent, int sequence, loff_t start_pos, loff_t end_pos) +int _update_versionlink(struct mars_global *global, struct mars_dent *parent, int sequence, loff_t start_pos, loff_t end_pos, bool is_primary) { char *prev = NULL; struct mars_dent *prev_link = NULL; @@ -1101,6 +1101,14 @@ int _update_versionlink(struct mars_global *global, struct mars_dent *parent, in } else { MARS_DBG("make version symlink '%s' -> '%s' status = %d\n", old, new, status); } + if (is_primary) { + kfree(new); + new = path_make("%s/version-%09d-primary", parent->d_path, sequence); + if (!new) { + goto out; + } + mars_symlink(old, new, &now, 0); + } out: if (new) { @@ -1112,6 +1120,55 @@ out: return status; } +static +int _check_versionlink(struct mars_global *global, struct mars_dent *parent, int sequence) +{ + char *my = NULL; + char *other = NULL; + struct mars_dent *my_dent; + struct mars_dent *other_dent; + int status = -ENOMEM; + + my = path_make("%s/version-%09d-%s", parent->d_path, sequence, my_id()); + if (!my) { + goto out; + } + + other = path_make("%s/version-%09d-primary", parent->d_path, sequence); + if (!other) { + goto out; + } + + status = -ENOENT; + my_dent = mars_find_dent(global, my); + if (!my_dent || !my_dent->new_link) { + MARS_WRN("cannot find symlink '%s'\n", my); + goto out; + } + other_dent = mars_find_dent(global, other); + if (!other_dent || !other_dent->new_link) { + MARS_WRN("cannot find symlink '%s'\n", other); + goto out; + } + + status = 0; + if (!strcmp(my_dent->new_link, other_dent->new_link)) { + status++; + MARS_DBG("versions OK\n"); + } else { + MARS_DBG("versions MISMATCH\n"); + } + +out: + if (my) { + kfree(my); + } + if (other) { + kfree(other); + } + return status; +} + /* This must be called once at every round of logfile checking. */ static @@ -1471,9 +1528,10 @@ int _make_logging_status(struct mars_rotate *rot) case 1: /* Relevant, and transaction replay already finished. * Allow switching over to a new logfile. */ - if (!trans_brick->power.button && !trans_brick->power.led_on && trans_brick->power.led_off) { + if (!trans_brick->power.button && !trans_brick->power.led_on && trans_brick->power.led_off && + (rot->is_primary || _check_versionlink(global, dent->d_parent, dent->d_serial) > 0)) { _update_replaylink(dent->d_parent, dent->d_serial + 1, 0, 0, !rot->is_primary); - _update_versionlink(global, dent->d_parent, dent->d_serial + 1, 0, 0); + _update_versionlink(global, dent->d_parent, dent->d_serial + 1, 0, 0, rot->is_primary); trans_brick->current_pos = 0; rot->last_jiffies = jiffies; //mars_trigger(); @@ -1689,7 +1747,8 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *parent) } else { do_stop = (rot->relevant_log && rot->relevant_log != rot->current_log) || - (rot->current_log && !S_ISREG(rot->current_log->new_stat.mode)); + (rot->current_log && !S_ISREG(rot->current_log->new_stat.mode)) || + (!rot->is_primary && (!rot->if_brick || rot->if_brick->power.led_off)); } MARS_DBG("do_stop = %d\n", (int)do_stop); @@ -1704,7 +1763,7 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *parent) continue; } status = _update_replaylink(parent, trans_input->sequence, trans_input->replay_min_pos, trans_input->replay_max_pos, true); - status = _update_versionlink(global, parent, trans_input->sequence, trans_input->replay_min_pos, trans_input->replay_max_pos); + status = _update_versionlink(global, parent, trans_input->sequence, trans_input->replay_min_pos, trans_input->replay_max_pos, rot->is_primary); old_input = trans_input; } rot->last_jiffies = jiffies; @@ -1866,6 +1925,7 @@ int make_dev(void *buf, struct mars_dent *dent) struct mars_rotate *rot = NULL; struct mars_brick *dev_brick; struct if_brick *_dev_brick; + bool switch_on; int status = 0; if (!parent || !dent->new_link) { @@ -1881,8 +1941,8 @@ int make_dev(void *buf, struct mars_dent *dent) MARS_DBG("nothing to do\n"); goto done; } - if (!rot->trans_brick || rot->trans_brick->do_replay || !rot->trans_brick->power.led_on || rot->trans_brick->power.led_off) { - MARS_DBG("transaction logger not ready for writing\n"); + if (!rot->trans_brick) { + MARS_DBG("transaction logger does not exist\n"); goto done; } @@ -1892,6 +1952,11 @@ int make_dev(void *buf, struct mars_dent *dent) goto done; } + switch_on = + rot->is_primary && + !rot->trans_brick->do_replay && + rot->trans_brick->power.led_on; + dev_brick = make_brick_all(global, dent, @@ -1901,7 +1966,7 @@ int make_dev(void *buf, struct mars_dent *dent) dent->d_argv[0], (const struct generic_brick_type*)&if_brick_type, (const struct generic_brick_type*[]){(const struct generic_brick_type*)&trans_logger_brick_type}, - rot->is_primary ? NULL : "", // KLUDGE + switch_on ? NULL : "", // KLUDGE "%s/linuxdev-%s", (const char *[]){"%s/logger"}, 1, @@ -2247,7 +2312,7 @@ static const struct light_class light_classes[] = { .cl_name = "defaults-", .cl_len = 9, .cl_type = 'd', - .cl_hostcontext = true, + .cl_hostcontext = false, .cl_father = CL_RESOURCE, .cl_forward = NULL, .cl_backward = NULL, @@ -2277,7 +2342,7 @@ static const struct light_class light_classes[] = { .cl_name = "switch-", .cl_len = 7, .cl_type = 'd', - .cl_hostcontext = true, + .cl_hostcontext = false, .cl_father = CL_RESOURCE, .cl_forward = NULL, .cl_backward = NULL, @@ -2299,7 +2364,7 @@ static const struct light_class light_classes[] = { .cl_name = "actual-", .cl_len = 7, .cl_type = 'd', - .cl_hostcontext = true, + .cl_hostcontext = false, .cl_father = CL_RESOURCE, .cl_forward = NULL, .cl_backward = NULL, @@ -2418,7 +2483,7 @@ static const struct light_class light_classes[] = { .cl_len = 8, .cl_type = 'l', .cl_serial = true, - .cl_hostcontext = true, + .cl_hostcontext = false, .cl_father = CL_RESOURCE, .cl_forward = NULL, .cl_backward = NULL, diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 417a7f58..824ed94c 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -15,7 +15,7 @@ // commenting this out is dangerous for data integrity! use only for testing! #define USE_MEMCPY -#define DO_WRITEBACK // otherweise FAKE IO +#define DO_WRITEBACK // otherwise FAKE IO #define APPLY_DATA #include @@ -620,10 +620,10 @@ void __trans_logger_ref_put(struct trans_logger_output *output, struct trans_log struct trans_logger_mref_aspect *shadow_a; struct trans_logger_input *input; - MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len); - restart: mref = mref_a->object; + MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len); + CHECK_ATOMIC(&mref->ref_count, 1); CHECK_PTR(output, err); @@ -2331,7 +2331,8 @@ void trans_logger_replay(struct trans_logger_output *output) input->logst.chunk_size = brick->chunk_size; init_logst(&input->logst, (void*)input, (void*)&input->hidden_output, brick->replay_start_pos); - start_pos = input->logst.log_pos; + start_pos = brick->replay_start_pos; + input->logst.log_pos = start_pos; brick->current_pos = start_pos; input->replay_min_pos = start_pos; input->replay_max_pos = start_pos; // FIXME: this is wrong. @@ -2339,11 +2340,15 @@ void trans_logger_replay(struct trans_logger_output *output) mars_power_led_on((void*)brick, true); for (;;) { + loff_t new_finished_pos; struct log_header lh = {}; void *buf = NULL; int len = 0; - if (kthread_should_stop()) { + finished_pos = input->logst.log_pos + input->logst.offset; + if (kthread_should_stop() || + (!brick->do_continuous_replay && finished_pos >= brick->replay_end_pos)) { + status = 0; // treat as EOF break; } @@ -2358,32 +2363,29 @@ void trans_logger_replay(struct trans_logger_output *output) MARS_ERR("cannot read logfile data, status = %d\n", status); break; } - finished_pos = input->logst.log_pos + input->logst.offset; - if (!brick->do_continuous_replay && finished_pos >= brick->replay_end_pos) { - status = 0; // treat as EOF - } - if (!status) { // EOF -> wait until kthread_should_stop() - MARS_DBG("EOF at %lld\n", finished_pos); + new_finished_pos = input->logst.log_pos + input->logst.offset; + if ((!status && len <= 0) || + new_finished_pos > brick->replay_end_pos) { // EOF -> wait until kthread_should_stop() + MARS_DBG("EOF at %lld (old = %lld, end_pos = %lld)\n", new_finished_pos, finished_pos, brick->replay_end_pos); if (!brick->do_continuous_replay) { + // notice: finished_pos remains at old value here! + brick->replay_end_pos = finished_pos; break; } - if (finished_pos > brick->replay_end_pos) { - brick->replay_end_pos = finished_pos; - } msleep(1000); + continue; } if (lh.l_code != CODE_WRITE_NEW) { MARS_IO("ignoring pos = %lld len = %d code = %d\n", lh.l_pos, lh.l_len, lh.l_code); - continue; - } - - if (likely(buf && len)) { + } else if (likely(buf && len)) { status = apply_data(brick, lh.l_pos, buf, len); if (unlikely(status < 0)) { brick->replay_code = status; MARS_ERR("cannot apply data at pos = %lld len = %d, status = %d\n", lh.l_pos, len, status); break; + } else { + finished_pos = new_finished_pos; } } @@ -2400,7 +2402,10 @@ void trans_logger_replay(struct trans_logger_output *output) wait_event_interruptible_timeout(brick->worker_event, atomic_read(&brick->replay_count) <= 0, 60 * HZ); - finished_pos = input->logst.log_pos + input->logst.offset; + if (unlikely(finished_pos > brick->replay_end_pos)) { + MARS_ERR("finished_pos too large: %lld + %d = %lld > %lld\n", input->logst.log_pos, input->logst.offset, finished_pos, brick->replay_end_pos); + finished_pos = brick->replay_end_pos; + } if (status >= 0) { brick->current_pos = finished_pos; input->replay_min_pos = finished_pos;