mars/mars_trans_logger.c

2706 lines
73 KiB
C
Raw Normal View History

2010-08-08 20:51:20 +00:00
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
2011-02-23 20:48:06 +00:00
// Trans_Logger brick
2010-08-08 20:51:20 +00:00
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
2011-04-01 11:18:32 +00:00
//#define IO_DEBUGGING
2011-07-28 11:41:06 +00:00
//#define REPLAY_DEBUGGING
2011-04-01 11:18:32 +00:00
//#define STAT_DEBUGGING // here means: display full statistics
2011-06-30 13:15:52 +00:00
//#define HASH_DEBUGGING
2011-08-25 10:16:32 +00:00
//#define REFCOUNT_BUG // FIXME!!!
2010-08-08 20:51:20 +00:00
2011-04-18 09:23:04 +00:00
// variants
#define KEEP_UNIQUE
//#define WB_COPY
2011-04-20 14:26:44 +00:00
#define LATER
2011-11-14 14:21:15 +00:00
#define DELAY_CALLERS // this is _needed_
2011-04-18 09:23:04 +00:00
2011-06-30 13:15:52 +00:00
// commenting this out is dangerous for data integrity! use only for testing!
2011-04-08 09:52:46 +00:00
#define USE_MEMCPY
2011-07-15 10:12:06 +00:00
#define DO_WRITEBACK // otherwise FAKE IO
2011-04-08 09:52:46 +00:00
#define APPLY_DATA
2011-03-08 16:45:52 +00:00
2010-08-08 20:51:20 +00:00
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
2010-08-20 10:58:24 +00:00
#include <linux/kthread.h>
2010-08-08 20:51:20 +00:00
#include "mars.h"
2011-07-28 11:41:06 +00:00
/* MARS_RPL(): replay tracing.
 * With REPLAY_DEBUGGING defined, the message is forwarded to _MARS_MSG()
 * with the "REPLAY " prefix; otherwise the macro expands to nothing.
 */
#ifdef REPLAY_DEBUGGING
#define MARS_RPL(_fmt, _args...) _MARS_MSG(false, "REPLAY ", _fmt, ##_args)
#else
#define MARS_RPL(_args...) /*empty*/
#endif
2010-08-08 20:51:20 +00:00
///////////////////////// own type definitions ////////////////////////
#include "mars_trans_logger.h"
2011-03-07 18:36:08 +00:00
#if 1
2011-04-08 09:52:46 +00:00
/* From here on, every `inline` in this file is replaced by `noinline`.
 * NOTE(review): presumably to keep helpers visible in stack traces -- confirm.
 */
#define inline noinline
#endif
static inline
2011-04-18 14:14:16 +00:00
int lh_cmp(loff_t *a, loff_t *b)
2010-11-12 11:18:40 +00:00
{
2011-04-18 14:14:16 +00:00
if (*a < *b)
return -1;
if (*a > *b)
return 1;
return 0;
2010-11-12 11:18:40 +00:00
}
2011-04-08 09:52:46 +00:00
static inline
2011-04-18 14:14:16 +00:00
int tr_cmp(struct pairing_heap_logger *_a, struct pairing_heap_logger *_b)
2010-08-11 16:02:08 +00:00
{
2011-04-18 14:14:16 +00:00
struct logger_head *a = container_of(_a, struct logger_head, ph);
struct logger_head *b = container_of(_b, struct logger_head, ph);
return lh_cmp(a->lh_pos, b->lh_pos);
2010-08-11 16:02:08 +00:00
}
2011-04-18 14:14:16 +00:00
/* Instantiate the pairing-heap code for "logger", using tr_cmp() as ordering.
 * NOTE(review): the macro is defined outside this file -- confirm which
 * symbols it generates.
 */
_PAIRING_HEAP_FUNCTIONS(static,logger,tr_cmp);
2010-08-11 16:02:08 +00:00
2011-04-08 09:52:46 +00:00
static inline
2011-04-18 14:14:16 +00:00
loff_t *lh_get(struct logger_head *th)
2010-08-20 10:58:24 +00:00
{
2011-04-18 14:14:16 +00:00
return th->lh_pos;
}
2010-08-20 10:58:24 +00:00
2011-04-18 14:14:16 +00:00
/* Instantiate the queue code for struct logger_head (linked via lh_head,
 * keyed via lh_get(), ordered via lh_cmp()).
 * NOTE(review): presumably this is what provides the q_logger_*() functions
 * called by the qq_*() wrappers below -- confirm against the macro definition.
 */
QUEUE_FUNCTIONS(logger,struct logger_head,lh_head,lh_get,lh_cmp,logger);
2010-08-20 10:58:24 +00:00
2011-04-18 14:14:16 +00:00
////////////////////////// logger queue handling ////////////////////////
2010-08-20 10:58:24 +00:00
2011-04-08 09:52:46 +00:00
static inline
2011-04-29 09:36:10 +00:00
void qq_init(struct logger_queue *q, struct trans_logger_brick *brick)
2010-08-11 16:02:08 +00:00
{
2011-04-18 14:14:16 +00:00
q_logger_init(q);
2011-06-30 13:15:52 +00:00
q->q_event = &brick->worker_event;
2011-04-29 09:36:10 +00:00
q->q_contention = &brick->fly_count;
q->q_brick = brick;
}
/* Thin wrapper: forwards to q_logger_inc_flying() for this queue. */
static inline
void qq_inc_flying(struct logger_queue *q)
{
	q_logger_inc_flying(q);
}
/* Thin wrapper: forwards to q_logger_dec_flying() for this queue. */
static inline
void qq_dec_flying(struct logger_queue *q)
{
	q_logger_dec_flying(q);
}
2011-04-18 09:23:04 +00:00
/* Thin wrapper: reports the result of q_logger_is_ready() for this queue. */
static noinline
bool qq_is_ready(struct logger_queue *q)
{
	bool ready = q_logger_is_ready(q);

	return ready;
}
static inline
2011-04-19 14:46:38 +00:00
void qq_mref_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
2011-04-18 09:23:04 +00:00
{
struct mref_object *mref = mref_a->object;
CHECK_ATOMIC(&mref->ref_count, 1);
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
2011-04-29 09:36:10 +00:00
atomic_inc(&q->q_brick->inner_balance_count);
2011-04-18 09:23:04 +00:00
mars_trace(mref, q->q_insert_info);
2011-04-18 14:14:16 +00:00
q_logger_insert(q, &mref_a->lh);
2011-04-18 09:23:04 +00:00
}
2011-04-18 14:14:16 +00:00
static inline
2011-04-19 14:46:38 +00:00
void qq_wb_insert(struct logger_queue *q, struct writeback_info *wb)
{
q_logger_insert(q, &wb->w_lh);
}
static inline
void qq_mref_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
2011-04-18 14:14:16 +00:00
{
CHECK_ATOMIC(&mref_a->object->ref_count, 1);
mars_trace(mref_a->object, q->q_pushback_info);
q_logger_pushback(q, &mref_a->lh);
}
static inline
2011-04-19 14:46:38 +00:00
void qq_wb_pushback(struct logger_queue *q, struct writeback_info *wb)
{
q_logger_pushback(q, &wb->w_lh);
}
static inline
struct trans_logger_mref_aspect *qq_mref_fetch(struct logger_queue *q)
2011-04-18 14:14:16 +00:00
{
struct logger_head *test;
struct trans_logger_mref_aspect *mref_a = NULL;
test = q_logger_fetch(q);
if (test) {
mref_a = container_of(test, struct trans_logger_mref_aspect, lh);
CHECK_ATOMIC(&mref_a->object->ref_count, 1);
mars_trace(mref_a->object, q->q_fetch_info);
}
return mref_a;
}
2011-04-18 09:23:04 +00:00
2011-04-19 14:46:38 +00:00
/* Fetch the next element from the queue as a writeback_info.
 *
 * Returns NULL when q_logger_fetch() delivers nothing.
 */
static inline
struct writeback_info *qq_wb_fetch(struct logger_queue *q)
{
	struct logger_head *head = q_logger_fetch(q);

	if (!head) {
		return NULL;
	}
	return container_of(head, struct writeback_info, w_lh);
}
2010-08-08 20:51:20 +00:00
///////////////////////// own helper functions ////////////////////////
2011-04-08 09:52:46 +00:00
static inline
2011-04-12 15:31:08 +00:00
int hash_fn(loff_t pos)
2010-08-08 20:51:20 +00:00
{
// simple and stupid
2011-06-30 13:15:52 +00:00
long base_index = (long)pos >> REGION_SIZE_BITS;
base_index += base_index / TRANS_HASH_MAX / 7;
2011-04-12 15:31:08 +00:00
return base_index % TRANS_HASH_MAX;
2010-08-08 20:51:20 +00:00
}
2011-04-12 15:31:08 +00:00
/* Search one list for the first element covering `pos`.
 *
 * start:            list anchor to walk
 * pos:              start of the search area
 * max_len:          in: length of the search area;
 *                   out: possibly shortened length (see below)
 * use_collect_head: when true the list is linked via collect_head,
 *                   otherwise via hash_head
 *
 * Returns the first overlapping element which starts at or before `pos`,
 * with *max_len limited to the part of the area covered by it. Overlapping
 * elements which start behind `pos` only shorten *max_len so that the area
 * ends where they begin. Returns NULL if no covering element exists.
 */
static inline
struct trans_logger_mref_aspect *_hash_find(struct list_head *start, loff_t pos, int *max_len, bool use_collect_head)
{
	struct trans_logger_mref_aspect *found = NULL;
	struct list_head *cursor = start->next;
	int remaining = *max_len;
#ifdef HASH_DEBUGGING
	int count = 0;
#endif

	/* The lists are always sorted according to age (newest first).
	 * Caution: there may be duplicates in the list, some of them
	 * overlapping with the search area in many different ways.
	 */
	while (cursor != start) {
		struct trans_logger_mref_aspect *cand_a;
		struct mref_object *cand;
		int offset;
#ifdef HASH_DEBUGGING
		static int max = 0;
		if (++count > max) {
			max = count;
			if (!(max % 100)) {
				MARS_INF("hash max=%d hash=%d (pos=%lld)\n", max, hash_fn(pos), pos);
			}
		}
#endif

		cand_a = use_collect_head
			? container_of(cursor, struct trans_logger_mref_aspect, collect_head)
			: container_of(cursor, struct trans_logger_mref_aspect, hash_head);
		// the list is not modified here, so it is safe to advance right now
		cursor = cursor->next;

		cand = cand_a->object;
		CHECK_ATOMIC(&cand->ref_count, 1);

		// skip anything which does not overlap the search area
		if (pos >= cand->ref_pos + cand->ref_len || pos + remaining <= cand->ref_pos) {
			continue;
		}

		offset = cand->ref_pos - pos;
		if (offset > 0) {
			// candidate starts behind pos: cut the area in front of it
			if (offset < remaining) {
				remaining = offset;
			}
			continue;
		}

		// candidate covers pos: take it, limited to what it covers
		found = cand_a;
		if (cand->ref_len + offset < remaining) {
			remaining = cand->ref_len + offset;
		}
		break;
	}

	*max_len = remaining;
	return found;
}
static noinline
2011-04-29 09:36:10 +00:00
struct trans_logger_mref_aspect *hash_find(struct trans_logger_brick *brick, loff_t pos, int *max_len)
2011-04-12 15:31:08 +00:00
{
2011-04-29 09:36:10 +00:00
2011-04-12 15:31:08 +00:00
int hash = hash_fn(pos);
2011-04-29 09:36:10 +00:00
struct hash_anchor *start = &brick->hash_table[hash];
2011-04-12 15:31:08 +00:00
struct trans_logger_mref_aspect *res;
2011-06-30 13:15:52 +00:00
//unsigned int flags;
2011-04-12 15:31:08 +00:00
2011-06-30 13:15:52 +00:00
//traced_readlock(&start->hash_lock, flags);
down_read(&start->hash_mutex);
2011-04-12 15:31:08 +00:00
res = _hash_find(&start->hash_anchor, pos, max_len, false);
2011-06-30 13:15:52 +00:00
//traced_readunlock(&start->hash_lock, flags);
up_read(&start->hash_mutex);
2010-08-08 20:51:20 +00:00
return res;
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-29 09:36:10 +00:00
void hash_insert(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *elem_a)
2010-08-08 20:51:20 +00:00
{
2011-04-12 15:31:08 +00:00
int hash = hash_fn(elem_a->object->ref_pos);
2011-04-29 09:36:10 +00:00
struct hash_anchor *start = &brick->hash_table[hash];
2011-06-30 13:15:52 +00:00
//unsigned int flags;
2010-08-08 20:51:20 +00:00
2010-08-20 10:58:24 +00:00
#if 1
CHECK_HEAD_EMPTY(&elem_a->hash_head);
2011-04-15 10:13:22 +00:00
CHECK_ATOMIC(&elem_a->object->ref_count, 1);
2010-08-20 10:58:24 +00:00
#endif
2011-03-10 11:40:06 +00:00
// only for statistics:
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->hash_count);
2011-03-10 11:40:06 +00:00
2011-06-30 13:15:52 +00:00
//traced_writelock(&start->hash_lock, flags);
down_write(&start->hash_mutex);
2011-03-10 11:40:06 +00:00
2011-04-08 09:52:46 +00:00
list_add(&elem_a->hash_head, &start->hash_anchor);
elem_a->is_hashed = true;
2011-06-30 13:15:52 +00:00
//traced_writeunlock(&start->hash_lock, flags);
up_write(&start->hash_mutex);
2011-04-08 09:52:46 +00:00
}
/* Find the transitive closure of overlapping requests
* and collect them into a list.
*/
static noinline
2011-04-29 09:36:10 +00:00
void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, struct list_head *collect_list)
2011-04-08 09:52:46 +00:00
{
loff_t pos = *_pos;
int len = *_len;
2011-04-12 15:31:08 +00:00
int hash = hash_fn(pos);
2011-04-29 09:36:10 +00:00
struct hash_anchor *start = &brick->hash_table[hash];
2011-04-12 15:31:08 +00:00
struct list_head *tmp;
2011-04-08 09:52:46 +00:00
bool extended;
2011-06-30 13:15:52 +00:00
//unsigned int flags;
#ifdef HASH_DEBUGGING
int count = 0;
static int max = 0;
#endif
2011-04-08 09:52:46 +00:00
if (collect_list) {
CHECK_HEAD_EMPTY(collect_list);
}
2011-06-30 13:15:52 +00:00
//traced_readlock(&start->hash_lock, flags);
down_read(&start->hash_mutex);
2011-04-08 09:52:46 +00:00
do {
extended = false;
2011-03-18 13:15:40 +00:00
for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
2011-04-08 09:52:46 +00:00
struct trans_logger_mref_aspect *test_a;
struct mref_object *test;
2011-04-12 15:31:08 +00:00
loff_t diff;
2011-06-30 13:15:52 +00:00
#ifdef HASH_DEBUGGING
count++;
#endif
2011-04-08 09:52:46 +00:00
2011-03-18 13:15:40 +00:00
test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
test = test_a->object;
2011-04-08 09:52:46 +00:00
2011-04-15 10:13:22 +00:00
CHECK_ATOMIC(&test->ref_count, 1);
2011-04-08 09:52:46 +00:00
// are the regions overlapping?
2011-04-12 15:31:08 +00:00
if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
2011-04-08 09:52:46 +00:00
continue; // not relevant
}
2011-04-12 15:31:08 +00:00
2011-04-08 09:52:46 +00:00
// extend the search region when necessary
2011-04-12 15:31:08 +00:00
diff = pos - test->ref_pos;
if (diff > 0) {
len += diff;
2011-04-08 09:52:46 +00:00
pos = test->ref_pos;
extended = true;
}
2011-04-12 15:31:08 +00:00
diff = (test->ref_pos + test->ref_len) - (pos + len);
if (diff > 0) {
len += diff;
2011-04-08 09:52:46 +00:00
extended = true;
2011-03-18 13:15:40 +00:00
}
}
2011-04-08 09:52:46 +00:00
} while (extended); // start over for transitive closure
2010-08-08 20:51:20 +00:00
2011-04-08 09:52:46 +00:00
*_pos = pos;
*_len = len;
2010-08-08 20:51:20 +00:00
2011-06-30 13:15:52 +00:00
#ifdef HASH_DEBUGGING
if (count > max + 100) {
int i = 0;
max = count;
MARS_INF("iterations max=%d hash=%d (pos=%lld len=%d)\n", max, hash, pos, len);
for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
struct trans_logger_mref_aspect *test_a;
struct mref_object *test;
test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
test = test_a->object;
MARS_INF("%03d pos = %lld len = %d collected = %d\n", i++, test->ref_pos, test->ref_len, test_a->is_collected);
}
MARS_INF("----------------\n");
}
#endif
2011-04-12 15:31:08 +00:00
for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
struct trans_logger_mref_aspect *test_a;
struct mref_object *test;
test_a = container_of(tmp, struct trans_logger_mref_aspect, hash_head);
test = test_a->object;
// are the regions overlapping?
if (test_a->is_collected || pos >= test->ref_pos + test->ref_len || pos + len <= test->ref_pos) {
continue; // not relevant
}
// collect
CHECK_HEAD_EMPTY(&test_a->collect_head);
test_a->is_collected = true;
2011-04-18 09:23:04 +00:00
CHECK_ATOMIC(&test->ref_count, 1);
2011-04-12 15:31:08 +00:00
list_add_tail(&test_a->collect_head, collect_list);
2010-08-11 16:02:08 +00:00
}
2010-08-08 20:51:20 +00:00
2011-06-30 13:15:52 +00:00
//traced_readunlock(&start->hash_lock, flags);
up_read(&start->hash_mutex);
2010-08-08 20:51:20 +00:00
}
2011-04-11 13:40:06 +00:00
/* Atomically put all elements from the list.
* All elements must reside in the same collision list.
*/
static inline
2011-04-29 09:36:10 +00:00
void hash_put_all(struct trans_logger_brick *brick, struct list_head *list)
2011-04-11 13:40:06 +00:00
{
struct list_head *tmp;
struct hash_anchor *start = NULL;
2011-04-12 15:31:08 +00:00
int first_hash = -1;
2011-06-30 13:15:52 +00:00
//unsigned int flags;
2011-04-11 13:40:06 +00:00
for (tmp = list->next; tmp != list; tmp = tmp->next) {
struct trans_logger_mref_aspect *elem_a;
struct mref_object *elem;
int hash;
2011-04-12 15:31:08 +00:00
elem_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
2011-04-11 13:40:06 +00:00
elem = elem_a->object;
2011-04-15 10:13:22 +00:00
CHECK_ATOMIC(&elem->ref_count, 1);
2011-04-12 15:31:08 +00:00
hash = hash_fn(elem->ref_pos);
2011-04-11 13:40:06 +00:00
if (!start) {
2011-04-12 15:31:08 +00:00
first_hash = hash;
2011-04-29 09:36:10 +00:00
start = &brick->hash_table[hash];
2011-06-30 13:15:52 +00:00
//traced_writelock(&start->hash_lock, flags);
down_write(&start->hash_mutex);
2011-04-12 15:31:08 +00:00
} else if (unlikely(hash != first_hash)) {
MARS_ERR("oops, different hashes: %d != %d\n", hash, first_hash);
2011-04-11 13:40:06 +00:00
}
if (!elem_a->is_hashed) {
continue;
}
list_del_init(&elem_a->hash_head);
elem_a->is_hashed = false;
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->hash_count);
2011-04-11 13:40:06 +00:00
}
if (start) {
2011-06-30 13:15:52 +00:00
//traced_writeunlock(&start->hash_lock, flags);
up_write(&start->hash_mutex);
2011-04-11 13:40:06 +00:00
}
}
2010-08-08 20:51:20 +00:00
////////////////// own brick / input / output operations //////////////////
2011-03-29 14:40:40 +00:00
/* File-wide counter: incremented in _write_ref_get() for each new master
 * shadow, decremented in __trans_logger_ref_put() when one is freed.
 */
