import mars-110.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-06-10 14:57:52 +01:00
parent eed34b465c
commit 9e6af72a34
10 changed files with 351 additions and 161 deletions

6
mars.h
View File

@ -3,6 +3,7 @@
#define MARS_H
#include <linux/semaphore.h>
#include <linux/rwsem.h>
//#define MARS_TRACING // write runtime trace data to /mars/trace.csv
@ -351,6 +352,7 @@ extern char *my_id(void);
int d_serial; /* for pre-grouping order */ \
int d_version; /* dynamic programming per call of mars_ent_work() */ \
char d_once_error; \
bool d_killme; \
struct kstat new_stat; \
struct kstat old_stat; \
char *new_link; \
@ -368,7 +370,8 @@ extern const struct meta mars_kstat_meta[];
extern const struct meta mars_dent_meta[];
struct mars_global {
struct semaphore mutex;
struct rw_semaphore dent_mutex;
struct rw_semaphore brick_mutex;
struct generic_switch global_power;
struct list_head dent_anchor;
struct list_head brick_anchor;
@ -381,7 +384,6 @@ typedef int (*mars_dent_checker)(struct mars_dent *parent, const char *name, int
typedef int (*mars_dent_worker)(struct mars_global *global, struct mars_dent *dent, bool direction);
extern int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mars_dent_checker checker, mars_dent_worker worker, void *buf, int maxdepth);
extern struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path);
extern struct mars_dent *mars_find_dent(struct mars_global *global, const char *path);
extern void mars_kill_dent(struct mars_dent *dent);
extern void mars_free_dent(struct mars_dent *dent);

View File

