import mars-107.tgz

Thomas Schoebel-Theuer 2011-05-26 15:32:32 +01:00
parent 4e6527b5cf
commit 4c6369d65d
13 changed files with 411 additions and 130 deletions

View File

@ -20,11 +20,22 @@ EXPORT_SYMBOL_GPL(init_logst);
struct log_cb_info {
struct mref_object *mref;
int nr_endio;
struct semaphore mutex;
atomic_t refcount;
int nr_cb;
void (*preios[MARS_LOG_CB_MAX])(void *private);
void (*endios[MARS_LOG_CB_MAX])(void *private, int error);
void *privates[MARS_LOG_CB_MAX];
};
static
void put_log_cb_info(struct log_cb_info *cb_info)
{
if (atomic_dec_and_test(&cb_info->refcount)) {
kfree(cb_info);
}
}
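The new put_log_cb_info() pairs with a reference count that log_reserve() initializes to 2: one reference is held by log_flush() for issuing the preio() calls, the other by log_write_endio() for the endio() calls, and whichever side drops the last reference frees the structure. A minimal userspace analog of this last-reference-frees pattern (hypothetical names, not the MARS code):

#include <stdatomic.h>
#include <stdlib.h>

struct cb_info_demo {
    atomic_int refcount;
    /* callback arrays etc. would live here */
};

static struct cb_info_demo *cb_info_new(void)
{
    struct cb_info_demo *ci = malloc(sizeof(*ci));
    if (ci)
        atomic_init(&ci->refcount, 2);  /* one ref for flush, one for endio */
    return ci;
}

static void cb_info_put(struct cb_info_demo *ci)
{
    /* atomic_fetch_sub() returns the old value: 1 means we were the last holder */
    if (atomic_fetch_sub(&ci->refcount, 1) == 1)
        free(ci);
}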
static
void log_write_endio(struct generic_callback *cb)
{
@ -38,12 +49,18 @@ void log_write_endio(struct generic_callback *cb)
mars_log_trace(cb_info->mref);
}
MARS_IO("nr_endio = %d\n", cb_info->nr_endio);
MARS_IO("nr_cb = %d\n", cb_info->nr_cb);
for (i = 0; i < cb_info->nr_endio; i++) {
cb_info->endios[i](cb_info->privates[i], cb->cb_error);
down(&cb_info->mutex);
for (i = 0; i < cb_info->nr_cb; i++) {
void (*cbf)(void *private, int error) = cb_info->endios[i];
if (cbf) {
cbf(cb_info->privates[i], cb->cb_error);
}
}
kfree(cb_info);
cb_info->nr_cb = 0; // prevent late preio() callbacks
up(&cb_info->mutex);
put_log_cb_info(cb_info);
return;
err:
@ -54,7 +71,9 @@ void log_flush(struct log_status *logst)
{
struct mref_object *mref = logst->log_mref;
struct generic_callback *cb;
struct log_cb_info *cb_info;
int gap;
int i;
if (!mref || !logst->count)
return;
@ -81,7 +100,8 @@ void log_flush(struct log_status *logst)
cb = &mref->_ref_cb;
cb->cb_fn = log_write_endio;
cb->cb_private = logst->private;
cb_info = logst->private;
cb->cb_private = cb_info;
logst->private = NULL;
cb->cb_error = 0;
cb->cb_prev = NULL;
@ -96,6 +116,18 @@ void log_flush(struct log_status *logst)
logst->offset = 0;
logst->count = 0;
logst->log_mref = NULL;
if (cb_info->nr_cb > 0) {
down(&cb_info->mutex);
for (i = 0; i < cb_info->nr_cb; i++) {
void (*cbf)(void *private) = cb_info->preios[i];
if (cbf) {
cbf(cb_info->privates[i]);
}
}
up(&cb_info->mutex);
}
put_log_cb_info(cb_info);
}
EXPORT_SYMBOL_GPL(log_flush);
@ -112,7 +144,7 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
mref = logst->log_mref;
if ((mref && total_len > mref->ref_len - logst->offset)
|| !cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX) {
|| !cb_info || cb_info->nr_cb >= MARS_LOG_CB_MAX) {
log_flush(logst);
}
@ -128,6 +160,8 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
goto err;
}
cb_info = logst->private;
sema_init(&cb_info->mutex, 1);
atomic_set(&cb_info->refcount, 2);
mref = mars_alloc_mref(logst->output, &logst->ref_object_layout);
if (unlikely(!mref)) {
@ -206,7 +240,7 @@ err:
}
EXPORT_SYMBOL_GPL(log_reserve);
bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private)
bool log_finalize(struct log_status *logst, int len, void (*preio)(void *private), void (*endio)(void *private, int error), void *private)
{
struct mref_object *mref = logst->log_mref;
struct log_cb_info *cb_info = logst->private;
@ -214,7 +248,7 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
void *data;
int offset;
int restlen;
int nr_endio;
int nr_cb;
bool ok = false;
CHECK_PTR(mref, err);
@ -228,7 +262,7 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
MARS_ERR("trying to write more than available (%d > %d)\n", len, (int)(restlen - END_OVERHEAD));
goto err;
}
if (unlikely(!cb_info || cb_info->nr_endio >= MARS_LOG_CB_MAX)) {
if (unlikely(!cb_info || cb_info->nr_cb >= MARS_LOG_CB_MAX)) {
MARS_ERR("too many endio() calls\n");
goto err;
}
@ -267,9 +301,10 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
offset = logst->validflag_offset;
DATA_PUT(data, offset, (char)1);
nr_endio = cb_info->nr_endio++;
cb_info->endios[nr_endio] = endio;
cb_info->privates[nr_endio] = private;
nr_cb = cb_info->nr_cb++;
cb_info->preios[nr_cb] = preio;
cb_info->endios[nr_cb] = endio;
cb_info->privates[nr_cb] = private;
logst->count++;
ok = true;

View File

@ -120,7 +120,7 @@ void log_flush(struct log_status *logst);
void *log_reserve(struct log_status *logst, struct log_header *lh);
bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private);
bool log_finalize(struct log_status *logst, int len, void (*preio)(void *private), void (*endio)(void *private, int error), void *private);
int log_read(struct log_status *logst, struct log_header *lh, void **payload, int *payload_len);

View File

@ -254,6 +254,12 @@ void bio_ref_put(struct bio_output *output, struct mref_object *mref)
CHECK_PTR(mref_a, err);
if (likely(mref_a->bio)) {
#ifdef MARS_DEBUGGING
int bi_cnt = atomic_read(&mref_a->bio->bi_cnt);
if (bi_cnt > 1) {
MARS_DBG("bi_cnt = %d\n", bi_cnt);
}
#endif
bio_put(mref_a->bio);
mref_a->bio = NULL;
}
@ -271,7 +277,8 @@ err:
MARS_FAT("cannot work\n");
}
static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
static
void _bio_ref_io(struct bio_output *output, struct mref_object *mref)
{
struct bio_brick *brick = output->brick;
struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output, mref);
@ -306,15 +313,6 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
MARS_IO("starting IO rw = %d fly = %d\n", rw, atomic_read(&brick->fly_count));
mars_trace(mref, "bio_submit");
#ifdef WAIT_CLASH
mref_a->hash_pos = (mref->ref_pos / PAGE_SIZE) % WAIT_CLASH;
if (mref->ref_rw) {
down_write(&brick->hashtable[mref_a->hash_pos]);
} else {
down_read(&brick->hashtable[mref_a->hash_pos]);
}
#endif
#ifdef FAKE_IO
bio->bi_end_io(bio, 0);
#else
@ -331,10 +329,9 @@ static void bio_ref_io(struct bio_output *output, struct mref_object *mref)
if (likely(status >= 0))
goto done;
#if 1
bio_put(bio);
atomic_dec(&brick->fly_count);
#endif
err:
MARS_ERR("IO error %d\n", status);
cb = mref->ref_cb;
@ -345,6 +342,26 @@ err:
done: ;
}
static
void bio_ref_io(struct bio_output *output, struct mref_object *mref)
{
if (mref->ref_prio == MARS_PRIO_LOW) { // queue for background IO
struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output, mref);
struct bio_brick *brick = output->brick;
unsigned long flags;
spin_lock_irqsave(&brick->lock, flags);
list_add_tail(&mref_a->io_head, &brick->background_list);
spin_unlock_irqrestore(&brick->lock, flags);
atomic_inc(&brick->background_count);
atomic_inc(&brick->total_background_count);
wake_up_interruptible(&brick->event);
return;
}
// foreground IO: start immediately
_bio_ref_io(output, mref);
}
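bio_ref_io() is now split into a submit path and a deferral path: requests with ref_prio == MARS_PRIO_LOW are parked on background_list and only started by bio_thread() once fly_count has dropped to zero, one request per wakeup. A compact single-threaded sketch of that submit-or-defer decision (hypothetical names, locking omitted; the real code protects the list with brick->lock):

#include <stdbool.h>
#include <stddef.h>

enum prio { PRIO_NORMAL, PRIO_LOW };

struct request_demo {
    enum prio prio;
    struct request_demo *next;
};

struct brick_demo {
    struct request_demo *background_head;
    struct request_demo *background_tail;
    int fly_count;          /* foreground requests currently in flight */
    int background_count;   /* deferred low-priority requests */
};

static void start_io(struct brick_demo *b, struct request_demo *r)
{
    (void)r;                /* hand the request to the backend here */
    b->fly_count++;
}

static void submit(struct brick_demo *b, struct request_demo *r)
{
    if (r->prio == PRIO_LOW) {          /* defer background work */
        r->next = NULL;
        if (b->background_tail)
            b->background_tail->next = r;
        else
            b->background_head = r;
        b->background_tail = r;
        b->background_count++;
        return;                         /* the worker picks it up later */
    }
    start_io(b, r);                     /* foreground: start immediately */
}

/* called by the worker whenever fly_count has dropped to zero */
static void kick_background(struct brick_demo *b)
{
    while (b->fly_count == 0 && b->background_head) {
        struct request_demo *r = b->background_head;
        b->background_head = r->next;
        if (!b->background_head)
            b->background_tail = NULL;
        b->background_count--;
        start_io(b, r);                 /* fly_count > 0 now: only one starts */
    }
}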
static int bio_thread(void *data)
{
struct bio_brick *brick = data;
@ -355,7 +372,11 @@ static int bio_thread(void *data)
LIST_HEAD(tmp_list);
unsigned long flags;
wait_event_interruptible_timeout(brick->event, atomic_read(&brick->completed_count) > 0, 12 * HZ);
wait_event_interruptible_timeout(
brick->event,
atomic_read(&brick->completed_count) > 0 ||
(atomic_read(&brick->background_count) > 0 && !atomic_read(&brick->fly_count)),
12 * HZ);
spin_lock_irqsave(&brick->lock, flags);
list_replace_init(&brick->completed_list, &tmp_list);
@ -384,13 +405,6 @@ static int bio_thread(void *data)
mref = mref_a->object;
#ifdef WAIT_CLASH
if (mref_a->object->ref_rw) {
up_write(&brick->hashtable[mref_a->hash_pos]);
} else {
up_read(&brick->hashtable[mref_a->hash_pos]);
}
#endif
mars_trace(mref, "bio_endio");
cb = mref->ref_cb;
@ -406,8 +420,32 @@ static int bio_thread(void *data)
atomic_dec(&brick->fly_count);
atomic_inc(&brick->total_completed_count);
MARS_IO("fly = %d\n", atomic_read(&brick->fly_count));
if (likely(mref_a->bio)) {
bio_put(mref_a->bio);
}
bio_ref_put(mref_a->output, mref);
}
if (!atomic_read(&brick->fly_count) && atomic_read(&brick->background_count) > 0) {
struct list_head *tmp;
struct bio_mref_aspect *mref_a;
struct mref_object *mref;
atomic_dec(&brick->background_count);
spin_lock_irqsave(&brick->lock, flags);
tmp = brick->background_list.next;
list_del_init(tmp);
spin_unlock_irqrestore(&brick->lock, flags);
mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_ERR("invalid mref\n");
continue;
}
_bio_ref_io(mref_a->output, mref);
}
}
done:
MARS_INF("bio kthread has stopped.\n");
@ -504,13 +542,13 @@ done:
static noinline
char *bio_statistics(struct bio_brick *brick, int verbose)
{
char *res = kmalloc(128, GFP_MARS);
char *res = kmalloc(256, GFP_MARS);
if (!res)
return NULL;
// FIXME: check for allocation overflows
sprintf(res, "total completed = %d | flying = %d completing = %d\n", atomic_read(&brick->total_completed_count), atomic_read(&brick->fly_count), atomic_read(&brick->completed_count));
sprintf(res, "total completed = %d background = %d | flying = %d completing = %d background = %d\n", atomic_read(&brick->total_completed_count), atomic_read(&brick->total_background_count), atomic_read(&brick->fly_count), atomic_read(&brick->completed_count), atomic_read(&brick->background_count));
return res;
}
@ -519,6 +557,7 @@ static noinline
void bio_reset_statistics(struct bio_brick *brick)
{
atomic_set(&brick->total_completed_count, 0);
atomic_set(&brick->total_background_count, 0);
}
@ -543,13 +582,8 @@ MARS_MAKE_STATICS(bio);
static int bio_brick_construct(struct bio_brick *brick)
{
#ifdef WAIT_CLASH
int i;
for (i = 0; i < WAIT_CLASH; i++) {
init_rwsem(&brick->hashtable[i]);
}
#endif
spin_lock_init(&brick->lock);
INIT_LIST_HEAD(&brick->background_list);
INIT_LIST_HEAD(&brick->completed_list);
init_waitqueue_head(&brick->event);
return 0;

View File

@ -5,8 +5,6 @@
#include <linux/blkdev.h>
#include <linux/rwsem.h>
//#define WAIT_CLASH 1024
struct bio_mref_aspect {
GENERIC_ASPECT(mref);
struct list_head io_head;
@ -27,18 +25,18 @@ struct bio_brick {
// readonly
loff_t total_size;
atomic_t fly_count;
atomic_t background_count;
atomic_t completed_count;
atomic_t total_completed_count;
atomic_t total_background_count;
// private
spinlock_t lock;
struct list_head background_list;
struct list_head completed_list;
wait_queue_head_t event;
struct file *filp;
struct block_device *bdev;
struct task_struct *thread;
#ifdef WAIT_CLASH
struct rw_semaphore hashtable[WAIT_CLASH];
#endif
int bvec_max;
};

View File

@ -33,8 +33,9 @@ static void _kill_socket(struct client_output *output)
static void _kill_thread(struct client_threadinfo *ti)
{
if (ti->thread) {
if (ti->thread && !ti->terminated) {
kthread_stop(ti->thread);
ti->thread = NULL;
}
}
@ -231,7 +232,8 @@ error:
client_ref_put(output, mref);
}
static int receiver_thread(void *data)
static
int receiver_thread(void *data)
{
struct client_output *output = data;
int status = 0;
@ -340,6 +342,8 @@ static int sender_thread(void *data)
struct client_brick *brick = output->brick;
int status = 0;
output->receiver.restart_count = 0;
while (!kthread_should_stop()) {
struct list_head *tmp;
struct client_mref_aspect *mref_a;
@ -375,9 +379,16 @@ static int sender_thread(void *data)
traced_unlock(&output->lock, flags);
}
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 1 * HZ);
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ);
if (unlikely(output->receiver.terminated)) {
#if 1
if (unlikely(output->receiver.restart_count++ > 3)) { // don't restart too often
MARS_ERR("receiver failed too often, giving up\n");
status = -ECOMM;
break;
}
#endif
output->receiver.terminated = false;
output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++);
if (unlikely(IS_ERR(output->receiver.thread))) {

View File

@ -26,6 +26,7 @@ struct client_input {
struct client_threadinfo {
struct task_struct *thread;
wait_queue_head_t run_event;
int restart_count;
bool terminated;
};

View File

@ -53,7 +53,7 @@ int _clear_clash(struct copy_brick *brick)
* [If you had no writes on A at all during the copy, of course
* this is not necessary]
*
* When optimize_mode is on, reads can utilize the already copied
* When utilize_mode is on, reads can utilize the already copied
* region from B, but only as long as this region has not been
* invalidated by writes (indicated by low_dirty).
*
@ -69,7 +69,7 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref)
int behind;
loff_t ref_end;
if (!brick->optimize_mode || brick->low_dirty)
if (!brick->utilize_mode || brick->low_dirty)
return INPUT_A_IO;
ref_end = mref->ref_pos + mref->ref_len;
@ -178,7 +178,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
len = tmp_pos - pos;
}
mref->ref_len = len;
mref->ref_prio = MARS_PRIO_LOW;
mref->ref_prio = brick->io_prio;
mref->_ref_cb.cb_private = mref_a;
mref->_ref_cb.cb_fn = copy_endio;
mref->ref_cb = &mref->_ref_cb;
@ -279,7 +279,10 @@ int _next_state(struct copy_brick *brick, loff_t pos)
if (!mref1) {
goto done;
}
if (mref2) {
if (brick->append_mode > 0 && mref1->ref_total_size && mref1->ref_total_size > brick->copy_end) {
brick->copy_end = mref1->ref_total_size;
}
if (mref2) { // do the verify
int len = mref1->ref_len;
if (len == mref2->ref_len &&
!memcmp(mref1->ref_data, mref2->ref_data, len)) {
@ -359,10 +362,11 @@ void _run_copy(struct copy_brick *brick)
max = MAX_COPY_PARA - atomic_read(&brick->io_flight) * 2;
MARS_IO("max = %d\n", max);
for (pos = brick->copy_start; pos < brick->copy_end; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
for (pos = brick->copy_start; pos < brick->copy_end || brick->append_mode > 1; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
//MARS_IO("pos = %lld\n", pos);
if (brick->clash || max-- <= 0)
if (brick->clash || max-- <= 0 || kthread_should_stop()) {
break;
}
status = _next_state(brick, pos);
}
}
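_next_state() and _run_copy() pick up the new append_mode: a value of 1 passively extends copy_end whenever the source reports a larger ref_total_size, while a value above 1 keeps the copy loop running even past the current copy_end. A small sketch of the intended decision logic (an assumed reading of this hunk, hypothetical names):

#include <stdbool.h>

/* passive append (append_mode == 1): follow the growing source file */
static void grow_copy_end(int append_mode, long long total_size, long long *copy_end)
{
    if (append_mode > 0 && total_size > *copy_end)
        *copy_end = total_size;
}

/* loop condition of _run_copy(): active append (append_mode > 1) never stops by position */
static bool keep_running(int append_mode, long long pos, long long copy_end)
{
    return pos < copy_end || append_mode > 1;
}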

View File

@ -33,10 +33,12 @@ struct copy_brick {
// parameters
volatile loff_t copy_start;
volatile loff_t copy_end; // stop working if == 0
loff_t copy_last;
int io_prio;
int append_mode; // 1 = passively, 2 = actively
bool verify_mode;
bool optimize_mode;
bool permanent_update; // NYI
bool utilize_mode; // utilize already copied data
// readonly from outside
loff_t copy_last;
bool low_dirty;
// internal
volatile bool trigger;

View File

@ -60,7 +60,7 @@ struct light_class {
//#define TRANS_FAKE
#define CONF_TRANS_BATCHLEN 32
#define CONF_TRANS_BATCHLEN 1024
//#define CONF_TRANS_FLYING 4
#define CONF_TRANS_FLYING 128
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
@ -68,6 +68,8 @@ struct light_class {
//#define CONF_TRANS_LOG_READS true
#define CONF_TRANS_MINIMIZE_LATENCY false
//#define CONF_TRANS_MINIMIZE_LATENCY true
//#define CONF_TRANS_COMPLETION_SEMANTICS 2
#define CONF_TRANS_COMPLETION_SEMANTICS 0
//#define CONF_ALL_BATCHLEN 2
#define CONF_ALL_BATCHLEN 1
@ -94,6 +96,9 @@ struct light_class {
#define AIO_READAHEAD 1
#define AIO_WAIT_DURING_FDSYNC false
#define COPY_APPEND_MODE 1
#define COPY_PRIO MARS_PRIO_LOW
static
void _set_trans_params(struct mars_brick *_brick, void *private)
{
@ -136,7 +141,9 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
trans_brick->q_phase2.q_ordering = true;
trans_brick->q_phase4.q_ordering = true;
trans_brick->log_reads = CONF_TRANS_LOG_READS;
trans_brick->completion_semantics = CONF_TRANS_COMPLETION_SEMANTICS;
trans_brick->minimize_latency = CONF_TRANS_MINIMIZE_LATENCY;
#ifdef TRANS_FAKE
trans_brick->debug_shortcut = true;
@ -219,6 +226,19 @@ void _set_if_params(struct mars_brick *_brick, void *private)
MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
}
static
void _set_copy_params(struct mars_brick *_brick, void *private)
{
struct copy_brick *copy_brick = (void*)_brick;
if (_brick->type != (void*)&copy_brick_type) {
MARS_ERR("bad brick type\n");
return;
}
copy_brick->append_mode = COPY_APPEND_MODE;
copy_brick->io_prio = COPY_PRIO;
MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
}
///////////////////////////////////////////////////////////////////////
// internal helpers
@ -336,7 +356,7 @@ int __make_copy(
copy =
make_brick_all(global,
belongs,
NULL,
_set_copy_params,
NULL,
0,
fullpath[1],
@ -469,7 +489,7 @@ int _update_file(struct mars_global *global, const char *switch_path, const char
MARS_DBG("src = '%s' dst = '%s'\n", tmp, file);
status = __make_copy(global, NULL, switch_path, copy_path, NULL, argv, -1, &copy);
if (status >= 0 && copy && !copy->permanent_update) {
if (status >= 0 && copy && !copy->append_mode) {
if (end_pos > copy->copy_end) {
MARS_DBG("appending to '%s' %lld => %lld\n", copy_path, copy->copy_end, end_pos);
copy->copy_end = end_pos;

View File

@ -40,45 +40,120 @@ static int server_worker(struct mars_global *global, struct mars_dent *dent, boo
return 0;
}
static void server_endio(struct generic_callback *cb)
static
int cb_thread(void *data)
{
struct server_brick *brick = data;
int status = -EINVAL;
brick->cb_running = true;
wake_up_interruptible(&brick->startup_event);
MARS_DBG("--------------- cb_thread starting on socket %p\n", brick->handler_socket);
while (!kthread_should_stop()) {
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct list_head *tmp;
struct socket **sock;
unsigned long flags;
status = -EINVAL;
if (!brick->handler_socket)
break;
wait_event_interruptible_timeout(
brick->cb_event,
!list_empty(&brick->cb_read_list) ||
!list_empty(&brick->cb_write_list) ||
kthread_should_stop(),
3 * HZ);
if (!brick->handler_socket)
break;
traced_lock(&brick->cb_lock, flags);
tmp = brick->cb_write_list.next;
if (tmp == &brick->cb_write_list) {
tmp = brick->cb_read_list.next;
if (tmp == &brick->cb_read_list) {
traced_unlock(&brick->cb_lock, flags);
continue;
}
}
list_del_init(tmp);
traced_unlock(&brick->cb_lock, flags);
mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
mref = mref_a->object;
CHECK_PTR(mref, err);
status = -ENOTSOCK;
sock = mref_a->sock;
CHECK_PTR(sock, err);
CHECK_PTR(*sock, err);
down(&brick->socket_sem);
status = mars_send_cb(sock, mref);
up(&brick->socket_sem);
atomic_dec(&brick->in_flight);
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot send response, status = %d\n", status);
kernel_sock_shutdown(*sock, SHUT_WR);
break;
}
}
err:
brick->cb_running = false;
MARS_DBG("---------- cb_thread terminating, status = %d\n", status);
wake_up_interruptible(&brick->startup_event);
return status;
}
static
void server_endio(struct generic_callback *cb)
{
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct server_brick *brick;
struct socket **sock;
int status;
int rw;
unsigned long flags;
mref_a = cb->cb_private;
CHECK_PTR(mref_a, err);
mref = mref_a->object;
CHECK_PTR(mref, err);
brick = mref_a->brick;
CHECK_PTR(brick, err);
sock = mref_a->sock;
CHECK_PTR(sock, err);
CHECK_PTR(*sock, err);
mref = mref_a->object;
CHECK_PTR(mref, err);
down(&brick->socket_sem);
status = mars_send_cb(sock, mref);
up(&brick->socket_sem);
rw = mref->ref_rw;
if (status < 0) {
MARS_ERR("cannot send response, status = %d\n", status);
kernel_sock_shutdown(*sock, SHUT_WR);
traced_lock(&brick->cb_lock, flags);
if (rw) {
list_add_tail(&mref_a->cb_head, &brick->cb_write_list);
} else {
list_add_tail(&mref_a->cb_head, &brick->cb_read_list);
}
atomic_dec(&brick->in_flight);
traced_unlock(&brick->cb_lock, flags);
wake_up_interruptible(&brick->cb_event);
return;
err:
MARS_FAT("cannot handle callback - giving up\n");
}
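server_endio() no longer performs the potentially blocking mars_send_cb() itself; it only queues the finished mref on cb_read_list or cb_write_list and wakes the new cb_thread(), which serializes all socket sends under socket_sem. A minimal pthread analog of this hand-off from completion callback to sender thread (hypothetical names, not the MARS API):

#include <pthread.h>
#include <stddef.h>

struct done_item {
    struct done_item *next;
    /* payload to send back to the client would live here */
};

static struct done_item *done_list;
static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  done_cond = PTHREAD_COND_INITIALIZER;

/* completion callback: cheap and non-blocking, just queue and wake */
void io_completed(struct done_item *item)
{
    pthread_mutex_lock(&done_lock);
    item->next = done_list;
    done_list = item;
    pthread_mutex_unlock(&done_lock);
    pthread_cond_signal(&done_cond);
}

/* dedicated sender thread: may block on the (slow) socket send */
void *sender_thread(void *arg)
{
    (void)arg;
    for (;;) {
        struct done_item *item;
        pthread_mutex_lock(&done_lock);
        while (!done_list)
            pthread_cond_wait(&done_cond, &done_lock);
        item = done_list;
        done_list = item->next;
        pthread_mutex_unlock(&done_lock);
        /* send_response(item); blocking network I/O would happen here */
    }
    return NULL;
}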
int server_io(struct server_brick *brick, struct socket **sock)
{
struct mref_object *mref;
struct server_mref_aspect *mref_a;
int status;
int status = -ENOTRECOVERABLE;
if (!brick->cb_running)
goto done;
mref = server_alloc_mref(&brick->hidden_output, &brick->mref_object_layout);
status = -ENOMEM;
if (!mref)
@ -108,25 +183,47 @@ int server_io(struct server_brick *brick, struct socket **sock)
MARS_INF("mref_get execution error = %d\n", status);
mref->_ref_cb.cb_error = status;
server_endio(&mref->_ref_cb);
mars_free_mref(mref);
status = 0; // continue serving requests
goto done;
}
GENERIC_INPUT_CALL(brick->inputs[0], mref_io, mref);
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
done:
return status;
}
static int handler_thread(void *data)
static
void _clean_list(struct server_brick *brick, struct list_head *start)
{
for (;;) {
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct list_head *tmp = start->next;
if (tmp == start)
break;
list_del_init(tmp);
mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
mref = mref_a->object;
if (!mref)
continue;
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
}
}
static
int handler_thread(void *data)
{
struct server_brick *brick = data;
struct socket **sock = &brick->handler_socket;
struct task_struct *cb_thread = brick->cb_thread;
int max_round = 300;
int status = 0;
brick->cb_thread = NULL;
brick->handler_thread = NULL;
wake_up_interruptible(&brick->startup_event);
MARS_DBG("--------------- handler_thread starting on socket %p\n", *sock);
@ -135,7 +232,7 @@ static int handler_thread(void *data)
//fake_mm();
while (!kthread_should_stop()) {
while (brick->cb_running && !kthread_should_stop()) {
struct mars_cmd cmd = {};
status = mars_recv_struct(sock, &cmd, mars_cmd_meta);
@ -209,19 +306,19 @@ static int handler_thread(void *data)
CHECK_PTR(_bio_brick_type, err);
//prev = mars_find_brick(mars_global, NULL, cmd.cmd_str1);
prev =
make_brick_all(mars_global,
NULL,
NULL,
NULL,
10 * HZ,
path,
(const struct generic_brick_type*)_bio_brick_type,
(const struct generic_brick_type*[]){},
NULL,
path,
(const char *[]){},
0);
prev = make_brick_all(
mars_global,
NULL,
NULL,
NULL,
10 * HZ,
path,
(const struct generic_brick_type*)_bio_brick_type,
(const struct generic_brick_type*[]){},
NULL,
path,
(const char *[]){},
0);
if (likely(prev)) {
status = generic_connect((void*)brick->inputs[0], (void*)prev->outputs[0]);
} else {
@ -252,6 +349,15 @@ static int handler_thread(void *data)
done:
MARS_DBG("handler_thread terminating, status = %d\n", status);
kthread_stop(cb_thread);
wait_event_interruptible_timeout(
brick->startup_event,
!brick->cb_running,
10 * HZ);
_clean_list(brick, &brick->cb_read_list);
_clean_list(brick, &brick->cb_write_list);
do {
int status = mars_kill_brick((void*)brick);
if (status >= 0) {
@ -264,7 +370,7 @@ done:
break;
}
msleep(1000);
} while (!brick->power.led_off);
} while (brick->cb_running && !brick->power.led_off);
MARS_DBG("done\n");
return 0;
@ -316,14 +422,14 @@ static int server_switch(struct server_brick *brick)
static int server_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct server_mref_aspect *ini = (void*)_ini;
(void)ini;
INIT_LIST_HEAD(&ini->cb_head);
return 0;
}
static void server_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct server_mref_aspect *ini = (void*)_ini;
(void)ini;
CHECK_HEAD_EMPTY(&ini->cb_head);
}
MARS_MAKE_STATICS(server);
@ -335,7 +441,11 @@ static int server_brick_construct(struct server_brick *brick)
struct server_output *hidden = &brick->hidden_output;
_server_output_init(brick, hidden, "internal");
init_waitqueue_head(&brick->startup_event);
init_waitqueue_head(&brick->cb_event);
sema_init(&brick->socket_sem, 1);
spin_lock_init(&brick->cb_lock);
INIT_LIST_HEAD(&brick->cb_read_list);
INIT_LIST_HEAD(&brick->cb_write_list);
return 0;
}
@ -448,15 +558,25 @@ static int _server_thread(void *data)
goto err;
}
brick->handler_socket = new_socket;
thread = kthread_create(cb_thread, brick, "mars_cb%d", version);
if (IS_ERR(thread)) {
MARS_ERR("cannot create cb thread, status = %ld\n", PTR_ERR(thread));
goto err;
}
brick->cb_thread = thread;
wake_up_process(thread);
thread = kthread_create(handler_thread, brick, "mars_handler%d", version++);
if (IS_ERR(thread)) {
MARS_ERR("cannot create thread, status = %ld\n", PTR_ERR(thread));
MARS_ERR("cannot create handler thread, status = %ld\n", PTR_ERR(thread));
goto err;
}
brick->handler_thread = thread;
brick->handler_socket = new_socket;
wake_up_process(thread);
wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL);
wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL && brick->cb_thread == NULL);
continue;
err:

View File

@ -14,6 +14,7 @@ struct server_mref_aspect {
GENERIC_ASPECT(mref);
struct server_brick *brick;
struct socket **sock;
struct list_head cb_head;
};
struct server_output {
@ -26,9 +27,15 @@ struct server_brick {
struct socket *handler_socket;
struct semaphore socket_sem;
struct task_struct *handler_thread;
struct task_struct *cb_thread;
wait_queue_head_t startup_event;
wait_queue_head_t cb_event;
struct generic_object_layout mref_object_layout;
struct server_output hidden_output;
spinlock_t cb_lock;
struct list_head cb_read_list;
struct list_head cb_write_list;
bool cb_running;
};
struct server_input {

View File

@ -750,7 +750,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
atomic_inc(&brick->inner_balance_count);
qq_mref_insert(&brick->q_phase1, mref_a);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
return;
}
@ -1228,14 +1228,73 @@ void put_list(struct writeback_info *wb, struct list_head *start)
/*********************************************************************
* Phase 1: write transaction log entry for the original write request.
*/
static noinline
void phase1_endio(void *private, int error)
void _complete(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *orig_mref_a, int error, bool pre_io)
{
struct mref_object *orig_mref;
struct generic_callback *orig_cb;
orig_mref = orig_mref_a->object;
CHECK_PTR(orig_mref, err);
if (orig_mref_a->is_completed ||
(pre_io &&
(brick->completion_semantics >= 2 ||
(brick->completion_semantics >= 1 && !orig_mref->ref_skip_sync)))) {
goto done;
}
orig_cb = orig_mref->ref_cb;
CHECK_PTR(orig_cb, err);
CHECK_PTR(orig_cb->cb_fn, err);
if (unlikely(error < 0)) {
orig_cb->cb_error = error;
}
if (likely(orig_cb->cb_error >= 0)) {
orig_mref->ref_flags &= ~MREF_WRITING;
orig_mref->ref_flags |= MREF_UPTODATE;
}
orig_mref_a->is_completed = true;
orig_cb->cb_fn(orig_cb);
done:
return;
err:
MARS_ERR("giving up...\n");
}
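_complete() centralizes the decision of when the upper layer may be signalled: called from phase1_preio() (pre_io == true), completion is suppressed for completion_semantics >= 2, and for completion_semantics >= 1 when the request still requires a sync; otherwise the callback fires as early as possible, and phase1_endio() later only completes requests not already marked is_completed. A tiny sketch of that predicate (an assumed reading of this hunk together with the completion_semantics comment in the header; hypothetical names):

#include <stdbool.h>

/* skip_sync corresponds to mref->ref_skip_sync */
static bool may_complete_early(int completion_semantics, bool skip_sync)
{
    if (completion_semantics >= 2)
        return false;               /* 2 = always late completion */
    if (completion_semantics >= 1)
        return skip_sync;           /* 1 = early only for non-sync writes */
    return true;                    /* 0 = early completion of all writes */
}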
static noinline
void phase1_preio(void *private)
{
struct trans_logger_mref_aspect *orig_mref_a;
struct trans_logger_output *output;
struct trans_logger_brick *brick;
orig_mref_a = private;
CHECK_PTR(orig_mref_a, err);
output = orig_mref_a->my_output;
CHECK_PTR(output, err);
brick = output->brick;
CHECK_PTR(brick, err);
// signal completion to the upper layer
// FIXME: immediate error signalling is impossible here, but some delayed signalling should be possible as a workaround. Think!
_complete(brick, orig_mref_a, 0, true);
return;
err:
MARS_ERR("giving up...\n");
}
static noinline
void phase1_endio(void *private, int error)
{
struct trans_logger_mref_aspect *orig_mref_a;
struct mref_object *orig_mref;
struct trans_logger_output *output;
struct trans_logger_brick *brick;
struct generic_callback *orig_cb;
orig_mref_a = private;
CHECK_PTR(orig_mref_a, err);
@ -1243,30 +1302,15 @@ void phase1_endio(void *private, int error)
CHECK_PTR(output, err);
brick = output->brick;
CHECK_PTR(brick, err);
orig_mref = orig_mref_a->object;
CHECK_PTR(orig_mref, err);
orig_cb = orig_mref->ref_cb;
CHECK_PTR(orig_cb, err);
qq_dec_flying(&brick->q_phase1);
// error handling
if (error < 0) {
orig_cb->cb_error = error;
}
// signal completion to the upper layer, as early as possible
if (likely(orig_cb->cb_error >= 0)) {
orig_mref->ref_flags &= ~MREF_WRITING;
orig_mref->ref_flags |= MREF_UPTODATE;
}
CHECK_PTR(orig_cb->cb_fn, err);
orig_cb->cb_fn(orig_cb);
// signal completion to the upper layer
_complete(brick, orig_mref_a, error, false);
// queue up for the next phase
qq_mref_insert(&brick->q_phase2, orig_mref_a);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
return;
err:
MARS_ERR("giving up...\n");
@ -1308,7 +1352,7 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
memcpy(data, orig_mref_a->shadow_data, orig_mref->ref_len);
ok = log_finalize(logst, orig_mref->ref_len, phase1_endio, orig_mref_a);
ok = log_finalize(logst, orig_mref->ref_len, phase1_preio, phase1_endio, orig_mref_a);
if (unlikely(!ok)) {
goto err;
}
@ -1455,7 +1499,7 @@ void phase2_endio(struct generic_callback *cb)
// queue up for the next phase
qq_wb_insert(&brick->q_phase3, wb);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
return;
err:
@ -1517,7 +1561,7 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
} else { // shortcut
#ifdef LATER
qq_wb_insert(&brick->q_phase4, wb);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
#else
return phase4_startio(wb);
#endif
@ -1543,7 +1587,7 @@ void _phase3_endio(struct writeback_info *wb)
// queue up for the next phase
qq_wb_insert(&brick->q_phase4, wb);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
return;
}
@ -1624,7 +1668,7 @@ bool _phase3_startio(struct trans_logger_mref_aspect *sub_mref_a)
memcpy(data, sub_mref->ref_data, sub_mref->ref_len);
ok = log_finalize(logst, sub_mref->ref_len, phase3_endio, sub_mref_a);
ok = log_finalize(logst, sub_mref->ref_len, NULL, phase3_endio, sub_mref_a);
if (unlikely(!ok)) {
goto err;
}
@ -1996,6 +2040,9 @@ void trans_logger_log(struct trans_logger_output *output)
(brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
do_flush = true;
}
#if 1
do_flush = true;
#endif
if (do_flush && (fw_logst->count > 0 || bw_logst->count > 0)) {
atomic_inc(&brick->total_flush_count);
log_flush(fw_logst);

View File

@ -96,6 +96,7 @@ struct trans_logger_mref_aspect {
bool is_hashed;
bool is_dirty;
bool is_collected;
bool is_completed;
struct timespec stamp;
loff_t log_pos;
struct generic_callback cb;
@ -113,6 +114,7 @@ struct trans_logger_brick {
int align_size; // alignment between requests
int chunk_size; // must be at least 8K (better 64k)
int flush_delay; // delayed firing of incomplete chunks
int completion_semantics; // 0 = early completion of all writes, 1 = early completion of non-sync, 2 = late completion
bool do_replay; // mode of operation
bool do_continuous_replay; // mode of operation
bool log_reads; // additionally log pre-images