import mars-76.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-03-10 12:40:06 +01:00
parent 5ede8cd9ce
commit ceb8529521
11 changed files with 424 additions and 290 deletions

View File

@ -2,7 +2,7 @@
# Makefile for MARS # Makefile for MARS
# #
obj-$(CONFIG_MARS) += brick.o mars_generic.o mars_net.o mars_proc.o obj-$(CONFIG_MARS) += brick.o log_format.o mars_generic.o mars_net.o mars_proc.o
obj-$(CONFIG_MARS_DUMMY) += mars_dummy.o obj-$(CONFIG_MARS_DUMMY) += mars_dummy.o
obj-$(CONFIG_MARS_CHECK) += mars_check.o obj-$(CONFIG_MARS_CHECK) += mars_check.o
obj-$(CONFIG_MARS_IF) += mars_if.o obj-$(CONFIG_MARS_IF) += mars_if.o

181
log_format.c Normal file
View File

@ -0,0 +1,181 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/bio.h>
#include "log_format.h"
/* Initialize a log_status bookkeeping structure for subsequent
 * log_reserve() / log_finalize() calls.
 * Zeroes the whole structure and records the input/output bricks.
 *
 * BUGFIX: the original called memset(logst, sizeof(*logst), 0), i.e.
 * with the fill value and size swapped, which zeroes 0 bytes and
 * leaves the structure uninitialized. memset(3) takes (ptr, value, size).
 */
void init_logst(struct log_status *logst, struct mars_input *input, struct mars_output *output)
{
	memset(logst, 0, sizeof(*logst));
	logst->input = input;
	logst->output = output;
}
EXPORT_SYMBOL_GPL(init_logst);
/* Round logst->log_pos up to the start of the next transfer unit.
 * The unit size is 2^(transfer_order + PAGE_SHIFT) bytes; the transfer
 * info is fetched lazily from the input on first use.
 * NOTE(review): on mars_get_info failure only MARS_FAT is logged and
 * info.transfer_order is used as-is — presumably zeroed beforehand;
 * confirm against init_logst().
 */
void log_skip(struct log_status *logst)
{
	int shift;

	if (logst->info.transfer_size == 0) {
		int err = GENERIC_INPUT_CALL(logst->input, mars_get_info, &logst->info);
		if (err < 0)
			MARS_FAT("cannot get transfer log info (code=%d)\n", err);
	}
	shift = logst->info.transfer_order + PAGE_SHIFT;
	logst->log_pos = ((logst->log_pos >> shift) + 1) << shift;
}
EXPORT_SYMBOL_GPL(log_skip);
/* Reserve buffer space for one log entry with lh->l_len payload bytes
 * at the current logst->log_pos.
 * Allocates an mref, then serializes the entry header in wire order
 * via DATA_PUT: START_MAGIC, format version, valid_flag (0 for now),
 * spare, total length (start of next header), timestamp, position,
 * payload length, code.  The offsets of the valid_flag and length
 * fields are remembered in logst so log_finalize() can patch them.
 * Returns a pointer to the payload area inside the buffer, or NULL
 * on any failure.  The pending mref is parked in logst->log_mref.
 */
void *log_reserve(struct log_status *logst, struct log_header *lh)
{
struct mref_object *mref;
void *data;
int total_len;
int status;
int offset;
MARS_DBG("reserving %d bytes at %lld\n", lh->l_len, logst->log_pos);
// only one reservation may be in flight per log_status
if (unlikely(logst->log_mref)) {
MARS_ERR("mref already existing\n");
goto err;
}
mref = mars_alloc_mref(logst->output, &logst->ref_object_layout);
if (unlikely(!mref))
goto err;
mref->ref_pos = logst->log_pos;
// payload plus fixed header/trailer overhead
total_len = lh->l_len + OVERHEAD;
mref->ref_len = total_len;
mref->ref_may_write = WRITE;
#if 1
mref->ref_prio = MARS_PRIO_LOW;
#endif
status = GENERIC_INPUT_CALL(logst->input, mref_get, mref);
if (unlikely(status < 0)) {
goto err_free;
}
// reject buffers that cannot hold the complete entry
if (unlikely(mref->ref_len < total_len)) {
goto put;
}
logst->log_mref = mref;
data = mref->ref_data;
offset = 0;
DATA_PUT(data, offset, START_MAGIC);
DATA_PUT(data, offset, (char)FORMAT_VERSION);
// remember where valid_flag lives; log_finalize() sets it to 1 last
logst->validflag_offset = offset;
DATA_PUT(data, offset, (char)0); // valid_flag
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, total_len); // start of next header
DATA_PUT(data, offset, lh->l_stamp.tv_sec);
DATA_PUT(data, offset, lh->l_stamp.tv_nsec);
DATA_PUT(data, offset, lh->l_pos);
// remember where the length field lives; log_finalize() corrects it
// to the number of bytes actually written
logst->reallen_offset = offset;
DATA_PUT(data, offset, lh->l_len);
DATA_PUT(data, offset, lh->l_code);
logst->payload_offset = offset;
logst->payload_len = lh->l_len;
return data + offset;
put:
GENERIC_INPUT_CALL(logst->input, mref_put, mref);
return NULL;
err_free:
mars_free_mref(mref);
err:
return NULL;
}
EXPORT_SYMBOL_GPL(log_reserve);
/* Complete the log entry previously reserved via log_reserve() and
 * submit it for writing.
 * @len: number of payload bytes actually written (must not exceed the
 *       reserved payload_len).
 * @endio/@private: completion callback and its cookie, wired into the
 *       mref's callback chain before mref_io is issued.
 * Patches the real length into the header, appends the trailer
 * (END_MAGIC, valid_flag copy, spares, Lamport completion timestamp),
 * advances log_pos, and sets the header valid_flag as the very last
 * memory write before submission.
 * Returns true when the IO was submitted, false on error.
 */
bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private)
{
struct mref_object *mref = logst->log_mref;
struct generic_callback *cb;
struct timespec now;
void *data;
int offset;
bool ok = false;
CHECK_PTR(mref, err);
// consume the pending reservation
logst->log_mref = NULL;
if (unlikely(len > logst->payload_len)) {
MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len);
goto put;
}
data = mref->ref_data;
/* Correct the length in the header.
*/
offset = logst->reallen_offset;
DATA_PUT(data, offset, len);
/* Write the trailer.
*/
offset = logst->payload_offset + len;
DATA_PUT(data, offset, END_MAGIC);
DATA_PUT(data, offset, (char)1); // valid_flag copy
DATA_PUT(data, offset, (char)0); // spare
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, (int)0); // spare
get_lamport(&now); // when the log entry was ready.
DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec);
// offset now equals the full on-disk entry size; advance the log position
logst->log_pos += offset;
/* This must come last. In case of incomplete
* or even overlapping disk transfers, this indicates
* the completeness / integrity of the payload at
* the time of starting the transfer.
*/
offset = logst->validflag_offset;
DATA_PUT(data, offset, (char)1);
// chain our callback in front of any previous one and submit as a write
cb = &mref->_ref_cb;
cb->cb_fn = endio;
cb->cb_error = 0;
cb->cb_prev = NULL;
cb->cb_private = private;
mref->ref_cb = cb;
mref->ref_rw = 1;
GENERIC_INPUT_CALL(logst->input, mref_io, mref);
ok = true;
put:
// drop our reference in both the success and the error path
GENERIC_INPUT_CALL(logst->input, mref_put, mref);
err:
return ok;
}
EXPORT_SYMBOL_GPL(log_finalize);
////////////////// module init stuff /////////////////////////
/* Module entry point: nothing to set up beyond an informational log line. */
static int __init init_log_format(void)
{
MARS_INF("init_log_format()\n");
return 0;
}
/* Module exit point: no resources to release, log only. */
static void __exit exit_log_format(void)
{
MARS_INF("exit_log_format()\n");
}
/* Standard kernel module metadata and entry-point registration.
 * Fixes the typo "infrastucture" in the module description string.
 */
MODULE_DESCRIPTION("MARS log_format infrastructure");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");

module_init(init_log_format);
module_exit(exit_log_format);

View File

@ -19,29 +19,15 @@
* by old code (of course, not all information / features will be * by old code (of course, not all information / features will be
* available then). * available then).
*/ */
struct log_header { #define log_header log_header_v1
struct log_header_v1 {
struct timespec l_stamp; struct timespec l_stamp;
loff_t l_pos; loff_t l_pos;
int l_len; int l_len;
int l_code; int l_code;
}; };
/* Bookkeeping status between calls
*/
struct log_status {
struct mars_input *input;
struct mars_output hidden_output;
struct generic_object_layout ref_object_layout;
struct mars_info info;
loff_t log_pos;
int validflag_offset;
int reallen_offset;
int payload_offset;
int payload_len;
struct mref_object *log_mref;
};
#define FORMAT_VERSION 1 // version of disk format, currently there is no other one #define FORMAT_VERSION 1 // version of disk format, currently there is no other one
#define CODE_UNKNOWN 0 #define CODE_UNKNOWN 0
@ -79,148 +65,33 @@ struct log_status {
offset += sizeof(val); \ offset += sizeof(val); \
} while (0) } while (0)
static inline ////////////////////////////////////////////////////////////////////////////
void log_skip(struct log_status *logst)
{
int bits;
if (!logst->info.transfer_size) {
int status = GENERIC_INPUT_CALL(logst->input, mars_get_info, &logst->info);
if (status < 0) {
MARS_FAT("cannot get transfer log info (code=%d)\n", status);
}
}
bits = logst->info.transfer_order + PAGE_SHIFT;
logst->log_pos = ((logst->log_pos >> bits) + 1) << bits;
}
static inline #ifdef __KERNEL__
void *log_reserve(struct log_status *logst, struct log_header *l)
{
struct mref_object *mref;
void *data;
int total_len;
int status;
int offset;
MARS_DBG("reserving %d bytes at %lld\n", l->l_len, logst->log_pos); /* Bookkeeping status between calls
*/
struct log_status {
struct mars_input *input;
struct mars_output *output;
struct generic_object_layout ref_object_layout;
if (unlikely(logst->log_mref)) { struct mars_info info;
MARS_ERR("mref already existing\n"); loff_t log_pos;
goto err; int validflag_offset;
} int reallen_offset;
int payload_offset;
int payload_len;
struct mref_object *log_mref;
};
mref = mars_alloc_mref(&logst->hidden_output, &logst->ref_object_layout); void init_logst(struct log_status *logst, struct mars_input *input, struct mars_output *output);
if (unlikely(!mref))
goto err;
mref->ref_pos = logst->log_pos; void log_skip(struct log_status *logst);
total_len = l->l_len + OVERHEAD;
mref->ref_len = total_len;
mref->ref_may_write = WRITE;
status = GENERIC_INPUT_CALL(logst->input, mref_get, mref); void *log_reserve(struct log_status *logst, struct log_header *lh);
if (unlikely(status < 0)) {
goto err_free;
}
if (unlikely(mref->ref_len < total_len)) {
goto put;
}
logst->log_mref = mref;
data = mref->ref_data;
offset = 0;
DATA_PUT(data, offset, START_MAGIC);
DATA_PUT(data, offset, (char)FORMAT_VERSION);
logst->validflag_offset = offset;
DATA_PUT(data, offset, (char)0); // valid_flag
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, total_len); // start of next header
DATA_PUT(data, offset, l->l_stamp.tv_sec);
DATA_PUT(data, offset, l->l_stamp.tv_nsec);
DATA_PUT(data, offset, l->l_pos);
logst->reallen_offset = offset;
DATA_PUT(data, offset, l->l_len);
DATA_PUT(data, offset, l->l_code);
logst->payload_offset = offset;
logst->payload_len = l->l_len;
return data + offset;
put:
GENERIC_INPUT_CALL(logst->input, mref_put, mref);
return NULL;
err_free:
mars_free_mref(mref);
err:
return NULL;
}
static inline
bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private)
{
struct mref_object *mref = logst->log_mref;
struct generic_callback *cb;
struct timespec now;
void *data;
int offset;
bool ok = false;
CHECK_PTR(mref, err);
logst->log_mref = NULL;
if (unlikely(len > logst->payload_len)) {
MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len);
goto put;
}
data = mref->ref_data;
/* Correct the length in the header.
*/
offset = logst->reallen_offset;
DATA_PUT(data, offset, len);
/* Write the trailer.
*/
offset = logst->payload_offset + len;
DATA_PUT(data, offset, END_MAGIC);
DATA_PUT(data, offset, (char)1); // valid_flag copy
DATA_PUT(data, offset, (char)0); // spare
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, (int)0); // spare
get_lamport(&now); // when the log entry was ready.
DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec);
logst->log_pos += offset;
/* This must come last. In case of incomplete
* or even operlapping disk transfers, this indicates
* the completeness / integrity of the payload at
* the time of starting the transfer.
*/
offset = logst->validflag_offset;
DATA_PUT(data, offset, (char)1);
cb = &mref->_ref_cb;
cb->cb_fn = endio;
cb->cb_error = 0;
cb->cb_prev = NULL;
cb->cb_private = private;
mref->ref_cb = cb;
mref->ref_rw = 1;
GENERIC_INPUT_CALL(logst->input, mref_io, mref);
ok = true;
put:
GENERIC_INPUT_CALL(logst->input, mref_put, mref);
err:
return ok;
}
bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private);
#endif #endif
#endif