@ -137,7 +137,7 @@ static int aio_ref_get(struct aio_output *output, struct mref_object *mref)
if (!mref->ref_may_write) {
loff_t len = total_size - mref->ref_pos;
if (unlikely(len <= 0)) {
/* Allow reads starting _exactly_ at EOF when a timeout is specified (special case).
/* Special case: allow reads starting _exactly_ at EOF when a timeout is specified.
*/
if (len < 0 || mref->ref_timeout <= 0) {
MARS_DBG("ENODATA %lld\n", len);

View File

@ -93,7 +93,8 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref)
return INPUT_A_IO;
}
#define MAKE_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA)
#define GET_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA)
#define GET_OFFSET(pos) ((pos) % COPY_CHUNK)
static
void copy_endio(struct generic_callback *cb)
@ -101,8 +102,10 @@ void copy_endio(struct generic_callback *cb)
struct copy_mref_aspect *mref_a;
struct mref_object *mref;
struct copy_brick *brick;
struct copy_state *st;
int index;
int queue;
int error = 0;
mref_a = cb->cb_private;
CHECK_PTR(mref_a, err);
@ -112,28 +115,34 @@ void copy_endio(struct generic_callback *cb)
CHECK_PTR(brick, err);
queue = mref_a->queue;
index = MAKE_INDEX(mref->ref_pos);
index = GET_INDEX(mref->ref_pos);
st = &brick->st[index];
MARS_IO("queue = %d index = %d pos = %lld status = %d\n", queue, index, mref->ref_pos, cb->cb_error);
if (unlikely(queue < 0 || queue >= 2)) {
MARS_ERR("bad queue %d\n", queue);
_clash(brick);
error = -EINVAL;
goto exit;
}
if (unlikely(brick->table[index][queue])) {
MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, brick->table[index], mref);
_clash(brick);
brick->state[index] = -EINVAL;
if (unlikely(st->table[queue])) {
MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, st->table[queue], mref);
error = -EEXIST;
goto exit;
}
if (unlikely(cb->cb_error < 0)) {
MARS_ERR("IO error %d on index %d, old state =%d\n", cb->cb_error, index, brick->state[index]);
brick->state[index] = cb->cb_error;
} else if (likely(brick->state[index] > 0)) {
brick->table[index][queue] = mref;
MARS_ERR("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state);
error = cb->cb_error;
} else if (likely(!st->error)) {
st->table[queue] = mref;
}
exit:
if (unlikely(error)) {
st->error = error;
_clash(brick);
}
atomic_dec(&brick->copy_flight);
st->active = false;
brick->trigger = true;
wake_up_interruptible(&brick->event);
return;
@ -149,6 +158,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
struct copy_mref_aspect *mref_a;
struct copy_input *input;
loff_t tmp_pos;
int offset;
int len;
int status = -1;
@ -173,7 +183,8 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
mref->ref_rw = rw;
mref->ref_data = data;
mref->ref_pos = pos;
len = COPY_CHUNK - (pos & (COPY_CHUNK-1));
offset = GET_OFFSET(pos);
len = COPY_CHUNK - offset;
if (pos + len > tmp_pos) {
len = tmp_pos - pos;
}
@ -191,14 +202,14 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
goto done;
}
if (unlikely(mref->ref_len < len)) {
MARS_ERR("shorten len %d < %d\n", mref->ref_len, len);
//FIXME: handle this case
status = -EAGAIN;
MARS_DBG("shorten len %d < %d\n", mref->ref_len, len);
}
MARS_IO("queue = %d index = %d pos = %lld len = %d rw = %d\n", queue, index, mref->ref_pos, mref->ref_len, rw);
atomic_inc(&brick->copy_flight);
brick->st[index].len = mref->ref_len;
brick->st[index].active = true;
GENERIC_INPUT_CALL(input, mref_io, mref);
done:
@ -208,12 +219,12 @@ done:
static
void _clear_mref(struct copy_brick *brick, int index, int queue)
{
struct mref_object *mref = brick->table[index][queue];
struct mref_object *mref = brick->st[index].table[queue];
if (mref) {
struct copy_input *input;
input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY];
GENERIC_INPUT_CALL(input, mref_put, mref);
brick->table[index][queue] = NULL;
brick->st[index].table[queue] = NULL;
}
}
@ -231,17 +242,18 @@ void _update_percent(struct copy_brick *brick)
}
static
int _next_state(struct copy_brick *brick, loff_t pos)
int _next_state(struct copy_brick *brick, int index, loff_t pos)
{
struct mref_object *mref1;
struct mref_object *mref2;
int index = MAKE_INDEX(pos);
struct copy_state *st;
char state;
char next_state;
int i;
int status;
state = brick->state[index];
st = &brick->st[index];
state = st->state;
next_state = -1;
mref2 = NULL;
status = 0;
@ -250,7 +262,7 @@ int _next_state(struct copy_brick *brick, loff_t pos)
switch (state) {
case COPY_STATE_START:
if (brick->table[index][0] || brick->table[index][1]) {
if (st->table[0] || st->table[1]) {
MARS_ERR("index %d not startable\n", index);
status = -EPROTO;
goto done;
@ -269,13 +281,13 @@ int _next_state(struct copy_brick *brick, loff_t pos)
}
break;
case COPY_STATE_READ2:
mref2 = brick->table[index][1];
mref2 = st->table[1];
if (!mref2) {
goto done;
}
/* fallthrough */
case COPY_STATE_READ1:
mref1 = brick->table[index][0];
mref1 = st->table[0];
if (!mref1) {
goto done;
}
@ -288,7 +300,7 @@ int _next_state(struct copy_brick *brick, loff_t pos)
!memcmp(mref1->ref_data, mref2->ref_data, len)) {
/* skip start of writing, goto final treatment of writeout */
next_state = COPY_STATE_WRITE;
brick->state[index] = next_state;
st->state = next_state;
goto COPY_STATE_WRITE;
}
_clear_mref(brick, index, 1);
@ -300,15 +312,13 @@ int _next_state(struct copy_brick *brick, loff_t pos)
break;
case COPY_STATE_WRITE:
COPY_STATE_WRITE:
mref2 = brick->table[index][1];
mref2 = st->table[1];
if (!mref2 || brick->copy_start != pos) {
MARS_IO("irrelevant\n");
goto done;
}
if (!brick->clash) {
brick->copy_start += mref2->ref_len;
MARS_IO("new copy_start = %lld\n", brick->copy_start);
_update_percent(brick);
if (!brick->clash && mref2->ref_len == COPY_CHUNK) {
st->finished = true;
}
next_state = COPY_STATE_CLEANUP;
/* fallthrough */
@ -323,9 +333,9 @@ int _next_state(struct copy_brick *brick, loff_t pos)
status = -EILSEQ;
}
brick->state[index] = next_state;
st->state = next_state;
if (status < 0) {
brick->state[index] = -1;
st->error = status;
MARS_ERR("status = %d\n", status);
_clash(brick);
}
@ -339,7 +349,7 @@ void _run_copy(struct copy_brick *brick)
{
int max;
loff_t pos;
int i;
loff_t last = 0;
int status;
if (_clear_clash(brick)) {
@ -352,22 +362,41 @@ void _run_copy(struct copy_brick *brick)
msleep(50);
return;
}
for (i = 0; i < MAX_COPY_PARA; i++) {
brick->table[i][0] = NULL;
brick->table[i][1] = NULL;
brick->state[i] = COPY_STATE_START;
}
memset(brick->st, 0, sizeof(brick->st));
}
/* Do at most max iterations in the below loop
*/
max = MAX_COPY_PARA - atomic_read(&brick->io_flight) * 2;
MARS_IO("max = %d\n", max);
for (pos = brick->copy_start; pos < brick->copy_end || brick->append_mode > 1; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
int index = GET_INDEX(pos);
struct copy_state *st = &brick->st[index];
//MARS_IO("pos = %lld\n", pos);
if (brick->clash || max-- <= 0 || kthread_should_stop()) {
break;
}
status = _next_state(brick, pos);
if (!st->skip) {
status = _next_state(brick, index, pos);
last = pos;
}
}
if (!brick->clash) {
int count = 0;
for (pos = brick->copy_start; pos < last; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) {
int index = GET_INDEX(pos);
struct copy_state *st = &brick->st[index];
if (!st->finished)
break;
count += COPY_CHUNK;
st->finished = false;
}
if (count > 0) {
brick->copy_start += count;
MARS_IO("new copy_start = %lld\n", brick->copy_start);
_update_percent(brick);
}
}
}
@ -382,13 +411,15 @@ static int _copy_thread(void *data)
while (!kthread_should_stop()) {
loff_t old_start = brick->copy_start;
loff_t old_end = brick->copy_end;
if (old_end > 0)
if (old_end > 0) {
_run_copy(brick);
msleep(10); // yield FIXME: remove this, use event handling for over/underflow
}
wait_event_interruptible_timeout(brick->event,
brick->trigger || brick->copy_start != old_start || brick->copy_end != old_end || kthread_should_stop(),
20 * HZ);
5 * HZ);
brick->trigger = false;
}

