diff --git a/mars.h b/mars.h index 56a48922..742ac597 100644 --- a/mars.h +++ b/mars.h @@ -3,6 +3,7 @@ #define MARS_H #include +#include //#define MARS_TRACING // write runtime trace data to /mars/trace.csv @@ -351,6 +352,7 @@ extern char *my_id(void); int d_serial; /* for pre-grouping order */ \ int d_version; /* dynamic programming per call of mars_ent_work() */ \ char d_once_error; \ + bool d_killme; \ struct kstat new_stat; \ struct kstat old_stat; \ char *new_link; \ @@ -368,7 +370,8 @@ extern const struct meta mars_kstat_meta[]; extern const struct meta mars_dent_meta[]; struct mars_global { - struct semaphore mutex; + struct rw_semaphore dent_mutex; + struct rw_semaphore brick_mutex; struct generic_switch global_power; struct list_head dent_anchor; struct list_head brick_anchor; @@ -381,7 +384,6 @@ typedef int (*mars_dent_checker)(struct mars_dent *parent, const char *name, int typedef int (*mars_dent_worker)(struct mars_global *global, struct mars_dent *dent, bool direction); extern int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mars_dent_checker checker, mars_dent_worker worker, void *buf, int maxdepth); -extern struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path); extern struct mars_dent *mars_find_dent(struct mars_global *global, const char *path); extern void mars_kill_dent(struct mars_dent *dent); extern void mars_free_dent(struct mars_dent *dent); diff --git a/mars_aio.c b/mars_aio.c index e064f7bd..a67d11f8 100644 --- a/mars_aio.c +++ b/mars_aio.c @@ -137,7 +137,7 @@ static int aio_ref_get(struct aio_output *output, struct mref_object *mref) if (!mref->ref_may_write) { loff_t len = total_size - mref->ref_pos; if (unlikely(len <= 0)) { - /* Allow reads starting _exactly_ at EOF when a timeout is specified (special case). + /* Special case: allow reads starting _exactly_ at EOF when a timeout is specified. */ if (len < 0 || mref->ref_timeout <= 0) { MARS_DBG("ENODATA %lld\n", len); diff --git a/mars_copy.c b/mars_copy.c index 6ac13d9e..28323b4c 100644 --- a/mars_copy.c +++ b/mars_copy.c @@ -93,7 +93,8 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref) return INPUT_A_IO; } -#define MAKE_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA) +#define GET_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA) +#define GET_OFFSET(pos) ((pos) % COPY_CHUNK) static void copy_endio(struct generic_callback *cb) @@ -101,8 +102,10 @@ void copy_endio(struct generic_callback *cb) struct copy_mref_aspect *mref_a; struct mref_object *mref; struct copy_brick *brick; + struct copy_state *st; int index; int queue; + int error = 0; mref_a = cb->cb_private; CHECK_PTR(mref_a, err); @@ -112,28 +115,34 @@ void copy_endio(struct generic_callback *cb) CHECK_PTR(brick, err); queue = mref_a->queue; - index = MAKE_INDEX(mref->ref_pos); + index = GET_INDEX(mref->ref_pos); + st = &brick->st[index]; + MARS_IO("queue = %d index = %d pos = %lld status = %d\n", queue, index, mref->ref_pos, cb->cb_error); if (unlikely(queue < 0 || queue >= 2)) { MARS_ERR("bad queue %d\n", queue); - _clash(brick); + error = -EINVAL; goto exit; } - if (unlikely(brick->table[index][queue])) { - MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, brick->table[index], mref); - _clash(brick); - brick->state[index] = -EINVAL; + if (unlikely(st->table[queue])) { + MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, st->table[queue], mref); + error = -EEXIST; goto exit; } if (unlikely(cb->cb_error < 0)) { - MARS_ERR("IO error %d on index %d, old state =%d\n", cb->cb_error, index, brick->state[index]); - brick->state[index] = cb->cb_error; - } else if (likely(brick->state[index] > 0)) { - brick->table[index][queue] = mref; + MARS_ERR("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state); + error = cb->cb_error; + } else if (likely(!st->error)) { + st->table[queue] = mref; } exit: + if (unlikely(error)) { + st->error = error; + _clash(brick); + } atomic_dec(&brick->copy_flight); + st->active = false; brick->trigger = true; wake_up_interruptible(&brick->event); return; @@ -149,6 +158,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_ struct copy_mref_aspect *mref_a; struct copy_input *input; loff_t tmp_pos; + int offset; int len; int status = -1; @@ -173,7 +183,8 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_ mref->ref_rw = rw; mref->ref_data = data; mref->ref_pos = pos; - len = COPY_CHUNK - (pos & (COPY_CHUNK-1)); + offset = GET_OFFSET(pos); + len = COPY_CHUNK - offset; if (pos + len > tmp_pos) { len = tmp_pos - pos; } @@ -191,14 +202,14 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_ goto done; } if (unlikely(mref->ref_len < len)) { - MARS_ERR("shorten len %d < %d\n", mref->ref_len, len); - //FIXME: handle this case - status = -EAGAIN; + MARS_DBG("shorten len %d < %d\n", mref->ref_len, len); } MARS_IO("queue = %d index = %d pos = %lld len = %d rw = %d\n", queue, index, mref->ref_pos, mref->ref_len, rw); atomic_inc(&brick->copy_flight); + brick->st[index].len = mref->ref_len; + brick->st[index].active = true; GENERIC_INPUT_CALL(input, mref_io, mref); done: @@ -208,12 +219,12 @@ done: static void _clear_mref(struct copy_brick *brick, int index, int queue) { - struct mref_object *mref = brick->table[index][queue]; + struct mref_object *mref = brick->st[index].table[queue]; if (mref) { struct copy_input *input; input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY]; GENERIC_INPUT_CALL(input, mref_put, mref); - brick->table[index][queue] = NULL; + brick->st[index].table[queue] = NULL; } } @@ -231,17 +242,18 @@ void _update_percent(struct copy_brick *brick) } static -int _next_state(struct copy_brick *brick, loff_t pos) +int _next_state(struct copy_brick *brick, int index, loff_t pos) { struct mref_object *mref1; struct mref_object *mref2; - int index = MAKE_INDEX(pos); + struct copy_state *st; char state; char next_state; int i; int status; - state = brick->state[index]; + st = &brick->st[index]; + state = st->state; next_state = -1; mref2 = NULL; status = 0; @@ -250,7 +262,7 @@ int _next_state(struct copy_brick *brick, loff_t pos) switch (state) { case COPY_STATE_START: - if (brick->table[index][0] || brick->table[index][1]) { + if (st->table[0] || st->table[1]) { MARS_ERR("index %d not startable\n", index); status = -EPROTO; goto done; @@ -269,13 +281,13 @@ int _next_state(struct copy_brick *brick, loff_t pos) } break; case COPY_STATE_READ2: - mref2 = brick->table[index][1]; + mref2 = st->table[1]; if (!mref2) { goto done; } /* fallthrough */ case COPY_STATE_READ1: - mref1 = brick->table[index][0]; + mref1 = st->table[0]; if (!mref1) { goto done; } @@ -288,7 +300,7 @@ int _next_state(struct copy_brick *brick, loff_t pos) !memcmp(mref1->ref_data, mref2->ref_data, len)) { /* skip start of writing, goto final treatment of writeout */ next_state = COPY_STATE_WRITE; - brick->state[index] = next_state; + st->state = next_state; goto COPY_STATE_WRITE; } _clear_mref(brick, index, 1); @@ -300,15 +312,13 @@ int _next_state(struct copy_brick *brick, loff_t pos) break; case COPY_STATE_WRITE: COPY_STATE_WRITE: - mref2 = brick->table[index][1]; + mref2 = st->table[1]; if (!mref2 || brick->copy_start != pos) { MARS_IO("irrelevant\n"); goto done; } - if (!brick->clash) { - brick->copy_start += mref2->ref_len; - MARS_IO("new copy_start = %lld\n", brick->copy_start); - _update_percent(brick); + if (!brick->clash && mref2->ref_len == COPY_CHUNK) { + st->finished = true; } next_state = COPY_STATE_CLEANUP; /* fallthrough */ @@ -323,9 +333,9 @@ int _next_state(struct copy_brick *brick, loff_t pos) status = -EILSEQ; } - brick->state[index] = next_state; + st->state = next_state; if (status < 0) { - brick->state[index] = -1; + st->error = status; MARS_ERR("status = %d\n", status); _clash(brick); } @@ -339,7 +349,7 @@ void _run_copy(struct copy_brick *brick) { int max; loff_t pos; - int i; + loff_t last = 0; int status; if (_clear_clash(brick)) { @@ -352,22 +362,41 @@ void _run_copy(struct copy_brick *brick) msleep(50); return; } - for (i = 0; i < MAX_COPY_PARA; i++) { - brick->table[i][0] = NULL; - brick->table[i][1] = NULL; - brick->state[i] = COPY_STATE_START; - } + memset(brick->st, 0, sizeof(brick->st)); } + /* Do at most max iterations in the below loop + */ max = MAX_COPY_PARA - atomic_read(&brick->io_flight) * 2; MARS_IO("max = %d\n", max); for (pos = brick->copy_start; pos < brick->copy_end || brick->append_mode > 1; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) { + int index = GET_INDEX(pos); + struct copy_state *st = &brick->st[index]; //MARS_IO("pos = %lld\n", pos); if (brick->clash || max-- <= 0 || kthread_should_stop()) { break; } - status = _next_state(brick, pos); + if (!st->skip) { + status = _next_state(brick, index, pos); + last = pos; + } + } + if (!brick->clash) { + int count = 0; + for (pos = brick->copy_start; pos < last; pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK) { + int index = GET_INDEX(pos); + struct copy_state *st = &brick->st[index]; + if (!st->finished) + break; + count += COPY_CHUNK; + st->finished = false; + } + if (count > 0) { + brick->copy_start += count; + MARS_IO("new copy_start = %lld\n", brick->copy_start); + _update_percent(brick); + } } } @@ -382,13 +411,15 @@ static int _copy_thread(void *data) while (!kthread_should_stop()) { loff_t old_start = brick->copy_start; loff_t old_end = brick->copy_end; - if (old_end > 0) + if (old_end > 0) { _run_copy(brick); + msleep(10); // yield FIXME: remove this, use event handling for over/underflow + } wait_event_interruptible_timeout(brick->event, brick->trigger || brick->copy_start != old_start || brick->copy_end != old_end || kthread_should_stop(), - 20 * HZ); + 5 * HZ); brick->trigger = false; } diff --git a/mars_copy.h b/mars_copy.h index 0d3319c2..bebdf3b8 100644 --- a/mars_copy.h +++ b/mars_copy.h @@ -11,15 +11,26 @@ #define INPUT_B_COPY 3 //#define COPY_CHUNK (64 * 1024) -#define COPY_CHUNK PAGE_SIZE -#define MAX_COPY_PARA (10 * 1024 * 1024 / COPY_CHUNK) +#define COPY_CHUNK (PAGE_SIZE) +#define MAX_COPY_PARA (4 * 1024 * 1024 / COPY_CHUNK) enum { - COPY_STATE_START = 0, - COPY_STATE_READ1 = 1, - COPY_STATE_READ2 = 2, - COPY_STATE_WRITE, - COPY_STATE_CLEANUP, + COPY_STATE_START = 0, + COPY_STATE_READ1 = 1, + COPY_STATE_READ2 = 2, + COPY_STATE_WAIT_A = 3, + COPY_STATE_WRITE = 4, + COPY_STATE_WAIT_B = 5, + COPY_STATE_CLEANUP = 6, +}; + +struct copy_state { + struct mref_object *table[2]; + char state; + char active; + short prev; + short len; + short error; }; struct copy_mref_aspect { @@ -49,9 +60,8 @@ struct copy_brick { wait_queue_head_t event; struct semaphore mutex; struct task_struct *thread; - char state[MAX_COPY_PARA]; - struct mref_object *table[MAX_COPY_PARA][2]; struct generic_object_layout mref_object_layout; + struct copy_state st[MAX_COPY_PARA]; }; struct copy_input { diff --git a/mars_generic.c b/mars_generic.c index fc3681cc..9307fd1b 100644 --- a/mars_generic.c +++ b/mars_generic.c @@ -655,8 +655,7 @@ int mars_filler(void *__buf, const char *name, int namlen, loff_t offset, dent = container_of(tmp, struct mars_dent, dent_link); cmp = strcmp(dent->d_path, newpath); if (!cmp) { - kfree(newpath); - return 0; + goto found; } // keep the list sorted. find the next smallest member. if ((dent->d_class < class || @@ -678,31 +677,37 @@ int mars_filler(void *__buf, const char *name, int namlen, loff_t offset, dent = kzalloc(cookie->allocsize, GFP_MARS); if (unlikely(!dent)) goto err_mem1; + dent->d_name = kmalloc(namlen + 1, GFP_MARS); if (unlikely(!dent->d_name)) goto err_mem2; - - dent->d_type = d_type; - dent->d_class = class; - dent->d_serial = serial; - dent->d_parent = cookie->parent; - dent->d_depth = cookie->depth; - dent->d_path = newpath; - dent->d_pathlen = pathlen; - dent->d_global = global; - INIT_LIST_HEAD(&dent->brick_list); memcpy(dent->d_name, name, namlen); dent->d_name[namlen] = '\0'; dent->d_namelen = namlen; dent->d_rest = dent->d_name + prefix; - down(&global->mutex); + dent->d_path = newpath; + newpath = NULL; + dent->d_pathlen = pathlen; + + INIT_LIST_HEAD(&dent->brick_list); + if (best) { list_add(&dent->dent_link, &best->dent_link); } else { list_add_tail(&dent->dent_link, anchor); } - up(&global->mutex); + +found: + dent->d_type = d_type; + dent->d_class = class; + dent->d_serial = serial; + dent->d_parent = cookie->parent; + dent->d_depth = cookie->depth; + dent->d_global = global; + dent->d_killme = false; + if (newpath) + kfree(newpath); return 0; err_mem2: @@ -750,6 +755,7 @@ int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mar .depth = 0, }; struct list_head *tmp; + struct list_head *next; int rounds = 0; int status; int total_status = 0; @@ -764,6 +770,8 @@ int mars_dent_work(struct mars_global *global, char *dirname, int allocsize, mar goto done; } + down_write(&global->dent_mutex); + restart: found_dir = false; @@ -780,6 +788,8 @@ restart: continue; dent->d_version = version; + msleep(10); // yield + MARS_IO("reading inode '%s'\n", dent->d_path); status = get_inode(dent->d_path, dent); total_status |= status; @@ -803,14 +813,30 @@ restart: } } } + if (found_dir && ++rounds < 10) { goto restart; } + /* Remove all dents marked for removal. + */ + for (tmp = global->dent_anchor.next, next = tmp->next; tmp != &global->dent_anchor; tmp = next, next = next->next) { + struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link); + if (!dent->d_killme) + continue; + MARS_DBG("killing dent '%s'\n", dent->d_path); + list_del_init(tmp); + //... FIXME memleak + } + + up_write(&global->dent_mutex); + /* Forward pass. */ - for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) { + down_read(&global->dent_mutex); + for (tmp = global->dent_anchor.next, next = tmp->next; tmp != &global->dent_anchor; tmp = next, next = next->next) { struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link); + msleep(10); // yield MARS_IO("forward treat '%s'\n", dent->d_path); status = worker(buf, dent, false); total_status |= status; @@ -825,6 +851,7 @@ restart: */ for (tmp = global->dent_anchor.prev; tmp != &global->dent_anchor; tmp = tmp->prev) { struct mars_dent *dent = container_of(tmp, struct mars_dent, dent_link); + msleep(10); // yield MARS_IO("backward treat '%s'\n", dent->d_path); status = worker(buf, dent, true); total_status |= status; @@ -832,17 +859,23 @@ restart: MARS_ERR("backwards: status %d on '%s'\n", status, dent->d_path); } } + up_read(&global->dent_mutex); done: return total_status; } EXPORT_SYMBOL_GPL(mars_dent_work); +static struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path) { struct mars_dent *res = NULL; struct list_head *tmp; - + + if (!rwsem_is_locked(&global->dent_mutex)) { + MARS_ERR("dent_mutex not held!\n"); + } + for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) { struct mars_dent *tmp_dent = container_of(tmp, struct mars_dent, dent_link); if (!strcmp(tmp_dent->d_path, path)) { @@ -853,18 +886,19 @@ struct mars_dent *_mars_find_dent(struct mars_global *global, const char *path) return res; } -EXPORT_SYMBOL_GPL(_mars_find_dent); +//EXPORT_SYMBOL_GPL(_mars_find_dent); struct mars_dent *mars_find_dent(struct mars_global *global, const char *path) { struct mars_dent *res; - down(&global->mutex); + //down_read(&global->dent_mutex); res = _mars_find_dent(global, path); - up(&global->mutex); + //up_read(&global->dent_mutex); return res; } EXPORT_SYMBOL_GPL(mars_find_dent); +#if 0 // old code! does not work! incorrect locking / races! void mars_kill_dent(struct mars_dent *dent) { struct mars_global *global = dent->d_global; @@ -893,22 +927,29 @@ void mars_kill_dent(struct mars_dent *dent) up(&global->mutex); done: ; } +#else +void mars_kill_dent(struct mars_dent *dent) +{ + dent->d_killme = true; + while (!list_empty(&dent->brick_list)) { + struct list_head *tmp = dent->brick_list.next; + struct mars_brick *brick = container_of(tmp, struct mars_brick, dent_brick_link); + list_del_init(tmp); + // note: locking is now done there.... + mars_kill_brick(brick); + } +} +#endif EXPORT_SYMBOL_GPL(mars_kill_dent); void mars_free_dent(struct mars_dent *dent) { - struct mars_global *global = dent->d_global; int i; - if (global) { - mars_kill_dent(dent); - down(&global->mutex); - } - list_del(&dent->dent_link); - list_del(&dent->brick_list); - if (global) { - up(&global->mutex); - } + mars_kill_dent(dent); + + CHECK_HEAD_EMPTY(&dent->dent_link); + CHECK_HEAD_EMPTY(&dent->brick_list); for (i = 0; i < MARS_ARGV_MAX; i++) { if (dent->d_argv[i]) @@ -930,11 +971,15 @@ EXPORT_SYMBOL_GPL(mars_free_dent); void mars_free_dent_all(struct list_head *anchor) { +#if 0 // FIXME: locking while (!list_empty(anchor)) { struct mars_dent *dent; dent = container_of(anchor->prev, struct mars_dent, dent_link); mars_free_dent(dent); } +#else // provisionary memleak + list_del_init(anchor); +#endif } EXPORT_SYMBOL_GPL(mars_free_dent_all); @@ -950,12 +995,12 @@ struct mars_brick *mars_find_brick(struct mars_global *global, const void *brick if (!global || !path) return NULL; - down(&global->mutex); + down_read(&global->brick_mutex); for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) { struct mars_brick *test = container_of(tmp, struct mars_brick, global_brick_link); if (!strcmp(test->brick_path, path)) { - up(&global->mutex); + up_read(&global->brick_mutex); if (brick_type && test->type != brick_type) { MARS_ERR("bad brick type\n"); return NULL; @@ -964,7 +1009,7 @@ struct mars_brick *mars_find_brick(struct mars_global *global, const void *brick } } - up(&global->mutex); + up_read(&global->brick_mutex); return NULL; } @@ -1001,10 +1046,10 @@ int mars_free_brick(struct mars_brick *brick) global = brick->global; if (global) { - down(&global->mutex); + down_write(&global->brick_mutex); list_del_init(&brick->global_brick_link); list_del_init(&brick->dent_brick_link); - up(&global->mutex); + up_write(&global->brick_mutex); } status = generic_brick_exit_full((void*)brick); @@ -1097,12 +1142,12 @@ struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent /* Immediately make it visible, regardless of internal state. * Switching on / etc must be done separately. */ - down(&global->mutex); + down_write(&global->brick_mutex); list_add(&res->global_brick_link, &global->brick_anchor); if (belongs) { list_add(&res->dent_brick_link, &belongs->brick_list); } - up(&global->mutex); + up_write(&global->brick_mutex); return res; diff --git a/mars_light.c b/mars_light.c index 9a6bb1af..222fefb1 100644 --- a/mars_light.c +++ b/mars_light.c @@ -85,10 +85,11 @@ struct light_class { #define IF_SKIP_SYNC true #define IF_MAX_PLUGGED 10000 -#define IF_READAHEAD 1 -//#define IF_READAHEAD 0 +#define IF_READAHEAD 0 +//#define IF_READAHEAD 1 -#define BIO_READAHEAD 1 +#define BIO_READAHEAD 0 +//#define BIO_READAHEAD 1 #define BIO_NOIDLE true #define BIO_SYNC true #define BIO_UNPLUG true @@ -618,7 +619,7 @@ int run_bone(struct mars_peerinfo *peer, struct mars_dent *dent) update_mtime = timespec_compare(&dent->new_stat.mtime, &local_stat.mtime) > 0; update_ctime = timespec_compare(&dent->new_stat.ctime, &local_stat.ctime) > 0; - //MARS_DBG("timestamps '%s' remote = %ld.%09ld local = %ld.%09ld\n", dent->d_path, dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec, local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec); + MARS_DBG("timestamps '%s' remote = %ld.%09ld local = %ld.%09ld\n", dent->d_path, dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec, local_stat.mtime.tv_sec, local_stat.mtime.tv_nsec); if ((dent->new_stat.mode & S_IRWXU) != (local_stat.mode & S_IRWXU) && @@ -700,7 +701,7 @@ int run_bones(struct mars_peerinfo *peer) MARS_DBG("NULL\n"); continue; } - //MARS_DBG("path = '%s'\n", dent->d_path); + MARS_DBG("path = '%s'\n", dent->d_path); status = run_bone(peer, dent); if (status > 0) run_trigger = true; @@ -1386,7 +1387,7 @@ int make_log(void *buf, struct mars_dent *dent) _update_replaylink(dent->d_parent, dent->d_serial + 1, 0, 0, !rot->is_primary); trans_brick->current_pos = 0; rot->last_jiffies = jiffies; - mars_trigger(); + //mars_trigger(); } status = -EAGAIN; goto done; @@ -2013,9 +2014,16 @@ enum { CL_PEERS, // resource definitions CL_RESOURCE, + CL_DEFAULTS0, + CL_DEFAULTS, + CL_DEFAULTS_ITEMS0, + CL_DEFAULTS_ITEMS, CL_SWITCH, CL_SWITCH_ITEMS, + CL_ACTUAL, + CL_ACTUAL_ITEMS, CL_CONNECT, + CL_SIZE, CL_DATA, CL_PRIMARY, CL__FILE, @@ -2069,6 +2077,46 @@ static const struct light_class light_classes[] = { .cl_forward = make_log_init, .cl_backward = NULL, }, + + /* Subdirectory for defaults... + */ + [CL_DEFAULTS0] = { + .cl_name = "defaults", + .cl_len = 8, + .cl_type = 'd', + .cl_hostcontext = false, + .cl_father = CL_RESOURCE, + .cl_forward = NULL, + .cl_backward = NULL, + }, + [CL_DEFAULTS] = { + .cl_name = "defaults-", + .cl_len = 9, + .cl_type = 'd', + .cl_hostcontext = true, + .cl_father = CL_RESOURCE, + .cl_forward = NULL, + .cl_backward = NULL, + }, + /* ... and its contents + */ + [CL_DEFAULTS_ITEMS0] = { + .cl_name = "", + .cl_len = 0, // catch any + .cl_type = 'l', + .cl_father = CL_DEFAULTS0, + .cl_forward = NULL, + .cl_backward = NULL, + }, + [CL_DEFAULTS_ITEMS] = { + .cl_name = "", + .cl_len = 0, // catch any + .cl_type = 'l', + .cl_father = CL_DEFAULTS, + .cl_forward = NULL, + .cl_backward = NULL, + }, + /* Subdirectory for controlling items... */ [CL_SWITCH] = { @@ -2090,6 +2138,30 @@ static const struct light_class light_classes[] = { .cl_forward = NULL, .cl_backward = NULL, }, + + /* Subdirectory for actual state + */ + [CL_ACTUAL] = { + .cl_name = "actual-", + .cl_len = 7, + .cl_type = 'd', + .cl_hostcontext = true, + .cl_father = CL_RESOURCE, + .cl_forward = NULL, + .cl_backward = NULL, + }, + /* ... and its contents + */ + [CL_ACTUAL_ITEMS] = { + .cl_name = "", + .cl_len = 0, // catch any + .cl_type = 'l', + .cl_father = CL_ACTUAL, + .cl_forward = NULL, + .cl_backward = NULL, + }, + + /* Symlink indicating the current peer */ [CL_CONNECT] = { @@ -2101,6 +2173,17 @@ static const struct light_class light_classes[] = { .cl_forward = NULL, .cl_backward = NULL, }, + /* Symlink indiating the (common) size of the resource + */ + [CL_SIZE] = { + .cl_name = "size", + .cl_len = 4, + .cl_type = 'l', + .cl_hostcontext = false, + .cl_father = CL_RESOURCE, + .cl_forward = NULL, + .cl_backward = NULL, + }, /* File or symlink to the real device / real (sparse) file * when hostcontext is missing, the corresponding peer will * not participate in that resource. @@ -2379,7 +2462,7 @@ void _show_status(struct mars_global *global) { struct list_head *tmp; - down(&global->mutex); + down_read(&global->brick_mutex); for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) { struct mars_brick *test; const char *path; @@ -2420,7 +2503,7 @@ void _show_status(struct mars_global *global) } kfree(dst); } - up(&global->mutex); + up_read(&global->brick_mutex); } #ifdef STAT_DEBUGGING @@ -2431,15 +2514,17 @@ void _show_statist(struct mars_global *global) int dent_count = 0; int brick_count = 0; - down(&global->mutex); MARS_STAT("================================== dents:\n"); + down_read(&global->dent_mutex); for (tmp = global->dent_anchor.next; tmp != &global->dent_anchor; tmp = tmp->next) { struct mars_dent *dent; dent = container_of(tmp, struct mars_dent, dent_link); - MARS_STAT("dent %d '%s' '%s'\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : ""); + MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : "", dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec); dent_count++; } + up_read(&global->dent_mutex); MARS_STAT("================================== bricks:\n"); + down_read(&global->brick_mutex); for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) { struct mars_brick *test; int i; @@ -2466,7 +2551,7 @@ void _show_statist(struct mars_global *global) } } } - up(&global->mutex); + up_read(&global->brick_mutex); MARS_INF("==================== STATISTICS: %d dents, %d bricks\n", dent_count, brick_count); } @@ -2482,7 +2567,8 @@ static int light_thread(void *data) .global_power = { .button = true, }, - .mutex = __SEMAPHORE_INITIALIZER(global.mutex, 1), + .dent_mutex = __RWSEM_INITIALIZER(global.dent_mutex), + .brick_mutex = __RWSEM_INITIALIZER(global.brick_mutex), .main_event = __WAIT_QUEUE_HEAD_INITIALIZER(global.main_event), }; mars_global = &global; // TODO: cleanup, avoid stack @@ -2509,7 +2595,7 @@ static int light_thread(void *data) _show_statist(&global); #endif - msleep(50); + msleep(500); wait_event_interruptible_timeout(global.main_event, global.main_trigger, 10 * HZ); global.main_trigger = false; diff --git a/mars_server.c b/mars_server.c index 257f24f5..07f691db 100644 --- a/mars_server.c +++ b/mars_server.c @@ -12,8 +12,6 @@ #define MARS_IO(args...) /*empty*/ #endif -//#define LOCAL // not longer use this! - #include #include #include @@ -32,18 +30,6 @@ static struct task_struct *server_thread = NULL; ///////////////////////// own helper functions //////////////////////// -#ifdef LOCAL -static int server_checker(struct mars_dent *parent, const char *name, int namlen, unsigned int d_type, int *prefix, int *serial) -{ - return 0; -} - -static int server_worker(struct mars_global *global, struct mars_dent *dent, bool direction) -{ - return 0; -} -#endif - static int cb_thread(void *data) { @@ -275,38 +261,16 @@ int handler_thread(void *data) } case CMD_GETENTS: { -#ifdef LOCAL - struct mars_global glob_tmp = { - .dent_anchor = LIST_HEAD_INIT(glob_tmp.dent_anchor), - .brick_anchor = LIST_HEAD_INIT(glob_tmp.brick_anchor), - .mutex = __SEMAPHORE_INITIALIZER(glob_tmp.mutex, 1), - }; - - status = -EINVAL; - if (!cmd.cmd_str1) - break; - - status = mars_dent_work(&glob_tmp, cmd.cmd_str1, sizeof(struct mars_dent), server_checker, server_worker, NULL, cmd.cmd_int1); - MARS_DBG("dents status = %d\n", status); - if (status < 0) - break; - - down(&brick->socket_sem); - status = mars_send_dent_list(sock, &glob_tmp.dent_anchor); - up(&brick->socket_sem); - - mars_free_dent_all(&glob_tmp.dent_anchor); -#else status = -EINVAL; if (unlikely(!cmd.cmd_str1 || !mars_global)) break; down(&brick->socket_sem); - down(&mars_global->mutex); + down_read(&mars_global->dent_mutex); status = mars_send_dent_list(sock, &mars_global->dent_anchor); - up(&mars_global->mutex); + up_read(&mars_global->dent_mutex); up(&brick->socket_sem); -#endif + if (status < 0) { MARS_ERR("could not send dentry information, status = %d\n", status); } diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 8306d9da..c4a8e9ff 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -2142,6 +2142,8 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe 60 * HZ); atomic_inc(&brick->replay_count); + atomic_inc(&brick->total_replay_count); + traced_lock(&brick->replay_lock, flags); list_add(&mref_a->replay_head, &brick->replay_list); traced_unlock(&brick->replay_lock, flags); @@ -2246,6 +2248,7 @@ void trans_logger_replay(struct trans_logger_output *output) struct trans_logger_input *input = brick->inputs[TL_INPUT_FW_LOG1]; loff_t start_pos; loff_t finished_pos; + int status = 0; bool has_triggered = false; brick->replay_code = -EAGAIN; // indicates "running" @@ -2267,23 +2270,34 @@ void trans_logger_replay(struct trans_logger_output *output) struct log_header lh = {}; void *buf = NULL; int len = 0; - int status; if (kthread_should_stop()) { break; } status = log_read(&input->logst, &lh, &buf, &len); + if (status == -EAGAIN) { + MARS_DBG("got -EAGAIN\n"); + msleep(100); + continue; + } if (unlikely(status < 0)) { brick->replay_code = status; MARS_ERR("cannot read logfile data, status = %d\n", status); break; } + finished_pos = input->logst.log_pos + input->logst.offset; + if (!brick->do_continuous_replay && finished_pos >= brick->replay_end_pos) { + status = 0; // treat as EOF + } if (!status) { // EOF -> wait until kthread_should_stop() - MARS_DBG("got EOF\n"); + MARS_DBG("EOF at %lld\n", finished_pos); if (!brick->do_continuous_replay) { break; } + if (finished_pos > brick->replay_end_pos) { + brick->replay_end_pos = finished_pos; + } msleep(1000); } @@ -2296,14 +2310,13 @@ void trans_logger_replay(struct trans_logger_output *output) status = apply_data(brick, lh.l_pos, buf, len); if (unlikely(status < 0)) { brick->replay_code = status; - MARS_ERR("cannot apply data, len = %d, status = %d\n", len, status); + MARS_ERR("cannot apply data at pos = %lld len = %d, status = %d\n", lh.l_pos, len, status); break; } } // do this _after_ any opportunities for errors... if (atomic_read(&brick->replay_count) <= 0) { - finished_pos = input->logst.log_pos + input->logst.offset; brick->current_pos = finished_pos; input->replay_min_pos = finished_pos; input->replay_max_pos = finished_pos; // FIXME @@ -2313,11 +2326,13 @@ void trans_logger_replay(struct trans_logger_output *output) wait_event_interruptible_timeout(brick->event, atomic_read(&brick->replay_count) <= 0, 60 * HZ); finished_pos = input->logst.log_pos + input->logst.offset; - brick->current_pos = finished_pos; - input->replay_min_pos = finished_pos; - input->replay_max_pos = finished_pos; // FIXME + if (status >= 0) { + brick->current_pos = finished_pos; + input->replay_min_pos = finished_pos; + input->replay_max_pos = finished_pos; // FIXME + } - if (finished_pos == brick->replay_end_pos) { + if (status >= 0 && finished_pos == brick->replay_end_pos) { MARS_INF("replay finished at %lld\n", finished_pos); brick->replay_code = 0; } else { @@ -2399,15 +2414,18 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose) // FIXME: check for allocation overflows - sprintf(res, "total callbacks = %d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d phase1=%d phase2=%d phase3=%d phase4=%d | mshadow=%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", - atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total), - atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying)); + sprintf(res, "current_pos = %lld | mode replay=%d continuous=%d replay_code=%d log_reads=%d | total replay=%d callbacks=%d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d phase1=%d phase2=%d phase3=%d phase4=%d | replay=%d mshadow=%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n", + brick->current_pos, + brick->do_replay, brick->do_continuous_replay, brick->replay_code, brick->log_reads, + atomic_read(&brick->total_replay_count), atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total), + atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying)); return res; } static noinline void trans_logger_reset_statistics(struct trans_logger_brick *brick) { + atomic_set(&brick->total_replay_count, 0); atomic_set(&brick->total_cb_count, 0); atomic_set(&brick->total_read_count, 0); atomic_set(&brick->total_write_count, 0); diff --git a/mars_trans_logger.h b/mars_trans_logger.h index d9226efa..ed4b99b0 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -145,6 +145,7 @@ struct trans_logger_brick { atomic_t inner_balance_count; atomic_t sub_balance_count; atomic_t wb_balance_count; + atomic_t total_replay_count; atomic_t total_cb_count; atomic_t total_read_count; atomic_t total_write_count; diff --git a/userspace/marsadm b/userspace/marsadm index 08be8404..1689b0fc 100644 --- a/userspace/marsadm +++ b/userspace/marsadm @@ -32,7 +32,35 @@ sub check_id { sub check_res { my $res = shift; - die "resource '$res' does not exist\n" unless -d "$mars/resource-$res"; + if(not -d "$mars/resource-$res") { + # DO WHAT I MEAN: try to substitute a device name for a badly given resource name if it is unique + my $count = 0; + my $found; + my @tests = glob("$mars/resource-*/device-$host"); + foreach my $test (@tests) { + my $target = readlink($test); + if($target eq $res) { + $found = $test; + $count++; + } + } + if(!$count) { + @tests = glob("$mars/resource-*/_direct-*-$host"); + foreach my $test (@tests) { + my $target = readlink($test); + $target =~ s/^.*,//; + if($target eq $res) { + $found = $test; + $count++; + } + } + } + die "resource '$res' does not exist ($count replacements found)\n" unless $count == 1 and $found; + $found =~ s:^.*/resource-(.*)/.*$:$1:; + warn "substituting bad resource name '$res' by uniquely matching resource name '$found'\n"; + $res = $found; + } + return $res; } sub check_res_member { @@ -66,12 +94,20 @@ sub _trigger { } sub _switch { - my ($path, $on) = @_; + my ($cmd, $res, $path, $on) = @_; + my $src = $on ? "1" : "0"; + + my $old = readlink($path); + if($old && $old eq $src) { + print "${cmd} on resource $res is already activated\n" if $cmd; + return; + } + my $tmp = $path; $tmp =~ s/\/([^\/]+)$/.tmp.$1/; - my $src = $on ? "1" : "0"; symlink($src, $tmp) or die "cannot create switch symlink\n"; rename($tmp, $path) or die "cannot rename switch symlink\n"; + print "successfully started ${cmd} on resource $res\n" if $cmd; } sub _writable { @@ -235,24 +271,21 @@ sub attach_res { my ($cmd, $res) = @_; my $detach = ($cmd eq "detach"); my $path = "$mars/resource-$res/switch-$host/attach"; - _switch($path, !$detach); - print "successfully started ${cmd} of resource $res\n" + _switch($cmd, $res, $path, !$detach); } sub connect_res { my ($cmd, $res) = @_; my $disconnect = ($cmd eq "disconnect"); my $path = "$mars/resource-$res/switch-$host/connect"; - _switch($path, !$disconnect); - print "successfully started ${cmd} on resource $res\n" + _switch($cmd, $res, $path, !$disconnect); } sub pause_res { my ($cmd, $res) = @_; my $pause = ($cmd eq "pause-sync"); my $path = "$mars/resource-$res/switch-$host/sync"; - _switch($path, !$pause); - print "successfully started ${cmd} on resource $res\n" + _switch($cmd, $res, $path, !$pause); } sub up_res { @@ -377,7 +410,7 @@ sub do_res { my $cmd = shift; my $res = shift; - check_res($res) unless $cmd =~ m/^(join-system|create-resource)$/; + $res = check_res($res) unless $cmd =~ m/^(join-system|create-resource)$/; check_res_member($res) unless $cmd =~ m/^(join|create)-(system|resource)$/; my $func = $cmd_table{$cmd};