6
mars.h
View File

@ -48,6 +48,9 @@
// MARS-specific definitions // MARS-specific definitions
#define MARS_PRIO_HIGH 0
#define MARS_PRIO_LOW 1
// object stuff // object stuff
/* mref */ /* mref */
@ -73,10 +76,11 @@ struct mref_object_layout {
#define MREF_OBJECT(PREFIX) \ #define MREF_OBJECT(PREFIX) \
GENERIC_OBJECT(PREFIX); \ GENERIC_OBJECT(PREFIX); \
/* supplied by caller */ \ /* supplied by caller */ \
void *ref_data; /* preset to NULL for buffered IO */ \
loff_t ref_pos; \ loff_t ref_pos; \
int ref_len; \ int ref_len; \
int ref_may_write; \ int ref_may_write; \
void *ref_data; /* preset to NULL for buffered IO */ \ int ref_prio; \
int ref_timeout; \ int ref_timeout; \
/* maintained by the ref implementation, readable for callers */ \ /* maintained by the ref implementation, readable for callers */ \
int ref_flags; \ int ref_flags; \

View File

@ -28,68 +28,52 @@
////////////////// some helpers ////////////////// ////////////////// some helpers //////////////////
static static inline
void _queue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a) void _enqueue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int prio, bool at_end)
{ {
unsigned long flags; unsigned long flags;
if (prio > MARS_PRIO_LOW)
prio = MARS_PRIO_LOW;
if (prio < MARS_PRIO_HIGH)
prio = MARS_PRIO_HIGH;
traced_lock(&tinfo->lock, flags); traced_lock(&tinfo->lock, flags);
list_add_tail(&mref_a->io_head, &tinfo->mref_list); if (at_end) {
list_add_tail(&mref_a->io_head, &tinfo->mref_list[prio]);
traced_unlock(&tinfo->lock, flags); } else {
} list_add(&mref_a->io_head, &tinfo->mref_list[prio]);
static
void _delay(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int delta_jiffies)
{
long long timeout = (long long)jiffies + delta_jiffies;
unsigned long flags;
mref_a->timeout = timeout;
traced_lock(&tinfo->lock, flags);
list_add_tail(&mref_a->io_head, &tinfo->delay_list);
traced_unlock(&tinfo->lock, flags);
}
static
struct aio_mref_aspect *__get_delayed(struct aio_threadinfo *tinfo, bool remove)
{
struct list_head *tmp;
struct aio_mref_aspect *mref_a;
if (list_empty(&tinfo->delay_list))
return NULL;
tmp = tinfo->delay_list.next;
mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
if (mref_a->timeout < (long long)jiffies) {
mref_a = NULL;
} else if (remove) {
list_del_init(tmp);
} }
return mref_a;
}
static
struct aio_mref_aspect *_get_delayed(struct aio_threadinfo *tinfo, bool remove)
{
struct aio_mref_aspect *mref_a;
unsigned long flags;
traced_lock(&tinfo->lock, flags);
mref_a = __get_delayed(tinfo, remove);
traced_unlock(&tinfo->lock, flags); traced_unlock(&tinfo->lock, flags);
return mref_a;
} }
static inline
struct aio_mref_aspect *_dequeue(struct aio_threadinfo *tinfo, bool do_remove)
{
struct aio_mref_aspect *mref_a = NULL;
int prio;
unsigned long flags = 0;
if (do_remove)
traced_lock(&tinfo->lock, flags);
for (prio = MARS_PRIO_HIGH; prio <= MARS_PRIO_LOW; prio++) {
struct list_head *tmp = tinfo->mref_list[prio].next;
if (tmp != &tinfo->mref_list[prio]) {
if (do_remove) {
list_del_init(tmp);
}
mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
goto done;
}
}
done:
if (do_remove)
traced_unlock(&tinfo->lock, flags);
return mref_a;
}
////////////////// own brick / input / output operations ////////////////// ////////////////// own brick / input / output operations //////////////////
@ -184,7 +168,7 @@ static void aio_ref_io(struct aio_output *output, struct mref_object *mref)
goto done; goto done;
} }
_queue(tinfo, mref_a); _enqueue(tinfo, mref_a, mref->ref_prio, true);
wake_up_interruptible(&tinfo->event); wake_up_interruptible(&tinfo->event);
return; return;
@ -263,49 +247,40 @@ static int aio_submit_thread(void *data)
return -ENOMEM; return -ENOMEM;
while (!kthread_should_stop()) { while (!kthread_should_stop()) {
struct list_head *tmp = NULL;
struct aio_mref_aspect *mref_a; struct aio_mref_aspect *mref_a;
struct mref_object *mref; struct mref_object *mref;
unsigned long flags;
int err; int err;
wait_event_interruptible_timeout( wait_event_interruptible_timeout(
tinfo->event, tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop() ||
_get_delayed(tinfo, false) || _dequeue(tinfo, false),
kthread_should_stop(),
HZ); HZ);
traced_lock(&tinfo->lock, flags); mref_a = _dequeue(tinfo, true);
if (!mref_a) {
if (!list_empty(&tinfo->mref_list)) {
tmp = tinfo->mref_list.next;
list_del_init(tmp);
mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
} else {
mref_a = __get_delayed(tinfo, true);
}
traced_unlock(&tinfo->lock, flags);
if (!mref_a)
continue; continue;
}
// check for reads behind EOF // check for reads behind EOF
mref = mref_a->object; mref = mref_a->object;
if (!mref->ref_rw && mref->ref_pos + mref->ref_len > i_size_read(file->f_mapping->host)) { if (!mref->ref_rw && mref->ref_pos + mref->ref_len > i_size_read(file->f_mapping->host)) {
if (!mref->ref_timeout || mref->ref_timeout < (long long)jiffies) { if (mref->ref_timeout > 0 &&
_complete(output, mref, -ENODATA); ((!mref_a->start_jiffies && (mref_a->start_jiffies = jiffies, true)) ||
mref_a->start_jiffies + mref->ref_timeout >= (long long)jiffies)) {
msleep(50);
_enqueue(tinfo, mref_a, mref->ref_prio, true);
continue; continue;
} }
_delay(tinfo, mref_a, HZ/2); _complete(output, mref, -ENODATA);
continue; continue;
} }
err = aio_submit(output, mref_a, false); err = aio_submit(output, mref_a, false);
if (err == -EAGAIN) { if (err == -EAGAIN) {
_delay(tinfo, mref_a, (HZ/100)+1); _enqueue(tinfo, mref_a, mref->ref_prio, false);
msleep(20);
continue; continue;
} }
if (unlikely(err < 0)) { if (unlikely(err < 0)) {
@ -387,10 +362,11 @@ static int aio_event_thread(void *data)
if (output->o_fdsync if (output->o_fdsync
&& err >= 0 && err >= 0
&& mref->ref_rw != 0 && mref->ref_rw != READ
&& !mref_a->resubmit++) { && !mref_a->resubmit++) {
// workaround for non-implemented AIO FSYNC operation
if (!output->filp->f_op->aio_fsync) { if (!output->filp->f_op->aio_fsync) {
_queue(other, mref_a); _enqueue(other, mref_a, mref->ref_prio, true);
bounced++; bounced++;
continue; continue;
} }
@ -430,17 +406,22 @@ static int aio_sync_thread(void *data)
while (!kthread_should_stop()) { while (!kthread_should_stop()) {
LIST_HEAD(tmp_list); LIST_HEAD(tmp_list);
unsigned long flags; unsigned long flags;
int i;
int err; int err;
wait_event_interruptible_timeout( wait_event_interruptible_timeout(
tinfo->event, tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop(), kthread_should_stop() ||
_dequeue(tinfo, false),
HZ); HZ);
traced_lock(&tinfo->lock, flags); traced_lock(&tinfo->lock, flags);
if (!list_empty(&tinfo->mref_list)) { for (i = MARS_PRIO_HIGH; i <= MARS_PRIO_LOW; i++) {
// move over the whole list if (!list_empty(&tinfo->mref_list[i])) {
list_replace_init(&tinfo->mref_list, &tmp_list); // move over the whole list
list_replace_init(&tinfo->mref_list[i], &tmp_list);
break;
}
} }
traced_unlock(&tinfo->lock, flags); traced_unlock(&tinfo->lock, flags);
@ -564,8 +545,10 @@ static int aio_switch(struct aio_brick *brick)
aio_sync_thread, aio_sync_thread,
}; };
struct aio_threadinfo *tinfo = &output->tinfo[i]; struct aio_threadinfo *tinfo = &output->tinfo[i];
INIT_LIST_HEAD(&tinfo->mref_list); int j;
INIT_LIST_HEAD(&tinfo->delay_list); for (j = MARS_PRIO_HIGH; j <= MARS_PRIO_LOW; j++) {
INIT_LIST_HEAD(&tinfo->mref_list[j]);
}
tinfo->output = output; tinfo->output = output;
spin_lock_init(&tinfo->lock); spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event); init_waitqueue_head(&tinfo->event);

View File

@ -8,7 +8,7 @@
struct aio_mref_aspect { struct aio_mref_aspect {
GENERIC_ASPECT(mref); GENERIC_ASPECT(mref);
struct list_head io_head; struct list_head io_head;
long long timeout; long long start_jiffies;
int resubmit; int resubmit;
bool do_dealloc; bool do_dealloc;
}; };
@ -22,8 +22,7 @@ struct aio_input {
}; };
struct aio_threadinfo { struct aio_threadinfo {
struct list_head mref_list; struct list_head mref_list[MARS_PRIO_LOW+1];
struct list_head delay_list;
struct aio_output *output; struct aio_output *output;
struct task_struct *thread; struct task_struct *thread;
wait_queue_head_t event; wait_queue_head_t event;

View File

@ -178,6 +178,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
len = tmp_pos - pos; len = tmp_pos - pos;
} }
mref->ref_len = len; mref->ref_len = len;
mref->ref_prio = MARS_PRIO_LOW;
mref->_ref_cb.cb_private = mref_a; mref->_ref_cb.cb_private = mref_a;
mref->_ref_cb.cb_fn = copy_endio; mref->_ref_cb.cb_fn = copy_endio;
mref->ref_cb = &mref->_ref_cb; mref->ref_cb = &mref->_ref_cb;

View File

@ -36,6 +36,7 @@ const struct meta mars_mref_meta[] = {
META_INI(ref_pos, struct mref_object, FIELD_INT), META_INI(ref_pos, struct mref_object, FIELD_INT),
META_INI(ref_len, struct mref_object, FIELD_INT), META_INI(ref_len, struct mref_object, FIELD_INT),
META_INI(ref_may_write, struct mref_object, FIELD_INT), META_INI(ref_may_write, struct mref_object, FIELD_INT),
META_INI(ref_prio, struct mref_object, FIELD_INT),
META_INI(ref_timeout, struct mref_object, FIELD_INT), META_INI(ref_timeout, struct mref_object, FIELD_INT),
META_INI(ref_flags, struct mref_object, FIELD_INT), META_INI(ref_flags, struct mref_object, FIELD_INT),
META_INI(ref_rw, struct mref_object, FIELD_INT), META_INI(ref_rw, struct mref_object, FIELD_INT),

View File

@ -28,6 +28,7 @@
#include "mars_trans_logger.h" #include "mars_trans_logger.h"
#include "mars_if.h" #include "mars_if.h"
static struct task_struct *main_thread = NULL; static struct task_struct *main_thread = NULL;
typedef int (*light_worker_fn)(void *buf, struct mars_dent *dent); typedef int (*light_worker_fn)(void *buf, struct mars_dent *dent);
@ -43,6 +44,47 @@ struct light_class {
light_worker_fn cl_backward; light_worker_fn cl_backward;
}; };
///////////////////////////////////////////////////////////////////////
// TUNING
//#define CONF_TRANS_LOGLEN 1000
#define CONF_TRANS_LOGLEN 8
#define CONF_TRANS_BATCHLEN 4
#define CONF_TRANS_FLYING 4
//#define CONF_TRANS_MAX_QUEUE 1000
#define CONF_TRANS_MAX_QUEUE 5000
#define CONF_TRANS_MAX_JIFFIES (30 * HZ)
static
void _set_trans_params(struct trans_logger_brick *trans_brick)
{
if (!trans_brick->outputs[0]->q_phase2.q_ordering) {
trans_brick->outputs[0]->q_phase1.q_batchlen = CONF_TRANS_LOGLEN;
trans_brick->outputs[0]->q_phase2.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->outputs[0]->q_phase3.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->outputs[0]->q_phase4.q_batchlen = CONF_TRANS_BATCHLEN;
trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_TRANS_MAX_QUEUE;
trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_TRANS_MAX_QUEUE;
trans_brick->outputs[0]->q_phase2.q_max_jiffies = CONF_TRANS_MAX_JIFFIES;
trans_brick->outputs[0]->q_phase4.q_max_jiffies = CONF_TRANS_MAX_JIFFIES;
trans_brick->outputs[0]->q_phase2.q_max_flying = CONF_TRANS_FLYING;
trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_TRANS_FLYING;
trans_brick->outputs[0]->q_phase2.q_ordering = true;
trans_brick->outputs[0]->q_phase4.q_ordering = true;
trans_brick->log_reads = false;
if (!trans_brick->log_reads) {
trans_brick->outputs[0]->q_phase2.q_max_queued = 0;
trans_brick->outputs[0]->q_phase4.q_max_queued *= 2;
}
}
}
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// internal helpers // internal helpers
@ -867,6 +909,8 @@ int make_log_init(void *buf, struct mars_dent *parent)
*/ */
rot->do_replay = true; rot->do_replay = true;
_set_trans_params((void*)trans_brick);
status = 0; status = 0;
done: done:

View File

@ -47,25 +47,33 @@ static inline void q_init(struct logger_queue *q)
} }
static static
bool q_is_ready(struct logger_queue *q) bool q_is_ready(struct logger_queue *q, bool do_drain)
{ {
int queued = atomic_read(&q->q_queued); int queued = atomic_read(&q->q_queued);
int flying; int flying;
bool res = false; bool res = false;
if (queued <= 0) if (queued <= 0)
goto always_done; goto always_done;
res = true; res = true;
if (queued >= q->q_max_queued) if (do_drain || queued >= q->q_max_queued)
goto done; goto done;
if (q->q_max_jiffies > 0 && if (q->q_max_jiffies > 0 &&
(long long)jiffies - q->q_last_action >= q->q_max_jiffies) (long long)jiffies - q->q_last_action >= q->q_max_jiffies)
goto done; goto done;
res = false; res = false;
goto always_done; goto always_done;
done: done:
/* Limit the number of flying requests (parallelism)
*/
flying = atomic_read(&q->q_flying); flying = atomic_read(&q->q_flying);
if (q->q_max_flying > 0 && flying >= q->q_max_flying) if (q->q_max_flying > 0 && flying >= q->q_max_flying)
res = false; res = false;
always_done: always_done:
return res; return res;
} }
@ -167,11 +175,11 @@ static inline int hash_fn(loff_t base_index)
return ((unsigned)tmp) % TRANS_HASH_MAX; return ((unsigned)tmp) % TRANS_HASH_MAX;
} }
static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, loff_t pos, int len) static struct trans_logger_mref_aspect *hash_find(struct trans_logger_output *output, loff_t pos, int len)
{ {
loff_t base_index = pos >> REGION_SIZE_BITS; loff_t base_index = pos >> REGION_SIZE_BITS;
int hash = hash_fn(base_index); int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash]; struct hash_anchor *start = &output->hash_table[hash];
struct list_head *tmp; struct list_head *tmp;
struct trans_logger_mref_aspect *res = NULL; struct trans_logger_mref_aspect *res = NULL;
struct trans_logger_mref_aspect *test_a; struct trans_logger_mref_aspect *test_a;
@ -216,6 +224,7 @@ static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, lof
if (res) { if (res) {
atomic_inc(&res->object->ref_count); atomic_inc(&res->object->ref_count);
atomic_inc(&output->inner_balance_count);
} }
traced_readunlock(&start->hash_lock, flags); traced_readunlock(&start->hash_lock, flags);
@ -223,33 +232,36 @@ static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, lof
return res; return res;
} }
static inline static
void hash_insert(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt) void hash_insert(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a)
{ {
loff_t base_index = elem_a->object->ref_pos >> REGION_SIZE_BITS; loff_t base_index = elem_a->object->ref_pos >> REGION_SIZE_BITS;
int hash = hash_fn(base_index); int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash]; struct hash_anchor *start = &output->hash_table[hash];
unsigned int flags; unsigned int flags;
traced_writelock(&start->hash_lock, flags);
#if 1 #if 1
CHECK_HEAD_EMPTY(&elem_a->hash_head); CHECK_HEAD_EMPTY(&elem_a->hash_head);
#endif #endif
atomic_inc(&elem_a->object->ref_count); // must be paired with hash_put()
// only for statistics:
atomic_inc(&output->inner_balance_count);
atomic_inc(&output->hash_count);
traced_writelock(&start->hash_lock, flags);
list_add(&elem_a->hash_head, &start->hash_anchor); list_add(&elem_a->hash_head, &start->hash_anchor);
atomic_inc(&elem_a->object->ref_count); // paired with hash_put()
atomic_inc(cnt); // only for statistics
traced_writeunlock(&start->hash_lock, flags); traced_writeunlock(&start->hash_lock, flags);
} }
static inline bool hash_put(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt) static inline bool hash_put(struct trans_logger_output *output, struct trans_logger_mref_aspect *elem_a)
{ {
struct mref_object *elem = elem_a->object; struct mref_object *elem = elem_a->object;
loff_t base_index = elem->ref_pos >> REGION_SIZE_BITS; loff_t base_index = elem->ref_pos >> REGION_SIZE_BITS;
int hash = hash_fn(base_index); int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash]; struct hash_anchor *start = &output->hash_table[hash];
unsigned int flags; unsigned int flags;
bool res; bool res;
@ -257,10 +269,11 @@ static inline bool hash_put(struct hash_anchor *table, struct trans_logger_mref_
CHECK_ATOMIC(&elem->ref_count, 1); CHECK_ATOMIC(&elem->ref_count, 1);
res = atomic_dec_and_test(&elem->ref_count); res = atomic_dec_and_test(&elem->ref_count);
atomic_dec(&output->inner_balance_count);
if (res) { if (res) {
list_del_init(&elem_a->hash_head); list_del_init(&elem_a->hash_head);
atomic_dec(cnt); atomic_dec(&output->hash_count);
} }
traced_writeunlock(&start->hash_lock, flags); traced_writeunlock(&start->hash_lock, flags);
@ -275,7 +288,7 @@ static int trans_logger_get_info(struct trans_logger_output *output, struct mars
return GENERIC_INPUT_CALL(input, mars_get_info, info); return GENERIC_INPUT_CALL(input, mars_get_info, info);
} }
static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref); static void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref);
static int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a) static int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a)
{ {
@ -287,7 +300,7 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
* the old one. * the old one.
* When a shadow is found, use it as buffer for the mref. * When a shadow is found, use it as buffer for the mref.
*/ */
shadow_a = hash_find(output->hash_table, mref->ref_pos, mref->ref_len); shadow_a = hash_find(output, mref->ref_pos, mref->ref_len);
if (shadow_a) { if (shadow_a) {
struct mref_object *shadow = shadow_a->object; struct mref_object *shadow = shadow_a->object;
int diff = shadow->ref_pos - mref->ref_pos; int diff = shadow->ref_pos - mref->ref_pos;
@ -304,7 +317,7 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
* region. * region.
*/ */
mref->ref_len = diff; mref->ref_len = diff;
trans_logger_ref_put(output, shadow); _trans_logger_ref_put(output, shadow);
goto call_through; goto call_through;
} }
/* Attach mref to the existing shadow ("slave shadow"). /* Attach mref to the existing shadow ("slave shadow").
@ -315,7 +328,8 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
mref->ref_data = shadow->ref_data - diff; mref->ref_data = shadow->ref_data - diff;
mref->ref_flags = shadow->ref_flags; mref->ref_flags = shadow->ref_flags;
mref_a->shadow_ref = shadow_a; mref_a->shadow_ref = shadow_a;
atomic_set(&mref->ref_count, 1); atomic_inc(&mref->ref_count);
atomic_inc(&output->inner_balance_count);
atomic_inc(&output->sshadow_count); atomic_inc(&output->sshadow_count);
#ifdef USE_MEMCPY #ifdef USE_MEMCPY
if (mref_a->orig_data) { if (mref_a->orig_data) {
@ -333,7 +347,7 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge
{ {
struct mref_object *mref = mref_a->object; struct mref_object *mref = mref_a->object;
// unconditionally create a new shadow buffer // unconditionally create a new master shadow buffer
mref->ref_data = kmalloc(mref->ref_len, GFP_MARS); mref->ref_data = kmalloc(mref->ref_len, GFP_MARS);
if (unlikely(!mref->ref_data)) { if (unlikely(!mref->ref_data)) {
return -ENOMEM; return -ENOMEM;
@ -347,7 +361,8 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge
mref_a->output = output; mref_a->output = output;
mref->ref_flags = MREF_UPTODATE; mref->ref_flags = MREF_UPTODATE;
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
atomic_set(&mref->ref_count, 1); atomic_inc(&mref->ref_count);
atomic_inc(&output->inner_balance_count);
get_lamport(&mref_a->stamp); get_lamport(&mref_a->stamp);
return mref->ref_len; return mref->ref_len;
} }
@ -359,13 +374,13 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mref_
CHECK_PTR(output, err); CHECK_PTR(output, err);
#if 1 // xxx atomic_inc(&output->outer_balance_count);
if (atomic_read(&mref->ref_count) > 0) { // setup already performed if (atomic_read(&mref->ref_count) > 0) { // setup already performed
MARS_INF("aha %d\n", atomic_read(&mref->ref_count)); MARS_DBG("aha %d\n", atomic_read(&mref->ref_count));
atomic_inc(&mref->ref_count); atomic_inc(&mref->ref_count);
return mref->ref_len; return mref->ref_len;
} }
#endif
mref_a = trans_logger_mref_get_aspect(output, mref); mref_a = trans_logger_mref_get_aspect(output, mref);
CHECK_PTR(mref_a, err); CHECK_PTR(mref_a, err);
@ -393,7 +408,8 @@ err:
return -EINVAL; return -EINVAL;
} }
static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref) static
void __trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
{ {
struct trans_logger_mref_aspect *mref_a; struct trans_logger_mref_aspect *mref_a;
struct trans_logger_mref_aspect *shadow_a; struct trans_logger_mref_aspect *shadow_a;
@ -413,9 +429,10 @@ restart:
if (shadow_a) { if (shadow_a) {
bool finished; bool finished;
if (mref_a->is_hashed) { if (mref_a->is_hashed) {
finished = hash_put(output->hash_table, mref_a, &output->hash_count); finished = hash_put(output, mref_a);
} else { } else {
finished = atomic_dec_and_test(&mref->ref_count); finished = atomic_dec_and_test(&mref->ref_count);
atomic_dec(&output->inner_balance_count);
} }
if (!finished) { if (!finished) {
return; return;
@ -445,6 +462,18 @@ err:
MARS_FAT("oops\n"); MARS_FAT("oops\n");
} }
static void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
{
atomic_dec(&output->outer_balance_count);
_trans_logger_ref_put(output, mref);
}
static void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
{
//atomic_dec(&output->inner_balance_count);
__trans_logger_ref_put(output, mref);
}
static void _trans_logger_endio(struct generic_callback *cb) static void _trans_logger_endio(struct generic_callback *cb)
{ {
struct trans_logger_mref_aspect *mref_a; struct trans_logger_mref_aspect *mref_a;
@ -462,8 +491,6 @@ static void _trans_logger_endio(struct generic_callback *cb)
output = mref_a->output; output = mref_a->output;
CHECK_PTR(output, err); CHECK_PTR(output, err);
atomic_dec(&output->fly_count);
prev_cb = cb->cb_prev; prev_cb = cb->cb_prev;
CHECK_PTR(prev_cb, err); CHECK_PTR(prev_cb, err);
mref = mref_a->object; mref = mref_a->object;
@ -471,6 +498,9 @@ static void _trans_logger_endio(struct generic_callback *cb)
mref->ref_cb = prev_cb; mref->ref_cb = prev_cb;
prev_cb->cb_fn(prev_cb); prev_cb->cb_fn(prev_cb);
atomic_dec(&output->fly_count);
err: ; err: ;
} }
@ -514,7 +544,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_
if (!mref_a->is_hashed) { if (!mref_a->is_hashed) {
mref_a->is_hashed = true; mref_a->is_hashed = true;
MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos); MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
hash_insert(output->hash_table, mref_a, &output->hash_count); hash_insert(output, mref_a);
} }
q_insert(&output->q_phase1, mref_a); q_insert(&output->q_phase1, mref_a);
wake_up_interruptible(&output->event); wake_up_interruptible(&output->event);
@ -528,6 +558,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_
} }
atomic_inc(&output->fly_count); atomic_inc(&output->fly_count);
mref_a->output = output; mref_a->output = output;
cb = &mref_a->cb; cb = &mref_a->cb;
cb->cb_fn = _trans_logger_endio; cb->cb_fn = _trans_logger_endio;
@ -579,7 +610,11 @@ static void phase1_endio(struct generic_callback *cb)
orig_cb->cb_fn(orig_cb); orig_cb->cb_fn(orig_cb);
// queue up for the next phase // queue up for the next phase
q_insert(&output->q_phase2, orig_mref_a); if (output->brick->log_reads) {
q_insert(&output->q_phase2, orig_mref_a);
} else {
q_insert(&output->q_phase2, orig_mref_a);
}
wake_up_interruptible(&output->event); wake_up_interruptible(&output->event);
err: ; err: ;
} }
@ -696,7 +731,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
/* allocate internal sub_mref for further work /* allocate internal sub_mref for further work
*/ */
while (len > 0) { while (len > 0) {
sub_mref = mars_alloc_mref(&brick->logst.hidden_output, &brick->logst.ref_object_layout); sub_mref = trans_logger_alloc_mref((void*)brick->logst.output, &brick->logst.ref_object_layout);
if (unlikely(!sub_mref)) { if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n"); MARS_FAT("cannot alloc sub_mref\n");
goto err; goto err;
@ -706,7 +741,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
sub_mref->ref_len = len; sub_mref->ref_len = len;
sub_mref->ref_may_write = WRITE; sub_mref->ref_may_write = WRITE;
sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)&brick->logst.hidden_output, sub_mref); sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)brick->logst.output, sub_mref);
CHECK_PTR(sub_mref_a, err); CHECK_PTR(sub_mref_a, err);
sub_mref_a->stamp = orig_mref_a->stamp; sub_mref_a->stamp = orig_mref_a->stamp;
sub_mref_a->orig_mref_a = orig_mref_a; sub_mref_a->orig_mref_a = orig_mref_a;
@ -717,6 +752,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
MARS_FAT("cannot get sub_ref, status = %d\n", status); MARS_FAT("cannot get sub_ref, status = %d\n", status);
goto err; goto err;
} }
atomic_inc(&output->sub_balance_count);
pos += sub_mref->ref_len; pos += sub_mref->ref_len;
len -= sub_mref->ref_len; len -= sub_mref->ref_len;
@ -725,6 +761,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
*/ */
CHECK_ATOMIC(&orig_mref->ref_count, 1); CHECK_ATOMIC(&orig_mref->ref_count, 1);
atomic_inc(&orig_mref->ref_count); atomic_inc(&orig_mref->ref_count);
atomic_inc(&output->inner_balance_count);
cb = &sub_mref_a->cb; cb = &sub_mref_a->cb;
cb->cb_fn = phase2_endio; cb->cb_fn = phase2_endio;
@ -733,6 +770,9 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
cb->cb_prev = NULL; cb->cb_prev = NULL;
sub_mref->ref_cb = cb; sub_mref->ref_cb = cb;
sub_mref->ref_rw = 0; sub_mref->ref_rw = 0;
#if 1
sub_mref->ref_prio = MARS_PRIO_LOW;
#endif
atomic_inc(&output->q_phase2.q_flying); atomic_inc(&output->q_phase2.q_flying);
if (output->brick->log_reads) { if (output->brick->log_reads) {
@ -746,7 +786,7 @@ static bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
* _replace_ the original reference by the sub_mref counts * _replace_ the original reference by the sub_mref counts
* from above). * from above).
*/ */
trans_logger_ref_put(output, orig_mref); _trans_logger_ref_put(output, orig_mref);
return true; return true;
err: err:
@ -855,7 +895,7 @@ static void phase4_endio(struct generic_callback *cb)
put: put:
//MARS_INF("put ORIGREF.\n"); //MARS_INF("put ORIGREF.\n");
CHECK_ATOMIC(&orig_mref->ref_count, 1); CHECK_ATOMIC(&orig_mref->ref_count, 1);
trans_logger_ref_put(orig_mref_a->output, orig_mref); _trans_logger_ref_put(orig_mref_a->output, orig_mref);
wake_up_interruptible(&output->event); wake_up_interruptible(&output->event);
err: ; err: ;
} }
@ -896,6 +936,7 @@ static bool phase4_startio(struct trans_logger_mref_aspect *sub_mref_a)
//MARS_INF("put SUBREF.\n"); //MARS_INF("put SUBREF.\n");
GENERIC_INPUT_CALL(input, mref_put, sub_mref); GENERIC_INPUT_CALL(input, mref_put, sub_mref);
atomic_dec(&output->sub_balance_count);
return true; return true;
err: err:
@ -939,6 +980,9 @@ static inline int _congested(struct trans_logger_output *output)
|| atomic_read(&output->q_phase4.q_flying); || atomic_read(&output->q_phase4.q_flying);
} }
#define DO_DRAIN(output) \
((output)->old_fly_count + atomic_read(&(output)->fly_count) <= 0)
static static
void trans_logger_log(struct trans_logger_output *output) void trans_logger_log(struct trans_logger_output *output)
{ {
@ -949,23 +993,25 @@ void trans_logger_log(struct trans_logger_output *output)
while (!kthread_should_stop() || _congested(output)) { while (!kthread_should_stop() || _congested(output)) {
int status; int status;
output->old_fly_count = atomic_read(&output->fly_count);
wait_event_interruptible_timeout( wait_event_interruptible_timeout(
output->event, output->event,
q_is_ready(&output->q_phase1) || q_is_ready(&output->q_phase1, DO_DRAIN(output)) ||
q_is_ready(&output->q_phase2) || q_is_ready(&output->q_phase2, DO_DRAIN(output)) ||
q_is_ready(&output->q_phase3) || q_is_ready(&output->q_phase3, DO_DRAIN(output)) ||
q_is_ready(&output->q_phase4) || q_is_ready(&output->q_phase4, DO_DRAIN(output)) ||
(kthread_should_stop() && !_congested(output)), (kthread_should_stop() && !_congested(output)),
wait_jiffies); wait_jiffies);
#if 1 #if 1
if (((int)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) { if (((int)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) {
last_jiffies = jiffies; last_jiffies = jiffies;
MARS_INF("LOGGER: mshadow=%d sshadow=%d hash_count=%d fly=%d phase1=%d/%d phase2=%d/%d phase3=%d/%d phase4=%d/%d\n", atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying)); MARS_INF("LOGGER: mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->sub_balance_count), atomic_read(&output->inner_balance_count), atomic_read(&output->outer_balance_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
} }
#endif #endif
wait_jiffies = HZ; wait_jiffies = HZ;
status = run_queue(&output->q_phase1, phase1_startio, 1000); status = run_queue(&output->q_phase1, phase1_startio, output->q_phase1.q_batchlen);
if (unlikely(status > 0)) { if (unlikely(status > 0)) {
(void)run_queue(&output->q_phase3, phase3_startio, 1); (void)run_queue(&output->q_phase3, phase3_startio, 1);
log_skip(&brick->logst); log_skip(&brick->logst);
@ -976,20 +1022,20 @@ void trans_logger_log(struct trans_logger_output *output)
/* Strategy / performance: /* Strategy / performance:
* run higher phases only when IO contention is "low". * run higher phases only when IO contention is "low".
*/ */
if (q_is_ready(&output->q_phase2)) { if (q_is_ready(&output->q_phase2, DO_DRAIN(output))) {
(void)run_queue(&output->q_phase2, phase2_startio, 64); (void)run_queue(&output->q_phase2, phase2_startio, output->q_phase2.q_batchlen);
} }
if (q_is_ready(&output->q_phase3)) { if (q_is_ready(&output->q_phase3, DO_DRAIN(output))) {
status = run_queue(&output->q_phase3, phase3_startio, 64); status = run_queue(&output->q_phase3, phase3_startio, output->q_phase3.q_batchlen);
if (unlikely(status > 0)) { if (unlikely(status > 0)) {
log_skip(&brick->logst); log_skip(&brick->logst);
wait_jiffies = 5; wait_jiffies = 5;
continue; continue;
} }
} }
if (q_is_ready(&output->q_phase4)) { if (q_is_ready(&output->q_phase4, DO_DRAIN(output))) {
(void)run_queue(&output->q_phase4, phase4_startio, 64); (void)run_queue(&output->q_phase4, phase4_startio, output->q_phase4.q_batchlen);
} }
} }
} }
@ -1085,8 +1131,7 @@ MARS_MAKE_STATICS(trans_logger);
static int trans_logger_brick_construct(struct trans_logger_brick *brick) static int trans_logger_brick_construct(struct trans_logger_brick *brick)
{ {
_generic_output_init((struct generic_brick*)brick, (struct generic_output_type*)&trans_logger_output_type, (struct generic_output*)&brick->logst.hidden_output, "internal"); init_logst(&brick->logst, (void*)brick->inputs[1], (void*)brick->outputs[0]);
brick->logst.input = (void*)brick->inputs[1];
return 0; return 0;
} }

View File

@ -24,6 +24,7 @@ struct logger_queue {
atomic_t q_flying; atomic_t q_flying;
long long q_last_action; // jiffies long long q_last_action; // jiffies
// tunables // tunables
int q_batchlen;
int q_max_queued; int q_max_queued;
int q_max_flying; int q_max_flying;
int q_max_jiffies; int q_max_jiffies;
@ -67,11 +68,14 @@ struct trans_logger_brick {
struct trans_logger_output { struct trans_logger_output {
MARS_OUTPUT(trans_logger); MARS_OUTPUT(trans_logger);
struct hash_anchor hash_table[TRANS_HASH_MAX];
atomic_t hash_count;
atomic_t fly_count; atomic_t fly_count;
atomic_t hash_count;
atomic_t mshadow_count; atomic_t mshadow_count;
atomic_t sshadow_count; atomic_t sshadow_count;
atomic_t outer_balance_count;
atomic_t inner_balance_count;
atomic_t sub_balance_count;
int old_fly_count;
struct task_struct *thread; struct task_struct *thread;
wait_queue_head_t event; wait_queue_head_t event;
// queues // queues
@ -79,6 +83,7 @@ struct trans_logger_output {
struct logger_queue q_phase2; struct logger_queue q_phase2;
struct logger_queue q_phase3; struct logger_queue q_phase3;
struct logger_queue q_phase4; struct logger_queue q_phase4;
struct hash_anchor hash_table[TRANS_HASH_MAX];
}; };
struct trans_logger_input { struct trans_logger_input {