import mars-92.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-04-10 17:59:06 +01:00
parent bf7489cb64
commit 67b4e52cad
2 changed files with 274 additions and 0 deletions

View File

@ -19,6 +19,8 @@
//#define DO_IGNORE // FIXME or DELETE
#define DO_EXTEND
#define NEW_CODE
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
@ -441,6 +443,7 @@ void hash_extend(struct trans_logger_output *output, loff_t *_pos, int *_len, st
// collect upon the first time
if (collect_list && test_a->collect_generation != my_generation) {
test_a->collect_generation = my_generation;
test_a->is_collected = true;
atomic_inc(&test->ref_count); // must be paired with _trans_logger_ref_put()
list_add_tail(&test_a->collect_head, collect_list);
}
@ -882,6 +885,174 @@ err:
MARS_FAT("cannot handle IO\n");
}
////////////////////////////// writeback info //////////////////////////////
static noinline
void free_writeback(struct writeback_info *wb)
{
//...
kfree(wb);
}
static noinline
void wb_endio(struct generic_callback *cb)
{
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct trans_logger_output *output;
struct writeback_info *wb;
int rw;
atomic_t *dec;
void (*endio)(struct generic_callback *cb);
sub_mref_a = cb->cb_private;
CHECK_PTR(sub_mref_a, err);
sub_mref = sub_mref_a->object;
CHECK_PTR(sub_mref, err);
output = sub_mref_a->output;
CHECK_PTR(output, err);
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
atomic_dec(&output->wb_balance_count);
rw = sub_mref->ref_rw;
dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
if (atomic_dec_and_test(dec)) {
return;
}
endio = rw ? wb->write_endio : wb->read_endio;
if (endio) {
endio(cb);
}
return;
err:
MARS_FAT("hanging up....\n");
}
static noinline
struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t pos, int len)
{
struct trans_logger_brick *brick = output->brick;
struct trans_logger_input *sub_input = brick->inputs[0];
struct writeback_info *wb = kzalloc(sizeof(struct writeback_info), GFP_MARS);
struct trans_logger_mref_aspect *base_mref_a = NULL;
if (!wb) {
goto err;
}
INIT_LIST_HEAD(&wb->w_collect_list);
INIT_LIST_HEAD(&wb->w_sub_read_list);
INIT_LIST_HEAD(&wb->w_sub_write_list);
wb->w_output = output;
wb->w_pos = pos;
wb->w_len = len;
hash_extend(output, &wb->w_pos, &wb->w_len, &wb->w_collect_list);
pos = wb->w_pos;
len = wb->w_len;
while (len > 0) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct mref_object *base_mref;
void *data;
int this_len = len;
int diff;
int status;
base_mref_a = hash_find(output, pos, &this_len, true, NULL);
if (unlikely(!base_mref_a)) {
MARS_FAT("could not find data\n");
goto err;
}
base_mref = base_mref_a->object;
diff = pos - base_mref->ref_pos;
if (unlikely(diff < 0)) {
MARS_FAT("bad diff %d\n", diff);
goto err;
}
data = base_mref_a->shadow_data + diff;
sub_mref = trans_logger_alloc_mref((void*)output, &brick->logst.ref_object_layout);
if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n");
goto err;
}
sub_mref->ref_pos = pos;
sub_mref->ref_len = this_len;
sub_mref->ref_may_write = WRITE;
sub_mref->ref_rw = WRITE;
sub_mref->ref_data = data;
sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref);
CHECK_PTR(sub_mref_a, err);
sub_mref_a->output = output;
sub_mref_a->wb = wb;
sub_mref_a->base_mref_a = base_mref_a;
base_mref_a = NULL;
status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref);
if (unlikely(status < 0)) {
MARS_FAT("cannot get sub_ref, status = %d\n", status);
goto err;
}
list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list);
atomic_inc(&wb->w_sub_write_count);
atomic_inc(&output->wb_balance_count);
this_len = sub_mref->ref_len;
pos += this_len;
len -= this_len;
}
return wb;
err:
if (base_mref_a) {
_trans_logger_ref_put(output, base_mref_a->object);
}
if (wb) {
free_writeback(wb);
}
return NULL;
}
static noinline
void fire_writeback(struct writeback_info *wb, struct list_head *start)
{
struct trans_logger_output *output = wb->w_output;
struct trans_logger_brick *brick = output->brick;
struct trans_logger_input *sub_input = brick->inputs[0];
struct list_head *tmp;
while ((tmp = start->next) != start) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct generic_callback *cb;
list_del_init(tmp);
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
cb = &sub_mref_a->cb;
cb->cb_fn = wb_endio;
cb->cb_private = sub_mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
sub_mref->ref_cb = cb;
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
}
////////////////////////////// worker thread //////////////////////////////
/*********************************************************************
@ -1087,6 +1258,85 @@ err:
* old version from disk somewhen later, e.g. when IO contention is low.
*/
#ifdef NEW_CODE
static noinline
void new_endio(struct generic_callback *cb)
{
struct trans_logger_mref_aspect *sub_mref_a;
struct trans_logger_output *output;
struct writeback_info *wb;
struct list_head *tmp;
CHECK_PTR(cb, err);
sub_mref_a = cb->cb_private;
CHECK_PTR(sub_mref_a, err);
output = sub_mref_a->output;
CHECK_PTR(output, err);
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
if (unlikely(cb->cb_error < 0)) {
MARS_FAT("IO error %d\n", cb->cb_error);
goto err;
}
while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) {
struct trans_logger_mref_aspect *orig_mref_a;
struct mref_object *orig_mref;
list_del_init(tmp);
orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
orig_mref = orig_mref_a->object;
GENERIC_INPUT_CALL(output->brick->inputs[0], mref_put, orig_mref);
}
return;
err:
MARS_FAT("hanging up....\n");
}
static noinline
bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
{
struct mref_object *orig_mref;
struct trans_logger_output *output;
struct writeback_info *wb;
CHECK_PTR(orig_mref_a, err);
orig_mref = orig_mref_a->object;
CHECK_PTR(orig_mref, err);
output = orig_mref_a->output;
CHECK_PTR(output, err);
if (orig_mref_a->is_collected) {
MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
goto done;
}
wb = make_writeback(output, orig_mref->ref_pos, orig_mref->ref_len);
if (!wb) {
goto err;
}
wb->write_endio = new_endio;
fire_writeback(wb, &wb->w_sub_write_list);
done:
#ifdef CLEAN_ALL
_trans_logger_ref_put(output, orig_mref);
#endif
return true;
err:
return false;
}
#else // NEW_CODE
static noinline
void _phase2_endio(struct trans_logger_mref_aspect *orig_mref_a)
{
@ -1263,10 +1513,14 @@ err:
return false;
}
#endif // NEW_CODE
/*********************************************************************
* Phase 3: log the old disk version.
*/
#ifndef NEW_CODE
static noinline
void _phase3_endio(struct trans_logger_mref_aspect *orig_mref_a)
{
@ -1388,10 +1642,14 @@ err:
return false;
}
#endif // NEW_CODE
/*********************************************************************
* Phase 4: overwrite old disk version with new version.
*/
#ifndef NEW_CODE
static noinline
void _phase4_endio(struct trans_logger_mref_aspect *orig_mref_a)
{
@ -1650,6 +1908,7 @@ err:
return false;
}
#endif // NEW_CODE
/*********************************************************************
* The logger thread.
@ -1732,8 +1991,10 @@ void trans_logger_log(struct trans_logger_output *output)
atomic_read(&output->q_phase1.q_queued) > 0 ||
#ifdef USE_HIGHER_PHASES
q_is_ready(&output->q_phase2) ||
#ifndef NEW_CODE
q_is_ready(&output->q_phase3) ||
q_is_ready(&output->q_phase4) ||
#endif
#endif
(kthread_should_stop() && !_congested(output)),
wait_timeout);
@ -1788,17 +2049,21 @@ void trans_logger_log(struct trans_logger_output *output)
log_jiffies = 0;
}
#ifdef USE_HIGHER_PHASES
#ifndef NEW_CODE
if (q_is_ready(&output->q_phase4)) {
(void)run_queue(output, &output->q_phase4, phase4_startio, output->q_phase4.q_batchlen);
}
#endif
if (q_is_ready(&output->q_phase2)) {
(void)run_queue(output, &output->q_phase2, phase2_startio, output->q_phase2.q_batchlen);
}
#ifndef NEW_CODE
if (q_is_ready(&output->q_phase3)) {
status = run_queue(output, &output->q_phase3, phase3_startio, output->q_phase3.q_batchlen);
}
#endif
#endif
if (output->did_pushback) {
#if 0

View File

@ -54,11 +54,16 @@ struct hash_anchor {
};
struct writeback_info {
struct trans_logger_output *w_output;
loff_t w_pos;
int w_len;
struct list_head w_collect_list; // list of collected orig requests
struct list_head w_sub_read_list; // for saving the old data before overwrite
struct list_head w_sub_write_list; // for overwriting
atomic_t w_sub_read_count;
atomic_t w_sub_write_count;
void (*read_endio)(struct generic_callback *cb);
void (*write_endio)(struct generic_callback *cb);
};
struct trans_logger_mref_aspect {
@ -76,12 +81,15 @@ struct trans_logger_mref_aspect {
bool do_buffered;
bool is_hashed;
bool is_dirty;
bool is_collected;
bool ignore_this;
struct timespec stamp;
loff_t log_pos;
loff_t fetch_margin;
struct generic_callback cb;
struct trans_logger_mref_aspect *orig_mref_a;
struct writeback_info *wb;
struct trans_logger_mref_aspect *base_mref_a;
struct list_head sub_list;
struct list_head sub_head;
int total_sub_count;
@ -141,6 +149,7 @@ struct trans_logger_output {
atomic_t outer_balance_count;
atomic_t inner_balance_count;
atomic_t sub_balance_count;
atomic_t wb_balance_count;
atomic_t total_read_count;
atomic_t total_write_count;
atomic_t total_writeback_count;