mirror of https://github.com/schoebel/mars
import mars-100.tgz
This commit is contained in:
parent
a6d10aaa72
commit
3245a9ea55
|
@ -20,6 +20,8 @@
|
|||
#include <linux/namei.h>
|
||||
#include <linux/kthread.h>
|
||||
|
||||
#define SKIP_BIO false
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
// meta descriptions
|
||||
|
@ -1222,7 +1224,7 @@ struct mars_brick *make_brick_all(
|
|||
if (!brick && new_brick_type == _bio_brick_type && _aio_brick_type) {
|
||||
struct kstat test = {};
|
||||
int status = mars_stat(new_path, &test, false);
|
||||
if (true || status < 0 || !S_ISBLK(test.mode)) {
|
||||
if (SKIP_BIO || status < 0 || !S_ISBLK(test.mode)) {
|
||||
new_brick_type = _aio_brick_type;
|
||||
MARS_DBG("substitute bio by aio\n");
|
||||
}
|
||||
|
|
|
@ -63,6 +63,8 @@ struct light_class {
|
|||
#define CONF_TRANS_BATCHLEN 32
|
||||
#define CONF_TRANS_FLYING 4
|
||||
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
|
||||
#define CONF_TRANS_LOG_READS false
|
||||
//#define CONF_TRANS_LOG_READS true
|
||||
|
||||
//#define CONF_ALL_BATCHLEN 2
|
||||
#define CONF_ALL_BATCHLEN 1
|
||||
|
@ -125,7 +127,7 @@ void _set_trans_params(struct mars_brick *_brick, void *private)
|
|||
|
||||
trans_brick->outputs[0]->q_phase2.q_ordering = true;
|
||||
trans_brick->outputs[0]->q_phase4.q_ordering = true;
|
||||
trans_brick->log_reads = false;
|
||||
trans_brick->log_reads = CONF_TRANS_LOG_READS;
|
||||
#ifdef TRANS_FAKE
|
||||
trans_brick->debug_shortcut = true;
|
||||
#endif
|
||||
|
|
|
@ -10,15 +10,13 @@
|
|||
// variants
|
||||
#define KEEP_UNIQUE
|
||||
//#define USE_KMALLOC
|
||||
#define HIGHER_ORDER
|
||||
//#define WB_COPY
|
||||
|
||||
// changing this is dangerous for data integrity! use only for testing!
|
||||
#define USE_MEMCPY
|
||||
#define USE_HIGHER_PHASES
|
||||
#define APPLY_DATA
|
||||
|
||||
#define NEW_CODE
|
||||
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/module.h>
|
||||
#include <linux/string.h>
|
||||
|
@ -148,7 +146,7 @@ always_done:
|
|||
}
|
||||
|
||||
static inline
|
||||
void qq_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
|
||||
void qq_mref_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
|
||||
{
|
||||
struct mref_object *mref = mref_a->object;
|
||||
CHECK_ATOMIC(&mref->ref_count, 1);
|
||||
|
@ -161,7 +159,13 @@ void qq_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
|
|||
}
|
||||
|
||||
static inline
|
||||
void qq_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
|
||||
void qq_wb_insert(struct logger_queue *q, struct writeback_info *wb)
|
||||
{
|
||||
q_logger_insert(q, &wb->w_lh);
|
||||
}
|
||||
|
||||
static inline
|
||||
void qq_mref_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
|
||||
{
|
||||
CHECK_ATOMIC(&mref_a->object->ref_count, 1);
|
||||
|
||||
|
@ -171,7 +175,13 @@ void qq_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a
|
|||
}
|
||||
|
||||
static inline
|
||||
struct trans_logger_mref_aspect *qq_fetch(struct logger_queue *q)
|
||||
void qq_wb_pushback(struct logger_queue *q, struct writeback_info *wb)
|
||||
{
|
||||
q_logger_pushback(q, &wb->w_lh);
|
||||
}
|
||||
|
||||
static inline
|
||||
struct trans_logger_mref_aspect *qq_mref_fetch(struct logger_queue *q)
|
||||
{
|
||||
struct logger_head *test;
|
||||
struct trans_logger_mref_aspect *mref_a = NULL;
|
||||
|
@ -186,6 +196,20 @@ struct trans_logger_mref_aspect *qq_fetch(struct logger_queue *q)
|
|||
return mref_a;
|
||||
}
|
||||
|
||||
static inline
|
||||
struct writeback_info *qq_wb_fetch(struct logger_queue *q)
|
||||
{
|
||||
struct logger_head *test;
|
||||
struct writeback_info *res = NULL;
|
||||
|
||||
test = q_logger_fetch(q);
|
||||
|
||||
if (test) {
|
||||
res = container_of(test, struct writeback_info, w_lh);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
///////////////////////// own helper functions ////////////////////////
|
||||
|
||||
|
||||
|
@ -514,6 +538,10 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
|
|||
struct mref_object *mref = mref_a->object;
|
||||
struct page *page;
|
||||
void *data;
|
||||
#ifndef USE_KMALLOC
|
||||
int offset;
|
||||
int order = 0;
|
||||
#endif
|
||||
|
||||
#ifdef KEEP_UNIQUE
|
||||
struct trans_logger_mref_aspect *mshadow_a;
|
||||
|
@ -528,15 +556,22 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
|
|||
data = kmalloc(mref->ref_len, GFP_MARS);
|
||||
mref->ref_page = NULL;
|
||||
#else
|
||||
//TODO: allow higher-order pages
|
||||
if (mref->ref_len > PAGE_SIZE)
|
||||
mref->ref_len = PAGE_SIZE;
|
||||
page = alloc_page(GFP_MARS);
|
||||
offset = mref->ref_pos & (PAGE_SIZE-1);
|
||||
#ifdef HIGHER_ORDER
|
||||
order = 4;
|
||||
while (order > 0 && mref->ref_len + offset < (PAGE_SIZE << (order-1))) {
|
||||
order--;
|
||||
}
|
||||
#endif
|
||||
if (mref->ref_len > (PAGE_SIZE << order) - offset) {
|
||||
mref->ref_len = (PAGE_SIZE << order) - offset;
|
||||
}
|
||||
page = alloc_pages(GFP_MARS, order);
|
||||
if (unlikely(!page)) {
|
||||
return -ENOMEM;
|
||||
}
|
||||
mref->ref_page = page;
|
||||
data = page_address(page);
|
||||
data = page_address(page) + offset;
|
||||
#endif
|
||||
if (unlikely(!data)) {
|
||||
return -ENOMEM;
|
||||
|
@ -780,7 +815,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
|
|||
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
|
||||
atomic_inc(&output->inner_balance_count);
|
||||
|
||||
qq_insert(&output->q_phase1, mref_a);
|
||||
qq_mref_insert(&output->q_phase1, mref_a);
|
||||
wake_up_interruptible(&output->event);
|
||||
return;
|
||||
}
|
||||
|
@ -946,17 +981,19 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
|
|||
if (!wb) {
|
||||
goto err;
|
||||
}
|
||||
INIT_LIST_HEAD(&wb->w_collect_list);
|
||||
INIT_LIST_HEAD(&wb->w_sub_read_list);
|
||||
INIT_LIST_HEAD(&wb->w_sub_write_list);
|
||||
wb->w_output = output;
|
||||
|
||||
wb->w_pos = pos;
|
||||
wb->w_len = len;
|
||||
if (unlikely(len < 0)) {
|
||||
MARS_ERR("len = %d\n", len);
|
||||
}
|
||||
|
||||
wb->w_output = output;
|
||||
wb->w_pos = pos;
|
||||
wb->w_len = len;
|
||||
wb->w_lh.lh_pos = &wb->w_pos;
|
||||
INIT_LIST_HEAD(&wb->w_lh.lh_head);
|
||||
INIT_LIST_HEAD(&wb->w_collect_list);
|
||||
INIT_LIST_HEAD(&wb->w_sub_read_list);
|
||||
INIT_LIST_HEAD(&wb->w_sub_write_list);
|
||||
|
||||
/* Atomically fetch transitive closure on all requests
|
||||
* overlapping with the current search region.
|
||||
*/
|
||||
|
@ -1091,32 +1128,46 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static inline
|
||||
void _fire_one(struct trans_logger_input *sub_input, struct list_head *tmp, bool do_put)
|
||||
{
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct mref_object *sub_mref;
|
||||
struct generic_callback *cb;
|
||||
|
||||
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
||||
sub_mref = sub_mref_a->object;
|
||||
|
||||
cb = &sub_mref_a->cb;
|
||||
cb->cb_fn = wb_endio;
|
||||
cb->cb_private = sub_mref_a;
|
||||
cb->cb_error = 0;
|
||||
cb->cb_prev = NULL;
|
||||
sub_mref->ref_cb = cb;
|
||||
|
||||
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
|
||||
if (do_put) {
|
||||
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
|
||||
}
|
||||
}
|
||||
|
||||
static noinline
|
||||
void fire_writeback(struct writeback_info *wb, struct list_head *start)
|
||||
void fire_writeback(struct writeback_info *wb, struct list_head *start, bool do_remove)
|
||||
{
|
||||
struct trans_logger_output *output = wb->w_output;
|
||||
struct trans_logger_brick *brick = output->brick;
|
||||
struct trans_logger_input *sub_input = brick->inputs[0];
|
||||
struct list_head *tmp;
|
||||
|
||||
while ((tmp = start->next) != start) {
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct mref_object *sub_mref;
|
||||
struct generic_callback *cb;
|
||||
|
||||
list_del_init(tmp);
|
||||
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
||||
sub_mref = sub_mref_a->object;
|
||||
|
||||
cb = &sub_mref_a->cb;
|
||||
cb->cb_fn = wb_endio;
|
||||
cb->cb_private = sub_mref_a;
|
||||
cb->cb_error = 0;
|
||||
cb->cb_prev = NULL;
|
||||
sub_mref->ref_cb = cb;
|
||||
|
||||
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
|
||||
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
|
||||
if (do_remove) {
|
||||
while ((tmp = start->next) != start) {
|
||||
list_del_init(tmp);
|
||||
_fire_one(sub_input, tmp, true);
|
||||
}
|
||||
} else {
|
||||
for (tmp = start->next; tmp != start; tmp = tmp->next) {
|
||||
_fire_one(sub_input, tmp, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1162,7 +1213,7 @@ void phase1_endio(void *private, int error)
|
|||
orig_cb->cb_fn(orig_cb);
|
||||
|
||||
// queue up for the next phase
|
||||
qq_insert(&output->q_phase2, orig_mref_a);
|
||||
qq_mref_insert(&output->q_phase2, orig_mref_a);
|
||||
wake_up_interruptible(&output->event);
|
||||
return;
|
||||
err:
|
||||
|
@ -1311,8 +1362,6 @@ err:
|
|||
* old version from disk somewhen later, e.g. when IO contention is low.
|
||||
*/
|
||||
|
||||
atomic_t provisionary_count = ATOMIC_INIT(0);
|
||||
|
||||
static noinline
|
||||
void phase2_endio(struct generic_callback *cb)
|
||||
{
|
||||
|
@ -1333,11 +1382,10 @@ void phase2_endio(struct generic_callback *cb)
|
|||
goto err;
|
||||
}
|
||||
|
||||
atomic_dec(&provisionary_count);
|
||||
|
||||
atomic_dec(&output->q_phase2.q_flying);
|
||||
|
||||
// queue up for the next phase
|
||||
//qq_insert(&output->q_phase3, orig_mref_a);
|
||||
qq_wb_insert(&output->q_phase3, wb);
|
||||
wake_up_interruptible(&output->event);
|
||||
return;
|
||||
|
||||
|
@ -1347,6 +1395,8 @@ err:
|
|||
|
||||
static noinline
|
||||
void phase4_endio(struct generic_callback *cb);
|
||||
static noinline
|
||||
bool phase4_startio(struct writeback_info *wb);
|
||||
|
||||
static noinline
|
||||
bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||
|
@ -1374,8 +1424,6 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
|||
goto err;
|
||||
}
|
||||
|
||||
atomic_inc(&provisionary_count);
|
||||
|
||||
if (unlikely(list_empty(&wb->w_collect_list))) {
|
||||
MARS_ERR("collection list is empty, orig pos = %lld len = %d (collected=%d), extended pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len, (int)orig_mref_a->is_collected, wb->w_pos, wb->w_len);
|
||||
free_writeback(wb);
|
||||
|
@ -1389,7 +1437,14 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
|||
|
||||
wb->read_endio = phase2_endio;
|
||||
wb->write_endio = phase4_endio;
|
||||
fire_writeback(wb, &wb->w_sub_write_list);
|
||||
atomic_set(&wb->w_sub_log_count, atomic_read(&wb->w_sub_read_count));
|
||||
|
||||
if (output->brick->log_reads) {
|
||||
atomic_inc(&output->q_phase2.q_flying);
|
||||
fire_writeback(wb, &wb->w_sub_read_list, false);
|
||||
} else { // shortcut
|
||||
return phase4_startio(wb);
|
||||
}
|
||||
|
||||
done:
|
||||
return true;
|
||||
|
@ -1403,23 +1458,15 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
|||
* Phase 3: log the old disk version.
|
||||
*/
|
||||
|
||||
#ifndef NEW_CODE
|
||||
|
||||
static noinline
|
||||
void _phase3_endio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||
static inline
|
||||
void _phase3_endio(struct writeback_info *wb)
|
||||
{
|
||||
struct trans_logger_output *output;
|
||||
|
||||
output = orig_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
struct trans_logger_output *output = wb->w_output;
|
||||
|
||||
// queue up for the next phase
|
||||
qq_insert(&output->q_phase4, orig_mref_a);
|
||||
qq_wb_insert(&output->q_phase4, wb);
|
||||
wake_up_interruptible(&output->event);
|
||||
return;
|
||||
|
||||
err:
|
||||
MARS_FAT("hanging up....\n");
|
||||
}
|
||||
|
||||
static noinline
|
||||
|
@ -1427,14 +1474,14 @@ void phase3_endio(void *private, int error)
|
|||
{
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct trans_logger_output *output;
|
||||
struct trans_logger_mref_aspect *orig_mref_a;
|
||||
struct writeback_info *wb;
|
||||
|
||||
sub_mref_a = private;
|
||||
CHECK_PTR(sub_mref_a, err);
|
||||
output = sub_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
orig_mref_a = sub_mref_a->orig_mref_a;
|
||||
CHECK_PTR(orig_mref_a, err);
|
||||
wb = sub_mref_a->wb;
|
||||
CHECK_PTR(wb, err);
|
||||
|
||||
atomic_dec(&output->q_phase3.q_flying);
|
||||
|
||||
|
@ -1443,8 +1490,9 @@ void phase3_endio(void *private, int error)
|
|||
goto err; // FIXME: this leads to hanging requests. do better.
|
||||
}
|
||||
|
||||
if (atomic_dec_and_test(&orig_mref_a->current_sub_count)) {
|
||||
_phase3_endio(orig_mref_a);
|
||||
CHECK_ATOMIC(&wb->w_sub_log_count, 1);
|
||||
if (atomic_dec_and_test(&wb->w_sub_log_count)) {
|
||||
_phase3_endio(wb);
|
||||
}
|
||||
return;
|
||||
|
||||
|
@ -1496,38 +1544,52 @@ err:
|
|||
}
|
||||
|
||||
static noinline
|
||||
bool phase3_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||
bool phase3_startio(struct writeback_info *wb)
|
||||
{
|
||||
struct trans_logger_output *output;
|
||||
struct trans_logger_brick *brick;
|
||||
struct trans_logger_input *sub_input;
|
||||
bool ok = true;
|
||||
|
||||
CHECK_PTR(orig_mref_a, err);
|
||||
output = orig_mref_a->output;
|
||||
CHECK_PTR(wb, err);
|
||||
output = wb->w_output;
|
||||
CHECK_PTR(output, err);
|
||||
brick = output->brick;
|
||||
CHECK_PTR(brick, err);
|
||||
sub_input = brick->inputs[0];
|
||||
CHECK_PTR(sub_input, err);
|
||||
|
||||
if (output->brick->log_reads && orig_mref_a->total_sub_count > 0) {
|
||||
if (output->brick->log_reads && atomic_read(&wb->w_sub_log_count) > 0) {
|
||||
struct list_head *start;
|
||||
struct list_head *tmp;
|
||||
|
||||
atomic_set(&orig_mref_a->current_sub_count, orig_mref_a->total_sub_count);
|
||||
for (tmp = orig_mref_a->sub_list.next; tmp != &orig_mref_a->sub_list; tmp = tmp->next) {
|
||||
start = &wb->w_sub_read_list;
|
||||
while ((tmp = start->next) != start) {
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct mref_object *sub_mref;
|
||||
|
||||
list_del_init(tmp);
|
||||
|
||||
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
||||
sub_mref = sub_mref_a->object;
|
||||
|
||||
atomic_inc(&output->q_phase3.q_flying);
|
||||
mars_trace(sub_mref, "sub_log");
|
||||
_phase3_startio(sub_mref_a);
|
||||
|
||||
if (!_phase3_startio(sub_mref_a)) {
|
||||
ok = false;
|
||||
}
|
||||
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
|
||||
}
|
||||
wake_up_interruptible(&output->event);
|
||||
} else {
|
||||
_phase3_endio(orig_mref_a);
|
||||
_phase3_endio(wb);
|
||||
}
|
||||
return true;
|
||||
return ok;
|
||||
err:
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif // NEW_CODE
|
||||
|
||||
/*********************************************************************
|
||||
* Phase 4: overwrite old disk version with new version.
|
||||
*/
|
||||
|
@ -1554,93 +1616,12 @@ void phase4_endio(struct generic_callback *cb)
|
|||
|
||||
hash_put_all(wb->w_output, &wb->w_collect_list);
|
||||
|
||||
free_writeback(wb);
|
||||
|
||||
atomic_dec(&provisionary_count);
|
||||
wake_up_interruptible(&output->event);
|
||||
|
||||
return;
|
||||
|
||||
err:
|
||||
MARS_FAT("hanging up....\n");
|
||||
}
|
||||
|
||||
|
||||
#ifndef NEW_CODE
|
||||
|
||||
static noinline
|
||||
void _phase4_endio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||
{
|
||||
struct mref_object *orig_mref;
|
||||
struct trans_logger_output *output;
|
||||
struct trans_logger_brick *brick;
|
||||
struct list_head *tmp;
|
||||
unsigned long flags;
|
||||
|
||||
orig_mref = orig_mref_a->object;
|
||||
CHECK_PTR(orig_mref, err);
|
||||
output = orig_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
brick = output->brick;
|
||||
CHECK_PTR(brick, err);
|
||||
|
||||
// save final completion status
|
||||
traced_lock(&brick->pos_lock, flags);
|
||||
tmp = &orig_mref_a->pos_head;
|
||||
if (tmp == brick->pos_list.next) {
|
||||
loff_t finished = orig_mref_a->log_pos;
|
||||
if (finished <= brick->replay_pos) {
|
||||
MARS_ERR("backskip in log replay: %lld -> %lld\n", brick->replay_pos, orig_mref_a->log_pos);
|
||||
}
|
||||
brick->replay_pos = finished;
|
||||
}
|
||||
list_del_init(tmp);
|
||||
traced_unlock(&brick->pos_lock, flags);
|
||||
|
||||
//MARS_DBG("put ORIGREF.\n");
|
||||
CHECK_ATOMIC(&orig_mref->ref_count, 1);
|
||||
if (orig_mref_a->is_dirty) {
|
||||
MARS_ERR("dirty pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
|
||||
//...
|
||||
} else {
|
||||
__trans_logger_ref_put(orig_mref_a->output, orig_mref_a);
|
||||
}
|
||||
wake_up_interruptible(&output->event);
|
||||
return;
|
||||
|
||||
err:
|
||||
MARS_FAT("hanging up....\n");
|
||||
}
|
||||
|
||||
static noinline
|
||||
void phase4_endio(struct generic_callback *cb)
|
||||
{
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct trans_logger_output *output;
|
||||
struct trans_logger_mref_aspect *orig_mref_a;
|
||||
|
||||
CHECK_PTR(cb, err);
|
||||
sub_mref_a = cb->cb_private;
|
||||
CHECK_PTR(sub_mref_a, err);
|
||||
output = sub_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
orig_mref_a = sub_mref_a->orig_mref_a;
|
||||
CHECK_PTR(orig_mref_a, err);
|
||||
|
||||
mars_trace(sub_mref, "sub_endio");
|
||||
mars_log_trace(sub_mref);
|
||||
|
||||
atomic_dec(&output->q_phase4.q_flying);
|
||||
|
||||
if (unlikely(cb->cb_error < 0)) {
|
||||
MARS_FAT("IO error %d\n", cb->cb_error);
|
||||
goto err;
|
||||
}
|
||||
free_writeback(wb);
|
||||
|
||||
wake_up_interruptible(&output->event);
|
||||
|
||||
if (atomic_dec_and_test(&orig_mref_a->current_sub_count)) {
|
||||
_phase4_endio(orig_mref_a);
|
||||
}
|
||||
return;
|
||||
|
||||
err:
|
||||
|
@ -1649,158 +1630,20 @@ err:
|
|||
|
||||
|
||||
static noinline
|
||||
bool get_newest_data(struct trans_logger_output *output, void *buf, loff_t pos, int len, struct trans_logger_mref_aspect *orig_mref_a)
|
||||
bool phase4_startio(struct writeback_info *wb)
|
||||
{
|
||||
while (len > 0) {
|
||||
struct trans_logger_mref_aspect *src_a;
|
||||
struct mref_object *src;
|
||||
int diff;
|
||||
int this_len = len;
|
||||
|
||||
src_a = hash_find(output, pos, &this_len);
|
||||
if (unlikely(!src_a)) {
|
||||
MARS_ERR("data is GONE at pos = %lld len = %d\n", pos, len);
|
||||
return false;
|
||||
}
|
||||
if (unlikely(!src_a->shadow_ref)) {
|
||||
MARS_ERR("no shadow at pos = %lld len = %d\n", pos, len);
|
||||
return false;
|
||||
}
|
||||
#ifdef KEEP_UNIQUE
|
||||
if (unlikely(src_a->shadow_ref != orig_mref_a->shadow_ref)) {
|
||||
MARS_ERR("different shadows at pos = %lld len = %d: %p -> %p pos = %lld len = %d / %p -> %p pos = %lld len = %d\n", pos, len, src_a, src_a->shadow_ref, src_a->shadow_ref->object->ref_pos, src_a->shadow_ref->object->ref_len, orig_mref_a, orig_mref_a->shadow_ref, orig_mref_a->shadow_ref->object->ref_pos, orig_mref_a->shadow_ref->object->ref_len);
|
||||
}
|
||||
#else
|
||||
if (unlikely(src_a->shadow_ref != src_a)) {
|
||||
MARS_ERR("invalid master shadow at pos = %lld len = %d\n", pos, len);
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
src = src_a->object;
|
||||
CHECK_ATOMIC(&src->ref_count, 1);
|
||||
|
||||
diff = pos - src->ref_pos;
|
||||
if (unlikely(diff < 0 || diff + this_len > src->ref_len)) {
|
||||
MARS_ERR("bad diff %d (found len = %d, this_len = %d)\n", diff, src->ref_len, this_len);
|
||||
return false;
|
||||
}
|
||||
memcpy(buf, src_a->shadow_data + diff, this_len);
|
||||
|
||||
len -= this_len;
|
||||
pos += this_len;
|
||||
buf += this_len;
|
||||
|
||||
__trans_logger_ref_put(output, src_a);
|
||||
}
|
||||
atomic_inc(&wb->w_output->q_phase4.q_flying);
|
||||
fire_writeback(wb, &wb->w_sub_write_list, true);
|
||||
return true;
|
||||
}
|
||||
|
||||
static noinline
|
||||
bool _phase4_startio(struct trans_logger_mref_aspect *sub_mref_a)
|
||||
{
|
||||
struct mref_object *sub_mref = NULL;
|
||||
struct generic_callback *cb;
|
||||
struct trans_logger_output *output;
|
||||
struct trans_logger_input *sub_input;
|
||||
struct trans_logger_mref_aspect *orig_mref_a;
|
||||
int status;
|
||||
bool ok;
|
||||
|
||||
CHECK_PTR(sub_mref_a, err);
|
||||
sub_mref = sub_mref_a->object;
|
||||
CHECK_PTR(sub_mref, err);
|
||||
output = sub_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
CHECK_PTR(output->brick, err);
|
||||
sub_input = output->brick->inputs[0];
|
||||
CHECK_PTR(sub_input, err);
|
||||
orig_mref_a = sub_mref_a->orig_mref_a;
|
||||
CHECK_PTR(orig_mref_a, err);
|
||||
|
||||
orig_mref_a->is_dirty = false;
|
||||
orig_mref_a->shadow_ref->is_dirty = false;
|
||||
|
||||
status = 0;
|
||||
ok = get_newest_data(output, sub_mref->ref_data, sub_mref->ref_pos, sub_mref->ref_len, orig_mref_a);
|
||||
if (unlikely(!ok)) {
|
||||
MARS_ERR("cannot get data at pos = %lld len = %d\n", sub_mref->ref_pos, sub_mref->ref_len);
|
||||
status = -EIO;
|
||||
}
|
||||
|
||||
cb = &sub_mref_a->cb;
|
||||
cb->cb_fn = phase4_endio;
|
||||
cb->cb_private = sub_mref_a;
|
||||
cb->cb_error = status;
|
||||
cb->cb_prev = NULL;
|
||||
sub_mref->ref_cb = cb;
|
||||
sub_mref->ref_rw = WRITE;
|
||||
sub_mref->ref_prio = output->q_phase4.q_io_prio;
|
||||
|
||||
atomic_inc(&output->q_phase4.q_flying);
|
||||
|
||||
mars_log_trace(sub_mref);
|
||||
mars_trace(sub_mref, "sub_start");
|
||||
|
||||
if (status < 0 || output->brick->debug_shortcut) {
|
||||
MARS_IO("SHORTCUT %d\n", sub_mref->ref_len);
|
||||
atomic_inc(&output->total_shortcut_count);
|
||||
phase4_endio(cb);
|
||||
} else {
|
||||
atomic_inc(&output->total_writeback_count);
|
||||
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
|
||||
}
|
||||
|
||||
MARS_IO("put SUBREF.\n");
|
||||
|
||||
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
|
||||
atomic_dec(&output->sub_balance_count);
|
||||
return true;
|
||||
|
||||
err:
|
||||
MARS_ERR("cannot start phase 4 IO %p\n", sub_mref);
|
||||
return false;
|
||||
}
|
||||
|
||||
static noinline
|
||||
bool phase4_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||
{
|
||||
struct trans_logger_output *output;
|
||||
|
||||
CHECK_PTR(orig_mref_a, err);
|
||||
output = orig_mref_a->output;
|
||||
CHECK_PTR(output, err);
|
||||
|
||||
if (orig_mref_a->total_sub_count > 0) {
|
||||
struct list_head *tmp;
|
||||
|
||||
atomic_set(&orig_mref_a->current_sub_count, orig_mref_a->total_sub_count);
|
||||
while ((tmp = orig_mref_a->sub_list.next) != &orig_mref_a->sub_list) {
|
||||
struct trans_logger_mref_aspect *sub_mref_a;
|
||||
struct mref_object *sub_mref;
|
||||
list_del_init(tmp);
|
||||
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
||||
sub_mref = sub_mref_a->object;
|
||||
mars_trace(sub_mref, "sub_write");
|
||||
_phase4_startio(sub_mref_a);
|
||||
}
|
||||
wake_up_interruptible(&output->event);
|
||||
} else {
|
||||
_phase4_endio(orig_mref_a);
|
||||
}
|
||||
return true;
|
||||
err:
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif // NEW_CODE
|
||||
|
||||
/*********************************************************************
|
||||
* The logger thread.
|
||||
* There is only a single instance, dealing with all requests in parallel.
|
||||
*/
|
||||
|
||||
static noinline
|
||||
int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool (*startio)(struct trans_logger_mref_aspect *sub_mref_a), int max)
|
||||
int run_mref_queue(struct logger_queue *q, bool (*startio)(struct trans_logger_mref_aspect *sub_mref_a), int max)
|
||||
{
|
||||
struct trans_logger_mref_aspect *mref_a;
|
||||
bool found = false;
|
||||
|
@ -1808,7 +1651,7 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool (
|
|||
int res;
|
||||
|
||||
while (max-- > 0) {
|
||||
mref_a = qq_fetch(q);
|
||||
mref_a = qq_mref_fetch(q);
|
||||
res = -1;
|
||||
if (!mref_a)
|
||||
goto done;
|
||||
|
@ -1817,18 +1660,51 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool (
|
|||
|
||||
ok = startio(mref_a);
|
||||
if (unlikely(!ok)) {
|
||||
qq_pushback(q, mref_a);
|
||||
output->did_pushback = true;
|
||||
qq_mref_pushback(q, mref_a);
|
||||
q->q_output->did_pushback = true;
|
||||
res = 1;
|
||||
goto done;
|
||||
}
|
||||
__trans_logger_ref_put(output, mref_a);
|
||||
__trans_logger_ref_put(q->q_output, mref_a);
|
||||
}
|
||||
res = 0;
|
||||
|
||||
done:
|
||||
if (found) {
|
||||
wake_up_interruptible(&output->event);
|
||||
wake_up_interruptible(&q->q_output->event);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static noinline
|
||||
int run_wb_queue(struct logger_queue *q, bool (*startio)(struct writeback_info *wb), int max)
|
||||
{
|
||||
struct writeback_info *wb;
|
||||
bool found = false;
|
||||
bool ok;
|
||||
int res;
|
||||
|
||||
while (max-- > 0) {
|
||||
wb = qq_wb_fetch(q);
|
||||
res = -1;
|
||||
if (!wb)
|
||||
goto done;
|
||||
|
||||
found = true;
|
||||
|
||||
ok = startio(wb);
|
||||
if (unlikely(!ok)) {
|
||||
qq_wb_pushback(q, wb);
|
||||
q->q_output->did_pushback = true;
|
||||
res = 1;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
res = 0;
|
||||
|
||||
done:
|
||||
if (found) {
|
||||
wake_up_interruptible(&q->q_output->event);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -1874,13 +1750,9 @@ void trans_logger_log(struct trans_logger_output *output)
|
|||
wait_event_interruptible_timeout(
|
||||
output->event,
|
||||
atomic_read(&output->q_phase1.q_queued) > 0 ||
|
||||
#ifdef USE_HIGHER_PHASES
|
||||
qq_is_ready(&output->q_phase2) ||
|
||||
#ifndef NEW_CODE
|
||||
qq_is_ready(&output->q_phase3) ||
|
||||
qq_is_ready(&output->q_phase4) ||
|
||||
#endif
|
||||
#endif
|
||||
(kthread_should_stop() && !_congested(output)),
|
||||
wait_timeout);
|
||||
|
||||
|
@ -1916,7 +1788,7 @@ void trans_logger_log(struct trans_logger_output *output)
|
|||
|
||||
/* This is highest priority, do it always.
|
||||
*/
|
||||
status = run_queue(output, &output->q_phase1, phase0_startio, output->q_phase1.q_batchlen);
|
||||
status = run_mref_queue(&output->q_phase1, phase0_startio, output->q_phase1.q_batchlen);
|
||||
if (status < 0) {
|
||||
#ifdef STAT_DEBUGGING
|
||||
wait_timeout = 10 * HZ;
|
||||
|
@ -1933,23 +1805,19 @@ void trans_logger_log(struct trans_logger_output *output)
|
|||
log_flush(&brick->logst);
|
||||
log_jiffies = 0;
|
||||
}
|
||||
#ifdef USE_HIGHER_PHASES
|
||||
#ifndef NEW_CODE
|
||||
|
||||
if (qq_is_ready(&output->q_phase4)) {
|
||||
(void)run_queue(output, &output->q_phase4, phase4_startio, output->q_phase4.q_batchlen);
|
||||
(void)run_wb_queue(&output->q_phase4, phase4_startio, output->q_phase4.q_batchlen);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (qq_is_ready(&output->q_phase2)) {
|
||||
(void)run_queue(output, &output->q_phase2, phase2_startio, output->q_phase2.q_batchlen);
|
||||
(void)run_mref_queue(&output->q_phase2, phase2_startio, output->q_phase2.q_batchlen);
|
||||
}
|
||||
|
||||
#ifndef NEW_CODE
|
||||
if (qq_is_ready(&output->q_phase3)) {
|
||||
status = run_queue(output, &output->q_phase3, phase3_startio, output->q_phase3.q_batchlen);
|
||||
status = run_wb_queue(&output->q_phase3, phase3_startio, output->q_phase3.q_batchlen);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
if (output->did_pushback) {
|
||||
#if 0
|
||||
log_flush(&brick->logst);
|
||||
|
@ -2351,7 +2219,6 @@ int trans_logger_output_construct(struct trans_logger_output *output)
|
|||
output->q_phase3.q_dep = &output->q_phase4;
|
||||
output->q_phase4.q_dep = &output->q_phase1;
|
||||
|
||||
output->q_phase2.q_dep_plus = &provisionary_count;
|
||||
#endif
|
||||
output->q_phase1.q_insert_info = "q1_ins";
|
||||
output->q_phase1.q_pushback_info = "q1_push";
|
||||
|
|
|
@ -49,6 +49,7 @@ struct writeback_info {
|
|||
struct list_head w_sub_write_list; // for overwriting
|
||||
atomic_t w_sub_read_count;
|
||||
atomic_t w_sub_write_count;
|
||||
atomic_t w_sub_log_count;
|
||||
void (*read_endio)(struct generic_callback *cb);
|
||||
void (*write_endio)(struct generic_callback *cb);
|
||||
};
|
||||
|
@ -73,7 +74,6 @@ struct trans_logger_mref_aspect {
|
|||
struct timespec stamp;
|
||||
loff_t log_pos;
|
||||
struct generic_callback cb;
|
||||
struct trans_logger_mref_aspect *orig_mref_a;
|
||||
struct writeback_info *wb;
|
||||
struct list_head sub_list;
|
||||
struct list_head sub_head;
|
||||
|
|
Loading…
Reference in New Issue