static atomic_t global_mshadow_count = ATOMIC_INIT(0);
2011-04-08 09:52:46 +00:00
static noinline
int trans_logger_get_info(struct trans_logger_output *output, struct mars_info *info)
2010-08-08 20:51:20 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_input *input = output->brick->inputs[TL_INPUT_READ];
2010-08-08 20:51:20 +00:00
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
2011-04-08 09:52:46 +00:00
static noinline
int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a, struct trans_logger_mref_aspect *mshadow_a)
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
2011-04-08 09:52:46 +00:00
struct mref_object *mref = mref_a->object;
struct mref_object *mshadow;
int diff;
mshadow = mshadow_a->object;
#if 1
if (unlikely(mref->ref_len > mshadow->ref_len)) {
MARS_ERR("oops %d -> %d\n", mref->ref_len, mshadow->ref_len);
mref->ref_len = mshadow->ref_len;
}
if (unlikely(mshadow_a == mref_a)) {
MARS_ERR("oops %p == %p\n", mshadow_a, mref_a);
return -EINVAL;
}
#endif
diff = mref->ref_pos - mshadow->ref_pos;
#if 1
if (unlikely(diff < 0)) {
MARS_ERR("oops diff = %d\n", diff);
return -EINVAL;
}
#endif
/* Attach mref to the existing shadow ("slave shadow").
*/
mref_a->shadow_data = mshadow_a->shadow_data + diff;
mref_a->do_dealloc = false;
if (!mref->ref_data) { // buffered IO
mref->ref_data = mref_a->shadow_data;
mref_a->do_buffered = true;
}
mref->ref_flags = mshadow->ref_flags;
mref_a->shadow_ref = mshadow_a;
mref_a->my_brick = brick;
2011-04-14 14:21:26 +00:00
2011-04-18 09:23:04 +00:00
/* Get an ordinary internal reference
*/
2011-04-14 14:21:26 +00:00
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->inner_balance_count);
2011-04-14 14:21:26 +00:00
2011-04-18 09:23:04 +00:00
/* Get an additional internal reference from slave to master,
* such that the master cannot go away before the slave.
*/
atomic_inc(&mshadow->ref_count); // is compensated by master transition in __trans_logger_ref_put()
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->inner_balance_count);
2011-04-18 09:23:04 +00:00
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->sshadow_count);
atomic_inc(&brick->total_sshadow_count);
2011-04-08 09:52:46 +00:00
#if 1
if (unlikely(mref->ref_len <= 0)) {
MARS_ERR("oops, len = %d\n", mref->ref_len);
return -EINVAL;
}
#endif
return mref->ref_len;
}
2010-08-20 10:58:24 +00:00
2011-04-08 09:52:46 +00:00
static noinline
int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a)
2010-08-08 20:51:20 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
2010-12-15 12:13:18 +00:00
struct mref_object *mref = mref_a->object;
2011-04-29 09:36:10 +00:00
struct trans_logger_input *input = brick->inputs[TL_INPUT_READ];
2011-04-01 11:18:32 +00:00
struct trans_logger_mref_aspect *mshadow_a;
2010-08-08 20:51:20 +00:00
2010-08-11 16:02:08 +00:00
/* Look if there is a newer version on the fly, shadowing
* the old one.
* When a shadow is found, use it as buffer for the mref.
*/
2011-04-29 09:36:10 +00:00
mshadow_a = hash_find(brick, mref->ref_pos, &mref->ref_len);
2011-04-08 09:52:46 +00:00
if (!mshadow_a) {
return GENERIC_INPUT_CALL(input, mref_get, mref);
2010-08-11 16:02:08 +00:00
}
2011-04-14 14:21:26 +00:00
2011-04-08 09:52:46 +00:00
return _make_sshadow(output, mref_a, mshadow_a);
}
2010-08-11 16:02:08 +00:00
2011-04-08 09:52:46 +00:00
static noinline
int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_aspect *mref_a)
2010-08-11 16:02:08 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
2010-12-15 12:13:18 +00:00
struct mref_object *mref = mref_a->object;
2011-04-08 09:52:46 +00:00
void *data;
2010-08-11 16:02:08 +00:00
2011-04-08 09:52:46 +00:00
#ifdef KEEP_UNIQUE
struct trans_logger_mref_aspect *mshadow_a;
2011-04-29 09:36:10 +00:00
mshadow_a = hash_find(brick, mref->ref_pos, &mref->ref_len);
2011-04-08 09:52:46 +00:00
if (mshadow_a) {
return _make_sshadow(output, mref_a, mshadow_a);
}
#endif
2011-11-14 14:21:15 +00:00
#ifdef DELAY_CALLERS
2011-06-30 13:15:52 +00:00
// delay in case of too many master shadows / memory shortage
wait_event_interruptible_timeout(brick->caller_event, !brick->delay_callers, HZ / 2);
2011-11-14 14:21:15 +00:00
#endif
2011-06-30 13:15:52 +00:00
2011-04-08 09:52:46 +00:00
// create a new master shadow
2011-08-12 11:09:48 +00:00
data = brick_block_alloc(mref->ref_pos, (mref_a->alloc_len = mref->ref_len));
2011-04-08 09:52:46 +00:00
if (unlikely(!data)) {
return -ENOMEM;
}
2011-06-30 13:15:52 +00:00
atomic64_add(mref->ref_len, &brick->shadow_mem_used);
2011-04-08 09:52:46 +00:00
#ifdef CONFIG_DEBUG_KERNEL
memset(data, 0x11, mref->ref_len);
#endif
mref_a->shadow_data = data;
mref_a->do_dealloc = true;
if (!mref->ref_data) {
2011-04-01 11:18:32 +00:00
mref->ref_data = data;
2011-04-08 09:52:46 +00:00
mref_a->do_buffered = true;
2010-08-11 16:02:08 +00:00
}
mref_a->my_brick = brick;
2011-03-31 16:16:00 +00:00
mref->ref_flags = 0;
2011-03-08 16:45:52 +00:00
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
2011-04-14 14:21:26 +00:00
2011-03-08 16:45:52 +00:00
get_lamport(&mref_a->stamp);
2011-04-08 09:52:46 +00:00
#if 1
if (unlikely(mref->ref_len <= 0)) {
MARS_ERR("oops, len = %d\n", mref->ref_len);
return -EINVAL;
}
#endif
2011-04-14 14:21:26 +00:00
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->inner_balance_count);
2011-04-14 14:21:26 +00:00
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->mshadow_count);
atomic_inc(&brick->total_mshadow_count);
2011-04-08 09:52:46 +00:00
atomic_inc(&global_mshadow_count);
2010-08-11 16:02:08 +00:00
return mref->ref_len;
}
2011-04-08 09:52:46 +00:00
static noinline
int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object *mref)
2010-08-11 16:02:08 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *mref_a;
2010-08-27 15:42:10 +00:00
loff_t base_offset;
2010-08-11 16:02:08 +00:00
2010-08-23 05:06:06 +00:00
CHECK_PTR(output, err);
2011-04-08 09:52:46 +00:00
MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
if (mref->ref_len > brick->max_mref_size && brick->max_mref_size > 0)
mref->ref_len = brick->max_mref_size;
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->outer_balance_count);
2011-03-10 11:40:06 +00:00
2011-03-08 16:45:52 +00:00
if (atomic_read(&mref->ref_count) > 0) { // setup already performed
2011-04-08 09:52:46 +00:00
MARS_IO("again %d\n", atomic_read(&mref->ref_count));
2011-03-08 16:45:52 +00:00
atomic_inc(&mref->ref_count);
return mref->ref_len;
}
mref_a = trans_logger_mref_get_aspect(brick, mref);
2010-08-23 05:06:06 +00:00
CHECK_PTR(mref_a, err);
CHECK_PTR(mref_a->object, err);
2010-08-11 16:02:08 +00:00
2011-04-12 15:31:08 +00:00
// ensure that REGION_SIZE boundaries are obeyed by hashing
2010-08-27 15:42:10 +00:00
base_offset = mref->ref_pos & (loff_t)(REGION_SIZE - 1);
2011-04-12 15:31:08 +00:00
if (mref->ref_len > REGION_SIZE - base_offset) {
2010-08-08 20:51:20 +00:00
mref->ref_len = REGION_SIZE - base_offset;
2011-04-12 15:31:08 +00:00
}
2010-08-08 20:51:20 +00:00
2010-08-11 16:02:08 +00:00
if (mref->ref_may_write == READ) {
return _read_ref_get(output, mref_a);
2010-08-08 20:51:20 +00:00
}
2011-02-23 20:48:06 +00:00
/* FIXME: THIS IS PROVISIONARY (use event instead)
*/
while (unlikely(!output->brick->power.led_on)) {
2011-03-08 16:45:52 +00:00
msleep(HZ);
2011-02-23 20:48:06 +00:00
}
2010-08-11 16:02:08 +00:00
return _write_ref_get(output, mref_a);
2010-08-23 05:06:06 +00:00
err:
return -EINVAL;
2010-08-08 20:51:20 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
void __trans_logger_ref_put(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *mref_a)
2010-08-08 20:51:20 +00:00
{
2011-04-12 15:31:08 +00:00
struct mref_object *mref;
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *shadow_a;
2010-08-20 10:58:24 +00:00
struct trans_logger_input *input;
2010-08-11 16:02:08 +00:00
2011-03-08 16:45:52 +00:00
restart:
2011-04-12 15:31:08 +00:00
mref = mref_a->object;
2011-07-15 10:12:06 +00:00
MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
2010-12-15 11:58:22 +00:00
CHECK_ATOMIC(&mref->ref_count, 1);
2011-04-14 14:21:26 +00:00
// are we a shadow (whether master or slave)?
2010-08-20 10:58:24 +00:00
shadow_a = mref_a->shadow_ref;
if (shadow_a) {
2011-03-08 16:45:52 +00:00
bool finished;
2011-04-12 15:31:08 +00:00
CHECK_ATOMIC(&mref->ref_count, 1);
2011-04-11 13:40:06 +00:00
finished = atomic_dec_and_test(&mref->ref_count);
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->inner_balance_count);
2011-04-12 15:31:08 +00:00
if (unlikely(finished && mref_a->is_hashed)) {
MARS_ERR("trying to put a hashed mref, pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
finished = false; // leaves a memleak
2011-03-08 16:45:52 +00:00
}
2011-04-12 15:31:08 +00:00
2011-03-08 16:45:52 +00:00
if (!finished) {
return;
}
2011-04-15 10:13:22 +00:00
2011-04-18 14:14:16 +00:00
CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
2011-04-12 15:31:08 +00:00
CHECK_HEAD_EMPTY(&mref_a->hash_head);
CHECK_HEAD_EMPTY(&mref_a->replay_head);
CHECK_HEAD_EMPTY(&mref_a->collect_head);
CHECK_HEAD_EMPTY(&mref_a->sub_list);
CHECK_HEAD_EMPTY(&mref_a->sub_head);
2011-04-14 14:21:26 +00:00
CHECK_HEAD_EMPTY(&mref_a->pos_head);
2011-04-15 10:13:22 +00:00
2010-08-20 10:58:24 +00:00
if (shadow_a != mref_a) { // we are a slave shadow
2011-04-08 09:52:46 +00:00
//MARS_DBG("slave\n");
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->sshadow_count);
2010-08-20 10:58:24 +00:00
CHECK_HEAD_EMPTY(&mref_a->hash_head);
2011-03-08 16:45:52 +00:00
trans_logger_free_mref(mref);
// now put the master shadow
2011-04-12 15:31:08 +00:00
mref_a = shadow_a;
2011-03-08 16:45:52 +00:00
goto restart;
2010-08-20 10:58:24 +00:00
}
2011-03-08 16:45:52 +00:00
// we are a master shadow
2011-04-08 09:52:46 +00:00
CHECK_PTR(mref_a->shadow_data, err);
2011-04-01 11:18:32 +00:00
if (mref_a->do_dealloc) {
2011-08-12 11:09:48 +00:00
brick_block_free(mref_a->shadow_data, mref_a->alloc_len);
2011-06-30 13:15:52 +00:00
atomic64_sub(mref->ref_len, &brick->shadow_mem_used);
2011-04-08 09:52:46 +00:00
mref_a->shadow_data = NULL;
mref_a->do_dealloc = false;
}
if (mref_a->do_buffered) {
2011-04-01 11:18:32 +00:00
mref->ref_data = NULL;
}
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->mshadow_count);
2011-03-29 14:40:40 +00:00
atomic_dec(&global_mshadow_count);
2011-03-08 16:45:52 +00:00
trans_logger_free_mref(mref);
2010-08-11 16:02:08 +00:00
return;
}
2011-04-29 09:36:10 +00:00
// only READ is allowed on non-shadow buffers
if (unlikely(mref->ref_rw != READ)) {
MARS_FAT("bad operation %d on non-shadow\n", mref->ref_rw);
}
// no shadow => call through
input = brick->inputs[TL_INPUT_READ];
2010-12-15 12:13:18 +00:00
GENERIC_INPUT_CALL(input, mref_put, mref);
2011-03-08 16:45:52 +00:00
return;
err:
MARS_FAT("oops\n");
2010-08-08 20:51:20 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-12 15:31:08 +00:00
void _trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
2011-03-10 11:40:06 +00:00
{
2011-04-12 15:31:08 +00:00
struct trans_logger_mref_aspect *mref_a;
mref_a = trans_logger_mref_get_aspect(output->brick, mref);
2011-04-12 15:31:08 +00:00
CHECK_PTR(mref_a, err);
__trans_logger_ref_put(output->brick, mref_a);
2011-04-12 15:31:08 +00:00
return;
err:
MARS_FAT("giving up...\n");
2011-03-10 11:40:06 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-12 15:31:08 +00:00
void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
2011-03-10 11:40:06 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
atomic_dec(&brick->outer_balance_count);
2011-04-12 15:31:08 +00:00
_trans_logger_ref_put(output, mref);
2011-03-10 11:40:06 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
void _trans_logger_endio(struct generic_callback *cb)
2010-08-26 17:12:30 +00:00
{
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *mref_a;
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick;
2010-12-15 11:58:22 +00:00
2010-08-26 17:12:30 +00:00
mref_a = cb->cb_private;
CHECK_PTR(mref_a, err);
if (unlikely(&mref_a->cb != cb)) {
MARS_FAT("bad callback -- hanging up\n");
goto err;
}
brick = mref_a->my_brick;
2011-04-29 09:36:10 +00:00
CHECK_PTR(brick, err);
2010-08-26 17:12:30 +00:00
NEXT_CHECKED_CALLBACK(cb, err);
2011-03-10 11:40:06 +00:00
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->fly_count);
atomic_inc(&brick->total_cb_count);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-29 09:36:10 +00:00
return;
2011-03-10 11:40:06 +00:00
2011-04-29 09:36:10 +00:00
err:
MARS_FAT("cannot handle callback\n");
2010-08-26 17:12:30 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object *mref)
2010-08-08 20:51:20 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = output->brick;
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *mref_a;
2011-04-01 11:18:32 +00:00
struct trans_logger_mref_aspect *shadow_a;
2011-04-29 09:36:10 +00:00
struct trans_logger_input *input;
2010-08-11 16:02:08 +00:00
2010-08-20 10:58:24 +00:00
CHECK_ATOMIC(&mref->ref_count, 1);
mref_a = trans_logger_mref_get_aspect(brick, mref);
2010-08-23 05:06:06 +00:00
CHECK_PTR(mref_a, err);
2010-08-11 16:02:08 +00:00
2011-04-08 09:52:46 +00:00
MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
2011-03-18 13:15:40 +00:00
// statistics
if (mref->ref_rw) {
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->total_write_count);
2011-03-18 13:15:40 +00:00
} else {
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->total_read_count);
2011-03-18 13:15:40 +00:00
}
2010-08-11 16:02:08 +00:00
// is this a shadow buffer?
2011-04-01 11:18:32 +00:00
shadow_a = mref_a->shadow_ref;
if (shadow_a) {
2010-08-20 10:58:24 +00:00
#if 1
2011-04-18 14:14:16 +00:00
CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
2011-04-08 09:52:46 +00:00
CHECK_HEAD_EMPTY(&mref_a->hash_head);
CHECK_HEAD_EMPTY(&mref_a->pos_head);
2011-04-01 11:18:32 +00:00
#endif
2011-04-14 14:21:26 +00:00
atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->inner_balance_count);
2011-04-08 09:52:46 +00:00
2011-04-29 09:36:10 +00:00
qq_mref_insert(&brick->q_phase1, mref_a);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2010-08-11 16:02:08 +00:00
return;
}
// only READ is allowed on non-shadow buffers
2010-12-15 11:58:22 +00:00
if (unlikely(mref->ref_rw != READ)) {
2011-04-08 09:52:46 +00:00
MARS_FAT("bad operation %d on non-shadow\n", mref->ref_rw);
2010-08-11 16:02:08 +00:00
}
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->fly_count);
2011-03-10 11:40:06 +00:00
mref_a->my_brick = brick;
INSERT_CALLBACK(mref, &mref_a->cb, _trans_logger_endio, mref_a);
2010-08-26 17:12:30 +00:00
2011-04-29 09:36:10 +00:00
input = output->brick->inputs[TL_INPUT_READ];
2010-12-15 12:13:18 +00:00
GENERIC_INPUT_CALL(input, mref_io, mref);
2011-04-08 09:52:46 +00:00
return;
err:
MARS_FAT("cannot handle IO\n");
2010-08-08 20:51:20 +00:00
}
2011-04-10 16:59:06 +00:00
////////////////////////////// writeback info //////////////////////////////
2011-05-13 11:19:28 +00:00
/* save final completion status when necessary
*/
2011-04-15 10:13:22 +00:00
static noinline
void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
{
struct trans_logger_brick *brick = orig_mref_a->my_brick;
struct trans_logger_input *log_input = orig_mref_a->log_input;
2011-04-15 10:13:22 +00:00
struct list_head *tmp;
unsigned long flags;
CHECK_PTR(log_input, err);
2011-05-13 11:19:28 +00:00
atomic_inc(&brick->total_writeback_count);
2011-04-15 10:13:22 +00:00
tmp = &orig_mref_a->pos_head;
2011-05-13 11:19:28 +00:00
2011-11-16 11:02:11 +00:00
traced_lock(&log_input->pos_lock, flags);
2011-05-13 11:19:28 +00:00
// am I the first member? (means "youngest" list entry)
if (tmp == log_input->pos_list.next) {
if (unlikely(!log_input)) {
2011-05-19 11:36:00 +00:00
MARS_ERR("cannot tell what input I am operating on\n");
} else {
loff_t finished = orig_mref_a->log_pos;
MARS_DBG("finished = %lld\n", finished);
if (finished <= log_input->replay_min_pos) {
MARS_ERR("backskip in log replay: %lld -> %lld\n", log_input->replay_min_pos, orig_mref_a->log_pos);
2011-05-19 11:36:00 +00:00
}
log_input->replay_min_pos = finished;
2011-05-19 11:36:00 +00:00
}
} else {
struct trans_logger_mref_aspect *prev_mref_a;
prev_mref_a = container_of(tmp->prev, struct trans_logger_mref_aspect, pos_head);
if (orig_mref_a->log_pos <= prev_mref_a->log_pos) {
MARS_ERR("backskip: %lld -> %lld\n", orig_mref_a->log_pos, prev_mref_a->log_pos);
} else {
/* Transitively transfer log_pos to the predecessor
2011-05-19 11:36:00 +00:00
* to correctly reflect the committed region.
*/
prev_mref_a->log_pos = orig_mref_a->log_pos;
2011-04-15 10:13:22 +00:00
}
}
list_del_init(tmp);
2011-05-13 11:19:28 +00:00
atomic_dec(&brick->pos_count);
2011-11-16 11:02:11 +00:00
traced_unlock(&log_input->pos_lock, flags);
err:;
2011-04-15 10:13:22 +00:00
}
2011-04-18 14:14:16 +00:00
/* Unlink one sub-request (linked via its sub_head) from the list it is
 * on and release the underlying mref object.
 */
