import mars-35.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2010-08-11 17:02:08 +01:00
parent 00c2987c6c
commit 08643a0940
4 changed files with 482 additions and 52 deletions

View File

@ -5,7 +5,8 @@
#include <linux/list.h>
#include <asm/atomic.h>
#define MARS_BUF_HASH_MAX 512
//#define MARS_BUF_HASH_MAX 512
#define MARS_BUF_HASH_MAX 2048
struct buf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);

View File

@ -81,7 +81,7 @@ void make_test_instance(void)
void connect(struct generic_input *a, struct generic_output *b)
{
int status;
#if 1
#if 0
struct generic_brick *tmp = brick(&check_brick_type);
status = generic_connect(a, tmp->outputs[0]);

View File

@ -16,6 +16,208 @@
#include "mars_trans_logger.h"
////////////////////////////////////////////////////////////////////
#define CODE_UNKNOWN 0
#define CODE_WRITE_NEW 1
#define START_MAGIC 0xa8f7e908d9177957
#define END_MAGIC 0x74941fb74ab5726d
#define OVERHEAD \
( \
sizeof(START_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct log_header) + \
sizeof(END_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct timespec) + \
0 \
)
// TODO: make this bytesex-aware.
#define DATA_PUT(data,offset,val) \
do { \
*((typeof(val)*)(data+offset)) = val; \
offset += sizeof(val); \
} while (0)
#define DATA_GET(data,offset,val) \
do { \
val = *((typeof(val)*)(data+offset)); \
offset += sizeof(val); \
} while (0)
void *log_reserve(struct trans_logger_input *input, struct log_header *l)
{
struct mars_ref_object *mref;
void *data;
int total_len;
int status;
int offset;
if (unlikely(input->log_mref)) {
MARS_ERR("mref already existing\n");
goto err;
}
mref = trans_logger_alloc_mars_ref(&input->hidden_output, &input->ref_object_layout);
if (unlikely(!mref))
goto err;
mref->ref_pos = input->log_pos;
total_len = l->l_len + OVERHEAD;
mref->ref_len = total_len;
mref->ref_may_write = WRITE;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(status < 0)) {
goto err_free;
}
if (unlikely(status < total_len)) {
goto put;
}
input->log_mref = mref;
data = mref->ref_data;
offset = 0;
DATA_PUT(data, offset, START_MAGIC);
DATA_PUT(data, offset, (char)1); // version of format, currently there is no other one
input->validflag_offset = offset;
DATA_PUT(data, offset, (char)0); // valid_flag
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, total_len); // start of next header
DATA_PUT(data, offset, l->l_stamp.tv_sec);
DATA_PUT(data, offset, l->l_stamp.tv_nsec);
DATA_PUT(data, offset, l->l_pos);
input->reallen_offset = offset;
DATA_PUT(data, offset, l->l_len);
DATA_PUT(data, offset, l->l_code);
input->payload_offset = offset;
input->payload_len = l->l_len;
return data + offset;
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
err_free:
trans_logger_free_mars_ref(mref);
err:
return NULL;
}
bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struct generic_callback *cb), void *private)
{
struct mars_ref_object *mref = input->log_mref;
struct trans_logger_mars_ref_aspect *mref_a;
struct generic_callback *cb;
struct timespec now;
void *data;
int offset;
bool ok = false;
if (unlikely(!input->log_mref)) {
MARS_ERR("mref is missing\n");
goto err;
}
if (unlikely(len > input->payload_len)) {
MARS_ERR("trying to write more than reserved\n");
goto put;
}
mref_a = trans_logger_mars_ref_get_aspect(&input->hidden_output, mref);
if (unlikely(!mref_a)) {
MARS_ERR("mref_a is missing\n");
goto put;
}
data = mref->ref_data;
/* Correct the length in the header.
*/
offset = input->reallen_offset;
DATA_PUT(data, offset, len);
/* Write the trailer.
*/
offset = input->payload_offset + len;
DATA_PUT(data, offset, END_MAGIC);
DATA_PUT(data, offset, (char)1); // valid_flag copy
DATA_PUT(data, offset, (char)0); // spare
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, (int)0); // spare
now = CURRENT_TIME; // when the log entry was ready.
DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec);
input->log_pos += offset;
/* This must come last. In case of incomplete
* or even operlapping disk transfers, this indicates
* the completeness / integrity of the payload at
* the time of starting the transfer.
*/
offset = input->validflag_offset;
DATA_PUT(data, offset, (char)1);
cb = &mref_a->cb;
cb->cb_fn = endio;
cb->cb_private = private;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
ok = true;
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
err:
return ok;
}
////////////////////////////////////////////////////////////////////
static inline void q_init(struct logger_queue *q)
{
spin_lock_init(&q->q_lock);
INIT_LIST_HEAD(&q->q_anchor);
}
static inline void q_insert(struct logger_queue *q, struct trans_logger_mars_ref_aspect *mref_a)
{
unsigned long flags;
traced_lock(&q->q_lock, flags);
list_add_tail(&mref_a->q_head, &q->q_anchor);
traced_unlock(&q->q_lock, flags);
}
static inline struct trans_logger_mars_ref_aspect *q_fetch(struct logger_queue *q)
{
struct trans_logger_mars_ref_aspect *mref_a = NULL;
unsigned long flags;
traced_lock(&q->q_lock, flags);
if (likely(!list_empty(&q->q_anchor))) {
mref_a = container_of(q->q_anchor.next, struct trans_logger_mars_ref_aspect, q_head);
list_del_init(q->q_anchor.next);
}
traced_unlock(&q->q_lock, flags);
return mref_a;
}
///////////////////////// own helper functions ////////////////////////
@ -76,6 +278,10 @@ static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table,
}
}
if (res) {
atomic_inc(&res->hash_count);
}
traced_readunlock(&start->hash_lock, flags);
return res;
@ -83,19 +289,19 @@ static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table,
static inline void hash_insert(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem)
{
unsigned int base_index = ((unsigned int)elem->object->ref_pos) >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash];
unsigned int flags;
unsigned int base_index = ((unsigned int)elem->object->ref_pos) >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash];
unsigned int flags;
traced_writelock(&start->hash_lock, flags);
traced_writelock(&start->hash_lock, flags);
list_add(&elem->hash_head, &start->hash_anchor);
list_add(&elem->hash_head, &start->hash_anchor);
traced_writeunlock(&start->hash_lock, flags);
traced_writeunlock(&start->hash_lock, flags);
}
static inline void hash_delete(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem)
static inline void hash_put(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem)
{
unsigned int base_index = ((unsigned int)elem->object->ref_pos) >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
@ -104,7 +310,10 @@ static inline void hash_delete(struct hash_anchor *table, struct trans_logger_ma
traced_writelock(&start->hash_lock, flags);
list_del_init(&elem->hash_head);
CHECK_ATOMIC(&elem->hash_count, 1);
if (atomic_dec_and_test(&elem->hash_count)) {
list_del_init(&elem->hash_head);
}
traced_writeunlock(&start->hash_lock, flags);
}
@ -117,66 +326,239 @@ static int trans_logger_get_info(struct trans_logger_output *output, struct mars
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
static int trans_logger_ref_get(struct trans_logger_output *output, struct mars_ref_object *mref)
static int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct trans_logger_input *input = output->brick->inputs[0];
unsigned int base_offset = ((unsigned int)mref->ref_pos) & (REGION_SIZE - 1);
struct trans_logger_mars_ref_aspect *shadow_a;
/* Look if there is a newer version on the fly, shadowing
* the old one.
* When a shadow is found, use it as buffer for the mref.
*/
shadow_a = hash_find(output->hash_table, mref->ref_pos, mref->ref_len);
if (shadow_a) {
struct mars_ref_object *shadow = shadow_a->object;
int diff = shadow->ref_pos - mref->ref_pos;
int restlen;
if (diff > 0) {
/* Although the shadow is overlapping, the
* region before its start is _not_ shadowed.
* Thus we return this (smaller) unshadowed
* region.
*/
mref->ref_len = diff;
hash_put(output->hash_table, shadow_a);
goto call_through;
}
/* Attach mref to the existing shadow.
*/
restlen = shadow->ref_len + diff;
if (mref->ref_len > restlen)
mref->ref_len = restlen;
mref->ref_data = shadow->ref_data - diff;
mref->ref_flags = shadow->ref_flags;
mref_a->sub_a = shadow_a;
return mref->ref_len;
}
call_through:
return GENERIC_INPUT_CALL(input, mars_ref_get, mref);
}
static int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct trans_logger_input *input = output->brick->inputs[0];
struct mars_ref_object *shadow;
struct trans_logger_mars_ref_aspect *shadow_a;
// unconditionally create a new shadow buffer
shadow = trans_logger_alloc_mars_ref(&input->hidden_output, &input->ref_object_layout);
shadow_a = trans_logger_mars_ref_get_aspect(output, shadow);
if (unlikely(!shadow_a)) {
MARS_FAT("cannot get my own aspect\n");
trans_logger_free_mars_ref(shadow);
return -ENOSYS;
}
shadow->ref_data = kmalloc(mref->ref_len, GFP_MARS);
if (unlikely(!shadow->ref_data)) {
trans_logger_free_mars_ref(shadow);
return -ENOMEM;
}
shadow->ref_pos = mref->ref_pos;
shadow->ref_len = mref->ref_len;
shadow->ref_may_write = WRITE;
shadow->ref_flags = 0;
hash_insert(output->hash_table, shadow_a);
mref->ref_data = shadow->ref_data;
mref->ref_flags = 0;
mref_a->sub_a = shadow_a;
return mref->ref_len;
}
static int trans_logger_ref_get(struct trans_logger_output *output, struct mars_ref_object *mref)
{
struct trans_logger_mars_ref_aspect *mref_a;
unsigned int base_offset;
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
return -EINVAL;
}
base_offset = ((unsigned int)mref->ref_pos) & (REGION_SIZE - 1);
if (base_offset + mref->ref_len > REGION_SIZE)
mref->ref_len = REGION_SIZE - base_offset;
if (mref->ref_rw == READ) {
/* Look if the is a newer version on the fly which shadows
* the old one.
* When a shadow is found, use it as buffer for the mref.
*/
shadow_a = hash_find(output->hash_table, mref->ref_pos, mref->ref_len);
if (shadow_a) {
struct mars_ref_object *shadow = shadow_a->object;
int diff = shadow->ref_pos - mref->ref_pos;
int restlen;
if (diff > 0) {
/* Although the shadow is overlapping, the
* region before its start is _not_ shadowed.
* Thus we return this (smaller) unshadowed
* region.
*/
mref->ref_len = diff;
goto call;
}
restlen = shadow->ref_len + diff;
if (mref->ref_len > restlen)
mref->ref_len = restlen;
//...
return -ENOSYS;
}
call:
return GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (mref->ref_may_write == READ) {
return _read_ref_get(output, mref_a);
}
//...
return -ENOSYS;
return _write_ref_get(output, mref_a);
}
static void trans_logger_ref_put(struct trans_logger_output *output, struct mars_ref_object *mref)
{
struct trans_logger_mars_ref_aspect *mref_a;
struct trans_logger_input *input = output->brick->inputs[0];
struct trans_logger_mars_ref_aspect *shadow_a;
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get mref_a on %p\n", mref);
return;
}
shadow_a = mref_a->sub_a;
if (shadow_a) {
hash_put(output->hash_table, shadow_a);
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
trans_logger_free_mars_ref(mref);
return;
}
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
}
static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_ref_object *mref, int rw)
{
struct trans_logger_mars_ref_aspect *mref_a;
struct trans_logger_input *input = output->brick->inputs[0];
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get mref_a on %p\n", mref);
return;
}
// is this a shadow buffer?
if (mref_a->sub_a) {
mref_a->output = output;
mref_a->stamp = CURRENT_TIME;
q_insert(&output->q_phase1, mref_a);
return;
}
// only READ is allowed on non-shadow buffers
if (unlikely(rw != READ)) {
MARS_FAT("bad operation %d without shadow\n", rw);
}
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
}
////////////////////////////// worker thread //////////////////////////////
static void phase2_endio(struct generic_callback *cb)
{
//struct trans_logger_mars_ref_aspect *mref_a = cb->cb_private;
//struct mars_ref_object *mref = mref_a->object;
struct generic_callback *cb_prev = cb->cb_prev;
if (unlikely(!cb_prev)) {
MARS_FAT("callback chain is corrupted\n");
return;
}
cb_prev->cb_error = cb->cb_error;
cb_prev->cb_fn(cb_prev);
//trans_logger_ref_put(mref_a->output, mref);
}
static void phase2_startio(struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct generic_callback *cb = &mref_a->cb;
struct trans_logger_output *output = mref_a->output;
struct trans_logger_input *input = output->brick->inputs[0];
cb->cb_fn = phase2_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = mref->ref_cb;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, READ);
}
static void phase1_endio(struct generic_callback *cb)
{
struct trans_logger_mars_ref_aspect *mref_a = cb->cb_private;
//struct mars_ref_object *mref = mref_a->object;
struct trans_logger_output *output = mref_a->output;
struct generic_callback *cb_prev = &mref_a->cb;
cb_prev->cb_error = cb->cb_error;
cb_prev->cb_fn(cb_prev);
q_insert(&output->q_phase2, mref_a);
}
static bool phase1_startio(struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct trans_logger_output *output = mref_a->output;
struct trans_logger_input *input = output->brick->inputs[1];
struct log_header l = {
.l_stamp = mref_a->stamp,
.l_pos = mref->ref_pos,
.l_len = mref->ref_len,
.l_code = CODE_WRITE_NEW,
};
void *data;
bool ok;
data = log_reserve(input, &l);
if (unlikely(!data)) {
return false;
}
memcpy(data, mref->ref_data, mref->ref_len);
ok = log_finalize(input, mref->ref_len, phase1_endio, mref_a);
if (unlikely(!ok)) {
return false;
}
return true;
}
//////////////// object / aspect constructors / destructors ///////////////
static int trans_logger_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct trans_logger_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->hash_head);
(void)ini;
INIT_LIST_HEAD(&ini->q_head);
atomic_set(&ini->hash_count, 1);
return 0;
}
@ -184,7 +566,7 @@ static void trans_logger_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, vo
{
struct trans_logger_mars_ref_aspect *ini = (void*)_ini;
CHECK_HEAD_EMPTY(&ini->hash_head);
(void)ini;
CHECK_HEAD_EMPTY(&ini->q_head);
}
MARS_MAKE_STATICS(trans_logger);
@ -204,6 +586,16 @@ static int trans_logger_output_construct(struct trans_logger_output *output)
rwlock_init(&start->hash_lock);
INIT_LIST_HEAD(&start->hash_anchor);
}
q_init(&output->q_phase1);
q_init(&output->q_phase2);
q_init(&output->q_phase3);
return 0;
}
static int trans_logger_input_construct(struct trans_logger_input *input)
{
struct trans_logger_output *hidden = &input->hidden_output;
_trans_logger_output_init(input->brick, hidden, "internal");
return 0;
}
@ -223,6 +615,7 @@ static struct trans_logger_output_ops trans_logger_output_ops = {
const struct trans_logger_input_type trans_logger_input_type = {
.type_name = "trans_logger_input",
.input_size = sizeof(struct trans_logger_input),
.input_construct = &trans_logger_input_construct,
};
static const struct trans_logger_input_type *trans_logger_input_types[] = {

View File

@ -6,27 +6,63 @@
#define REGION_SIZE (1 << REGION_SIZE_BITS)
#define TRANS_HASH_MAX 32
struct trans_logger_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head hash_head;
#include <linux/time.h>
struct log_header {
struct timespec l_stamp;
loff_t l_pos;
int l_len;
int l_code;
};
struct trans_logger_brick {
MARS_BRICK(trans_logger);
////////////////////////////////////////////////////////////////////
struct logger_queue {
spinlock_t q_lock;
struct list_head q_anchor;
};
struct trans_logger_input {
MARS_INPUT(trans_logger);
};
////////////////////////////////////////////////////////////////////
struct hash_anchor {
rwlock_t hash_lock;
struct list_head hash_anchor;
};
struct trans_logger_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head hash_head;
struct list_head q_head;
atomic_t hash_count;
struct trans_logger_mars_ref_aspect *sub_a;
struct trans_logger_output *output;
struct timespec stamp;
struct generic_callback cb;
};
struct trans_logger_brick {
MARS_BRICK(trans_logger);
};
struct trans_logger_output {
MARS_OUTPUT(trans_logger);
struct hash_anchor hash_table[TRANS_HASH_MAX];
// queues
struct logger_queue q_phase1;
struct logger_queue q_phase2;
struct logger_queue q_phase3;
};
struct trans_logger_input {
MARS_INPUT(trans_logger);
loff_t log_pos;
struct mars_ref_object *log_mref;
int validflag_offset;
int reallen_offset;
int payload_offset;
int payload_len;
struct trans_logger_output hidden_output;
struct generic_object_layout ref_object_layout;
};
MARS_TYPES(trans_logger);