mirror of https://github.com/schoebel/mars
import mars-99.tgz
commit a6d10aaa72
parent 356b102142
@@ -36,7 +36,7 @@ struct pairing_heap_##KEYTYPE *_ph_merge_##KEYTYPE(struct pairing_heap_##KEYTYPE
 		return heap2; \
 	if (!heap2) \
 		return heap1; \
-	if (CMP(heap1, heap2)) { \
+	if (CMP(heap1, heap2) < 0) { \
 		heap2->next = heap1->subheaps; \
 		heap1->subheaps = heap2; \
 		return heap1; \
@@ -83,7 +83,7 @@ void ph_delete_min_##KEYTYPE(struct pairing_heap_##KEYTYPE **heap) \
 }
 
 /* some default CMP() function */
-#define PAIRING_HEAP_COMPARE(a,b) ((a)->key < (b)->key)
+#define PAIRING_HEAP_COMPARE(a,b) ((a)->key < (b)->key ? -1 : ((a)->key > (b)->key ? 1 : 0))
 
 /* less generic version: use the default CMP() function */
 #define PAIRING_HEAP_FUNCTIONS(_STATIC,KEYTYPE) \
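
Aside on the two hunks above: the comparator contract changes from a boolean "a < b" to a memcmp()-style three-way result, which is what the new `CMP(heap1, heap2) < 0` test expects. A minimal userspace sketch of the difference (plain C, not part of the commit):

	#include <stdio.h>

	struct item { long long key; };

	/* old convention: nonzero iff a < b */
	#define OLD_COMPARE(a,b) ((a)->key < (b)->key)

	/* new convention: -1 / 0 / +1, as in the revised PAIRING_HEAP_COMPARE */
	#define NEW_COMPARE(a,b) ((a)->key < (b)->key ? -1 : ((a)->key > (b)->key ? 1 : 0))

	int main(void)
	{
		struct item a = { .key = 7 }, b = { .key = 7 };

		/* the boolean form cannot tell "equal" from "greater": both give 0 */
		printf("old: %d\n", OLD_COMPARE(&a, &b)); /* 0 */
		printf("new: %d\n", NEW_COMPARE(&a, &b)); /* 0; -1 or 1 once keys differ */
		return 0;
	}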
@@ -0,0 +1,127 @@
+// (c) 2011 Thomas Schoebel-Theuer / 1&1 Internet AG
+
+#ifndef LIB_QUEUE_H
+#define LIB_QUEUE_H
+
+#define QUEUE_ANCHOR(PREFIX,KEYTYPE,HEAPTYPE) \
+	struct PREFIX##_queue *q_dep; \
+	atomic_t *q_dep_plus; \
+	struct list_head q_anchor; \
+	struct pairing_heap_##HEAPTYPE *heap_high; \
+	struct pairing_heap_##HEAPTYPE *heap_low; \
+	long long q_last_insert; /* jiffies */ \
+	KEYTYPE heap_margin; \
+	KEYTYPE last_pos; \
+	spinlock_t q_lock; \
+	/* readonly from outside */ \
+	atomic_t q_queued; \
+	atomic_t q_flying; \
+	atomic_t q_total; \
+	/* tunables */ \
+	int q_batchlen; \
+	int q_max_queued; \
+	int q_max_flying; \
+	int q_max_jiffies; \
+	int q_max_contention; \
+	int q_over_pressure; \
+	int q_io_prio; \
+	bool q_ordering; \
+
+
+#define QUEUE_FUNCTIONS(PREFIX,ELEM_TYPE,HEAD,KEYFN,KEYCMP,HEAPTYPE) \
+ \
+static inline \
+void q_##PREFIX##_init(struct PREFIX##_queue *q) \
+{ \
+	INIT_LIST_HEAD(&q->q_anchor); \
+	q->heap_low = NULL; \
+	q->heap_high = NULL; \
+	spin_lock_init(&q->q_lock); \
+	atomic_set(&q->q_queued, 0); \
+	atomic_set(&q->q_flying, 0); \
+} \
+ \
+static inline \
+void q_##PREFIX##_insert(struct PREFIX##_queue *q, ELEM_TYPE *elem) \
+{ \
+	unsigned long flags; \
+ \
+	traced_lock(&q->q_lock, flags); \
+ \
+	if (q->q_ordering) { \
+		struct pairing_heap_##HEAPTYPE **use = &q->heap_high; \
+		if (KEYCMP(KEYFN(elem), &q->heap_margin) <= 0) { \
+			use = &q->heap_low; \
+		} \
+		ph_insert_##HEAPTYPE(use, &elem->ph); \
+	} else { \
+		list_add_tail(&elem->HEAD, &q->q_anchor); \
+	} \
+	atomic_inc(&q->q_queued); \
+	atomic_inc(&q->q_total); \
+	q->q_last_insert = jiffies; \
+ \
+	traced_unlock(&q->q_lock, flags); \
+} \
+ \
+static inline \
+void q_##PREFIX##_pushback(struct PREFIX##_queue *q, ELEM_TYPE *elem) \
+{ \
+	unsigned long flags; \
+ \
+	if (q->q_ordering) { \
+		atomic_dec(&q->q_total); \
+		q_##PREFIX##_insert(q, elem); \
+		return; \
+	} \
+ \
+	traced_lock(&q->q_lock, flags); \
+ \
+	list_add(&elem->HEAD, &q->q_anchor); \
+	atomic_inc(&q->q_queued); \
+ \
+	traced_unlock(&q->q_lock, flags); \
+} \
+ \
+static inline \
+ELEM_TYPE *q_##PREFIX##_fetch(struct PREFIX##_queue *q) \
+{ \
+	ELEM_TYPE *elem = NULL; \
+	unsigned long flags; \
+ \
+	traced_lock(&q->q_lock, flags); \
+ \
+	if (q->q_ordering) { \
+		if (!q->heap_high) { \
+			q->heap_high = q->heap_low; \
+			q->heap_low = NULL; \
+			q->heap_margin = 0; \
+			q->last_pos = 0; \
+		} \
+		if (q->heap_high) { \
+			elem = container_of(q->heap_high, ELEM_TYPE, ph); \
+ \
+			if (unlikely(KEYCMP(KEYFN(elem), &q->last_pos) < 0)) { \
+				MARS_ERR("backskip pos %lld -> %lld\n", q->last_pos, KEYFN(elem)); \
+			} \
+			memcpy(&q->last_pos, KEYFN(elem), sizeof(q->last_pos)); \
+ \
+			if (KEYCMP(KEYFN(elem), &q->heap_margin) > 0) { \
+				memcpy(&q->heap_margin, KEYFN(elem), sizeof(q->heap_margin)); \
+			} \
+			ph_delete_min_##HEAPTYPE(&q->heap_high); \
+			atomic_dec(&q->q_queued); \
+		} \
+	} else if (!list_empty(&q->q_anchor)) { \
+		struct list_head *next = q->q_anchor.next; \
+		list_del_init(next); \
+		atomic_dec(&q->q_queued); \
+		elem = container_of(next, ELEM_TYPE, HEAD); \
+	} \
+ \
+	traced_unlock(&q->q_lock, flags); \
+ \
+	return elem; \
+}
+
+#endif
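
The fetch logic in q_##PREFIX##_fetch() above implements a one-way elevator over two heaps: inserts with keys at or below the current heap_margin go to heap_low so they cannot preempt the running sweep, fetch drains heap_high in ascending key order while raising the margin, and the heaps are swapped when heap_high runs dry, starting the next sweep. A self-contained userspace sketch of the idea (sorted arrays stand in for the pairing heaps; all names invented for illustration, not part of the commit):

	#include <stdio.h>

	#define N 16

	/* stand-in "heap": a sorted array drained via index i */
	struct minset { long long key[N]; int n, i; };

	static void ms_insert(struct minset *s, long long k)
	{
		int j = s->n++;
		while (j > s->i && s->key[j-1] > k) {
			s->key[j] = s->key[j-1];
			j--;
		}
		s->key[j] = k;
	}

	static int ms_empty(struct minset *s) { return s->i >= s->n; }

	static struct minset low, high;
	static long long margin = 0;

	static void elevator_insert(long long k)
	{
		/* keys the sweep has already passed must wait for the next sweep */
		ms_insert(k <= margin ? &low : &high, k);
	}

	static int elevator_fetch(long long *k)
	{
		if (ms_empty(&high)) {	/* sweep done: swap in the low set */
			high = low;
			low.n = low.i = 0;
			margin = 0;
		}
		if (ms_empty(&high))
			return 0;
		*k = high.key[high.i++];
		if (*k > margin)
			margin = *k;	/* advance the sweep position */
		return 1;
	}

	int main(void)
	{
		long long k;

		elevator_insert(10);
		elevator_insert(30);
		elevator_fetch(&k);
		printf("%lld\n", k);	/* 10; margin is now 10 */
		elevator_insert(5);	/* 5 <= margin: deferred to the next sweep */
		elevator_fetch(&k);
		printf("%lld\n", k);	/* 30, not 5 */
		elevator_fetch(&k);
		printf("%lld\n", k);	/* 5, after the heaps swapped */
		return 0;
	}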
@@ -36,132 +36,43 @@
 #endif
 
 static inline
-bool qq_cmp(struct pairing_heap_mref *_a, struct pairing_heap_mref *_b)
+int lh_cmp(loff_t *a, loff_t *b)
 {
-	struct trans_logger_mref_aspect *mref_a = container_of(_a, struct trans_logger_mref_aspect, ph);
-	struct trans_logger_mref_aspect *mref_b = container_of(_b, struct trans_logger_mref_aspect, ph);
-	struct mref_object *a = mref_a->object;
-	struct mref_object *b = mref_b->object;
-	return a->ref_pos < b->ref_pos;
-}
-
-_PAIRING_HEAP_FUNCTIONS(static,mref,qq_cmp);
-
-//////////////////////// generic queue handling (TBD!!!) /////////////////////
-
-static inline
-void q_init(struct logger_queue *q, struct trans_logger_output *output)
-{
-	q->q_output = output;
-	INIT_LIST_HEAD(&q->q_anchor);
-	q->heap_low = NULL;
-	q->heap_high = NULL;
-	spin_lock_init(&q->q_lock);
-	atomic_set(&q->q_queued, 0);
-	atomic_set(&q->q_flying, 0);
+	if (*a < *b)
+		return -1;
+	if (*a > *b)
+		return 1;
+	return 0;
 }
 
 static inline
-void q_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
+int tr_cmp(struct pairing_heap_logger *_a, struct pairing_heap_logger *_b)
 {
-	unsigned long flags;
-
-	traced_lock(&q->q_lock, flags);
-
-	if (q->q_ordering) {
-		struct pairing_heap_mref **use = &q->heap_high;
-		if (mref_a->object->ref_pos <= q->heap_margin) {
-			use = &q->heap_low;
-		}
-		ph_insert_mref(use, &mref_a->ph);
-	} else {
-		list_add_tail(&mref_a->q_head, &q->q_anchor);
-	}
-	atomic_inc(&q->q_queued);
-	atomic_inc(&q->q_total);
-	q->q_last_insert = jiffies;
-
-	traced_unlock(&q->q_lock, flags);
+	struct logger_head *a = container_of(_a, struct logger_head, ph);
+	struct logger_head *b = container_of(_b, struct logger_head, ph);
+	return lh_cmp(a->lh_pos, b->lh_pos);
 }
 
+_PAIRING_HEAP_FUNCTIONS(static,logger,tr_cmp);
+
 static inline
-void q_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
+loff_t *lh_get(struct logger_head *th)
 {
-	unsigned long flags;
-
-	CHECK_ATOMIC(&mref_a->object->ref_count, 1);
-
-	mars_trace(mref_a->object, q->q_pushback_info);
-
-	if (q->q_ordering) {
-		atomic_dec(&q->q_total);
-		q_insert(q, mref_a);
-		return;
-	}
-
-	traced_lock(&q->q_lock, flags);
-
-	list_add(&mref_a->q_head, &q->q_anchor);
-	atomic_inc(&q->q_queued);
-	//q->q_last_insert = jiffies;
-
-	traced_unlock(&q->q_lock, flags);
+	return th->lh_pos;
 }
 
-static inline
-struct trans_logger_mref_aspect *q_fetch(struct logger_queue *q)
-{
-	struct trans_logger_mref_aspect *mref_a = NULL;
-	unsigned long flags;
-
-	traced_lock(&q->q_lock, flags);
-
-	if (q->q_ordering) {
-		if (!q->heap_high) {
-			q->heap_high = q->heap_low;
-			q->heap_low = NULL;
-			q->heap_margin = 0;
-			q->last_pos = 0;
-		}
-		if (q->heap_high) {
-			struct mref_object *mref;
-			loff_t new_margin;
-			mref_a = container_of(q->heap_high, struct trans_logger_mref_aspect, ph);
-			mref = mref_a->object;
-#if 1
-			if (unlikely(mref->ref_pos < q->last_pos)) {
-				MARS_ERR("backskip pos %lld -> %lld len = %d\n", q->last_pos, mref->ref_pos, mref->ref_len);
-			}
-			q->last_pos = mref->ref_pos;
-#endif
-			mref_a->fetch_margin = q->heap_margin;
-			new_margin = mref->ref_pos + mref->ref_len;
-			if (new_margin > q->heap_margin) {
-				q->heap_margin = new_margin;
-			}
-			ph_delete_min_mref(&q->heap_high);
-			atomic_dec(&q->q_queued);
-		}
-	} else if (!list_empty(&q->q_anchor)) {
-		struct list_head *next = q->q_anchor.next;
-		list_del_init(next);
-		atomic_dec(&q->q_queued);
-		mref_a = container_of(next, struct trans_logger_mref_aspect, q_head);
-	}
-
-	traced_unlock(&q->q_lock, flags);
-
-	if (mref_a) {
-		CHECK_ATOMIC(&mref_a->object->ref_count, 1);
-		mars_trace(mref_a->object, q->q_fetch_info);
-	}
-
-	return mref_a;
-}
-//QUEUE_FUNCTIONS(logger,struct trans_logger_mref_aspect,th.th_head,MREF_KEY_FN,th_cmp,mref);
+QUEUE_FUNCTIONS(logger,struct logger_head,lh_head,lh_get,lh_cmp,logger);
 
 ////////////////////////// logger queue handling ////////////////////////
 
+static inline
+void qq_init(struct logger_queue *q, struct trans_logger_output *output)
+{
+	q_logger_init(q);
+	q->q_output = output;
+}
+
 static noinline
 bool qq_is_ready(struct logger_queue *q)
 {
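
For orientation: the QUEUE_FUNCTIONS(logger,...) instantiation above stamps out the generic queue operations from lib_queue.h with PREFIX=logger and ELEM_TYPE=struct logger_head, so (by mechanical expansion of the macro, shown here for reference only) the qq_* wrappers delegate to:

	/* generated by QUEUE_FUNCTIONS(logger,struct logger_head,lh_head,lh_get,lh_cmp,logger) */
	static inline void q_logger_init(struct logger_queue *q);
	static inline void q_logger_insert(struct logger_queue *q, struct logger_head *elem);
	static inline void q_logger_pushback(struct logger_queue *q, struct logger_head *elem);
	static inline struct logger_head *q_logger_fetch(struct logger_queue *q);

An element is enqueued via its embedded struct logger_head and the enclosing aspect is recovered with container_of(), as qq_fetch() in the next hunk shows.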
@@ -246,9 +157,34 @@ void qq_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
 
 	mars_trace(mref, q->q_insert_info);
 
-	q_insert(q, mref_a);
+	q_logger_insert(q, &mref_a->lh);
 }
 
+static inline
+void qq_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a)
+{
+	CHECK_ATOMIC(&mref_a->object->ref_count, 1);
+
+	mars_trace(mref_a->object, q->q_pushback_info);
+
+	q_logger_pushback(q, &mref_a->lh);
+}
+
+static inline
+struct trans_logger_mref_aspect *qq_fetch(struct logger_queue *q)
+{
+	struct logger_head *test;
+	struct trans_logger_mref_aspect *mref_a = NULL;
+
+	test = q_logger_fetch(q);
+
+	if (test) {
+		mref_a = container_of(test, struct trans_logger_mref_aspect, lh);
+		CHECK_ATOMIC(&mref_a->object->ref_count, 1);
+		mars_trace(mref_a->object, q->q_fetch_info);
+	}
+	return mref_a;
+}
+
 ///////////////////////// own helper functions ////////////////////////
 
@@ -712,8 +648,8 @@ restart:
 		return;
 	}
 
+	CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
 	CHECK_HEAD_EMPTY(&mref_a->hash_head);
-	CHECK_HEAD_EMPTY(&mref_a->q_head);
 	CHECK_HEAD_EMPTY(&mref_a->replay_head);
 	CHECK_HEAD_EMPTY(&mref_a->collect_head);
 	CHECK_HEAD_EMPTY(&mref_a->sub_list);
@@ -837,8 +773,8 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
 	shadow_a = mref_a->shadow_ref;
 	if (shadow_a) {
 #if 1
+		CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
 		CHECK_HEAD_EMPTY(&mref_a->hash_head);
-		CHECK_HEAD_EMPTY(&mref_a->q_head);
 		CHECK_HEAD_EMPTY(&mref_a->pos_head);
 #endif
 		atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put()
@@ -894,11 +830,41 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
 	traced_unlock(&brick->pos_lock, flags);
 }
 
+static inline
+void _free_one(struct list_head *tmp)
+{
+	struct trans_logger_mref_aspect *sub_mref_a;
+	struct mref_object *sub_mref;
+
+	list_del_init(tmp);
+
+	sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
+	sub_mref = sub_mref_a->object;
+
+	trans_logger_free_mref(sub_mref);
+}
+
 static noinline
 void free_writeback(struct writeback_info *wb)
 {
 	struct list_head *tmp;
 
+	if (unlikely(wb->w_error < 0)) {
+		MARS_ERR("writeback error = %d at pos = %lld len = %d, writeback is incomplete\n", wb->w_error, wb->w_pos, wb->w_len);
+	}
+
+	/* The sub_read and sub_write lists are usually empty here.
+	 * This code is only for cleanup in case of errors.
+	 */
+	while (unlikely((tmp = wb->w_sub_read_list.next) != &wb->w_sub_read_list)) {
+		_free_one(tmp);
+	}
+	while (unlikely((tmp = wb->w_sub_write_list.next) != &wb->w_sub_write_list)) {
+		_free_one(tmp);
+	}
+
+	/* Now complete the original requests.
+	 */
 	while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) {
 		struct trans_logger_mref_aspect *orig_mref_a;
 		struct mref_object *orig_mref;
@@ -910,14 +876,18 @@ void free_writeback(struct writeback_info *wb)
 
 		CHECK_ATOMIC(&orig_mref->ref_count, 1);
 
-		pos_complete(orig_mref_a);
+		if (likely(wb->w_error >= 0)) {
+			pos_complete(orig_mref_a);
+		}
 
 		__trans_logger_ref_put(orig_mref_a->output, orig_mref_a);
 	}
 //...
 
 	kfree(wb);
 }
 
+/* Generic endio() for writeback_info
+ */
 static noinline
 void wb_endio(struct generic_callback *cb)
 {
@@ -940,6 +910,10 @@ void wb_endio(struct generic_callback *cb)
 
 	atomic_dec(&output->wb_balance_count);
 
+	if (cb->cb_error < 0) {
+		wb->w_error = cb->cb_error;
+	}
+
 	rw = sub_mref->ref_rw;
 	dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
 	CHECK_ATOMIC(dec, 1);
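
The two hunks above introduce a small error protocol for writebacks: every sub-IO completion latches a failure into wb->w_error, and free_writeback() later runs pos_complete() only when the whole writeback succeeded. A minimal single-threaded userspace sketch of that pattern (names invented, not part of the commit; the kernel code serializes through its callback path and atomic counters):

	#include <stdio.h>

	struct wb_demo {
		int w_error;	/* latched failure, >= 0 means ok */
		int pending;	/* outstanding sub-IOs */
	};

	static void sub_endio(struct wb_demo *wb, int err)
	{
		if (err < 0)
			wb->w_error = err;	/* latch the failure */
		if (--wb->pending == 0) {
			if (wb->w_error >= 0)
				printf("writeback complete, advancing log positions\n");
			else
				printf("writeback incomplete, error %d\n", wb->w_error);
		}
	}

	int main(void)
	{
		struct wb_demo wb = { 0, 3 };

		sub_endio(&wb, 0);
		sub_endio(&wb, -5);	/* one sub-IO fails */
		sub_endio(&wb, 0);	/* last completion reports the error */
		return 0;
	}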
@@ -948,7 +922,7 @@ void wb_endio(struct generic_callback *cb)
 	}
 
 	endio = rw ? wb->write_endio : wb->read_endio;
-	if (endio) {
+	if (likely(endio)) {
 		endio(cb);
 	}
 	return;
@@ -957,6 +931,12 @@ err:
 	MARS_FAT("hanging up....\n");
 }
 
+/* Atomically create writeback info, based on "snapshot" of current hash
+ * state.
+ * Notice that the hash can change during writeback IO, thus we need
+ * struct writeback_info to precisely catch that information at a single
+ * point in time.
+ */
 static noinline
 struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t pos, int len)
 {
@@ -977,6 +957,9 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 		MARS_ERR("len = %d\n", len);
 	}
 
+	/* Atomically fetch transitive closure on all requests
+	 * overlapping with the current search region.
+	 */
 	hash_extend(output, &wb->w_pos, &wb->w_len, &wb->w_collect_list);
 
 	pos = wb->w_pos;
@@ -986,6 +969,55 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t
 		MARS_ERR("len = %d\n", len);
 	}
 
+	/* Create sub_mrefs for read of old disk version (phase2)
+	 */
+	if (brick->log_reads) {
+		while (len > 0) {
+			struct trans_logger_mref_aspect *sub_mref_a;
+			struct mref_object *sub_mref;
+			int this_len;
+			int status;
+
+			sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout);
+			if (unlikely(!sub_mref)) {
+				MARS_FAT("cannot alloc sub_mref\n");
+				goto err;
+			}
+
+			sub_mref->ref_pos = pos;
+			sub_mref->ref_len = len;
+			sub_mref->ref_may_write = READ;
+			sub_mref->ref_rw = READ;
+			sub_mref->ref_data = NULL;
+
+			sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref);
+			CHECK_PTR(sub_mref_a, err);
+
+			sub_mref_a->output = output;
+			sub_mref_a->wb = wb;
+
+			status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref);
+			if (unlikely(status < 0)) {
+				MARS_FAT("cannot get sub_ref, status = %d\n", status);
+				goto err;
+			}
+
+			list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_read_list);
+			atomic_inc(&wb->w_sub_read_count);
+			atomic_inc(&output->wb_balance_count);
+
+			this_len = sub_mref->ref_len;
+			pos += this_len;
+			len -= this_len;
+		}
+		/* Re-init for startover
+		 */
+		pos = wb->w_pos;
+		len = wb->w_len;
+	}
+
+	/* Create sub_mrefs for writeback (phase4)
+	 */
 	while (len > 0) {
 		struct trans_logger_mref_aspect *sub_mref_a;
 		struct mref_object *sub_mref;
@@ -1230,8 +1262,8 @@ bool phase0_startio(struct trans_logger_mref_aspect *mref_a)
 	}
 	// else WRITE
#if 1
+	CHECK_HEAD_EMPTY(&mref_a->lh.lh_head);
 	CHECK_HEAD_EMPTY(&mref_a->hash_head);
-	CHECK_HEAD_EMPTY(&mref_a->q_head);
 	if (unlikely(mref->ref_flags & (MREF_READING | MREF_WRITING))) {
 		MARS_ERR("bad flags %d\n", mref->ref_flags);
 	}
@@ -1282,7 +1314,7 @@ err:
 atomic_t provisionary_count = ATOMIC_INIT(0);
 
 static noinline
-void new_endio(struct generic_callback *cb)
+void phase2_endio(struct generic_callback *cb)
 {
 	struct trans_logger_mref_aspect *sub_mref_a;
 	struct writeback_info *wb;
@@ -1301,19 +1333,21 @@ void new_endio(struct generic_callback *cb)
 		goto err;
 	}
 
-	hash_put_all(wb->w_output, &wb->w_collect_list);
-
-	free_writeback(wb);
-
-	atomic_dec(&provisionary_count);
-	wake_up_interruptible(&output->event);
-
+	// queue up for the next phase
+	//qq_insert(&output->q_phase3, orig_mref_a);
+	wake_up_interruptible(&output->event);
 	return;
 
 err:
 	MARS_FAT("hanging up....\n");
 }
 
+static noinline
+void phase4_endio(struct generic_callback *cb);
+
 static noinline
 bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 {
@@ -1353,7 +1387,8 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
 		goto done;
 	}
 
-	wb->write_endio = new_endio;
+	wb->read_endio = phase2_endio;
+	wb->write_endio = phase4_endio;
 	fire_writeback(wb, &wb->w_sub_write_list);
 
 done:
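
With this hunk the writeback gains separate read and write completions: wb_endio() (seen earlier) dispatches `endio = rw ? wb->write_endio : wb->read_endio`, so phase2 reads finish in phase2_endio and phase4 writes in phase4_endio (added below). A minimal sketch of that function-pointer dispatch (userspace C, names invented, not part of the commit):

	#include <stdio.h>

	struct wb_cb {
		void (*read_endio)(int err);
		void (*write_endio)(int err);
	};

	static void demo_read_done(int err)  { printf("read done: %d\n", err); }
	static void demo_write_done(int err) { printf("write done: %d\n", err); }

	static void demo_endio(struct wb_cb *wb, int rw, int err)
	{
		/* rw != 0 means WRITE, as with ref_rw in the commit */
		void (*endio)(int) = rw ? wb->write_endio : wb->read_endio;

		if (endio)	/* the commit guards this with likely() */
			endio(err);
	}

	int main(void)
	{
		struct wb_cb wb = { demo_read_done, demo_write_done };

		demo_endio(&wb, 0, 0);	/* READ  -> read_endio */
		demo_endio(&wb, 1, 0);	/* WRITE -> write_endio */
		return 0;
	}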
@@ -1497,6 +1532,40 @@ err:
  * Phase 4: overwrite old disk version with new version.
  */
 
+static noinline
+void phase4_endio(struct generic_callback *cb)
+{
+	struct trans_logger_mref_aspect *sub_mref_a;
+	struct writeback_info *wb;
+	struct trans_logger_output *output;
+
+	CHECK_PTR(cb, err);
+	sub_mref_a = cb->cb_private;
+	CHECK_PTR(sub_mref_a, err);
+	wb = sub_mref_a->wb;
+	CHECK_PTR(wb, err);
+	output = wb->w_output;
+	CHECK_PTR(output, err);
+
+	if (unlikely(cb->cb_error < 0)) {
+		MARS_FAT("IO error %d\n", cb->cb_error);
+		goto err;
+	}
+
+	hash_put_all(wb->w_output, &wb->w_collect_list);
+
+	free_writeback(wb);
+
+	atomic_dec(&provisionary_count);
+	wake_up_interruptible(&output->event);
+
+	return;
+
+err:
+	MARS_FAT("hanging up....\n");
+}
+
+
 #ifndef NEW_CODE
 
 static noinline
@@ -1739,7 +1808,7 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool (
 	int res;
 
 	while (max-- > 0) {
-		mref_a = q_fetch(q);
+		mref_a = qq_fetch(q);
 		res = -1;
 		if (!mref_a)
 			goto done;
@@ -1748,7 +1817,7 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool (
 
 		ok = startio(mref_a);
 		if (unlikely(!ok)) {
-			q_pushback(q, mref_a);
+			qq_pushback(q, mref_a);
 			output->did_pushback = true;
 			res = 1;
 			goto done;
@@ -2224,8 +2293,9 @@ static noinline
 int trans_logger_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
 {
 	struct trans_logger_mref_aspect *ini = (void*)_ini;
+	ini->lh.lh_pos = &ini->object->ref_pos;
+	INIT_LIST_HEAD(&ini->lh.lh_head);
 	INIT_LIST_HEAD(&ini->hash_head);
-	INIT_LIST_HEAD(&ini->q_head);
 	INIT_LIST_HEAD(&ini->pos_head);
 	INIT_LIST_HEAD(&ini->replay_head);
 	INIT_LIST_HEAD(&ini->collect_head);
@@ -2238,8 +2308,8 @@ static noinline
 void trans_logger_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
 {
 	struct trans_logger_mref_aspect *ini = (void*)_ini;
+	CHECK_HEAD_EMPTY(&ini->lh.lh_head);
 	CHECK_HEAD_EMPTY(&ini->hash_head);
-	CHECK_HEAD_EMPTY(&ini->q_head);
 	CHECK_HEAD_EMPTY(&ini->pos_head);
 	CHECK_HEAD_EMPTY(&ini->replay_head);
 	CHECK_HEAD_EMPTY(&ini->collect_head);
@@ -2272,10 +2342,10 @@ int trans_logger_output_construct(struct trans_logger_output *output)
 	}
 	atomic_set(&output->hash_count, 0);
 	init_waitqueue_head(&output->event);
-	q_init(&output->q_phase1, output);
-	q_init(&output->q_phase2, output);
-	q_init(&output->q_phase3, output);
-	q_init(&output->q_phase4, output);
+	qq_init(&output->q_phase1, output);
+	qq_init(&output->q_phase2, output);
+	qq_init(&output->q_phase3, output);
+	qq_init(&output->q_phase4, output);
#if 1
 	output->q_phase2.q_dep = &output->q_phase3;
 	output->q_phase3.q_dep = &output->q_phase4;
@@ -10,37 +10,25 @@
 
 #include "lib_log.h"
 #include "lib_pairing_heap.h"
+#include "lib_queue.h"
 
 ////////////////////////////////////////////////////////////////////
 
-_PAIRING_HEAP_TYPEDEF(mref,)
+_PAIRING_HEAP_TYPEDEF(logger,)
 
 struct logger_queue {
-	struct logger_queue *q_dep;
-	atomic_t *q_dep_plus;
+	QUEUE_ANCHOR(logger,loff_t,logger);
 	struct trans_logger_output *q_output;
-	struct list_head q_anchor;
-	struct pairing_heap_mref *heap_high;
-	struct pairing_heap_mref *heap_low;
-	loff_t heap_margin;
-	loff_t last_pos;
-	long long q_last_insert; // jiffies
-	spinlock_t q_lock;
-	atomic_t q_queued;
-	atomic_t q_flying;
-	atomic_t q_total;
 	const char *q_insert_info;
 	const char *q_pushback_info;
 	const char *q_fetch_info;
-	// tunables
-	int q_batchlen;
-	int q_max_queued;
-	int q_max_flying;
-	int q_max_jiffies;
-	int q_max_contention;
-	int q_over_pressure;
-	int q_io_prio;
-	bool q_ordering;
-
 };
 
+struct logger_head {
+	struct list_head lh_head;
+	loff_t *lh_pos;
+	struct pairing_heap_logger ph;
+};
+
 ////////////////////////////////////////////////////////////////////
 
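
For reference, derived mechanically from the QUEUE_ANCHOR definition in lib_queue.h above (my expansion, not commit text): QUEUE_ANCHOR(logger,loff_t,logger) inlines essentially the fields the old struct spelled out by hand, the one typing change being that the two heaps are now pairing_heap_logger instead of pairing_heap_mref:

	struct logger_queue *q_dep;
	atomic_t *q_dep_plus;
	struct list_head q_anchor;
	struct pairing_heap_logger *heap_high;
	struct pairing_heap_logger *heap_low;
	long long q_last_insert; /* jiffies */
	loff_t heap_margin;
	loff_t last_pos;
	spinlock_t q_lock;
	/* readonly from outside */
	atomic_t q_queued;
	atomic_t q_flying;
	atomic_t q_total;
	/* tunables */
	int q_batchlen;
	int q_max_queued;
	int q_max_flying;
	int q_max_jiffies;
	int q_max_contention;
	int q_over_pressure;
	int q_io_prio;
	bool q_ordering;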
@@ -52,8 +40,10 @@ struct hash_anchor {
 
 struct writeback_info {
 	struct trans_logger_output *w_output;
+	struct logger_head w_lh;
 	loff_t w_pos;
 	int w_len;
+	int w_error;
 	struct list_head w_collect_list;   // list of collected orig requests
 	struct list_head w_sub_read_list;  // for saving the old data before overwrite
 	struct list_head w_sub_write_list; // for overwriting
@@ -66,12 +56,13 @@ struct writeback_info {
 struct trans_logger_mref_aspect {
 	GENERIC_ASPECT(mref);
 	struct trans_logger_output *output;
+	struct logger_head lh;
 	struct list_head hash_head;
-	struct list_head q_head;
+	//struct list_head q_head;
 	struct list_head pos_head;
 	struct list_head replay_head;
 	struct list_head collect_head;
-	struct pairing_heap_mref ph;
+	struct pairing_heap_logger ph;
 	struct trans_logger_mref_aspect *shadow_ref;
 	void *shadow_data;
 	bool do_dealloc;
@@ -81,7 +72,6 @@ struct trans_logger_mref_aspect {
 	bool is_collected;
 	struct timespec stamp;
 	loff_t log_pos;
-	loff_t fetch_margin;
 	struct generic_callback cb;
 	struct trans_logger_mref_aspect *orig_mref_a;
 	struct writeback_info *wb;