static inline
void _free_one(struct list_head *tmp)
{
	struct trans_logger_mref_aspect *aspect;

	list_del_init(tmp);
	aspect = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
	trans_logger_free_mref(aspect->object);
}
2011-04-10 16:59:06 +00:00
static noinline
void free_writeback(struct writeback_info *wb)
{
2011-04-11 13:40:06 +00:00
struct list_head *tmp;
int cleanup_count = 0;
2011-04-11 13:40:06 +00:00
2011-04-18 14:14:16 +00:00
if (unlikely(wb->w_error < 0)) {
MARS_ERR("writeback error = %d at pos = %lld len = %d, writeback is incomplete\n", wb->w_error, wb->w_pos, wb->w_len);
}
/* The sub_read and sub_write lists are usually empty here.
* This code is only for cleanup in case of errors.
*/
while (unlikely((tmp = wb->w_sub_read_list.next) != &wb->w_sub_read_list)) {
cleanup_count++;
2011-04-18 14:14:16 +00:00
_free_one(tmp);
}
while (unlikely((tmp = wb->w_sub_write_list.next) != &wb->w_sub_write_list)) {
cleanup_count++;
2011-04-18 14:14:16 +00:00
_free_one(tmp);
}
/* Now complete the original requests.
*/
2011-04-11 13:40:06 +00:00
while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) {
struct trans_logger_mref_aspect *orig_mref_a;
struct mref_object *orig_mref;
list_del_init(tmp);
orig_mref_a = container_of(tmp, struct trans_logger_mref_aspect, collect_head);
orig_mref = orig_mref_a->object;
CHECK_ATOMIC(&orig_mref->ref_count, 1);
2011-08-12 11:09:48 +00:00
#if 1
while (!orig_mref_a->is_completed) {
MARS_ERR("request %lld (len = %d) was not completed, cleanup_count = %d\n", orig_mref->ref_pos, orig_mref->ref_len, cleanup_count);
msleep(10000);
2011-08-12 11:09:48 +00:00
}
#endif
2011-04-18 14:14:16 +00:00
if (likely(wb->w_error >= 0)) {
pos_complete(orig_mref_a);
}
2011-04-15 10:13:22 +00:00
__trans_logger_ref_put(orig_mref_a->my_brick, orig_mref_a);
2011-04-11 13:40:06 +00:00
}
2011-04-18 14:14:16 +00:00
2011-08-12 11:09:48 +00:00
brick_mem_free(wb);
2011-04-10 16:59:06 +00:00
}
2011-04-18 14:14:16 +00:00
/* Generic endio() for writeback_info
*/
2011-04-10 16:59:06 +00:00
static noinline
void wb_endio(struct generic_callback *cb)
{
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick;
2011-04-10 16:59:06 +00:00
struct writeback_info *wb;
int rw;
atomic_t *dec;
2011-05-19 11:36:00 +00:00
void (**_endio)(struct generic_callback *cb);
2011-04-10 16:59:06 +00:00
void (*endio)(struct generic_callback *cb);
sub_mref_a = cb->cb_private;
CHECK_PTR(sub_mref_a, err);
sub_mref = sub_mref_a->object;
CHECK_PTR(sub_mref, err);
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
brick = wb->w_brick;
2011-04-29 09:36:10 +00:00
CHECK_PTR(brick, err);
2011-04-10 16:59:06 +00:00
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->wb_balance_count);
2011-04-10 16:59:06 +00:00
2011-04-18 14:14:16 +00:00
if (cb->cb_error < 0) {
wb->w_error = cb->cb_error;
}
2011-04-10 16:59:06 +00:00
rw = sub_mref->ref_rw;
dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
2011-04-11 13:40:06 +00:00
CHECK_ATOMIC(dec, 1);
if (!atomic_dec_and_test(dec)) {
2011-11-14 14:21:15 +00:00
goto done;
2011-04-10 16:59:06 +00:00
}
2011-05-19 11:36:00 +00:00
_endio = rw ? &wb->write_endio : &wb->read_endio;
endio = *_endio;
*_endio = NULL;
2011-04-18 14:14:16 +00:00
if (likely(endio)) {
2011-04-10 16:59:06 +00:00
endio(cb);
2011-05-19 11:36:00 +00:00
} else {
MARS_ERR("internal: no endio defined\n");
2011-04-10 16:59:06 +00:00
}
2011-11-14 14:21:15 +00:00
done:
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-10 16:59:06 +00:00
return;
err:
MARS_FAT("hanging up....\n");
}
2011-04-18 14:14:16 +00:00
/* Atomically create writeback info, based on "snapshot" of current hash
* state.
* Notice that the hash can change during writeback IO, thus we need
* struct writeback_info to precisely catch that information at a single
* point in time.
*/
2011-04-10 16:59:06 +00:00
static noinline
struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct trans_logger_input *log_input)
2011-04-10 16:59:06 +00:00
{
2011-05-13 11:19:28 +00:00
struct writeback_info *wb;
2011-04-29 09:36:10 +00:00
struct trans_logger_input *read_input;
struct trans_logger_input *write_input;
int write_input_nr;
2011-05-13 11:19:28 +00:00
/* Allocate structure representing a bunch of adjacent writebacks
*/
2011-08-12 11:09:48 +00:00
wb = brick_zmem_alloc(sizeof(struct writeback_info));
2011-04-10 16:59:06 +00:00
if (!wb) {
goto err;
}
2011-04-12 15:31:08 +00:00
if (unlikely(len < 0)) {
MARS_ERR("len = %d\n", len);
}
2011-04-10 16:59:06 +00:00
wb->w_brick = brick;
2011-04-19 14:46:38 +00:00
wb->w_pos = pos;
wb->w_len = len;
wb->w_lh.lh_pos = &wb->w_pos;
INIT_LIST_HEAD(&wb->w_lh.lh_head);
INIT_LIST_HEAD(&wb->w_collect_list);
INIT_LIST_HEAD(&wb->w_sub_read_list);
INIT_LIST_HEAD(&wb->w_sub_write_list);
2011-04-18 14:14:16 +00:00
/* Atomically fetch transitive closure on all requests
* overlapping with the current search region.
*/
2011-04-29 09:36:10 +00:00
hash_extend(brick, &wb->w_pos, &wb->w_len, &wb->w_collect_list);
2011-04-10 16:59:06 +00:00
pos = wb->w_pos;
len = wb->w_len;
2011-04-12 15:31:08 +00:00
if (unlikely(len < 0)) {
MARS_ERR("len = %d\n", len);
}
2011-05-13 11:19:28 +00:00
/* Determine the "channels" we want to operate on
*/
2011-04-29 09:36:10 +00:00
read_input = brick->inputs[TL_INPUT_READ];
write_input_nr = TL_INPUT_WRITEBACK;
write_input = brick->inputs[write_input_nr];
if (!write_input->connect) {
write_input_nr = TL_INPUT_READ;
write_input = read_input;
}
2011-04-18 14:14:16 +00:00
/* Create sub_mrefs for read of old disk version (phase2)
*/
if (brick->log_reads) {
while (len > 0) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
int this_len;
int status;
sub_mref = trans_logger_alloc_mref(brick, &read_input->sub_layout);
2011-04-18 14:14:16 +00:00
if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n");
goto err;
}
sub_mref->ref_pos = pos;
sub_mref->ref_len = len;
sub_mref->ref_may_write = READ;
sub_mref->ref_rw = READ;
sub_mref->ref_data = NULL;
sub_mref_a = trans_logger_mref_get_aspect(brick, sub_mref);
2011-04-18 14:14:16 +00:00
CHECK_PTR(sub_mref_a, err);
2011-04-29 09:36:10 +00:00
sub_mref_a->my_input = read_input;
sub_mref_a->log_input = log_input;
sub_mref_a->my_brick = brick;
2011-04-18 14:14:16 +00:00
sub_mref_a->wb = wb;
2011-04-29 09:36:10 +00:00
status = GENERIC_INPUT_CALL(read_input, mref_get, sub_mref);
2011-04-18 14:14:16 +00:00
if (unlikely(status < 0)) {
MARS_FAT("cannot get sub_ref, status = %d\n", status);
goto err;
}
list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_read_list);
atomic_inc(&wb->w_sub_read_count);
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->wb_balance_count);
2011-04-18 14:14:16 +00:00
this_len = sub_mref->ref_len;
pos += this_len;
len -= this_len;
}
/* Re-init for startover
*/
pos = wb->w_pos;
len = wb->w_len;
}
2011-05-13 11:19:28 +00:00
/* Always create sub_mrefs for writeback (phase4)
2011-04-18 14:14:16 +00:00
*/
2011-04-10 16:59:06 +00:00
while (len > 0) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
2011-05-13 11:19:28 +00:00
struct trans_logger_mref_aspect *orig_mref_a;
struct mref_object *orig_mref;
2011-04-10 16:59:06 +00:00
void *data;
int this_len = len;
int diff;
int status;
2011-05-13 11:19:28 +00:00
orig_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true);
if (unlikely(!orig_mref_a)) {
2011-04-10 16:59:06 +00:00
MARS_FAT("could not find data\n");
goto err;
}
2011-05-13 11:19:28 +00:00
orig_mref = orig_mref_a->object;
diff = pos - orig_mref->ref_pos;
2011-04-10 16:59:06 +00:00
if (unlikely(diff < 0)) {
MARS_FAT("bad diff %d\n", diff);
goto err;
}
2011-05-13 11:19:28 +00:00
data = orig_mref_a->shadow_data + diff;
2011-04-10 16:59:06 +00:00
sub_mref = trans_logger_alloc_mref(brick, &write_input->sub_layout);
2011-04-10 16:59:06 +00:00
if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n");
goto err;
}
sub_mref->ref_pos = pos;
sub_mref->ref_len = this_len;
sub_mref->ref_may_write = WRITE;
sub_mref->ref_rw = WRITE;
2011-04-11 13:40:06 +00:00
#ifdef WB_COPY
sub_mref->ref_data = NULL;
#else
2011-04-10 16:59:06 +00:00
sub_mref->ref_data = data;
2011-04-11 13:40:06 +00:00
#endif
2011-04-10 16:59:06 +00:00
sub_mref_a = trans_logger_mref_get_aspect(brick, sub_mref);
2011-04-10 16:59:06 +00:00
CHECK_PTR(sub_mref_a, err);
2011-05-13 11:19:28 +00:00
sub_mref_a->orig_mref_a = orig_mref_a;
2011-04-29 09:36:10 +00:00
sub_mref_a->my_input = write_input;
2011-05-13 11:19:28 +00:00
sub_mref_a->log_input = log_input;
sub_mref_a->my_brick = brick;
2011-04-10 16:59:06 +00:00
sub_mref_a->wb = wb;
2011-04-29 09:36:10 +00:00
status = GENERIC_INPUT_CALL(write_input, mref_get, sub_mref);
2011-04-10 16:59:06 +00:00
if (unlikely(status < 0)) {
MARS_FAT("cannot get sub_ref, status = %d\n", status);
goto err;
}
2011-04-11 13:40:06 +00:00
#ifdef WB_COPY
memcpy(sub_mref->ref_data, data, sub_mref->ref_len);
#endif
2011-04-10 16:59:06 +00:00
list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list);
atomic_inc(&wb->w_sub_write_count);
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->wb_balance_count);
2011-04-10 16:59:06 +00:00
this_len = sub_mref->ref_len;
pos += this_len;
len -= this_len;
}
2011-04-12 15:31:08 +00:00
2011-04-10 16:59:06 +00:00
return wb;
err:
2011-04-11 13:40:06 +00:00
MARS_ERR("cleaning up...\n");
2011-04-10 16:59:06 +00:00
if (wb) {
free_writeback(wb);
}
return NULL;
}
2011-04-19 14:46:38 +00:00
static inline
2011-05-13 11:19:28 +00:00
void _fire_one(struct list_head *tmp, bool do_update, bool do_put)
2011-04-19 14:46:38 +00:00
{
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
2011-04-29 09:36:10 +00:00
struct trans_logger_input *sub_input;
2011-05-13 11:19:28 +00:00
struct trans_logger_input *log_input;
2011-04-19 14:46:38 +00:00
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
SETUP_CALLBACK(sub_mref, wb_endio, sub_mref_a);
2011-04-19 14:46:38 +00:00
2011-04-29 09:36:10 +00:00
sub_input = sub_mref_a->my_input;
2011-05-13 11:19:28 +00:00
log_input = sub_mref_a->log_input;
if (do_update) {
struct trans_logger_mref_aspect *orig_mref_a = sub_mref_a->orig_mref_a;
if (unlikely(!orig_mref_a)) {
MARS_ERR("internal problem\n");
} else {
loff_t max_pos = orig_mref_a->log_pos;
if (log_input->replay_max_pos < max_pos) {
log_input->replay_max_pos = max_pos;
}
}
}
2011-04-29 09:36:10 +00:00
2011-06-30 13:15:52 +00:00
#ifdef DO_WRITEBACK
2011-04-19 14:46:38 +00:00
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
2011-06-30 13:15:52 +00:00
#else
wb_endio(cb);
#endif
2011-04-19 14:46:38 +00:00
if (do_put) {
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
}
2011-04-29 09:36:10 +00:00
static inline
2011-05-19 11:36:00 +00:00
void fire_writeback(struct list_head *start, bool do_update, bool do_remove)
2011-04-10 16:59:06 +00:00
{
struct list_head *tmp;
2011-04-19 14:46:38 +00:00
if (do_remove) {
2011-05-19 11:36:00 +00:00
/* Caution! The wb structure may get deallocated
* during _fire_one() in some cases (e.g. when the
* callback is directly called by the mref_io operation).
* Ensure that no ptr dereferencing can take
* place after working on the last list member.
*/
tmp = start->next;
while (tmp != start) {
struct list_head *next;
list_del_init(tmp);
next = start->next;
_fire_one(tmp, do_update, true);
tmp = next;
}
2011-04-19 14:46:38 +00:00
} else {
for (tmp = start->next; tmp != start; tmp = tmp->next) {
2011-05-13 11:19:28 +00:00
_fire_one(tmp, do_update, false);
2011-04-19 14:46:38 +00:00
}
2011-04-10 16:59:06 +00:00
}
}
2011-05-19 11:36:00 +00:00
#if 0 // currently not used
/* Unlink every sub-request from the list at start and drop it via
 * mref_put on the input it was created for.
 */
static inline
void put_list(struct writeback_info *wb, struct list_head *start)
{
	while (!list_empty(start)) {
		struct list_head *entry = start->next;
		struct trans_logger_mref_aspect *aspect;
		struct mref_object *sub_mref;
		struct trans_logger_input *sub_input;

		list_del_init(entry);

		aspect = container_of(entry, struct trans_logger_mref_aspect, sub_head);
		sub_mref = aspect->object;
		sub_input = aspect->my_input;

		GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
	}
}
#endif
2011-04-11 13:40:06 +00:00
2010-08-11 16:02:08 +00:00
////////////////////////////// worker thread //////////////////////////////
2010-08-20 10:58:24 +00:00
/*********************************************************************
* Phase 1: write transaction log entry for the original write request.
*/
2011-05-26 14:32:32 +00:00
2011-04-08 09:52:46 +00:00
static noinline
2011-05-26 14:32:32 +00:00
void _complete(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *orig_mref_a, int error, bool pre_io)
2010-08-11 16:02:08 +00:00
{
2010-12-15 12:13:18 +00:00
struct mref_object *orig_mref;
2010-08-20 10:58:24 +00:00
orig_mref = orig_mref_a->object;
2010-08-23 05:06:06 +00:00
CHECK_PTR(orig_mref, err);
2010-08-11 16:02:08 +00:00
2011-05-26 14:32:32 +00:00
if (orig_mref_a->is_completed ||
(pre_io &&
(brick->completion_semantics >= 2 ||
(brick->completion_semantics >= 1 && !orig_mref->ref_skip_sync)))) {
goto done;
}
2011-04-29 09:36:10 +00:00
if (likely(error >= 0)) {
2010-12-15 12:13:18 +00:00
orig_mref->ref_flags &= ~MREF_WRITING;
orig_mref->ref_flags |= MREF_UPTODATE;
2010-08-20 10:58:24 +00:00
}
CHECKED_CALLBACK(orig_mref, error, err);
2011-05-26 14:32:32 +00:00
orig_mref_a->is_completed = true;
2010-08-11 16:02:08 +00:00
2011-05-26 14:32:32 +00:00
done:
return;
err:
MARS_ERR("giving up...\n");
}
/* Pre-IO hook handed to log_finalize() by phase1_startio().
 *
 * private is the aspect of the original request. The early completion
 * via _complete(..., pre_io = true) is only compiled in when
 * REFCOUNT_BUG is defined; otherwise only the sanity checks remain.
 */
static noinline
void phase1_preio(void *private)
{
	struct trans_logger_mref_aspect *orig_mref_a = private;
	struct trans_logger_brick *brick;

	CHECK_PTR(orig_mref_a, err);
	brick = orig_mref_a->my_brick;
	CHECK_PTR(brick, err);

	// signal completion to the upper layer
	// FIXME: immediate error signalling is impossible here, but some delayed signalling should be possible as a workaround. Think!
	CHECK_ATOMIC(&orig_mref_a->object->ref_count, 1);
#ifdef REFCOUNT_BUG // FIXME!!!
	_complete(brick, orig_mref_a, 0, true);
	CHECK_ATOMIC(&orig_mref_a->object->ref_count, 1);
#endif
	return;

err:
	MARS_ERR("giving up...\n");
}
static noinline
void phase1_endio(void *private, int error)
{
struct trans_logger_mref_aspect *orig_mref_a;
struct trans_logger_brick *brick;
orig_mref_a = private;
CHECK_PTR(orig_mref_a, err);
brick = orig_mref_a->my_brick;
2011-05-26 14:32:32 +00:00
CHECK_PTR(brick, err);
qq_dec_flying(&brick->q_phase1);
2011-07-20 13:11:44 +00:00
/* Queue up for the next phase.
* This will pin mref->ref_count so it can't go away
* after _complete().
*/
qq_mref_insert(&brick->q_phase2, orig_mref_a);
2011-05-26 14:32:32 +00:00
// signal completion to the upper layer
_complete(brick, orig_mref_a, error, false);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-12 15:31:08 +00:00
return;
err:
MARS_ERR("giving up...\n");
2010-08-11 16:02:08 +00:00
}
2011-04-08 09:52:46 +00:00
/* Phase 1 start: write the new data of an original write request into
 * the transaction log.
 *
 * The input selected by brick->log_input_nr is remembered in
 * orig_mref_a->log_input. A CODE_WRITE_NEW record is reserved, filled
 * from shadow_data and finalized with phase1_preio() / phase1_endio()
 * as hooks. Afterwards the request's log position is recorded and the
 * request is appended to the input's pos_list.
 *
 * Returns true on success, false when a pointer check, the reservation
 * or the finalization failed.
 */
static noinline
bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
{
	struct mref_object *orig_mref;
	struct trans_logger_brick *brick;
	struct trans_logger_input *input;
	struct log_status *logst;
	void *payload;
	unsigned long flags;

	CHECK_PTR(orig_mref_a, err);
	orig_mref = orig_mref_a->object;
	CHECK_PTR(orig_mref, err);
	brick = orig_mref_a->my_brick;
	CHECK_PTR(brick, err);
	input = brick->inputs[brick->log_input_nr];
	CHECK_PTR(input, err);

	orig_mref_a->log_input = input;
	logst = &input->logst;

	{
		struct log_header header = {
			.l_stamp = orig_mref_a->stamp,
			.l_pos = orig_mref->ref_pos,
			.l_len = orig_mref->ref_len,
			.l_code = CODE_WRITE_NEW,
		};

		payload = log_reserve(logst, &header);
	}
	if (unlikely(!payload)) {
		goto err;
	}

	memcpy(payload, orig_mref_a->shadow_data, orig_mref->ref_len);

	if (unlikely(!log_finalize(logst, orig_mref->ref_len, phase1_preio, phase1_endio, orig_mref_a))) {
		goto err;
	}

	orig_mref_a->log_pos = logst->log_pos + logst->offset;

	traced_lock(&input->pos_lock, flags);
#if 1
	/* Sanity check: log positions must grow along pos_list. */
	if (!list_empty(&input->pos_list)) {
		struct trans_logger_mref_aspect *last_mref_a;

		last_mref_a = container_of(input->pos_list.prev, struct trans_logger_mref_aspect, pos_head);
		if (last_mref_a->log_pos >= orig_mref_a->log_pos) {
			MARS_ERR("backskip in pos_list, %lld >= %lld\n", last_mref_a->log_pos, orig_mref_a->log_pos);
		}
	}
#endif
	list_add_tail(&orig_mref_a->pos_head, &input->pos_list);
	atomic_inc(&brick->pos_count);
	traced_unlock(&input->pos_lock, flags);

	qq_inc_flying(&brick->q_phase1);
	return true;

err:
	return false;
}
2011-04-08 09:52:46 +00:00
/* Phase 0 start: first treatment of a request on a shadow buffer.
 *
 * READ:  the data is copied from the shadow buffer to the caller's
 *        buffer (if they differ), success is signalled immediately and
 *        one reference is dropped.
 * WRITE: the caller's data is copied into the shadow buffer, the request
 *        is marked dirty, hashed if necessary, and handed over to
 *        phase1_startio().
 *
 * Returns true on success; false (after sleeping for a second) when the
 * request cannot be handled, or whatever phase1_startio() returns.
 */
static noinline
bool phase0_startio(struct trans_logger_mref_aspect *mref_a)
{
	struct mref_object *mref = mref_a->object;
	struct trans_logger_mref_aspect *shadow_a;
	struct trans_logger_brick *brick;

	CHECK_PTR(mref, err);
	shadow_a = mref_a->shadow_ref;
	CHECK_PTR(shadow_a, err);
	brick = mref_a->my_brick;
	CHECK_PTR(brick, err);

	MARS_IO("pos = %lld len = %d rw = %d\n", mref->ref_pos, mref->ref_len, mref->ref_rw);

	if (mref->ref_rw == READ) {
		// nothing to do: directly signal success.
		struct mref_object *shadow = shadow_a->object;

		if (unlikely(shadow == mref)) {
			MARS_ERR("oops, we should be a slave shadow, but are a master one\n");
		}
#ifdef USE_MEMCPY
		if (mref_a->shadow_data != mref->ref_data) {
			if (unlikely(mref->ref_len <= 0 || mref->ref_len > PAGE_SIZE)) {
				MARS_ERR("implausible ref_len = %d\n", mref->ref_len);
			}
			MARS_IO("read memcpy to = %p from = %p len = %d\n", mref->ref_data, mref_a->shadow_data, mref->ref_len);
			memcpy(mref->ref_data, mref_a->shadow_data, mref->ref_len);
		}
#endif
		mref->ref_flags |= MREF_UPTODATE;

		CHECKED_CALLBACK(mref, 0, err);

		__trans_logger_ref_put(brick, mref_a);

		return true;
	}

	// else WRITE
#if 1
	/* Sanity checks: a fresh write must not be linked anywhere yet,
	 * and no IO may be in progress on it.
	 */
	CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
	CHECK_HEAD_EMPTY(&mref_a->hash_head);
	if (unlikely(mref->ref_flags & (MREF_READING | MREF_WRITING))) {
		MARS_ERR("bad flags %d\n", mref->ref_flags);
	}
#endif
	/* In case of non-buffered IO, the buffer is
	 * under control of the user. In particular, he
	 * may change it without telling us.
	 * Therefore we make a copy (or "snapshot") here.
	 */
	mref->ref_flags |= MREF_WRITING;
#ifdef USE_MEMCPY
	if (mref_a->shadow_data != mref->ref_data) {
		if (unlikely(mref->ref_len <= 0 || mref->ref_len > PAGE_SIZE)) {
			MARS_ERR("implausible ref_len = %d\n", mref->ref_len);
		}
		MARS_IO("write memcpy to = %p from = %p len = %d\n", mref_a->shadow_data, mref->ref_data, mref->ref_len);
		memcpy(mref_a->shadow_data, mref->ref_data, mref->ref_len);
	}
#endif
	/* Mark both this aspect and the shadow it refers to as dirty. */
	mref_a->is_dirty = true;
	mref_a->shadow_ref->is_dirty = true;
#ifndef KEEP_UNIQUE
	if (unlikely(mref_a->shadow_ref != mref_a)) {
		MARS_ERR("something is wrong: %p != %p\n", mref_a->shadow_ref, mref_a);
	}
#endif
	if (!mref_a->is_hashed) {
		MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
		hash_insert(brick, mref_a);
	}

	return phase1_startio(mref_a);

err:
	MARS_ERR("cannot work\n");
	msleep(1000);
	return false;
}
2010-08-20 10:58:24 +00:00
/*********************************************************************
* Phase 2: read original version of data.
* This happens _after_ phase 1, deliberately.
* We are explicitly dealing with old and new versions.
* The new version is hashed in memory all the time (such that parallel
2011-03-18 13:15:40 +00:00
* READs will see them), so we have plenty of time for getting the
2010-08-20 10:58:24 +00:00
* old version from disk sometime later, e.g. when IO contention is low.
*/
2011-04-10 16:59:06 +00:00
static noinline
2011-04-18 14:14:16 +00:00
void phase2_endio(struct generic_callback *cb)
2011-04-10 16:59:06 +00:00
{
struct trans_logger_mref_aspect *sub_mref_a;
struct writeback_info *wb;
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick;
2011-04-10 16:59:06 +00:00
CHECK_PTR(cb, err);
sub_mref_a = cb->cb_private;
CHECK_PTR(sub_mref_a, err);
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
brick = wb->w_brick;
2011-04-29 09:36:10 +00:00
CHECK_PTR(brick, err);
2011-04-10 16:59:06 +00:00
if (unlikely(cb->cb_error < 0)) {
MARS_FAT("IO error %d\n", cb->cb_error);
goto err;
}
2011-04-29 09:36:10 +00:00
qq_dec_flying(&brick->q_phase2);
2011-04-10 16:59:06 +00:00
2011-04-18 14:14:16 +00:00
// queue up for the next phase
2011-04-29 09:36:10 +00:00
qq_wb_insert(&brick->q_phase3, wb);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-10 16:59:06 +00:00
return;
err:
MARS_FAT("hanging up....\n");
}
2011-04-18 14:14:16 +00:00
static noinline
void phase4_endio(struct generic_callback *cb);
2011-04-19 14:46:38 +00:00
static noinline
bool phase4_startio(struct writeback_info *wb);
2011-04-18 14:14:16 +00:00
2011-04-10 16:59:06 +00:00
/* Phase 2 start: build a writeback_info around an original request and
 * start reading the old disk version.
 *
 * Requests which are already collected, or which are not hashed, are
 * skipped. When brick->log_reads is off, the read phase is bypassed and
 * the writeback_info goes directly to phase 4.
 *
 * Returns false when a pointer check or make_writeback() fails,
 * true otherwise (including the skipped cases).
 */
static noinline
bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
{
	struct mref_object *orig_mref;
	struct trans_logger_brick *brick;
	struct writeback_info *wb;

	CHECK_PTR(orig_mref_a, err);
	orig_mref = orig_mref_a->object;
	CHECK_PTR(orig_mref, err);
	brick = orig_mref_a->my_brick;
	CHECK_PTR(brick, err);

	if (orig_mref_a->is_collected) {
		MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
		goto done;
	}
	if (!orig_mref_a->is_hashed) {
		MARS_IO("AHA not hashed, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len);
		goto done;
	}

	wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, orig_mref_a->log_input);
	if (unlikely(!wb)) {
		goto err;
	}

	/* A writeback without collected requests or without write
	 * sub-requests is unexpected: complain and discard it.
	 */
	if (unlikely(list_empty(&wb->w_collect_list))) {
		MARS_ERR("collection list is empty, orig pos = %lld len = %d (collected=%d), extended pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len, (int)orig_mref_a->is_collected, wb->w_pos, wb->w_len);
		free_writeback(wb);
		goto done;
	}
	if (unlikely(list_empty(&wb->w_sub_write_list))) {
		MARS_ERR("hmmm.... this should not happen\n");
		free_writeback(wb);
		goto done;
	}

	/* Handlers to be dispatched by wb_endio(). */
	wb->read_endio = phase2_endio;
	wb->write_endio = phase4_endio;
	/* One log entry is expected per read sub-request. */
	atomic_set(&wb->w_sub_log_count, atomic_read(&wb->w_sub_read_count));

	if (brick->log_reads) {
		qq_inc_flying(&brick->q_phase2);
		fire_writeback(&wb->w_sub_read_list, false, false);
	} else { // shortcut
#ifdef LATER
		qq_wb_insert(&brick->q_phase4, wb);
		wake_up_interruptible_all(&brick->worker_event);
#else
		return phase4_startio(wb);
#endif
	}

done:
	return true;

err:
	return false;
}
2010-08-20 10:58:24 +00:00
/*********************************************************************
* Phase 3: log the old disk version.
*/
2011-04-19 14:46:38 +00:00
static inline
void _phase3_endio(struct writeback_info *wb)
2011-04-08 09:52:46 +00:00
{
struct trans_logger_brick *brick = wb->w_brick;
2011-04-08 09:52:46 +00:00
// queue up for the next phase
2011-04-29 09:36:10 +00:00
qq_wb_insert(&brick->q_phase4, wb);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-08 09:52:46 +00:00
return;
}
static noinline
void phase3_endio(void *private, int error)
2010-08-11 16:02:08 +00:00
{
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *sub_mref_a;
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick;
2011-04-19 14:46:38 +00:00
struct writeback_info *wb;
2010-08-23 05:06:06 +00:00
2011-03-11 13:57:54 +00:00
sub_mref_a = private;
2010-08-23 05:06:06 +00:00
CHECK_PTR(sub_mref_a, err);
2011-04-19 14:46:38 +00:00
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
brick = wb->w_brick;
2011-04-29 09:36:10 +00:00
CHECK_PTR(brick, err);
2011-04-08 09:52:46 +00:00
2011-04-29 09:36:10 +00:00
qq_dec_flying(&brick->q_phase3);
2010-08-11 16:02:08 +00:00
2011-03-11 13:57:54 +00:00
if (unlikely(error < 0)) {
MARS_FAT("IO error %d\n", error);
goto err; // FIXME: this leads to hanging requests. do better.
2010-08-20 10:58:24 +00:00
}
2010-08-11 16:02:08 +00:00
2011-04-19 14:46:38 +00:00
CHECK_ATOMIC(&wb->w_sub_log_count, 1);
if (atomic_dec_and_test(&wb->w_sub_log_count)) {
_phase3_endio(wb);
2011-04-08 09:52:46 +00:00
}
return;
err:
MARS_FAT("hanging up....\n");
2010-08-11 16:02:08 +00:00
}
2011-04-08 09:52:46 +00:00
/* Log the old disk version held by one read sub-request.
 *
 * A CODE_WRITE_OLD record is reserved on the sub-request's log_input,
 * filled from the sub-request's data and finalized with phase3_endio()
 * as completion hook.
 *
 * Returns true on success, false when a pointer check, the reservation
 * or the finalization failed.
 */
static noinline
bool _phase3_startio(struct trans_logger_mref_aspect *sub_mref_a)
{
	/* Kept NULL until known, since the error path prints from it. */
	struct mref_object *sub_mref = NULL;
	struct writeback_info *wb;
	struct trans_logger_input *input;
	struct trans_logger_brick *brick;
	struct log_status *logst;
	void *payload;

	CHECK_PTR(sub_mref_a, err);
	sub_mref = sub_mref_a->object;
	CHECK_PTR(sub_mref, err);
	wb = sub_mref_a->wb;
	CHECK_PTR(wb, err);
	brick = wb->w_brick;
	CHECK_PTR(brick, err);
	input = sub_mref_a->log_input;
	CHECK_PTR(input, err);
	logst = &input->logst;

	{
		struct log_header header = {
			.l_stamp = sub_mref_a->stamp,
			.l_pos = sub_mref->ref_pos,
			.l_len = sub_mref->ref_len,
			.l_code = CODE_WRITE_OLD,
		};

		payload = log_reserve(logst, &header);
	}
	if (unlikely(!payload)) {
		goto err;
	}

	memcpy(payload, sub_mref->ref_data, sub_mref->ref_len);

	if (unlikely(!log_finalize(logst, sub_mref->ref_len, NULL, phase3_endio, sub_mref_a))) {
		goto err;
	}

	qq_inc_flying(&brick->q_phase3);
	return true;

err:
	MARS_FAT("cannot log old data, pos = %lld len = %d\n", sub_mref ? sub_mref->ref_pos : 0, sub_mref ? sub_mref->ref_len : 0);
	return false;
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-19 14:46:38 +00:00
bool phase3_startio(struct writeback_info *wb)
2011-04-08 09:52:46 +00:00
{
2011-04-19 14:46:38 +00:00
struct trans_logger_brick *brick;
bool ok = true;
2011-04-08 09:52:46 +00:00
2011-04-19 14:46:38 +00:00
CHECK_PTR(wb, err);
brick = wb->w_brick;
2011-04-19 14:46:38 +00:00
CHECK_PTR(brick, err);
2011-04-08 09:52:46 +00:00
2011-04-29 09:36:10 +00:00
if (brick->log_reads && atomic_read(&wb->w_sub_log_count) > 0) {
2011-04-19 14:46:38 +00:00
struct list_head *start;
2011-04-08 09:52:46 +00:00
struct list_head *tmp;
2011-04-19 14:46:38 +00:00
start = &wb->w_sub_read_list;
2011-04-20 14:26:44 +00:00
for (tmp = start->next; tmp != start; tmp = tmp->next) {
2011-04-08 09:52:46 +00:00
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
2011-04-19 14:46:38 +00:00
2011-04-08 09:52:46 +00:00
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
2011-04-19 14:46:38 +00:00
2011-04-08 09:52:46 +00:00
mars_trace(sub_mref, "sub_log");
2011-04-19 14:46:38 +00:00
if (!_phase3_startio(sub_mref_a)) {
ok = false;
}
2011-04-08 09:52:46 +00:00
}
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-08 09:52:46 +00:00
} else {
2011-04-19 14:46:38 +00:00
_phase3_endio(wb);
2011-04-08 09:52:46 +00:00
}
2011-04-19 14:46:38 +00:00
return ok;
2011-04-08 09:52:46 +00:00
err:
return false;
}
2010-08-20 10:58:24 +00:00
/*********************************************************************
* Phase 4: overwrite old disk version with new version.
*/
2011-04-18 14:14:16 +00:00
static noinline
void phase4_endio(struct generic_callback *cb)
{
struct trans_logger_mref_aspect *sub_mref_a;
struct writeback_info *wb;
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick;
2011-04-18 14:14:16 +00:00
CHECK_PTR(cb, err);
sub_mref_a = cb->cb_private;
CHECK_PTR(sub_mref_a, err);
wb = sub_mref_a->wb;
CHECK_PTR(wb, err);
brick = wb->w_brick;
2011-04-29 09:36:10 +00:00
CHECK_PTR(brick, err);
2011-04-18 14:14:16 +00:00
if (unlikely(cb->cb_error < 0)) {
MARS_FAT("IO error %d\n", cb->cb_error);
goto err;
}
2011-04-29 09:36:10 +00:00
hash_put_all(brick, &wb->w_collect_list);
2011-04-18 14:14:16 +00:00
2011-04-29 09:36:10 +00:00
qq_dec_flying(&brick->q_phase4);
2011-05-13 11:19:28 +00:00
atomic_inc(&brick->total_writeback_cluster_count);
2011-03-20 17:38:08 +00:00
2011-04-19 14:46:38 +00:00
free_writeback(wb);
2010-08-20 10:58:24 +00:00
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-03-20 17:38:08 +00:00
2011-04-08 09:52:46 +00:00
return;
2010-08-23 05:06:06 +00:00
2011-04-08 09:52:46 +00:00
err:
MARS_FAT("hanging up....\n");
2010-08-20 10:58:24 +00:00
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-19 14:46:38 +00:00
bool phase4_startio(struct writeback_info *wb)
2011-04-08 09:52:46 +00:00
{
2011-04-20 14:26:44 +00:00
struct list_head *start = &wb->w_sub_read_list;
struct list_head *tmp;
/* Cleanup read requests (if they exist from previous phases)
*/
while ((tmp = start->next) != start) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
2011-04-29 09:36:10 +00:00
struct trans_logger_input *sub_input;
2011-04-20 14:26:44 +00:00
list_del_init(tmp);
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
2011-04-29 09:36:10 +00:00
sub_input = sub_mref_a->my_input;
2011-04-20 14:26:44 +00:00
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
/* Start writeback IO
*/
qq_inc_flying(&wb->w_brick->q_phase4);
2011-05-19 11:36:00 +00:00
fire_writeback(&wb->w_sub_write_list, true, true);
2011-04-08 09:52:46 +00:00
return true;
}
2011-04-19 14:46:38 +00:00
/*********************************************************************
* The logger thread.
* There is only a single instance, dealing with all requests in parallel.
*/
2011-04-08 09:52:46 +00:00
static noinline
2011-04-19 14:46:38 +00:00
int run_mref_queue(struct logger_queue *q, bool (*startio)(struct trans_logger_mref_aspect *sub_mref_a), int max)
2010-08-20 10:58:24 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = q->q_brick;
2011-04-19 14:46:38 +00:00
bool found = false;
2011-04-08 09:52:46 +00:00
bool ok;
2011-04-19 14:46:38 +00:00
int res;
2010-08-20 10:58:24 +00:00
2011-04-29 09:36:10 +00:00
do {
struct trans_logger_mref_aspect *mref_a;
2011-04-19 14:46:38 +00:00
mref_a = qq_mref_fetch(q);
res = -1;
if (!mref_a)
goto done;
2011-03-29 14:40:40 +00:00
2011-04-19 14:46:38 +00:00
ok = startio(mref_a);
if (unlikely(!ok)) {
qq_mref_pushback(q, mref_a);
2011-04-29 09:36:10 +00:00
brick->did_pushback = true;
2011-04-19 14:46:38 +00:00
res = 1;
goto done;
}
2011-04-29 09:36:10 +00:00
brick->did_work = true;
found = true;
__trans_logger_ref_put(mref_a->my_brick, mref_a);
2011-04-29 09:36:10 +00:00
} while (--max > 0);
2011-04-19 14:46:38 +00:00
res = 0;
2010-08-20 10:58:24 +00:00
2011-04-19 14:46:38 +00:00
done:
if (found) {
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-04-08 09:52:46 +00:00
}
2011-04-19 14:46:38 +00:00
return res;
2011-04-08 09:52:46 +00:00
}
static noinline
2011-04-19 14:46:38 +00:00
int run_wb_queue(struct logger_queue *q, bool (*startio)(struct writeback_info *wb), int max)
2010-08-23 05:06:06 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_brick *brick = q->q_brick;
2011-03-11 13:57:54 +00:00
bool found = false;
2010-08-23 05:06:06 +00:00
bool ok;
2011-03-11 13:57:54 +00:00
int res;
2010-08-23 05:06:06 +00:00
2011-04-29 09:36:10 +00:00
do {
struct writeback_info *wb;
2011-04-19 14:46:38 +00:00
wb = qq_wb_fetch(q);
2011-03-11 13:57:54 +00:00
res = -1;
2011-04-19 14:46:38 +00:00
if (!wb)
2011-03-11 13:57:54 +00:00
goto done;
2011-04-19 14:46:38 +00:00
ok = startio(wb);
2010-08-23 05:06:06 +00:00
if (unlikely(!ok)) {
2011-04-19 14:46:38 +00:00
qq_wb_pushback(q, wb);
2011-04-29 09:36:10 +00:00
brick->did_pushback = true;
2011-03-11 13:57:54 +00:00
res = 1;
goto done;
2010-08-23 05:06:06 +00:00
}
2011-04-29 09:36:10 +00:00
brick->did_work = true;
found = true;
} while (--max > 0);
2011-03-11 13:57:54 +00:00
res = 0;
done:
if (found) {
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-03-11 13:57:54 +00:00
}
return res;
2010-08-23 05:06:06 +00:00
}
2011-04-08 09:52:46 +00:00
static inline
2011-04-29 09:36:10 +00:00
int _congested(struct trans_logger_brick *brick)
{
return atomic_read(&brick->q_phase1.q_queued)
|| atomic_read(&brick->q_phase1.q_flying)
|| atomic_read(&brick->q_phase2.q_queued)
|| atomic_read(&brick->q_phase2.q_flying)
|| atomic_read(&brick->q_phase3.q_queued)
|| atomic_read(&brick->q_phase3.q_flying)
|| atomic_read(&brick->q_phase4.q_queued)
|| atomic_read(&brick->q_phase4.q_flying);
}
/* Asks is_log_ready() about the log_status of the input currently
 * selected by brick->log_input_nr.
 */
static inline
bool logst_is_ready(struct trans_logger_brick *brick)
{
	struct trans_logger_input *current_input = brick->inputs[brick->log_input_nr];

	return is_log_ready(&current_input->logst);
}
2011-04-29 09:36:10 +00:00
/* The readiness of the queues is volatile (it may change underneath
 * us due to interrupts etc).
 * In order to get a consistent view during one round of the loop in
 * trans_logger_log(), the status is captured exactly once and the
 * captured snapshot is used during processing.
 */
struct condition_status {
	bool q1_ready;    /* snapshot for q_phase1 */
	bool q2_ready;    /* snapshot for q_phase2 */
	bool q3_ready;    /* snapshot for q_phase3 */
	bool q4_ready;    /* snapshot for q_phase4 */
	bool extra_ready; /* stop requested and nothing queued / flying */
	bool some_ready;  /* any of the flags above is set */
};
/* Capture the readiness of all queues into *st.
 *
 * Used as the wait condition of the worker loop: returns true (and
 * sets st->some_ready) when at least one of the captured flags is set.
 */
static noinline
bool _condition(struct condition_status *st, struct trans_logger_brick *brick)
{
	bool any;

	st->q1_ready =
		atomic_read(&brick->q_phase1.q_queued) > 0 &&
		logst_is_ready(brick);
	st->q2_ready = qq_is_ready(&brick->q_phase2);
	st->q3_ready = qq_is_ready(&brick->q_phase3);
	st->q4_ready = qq_is_ready(&brick->q_phase4);
	st->extra_ready = (kthread_should_stop() && !_congested(brick));

	any = st->q1_ready || st->q2_ready || st->q3_ready ||
	      st->q4_ready || st->extra_ready;
	st->some_ready = any;

	return any;
}
static
2011-11-15 17:32:20 +00:00
void _init_input(struct trans_logger_input *input)
{
struct trans_logger_brick *brick = input->brick;
struct log_status *logst = &input->logst;
2011-11-15 17:32:20 +00:00
loff_t start_pos = input->log_start_pos;
init_logst(logst, (void*)input, 0);
logst->align_size = brick->align_size;
logst->chunk_size = brick->chunk_size;
logst->max_flying = brick->max_flying;
input->replay_min_pos = start_pos;
input->replay_max_pos = start_pos; // FIXME: Theoretically, this could be wrong when starting on an interrupted replay / inconsistent system. However, we normally never start ordinary logging in such a case (possibly except some desperate emergency cases when there really is no other chance, such as physical loss of transaction logs). Nevertheless, better use old consistenty information from the FS here.
logst->log_pos = start_pos;
input->is_operating = true;
}
/* Switch logging over to the input selected by brick->new_input_nr,
 * if possible.
 *
 * Nothing is done when
 *  - log_input_nr differs from old_input_nr,
 *  - new_input_nr is outside TL_INPUT_LOG1..TL_INPUT_LOG2,
 *  - the new input is already operating or is not connected.
 * Otherwise the input is initialized via _init_input() and becomes the
 * current log input (brick->log_input_nr).
 */
static
void _init_inputs(struct trans_logger_brick *brick)
{
	struct trans_logger_input *input;
	int nr = brick->new_input_nr;

	if (brick->log_input_nr != brick->old_input_nr) {
		/* the original format string contained "&d" instead of "%d",
		 * leaving one argument without a conversion
		 */
		MARS_DBG("nothing to do, new_input_nr = %d log_input_nr = %d old_input_nr = %d\n", brick->new_input_nr, brick->log_input_nr, brick->old_input_nr);
		goto done;
	}
	if (unlikely(nr < TL_INPUT_LOG1 || nr > TL_INPUT_LOG2)) {
		MARS_ERR("bad new_input_nr = %d\n", nr);
		goto done;
	}

	input = brick->inputs[nr];
	CHECK_PTR(input, done);

	if (input->is_operating || !input->connect) {
		MARS_DBG("cannot yet switch over to %d (is_operating = %d connect = %p)\n", nr, input->is_operating, input->connect);
		goto done;
	}

	_init_input(input);
	brick->log_input_nr = nr;

	MARS_INF("switching over to new logfile %d (old = %d) startpos = %lld\n", nr, brick->old_input_nr, input->log_start_pos);
done: ;
}
/* Call log_flush() on every operating log input whose log_status has a
 * positive count, counting each flush in total_flush_count.
 */
static
void _flush_inputs(struct trans_logger_brick *brick)
{
	int nr;

	for (nr = TL_INPUT_LOG1; nr <= TL_INPUT_LOG2; nr++) {
		struct trans_logger_input *input = brick->inputs[nr];
		struct log_status *logst = &input->logst;

		if (!input->is_operating)
			continue;
		if (logst->count <= 0)
			continue;

		atomic_inc(&brick->total_flush_count);
		log_flush(logst);
	}
}
/* Shut down operating log inputs.
 *
 * An operating input is cleaned up when force is set, or when it is no
 * longer connected. When the cleaned-up input was the "old" one,
 * old_input_nr is advanced to log_input_nr.
 */
static
void _exit_inputs(struct trans_logger_brick *brick, bool force)
{
	int nr;

	for (nr = TL_INPUT_LOG1; nr <= TL_INPUT_LOG2; nr++) {
		struct trans_logger_input *input = brick->inputs[nr];
		struct log_status *logst = &input->logst;

		if (!input->is_operating)
			continue;
		if (!force && input->connect)
			continue;

		MARS_DBG("cleaning up input %d (log = %d old = %d)\n", nr, brick->log_input_nr, brick->old_input_nr);
		exit_logst(logst);
		input->is_operating = false;

		if (nr == brick->old_input_nr)
			brick->old_input_nr = brick->log_input_nr;
	}
}
2011-04-08 09:52:46 +00:00
static noinline
void trans_logger_log(struct trans_logger_brick *brick)
2011-02-23 20:48:06 +00:00
{
2011-11-14 14:21:15 +00:00
#ifdef DELAY_CALLERS
2011-06-30 13:15:52 +00:00
bool unlimited = false;
bool old_unlimited = false;
2011-06-30 13:15:52 +00:00
bool delay_callers;
2011-11-14 14:21:15 +00:00
#endif
2011-04-29 09:36:10 +00:00
long wait_timeout = HZ;
2011-04-01 11:18:32 +00:00
#ifdef STAT_DEBUGGING
2011-03-30 12:02:50 +00:00
long long last_jiffies = jiffies;
#endif
2011-04-29 09:36:10 +00:00
#if 1
int max_delta = 0;
#endif
_init_inputs(brick);
2011-04-08 09:52:46 +00:00
2011-03-29 14:40:40 +00:00
mars_power_led_on((void*)brick, true);
2011-04-29 09:36:10 +00:00
while (!kthread_should_stop() || _congested(brick)) {
long long old_jiffies = jiffies;
2011-05-13 11:19:28 +00:00
long old_wait_timeout;
2011-05-19 11:36:00 +00:00
bool do_flush;
2011-04-29 09:36:10 +00:00
struct condition_status st = {};
#if 1
long long j0;
long long j1;
long long j2;
long long j3;
long long j4;
2011-03-11 13:57:54 +00:00
#endif
2011-04-29 09:36:10 +00:00
2011-04-08 09:52:46 +00:00
MARS_IO("waiting for request\n");
2011-04-29 09:36:10 +00:00
__wait_event_interruptible_timeout(
2011-06-30 13:15:52 +00:00
brick->worker_event,
2011-04-29 09:36:10 +00:00
_condition(&st, brick),
2011-03-11 13:57:54 +00:00
wait_timeout);
2011-05-13 11:19:28 +00:00
atomic_inc(&brick->total_round_count);
_init_inputs(brick);
2011-04-29 09:36:10 +00:00
#if 1
j0 = jiffies;
2011-04-01 11:18:32 +00:00
#endif
2011-04-29 09:36:10 +00:00
//MARS_DBG("AHA %d\n", atomic_read(&brick->q_phase1.q_queued));
2011-04-01 11:18:32 +00:00
#ifdef STAT_DEBUGGING
2011-03-29 14:40:40 +00:00
if (((long long)jiffies) - last_jiffies >= HZ * 5 && brick->power.button) {
2011-04-01 11:18:32 +00:00
char *txt;
2010-08-23 05:06:06 +00:00
last_jiffies = jiffies;
2011-04-01 11:18:32 +00:00
txt = brick->ops->brick_statistics(brick, 0);
if (txt) {
MARS_INF("%s", txt);
2011-08-12 11:09:48 +00:00
brick_string_free(txt);
2011-04-01 11:18:32 +00:00
}
2010-08-20 10:58:24 +00:00
}
2010-08-23 05:06:06 +00:00
#endif
2011-04-29 09:36:10 +00:00
brick->did_pushback = false;
brick->did_work = false;
2011-03-11 13:57:54 +00:00
2011-04-29 09:36:10 +00:00
/* This is highest priority, do it first.
2011-03-18 13:15:40 +00:00
*/
2011-11-14 14:21:15 +00:00
if (st.q1_ready) {
run_mref_queue(&brick->q_phase1, phase0_startio, brick->q_phase1.q_batchlen);
}
2011-04-29 09:36:10 +00:00
j1 = jiffies;
2010-08-20 10:58:24 +00:00
2011-04-29 09:36:10 +00:00
/* In order to speed up draining, check the other queues
* in backward direction.
2010-08-23 05:06:06 +00:00
*/
2011-04-29 09:36:10 +00:00
if (st.q4_ready) {
run_wb_queue(&brick->q_phase4, phase4_startio, brick->q_phase4.q_batchlen);
2011-03-18 13:15:40 +00:00
}
2011-04-29 09:36:10 +00:00
j2 = jiffies;
2011-04-19 14:46:38 +00:00
2011-04-29 09:36:10 +00:00
if (st.q3_ready) {
run_wb_queue(&brick->q_phase3, phase3_startio, brick->q_phase3.q_batchlen);
2011-03-18 13:15:40 +00:00
}
2011-04-29 09:36:10 +00:00
j3 = jiffies;
2011-03-18 13:15:40 +00:00
2011-04-29 09:36:10 +00:00
if (st.q2_ready) {
run_mref_queue(&brick->q_phase2, phase2_startio, brick->q_phase2.q_batchlen);
2010-12-15 11:58:22 +00:00
}
2011-04-29 09:36:10 +00:00
j4 = jiffies;
2010-12-15 11:58:22 +00:00
2011-04-29 09:36:10 +00:00
/* A kind of delayed plugging mechanism
*/
2011-05-13 11:19:28 +00:00
old_wait_timeout = wait_timeout;
2011-04-29 09:36:10 +00:00
wait_timeout = HZ / 10; // 100ms before flushing
#ifdef CONFIG_DEBUG_KERNEL // debug override for catching long blocks
2011-06-30 13:15:52 +00:00
//wait_timeout = 16 * HZ;
2011-04-29 09:36:10 +00:00
#endif
2011-05-19 11:36:00 +00:00
/* Calling log_flush() too often may result in
* increased overhead (and thus in lower throughput).
* OTOH, calling it too seldom may hold back
* IO completion for the end user for some time.
* Play around with wait_timeout to optimize this.
*/
do_flush = false;
2011-04-29 09:36:10 +00:00
if (brick->did_work) {
2011-05-13 11:19:28 +00:00
atomic_inc(&brick->total_restart_count);
2011-05-19 11:36:00 +00:00
do_flush = !brick->flush_delay;
if (!do_flush) { // start over soon
wait_timeout = brick->flush_delay;
}
} else if (atomic_read(&brick->q_phase1.q_queued) <= 0 &&
2011-05-13 11:19:28 +00:00
(brick->minimize_latency || (long long)jiffies - old_jiffies >= old_wait_timeout)) {
2011-05-19 11:36:00 +00:00
do_flush = true;
}
2011-05-26 14:32:32 +00:00
#if 1
do_flush = true;
#endif
if (do_flush) {
_flush_inputs(brick);
2011-03-11 13:57:54 +00:00
}
2011-04-29 09:36:10 +00:00
#if 1
{
int delta = (long long)jiffies - j0;
int delta1 = (long long)j1 - j0;
int delta2 = (long long)j2 - j0;
int delta3 = (long long)j3 - j0;
int delta4 = (long long)j4 - j0;
if (delta > max_delta) {
max_delta = delta;
MARS_INF("delta = %d %d %d %d %d\n", delta, delta1, delta2, delta3, delta4);
}
2010-08-23 05:06:06 +00:00
}
2011-04-29 09:36:10 +00:00
2011-11-14 14:21:15 +00:00
if (st.some_ready && !brick->did_work) {
2011-04-29 09:36:10 +00:00
char *txt;
txt = brick->ops->brick_statistics(brick, 0);
2011-07-20 13:11:44 +00:00
MARS_WRN("inconsistent work, pushback = %d q1 = %d q2 = %d q3 = %d q4 = %d extra = %d ====> %s\n", brick->did_pushback, st.q1_ready, st.q2_ready, st.q3_ready, st.q4_ready, st.extra_ready, txt ? txt : "(ERROR)");
2011-04-29 09:36:10 +00:00
if (txt) {
2011-08-12 11:09:48 +00:00
brick_string_free(txt);
2011-04-29 09:36:10 +00:00
}
}
2011-06-30 13:15:52 +00:00
#endif
2011-11-14 14:21:15 +00:00
#ifdef DELAY_CALLERS // provisionary flood handling FIXME: do better
#define LIMIT_FN(factor,divider) \
(atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit * (factor) / (divider) && brick->shadow_mem_limit > 16) || \
(atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit * (factor) / (divider) && brick_global_memlimit > PAGE_SIZE * 16)
delay_callers = LIMIT_FN(1, 1);
2011-06-30 13:15:52 +00:00
if (delay_callers != brick->delay_callers) {
MARS_DBG("mshadow_count = %d/%d global_mem = %lld/%lld stalling %d -> %d\n", atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, brick->delay_callers, delay_callers);
2011-06-30 13:15:52 +00:00
brick->delay_callers = delay_callers;
wake_up_interruptible_all(&brick->worker_event);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->caller_event);
if (delay_callers)
atomic_inc(&brick->total_delay_count);
2011-06-30 13:15:52 +00:00
}
if (unlimited) {
unlimited = LIMIT_FN(3, 8);
2011-06-30 13:15:52 +00:00
} else {
unlimited = LIMIT_FN(1, 2);
}
if (unlimited != old_unlimited) {
brick->q_phase2.q_unlimited = unlimited;
brick->q_phase3.q_unlimited = unlimited;
brick->q_phase4.q_unlimited = unlimited;
MARS_DBG("mshadow_count = %d/%d global_mem = %lld/%lld unlimited %d -> %d\n", atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, old_unlimited, unlimited);
old_unlimited = unlimited;
wake_up_interruptible_all(&brick->worker_event);
wake_up_interruptible_all(&brick->caller_event);
2011-06-30 13:15:52 +00:00
}
2011-04-29 09:36:10 +00:00
#endif
_exit_inputs(brick, false);
2010-08-20 10:58:24 +00:00
}
_exit_inputs(brick, true);
2011-02-23 20:48:06 +00:00
}
2011-04-08 09:52:46 +00:00
////////////////////////////// log replay //////////////////////////////
2011-03-27 15:18:38 +00:00
2011-04-08 09:52:46 +00:00
static noinline
2011-03-27 15:18:38 +00:00
void replay_endio(struct generic_callback *cb)
{
struct trans_logger_mref_aspect *mref_a = cb->cb_private;
2011-04-08 09:52:46 +00:00
struct trans_logger_brick *brick;
unsigned long flags;
2011-03-27 15:18:38 +00:00
CHECK_PTR(mref_a, err);
brick = mref_a->my_brick;
2011-04-08 09:52:46 +00:00
CHECK_PTR(brick, err);
2011-03-27 15:18:38 +00:00
2011-04-08 09:52:46 +00:00
traced_lock(&brick->replay_lock, flags);
list_del_init(&mref_a->replay_head);
traced_unlock(&brick->replay_lock, flags);
2011-04-29 09:36:10 +00:00
atomic_dec(&brick->replay_count);
2011-06-30 13:15:52 +00:00
wake_up_interruptible_all(&brick->worker_event);
2011-03-27 15:18:38 +00:00
return;
err:
MARS_FAT("cannot handle replay IO\n");
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-29 09:36:10 +00:00
bool _has_conflict(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *mref_a)
2011-03-27 15:18:38 +00:00
{
2011-04-08 09:52:46 +00:00
struct mref_object *mref = mref_a->object;
struct list_head *tmp;
bool res = false;
unsigned long flags;
// NOTE: replacing this by rwlock_t will not gain anything, because there exists at most 1 reader at any time
traced_lock(&brick->replay_lock, flags);
for (tmp = brick->replay_list.next; tmp != &brick->replay_list; tmp = tmp->next) {
struct trans_logger_mref_aspect *tmp_a;
struct mref_object *tmp_mref;
tmp_a = container_of(tmp, struct trans_logger_mref_aspect, replay_head);
tmp_mref = tmp_a->object;
if (tmp_mref->ref_pos + tmp_mref->ref_len > mref->ref_len && tmp_mref->ref_pos < mref->ref_pos + mref->ref_len) {
res = true;
break;
}
}
traced_unlock(&brick->replay_lock, flags);
return res;
}
static noinline
2011-04-29 09:36:10 +00:00
void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspect *mref_a)
2011-04-08 09:52:46 +00:00
{
int max = 1024 * 2; // limit parallelism somewhat
unsigned long flags;
2011-06-30 13:15:52 +00:00
wait_event_interruptible_timeout(brick->worker_event,
2011-04-29 09:36:10 +00:00
atomic_read(&brick->replay_count) <= max
&& !_has_conflict(brick, mref_a),
2011-04-08 09:52:46 +00:00
60 * HZ);
2011-04-29 09:36:10 +00:00
atomic_inc(&brick->replay_count);
2011-06-10 13:57:52 +00:00
atomic_inc(&brick->total_replay_count);
2011-04-08 09:52:46 +00:00
traced_lock(&brick->replay_lock, flags);
list_add(&mref_a->replay_head, &brick->replay_list);
traced_unlock(&brick->replay_lock, flags);
}
static noinline
2011-04-29 09:36:10 +00:00
int apply_data(struct trans_logger_brick *brick, loff_t pos, void *buf, int len)
2011-04-08 09:52:46 +00:00
{
2011-04-29 09:36:10 +00:00
struct trans_logger_input *input = brick->inputs[TL_INPUT_WRITEBACK];
2011-03-27 15:18:38 +00:00
int status;
2011-04-08 09:52:46 +00:00
MARS_IO("got data, pos = %lld, len = %d\n", pos, len);
2011-03-27 15:18:38 +00:00
2011-04-29 09:36:10 +00:00
if (!input->connect) {
input = brick->inputs[TL_INPUT_READ];
}
2011-03-27 15:18:38 +00:00
/* TODO for better efficiency:
* Instead of starting IO here, just put the data into the hashes
* and queues such that ordinary IO will be corrected.
* Writeback will be lazy then.
* The switch infrastructure must be changed before this
2011-04-08 09:52:46 +00:00
* becomes possible.
2011-03-27 15:18:38 +00:00
*/
2011-04-08 09:52:46 +00:00
#ifdef APPLY_DATA
2011-03-27 15:18:38 +00:00
while (len > 0) {
struct mref_object *mref;
struct trans_logger_mref_aspect *mref_a;
status = -ENOMEM;
mref = trans_logger_alloc_mref(brick, &input->sub_layout);
2011-03-27 15:18:38 +00:00
if (unlikely(!mref)) {
MARS_ERR("no memory\n");
goto done;
}
mref_a = trans_logger_mref_get_aspect(brick, mref);
2011-03-27 15:18:38 +00:00
CHECK_PTR(mref_a, done);
2011-04-08 09:52:46 +00:00
mref->ref_pos = pos;
mref->ref_data = NULL;
2011-03-27 15:18:38 +00:00
mref->ref_len = len;
mref->ref_may_write = WRITE;
mref->ref_rw = WRITE;
status = GENERIC_INPUT_CALL(input, mref_get, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot get mref, status = %d\n", status);
goto done;
}
2011-04-08 09:52:46 +00:00
if (unlikely(!mref->ref_data)) {
status = -ENOMEM;
MARS_ERR("cannot get mref, status = %d\n", status);
goto done;
}
if (unlikely(mref->ref_len <= 0 || mref->ref_len > len)) {
status = -EINVAL;
MARS_ERR("bad ref len = %d (requested = %d)\n", mref->ref_len, len);
goto done;
}
2011-03-27 15:18:38 +00:00
mars_trace(mref, "replay_start");
2011-04-08 09:52:46 +00:00
2011-04-29 09:36:10 +00:00
wait_replay(brick, mref_a);
2011-04-08 09:52:46 +00:00
mars_trace(mref, "replay_io");
memcpy(mref->ref_data, buf, mref->ref_len);
SETUP_CALLBACK(mref, replay_endio, mref_a);
mref_a->my_brick = brick;
2011-03-27 15:18:38 +00:00
GENERIC_INPUT_CALL(input, mref_io, mref);
2011-04-08 09:52:46 +00:00
if (unlikely(mref->ref_len <= 0)) {
status = -EINVAL;
MARS_ERR("bad ref len = %d (requested = %d)\n", mref->ref_len, len);
goto done;
}
pos += mref->ref_len;
2011-03-27 15:18:38 +00:00
buf += mref->ref_len;
len -= mref->ref_len;
GENERIC_INPUT_CALL(input, mref_put, mref);
}
#endif
status = 0;
done:
return status;
}
2011-04-08 09:52:46 +00:00
static noinline
void trans_logger_replay(struct trans_logger_brick *brick)
2011-02-23 20:48:06 +00:00
{
struct trans_logger_input *input = brick->inputs[brick->log_input_nr];
2011-05-13 11:19:28 +00:00
loff_t start_pos;
loff_t finished_pos;
2011-06-30 13:15:52 +00:00
long long old_jiffies = jiffies;
2011-06-10 13:57:52 +00:00
int status = 0;
2011-04-08 09:52:46 +00:00
2011-06-17 11:32:38 +00:00
brick->replay_code = 0; // indicates "running"
2011-02-23 20:48:06 +00:00
2011-07-28 11:41:06 +00:00
start_pos = brick->replay_start_pos;
init_logst(&input->logst, (void*)input, start_pos);
2011-04-29 09:36:10 +00:00
input->logst.align_size = brick->align_size;
input->logst.chunk_size = brick->chunk_size;
2011-02-23 20:48:06 +00:00
2011-07-28 11:41:06 +00:00
MARS_INF("starting replay from %lld to %lld\n", start_pos, brick->replay_end_pos);
2011-05-13 11:19:28 +00:00
input->replay_min_pos = start_pos;
input->replay_max_pos = start_pos; // FIXME: this is wrong.
2011-04-08 09:52:46 +00:00
mars_power_led_on((void*)brick, true);
for (;;) {
2011-07-15 10:12:06 +00:00
loff_t new_finished_pos;
2011-03-18 13:15:40 +00:00
struct log_header lh = {};
2011-03-27 15:18:38 +00:00
void *buf = NULL;
int len = 0;
2011-07-15 10:12:06 +00:00
finished_pos = input->logst.log_pos + input->logst.offset;
if (kthread_should_stop() ||
(!brick->do_continuous_replay && finished_pos >= brick->replay_end_pos)) {
status = 0; // treat as EOF
2011-03-18 13:15:40 +00:00
break;
}
2011-04-29 09:36:10 +00:00
status = log_read(&input->logst, &lh, &buf, &len);
2011-06-10 13:57:52 +00:00
if (status == -EAGAIN) {
MARS_DBG("got -EAGAIN\n");
msleep(100);
continue;
}
2011-04-08 09:52:46 +00:00
if (unlikely(status < 0)) {
brick->replay_code = status;
2011-03-27 15:18:38 +00:00
MARS_ERR("cannot read logfile data, status = %d\n", status);
break;
}
2011-07-28 11:41:06 +00:00
2011-07-15 10:12:06 +00:00
new_finished_pos = input->logst.log_pos + input->logst.offset;
2011-07-28 11:41:06 +00:00
MARS_RPL("read %lld %lld\n", finished_pos, new_finished_pos);
2011-07-15 10:12:06 +00:00
if ((!status && len <= 0) ||
new_finished_pos > brick->replay_end_pos) { // EOF -> wait until kthread_should_stop()
MARS_DBG("EOF at %lld (old = %lld, end_pos = %lld)\n", new_finished_pos, finished_pos, brick->replay_end_pos);
2011-04-08 09:52:46 +00:00
if (!brick->do_continuous_replay) {
2011-07-15 10:12:06 +00:00
// notice: finished_pos remains at old value here!
2011-06-10 13:57:52 +00:00
brick->replay_end_pos = finished_pos;
2011-07-15 10:12:06 +00:00
break;
2011-06-10 13:57:52 +00:00
}
2011-04-08 09:52:46 +00:00
msleep(1000);
2011-07-15 10:12:06 +00:00
continue;
2011-03-29 14:40:40 +00:00
}
2011-03-27 15:18:38 +00:00
2011-04-29 09:36:10 +00:00
if (lh.l_code != CODE_WRITE_NEW) {
MARS_IO("ignoring pos = %lld len = %d code = %d\n", lh.l_pos, lh.l_len, lh.l_code);
2011-07-15 10:12:06 +00:00
} else if (likely(buf && len)) {
2011-04-29 09:36:10 +00:00
status = apply_data(brick, lh.l_pos, buf, len);
2011-07-28 11:41:06 +00:00
MARS_RPL("apply %lld %lld (pos=%lld status=%d)\n", finished_pos, new_finished_pos, lh.l_pos, status);
2011-04-08 09:52:46 +00:00
if (unlikely(status < 0)) {
brick->replay_code = status;
2011-06-10 13:57:52 +00:00
MARS_ERR("cannot apply data at pos = %lld len = %d, status = %d\n", lh.l_pos, len, status);
2011-04-08 09:52:46 +00:00
break;
2011-07-15 10:12:06 +00:00
} else {
finished_pos = new_finished_pos;
2011-04-08 09:52:46 +00:00
}
}
// do this _after_ any opportunities for errors...
2011-06-30 13:15:52 +00:00
if (atomic_read(&brick->replay_count) <= 0 || ((long long)jiffies) - old_jiffies >= HZ * 5) {
2011-05-13 11:19:28 +00:00
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
2011-06-30 13:15:52 +00:00
old_jiffies = jiffies;
2011-03-18 13:15:40 +00:00
}
_exit_inputs(brick, false);
2011-03-18 13:15:40 +00:00
}
2011-03-27 15:18:38 +00:00
2011-06-30 13:15:52 +00:00
MARS_INF("waiting for finish...\n");
wait_event_interruptible_timeout(brick->worker_event, atomic_read(&brick->replay_count) <= 0, 60 * HZ);
2011-03-27 15:18:38 +00:00
2011-07-15 10:12:06 +00:00
if (unlikely(finished_pos > brick->replay_end_pos)) {
MARS_ERR("finished_pos too large: %lld + %d = %lld > %lld\n", input->logst.log_pos, input->logst.offset, finished_pos, brick->replay_end_pos);
finished_pos = brick->replay_end_pos;
}
2011-06-10 13:57:52 +00:00
if (status >= 0) {
input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME
}
2011-03-27 15:18:38 +00:00
2011-06-10 13:57:52 +00:00
if (status >= 0 && finished_pos == brick->replay_end_pos) {
2011-05-13 11:19:28 +00:00
MARS_INF("replay finished at %lld\n", finished_pos);
2011-07-20 13:11:44 +00:00
brick->replay_code = 1;
2011-03-27 15:18:38 +00:00
} else {
2011-05-13 11:19:28 +00:00
MARS_INF("replay stopped prematurely at %lld (of %lld)\n", finished_pos, brick->replay_end_pos);
2011-07-20 13:11:44 +00:00
brick->replay_code = 2;
2011-04-08 09:52:46 +00:00
}
_exit_inputs(brick, true);
2011-07-20 13:11:44 +00:00
mars_trigger();
2011-04-08 09:52:46 +00:00
while (!kthread_should_stop()) {
msleep(500);
2011-02-23 20:48:06 +00:00
}
}
2011-03-27 15:18:38 +00:00
///////////////////////// logger thread / switching /////////////////////////
2011-04-08 09:52:46 +00:00
static noinline
2011-02-23 20:48:06 +00:00
int trans_logger_thread(void *data)
{
struct trans_logger_output *output = data;
struct trans_logger_brick *brick = output->brick;
MARS_INF("........... logger has started.\n");
if (brick->do_replay) {
trans_logger_replay(brick);
2011-02-23 20:48:06 +00:00
} else {
trans_logger_log(brick);
2011-02-23 20:48:06 +00:00
}
MARS_INF("........... logger has stopped.\n");
2011-03-29 14:40:40 +00:00
mars_power_led_on((void*)brick, false);
2011-02-23 20:48:06 +00:00
mars_power_led_off((void*)brick, true);
return 0;
}
2011-04-08 09:52:46 +00:00
static noinline
2011-02-23 20:48:06 +00:00
int trans_logger_switch(struct trans_logger_brick *brick)
{
static int index = 0;
struct trans_logger_output *output = brick->outputs[0];
if (brick->power.button) {
2011-04-29 09:36:10 +00:00
if (!brick->thread && brick->power.led_off) {
2011-03-18 13:15:40 +00:00
mars_power_led_off((void*)brick, false);
2011-04-29 09:36:10 +00:00
brick->thread = kthread_create(trans_logger_thread, output, "mars_logger%d", index++);
if (IS_ERR(brick->thread)) {
int error = PTR_ERR(brick->thread);
2011-02-23 20:48:06 +00:00
MARS_ERR("cannot create thread, status=%d\n", error);
2011-04-29 09:36:10 +00:00
brick->thread = NULL;
2011-02-23 20:48:06 +00:00
return error;
}
2011-04-29 09:36:10 +00:00
get_task_struct(brick->thread);
wake_up_process(brick->thread);
2011-02-23 20:48:06 +00:00
}
} else {
mars_power_led_on((void*)brick, false);
2011-04-29 09:36:10 +00:00
if (brick->thread) {
2011-08-25 10:16:32 +00:00
MARS_INF("stopping thread...\n");
2011-04-29 09:36:10 +00:00
kthread_stop(brick->thread);
put_task_struct(brick->thread);
brick->thread = NULL;
2011-02-23 20:48:06 +00:00
}
}
2010-08-20 10:58:24 +00:00
return 0;
}
2011-04-12 15:31:08 +00:00
2011-04-01 11:18:32 +00:00
//////////////// informational / statistics ///////////////
2011-04-08 09:52:46 +00:00
static noinline
2011-04-01 11:18:32 +00:00
char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
{
char *res = brick_string_alloc(1024);
2011-04-01 11:18:32 +00:00
if (!res)
return NULL;
snprintf(res, 1023, "mode replay=%d continuous=%d replay_code=%d log_reads=%d | replay_start_pos = %lld replay_end_pos = %lld | new_input_nr = %d log_input_nr = %d (old = %d) replay_min_pos1 = %lld replay_max_pos1 = %lld replay_min_pos2 = %lld replay_max_pos2 = %lld | total replay=%d callbacks=%d reads=%d writes=%d flushes=%d (%d%%) wb_clusters=%d writebacks=%d (%d%%) shortcut=%d (%d%%) mshadow=%d sshadow=%d rounds=%d restarts=%d delays=%d phase1=%d phase2=%d phase3=%d phase4=%d | current shadow_mem_used=%ld/%lld replay=%d mshadow=%d/%d sshadow=%d hash_count=%d pos_count=%d balance=%d/%d/%d/%d fly=%d phase1=%d+%d phase2=%d+%d phase3=%d+%d phase4=%d+%d\n",
brick->do_replay, brick->do_continuous_replay, brick->replay_code, brick->log_reads,
2011-11-15 17:32:20 +00:00
brick->replay_start_pos, brick->replay_end_pos,
brick->new_input_nr, brick->log_input_nr, brick->old_input_nr,
brick->inputs[TL_INPUT_LOG1]->replay_min_pos, brick->inputs[TL_INPUT_LOG1]->replay_max_pos,
brick->inputs[TL_INPUT_LOG2]->replay_min_pos, brick->inputs[TL_INPUT_LOG2]->replay_max_pos,
atomic_read(&brick->total_replay_count), atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->total_delay_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
2011-04-01 11:18:32 +00:00
return res;
}
2011-04-08 09:52:46 +00:00
static noinline
2011-04-01 11:18:32 +00:00
void trans_logger_reset_statistics(struct trans_logger_brick *brick)
{
2011-06-10 13:57:52 +00:00
atomic_set(&brick->total_replay_count, 0);
2011-04-29 09:36:10 +00:00
atomic_set(&brick->total_cb_count, 0);
atomic_set(&brick->total_read_count, 0);
atomic_set(&brick->total_write_count, 0);
2011-05-13 11:19:28 +00:00
atomic_set(&brick->total_flush_count, 0);
2011-04-29 09:36:10 +00:00
atomic_set(&brick->total_writeback_count, 0);
2011-05-13 11:19:28 +00:00
atomic_set(&brick->total_writeback_cluster_count, 0);
2011-04-29 09:36:10 +00:00
atomic_set(&brick->total_shortcut_count, 0);
atomic_set(&brick->total_mshadow_count, 0);
atomic_set(&brick->total_sshadow_count, 0);
2011-05-13 11:19:28 +00:00
atomic_set(&brick->total_round_count, 0);
atomic_set(&brick->total_restart_count, 0);
atomic_set(&brick->total_delay_count, 0);
2011-04-01 11:18:32 +00:00
}
2010-08-20 10:58:24 +00:00
2010-08-08 20:51:20 +00:00
//////////////// object / aspect constructors / destructors ///////////////
2011-04-08 09:52:46 +00:00
static noinline
int trans_logger_mref_aspect_init_fn(struct generic_aspect *_ini)
2010-08-08 20:51:20 +00:00
{
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *ini = (void*)_ini;
2011-04-18 14:14:16 +00:00
ini->lh.lh_pos = &ini->object->ref_pos;
INIT_LIST_HEAD(&ini->lh.lh_head);
2010-08-08 20:51:20 +00:00
INIT_LIST_HEAD(&ini->hash_head);
2011-03-20 17:38:08 +00:00
INIT_LIST_HEAD(&ini->pos_head);
2011-04-08 09:52:46 +00:00
INIT_LIST_HEAD(&ini->replay_head);
INIT_LIST_HEAD(&ini->collect_head);
INIT_LIST_HEAD(&ini->sub_list);
INIT_LIST_HEAD(&ini->sub_head);
2010-08-08 20:51:20 +00:00
return 0;
}
2011-04-08 09:52:46 +00:00
static noinline
void trans_logger_mref_aspect_exit_fn(struct generic_aspect *_ini)
2010-08-08 20:51:20 +00:00
{
2010-12-15 12:13:18 +00:00
struct trans_logger_mref_aspect *ini = (void*)_ini;
2011-04-18 14:14:16 +00:00
CHECK_HEAD_EMPTY(&ini->lh.lh_head);
2010-08-08 20:51:20 +00:00
CHECK_HEAD_EMPTY(&ini->hash_head);
2011-04-08 09:52:46 +00:00
CHECK_HEAD_EMPTY(&ini->pos_head);
CHECK_HEAD_EMPTY(&ini->replay_head);
CHECK_HEAD_EMPTY(&ini->collect_head);
CHECK_HEAD_EMPTY(&ini->sub_list);
CHECK_HEAD_EMPTY(&ini->sub_head);
2010-08-08 20:51:20 +00:00
}
/*
 * Project-defined boilerplate generator, instantiated for this brick.
 * NOTE(review): presumably this provides trans_logger_aspect_types and
 * trans_logger_{register,unregister}_brick_type(), which are used further
 * down but not defined in this file -- confirm against the macro definition.
 */
MARS_MAKE_STATICS(trans_logger);
////////////////////// brick constructors / destructors ////////////////////
2011-04-08 09:52:46 +00:00
static noinline
int trans_logger_brick_construct(struct trans_logger_brick *brick)
2010-08-08 20:51:20 +00:00
{
2011-04-29 09:36:10 +00:00
int i;
for (i = 0; i < TRANS_HASH_MAX; i++) {
struct hash_anchor *start = &brick->hash_table[i];
2011-06-30 13:15:52 +00:00
//rwlock_init(&start->hash_lock);
init_rwsem(&start->hash_mutex);
2011-04-29 09:36:10 +00:00
INIT_LIST_HEAD(&start->hash_anchor);
}
atomic_set(&brick->hash_count, 0);
2011-04-08 09:52:46 +00:00
spin_lock_init(&brick->replay_lock);
INIT_LIST_HEAD(&brick->replay_list);
2011-06-30 13:15:52 +00:00
init_waitqueue_head(&brick->worker_event);
init_waitqueue_head(&brick->caller_event);
2011-04-29 09:36:10 +00:00
qq_init(&brick->q_phase1, brick);
qq_init(&brick->q_phase2, brick);
qq_init(&brick->q_phase3, brick);
qq_init(&brick->q_phase4, brick);
#if 1
brick->q_phase2.q_dep = &brick->q_phase4;
brick->q_phase4.q_dep = &brick->q_phase1;
#endif
brick->q_phase1.q_insert_info = "q1_ins";
brick->q_phase1.q_pushback_info = "q1_push";
brick->q_phase1.q_fetch_info = "q1_fetch";
brick->q_phase2.q_insert_info = "q2_ins";
brick->q_phase2.q_pushback_info = "q2_push";
brick->q_phase2.q_fetch_info = "q2_fetch";
brick->q_phase3.q_insert_info = "q3_ins";
brick->q_phase3.q_pushback_info = "q3_push";
brick->q_phase3.q_fetch_info = "q3_fetch";
brick->q_phase4.q_insert_info = "q4_ins";
brick->q_phase4.q_pushback_info = "q4_push";
brick->q_phase4.q_fetch_info = "q4_fetch";
brick->new_input_nr = TL_INPUT_LOG1;
brick->log_input_nr = TL_INPUT_LOG1;
brick->old_input_nr = TL_INPUT_LOG1;
2010-08-08 20:51:20 +00:00
return 0;
}
2011-04-08 09:52:46 +00:00
/*
 * Constructor for a trans_logger output.
 *
 * Performs no initialization of @output; it exists so that the output
 * type can supply an .output_construct hook.
 *
 * Returns 0 unconditionally.
 */
static noinline
int trans_logger_output_construct(struct trans_logger_output *output)
{
	(void)output;	/* intentionally unused */
	return 0;
}
2011-04-08 09:52:46 +00:00
static noinline
int trans_logger_input_construct(struct trans_logger_input *input)
2010-08-11 16:02:08 +00:00
{
2011-11-16 11:02:11 +00:00
spin_lock_init(&input->pos_lock);
INIT_LIST_HEAD(&input->pos_list);
return 0;
}
/*
 * Destructor for a trans_logger input.
 *
 * Runs CHECK_HEAD_EMPTY() on the position list, then releases the
 * inf_host string and clears the pointer.
 *
 * @input: input instance being torn down.
 *
 * Returns 0 unconditionally.
 */
static noinline
int trans_logger_input_destruct(struct trans_logger_input *input)
{
	/* macro argument kept verbatim; it is defined outside this file */
	CHECK_HEAD_EMPTY(&input->pos_list);

	brick_string_free(input->inf_host);
	input->inf_host = NULL;	/* do not leave a stale pointer behind */

	return 0;
}
///////////////////////// static structs ////////////////////////
static struct trans_logger_brick_ops trans_logger_brick_ops = {
2011-02-23 20:48:06 +00:00
.brick_switch = trans_logger_switch,
2011-04-01 11:18:32 +00:00
.brick_statistics = trans_logger_statistics,
.reset_statistics = trans_logger_reset_statistics,
2010-08-08 20:51:20 +00:00
};
static struct trans_logger_output_ops trans_logger_output_ops = {
.mars_get_info = trans_logger_get_info,
2010-12-15 12:13:18 +00:00
.mref_get = trans_logger_ref_get,
.mref_put = trans_logger_ref_put,
.mref_io = trans_logger_ref_io,
2010-08-08 20:51:20 +00:00
};
2010-08-10 17:39:30 +00:00
const struct trans_logger_input_type trans_logger_input_type = {
2010-08-08 20:51:20 +00:00
.type_name = "trans_logger_input",
.input_size = sizeof(struct trans_logger_input),
2010-08-11 16:02:08 +00:00
.input_construct = &trans_logger_input_construct,
.input_destruct = &trans_logger_input_destruct,
2010-08-08 20:51:20 +00:00
};
/*
 * Default type descriptor per input slot; all six slots use the same
 * input type.
 * NOTE(review): the brick type advertises TL_INPUT_NR inputs -- confirm
 * that TL_INPUT_NR does not exceed the six entries listed here.
 */
static const struct trans_logger_input_type *trans_logger_input_types[] = {
	[0] = &trans_logger_input_type,
	[1] = &trans_logger_input_type,
	[2] = &trans_logger_input_type,
	[3] = &trans_logger_input_type,
	[4] = &trans_logger_input_type,
	[5] = &trans_logger_input_type,
};
2010-08-10 17:39:30 +00:00
const struct trans_logger_output_type trans_logger_output_type = {
2010-08-08 20:51:20 +00:00
.type_name = "trans_logger_output",
.output_size = sizeof(struct trans_logger_output),
.master_ops = &trans_logger_output_ops,
.output_construct = &trans_logger_output_construct,
};
/* Default type descriptor for the single output slot. */
static const struct trans_logger_output_type *trans_logger_output_types[] = {
	[0] = &trans_logger_output_type,
};
const struct trans_logger_brick_type trans_logger_brick_type = {
.type_name = "trans_logger_brick",
.brick_size = sizeof(struct trans_logger_brick),
2011-05-13 11:19:28 +00:00
.max_inputs = TL_INPUT_NR,
2010-08-08 20:51:20 +00:00
.max_outputs = 1,
.master_ops = &trans_logger_brick_ops,
.aspect_types = trans_logger_aspect_types,
2010-08-08 20:51:20 +00:00
.default_input_types = trans_logger_input_types,
.default_output_types = trans_logger_output_types,
.brick_construct = &trans_logger_brick_construct,
};
EXPORT_SYMBOL_GPL(trans_logger_brick_type);
////////////////// module init stuff /////////////////////////
2011-08-25 10:16:32 +00:00
/*
 * Module/subsystem init hook for the trans_logger brick.
 *
 * Logs an informational message and registers the brick type.
 *
 * Returns whatever trans_logger_register_brick_type() returns.
 */
int __init init_mars_trans_logger(void)
{
	int status;

	MARS_INF("init_trans_logger()\n");
	status = trans_logger_register_brick_type();

	return status;
}
2011-08-25 10:16:32 +00:00
/*
 * Module/subsystem exit hook for the trans_logger brick.
 *
 * Logs an informational message and unregisters the brick type.
 */
void __exit exit_mars_trans_logger(void)
{
	MARS_INF("exit_trans_logger()\n");

	trans_logger_unregister_brick_type();
}
2011-08-25 10:16:32 +00:00
/*
 * Standalone-module boilerplate. Compiled only when
 * CONFIG_MARS_HAVE_BIGMODULE is not defined; otherwise this file declares
 * no module metadata or entry points of its own.
 */
#ifndef CONFIG_MARS_HAVE_BIGMODULE

module_init(init_mars_trans_logger);
module_exit(exit_mars_trans_logger);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_DESCRIPTION("MARS trans_logger brick");

#endif /* !CONFIG_MARS_HAVE_BIGMODULE */