View File

@ -11,15 +11,26 @@
#define INPUT_B_COPY 3
//#define COPY_CHUNK (64 * 1024)
#define COPY_CHUNK PAGE_SIZE
#define MAX_COPY_PARA (10 * 1024 * 1024 / COPY_CHUNK)
#define COPY_CHUNK (PAGE_SIZE)
#define MAX_COPY_PARA (4 * 1024 * 1024 / COPY_CHUNK)
enum {
COPY_STATE_START = 0,
COPY_STATE_READ1 = 1,
COPY_STATE_READ2 = 2,
COPY_STATE_WRITE,
COPY_STATE_CLEANUP,
COPY_STATE_START = 0,
COPY_STATE_READ1 = 1,
COPY_STATE_READ2 = 2,
COPY_STATE_WAIT_A = 3,
COPY_STATE_WRITE = 4,
COPY_STATE_WAIT_B = 5,
COPY_STATE_CLEANUP = 6,
};
/* Per-slot bookkeeping for one chunk handled by the copy brick.
 *
 * The whole slot array is cleared with memset() when a copy run is restarted,
 * so an all-zero slot must mean "idle, nothing recorded"
 * (state 0 is COPY_STATE_START).
 *
 * The flags `finished` and `skip` are accessed by the copy state machine
 * (st->finished, st->skip) and therefore have to be declared here.
 */
