1
0
mirror of https://github.com/schoebel/mars synced 2025-03-29 23:06:36 +00:00

import mars-86.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-03-29 15:40:40 +01:00
parent 4bcaf0a3f8
commit 57da6d4b37
12 changed files with 188 additions and 74 deletions

View File

@ -732,6 +732,7 @@ void get_lamport(struct timespec *now)
down(&lamport_sem); down(&lamport_sem);
//*now = current_kernel_time();
*now = CURRENT_TIME; *now = CURRENT_TIME;
diff = timespec_compare(now, &lamport_now); diff = timespec_compare(now, &lamport_now);
if (diff > 0) { if (diff > 0) {

View File

@ -62,9 +62,10 @@ void log_flush(struct log_status *logst)
// round up to next alignment border // round up to next alignment border
int align_offset = logst->offset & (logst->align_size-1); int align_offset = logst->offset & (logst->align_size-1);
if (align_offset > 0) { if (align_offset > 0) {
int restlen = mref->ref_len - logst->offset;
gap = logst->align_size - align_offset; gap = logst->align_size - align_offset;
if (gap > logst->restlen) { if (gap > restlen) {
gap = logst->restlen; gap = restlen;
} }
} }
} }
@ -106,14 +107,14 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
MARS_DBG("reserving %d bytes at %lld\n", lh->l_len, logst->log_pos); MARS_DBG("reserving %d bytes at %lld\n", lh->l_len, logst->log_pos);
if (total_len > logst->restlen || !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) { mref = logst->log_mref;
if ((mref && total_len > mref->ref_len - logst->offset)
|| !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) {
log_flush(logst); log_flush(logst);
} }
mref = logst->log_mref; mref = logst->log_mref;
if (!mref) { if (!mref) {
int chunk_offset;
int chunk_rest;
if (unlikely(logst->private)) { if (unlikely(logst->private)) {
MARS_ERR("oops\n"); MARS_ERR("oops\n");
kfree(logst->private); kfree(logst->private);
@ -133,21 +134,19 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
cb_info->mref = mref; cb_info->mref = mref;
mref->ref_pos = logst->log_pos; mref->ref_pos = logst->log_pos;
chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); mref->ref_len = total_len;
chunk_rest = logst->chunk_size - chunk_offset;
if (chunk_rest < total_len) {
mref->ref_pos += chunk_rest;
chunk_rest = logst->chunk_size;
}
mref->ref_len = chunk_rest;
if (mref->ref_len < total_len) {
MARS_INF("not good: ref_len = %d total_len = %d\n", mref->ref_len, total_len);
mref->ref_len = total_len;
}
mref->ref_may_write = WRITE; mref->ref_may_write = WRITE;
#if 0 mref->ref_prio = logst->io_prio;
mref->ref_prio = MARS_PRIO_LOW; if (logst->chunk_size > 0) {
#endif int chunk_offset;
int chunk_rest;
chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1);
chunk_rest = logst->chunk_size - chunk_offset;
while (chunk_rest < total_len) {
chunk_rest += logst->chunk_size;
}
mref->ref_len = chunk_rest;
}
status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); status = GENERIC_INPUT_CALL(logst->input, mref_get, mref);
if (unlikely(status < 0)) { if (unlikely(status < 0)) {
@ -162,7 +161,6 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
goto put; goto put;
} }
logst->restlen = mref->ref_len;
logst->offset = 0; logst->offset = 0;
logst->log_mref = mref; logst->log_mref = mref;
} }
@ -185,12 +183,12 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
logst->payload_offset = offset; logst->payload_offset = offset;
logst->payload_len = lh->l_len; logst->payload_len = lh->l_len;
logst->offset = offset;
return data + offset; return data + offset;
put: put:
GENERIC_INPUT_CALL(logst->input, mref_put, mref); GENERIC_INPUT_CALL(logst->input, mref_put, mref);
logst->log_mref = NULL;
return NULL; return NULL;
err_free: err_free:
@ -211,13 +209,19 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
struct timespec now; struct timespec now;
void *data; void *data;
int offset; int offset;
int restlen;
int nr_endio; int nr_endio;
bool ok = false; bool ok = false;
CHECK_PTR(mref, err); CHECK_PTR(mref, err);
if (unlikely(len > logst->restlen)) { if (unlikely(len > logst->payload_len)) {
MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->restlen); MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len);
goto err;
}
restlen = mref->ref_len - logst->offset;
if (unlikely(len + END_OVERHEAD > restlen)) {
MARS_ERR("trying to write more than available (%d > %d)\n", len, (int)(restlen - END_OVERHEAD));
goto err; goto err;
} }
if (unlikely(!cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX)) { if (unlikely(!cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX)) {
@ -245,8 +249,11 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
DATA_PUT(data, offset, now.tv_sec); DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec); DATA_PUT(data, offset, now.tv_nsec);
if (unlikely(offset > mref->ref_len)) {
MARS_ERR("length calculation was wrong: %d > %d\n", offset, mref->ref_len);
goto err;
}
logst->offset = offset; logst->offset = offset;
logst->restlen = mref->ref_len - offset;
/* This must come last. In case of incomplete /* This must come last. In case of incomplete
* or even overlapping disk transfers, this indicates * or even overlapping disk transfers, this indicates
@ -271,6 +278,8 @@ EXPORT_SYMBOL_GPL(log_finalize);
static static
int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *payload_len) int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *payload_len)
{ {
bool dirty = false;
int offset;
int i; int i;
*payload_len = 0; *payload_len = 0;
@ -284,10 +293,12 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay
char valid_copy; char valid_copy;
int restlen; int restlen;
int offset = i;
offset = i;
DATA_GET(buf, offset, start_magic); DATA_GET(buf, offset, start_magic);
if (start_magic != START_MAGIC) { if (start_magic != START_MAGIC) {
if (start_magic != 0)
dirty = true;
continue; continue;
} }
@ -309,8 +320,7 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay
} }
DATA_GET(buf, offset, total_len); DATA_GET(buf, offset, total_len);
if (total_len > restlen) { if (total_len > restlen) {
MARS_WRN("data at offset %d is too long (len = %d larger than boundary %d)\n", i, total_len, restlen); return -EAGAIN;
continue;
} }
memset(lh, 0, sizeof(struct log_header)); memset(lh, 0, sizeof(struct log_header));
@ -338,12 +348,12 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay
MARS_WRN("bad end_magic 0x%llx\n", end_magic); MARS_WRN("bad end_magic 0x%llx\n", end_magic);
continue; continue;
} }
DATA_GET(buf, offset, lh->l_crc);
DATA_GET(buf, offset, valid_copy); DATA_GET(buf, offset, valid_copy);
if (valid_copy != 1) { if (valid_copy != 1) {
MARS_WRN("found uncompleted / invalid data at %d len = %d (valid_flag = %d)\n", i, lh->l_len, (int)valid_copy); MARS_WRN("found uncompleted / invalid data at %d len = %d (valid_flag = %d)\n", i, lh->l_len, (int)valid_copy);
continue; continue;
} }
DATA_GET(buf, offset, lh->l_crc);
// skip spares // skip spares
offset += 3 + 4; offset += 3 + 4;
DATA_GET(buf, offset, lh->l_written.tv_sec); DATA_GET(buf, offset, lh->l_written.tv_sec);
@ -354,13 +364,16 @@ int log_scan(void *buf, int len, struct log_header *lh, void **payload, int *pay
MARS_WRN("size mismatch at offset %d: %d != %d\n", i, total_len, offset - i); MARS_WRN("size mismatch at offset %d: %d != %d\n", i, total_len, offset - i);
// just warn, but no consequences: better use the data, it has been checked by lots of magics // just warn, but no consequences: better use the data, it has been checked by lots of magics
} }
goto done;
if (i > 0) {
MARS_WRN("skipped %d bytes to find valid data\n", i);
}
return offset - i;
} }
return -EAGAIN; offset = i;
done:
// don't cry when nullbytes have been skipped
if (i > 0 && dirty) {
MARS_WRN("skipped %d dirty bytes at offset %d to find valid data\n", i, offset);
}
return offset;
} }
static static
@ -387,6 +400,7 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in
struct generic_callback *cb; struct generic_callback *cb;
int chunk_offset; int chunk_offset;
int chunk_rest; int chunk_rest;
mref = mars_alloc_mref(logst->output, &logst->ref_object_layout); mref = mars_alloc_mref(logst->output, &logst->ref_object_layout);
if (unlikely(!mref)) { if (unlikely(!mref)) {
MARS_ERR("no mref\n"); MARS_ERR("no mref\n");
@ -395,7 +409,7 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in
mref->ref_pos = logst->log_pos; mref->ref_pos = logst->log_pos;
chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1); chunk_offset = logst->log_pos & (loff_t)(logst->chunk_size - 1);
chunk_rest = logst->chunk_size - chunk_offset; chunk_rest = logst->chunk_size - chunk_offset;
mref->ref_len = chunk_rest; mref->ref_len = chunk_rest + logst->chunk_size;
#if 0 #if 0
mref->ref_prio = MARS_PRIO_LOW; mref->ref_prio = MARS_PRIO_LOW;
#endif #endif
@ -429,12 +443,16 @@ int log_read(struct log_status *logst, struct log_header *lh, void **payload, in
} }
status = log_scan(mref->ref_data + logst->offset, mref->ref_len - logst->offset, lh, payload, payload_len); status = log_scan(mref->ref_data + logst->offset, mref->ref_len - logst->offset, lh, payload, payload_len);
if (status < 0) { if (unlikely(status == 0)) {
MARS_ERR("bad logfile scan\n");
status = -EINVAL;
}
if (unlikely(status < 0)) {
goto done_free; goto done_free;
} }
logst->offset += status; logst->offset += status;
if (logst->offset < mref->ref_len) { if (logst->offset < mref->ref_len - logst->chunk_size) {
goto done; goto done;
} }

View File

@ -91,6 +91,7 @@ struct log_status {
// tunables // tunables
int align_size; // alignment between requests int align_size; // alignment between requests
int chunk_size; // must be at least 8K (better 64k) int chunk_size; // must be at least 8K (better 64k)
int io_prio;
// informational // informational
loff_t log_pos; loff_t log_pos;
// internal // internal
@ -98,7 +99,6 @@ struct log_status {
struct mars_output *output; struct mars_output *output;
struct generic_object_layout ref_object_layout; struct generic_object_layout ref_object_layout;
struct mars_info info; struct mars_info info;
int restlen;
int offset; int offset;
int validflag_offset; int validflag_offset;
int reallen_offset; int reallen_offset;

6
mars.h
View File

@ -97,12 +97,14 @@ struct mref_object_layout {
#ifdef MARS_TRACING #ifdef MARS_TRACING
extern unsigned long long start_trace_clock;
#define MAX_TRACES 16 #define MAX_TRACES 16
#define TRACING_INFO \ #define TRACING_INFO \
int ref_traces; \ int ref_traces; \
struct timespec ref_trace_stamp[MAX_TRACES]; \ unsigned long long ref_trace_stamp[MAX_TRACES]; \
const char *ref_trace_info[MAX_TRACES]; const char *ref_trace_info[MAX_TRACES];
extern void _mars_log(char *buf, int len); extern void _mars_log(char *buf, int len);
extern void mars_log(const char *fmt, ...); extern void mars_log(const char *fmt, ...);

View File

@ -329,6 +329,15 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&output->brick->fly_count)); MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&output->brick->fly_count));
mars_trace(mref, "bio_submit"); mars_trace(mref, "bio_submit");
#ifdef WAIT_CLASH
mref_a->hash_pos = (mref->ref_pos / PAGE_SIZE) % WAIT_CLASH;
if (mref->ref_rw) {
down_write(&output->brick->hashtable[mref_a->hash_pos]);
} else {
down_read(&output->brick->hashtable[mref_a->hash_pos]);
}
#endif
#ifdef FAKE_IO #ifdef FAKE_IO
bio->bi_end_io(bio, 0); bio->bi_end_io(bio, 0);
#else #else
@ -396,6 +405,13 @@ static int bio_thread(void *data)
mref = mref_a->object; mref = mref_a->object;
#ifdef WAIT_CLASH
if (mref_a->object->ref_rw) {
up_write(&brick->hashtable[mref_a->hash_pos]);
} else {
up_read(&brick->hashtable[mref_a->hash_pos]);
}
#endif
mars_trace(mref, "bio_endio"); mars_trace(mref, "bio_endio");
cb = mref->ref_cb; cb = mref->ref_cb;
@ -524,6 +540,12 @@ MARS_MAKE_STATICS(bio);
static int bio_brick_construct(struct bio_brick *brick) static int bio_brick_construct(struct bio_brick *brick)
{ {
#ifdef WAIT_CLASH
int i;
for (i = 0; i < WAIT_CLASH; i++) {
init_rwsem(&brick->hashtable[i]);
}
#endif
spin_lock_init(&brick->lock); spin_lock_init(&brick->lock);
INIT_LIST_HEAD(&brick->completed_list); INIT_LIST_HEAD(&brick->completed_list);
init_waitqueue_head(&brick->event); init_waitqueue_head(&brick->event);

View File

@ -3,6 +3,9 @@
#define MARS_BIO_H #define MARS_BIO_H
#include <linux/blkdev.h> #include <linux/blkdev.h>
#include <linux/rwsem.h>
//#define WAIT_CLASH 1024
struct bio_mref_aspect { struct bio_mref_aspect {
GENERIC_ASPECT(mref); GENERIC_ASPECT(mref);
@ -10,6 +13,7 @@ struct bio_mref_aspect {
struct bio *bio; struct bio *bio;
struct bio_output *output; struct bio_output *output;
int status_code; int status_code;
int hash_pos;
struct page *page; struct page *page;
//bool do_dealloc; //bool do_dealloc;
}; };
@ -28,6 +32,9 @@ struct bio_brick {
struct file *filp; struct file *filp;
struct block_device *bdev; struct block_device *bdev;
struct task_struct *thread; struct task_struct *thread;
#ifdef WAIT_CLASH
struct rw_semaphore hashtable[WAIT_CLASH];
#endif
int bvec_max; int bvec_max;
}; };

View File

@ -194,6 +194,11 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
goto error; goto error;
} }
while (output->brick->max_flying > 0 && atomic_read(&output->fly_count) > output->brick->max_flying) {
msleep(1000 * 2 / HZ);
}
atomic_inc(&output->fly_count);
atomic_inc(&mref->ref_count); atomic_inc(&mref->ref_count);
traced_lock(&output->lock, flags); traced_lock(&output->lock, flags);
@ -265,6 +270,8 @@ static int receiver_thread(void *data)
list_del_init(&mref_a->io_head); list_del_init(&mref_a->io_head);
traced_unlock(&output->lock, flags); traced_unlock(&output->lock, flags);
atomic_dec(&output->fly_count);
cb = mref->ref_cb; cb = mref->ref_cb;
cb->cb_fn(cb); cb->cb_fn(cb);
client_ref_put(output, mref); client_ref_put(output, mref);

View File

@ -12,6 +12,8 @@ struct client_mref_aspect {
struct client_brick { struct client_brick {
MARS_BRICK(client); MARS_BRICK(client);
// tunables
int max_flying; // limit on parallelism
}; };
struct client_input { struct client_input {
@ -26,6 +28,7 @@ struct client_threadinfo {
struct client_output { struct client_output {
MARS_OUTPUT(client); MARS_OUTPUT(client);
atomic_t fly_count;
spinlock_t lock; spinlock_t lock;
struct list_head mref_list; struct list_head mref_list;
struct list_head wait_list; struct list_head wait_list;

View File

@ -95,6 +95,9 @@ EXPORT_SYMBOL_GPL(mars_dent_meta);
#ifdef MARS_TRACING #ifdef MARS_TRACING
unsigned long long start_trace_clock = 0;
EXPORT_SYMBOL_GPL(start_trace_clock);
struct file *mars_log_file = NULL; struct file *mars_log_file = NULL;
loff_t mars_log_pos = 0; loff_t mars_log_pos = 0;
@ -136,8 +139,8 @@ EXPORT_SYMBOL_GPL(mars_log);
void mars_trace(struct mref_object *mref, const char *info) void mars_trace(struct mref_object *mref, const char *info)
{ {
int index = mref->ref_traces; int index = mref->ref_traces;
if (index < MAX_TRACES) { if (likely(index < MAX_TRACES)) {
mref->ref_trace_stamp[index] = CURRENT_TIME; mref->ref_trace_stamp[index] = cpu_clock(raw_smp_processor_id());
mref->ref_trace_info[index] = info; mref->ref_trace_info[index] = info;
mref->ref_traces++; mref->ref_traces++;
} }
@ -146,10 +149,9 @@ EXPORT_SYMBOL_GPL(mars_trace);
void mars_log_trace(struct mref_object *mref) void mars_log_trace(struct mref_object *mref)
{ {
static struct timespec first = {};
char *buf = kmalloc(PAGE_SIZE, GFP_MARS); char *buf = kmalloc(PAGE_SIZE, GFP_MARS);
struct timespec old = {}; unsigned long long old;
struct timespec diff; unsigned long long diff;
int i; int i;
int len; int len;
@ -159,19 +161,19 @@ void mars_log_trace(struct mref_object *mref)
if (!mars_log_file || !mref->ref_traces) { if (!mars_log_file || !mref->ref_traces) {
goto done; goto done;
} }
if (!first.tv_sec) { if (!start_trace_clock) {
first = mref->ref_trace_stamp[0]; start_trace_clock = mref->ref_trace_stamp[0];
} }
old = first;
diff = timespec_sub(mref->ref_trace_stamp[mref->ref_traces-1], old); diff = mref->ref_trace_stamp[mref->ref_traces-1] - mref->ref_trace_stamp[0];
len = sprintf(buf, "%c ;%11lld ;%4d;%2ld.%09ld", mref->ref_rw ? 'W' : 'R', mref->ref_pos, mref->ref_len, diff.tv_sec, diff.tv_nsec); len = sprintf(buf, "%c ;%12lld ;%6d;%10llu", mref->ref_rw ? 'W' : 'R', mref->ref_pos, mref->ref_len, diff / 1000);
old = start_trace_clock;
for (i = 0; i < mref->ref_traces; i++) { for (i = 0; i < mref->ref_traces; i++) {
diff = timespec_sub(mref->ref_trace_stamp[i], old); diff = mref->ref_trace_stamp[i] - old;
len += sprintf(buf + len, " ; %s ;%4ld.%09ld", mref->ref_trace_info[i], diff.tv_sec, diff.tv_nsec); len += sprintf(buf + len, " ; %s ;%10llu", mref->ref_trace_info[i], diff / 1000);
old = mref->ref_trace_stamp[i]; old = mref->ref_trace_stamp[i];
} }
len +=sprintf(buf + len, "\n"); len +=sprintf(buf + len, "\n");
@ -180,6 +182,7 @@ void mars_log_trace(struct mref_object *mref)
done: done:
kfree(buf); kfree(buf);
mref->ref_traces = 0;
} }
EXPORT_SYMBOL_GPL(mars_log_trace); EXPORT_SYMBOL_GPL(mars_log_trace);
@ -1210,6 +1213,10 @@ struct mars_brick *make_brick_all(
MARS_DBG("substitute by remote brick '%s' on peer '%s'\n", new_name, remote); MARS_DBG("substitute by remote brick '%s' on peer '%s'\n", new_name, remote);
brick = mars_make_brick(global, belongs, _client_brick_type, new_path, new_name); brick = mars_make_brick(global, belongs, _client_brick_type, new_path, new_name);
if (brick) {
struct client_brick *_brick = (void*)brick;
_brick->max_flying = 1000;
}
} }
} }
if (!brick && new_brick_type == _bio_brick_type && _aio_brick_type) { if (!brick && new_brick_type == _bio_brick_type && _aio_brick_type) {

View File

@ -58,13 +58,17 @@ struct light_class {
//#define FLUSH_DELAY (HZ / 100 + 1) //#define FLUSH_DELAY (HZ / 100 + 1)
#define FLUSH_DELAY 0 #define FLUSH_DELAY 0
#define TRANS_FAKE //#define TRANS_FAKE
#define CONF_TRANS_BATCHLEN 32 #define CONF_TRANS_BATCHLEN 32
#define CONF_TRANS_FLYING 4 #define CONF_TRANS_FLYING 4
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
#define CONF_ALL_BATCHLEN 2 #define CONF_ALL_BATCHLEN 2
#define CONF_ALL_FLYING 0 #define CONF_ALL_FLYING 4
#define CONF_ALL_CONTENTION 0
#define CONF_ALL_PRESSURE 0
#define CONF_ALL_PRIO MARS_PRIO_LOW
#define CONF_ALL_MAX_QUEUE 10000 #define CONF_ALL_MAX_QUEUE 10000
#define CONF_ALL_MAX_JIFFIES (180 * HZ) #define CONF_ALL_MAX_JIFFIES (180 * HZ)
@ -95,6 +99,21 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
trans_brick->outputs[0]->q_phase3.q_max_flying = CONF_ALL_FLYING; trans_brick->outputs[0]->q_phase3.q_max_flying = CONF_ALL_FLYING;
trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_ALL_FLYING; trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_ALL_FLYING;
trans_brick->outputs[0]->q_phase1.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase2.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase3.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase4.q_max_contention = CONF_ALL_CONTENTION;
trans_brick->outputs[0]->q_phase1.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase2.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase3.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase4.q_over_pressure = CONF_ALL_PRESSURE;
trans_brick->outputs[0]->q_phase1.q_io_prio = CONF_TRANS_PRIO;
trans_brick->outputs[0]->q_phase2.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase3.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase4.q_io_prio = CONF_ALL_PRIO;
trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_ALL_MAX_QUEUE; trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_ALL_MAX_QUEUE;
trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_ALL_MAX_QUEUE; trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_ALL_MAX_QUEUE;

View File

@ -71,13 +71,11 @@ bool q_is_ready(struct logger_queue *q)
if (dep) { if (dep) {
contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying); contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying);
} }
max_contention = q->q_dep_flying; max_contention = q->q_max_contention;
over = queued - q->q_max_queued; over = queued - q->q_max_queued;
#if 0 if (over > 0 && q->q_over_pressure > 0) {
if (over > 0) { max_contention += over / q->q_over_pressure;
max_contention += over / 128;
} }
#endif
#if 1 #if 1
/* 2) when other queues are too much contended, /* 2) when other queues are too much contended,
@ -333,6 +331,8 @@ static inline bool hash_put(struct trans_logger_output *output, struct trans_log
////////////////// own brick / input / output operations ////////////////// ////////////////// own brick / input / output operations //////////////////
static atomic_t global_mshadow_count = ATOMIC_INIT(0);
static int trans_logger_get_info(struct trans_logger_output *output, struct mars_info *info) static int trans_logger_get_info(struct trans_logger_output *output, struct mars_info *info)
{ {
struct trans_logger_input *input = output->brick->inputs[0]; struct trans_logger_input *input = output->brick->inputs[0];
@ -414,6 +414,7 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge
} }
mref->ref_data = data; mref->ref_data = data;
atomic_inc(&output->mshadow_count); atomic_inc(&output->mshadow_count);
atomic_inc(&global_mshadow_count);
#ifdef USE_MEMCPY #ifdef USE_MEMCPY
if (mref_a->orig_data) { if (mref_a->orig_data) {
memcpy(mref->ref_data, mref_a->orig_data, mref->ref_len); memcpy(mref->ref_data, mref_a->orig_data, mref->ref_len);
@ -516,6 +517,7 @@ restart:
#endif #endif
mref->ref_data = NULL; mref->ref_data = NULL;
atomic_dec(&output->mshadow_count); atomic_dec(&output->mshadow_count);
atomic_dec(&global_mshadow_count);
trans_logger_free_mref(mref); trans_logger_free_mref(mref);
return; return;
} }
@ -828,7 +830,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
goto err; goto err;
} }
mars_trace(sub_mref, "sub_start"); mars_trace(sub_mref, "sub_create");
atomic_inc(&output->sub_balance_count); atomic_inc(&output->sub_balance_count);
pos += sub_mref->ref_len; pos += sub_mref->ref_len;
@ -848,9 +850,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
cb->cb_prev = NULL; cb->cb_prev = NULL;
sub_mref->ref_cb = cb; sub_mref->ref_cb = cb;
sub_mref->ref_rw = 0; sub_mref->ref_rw = 0;
#if 1 sub_mref->ref_prio = output->q_phase2.q_io_prio;
sub_mref->ref_prio = MARS_PRIO_LOW;
#endif
atomic_inc(&output->q_phase2.q_flying); atomic_inc(&output->q_phase2.q_flying);
if (output->brick->log_reads) { if (output->brick->log_reads) {
@ -969,6 +969,7 @@ static void phase4_endio(struct generic_callback *cb)
CHECK_PTR(orig_mref, err); CHECK_PTR(orig_mref, err);
mars_trace(sub_mref_a->object, "sub_endio"); mars_trace(sub_mref_a->object, "sub_endio");
mars_log_trace(sub_mref_a->object);
atomic_dec(&output->q_phase4.q_flying); atomic_dec(&output->q_phase4.q_flying);
@ -989,6 +990,7 @@ static void phase4_endio(struct generic_callback *cb)
list_del_init(tmp); list_del_init(tmp);
traced_unlock(&brick->pos_lock, flags); traced_unlock(&brick->pos_lock, flags);
mars_log_trace(sub_mref_a->object);
put: put:
//MARS_INF("put ORIGREF.\n"); //MARS_INF("put ORIGREF.\n");
@ -1029,9 +1031,14 @@ static bool phase4_startio(struct trans_logger_mref_aspect *sub_mref_a)
cb->cb_prev = NULL; cb->cb_prev = NULL;
sub_mref->ref_cb = cb; sub_mref->ref_cb = cb;
sub_mref->ref_rw = 1; sub_mref->ref_rw = 1;
sub_mref->ref_prio = output->q_phase4.q_io_prio;
atomic_inc(&output->q_phase4.q_flying); atomic_inc(&output->q_phase4.q_flying);
atomic_inc(&output->total_writeback_count); atomic_inc(&output->total_writeback_count);
mars_log_trace(sub_mref);
mars_trace(sub_mref, "sub_start");
if (orig_mref_a->is_outdated || output->brick->debug_shortcut) { if (orig_mref_a->is_outdated || output->brick->debug_shortcut) {
MARS_IO("SHORTCUT %d\n", sub_mref->ref_len); MARS_IO("SHORTCUT %d\n", sub_mref->ref_len);
atomic_inc(&output->total_shortcut_count); atomic_inc(&output->total_shortcut_count);
@ -1108,6 +1115,8 @@ void trans_logger_log(struct trans_logger_output *output)
long long last_jiffies = jiffies; long long last_jiffies = jiffies;
long long log_jiffies = jiffies; long long log_jiffies = jiffies;
mars_power_led_on((void*)brick, true);
while (!kthread_should_stop() || _congested(output)) { while (!kthread_should_stop() || _congested(output)) {
int status; int status;
@ -1126,7 +1135,22 @@ void trans_logger_log(struct trans_logger_output *output)
//MARS_INF("AHA %d\n", atomic_read(&output->q_phase1.q_queued)); //MARS_INF("AHA %d\n", atomic_read(&output->q_phase1.q_queued));
#if 1 #if 1
if (((long long)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) { {
static int old_mshadow_count = 0;
int cnt;
cnt = atomic_read(&global_mshadow_count);
if (cnt + old_mshadow_count > 0 && cnt != old_mshadow_count) {
unsigned long long now = cpu_clock(raw_smp_processor_id());
if (!start_trace_clock)
start_trace_clock = now;
now -= start_trace_clock;
mars_log("shadow_count ;%12lld ; %4d\n", now / 1000, cnt);
}
old_mshadow_count = cnt;
}
if (((long long)jiffies) - last_jiffies >= HZ * 5 && brick->power.button) {
last_jiffies = jiffies; last_jiffies = jiffies;
MARS_INF("LOGGER: reads=%d writes=%d writeback=%d shortcut=%d (%d%%) | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_writeback_count), atomic_read(&output->total_shortcut_count), atomic_read(&output->total_writeback_count) ? atomic_read(&output->total_shortcut_count) * 100 / atomic_read(&output->total_writeback_count) : 0, atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying)); MARS_INF("LOGGER: reads=%d writes=%d writeback=%d shortcut=%d (%d%%) | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_writeback_count), atomic_read(&output->total_shortcut_count), atomic_read(&output->total_writeback_count) ? atomic_read(&output->total_shortcut_count) * 100 / atomic_read(&output->total_writeback_count) : 0, atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
} }
@ -1209,7 +1233,7 @@ int apply_data(struct trans_logger_output *output, struct log_header *lh, void *
* The switch infrastructure must be changed before this * The switch infrastructure must be changed before this
* can become useful. * can become useful.
*/ */
#if 1 #if 0
while (len > 0) { while (len > 0) {
struct mref_object *mref; struct mref_object *mref;
struct trans_logger_mref_aspect *mref_a; struct trans_logger_mref_aspect *mref_a;
@ -1285,6 +1309,9 @@ void trans_logger_replay(struct trans_logger_output *output)
MARS_ERR("cannot read logfile data, status = %d\n", status); MARS_ERR("cannot read logfile data, status = %d\n", status);
break; break;
} }
if (!buf || !len) {
continue;
}
status = apply_data(output, &lh, buf, len); status = apply_data(output, &lh, buf, len);
if (status < 0) { if (status < 0) {
@ -1302,16 +1329,15 @@ void trans_logger_replay(struct trans_logger_output *output)
if (brick->replay_pos == brick->end_pos) { if (brick->replay_pos == brick->end_pos) {
MARS_INF("replay finished at %lld\n", brick->replay_pos); MARS_INF("replay finished at %lld\n", brick->replay_pos);
mars_power_led_on((void*)brick, true); #if 1
while (!kthread_should_stop()) {
mars_power_led_on((void*)brick, true);
msleep(500);
}
#endif
} else { } else {
MARS_INF("replay stopped prematurely at %lld (of %lld)\n", brick->replay_pos, brick->end_pos); MARS_INF("replay stopped prematurely at %lld (of %lld)\n", brick->replay_pos, brick->end_pos);
mars_power_led_off((void*)brick, true);
} }
#if 1
while (!kthread_should_stop()) {
msleep(500);
}
#endif
} }
///////////////////////// logger thread / switching ///////////////////////// ///////////////////////// logger thread / switching /////////////////////////
@ -1326,7 +1352,6 @@ int trans_logger_thread(void *data)
brick->current_pos = brick->start_pos; brick->current_pos = brick->start_pos;
brick->logst.log_pos = brick->current_pos; brick->logst.log_pos = brick->current_pos;
mars_power_led_on((void*)brick, true);
brick->logst.align_size = brick->align_size; brick->logst.align_size = brick->align_size;
brick->logst.chunk_size = brick->chunk_size; brick->logst.chunk_size = brick->chunk_size;
@ -1338,6 +1363,7 @@ int trans_logger_thread(void *data)
} }
MARS_INF("........... logger has stopped.\n"); MARS_INF("........... logger has stopped.\n");
mars_power_led_on((void*)brick, false);
mars_power_led_off((void*)brick, true); mars_power_led_off((void*)brick, true);
return 0; return 0;
} }

View File

@ -33,7 +33,9 @@ struct logger_queue {
int q_max_queued; int q_max_queued;
int q_max_flying; int q_max_flying;
int q_max_jiffies; int q_max_jiffies;
int q_dep_flying; int q_max_contention;
int q_over_pressure;
int q_io_prio;
bool q_ordering; bool q_ordering;
}; };