import mars-105.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-05-13 12:19:28 +01:00
parent 9d1b483b97
commit b01cfa51e2
10 changed files with 456 additions and 96 deletions

View File

@ -56,7 +56,7 @@ void log_flush(struct log_status *logst)
struct generic_callback *cb;
int gap;
if (!mref)
if (!mref || !logst->count)
return;
gap = 0;
@ -94,6 +94,7 @@ void log_flush(struct log_status *logst)
GENERIC_INPUT_CALL(logst->input, mref_put, mref);
logst->offset = 0;
logst->count = 0;
logst->log_mref = NULL;
}
EXPORT_SYMBOL_GPL(log_flush);
@ -270,6 +271,7 @@ bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private
cb_info->endios[nr_endio] = endio;
cb_info->privates[nr_endio] = private;
logst->count++;
ok = true;
err:

View File

@ -93,6 +93,7 @@ struct log_status {
int chunk_size; // must be at least 8K (better 64k)
int io_prio;
// informational
int count;
loff_t log_pos;
// internal
struct mars_input *input;

7
mars.h
View File

@ -250,8 +250,8 @@ GENERIC_ASPECT_FUNCTIONS(mars,mref);
// MARS-specific memory allocation
extern void *mars_vmalloc(loff_t pos, int len);
extern void mars_vfree(void *data);
extern void *mars_alloc(loff_t pos, int len);
extern void mars_free(void *data, int len);
extern struct page *mars_iomap(void *data, int *offset, int *len);
/////////////////////////////////////////////////////////////////////////
@ -308,6 +308,9 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_MAX] = {
extern const struct meta mars_info_meta[];
extern const struct meta mars_mref_meta[];
extern int mars_digest_size;
extern void mars_digest(void *digest, void *data, int len);
/////////////////////////////////////////////////////////////////////////
extern struct mars_global *mars_global;

View File

@ -22,12 +22,42 @@
#define STRONG_MM
#define MEMLEAK // FIXME: remove this
//#define MEASURE_SYNC
#define MEASURE_SYNC 8
//#define USE_FSYNC
///////////////////////// own type definitions ////////////////////////
#include "mars_aio.h"
#ifdef MEASURE_SYNC
static int sync_ticks[MEASURE_SYNC] = {};
static void measure_sync(int ticks)
{
int order = ticks;
if (ticks > 1) {
order = MEASURE_SYNC - 1;
while (order > 0 && (1 << (order-1)) >= ticks) {
order--;
}
order++;
}
sync_ticks[order]++;
}
static char *show_sync(void)
{
char *res = kmalloc(256, GFP_MARS);
int i;
int pos = 0;
for (i = 0; i < MEASURE_SYNC; i++) {
pos += snprintf(res + pos, 256, "%d: %d ", i, sync_ticks[i]);
}
return res;
}
#endif
////////////////// some helpers //////////////////
static inline
@ -526,10 +556,14 @@ static int aio_sync_thread(void *data)
#ifdef MEASURE_SYNC
old_jiffies = jiffies;
#endif
#ifdef USE_FSYNC
err = vfs_fsync(file, file->f_path.dentry, 1);
#else
err = do_sync_mapping_range(file->f_mapping, 0, LLONG_MAX, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER);
#endif
#ifdef MEASURE_SYNC
MARS_DBG("fdsync jiffies = %lld\n", jiffies - old_jiffies);
measure_sync(jiffies - old_jiffies);
#endif
output->fdsync_active = false;
wake_up_interruptible(&output->fdsync_event);
@ -572,17 +606,26 @@ char *aio_statistics(struct aio_brick *brick, int verbose)
{
struct aio_output *output = brick->outputs[0];
char *res = kmalloc(256, GFP_MARS);
char *sync = NULL;
if (!res)
return NULL;
#ifdef MEASURE_SYNC
sync = show_sync();
#endif
// FIXME: check for allocation overflows
sprintf(res, "total reads = %d writes = %d allocs = %d delays = %d msleeps = %d fdsyncs = %d fdsync_waits = %d | flying reads = %d writes = %d allocs = %d q0 = %d/%d q1 = %d/%d q2 = %d/%d\n",
sprintf(res, "total reads = %d writes = %d allocs = %d delays = %d msleeps = %d fdsyncs = %d fdsync_waits = %d | flying reads = %d writes = %d allocs = %d q0 = %d/%d q1 = %d/%d q2 = %d/%d | %s\n",
atomic_read(&output->total_read_count), atomic_read(&output->total_write_count), atomic_read(&output->total_alloc_count), atomic_read(&output->total_delay_count), atomic_read(&output->total_msleep_count), atomic_read(&output->total_fdsync_count), atomic_read(&output->total_fdsync_wait_count),
atomic_read(&output->read_count), atomic_read(&output->write_count), atomic_read(&output->alloc_count),
atomic_read(&output->tinfo[0].total_enqueue_count), atomic_read(&output->tinfo[0].total_dequeue_count),
atomic_read(&output->tinfo[1].total_enqueue_count), atomic_read(&output->tinfo[2].total_dequeue_count),
atomic_read(&output->tinfo[2].total_enqueue_count), atomic_read(&output->tinfo[2].total_dequeue_count));
atomic_read(&output->tinfo[2].total_enqueue_count), atomic_read(&output->tinfo[2].total_dequeue_count),
sync ? sync : "");
if (sync)
kfree(sync);
return res;
}

View File

@ -212,7 +212,7 @@ static int bio_ref_get(struct bio_output *output, struct mref_object *mref)
if (!mref->ref_data) { // buffered IO.
status = -ENOMEM;
mref->ref_data = mars_vmalloc(mref->ref_pos, mref->ref_len);
mref->ref_data = mars_alloc(mref->ref_pos, mref->ref_len);
if (unlikely(!mref->ref_data)) {
goto done;
}
@ -259,7 +259,7 @@ void bio_ref_put(struct bio_output *output, struct mref_object *mref)
}
if (mref_a->do_dealloc) {
MARS_IO("free page\n");
mars_vfree(mref->ref_data);
mars_free(mref->ref_data, mref->ref_len);
mref->ref_data = NULL;
}
bio_free_mref(mref);

View File

@ -26,27 +26,60 @@
// MARS-specific memory allocation
void *mars_vmalloc(loff_t pos, int len)
{
int offset;
void *data;
#define PERFORMANCE_WORKAROUND
#define MARS_MAX_ORDER 6
//#define USE_OFFSET
void *mars_alloc(loff_t pos, int len)
{
int offset = 0;
void *data;
#ifdef PERFORMANCE_WORKAROUND
int order = MARS_MAX_ORDER;
#endif
#ifdef USE_OFFSET
offset = pos & (PAGE_SIZE-1);
#endif
#ifdef PERFORMANCE_WORKAROUND
len += offset;
while (order > 0 && (PAGE_SIZE << (order-1)) >= len) {
order--;
}
data = (void*)__get_free_pages(GFP_MARS, order);
#else
data = __vmalloc(len + offset, GFP_MARS, PAGE_KERNEL_IO);
#endif
if (likely(data)) {
data += offset;
}
return data;
}
EXPORT_SYMBOL_GPL(mars_vmalloc);
EXPORT_SYMBOL_GPL(mars_alloc);
void mars_vfree(void *data)
void mars_free(void *data, int len)
{
int offset = ((unsigned long)data) & (PAGE_SIZE-1);
int offset = 0;
#ifdef PERFORMANCE_WORKAROUND
int order = MARS_MAX_ORDER;
#endif
if (!data) {
return;
}
#ifdef USE_OFFSET
offset = ((unsigned long)data) & (PAGE_SIZE-1);
#endif
data -= offset;
#ifdef PERFORMANCE_WORKAROUND
len += offset;
while (order > 0 && (PAGE_SIZE << (order-1)) >= len) {
order--;
}
__free_pages(virt_to_page((unsigned long)data), order);
#else
vfree(data);
#endif
}
EXPORT_SYMBOL_GPL(mars_vfree);
EXPORT_SYMBOL_GPL(mars_free);
struct page *mars_iomap(void *data, int *offset, int *len)
{
@ -345,6 +378,30 @@ int mars_lchown(const char *path, uid_t uid)
}
EXPORT_SYMBOL_GPL(mars_lchown);
#include <linux/crypto.h>
struct crypto_hash *mars_tfm = NULL;
int mars_digest_size = 0;
EXPORT_SYMBOL_GPL(mars_digest_size);
void mars_digest(void *digest, void *data, int len)
{
struct hash_desc desc = {
.tfm = mars_tfm,
.flags = 0,
};
struct scatterlist sg;
memset(digest, 0, mars_digest_size);
crypto_hash_init(&desc);
sg_init_table(&sg, 1);
sg_set_buf(&sg, data, len);
crypto_hash_update(&desc, &sg, sg.length);
crypto_hash_final(&desc, digest);
}
EXPORT_SYMBOL_GPL(mars_digest);
//////////////////////////////////////////////////////////////
// object stuff
@ -1345,8 +1402,29 @@ EXPORT_SYMBOL_GPL(mm_fake);
static int __init init_mars(void)
{
MARS_INF("init_mars()\n");
brick_obj_max = BRICK_OBJ_MAX;
mars_tfm = crypto_alloc_hash("md5", 0, CRYPTO_ALG_ASYNC);
if (!mars_tfm) {
MARS_ERR("cannot alloc crypto hash\n");
return -ENOMEM;
}
if (IS_ERR(mars_tfm)) {
MARS_ERR("alloc crypto hash failed, status = %d\n", PTR_ERR(mars_tfm));
return PTR_ERR(mars_tfm);
}
#if 0
if (crypto_tfm_alg_type(crypto_hash_tfm(mars_tfm)) != CRYPTO_ALG_TYPE_DIGEST) {
MARS_ERR("bad crypto hash type\n");
return -EINVAL;
}
#endif
mars_digest_size = crypto_hash_digestsize(mars_tfm);
MARS_INF("digest_size = %d\n", mars_digest_size);
set_fake();
#ifdef MARS_TRACING
{
int flags = O_CREAT | O_TRUNC | O_RDWR | O_LARGEFILE;
@ -1372,6 +1450,9 @@ static void __exit exit_mars(void)
kfree(id);
id = NULL;
}
if (mars_tfm) {
crypto_free_hash(mars_tfm);
}
put_fake();
#ifdef MARS_TRACING
if (mars_log_file) {

View File

@ -55,8 +55,8 @@ struct light_class {
#define CONF_TRANS_CHUNKSIZE (128 * 1024)
//#define CONF_TRANS_ALIGN 512
#define CONF_TRANS_ALIGN 0
//#define FLUSH_DELAY (HZ / 100 + 1)
#define FLUSH_DELAY 0
#define FLUSH_DELAY (HZ / 100 + 1)
//#define FLUSH_DELAY 0
//#define TRANS_FAKE
@ -64,9 +64,10 @@ struct light_class {
//#define CONF_TRANS_FLYING 4
#define CONF_TRANS_FLYING 128
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
//#define CONF_TRANS_LOG_READS false
#define CONF_TRANS_LOG_READS true
#define CONF_TRANS_MINIMIZE_LATENCY true
#define CONF_TRANS_LOG_READS false
//#define CONF_TRANS_LOG_READS true
#define CONF_TRANS_MINIMIZE_LATENCY false
//#define CONF_TRANS_MINIMIZE_LATENCY true
//#define CONF_ALL_BATCHLEN 2
#define CONF_ALL_BATCHLEN 1
@ -951,7 +952,7 @@ void _create_new_logfile(const char *path)
}
static
int _update_replaylink(struct mars_dent *parent, int sequence, loff_t pos, bool check_exist)
int _update_replaylink(struct mars_dent *parent, int sequence, loff_t start_pos, loff_t end_pos, bool check_exist)
{
struct timespec now = {};
char *old;
@ -973,7 +974,7 @@ int _update_replaylink(struct mars_dent *parent, int sequence, loff_t pos, bool
status = -ENOMEM;
}
old = path_make("log-%09d-%s,%lld", sequence, my_id(), pos);
old = path_make("log-%09d-%s,%lld,%lld", sequence, my_id(), start_pos, end_pos - start_pos);
if (!old) {
goto out_old;
}
@ -997,6 +998,62 @@ out_old:
return status;
}
static
int _update_versionlink(struct mars_global *global, struct mars_dent *parent, int sequence, loff_t start_pos, loff_t end_pos)
{
char *prev;
struct mars_dent *prev_link;
char *prev_digest = NULL;
struct timespec now = {};
char *new = NULL;
int i;
int status = -EINVAL;
int len = 0;
char data[mars_digest_size + 96];
char digest[mars_digest_size];
char old[mars_digest_size * 2 + 2];
prev = path_make("%s/version-%09d-%s", parent->d_path, sequence-1, my_id());
if (unlikely(!prev)) {
goto out;
}
prev_link = mars_find_dent(global, parent->d_path);
if (likely(prev_link)) {
prev_digest = prev_link->new_link;
}
len = snprintf(data, sizeof(data), "%d,%lld,%lld,%s", sequence, start_pos, end_pos, prev_digest ? prev_digest : "");
MARS_DBG("data = '%s' len = %d\n", data, len);
mars_digest(digest, data, len);
for (i = 0; i < mars_digest_size; i++) {
sprintf(old + i * 2, "%02x", digest[i]);
}
new = path_make("%s/version-%09d-%s", parent->d_path, sequence, my_id());
if (!new) {
goto out;
}
get_lamport(&now);
status = mars_symlink(old, new, &now, 0);
if (status < 0) {
MARS_ERR("cannot create symlink '%s' -> '%s' status = %d\n", old, new, status);
} else {
MARS_DBG("make version symlink '%s' -> '%s' status = %d\n", old, new, status);
}
out:
if (new) {
kfree(new);
}
if (prev) {
kfree(prev);
}
return status;
}
/* This must be called once at every round of logfile checking.
*/
static
@ -1052,7 +1109,7 @@ int make_log_init(void *buf, struct mars_dent *parent)
goto done;
}
status = _parse_args(replay_link, replay_link->new_link, 2);
status = _parse_args(replay_link, replay_link->new_link, 3);
if (unlikely(status < 0)) {
goto done;
}
@ -1167,7 +1224,7 @@ done:
* ret == 3 : relevant for appending
*/
static
int _check_logging_status(struct mars_global *global, struct mars_dent *dent, long long *oldpos, long long *newpos)
int _check_logging_status(struct mars_global *global, struct mars_dent *dent, long long *oldpos_start, long long *oldpos_end, long long *newpos)
{
struct mars_dent *parent = dent->d_parent;
struct mars_rotate *rot = parent->d_private;
@ -1186,24 +1243,33 @@ int _check_logging_status(struct mars_global *global, struct mars_dent *dent, lo
goto done;
}
if (sscanf(rot->replay_link->d_argv[1], "%lld", oldpos) != 1) {
MARS_ERR("bad position argument '%s'\n", rot->replay_link->d_argv[1]);
if (sscanf(rot->replay_link->d_argv[1], "%lld", oldpos_start) != 1) {
MARS_ERR("bad start position argument '%s'\n", rot->replay_link->d_argv[1]);
status = -EINVAL;
goto done;
}
if (sscanf(rot->replay_link->d_argv[2], "%lld", oldpos_end) != 1) {
MARS_ERR("bad end position argument '%s'\n", rot->replay_link->d_argv[2]);
status = -EINVAL;
goto done;
}
*oldpos_end += *oldpos_start;
if (unlikely(*oldpos_end < *oldpos_start)) {
MARS_ERR("end_pos %lld < start_pos %lld\n", *oldpos_end, *oldpos_start);
}
if (unlikely(rot->aio_info.current_size < *oldpos_start)) {
MARS_ERR("oops, bad replay position attempted at logfile '%s' (file length %lld should never be smaller than requested position %lld, is your filesystem corrupted?) => please repair this by hand\n", rot->aio_dent->d_path, rot->aio_info.current_size, *oldpos_start);
status = -EINVAL;
goto done;
}
if (unlikely(rot->aio_info.current_size < *oldpos)) {
MARS_ERR("oops, bad replay position attempted in logfile '%s' (file length %lld should never be smaller than requested position %lld, is your filesystem corrupted?) => please repair this by hand\n", rot->aio_dent->d_path, rot->aio_info.current_size, *oldpos);
status = -EINVAL;
goto done;
}
if (rot->aio_info.current_size > *oldpos) {
MARS_DBG("transaction log replay is necessary on '%s' from %lld to %lld\n", rot->aio_dent->d_path, *oldpos, rot->aio_info.current_size);
if (rot->aio_info.current_size > *oldpos_start) {
MARS_DBG("transaction log replay is necessary on '%s' from %lld to %lld (dirty region ends at %lld)\n", rot->aio_dent->d_path, *oldpos_start, rot->aio_info.current_size, *oldpos_end);
*newpos = rot->aio_info.current_size;
status = 2;
} else if (rot->aio_info.current_size > 0) {
MARS_DBG("transaction log '%s' is already applied (would be usable for appending at position %lld, but a fresh log is needed for safety reasons)\n", rot->aio_dent->d_path, *oldpos);
MARS_DBG("transaction log '%s' is already applied (would be usable for appending at position %lld, but a fresh log is needed for safety reasons)\n", rot->aio_dent->d_path, *oldpos_start);
*newpos = rot->aio_info.current_size;
status = 1;
} else if (!rot->is_primary) {
@ -1231,6 +1297,7 @@ int make_log(void *buf, struct mars_dent *dent)
struct trans_logger_brick *trans_brick;
struct mars_dent *prev_log;
loff_t start_pos = 0;
loff_t dirty_pos = 0;
loff_t end_pos = 0;
int status = -EINVAL;
@ -1275,7 +1342,7 @@ int make_log(void *buf, struct mars_dent *dent)
/* Find current logging status.
*/
status = _check_logging_status(global, dent, &start_pos, &end_pos);
status = _check_logging_status(global, dent, &start_pos, &dirty_pos, &end_pos);
if (status < 0) {
goto done;
}
@ -1288,7 +1355,7 @@ int make_log(void *buf, struct mars_dent *dent)
* When primary, switch over to a new logfile.
*/
if (!trans_brick->power.button && !trans_brick->power.led_on && trans_brick->power.led_off) {
_update_replaylink(dent->d_parent, dent->d_serial + 1, 0, !rot->is_primary);
_update_replaylink(dent->d_parent, dent->d_serial + 1, 0, 0, !rot->is_primary);
trans_brick->current_pos = 0;
rot->last_jiffies = jiffies;
mars_trigger();
@ -1334,6 +1401,7 @@ static
int _start_trans(struct mars_rotate *rot)
{
struct trans_logger_brick *trans_brick = rot->trans_brick;
struct trans_logger_input *trans_input;
int status = 0;
if (trans_brick->power.button || !trans_brick->power.led_off) {
@ -1351,11 +1419,16 @@ int _start_trans(struct mars_rotate *rot)
MARS_ERR("log aio brick already present, this should not happen\n");
goto done;
}
trans_input = trans_brick->inputs[TL_INPUT_FW_LOG1];
if (unlikely(!trans_input)) {
MARS_ERR("log input does not exist\n");
goto done;
}
/* For safety, disconnect old connection first
*/
if (trans_brick->inputs[1]->connect) {
(void)generic_disconnect((void*)trans_brick->inputs[1]);
if (trans_input->connect) {
(void)generic_disconnect((void*)trans_input);
}
/* Open new transaction log
@ -1380,14 +1453,14 @@ int _start_trans(struct mars_rotate *rot)
/* Connect to new transaction log
*/
status = generic_connect((void*)trans_brick->inputs[1], (void*)rot->relevant_brick->outputs[0]);
status = generic_connect((void*)trans_input, (void*)rot->relevant_brick->outputs[0]);
if (status < 0) {
goto done;
}
/* Supply all relevant parameters
*/
trans_brick->sequence = rot->relevant_log->d_serial;
trans_input->sequence = rot->relevant_log->d_serial;
if ((trans_brick->do_replay = rot->do_replay)) {
trans_brick->replay_start_pos = rot->start_pos;
trans_brick->replay_end_pos = rot->end_pos;
@ -1450,14 +1523,44 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *parent)
/* Stopping is also possible in case of errors
*/
if (trans_brick->power.button && trans_brick->power.led_on && !trans_brick->power.led_off) {
bool do_stop = true;
if (trans_brick->do_replay) {
int i;
for (i = TL_INPUT_FW_LOG1; i <= TL_INPUT_FW_LOG2; i++) {
struct trans_logger_input *trans_input;
trans_input = trans_brick->inputs[i];
if (!trans_input) {
continue;
}
if (trans_input->replay_min_pos != trans_brick->replay_end_pos || trans_brick->replay_code == -EAGAIN) {
do_stop = false;
break;
}
}
} else {
do_stop = (rot->relevant_log && rot->relevant_log != rot->current_log);
}
#if 0 // old code
bool do_stop =
trans_brick->do_replay ?
(trans_brick->replay_pos == trans_brick->replay_end_pos || trans_brick->replay_code != -EAGAIN) :
(rot->relevant_log && rot->relevant_log != rot->current_log);
#endif
MARS_DBG("do_stop = %d\n", (int)do_stop);
if (do_stop || (long long)jiffies > rot->last_jiffies + 5 * HZ) {
status = _update_replaylink(parent, trans_brick->sequence, trans_brick->replay_pos, true);
struct trans_logger_input *old_input = NULL;
int i;
for (i = TL_INPUT_FW_LOG1; i <= TL_INPUT_FW_LOG2; i++) {
struct trans_logger_input *trans_input;
trans_input = trans_brick->inputs[i];
if (!trans_input || trans_input == old_input) {
continue;
}
status = _update_replaylink(parent, trans_input->sequence, trans_input->replay_min_pos, trans_input->replay_max_pos, true);
status = _update_versionlink(global, parent, trans_input->sequence, trans_input->replay_min_pos, trans_input->replay_max_pos);
old_input = trans_input;
}
rot->last_jiffies = jiffies;
}
if (do_stop) {
@ -1478,7 +1581,7 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *parent)
goto done;
}
}
/* Starting is only possible when no error ocurred.
/* Starting is only possible when no error occurred.
*/
if (!rot->relevant_log || rot->has_error) {
MARS_DBG("nothing to do\n");
@ -1847,6 +1950,7 @@ enum {
CL_SYNC,
CL__COPY,
CL__DIRECT,
CL_VERSION,
CL_REPLAYSTATUS,
CL_LOG,
CL_DEVICE,
@ -1998,6 +2102,18 @@ static const struct light_class light_classes[] = {
.cl_backward = kill_all,
},
/* Passive symlink indicating the split-brain crypto hash
*/
[CL_VERSION] = {
.cl_name = "version-",
.cl_len = 8,
.cl_type = 'l',
.cl_serial = true,
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Passive symlink indicating the last state of
* transaction log replay.
*/
@ -2007,10 +2123,8 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#if 0
.cl_forward = make_replay,
.cl_backward = kill_all,
#endif
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Logfiles for transaction logger
*/

View File

@ -495,7 +495,7 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
#endif
// create a new master shadow
data = mars_vmalloc(mref->ref_pos, mref->ref_len);
data = mars_alloc(mref->ref_pos, mref->ref_len);
if (unlikely(!data)) {
return -ENOMEM;
}
@ -628,7 +628,7 @@ restart:
// we are a master shadow
CHECK_PTR(mref_a->shadow_data, err);
if (mref_a->do_dealloc) {
mars_vfree(mref_a->shadow_data);
mars_free(mref_a->shadow_data, mref->ref_len);
mref_a->shadow_data = NULL;
mref_a->do_dealloc = false;
}
@ -779,25 +779,37 @@ err:
////////////////////////////// writeback info //////////////////////////////
/* save final completion status when necessary
*/
static noinline
void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
{
struct trans_logger_output *output = orig_mref_a->my_output;
struct trans_logger_brick *brick = output->brick;
struct trans_logger_input *input = orig_mref_a->log_input;
struct list_head *tmp;
unsigned long flags;
// save final completion status
traced_lock(&brick->pos_lock, flags);
atomic_inc(&brick->total_writeback_count);
if (unlikely(!input)) {
MARS_ERR("cannot tell what input I am operating on\n");
}
tmp = &orig_mref_a->pos_head;
if (tmp == brick->pos_list.next) {
traced_lock(&brick->pos_lock, flags);
// am I the first member? (means "youngest" list entry)
if (tmp == brick->pos_list.next && input) {
loff_t finished = orig_mref_a->log_pos;
if (finished <= brick->replay_pos) {
MARS_ERR("backskip in log replay: %lld -> %lld\n", brick->replay_pos, orig_mref_a->log_pos);
MARS_INF("finished = %lld\n", finished);
if (finished <= input->replay_min_pos) {
MARS_ERR("backskip in log replay: %lld -> %lld\n", input->replay_min_pos, orig_mref_a->log_pos);
}
brick->replay_pos = finished;
input->replay_min_pos = finished;
}
list_del_init(tmp);
atomic_dec(&brick->pos_count);
traced_unlock(&brick->pos_lock, flags);
}
@ -915,11 +927,16 @@ static noinline
struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t pos, int len)
{
struct trans_logger_brick *brick = output->brick;
struct writeback_info *wb = kzalloc(sizeof(struct writeback_info), GFP_MARS);
struct writeback_info *wb;
struct trans_logger_input *log_input;
struct trans_logger_input *read_input;
struct trans_logger_input *write_input;
struct list_head *tmp;
int write_input_nr;
/* Allocate structure representing a bunch of adjacent writebacks
*/
wb = kzalloc(sizeof(struct writeback_info), GFP_MARS);
if (!wb) {
goto err;
}
@ -948,6 +965,9 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
MARS_ERR("len = %d\n", len);
}
/* Determine the "channels" we want to operate on
*/
log_input = brick->inputs[TL_INPUT_FW_LOG1];
read_input = brick->inputs[TL_INPUT_READ];
write_input_nr = TL_INPUT_WRITEBACK;
write_input = brick->inputs[write_input_nr];
@ -956,6 +976,15 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
write_input = read_input;
}
/* Assign input.
*/
for (tmp = wb->w_collect_list.next; tmp != &wb->w_collect_list; tmp = tmp->next) {
struct trans_logger_mref_aspect *orig_mref_a;
orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
orig_mref_a->log_input = log_input;
}
/* Create sub_mrefs for read of old disk version (phase2)
*/
if (brick->log_reads) {
@ -1004,30 +1033,31 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
len = wb->w_len;
}
/* Create sub_mrefs for writeback (phase4)
/* Always create sub_mrefs for writeback (phase4)
*/
while (len > 0) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct trans_logger_mref_aspect *base_mref_a;
struct mref_object *base_mref;
struct trans_logger_mref_aspect *orig_mref_a;
struct mref_object *orig_mref;
void *data;
int this_len = len;
int diff;
int status;
base_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true);
if (unlikely(!base_mref_a)) {
orig_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true);
if (unlikely(!orig_mref_a)) {
MARS_FAT("could not find data\n");
goto err;
}
base_mref = base_mref_a->object;
diff = pos - base_mref->ref_pos;
orig_mref = orig_mref_a->object;
diff = pos - orig_mref->ref_pos;
if (unlikely(diff < 0)) {
MARS_FAT("bad diff %d\n", diff);
goto err;
}
data = base_mref_a->shadow_data + diff;
data = orig_mref_a->shadow_data + diff;
sub_mref = trans_logger_alloc_mref(&write_input->hidden_output, &write_input->sub_layout);
if (unlikely(!sub_mref)) {
@ -1048,7 +1078,9 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
sub_mref_a = trans_logger_mref_get_aspect(&write_input->hidden_output, sub_mref);
CHECK_PTR(sub_mref_a, err);
sub_mref_a->orig_mref_a = orig_mref_a;
sub_mref_a->my_input = write_input;
sub_mref_a->log_input = log_input;
sub_mref_a->my_output = &write_input->hidden_output;
sub_mref_a->wb = wb;
@ -1081,11 +1113,12 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
}
static inline
void _fire_one(struct list_head *tmp, bool do_put)
void _fire_one(struct list_head *tmp, bool do_update, bool do_put)
{
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct trans_logger_input *sub_input;
struct trans_logger_input *log_input;
struct generic_callback *cb;
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
@ -1099,6 +1132,19 @@ void _fire_one(struct list_head *tmp, bool do_put)
sub_mref->ref_cb = cb;
sub_input = sub_mref_a->my_input;
log_input = sub_mref_a->log_input;
if (do_update) {
struct trans_logger_mref_aspect *orig_mref_a = sub_mref_a->orig_mref_a;
if (unlikely(!orig_mref_a)) {
MARS_ERR("internal problem\n");
} else {
loff_t max_pos = orig_mref_a->log_pos;
if (log_input->replay_max_pos < max_pos) {
log_input->replay_max_pos = max_pos;
}
}
}
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
if (do_put) {
@ -1107,22 +1153,41 @@ void _fire_one(struct list_head *tmp, bool do_put)
}
static inline
void fire_writeback(struct writeback_info *wb, struct list_head *start, bool do_remove)
void fire_writeback(struct writeback_info *wb, struct list_head *start, bool do_update, bool do_remove)
{
struct list_head *tmp;
if (do_remove) {
while ((tmp = start->next) != start) {
list_del_init(tmp);
_fire_one(tmp, true);
_fire_one(tmp, do_update, true);
}
} else {
for (tmp = start->next; tmp != start; tmp = tmp->next) {
_fire_one(tmp, false);
_fire_one(tmp, do_update, false);
}
}
}
static inline
void put_list(struct writeback_info *wb, struct list_head *start)
{
struct list_head *tmp;
while ((tmp = start->next) != start) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
struct trans_logger_input *sub_input;
list_del_init(tmp);
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
sub_input = sub_mref_a->my_input;
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
}
////////////////////////////// worker thread //////////////////////////////
@ -1216,7 +1281,17 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
orig_mref_a->log_pos = logst->log_pos + logst->offset;
traced_lock(&brick->pos_lock, flags);
#if 1
if (!list_empty(&brick->pos_list)) {
struct trans_logger_mref_aspect *last_mref_a;
last_mref_a = container_of(brick->pos_list.prev, struct trans_logger_mref_aspect, pos_head);
if (last_mref_a->log_pos >= orig_mref_a->log_pos) {
MARS_ERR("backskip in pos_list, %lld >= %lld\n", last_mref_a->log_pos, orig_mref_a->log_pos);
}
}
#endif
list_add_tail(&orig_mref_a->pos_head, &brick->pos_list);
atomic_inc(&brick->pos_count);
traced_unlock(&brick->pos_lock, flags);
qq_inc_flying(&brick->q_phase1);
@ -1404,7 +1479,7 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
if (output->brick->log_reads) {
qq_inc_flying(&brick->q_phase2);
fire_writeback(wb, &wb->w_sub_read_list, false);
fire_writeback(wb, &wb->w_sub_read_list, false, false);
} else { // shortcut
#ifdef LATER
qq_wb_insert(&brick->q_phase4, wb);
@ -1599,10 +1674,11 @@ void phase4_endio(struct generic_callback *cb)
hash_put_all(brick, &wb->w_collect_list);
qq_dec_flying(&brick->q_phase4);
atomic_inc(&brick->total_writeback_cluster_count);
free_writeback(wb);
//wake_up_interruptible(&brick->event);
wake_up_interruptible(&brick->event);
return;
@ -1636,7 +1712,7 @@ bool phase4_startio(struct writeback_info *wb)
/* Start writeback IO
*/
qq_inc_flying(&wb->w_output->brick->q_phase4);
fire_writeback(wb, &wb->w_sub_write_list, true);
fire_writeback(wb, &wb->w_sub_write_list, true, true);
return true;
}
@ -1760,6 +1836,7 @@ void trans_logger_log(struct trans_logger_output *output)
struct trans_logger_input *bw_input;
struct log_status *fw_logst;
struct log_status *bw_logst;
loff_t start_pos;
long wait_timeout = HZ;
#ifdef STAT_DEBUGGING
long long last_jiffies = jiffies;
@ -1786,13 +1863,17 @@ void trans_logger_log(struct trans_logger_output *output)
init_logst(bw_logst, (void*)bw_input, (void*)&bw_input->hidden_output, 0);
}
brick->replay_pos = brick->current_pos = brick->log_start_pos;
fw_logst->log_pos = brick->current_pos;
start_pos = brick->log_start_pos;
brick->current_pos = start_pos;
fw_input->replay_min_pos = start_pos;
fw_input->replay_max_pos = start_pos; // FIXME: Theoretically, this could be wrong when starting on an interrupted replay / inconsistent system. However, we normally never start ordinary logging in such a case (possibly except some desperate emergency cases when there really is no other chance, such as physical loss of transaction logs). Nevertheless, better use old consistenty information from the FS here.
fw_logst->log_pos = start_pos;
mars_power_led_on((void*)brick, true);
while (!kthread_should_stop() || _congested(brick)) {
long long old_jiffies = jiffies;
long old_wait_timeout;
struct condition_status st = {};
#if 1
long long j0;
@ -1810,6 +1891,8 @@ void trans_logger_log(struct trans_logger_output *output)
_condition(&st, brick),
wait_timeout);
atomic_inc(&brick->total_round_count);
#if 1
j0 = jiffies;
orig = st.q1_ready | st.q2_ready | st.q3_ready | st.q4_ready | st.extra_ready;
@ -1856,28 +1939,29 @@ void trans_logger_log(struct trans_logger_output *output)
/* A kind of delayed plugging mechanism
*/
old_wait_timeout = wait_timeout;
wait_timeout = HZ / 10; // 100ms before flushing
#ifdef CONFIG_DEBUG_KERNEL // debug override for catching long blocks
wait_timeout = 16 * HZ;
#endif
wait_timeout = 1;
if (brick->did_work) {
wait_timeout = 0; // start over immediately
} else if (brick->minimize_latency || (long long)jiffies - old_jiffies >= wait_timeout) {
atomic_inc(&brick->total_restart_count);
wait_timeout = brick->flush_delay; // start over soon
} else if ((bw_logst->count > 0 || bw_logst->count > 0) &&
atomic_read(&brick->q_phase1.q_queued) <= 0 &&
(brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
/* Calling log_flush() too often may result in
* increased overhead (and thus in lower throughput).
* OTOH, calling it too seldom may hold back
* IO completion for the end user for some time.
* Play around with wait_timeout to optimize this.
*/
atomic_inc(&brick->total_flush_count);
log_flush(fw_logst);
if (bw_logst != fw_logst) {
log_flush(bw_logst);
}
}
#if 1
log_flush(fw_logst);
#endif
#if 1
{
int delta = (long long)jiffies - j0;
@ -2072,6 +2156,8 @@ void trans_logger_replay(struct trans_logger_output *output)
{
struct trans_logger_brick *brick = output->brick;
struct trans_logger_input *input = brick->inputs[TL_INPUT_FW_LOG1];
loff_t start_pos;
loff_t finished_pos;
bool has_triggered = false;
brick->replay_code = -EAGAIN; // indicates "running"
@ -2082,7 +2168,11 @@ void trans_logger_replay(struct trans_logger_output *output)
input->logst.chunk_size = brick->chunk_size;
init_logst(&input->logst, (void*)input, (void*)&input->hidden_output, brick->replay_start_pos);
brick->replay_pos = brick->current_pos = input->logst.log_pos;
start_pos = input->logst.log_pos;
brick->current_pos = start_pos;
input->replay_min_pos = start_pos;
input->replay_max_pos = start_pos; // FIXME: this is wrong.
mars_power_led_on((void*)brick, true);
for (;;) {
@ -2125,19 +2215,25 @@ void trans_logger_replay(struct trans_logger_output *output)
// do this _after_ any opportunities for errors...
if (atomic_read(&brick->replay_count) <= 0) {
brick->replay_pos = brick->current_pos = input->logst.log_pos + input->logst.offset;
finished_pos = input->logst.log_pos + input->logst.offset;
brick->current_pos = finished_pos;
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
}
}
wait_event_interruptible_timeout(brick->event, atomic_read(&brick->replay_count) <= 0, 60 * HZ);
brick->replay_pos = brick->current_pos = input->logst.log_pos + input->logst.offset;
finished_pos = input->logst.log_pos + input->logst.offset;
brick->current_pos = finished_pos;
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
if (brick->replay_pos == brick->replay_end_pos) {
MARS_INF("replay finished at %lld\n", brick->replay_pos);
if (finished_pos == brick->replay_end_pos) {
MARS_INF("replay finished at %lld\n", finished_pos);
brick->replay_code = 0;
} else {
MARS_INF("replay stopped prematurely at %lld (of %lld)\n", brick->replay_pos, brick->replay_end_pos);
MARS_INF("replay stopped prematurely at %lld (of %lld)\n", finished_pos, brick->replay_end_pos);
if (brick->replay_code == -EAGAIN)
brick->replay_code = -EIO;
}
@ -2215,9 +2311,9 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
// FIXME: check for allocation overflows
sprintf(res, "total callbacks = %d reads=%d writes=%d writeback=%d shortcut=%d (%d%%) mshadow=%d sshadow=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
sprintf(res, "total callbacks = %d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
return res;
}
@ -2227,10 +2323,14 @@ void trans_logger_reset_statistics(struct trans_logger_brick *brick)
atomic_set(&brick->total_cb_count, 0);
atomic_set(&brick->total_read_count, 0);
atomic_set(&brick->total_write_count, 0);
atomic_set(&brick->total_flush_count, 0);
atomic_set(&brick->total_writeback_count, 0);
atomic_set(&brick->total_writeback_cluster_count, 0);
atomic_set(&brick->total_shortcut_count, 0);
atomic_set(&brick->total_mshadow_count, 0);
atomic_set(&brick->total_sshadow_count, 0);
atomic_set(&brick->total_round_count, 0);
atomic_set(&brick->total_restart_count, 0);
}
@ -2347,6 +2447,9 @@ static const struct trans_logger_input_type *trans_logger_input_types[] = {
&trans_logger_input_type,
&trans_logger_input_type,
&trans_logger_input_type,
&trans_logger_input_type,
&trans_logger_input_type,
&trans_logger_input_type,
};
const struct trans_logger_output_type trans_logger_output_type = {
@ -2367,7 +2470,7 @@ static const struct trans_logger_output_type *trans_logger_output_types[] = {
const struct trans_logger_brick_type trans_logger_brick_type = {
.type_name = "trans_logger_brick",
.brick_size = sizeof(struct trans_logger_brick),
.max_inputs = 3,
.max_inputs = TL_INPUT_NR,
.max_outputs = 1,
.master_ops = &trans_logger_brick_ops,
.default_input_types = trans_logger_input_types,

View File

@ -34,13 +34,14 @@ struct logger_head {
////////////////////////////////////////////////////////////////////
#if 0
#define TL_INPUT_READ 0
#define TL_INPUT_WRITEBACK 1
#define TL_INPUT_FW_LOG1 2
#define TL_INPUT_FW_LOG2 3
#define TL_INPUT_BW_LOG1 4
#define TL_INPUT_BW_LOG2 5
#define TL_INPUT_COUNT 6
#define TL_INPUT_NR 6
#else
@ -50,7 +51,7 @@ struct logger_head {
#define TL_INPUT_FW_LOG2 1
#define TL_INPUT_BW_LOG1 1
#define TL_INPUT_BW_LOG2 1
#define TL_INPUT_COUNT 2
#define TL_INPUT_NR 2
#endif
@ -79,6 +80,7 @@ struct trans_logger_mref_aspect {
GENERIC_ASPECT(mref);
struct trans_logger_output *my_output;
struct trans_logger_input *my_input;
struct trans_logger_input *log_input;
struct logger_head lh;
struct list_head hash_head;
//struct list_head q_head;
@ -87,6 +89,7 @@ struct trans_logger_mref_aspect {
struct list_head collect_head;
struct pairing_heap_logger ph;
struct trans_logger_mref_aspect *shadow_ref;
struct trans_logger_mref_aspect *orig_mref_a;
void *shadow_data;
bool do_dealloc;
bool do_buffered;
@ -106,7 +109,6 @@ struct trans_logger_mref_aspect {
struct trans_logger_brick {
MARS_BRICK(trans_logger);
// parameters
int sequence; // logfile sequence number
int limit_congest;// limit phase1 congestion.
int align_size; // alignment between requests
int chunk_size; // must be at least 8K (better 64k)
@ -120,7 +122,6 @@ struct trans_logger_brick {
loff_t replay_end_pos; // end of replay
loff_t log_start_pos; // where to start logging
// readonly from outside
loff_t replay_pos; // current replay position (both in replay mode and in logging mode)
loff_t current_pos; // current logging position (usually ahead of replay_pos)
int replay_code; // replay errors (if any)
// private
@ -135,6 +136,7 @@ struct trans_logger_brick {
atomic_t replay_count;
atomic_t fly_count;
atomic_t hash_count;
atomic_t pos_count;
atomic_t mshadow_count;
atomic_t sshadow_count;
atomic_t outer_balance_count;
@ -144,10 +146,14 @@ struct trans_logger_brick {
atomic_t total_cb_count;
atomic_t total_read_count;
atomic_t total_write_count;
atomic_t total_flush_count;
atomic_t total_writeback_count;
atomic_t total_writeback_cluster_count;
atomic_t total_shortcut_count;
atomic_t total_mshadow_count;
atomic_t total_sshadow_count;
atomic_t total_round_count;
atomic_t total_restart_count;
// queues
struct logger_queue q_phase1;
struct logger_queue q_phase2;
@ -164,6 +170,13 @@ struct trans_logger_output {
struct trans_logger_input {
MARS_INPUT(trans_logger);
// parameters
int sequence; // logfile sequence number
// readonly from outside
loff_t replay_min_pos; // current replay position (both in replay mode and in logging mode)
loff_t replay_max_pos; // dito, indicating the "dirty" area which could be potentially "inconsistent"
// private
struct generic_object_layout sub_layout;
struct trans_logger_output hidden_output;
struct log_status logst;

View File

@ -216,7 +216,7 @@ sub create_res {
if($create) {
symlink($host, "$tmp/primary") or die "cannot create primary symlink\n";
symlink("log-000000001-$host,0", "$tmp/replay-$host") or die "cannot create replay status\n";
symlink("log-000000001-$host,0,0", "$tmp/replay-$host") or die "cannot create replay status\n";
rename($tmp, "$mars/resource-$res") or die "cannot finalize resource '$res'\n";
print "successfully created resource '$res'\n";
} else {