struct copy_state {
	struct mref_object *table[2]; /* mref stored per queue (0 or 1) by the endio callback; NULL when none */
	char state;    /* current COPY_STATE_* value of this slot */
	char active;   /* set when an mref is submitted, cleared again in the endio callback */
	char finished; /* set once a full COPY_CHUNK has completed without clash; reset when copy_start is advanced */
	char skip;     /* when non-zero, the run loop does not drive the state machine for this slot */
	short prev;    /* NOTE(review): not referenced in the code visible here */
	short len;     /* ref_len of the mref most recently submitted for this slot */
	short error;   /* error code recorded for this slot, 0 when none */
};
struct copy_mref_aspect {
@ -49,9 +60,8 @@ struct copy_brick {
wait_queue_head_t event;
struct semaphore mutex;
struct task_struct *thread;
char state[MAX_COPY_PARA];
struct mref_object *table[MAX_COPY_PARA][2];
struct generic_object_layout mref_object_layout;
struct copy_state st[MAX_COPY_PARA];
};
struct copy_input {

View File

@ -655,8 +655,7 @@ int mars_filler(void *__buf, const char *name, int namlen, loff_t offset,
dent = container_of(tmp, struct mars_dent, dent_link);
cmp = strcmp(dent->d_path, newpath);
if (!cmp) {
kfree(newpath);
return 0;
goto found;
}
// keep the list sorted. find the next smallest member.
if ((dent->d_class < class ||
@ -678,31 +677,37 @@ int mars_filler(void *__buf, const char *name, int namlen, loff_t offset,
dent = kzalloc(cookie->allocsize, GFP_MARS);
if (unlikely(!dent))
goto err_mem1;
dent->d_name = kmalloc(namlen + 1, GFP_MARS);
if (unlikely(!dent->d_name))
goto err_mem2;
dent->d_type = d_type;
dent->d_class = class;
dent->d_serial = serial;
dent->d_parent = cookie->parent;
dent->d_depth = cookie->depth;
dent->d_path = newpath;
dent->d_pathlen = pathlen;
dent->d_global = global;
INIT_LIST_HEAD(&dent->brick_list);
memcpy(dent->d_name, name, namlen);
dent->d_name[namlen] = '\0';
dent->d_namelen = namlen;
dent->d_rest = dent->d_name + prefix;
down(&global->mutex);
dent->d_path = newpath;
newpath = NULL;
dent->d_pathlen = pathlen;
INIT_LIST_HEAD(&dent->brick_list);
if (best) {
list_add(&dent->dent_link, &best->dent_link);
} else {
list_add_tail(&dent->dent_link, anchor);
}
up(&global->mutex);
found:
dent->d_type = d_type;
dent->d_class = class;
dent->d_serial = serial;
dent->d_parent = cookie->parent;
dent->d_depth = cookie->depth;
dent->d_global = global;
dent->d_killme = false;
if (newpath)
kfree(newpath);
return 0;
err_mem2:
@ -750,6 +755,7 @@ int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mar
.depth = 0,
};
struct list_head *tmp;
struct list_head *next;
int rounds = 0;
int status;
int total_status = 0;
@ -764,6 +770,8 @@ int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mar
goto done;
}
down_write(&global->dent_mutex);
restart:
found_dir = false;
@ -780,6 +788,8 @@ restart:
continue;
dent->d_version = version;
msleep(10); // yield
MARS_IO("reading inode '%s'\n", dent->d_path);
status = get_inode(dent->d_path, dent);
total_status |= status;
@ -803,14 +813,30 @@ restart:
}
}
}
if (found_dir && ++rounds < 10) {
goto restart;
}
/* Remove all dents marked for removal.
*/
for (tmp = global->dent_anchor.next, next = tmp->next; tmp != &global->dent_anchor; tmp = next, next = next->next) {
struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link);
if (!dent->d_killme)
continue;
MARS_DBG("killing dent '%s'\n", dent->d_path);
list_del_init(tmp);
//... FIXME memleak
}
up_write(&global->dent_mutex);
/* Forward pass.
*/
for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) {
down_read(&global->dent_mutex);
for (tmp = global->dent_anchor.next, next = tmp->next; tmp != &global->dent_anchor; tmp = next, next = next->next) {
struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link);
msleep(10); // yield
MARS_IO("forward treat '%s'\n", dent->d_path);
status = worker(buf, dent, false);
total_status |= status;
@ -825,6 +851,7 @@ restart:
*/
for (tmp = global->dent_anchor.prev; tmp != &global->dent_anchor; tmp = tmp->prev) {
struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link);
msleep(10); // yield
MARS_IO("backward treat '%s'\n", dent->d_path);
status = worker(buf, dent, true);
total_status |= status;
@ -832,17 +859,23 @@ restart:
MARS_ERR("backwards: status %d on '%s'\n", status, dent->d_path);
}
}
up_read(&global->dent_mutex);
done:
return total_status;
}
EXPORT_SYMBOL_GPL(mars_dent_work);
static
struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path)
{
struct mars_dent *res = NULL;
struct list_head *tmp;
if (!rwsem_is_locked(&global->dent_mutex)) {
MARS_ERR("dent_mutex not held!\n");
}
for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) {
struct mars_dent *tmp_dent = container_of(tmp, struct mars_dent, dent_link);
if (!strcmp(tmp_dent->d_path, path)) {
@ -853,18 +886,19 @@ struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path)
return res;
}
EXPORT_SYMBOL_GPL(_mars_find_dent);
//EXPORT_SYMBOL_GPL(_mars_find_dent);
struct mars_dent *mars_find_dent(struct mars_global *global, const char *path)
{
struct mars_dent *res;
down(&global->mutex);
//down_read(&global->dent_mutex);
res = _mars_find_dent(global, path);
up(&global->mutex);
//up_read(&global->dent_mutex);
return res;
}
EXPORT_SYMBOL_GPL(mars_find_dent);
#if 0 // old code! does not work! incorrect locking / races!
void mars_kill_dent(struct mars_dent *dent)
{
struct mars_global *global = dent->d_global;
@ -893,22 +927,29 @@ void mars_kill_dent(struct mars_dent *dent)
up(&global->mutex);
done: ;
}
#else
void mars_kill_dent(struct mars_dent *dent)
{
dent->d_killme = true;
while (!list_empty(&dent->brick_list)) {
struct list_head *tmp = dent->brick_list.next;
struct mars_brick *brick = container_of(tmp, struct mars_brick, dent_brick_link);
list_del_init(tmp);
// note: locking is now done there....
mars_kill_brick(brick);
}
}
#endif
EXPORT_SYMBOL_GPL(mars_kill_dent);
void mars_free_dent(struct mars_dent *dent)
{
struct mars_global *global = dent->d_global;
int i;
if (global) {
mars_kill_dent(dent);
down(&global->mutex);
}
list_del(&dent->dent_link);
list_del(&dent->brick_list);
if (global) {
up(&global->mutex);
}
mars_kill_dent(dent);
CHECK_HEAD_EMPTY(&dent->dent_link);
CHECK_HEAD_EMPTY(&dent->brick_list);
for (i = 0; i < MARS_ARGV_MAX; i++) {
if (dent->d_argv[i])
@ -930,11 +971,15 @@ EXPORT_SYMBOL_GPL(mars_free_dent);
/* Dispose of a whole dent list, given its anchor.
 *
 * The intended implementation (freeing each dent from the tail of the list
 * via mars_free_dent()) is compiled out with "#if 0" until its locking is
 * fixed. The active branch only re-initializes the anchor with
 * list_del_init() and frees nothing, so the dents are deliberately leaked
 * for now.
 */
void mars_free_dent_all(struct list_head *anchor)
{
#if 0 // FIXME: locking
while (!list_empty(anchor)) {
struct mars_dent *dent;
// always take the last entry
dent = container_of(anchor->prev, struct mars_dent, dent_link);
mars_free_dent(dent);
}
#else // provisional memleak: the dents are not freed here
list_del_init(anchor);
#endif
}
EXPORT_SYMBOL_GPL(mars_free_dent_all);
@ -950,12 +995,12 @@ struct mars_brick *mars_find_brick(struct mars_global *global, const void *brick
if (!global || !path)
return NULL;
down(&global->mutex);
down_read(&global->brick_mutex);
for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) {
struct mars_brick *test = container_of(tmp, struct mars_brick, global_brick_link);
if (!strcmp(test->brick_path, path)) {
up(&global->mutex);
up_read(&global->brick_mutex);
if (brick_type && test->type != brick_type) {
MARS_ERR("bad brick type\n");
return NULL;
@ -964,7 +1009,7 @@ struct mars_brick *mars_find_brick(struct mars_global *global, const void *brick
}
}
up(&global->mutex);
up_read(&global->brick_mutex);
return NULL;
}
@ -1001,10 +1046,10 @@ int mars_free_brick(struct mars_brick *brick)
global = brick->global;
if (global) {
down(&global->mutex);
down_write(&global->brick_mutex);
list_del_init(&brick->global_brick_link);
list_del_init(&brick->dent_brick_link);
up(&global->mutex);
up_write(&global->brick_mutex);
}
status = generic_brick_exit_full((void*)brick);
@ -1097,12 +1142,12 @@ struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent
/* Immediately make it visible, regardless of internal state.
* Switching on / etc must be done separately.
*/
down(&global->mutex);
down_write(&global->brick_mutex);
list_add(&res->global_brick_link, &global->brick_anchor);
if (belongs) {
list_add(&res->dent_brick_link, &belongs->brick_list);
}
up(&global->mutex);
up_write(&global->brick_mutex);
return res;

View File

@ -85,10 +85,11 @@ struct light_class {
#define IF_SKIP_SYNC true
#define IF_MAX_PLUGGED 10000
#define IF_READAHEAD 1
//#define IF_READAHEAD 0
#define IF_READAHEAD 0
//#define IF_READAHEAD 1
#define BIO_READAHEAD 1
#define BIO_READAHEAD 0
//#define BIO_READAHEAD 1
#define BIO_NOIDLE true
#define BIO_SYNC true
#define BIO_UNPLUG true
@ -618,7 +619,7 @@ int run_bone(struct mars_peerinfo *peer, struct mars_dent *dent)
update_mtime = timespec_compare(&dent->new_stat.mtime, &local_stat.mtime) > 0;
update_ctime = timespec_compare(&dent->new_stat.ctime, &local_stat.ctime) > 0;
//MARS_DBG("timestamps '%s' remote = %ld.%09ld local = %ld.%09ld\n", dent->d_path, dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec, local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec);
MARS_DBG("timestamps '%s' remote = %ld.%09ld local = %ld.%09ld\n", dent->d_path, dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec, local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec);
if ((dent->new_stat.mode & S_IRWXU) !=
(local_stat.mode & S_IRWXU) &&
@ -700,7 +701,7 @@ int run_bones(struct mars_peerinfo *peer)
MARS_DBG("NULL\n");
continue;
}
//MARS_DBG("path = '%s'\n", dent->d_path);
MARS_DBG("path = '%s'\n", dent->d_path);
status = run_bone(peer, dent);
if (status > 0)
run_trigger = true;
@ -1386,7 +1387,7 @@ int make_log(void *buf, struct mars_dent *dent)
_update_replaylink(dent->d_parent, dent->d_serial + 1, 0, 0, !rot->is_primary);
trans_brick->current_pos = 0;
rot->last_jiffies = jiffies;
mars_trigger();
//mars_trigger();
}
status = -EAGAIN;
goto done;
@ -2013,9 +2014,16 @@ enum {
CL_PEERS,
// resource definitions
CL_RESOURCE,
CL_DEFAULTS0,
CL_DEFAULTS,
CL_DEFAULTS_ITEMS0,
CL_DEFAULTS_ITEMS,
CL_SWITCH,
CL_SWITCH_ITEMS,
CL_ACTUAL,
CL_ACTUAL_ITEMS,
CL_CONNECT,
CL_SIZE,
CL_DATA,
CL_PRIMARY,
CL__FILE,
@ -2069,6 +2077,46 @@ static const struct light_class light_classes[] = {
.cl_forward = make_log_init,
.cl_backward = NULL,
},
/* Subdirectory for defaults...
*/
[CL_DEFAULTS0] = {
.cl_name = "defaults",
.cl_len = 8,
.cl_type = 'd',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
[CL_DEFAULTS] = {
.cl_name = "defaults-",
.cl_len = 9,
.cl_type = 'd',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* ... and its contents
*/
[CL_DEFAULTS_ITEMS0] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_DEFAULTS0,
.cl_forward = NULL,
.cl_backward = NULL,
},
[CL_DEFAULTS_ITEMS] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_DEFAULTS,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Subdirectory for controlling items...
*/
[CL_SWITCH] = {
@ -2090,6 +2138,30 @@ static const struct light_class light_classes[] = {
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Subdirectory for actual state
*/
[CL_ACTUAL] = {
.cl_name = "actual-",
.cl_len = 7,
.cl_type = 'd',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* ... and its contents
*/
[CL_ACTUAL_ITEMS] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_ACTUAL,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Symlink indicating the current peer
*/
[CL_CONNECT] = {
@ -2101,6 +2173,17 @@ static const struct light_class light_classes[] = {
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Symlink indicating the (common) size of the resource
*/
[CL_SIZE] = {
.cl_name = "size",
.cl_len = 4,
.cl_type = 'l',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* File or symlink to the real device / real (sparse) file
* when hostcontext is missing, the corresponding peer will
* not participate in that resource.
@ -2379,7 +2462,7 @@ void _show_status(struct mars_global *global)
{
struct list_head *tmp;
down(&global->mutex);
down_read(&global->brick_mutex);
for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) {
struct mars_brick *test;
const char *path;
@ -2420,7 +2503,7 @@ void _show_status(struct mars_global *global)
}
kfree(dst);
}
up(&global->mutex);
up_read(&global->brick_mutex);
}
#ifdef STAT_DEBUGGING
@ -2431,15 +2514,17 @@ void _show_statist(struct mars_global *global)
int dent_count = 0;
int brick_count = 0;
down(&global->mutex);
MARS_STAT("================================== dents:\n");
down_read(&global->dent_mutex);
for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) {
struct mars_dent *dent;
dent = container_of(tmp, struct mars_dent, dent_link);
MARS_STAT("dent %d '%s' '%s'\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : "");
MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : "", dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec);
dent_count++;
}
up_read(&global->dent_mutex);
MARS_STAT("================================== bricks:\n");
down_read(&global->brick_mutex);
for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) {
struct mars_brick *test;
int i;
@ -2466,7 +2551,7 @@ void _show_statist(struct mars_global *global)
}
}
}
up(&global->mutex);
up_read(&global->brick_mutex);
MARS_INF("==================== STATISTICS: %d dents, %d bricks\n", dent_count, brick_count);
}
@ -2482,7 +2567,8 @@ static int light_thread(void *data)
.global_power = {
.button = true,
},
.mutex = __SEMAPHORE_INITIALIZER(global.mutex, 1),
.dent_mutex = __RWSEM_INITIALIZER(global.dent_mutex),
.brick_mutex = __RWSEM_INITIALIZER(global.brick_mutex),
.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(global.main_event),
};
mars_global = &global; // TODO: cleanup, avoid stack
@ -2509,7 +2595,7 @@ static int light_thread(void *data)
_show_statist(&global);
#endif
msleep(50);
msleep(500);
wait_event_interruptible_timeout(global.main_event, global.main_trigger, 10 * HZ);
global.main_trigger = false;

View File

@ -12,8 +12,6 @@
#define MARS_IO(args...) /*empty*/
#endif
//#define LOCAL // not longer use this!
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
@ -32,18 +30,6 @@ static struct task_struct *server_thread = NULL;
///////////////////////// own helper functions ////////////////////////
#ifdef LOCAL
static int server_checker(struct mars_dent *parent, const char *name, int namlen, unsigned int d_type, int *prefix, int *serial)
{
return 0;
}
static int server_worker(struct mars_global *global, struct mars_dent *dent, bool direction)
{
return 0;
}
#endif
static
int cb_thread(void *data)
{
@ -275,38 +261,16 @@ int handler_thread(void *data)
}
case CMD_GETENTS:
{
#ifdef LOCAL
struct mars_global glob_tmp = {
.dent_anchor = LIST_HEAD_INIT(glob_tmp.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(glob_tmp.brick_anchor),
.mutex = __SEMAPHORE_INITIALIZER(glob_tmp.mutex, 1),
};
status = -EINVAL;
if (!cmd.cmd_str1)
break;
status = mars_dent_work(&glob_tmp, cmd.cmd_str1, sizeof(struct mars_dent), server_checker, server_worker, NULL, cmd.cmd_int1);
MARS_DBG("dents status = %d\n", status);
if (status < 0)
break;
down(&brick->socket_sem);
status = mars_send_dent_list(sock, &glob_tmp.dent_anchor);
up(&brick->socket_sem);
mars_free_dent_all(&glob_tmp.dent_anchor);
#else
status = -EINVAL;
if (unlikely(!cmd.cmd_str1 || !mars_global))
break;
down(&brick->socket_sem);
down(&mars_global->mutex);
down_read(&mars_global->dent_mutex);
status = mars_send_dent_list(sock, &mars_global->dent_anchor);
up(&mars_global->mutex);
up_read(&mars_global->dent_mutex);
up(&brick->socket_sem);
#endif
if (status < 0) {
MARS_ERR("could not send dentry information, status = %d\n", status);
}

View File

@ -2142,6 +2142,8 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
60 * HZ);
atomic_inc(&brick->replay_count);
atomic_inc(&brick->total_replay_count);
traced_lock(&brick->replay_lock, flags);
list_add(&mref_a->replay_head, &brick->replay_list);
traced_unlock(&brick->replay_lock, flags);
@ -2246,6 +2248,7 @@ void trans_logger_replay(struct trans_logger_output *output)
struct trans_logger_input *input = brick->inputs[TL_INPUT_FW_LOG1];
loff_t start_pos;
loff_t finished_pos;
int status = 0;
bool has_triggered = false;
brick->replay_code = -EAGAIN; // indicates "running"
@ -2267,23 +2270,34 @@ void trans_logger_replay(struct trans_logger_output *output)
struct log_header lh = {};
void *buf = NULL;
int len = 0;
int status;
if (kthread_should_stop()) {
break;
}
status = log_read(&input->logst, &lh, &buf, &len);
if (status == -EAGAIN) {
MARS_DBG("got -EAGAIN\n");
msleep(100);
continue;
}
if (unlikely(status < 0)) {
brick->replay_code = status;
MARS_ERR("cannot read logfile data, status = %d\n", status);
break;
}
finished_pos = input->logst.log_pos + input->logst.offset;
if (!brick->do_continuous_replay && finished_pos >= brick->replay_end_pos) {
status = 0; // treat as EOF
}
if (!status) { // EOF -> wait until kthread_should_stop()
MARS_DBG("got EOF\n");
MARS_DBG("EOF at %lld\n", finished_pos);
if (!brick->do_continuous_replay) {
break;
}
if (finished_pos > brick->replay_end_pos) {
brick->replay_end_pos = finished_pos;
}
msleep(1000);
}
@ -2296,14 +2310,13 @@ void trans_logger_replay(struct trans_logger_output *output)
status = apply_data(brick, lh.l_pos, buf, len);
if (unlikely(status < 0)) {
brick->replay_code = status;
MARS_ERR("cannot apply data, len = %d, status = %d\n", len, status);
MARS_ERR("cannot apply data at pos = %lld len = %d, status = %d\n", lh.l_pos, len, status);
break;
}
}
// do this _after_ any opportunities for errors...
if (atomic_read(&brick->replay_count) <= 0) {
finished_pos = input->logst.log_pos + input->logst.offset;
brick->current_pos = finished_pos;
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
@ -2313,11 +2326,13 @@ void trans_logger_replay(struct trans_logger_output *output)
wait_event_interruptible_timeout(brick->event, atomic_read(&brick->replay_count) <= 0, 60 * HZ);
finished_pos = input->logst.log_pos + input->logst.offset;
brick->current_pos = finished_pos;
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
if (status >= 0) {
brick->current_pos = finished_pos;
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
}
if (finished_pos == brick->replay_end_pos) {
if (status >= 0 && finished_pos == brick->replay_end_pos) {
MARS_INF("replay finished at %lld\n", finished_pos);
brick->replay_code = 0;
} else {
@ -2399,15 +2414,18 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
// FIXME: check for allocation overflows
sprintf(res, "total callbacks = %d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
sprintf(res, "current_pos = %lld | mode replay=%d continuous=%d replay_code=%d log_reads=%d | total replay=%d callbacks=%d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d phase1=%d phase2=%d phase3=%d phase4=%d | replay=%d mshadow=%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
brick->current_pos,
brick->do_replay, brick->do_continuous_replay, brick->replay_code, brick->log_reads,
atomic_read(&brick->total_replay_count), atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
return res;
}
static noinline
void trans_logger_reset_statistics(struct trans_logger_brick *brick)
{
atomic_set(&brick->total_replay_count, 0);
atomic_set(&brick->total_cb_count, 0);
atomic_set(&brick->total_read_count, 0);
atomic_set(&brick->total_write_count, 0);

View File

@ -145,6 +145,7 @@ struct trans_logger_brick {
atomic_t inner_balance_count;
atomic_t sub_balance_count;
atomic_t wb_balance_count;
atomic_t total_replay_count;
atomic_t total_cb_count;
atomic_t total_read_count;
atomic_t total_write_count;

View File

@ -32,7 +32,35 @@ sub check_id {
sub check_res {
my $res = shift;
die "resource '$res' does not exist\n" unless -d "$mars/resource-$res";
if(not -d "$mars/resource-$res") {
# DO WHAT I MEAN: try to substitute a device name for a badly given resource name if it is unique
my $count = 0;
my $found;
my @tests = glob("$mars/resource-*/device-$host");
foreach my $test (@tests) {
my $target = readlink($test);
if($target eq $res) {
$found = $test;
$count++;
}
}
if(!$count) {
@tests = glob("$mars/resource-*/_direct-*-$host");
foreach my $test (@tests) {
my $target = readlink($test);
$target =~ s/^.*,//;
if($target eq $res) {
$found = $test;
$count++;
}
}
}
die "resource '$res' does not exist ($count replacements found)\n" unless $count == 1 and $found;
$found =~ s:^.*/resource-(.*)/.*$:$1:;
warn "substituting bad resource name '$res' by uniquely matching resource name '$found'\n";
$res = $found;
}
return $res;
}
sub check_res_member {
@ -66,12 +94,20 @@ sub _trigger {
}
sub _switch {
my ($path, $on) = @_;
my ($cmd, $res, $path, $on) = @_;
my $src = $on ? "1" : "0";
my $old = readlink($path);
if($old && $old eq $src) {
print "${cmd} on resource $res is already activated\n" if $cmd;
return;
}
my $tmp = $path;
$tmp =~ s/\/([^\/]+)$/.tmp.$1/;
my $src = $on ? "1" : "0";
symlink($src, $tmp) or die "cannot create switch symlink\n";
rename($tmp, $path) or die "cannot rename switch symlink\n";
print "successfully started ${cmd} on resource $res\n" if $cmd;
}
sub _writable {
@ -235,24 +271,21 @@ sub attach_res {
my ($cmd, $res) = @_;
my $detach = ($cmd eq "detach");
my $path = "$mars/resource-$res/switch-$host/attach";
_switch($path, !$detach);
print "successfully started ${cmd} of resource $res\n"
_switch($cmd, $res, $path, !$detach);
}
sub connect_res {
my ($cmd, $res) = @_;
my $disconnect = ($cmd eq "disconnect");
my $path = "$mars/resource-$res/switch-$host/connect";
_switch($path, !$disconnect);
print "successfully started ${cmd} on resource $res\n"
_switch($cmd, $res, $path, !$disconnect);
}
sub pause_res {
my ($cmd, $res) = @_;
my $pause = ($cmd eq "pause-sync");
my $path = "$mars/resource-$res/switch-$host/sync";
_switch($path, !$pause);
print "successfully started ${cmd} on resource $res\n"
_switch($cmd, $res, $path, !$pause);
}
sub up_res {
@ -377,7 +410,7 @@ sub do_res {
my $cmd = shift;
my $res = shift;
check_res($res) unless $cmd =~ m/^(join-system|create-resource)$/;
$res = check_res($res) unless $cmd =~ m/^(join-system|create-resource)$/;
check_res_member($res) unless $cmd =~ m/^(join|create)-(system|resource)$/;
my $func = $cmd_table{$cmd};