diff --git a/brick.c b/brick.c index 37858412..c03ddae8 100644 --- a/brick.c +++ b/brick.c @@ -732,6 +732,7 @@ void get_lamport(struct timespec *now) down(&lamport_sem); + //*now = current_kernel_time(); *now = CURRENT_TIME; diff = timespec_compare(now, &lamport_now); if (diff > 0) { diff --git a/log_format.c b/log_format.c index 0feb470e..5b972f48 100644 --- a/log_format.c +++ b/log_format.c @@ -62,9 +62,10 @@ void log_flush(struct log_status *logst) // round up to next alignment border int align_offset = logst->offset & (logst->align_size-1); if (align_offset > 0) { + int restlen = mref->ref_len - logst->offset; gap = logst->align_size - align_offset; - if (gap > logst->restlen) { - gap = logst->restlen; + if (gap > restlen) { + gap = restlen; } } } @@ -106,14 +107,14 @@ void *log_reserve(struct log_status *logst, struct log_header *lh) MARS_DBG("reserving %d bytes at %lld\n", lh->l_len, logst->log_pos); - if (total_len > logst->restlen || !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) { + mref = logst->log_mref; + if ((mref && total_len > mref->ref_len - logst->offset) + || !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) { log_flush(logst); } mref = logst->log_mref; if (!mref) { - int chunk_offset; - int chunk_rest; if (unlikely(logst->private)) { MARS_ERR("oops\n"); kfree(logst->private); @@ -133,21 +134,19 @@ void *log_reserve(struct log_status *logst, struct log_header *lh) cb_info->mref = mref; mref->ref_pos = logst->log_pos; - chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); - chunk_rest = logst->chunk_size - chunk_offset; - if (chunk_rest < total_len) { - mref->ref_pos += chunk_rest; - chunk_rest = logst->chunk_size; - } - mref->ref_len = chunk_rest; - if (mref->ref_len < total_len) { - MARS_INF("not good: ref_len = %d total_len = %d\n", mref->ref_len, total_len); - mref->ref_len = total_len; - } + mref->ref_len = total_len; mref->ref_may_write = WRITE; -#if 0 - mref->ref_prio = MARS_PRIO_LOW; -#endif + mref->ref_prio = logst->io_prio; + if (logst->chunk_size > 0) { + int chunk_offset; + int chunk_rest; + chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); + chunk_rest = logst->chunk_size - chunk_offset; + while (chunk_rest < total_len) { + chunk_rest += logst->chunk_size; + } + mref->ref_len = chunk_rest; + } status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); if (unlikely(status < 0)) { @@ -162,7 +161,6 @@ void *log_reserve(struct log_status *logst, struct log_header *lh) goto put; } - logst->restlen = mref->ref_len; logst->offset = 0; logst->log_mref = mref; } @@ -185,12 +183,12 @@ void *log_reserve(struct log_status *logst, struct log_header *lh) logst->payload_offset = offset; logst->payload_len = lh->l_len; - logst->offset = offset; return data + offset; put: GENERIC_INPUT_CALL(logst->input, mref_put, mref); + logst->log_mref = NULL; return NULL; err_free: @@ -211,13 +209,19 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private struct timespec now; void *data; int offset; + int restlen; int nr_endio; bool ok = false; CHECK_PTR(mref, err); - if (unlikely(len > logst->restlen)) { - MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->restlen); + if (unlikely(len > logst->payload_len)) { + MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len); + goto err; + } + restlen = mref->ref_len - logst->offset; + if (unlikely(len + END_OVERHEAD > restlen)) { + MARS_ERR("trying to write more than available (%d > %d)\n", len, (int)(restlen - END_OVERHEAD)); goto err; } if (unlikely(!cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX)) { @@ -245,8 +249,11 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private DATA_PUT(data, offset, now.tv_sec); DATA_PUT(data, offset, now.tv_nsec); + if (unlikely(offset > mref->ref_len)) { + MARS_ERR("length calculation was wrong: %d > %d\n", offset, mref->ref_len); + goto err; + } logst->offset = offset; - logst->restlen = mref->ref_len - offset; /* This must come last. In case of incomplete * or even overlapping disk transfers, this indicates @@ -271,6 +278,8 @@ EXPORT_SYMBOL_GPL(log_finalize); static int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *payload_len) { + bool dirty = false; + int offset; int i; *payload_len = 0; @@ -284,10 +293,12 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay char valid_copy; int restlen; - int offset = i; + offset = i; DATA_GET(buf, offset, start_magic); if (start_magic != START_MAGIC) { + if (start_magic != 0) + dirty = true; continue; } @@ -309,8 +320,7 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay } DATA_GET(buf, offset, total_len); if (total_len > restlen) { - MARS_WRN("data at offset %d is too long (len = %d larger than boundary %d)\n", i, total_len, restlen); - continue; + return -EAGAIN; } memset(lh, 0, sizeof(struct log_header)); @@ -338,12 +348,12 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay MARS_WRN("bad end_magic 0x%llx\n", end_magic); continue; } + DATA_GET(buf, offset, lh->l_crc); DATA_GET(buf, offset, valid_copy); if (valid_copy != 1) { MARS_WRN("found uncompleted / invalid data at %d len = %d (valid_flag = %d)\n", i, lh->l_len, (int)valid_copy); continue; } - DATA_GET(buf, offset, lh->l_crc); // skip spares offset += 3 + 4; DATA_GET(buf, offset, lh->l_written.tv_sec); @@ -354,13 +364,16 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay MARS_WRN("size mismatch at offset %d: %d != %d\n", i, total_len, offset - i); // just warn, but no consequences: better use the data, it has been checked by lots of magics } - - if (i > 0) { - MARS_WRN("skipped %d bytes to find valid data\n", i); - } - return offset - i; + goto done; } - return -EAGAIN; + offset = i; + +done: + // don't cry when nullbytes have been skipped + if (i > 0 && dirty) { + MARS_WRN("skipped %d dirty bytes at offset %d to find valid data\n", i, offset); + } + return offset; } static @@ -387,6 +400,7 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in struct generic_callback *cb; int chunk_offset; int chunk_rest; + mref = mars_alloc_mref(logst->output, &logst->ref_object_layout); if (unlikely(!mref)) { MARS_ERR("no mref\n"); @@ -395,7 +409,7 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in mref->ref_pos = logst->log_pos; chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); chunk_rest = logst->chunk_size - chunk_offset; - mref->ref_len = chunk_rest; + mref->ref_len = chunk_rest + logst->chunk_size; #if 0 mref->ref_prio = MARS_PRIO_LOW; #endif @@ -429,12 +443,16 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in } status = log_scan(mref->ref_data + logst->offset, mref->ref_len - logst->offset, lh, payload, payload_len); - if (status < 0) { + if (unlikely(status == 0)) { + MARS_ERR("bad logfile scan\n"); + status = -EINVAL; + } + if (unlikely(status < 0)) { goto done_free; } logst->offset += status; - if (logst->offset < mref->ref_len) { + if (logst->offset < mref->ref_len - logst->chunk_size) { goto done; } diff --git a/log_format.h b/log_format.h index 0eb6747a..aca19d9c 100644 --- a/log_format.h +++ b/log_format.h @@ -91,6 +91,7 @@ struct log_status { // tunables int align_size; // alignment between requests int chunk_size; // must be at least 8K (better 64k) + int io_prio; // informational loff_t log_pos; // internal @@ -98,7 +99,6 @@ struct log_status { struct mars_output *output; struct generic_object_layout ref_object_layout; struct mars_info info; - int restlen; int offset; int validflag_offset; int reallen_offset; diff --git a/mars.h b/mars.h index 8abb1a42..26e00588 100644 --- a/mars.h +++ b/mars.h @@ -97,12 +97,14 @@ struct mref_object_layout { #ifdef MARS_TRACING +extern unsigned long long start_trace_clock; + #define MAX_TRACES 16 #define TRACING_INFO \ int ref_traces; \ - struct timespec ref_trace_stamp[MAX_TRACES]; \ - const char *ref_trace_info[MAX_TRACES]; + unsigned long long ref_trace_stamp[MAX_TRACES]; \ + const char *ref_trace_info[MAX_TRACES]; extern void _mars_log(char *buf, int len); extern void mars_log(const char *fmt, ...); diff --git a/mars_bio.c b/mars_bio.c index 90039e0a..46e2583c 100644 --- a/mars_bio.c +++ b/mars_bio.c @@ -329,6 +329,15 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref) MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&output->brick->fly_count)); mars_trace(mref, "bio_submit"); +#ifdef WAIT_CLASH + mref_a->hash_pos = (mref->ref_pos / PAGE_SIZE) % WAIT_CLASH; + if (mref->ref_rw) { + down_write(&output->brick->hashtable[mref_a->hash_pos]); + } else { + down_read(&output->brick->hashtable[mref_a->hash_pos]); + } +#endif + #ifdef FAKE_IO bio->bi_end_io(bio, 0); #else @@ -396,6 +405,13 @@ static int bio_thread(void *data) mref = mref_a->object; +#ifdef WAIT_CLASH + if (mref_a->object->ref_rw) { + up_write(&brick->hashtable[mref_a->hash_pos]); + } else { + up_read(&brick->hashtable[mref_a->hash_pos]); + } +#endif mars_trace(mref, "bio_endio"); cb = mref->ref_cb; @@ -524,6 +540,12 @@ MARS_MAKE_STATICS(bio); static int bio_brick_construct(struct bio_brick *brick) { +#ifdef WAIT_CLASH + int i; + for (i = 0; i < WAIT_CLASH; i++) { + init_rwsem(&brick->hashtable[i]); + } +#endif spin_lock_init(&brick->lock); INIT_LIST_HEAD(&brick->completed_list); init_waitqueue_head(&brick->event); diff --git a/mars_bio.h b/mars_bio.h index 5f121c77..915a9b9a 100644 --- a/mars_bio.h +++ b/mars_bio.h @@ -3,6 +3,9 @@ #define MARS_BIO_H #include +#include + +//#define WAIT_CLASH 1024 struct bio_mref_aspect { GENERIC_ASPECT(mref); @@ -10,6 +13,7 @@ struct bio_mref_aspect { struct bio *bio; struct bio_output *output; int status_code; + int hash_pos; struct page *page; //bool do_dealloc; }; @@ -28,6 +32,9 @@ struct bio_brick { struct file *filp; struct block_device *bdev; struct task_struct *thread; +#ifdef WAIT_CLASH + struct rw_semaphore hashtable[WAIT_CLASH]; +#endif int bvec_max; }; diff --git a/mars_client.c b/mars_client.c index 9cbd5dcc..e28c8b36 100644 --- a/mars_client.c +++ b/mars_client.c @@ -194,6 +194,11 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref goto error; } + while (output->brick->max_flying > 0 && atomic_read(&output->fly_count) > output->brick->max_flying) { + msleep(1000 * 2 / HZ); + } + + atomic_inc(&output->fly_count); atomic_inc(&mref->ref_count); traced_lock(&output->lock, flags); @@ -265,6 +270,8 @@ static int receiver_thread(void *data) list_del_init(&mref_a->io_head); traced_unlock(&output->lock, flags); + atomic_dec(&output->fly_count); + cb = mref->ref_cb; cb->cb_fn(cb); client_ref_put(output, mref); diff --git a/mars_client.h b/mars_client.h index 7eccbe61..6f30938a 100644 --- a/mars_client.h +++ b/mars_client.h @@ -12,6 +12,8 @@ struct client_mref_aspect { struct client_brick { MARS_BRICK(client); + // tunables + int max_flying; // limit on parallelism }; struct client_input { @@ -26,6 +28,7 @@ struct client_threadinfo { struct client_output { MARS_OUTPUT(client); + atomic_t fly_count; spinlock_t lock; struct list_head mref_list; struct list_head wait_list; diff --git a/mars_generic.c b/mars_generic.c index 718bfc8e..3b28852a 100644 --- a/mars_generic.c +++ b/mars_generic.c @@ -95,6 +95,9 @@ EXPORT_SYMBOL_GPL(mars_dent_meta); #ifdef MARS_TRACING +unsigned long long start_trace_clock = 0; +EXPORT_SYMBOL_GPL(start_trace_clock); + struct file *mars_log_file = NULL; loff_t mars_log_pos = 0; @@ -136,8 +139,8 @@ EXPORT_SYMBOL_GPL(mars_log); void mars_trace(struct mref_object *mref, const char *info) { int index = mref->ref_traces; - if (index < MAX_TRACES) { - mref->ref_trace_stamp[index] = CURRENT_TIME; + if (likely(index < MAX_TRACES)) { + mref->ref_trace_stamp[index] = cpu_clock(raw_smp_processor_id()); mref->ref_trace_info[index] = info; mref->ref_traces++; } @@ -146,10 +149,9 @@ EXPORT_SYMBOL_GPL(mars_trace); void mars_log_trace(struct mref_object *mref) { - static struct timespec first = {}; char *buf = kmalloc(PAGE_SIZE, GFP_MARS); - struct timespec old = {}; - struct timespec diff; + unsigned long long old; + unsigned long long diff; int i; int len; @@ -159,19 +161,19 @@ void mars_log_trace(struct mref_object *mref) if (!mars_log_file || !mref->ref_traces) { goto done; } - if (!first.tv_sec) { - first = mref->ref_trace_stamp[0]; + if (!start_trace_clock) { + start_trace_clock = mref->ref_trace_stamp[0]; } - old = first; - diff = timespec_sub(mref->ref_trace_stamp[mref->ref_traces-1], old); + diff = mref->ref_trace_stamp[mref->ref_traces-1] - mref->ref_trace_stamp[0]; - len = sprintf(buf, "%c ;%11lld ;%4d;%2ld.%09ld", mref->ref_rw ? 'W' : 'R', mref->ref_pos, mref->ref_len, diff.tv_sec, diff.tv_nsec); + len = sprintf(buf, "%c ;%12lld ;%6d;%10llu", mref->ref_rw ? 'W' : 'R', mref->ref_pos, mref->ref_len, diff / 1000); + old = start_trace_clock; for (i = 0; i < mref->ref_traces; i++) { - diff = timespec_sub(mref->ref_trace_stamp[i], old); + diff = mref->ref_trace_stamp[i] - old; - len += sprintf(buf + len, " ; %s ;%4ld.%09ld", mref->ref_trace_info[i], diff.tv_sec, diff.tv_nsec); + len += sprintf(buf + len, " ; %s ;%10llu", mref->ref_trace_info[i], diff / 1000); old = mref->ref_trace_stamp[i]; } len +=sprintf(buf + len, "\n"); @@ -180,6 +182,7 @@ void mars_log_trace(struct mref_object *mref) done: kfree(buf); + mref->ref_traces = 0; } EXPORT_SYMBOL_GPL(mars_log_trace); @@ -1210,6 +1213,10 @@ struct mars_brick *make_brick_all( MARS_DBG("substitute by remote brick '%s' on peer '%s'\n", new_name, remote); brick = mars_make_brick(global, belongs, _client_brick_type, new_path, new_name); + if (brick) { + struct client_brick *_brick = (void*)brick; + _brick->max_flying = 1000; + } } } if (!brick && new_brick_type == _bio_brick_type && _aio_brick_type) { diff --git a/mars_light.c b/mars_light.c index 1d0f4d12..3b102c75 100644 --- a/mars_light.c +++ b/mars_light.c @@ -58,13 +58,17 @@ struct light_class { //#define FLUSH_DELAY (HZ / 100 + 1) #define FLUSH_DELAY 0 -#define TRANS_FAKE +//#define TRANS_FAKE #define CONF_TRANS_BATCHLEN 32 #define CONF_TRANS_FLYING 4 +#define CONF_TRANS_PRIO MARS_PRIO_HIGH #define CONF_ALL_BATCHLEN 2 -#define CONF_ALL_FLYING 0 +#define CONF_ALL_FLYING 4 +#define CONF_ALL_CONTENTION 0 +#define CONF_ALL_PRESSURE 0 +#define CONF_ALL_PRIO MARS_PRIO_LOW #define CONF_ALL_MAX_QUEUE 10000 #define CONF_ALL_MAX_JIFFIES (180 * HZ) @@ -95,6 +99,21 @@ void _set_trans_params(struct mars_brick *_brick, void *private) trans_brick->outputs[0]->q_phase3.q_max_flying = CONF_ALL_FLYING; trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_ALL_FLYING; + trans_brick->outputs[0]->q_phase1.q_max_contention = CONF_ALL_CONTENTION; + trans_brick->outputs[0]->q_phase2.q_max_contention = CONF_ALL_CONTENTION; + trans_brick->outputs[0]->q_phase3.q_max_contention = CONF_ALL_CONTENTION; + trans_brick->outputs[0]->q_phase4.q_max_contention = CONF_ALL_CONTENTION; + + trans_brick->outputs[0]->q_phase1.q_over_pressure = CONF_ALL_PRESSURE; + trans_brick->outputs[0]->q_phase2.q_over_pressure = CONF_ALL_PRESSURE; + trans_brick->outputs[0]->q_phase3.q_over_pressure = CONF_ALL_PRESSURE; + trans_brick->outputs[0]->q_phase4.q_over_pressure = CONF_ALL_PRESSURE; + + trans_brick->outputs[0]->q_phase1.q_io_prio = CONF_TRANS_PRIO; + trans_brick->outputs[0]->q_phase2.q_io_prio = CONF_ALL_PRIO; + trans_brick->outputs[0]->q_phase3.q_io_prio = CONF_ALL_PRIO; + trans_brick->outputs[0]->q_phase4.q_io_prio = CONF_ALL_PRIO; + trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_ALL_MAX_QUEUE; trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_ALL_MAX_QUEUE; diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 7c30eeef..3af16610 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -71,13 +71,11 @@ bool q_is_ready(struct logger_queue *q) if (dep) { contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying); } - max_contention = q->q_dep_flying; + max_contention = q->q_max_contention; over = queued - q->q_max_queued; -#if 0 - if (over > 0) { - max_contention += over / 128; + if (over > 0 && q->q_over_pressure > 0) { + max_contention += over / q->q_over_pressure; } -#endif #if 1 /* 2) when other queues are too much contended, @@ -333,6 +331,8 @@ static inline bool hash_put(struct trans_logger_output *output, struct trans_log ////////////////// own brick / input / output operations ////////////////// +static atomic_t global_mshadow_count = ATOMIC_INIT(0); + static int trans_logger_get_info(struct trans_logger_output *output, struct mars_info *info) { struct trans_logger_input *input = output->brick->inputs[0]; @@ -414,6 +414,7 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge } mref->ref_data = data; atomic_inc(&output->mshadow_count); + atomic_inc(&global_mshadow_count); #ifdef USE_MEMCPY if (mref_a->orig_data) { memcpy(mref->ref_data, mref_a->orig_data, mref->ref_len); @@ -516,6 +517,7 @@ restart: #endif mref->ref_data = NULL; atomic_dec(&output->mshadow_count); + atomic_dec(&global_mshadow_count); trans_logger_free_mref(mref); return; } @@ -828,7 +830,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) goto err; } - mars_trace(sub_mref, "sub_start"); + mars_trace(sub_mref, "sub_create"); atomic_inc(&output->sub_balance_count); pos += sub_mref->ref_len; @@ -848,9 +850,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) cb->cb_prev = NULL; sub_mref->ref_cb = cb; sub_mref->ref_rw = 0; -#if 1 - sub_mref->ref_prio = MARS_PRIO_LOW; -#endif + sub_mref->ref_prio = output->q_phase2.q_io_prio; atomic_inc(&output->q_phase2.q_flying); if (output->brick->log_reads) { @@ -969,6 +969,7 @@ static void phase4_endio(struct generic_callback *cb) CHECK_PTR(orig_mref, err); mars_trace(sub_mref_a->object, "sub_endio"); + mars_log_trace(sub_mref_a->object); atomic_dec(&output->q_phase4.q_flying); @@ -989,6 +990,7 @@ static void phase4_endio(struct generic_callback *cb) list_del_init(tmp); traced_unlock(&brick->pos_lock, flags); + mars_log_trace(sub_mref_a->object); put: //MARS_INF("put ORIGREF.\n"); @@ -1029,9 +1031,14 @@ static bool phase4_startio(struct trans_logger_mref_aspect *sub_mref_a) cb->cb_prev = NULL; sub_mref->ref_cb = cb; sub_mref->ref_rw = 1; + sub_mref->ref_prio = output->q_phase4.q_io_prio; atomic_inc(&output->q_phase4.q_flying); atomic_inc(&output->total_writeback_count); + + mars_log_trace(sub_mref); + mars_trace(sub_mref, "sub_start"); + if (orig_mref_a->is_outdated || output->brick->debug_shortcut) { MARS_IO("SHORTCUT %d\n", sub_mref->ref_len); atomic_inc(&output->total_shortcut_count); @@ -1108,6 +1115,8 @@ void trans_logger_log(struct trans_logger_output *output) long long last_jiffies = jiffies; long long log_jiffies = jiffies; + mars_power_led_on((void*)brick, true); + while (!kthread_should_stop() || _congested(output)) { int status; @@ -1126,7 +1135,22 @@ void trans_logger_log(struct trans_logger_output *output) //MARS_INF("AHA %d\n", atomic_read(&output->q_phase1.q_queued)); #if 1 - if (((long long)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) { + { + static int old_mshadow_count = 0; + int cnt; + + cnt = atomic_read(&global_mshadow_count); + if (cnt + old_mshadow_count > 0 && cnt != old_mshadow_count) { + unsigned long long now = cpu_clock(raw_smp_processor_id()); + if (!start_trace_clock) + start_trace_clock = now; + now -= start_trace_clock; + mars_log("shadow_count ;%12lld ; %4d\n", now / 1000, cnt); + } + old_mshadow_count = cnt; + } + + if (((long long)jiffies) - last_jiffies >= HZ * 5 && brick->power.button) { last_jiffies = jiffies; MARS_INF("LOGGER: reads=%d writes=%d writeback=%d shortcut=%d (%d%%) | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_writeback_count), atomic_read(&output->total_shortcut_count), atomic_read(&output->total_writeback_count) ? atomic_read(&output->total_shortcut_count) * 100 / atomic_read(&output->total_writeback_count) : 0, atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying)); } @@ -1209,7 +1233,7 @@ int apply_data(struct trans_logger_output *output, struct log_header *lh, void * * The switch infrastructure must be changed before this * can become useful. */ -#if 1 +#if 0 while (len > 0) { struct mref_object *mref; struct trans_logger_mref_aspect *mref_a; @@ -1285,6 +1309,9 @@ void trans_logger_replay(struct trans_logger_output *output) MARS_ERR("cannot read logfile data, status = %d\n", status); break; } + if (!buf || !len) { + continue; + } status = apply_data(output, &lh, buf, len); if (status < 0) { @@ -1302,16 +1329,15 @@ void trans_logger_replay(struct trans_logger_output *output) if (brick->replay_pos == brick->end_pos) { MARS_INF("replay finished at %lld\n", brick->replay_pos); - mars_power_led_on((void*)brick, true); +#if 1 + while (!kthread_should_stop()) { + mars_power_led_on((void*)brick, true); + msleep(500); + } +#endif } else { MARS_INF("replay stopped prematurely at %lld (of %lld)\n", brick->replay_pos, brick->end_pos); - mars_power_led_off((void*)brick, true); } -#if 1 - while (!kthread_should_stop()) { - msleep(500); - } -#endif } ///////////////////////// logger thread / switching ///////////////////////// @@ -1326,7 +1352,6 @@ int trans_logger_thread(void *data) brick->current_pos = brick->start_pos; brick->logst.log_pos = brick->current_pos; - mars_power_led_on((void*)brick, true); brick->logst.align_size = brick->align_size; brick->logst.chunk_size = brick->chunk_size; @@ -1338,6 +1363,7 @@ int trans_logger_thread(void *data) } MARS_INF("........... logger has stopped.\n"); + mars_power_led_on((void*)brick, false); mars_power_led_off((void*)brick, true); return 0; } diff --git a/mars_trans_logger.h b/mars_trans_logger.h index 9ac773cd..5258e065 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -33,7 +33,9 @@ struct logger_queue { int q_max_queued; int q_max_flying; int q_max_jiffies; - int q_dep_flying; + int q_max_contention; + int q_over_pressure; + int q_io_prio; bool q_ordering; };