diff --git a/lib_pairing_heap.h b/lib_pairing_heap.h index 18afe3af..4a2569ea 100644 --- a/lib_pairing_heap.h +++ b/lib_pairing_heap.h @@ -36,7 +36,7 @@ struct pairing_heap_##KEYTYPE *_ph_merge_##KEYTYPE(struct pairing_heap_##KEYTYPE return heap2; \ if (!heap2) \ return heap1; \ - if (CMP(heap1, heap2)) { \ + if (CMP(heap1, heap2) < 0) { \ heap2->next = heap1->subheaps; \ heap1->subheaps = heap2; \ return heap1; \ @@ -83,7 +83,7 @@ void ph_delete_min_##KEYTYPE(struct pairing_heap_##KEYTYPE **heap) \ } /* some default CMP() function */ -#define PAIRING_HEAP_COMPARE(a,b) ((a)->key < (b)->key) +#define PAIRING_HEAP_COMPARE(a,b) ((a)->key < (b)->key ? -1 : ((a)->key > (b)->key ? 1 : 0)) /* less generic version: use the default CMP() function */ #define PAIRING_HEAP_FUNCTIONS(_STATIC,KEYTYPE) \ diff --git a/lib_queue.h b/lib_queue.h new file mode 100644 index 00000000..e84aa392 --- /dev/null +++ b/lib_queue.h @@ -0,0 +1,127 @@ +// (c) 2011 Thomas Schoebel-Theuer / 1&1 Internet AG + +#ifndef LIB_QUEUE_H +#define LIB_QUEUE_H + +#define QUEUE_ANCHOR(PREFIX,KEYTYPE,HEAPTYPE) \ + struct PREFIX##_queue *q_dep; \ + atomic_t *q_dep_plus; \ + struct list_head q_anchor; \ + struct pairing_heap_##HEAPTYPE *heap_high; \ + struct pairing_heap_##HEAPTYPE *heap_low; \ + long long q_last_insert; /* jiffies */ \ + KEYTYPE heap_margin; \ + KEYTYPE last_pos; \ + spinlock_t q_lock; \ + /* readonly from outside */ \ + atomic_t q_queued; \ + atomic_t q_flying; \ + atomic_t q_total; \ + /* tunables */ \ + int q_batchlen; \ + int q_max_queued; \ + int q_max_flying; \ + int q_max_jiffies; \ + int q_max_contention; \ + int q_over_pressure; \ + int q_io_prio; \ + bool q_ordering; \ + + +#define QUEUE_FUNCTIONS(PREFIX,ELEM_TYPE,HEAD,KEYFN,KEYCMP,HEAPTYPE) \ + \ +static inline \ +void q_##PREFIX##_init(struct PREFIX##_queue *q) \ +{ \ + INIT_LIST_HEAD(&q->q_anchor); \ + q->heap_low = NULL; \ + q->heap_high = NULL; \ + spin_lock_init(&q->q_lock); \ + atomic_set(&q->q_queued, 0); \ + atomic_set(&q->q_flying, 0); \ +} \ + \ +static inline \ +void q_##PREFIX##_insert(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ +{ \ + unsigned long flags; \ + \ + traced_lock(&q->q_lock, flags); \ + \ + if (q->q_ordering) { \ + struct pairing_heap_##HEAPTYPE **use = &q->heap_high; \ + if (KEYCMP(KEYFN(elem), &q->heap_margin) <= 0) { \ + use = &q->heap_low; \ + } \ + ph_insert_##HEAPTYPE(use, &elem->ph); \ + } else { \ + list_add_tail(&elem->HEAD, &q->q_anchor); \ + } \ + atomic_inc(&q->q_queued); \ + atomic_inc(&q->q_total); \ + q->q_last_insert = jiffies; \ + \ + traced_unlock(&q->q_lock, flags); \ +} \ + \ +static inline \ +void q_##PREFIX##_pushback(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ +{ \ + unsigned long flags; \ + \ + if (q->q_ordering) { \ + atomic_dec(&q->q_total); \ + q_##PREFIX##_insert(q, elem); \ + return; \ + } \ + \ + traced_lock(&q->q_lock, flags); \ + \ + list_add(&elem->HEAD, &q->q_anchor); \ + atomic_inc(&q->q_queued); \ + \ + traced_unlock(&q->q_lock, flags); \ +} \ + \ +static inline \ +ELEM_TYPE *q_##PREFIX##_fetch(struct PREFIX##_queue *q) \ +{ \ + ELEM_TYPE *elem = NULL; \ + unsigned long flags; \ + \ + traced_lock(&q->q_lock, flags); \ + \ + if (q->q_ordering) { \ + if (!q->heap_high) { \ + q->heap_high = q->heap_low; \ + q->heap_low = NULL; \ + q->heap_margin = 0; \ + q->last_pos = 0; \ + } \ + if (q->heap_high) { \ + elem = container_of(q->heap_high, ELEM_TYPE, ph); \ + \ + if (unlikely(KEYCMP(KEYFN(elem), &q->last_pos) < 0)) { \ + MARS_ERR("backskip pos %lld -> %lld\n", q->last_pos, KEYFN(elem)); \ + } \ + memcpy(&q->last_pos, KEYFN(elem), sizeof(q->last_pos)); \ + \ + if (KEYCMP(KEYFN(elem), &q->heap_margin) > 0) { \ + memcpy(&q->heap_margin, KEYFN(elem), sizeof(q->heap_margin)); \ + } \ + ph_delete_min_##HEAPTYPE(&q->heap_high); \ + atomic_dec(&q->q_queued); \ + } \ + } else if (!list_empty(&q->q_anchor)) { \ + struct list_head *next = q->q_anchor.next; \ + list_del_init(next); \ + atomic_dec(&q->q_queued); \ + elem = container_of(next, ELEM_TYPE, HEAD); \ + } \ + \ + traced_unlock(&q->q_lock, flags); \ + \ + return elem; \ +} + +#endif diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 71f7872d..af5b349a 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -36,132 +36,43 @@ #endif static inline -bool qq_cmp(struct pairing_heap_mref *_a, struct pairing_heap_mref *_b) +int lh_cmp(loff_t *a, loff_t *b) { - struct trans_logger_mref_aspect *mref_a = container_of(_a, struct trans_logger_mref_aspect, ph); - struct trans_logger_mref_aspect *mref_b = container_of(_b, struct trans_logger_mref_aspect, ph); - struct mref_object *a = mref_a->object; - struct mref_object *b = mref_b->object; - return a->ref_pos < b->ref_pos; -} - -_PAIRING_HEAP_FUNCTIONS(static,mref,qq_cmp); - - -//////////////////////// generic queue handling (TBD!!!) ///////////////////// - -static inline -void q_init(struct logger_queue *q, struct trans_logger_output *output) -{ - q->q_output = output; - INIT_LIST_HEAD(&q->q_anchor); - q->heap_low = NULL; - q->heap_high = NULL; - spin_lock_init(&q->q_lock); - atomic_set(&q->q_queued, 0); - atomic_set(&q->q_flying, 0); + if (*a < *b) + return -1; + if (*a > *b) + return 1; + return 0; } static inline -void q_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a) +int tr_cmp(struct pairing_heap_logger *_a, struct pairing_heap_logger *_b) { - unsigned long flags; - - traced_lock(&q->q_lock, flags); - - if (q->q_ordering) { - struct pairing_heap_mref **use = &q->heap_high; - if (mref_a->object->ref_pos <= q->heap_margin) { - use = &q->heap_low; - } - ph_insert_mref(use, &mref_a->ph); - } else { - list_add_tail(&mref_a->q_head, &q->q_anchor); - } - atomic_inc(&q->q_queued); - atomic_inc(&q->q_total); - q->q_last_insert = jiffies; - - traced_unlock(&q->q_lock, flags); + struct logger_head *a = container_of(_a, struct logger_head, ph); + struct logger_head *b = container_of(_b, struct logger_head, ph); + return lh_cmp(a->lh_pos, b->lh_pos); } +_PAIRING_HEAP_FUNCTIONS(static,logger,tr_cmp); + static inline -void q_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a) +loff_t *lh_get(struct logger_head *th) { - unsigned long flags; - - CHECK_ATOMIC(&mref_a->object->ref_count, 1); - - mars_trace(mref_a->object, q->q_pushback_info); - - if (q->q_ordering) { - atomic_dec(&q->q_total); - q_insert(q, mref_a); - return; - } - - traced_lock(&q->q_lock, flags); - - list_add(&mref_a->q_head, &q->q_anchor); - atomic_inc(&q->q_queued); - //q->q_last_insert = jiffies; - - traced_unlock(&q->q_lock, flags); + return th->lh_pos; } -static inline -struct trans_logger_mref_aspect *q_fetch(struct logger_queue *q) -{ - struct trans_logger_mref_aspect *mref_a = NULL; - unsigned long flags; - - traced_lock(&q->q_lock, flags); - - if (q->q_ordering) { - if (!q->heap_high) { - q->heap_high = q->heap_low; - q->heap_low = NULL; - q->heap_margin = 0; - q->last_pos = 0; - } - if (q->heap_high) { - struct mref_object *mref; - loff_t new_margin; - mref_a = container_of(q->heap_high, struct trans_logger_mref_aspect, ph); - mref = mref_a->object; -#if 1 - if (unlikely(mref->ref_pos < q->last_pos)) { - MARS_ERR("backskip pos %lld -> %lld len = %d\n", q->last_pos, mref->ref_pos, mref->ref_len); - } - q->last_pos = mref->ref_pos; -#endif - mref_a->fetch_margin = q->heap_margin; - new_margin = mref->ref_pos + mref->ref_len; - if (new_margin > q->heap_margin) { - q->heap_margin = new_margin; - } - ph_delete_min_mref(&q->heap_high); - atomic_dec(&q->q_queued); - } - } else if (!list_empty(&q->q_anchor)) { - struct list_head *next = q->q_anchor.next; - list_del_init(next); - atomic_dec(&q->q_queued); - mref_a = container_of(next, struct trans_logger_mref_aspect, q_head); - } - - traced_unlock(&q->q_lock, flags); - - if (mref_a) { - CHECK_ATOMIC(&mref_a->object->ref_count, 1); - mars_trace(mref_a->object, q->q_fetch_info); - } - - return mref_a; -} +//QUEUE_FUNCTIONS(logger,struct trans_logger_mref_aspect,th.th_head,MREF_KEY_FN,th_cmp,mref); +QUEUE_FUNCTIONS(logger,struct logger_head,lh_head,lh_get,lh_cmp,logger); ////////////////////////// logger queue handling //////////////////////// +static inline +void qq_init(struct logger_queue *q, struct trans_logger_output *output) +{ + q_logger_init(q); + q->q_output = output; +} + static noinline bool qq_is_ready(struct logger_queue *q) { @@ -246,9 +157,34 @@ void qq_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a) mars_trace(mref, q->q_insert_info); - q_insert(q, mref_a); + q_logger_insert(q, &mref_a->lh); } +static inline +void qq_pushback(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a) +{ + CHECK_ATOMIC(&mref_a->object->ref_count, 1); + + mars_trace(mref_a->object, q->q_pushback_info); + + q_logger_pushback(q, &mref_a->lh); +} + +static inline +struct trans_logger_mref_aspect *qq_fetch(struct logger_queue *q) +{ + struct logger_head *test; + struct trans_logger_mref_aspect *mref_a = NULL; + + test = q_logger_fetch(q); + + if (test) { + mref_a = container_of(test, struct trans_logger_mref_aspect, lh); + CHECK_ATOMIC(&mref_a->object->ref_count, 1); + mars_trace(mref_a->object, q->q_fetch_info); + } + return mref_a; +} ///////////////////////// own helper functions //////////////////////// @@ -712,8 +648,8 @@ restart: return; } + CHECK_HEAD_EMPTY(&mref_a->lh.lh_head); CHECK_HEAD_EMPTY(&mref_a->hash_head); - CHECK_HEAD_EMPTY(&mref_a->q_head); CHECK_HEAD_EMPTY(&mref_a->replay_head); CHECK_HEAD_EMPTY(&mref_a->collect_head); CHECK_HEAD_EMPTY(&mref_a->sub_list); @@ -837,8 +773,8 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object shadow_a = mref_a->shadow_ref; if (shadow_a) { #if 1 + CHECK_HEAD_EMPTY(&mref_a->lh.lh_head); CHECK_HEAD_EMPTY(&mref_a->hash_head); - CHECK_HEAD_EMPTY(&mref_a->q_head); CHECK_HEAD_EMPTY(&mref_a->pos_head); #endif atomic_inc(&mref->ref_count); // must be paired with __trans_logger_ref_put() @@ -894,11 +830,41 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a) traced_unlock(&brick->pos_lock, flags); } +static inline +void _free_one(struct list_head *tmp) +{ + struct trans_logger_mref_aspect *sub_mref_a; + struct mref_object *sub_mref; + + list_del_init(tmp); + + sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head); + sub_mref = sub_mref_a->object; + + trans_logger_free_mref(sub_mref); +} + static noinline void free_writeback(struct writeback_info *wb) { struct list_head *tmp; + if (unlikely(wb->w_error < 0)) { + MARS_ERR("writeback error = %d at pos = %lld len = %d, writeback is incomplete\n", wb->w_error, wb->w_pos, wb->w_len); + } + + /* The sub_read and sub_write lists are usually empty here. + * This code is only for cleanup in case of errors. + */ + while (unlikely((tmp = wb->w_sub_read_list.next) != &wb->w_sub_read_list)) { + _free_one(tmp); + } + while (unlikely((tmp = wb->w_sub_write_list.next) != &wb->w_sub_write_list)) { + _free_one(tmp); + } + + /* Now complete the original requests. + */ while ((tmp = wb->w_collect_list.next) != &wb->w_collect_list) { struct trans_logger_mref_aspect *orig_mref_a; struct mref_object *orig_mref; @@ -910,14 +876,18 @@ void free_writeback(struct writeback_info *wb) CHECK_ATOMIC(&orig_mref->ref_count, 1); - pos_complete(orig_mref_a); + if (likely(wb->w_error >= 0)) { + pos_complete(orig_mref_a); + } __trans_logger_ref_put(orig_mref_a->output, orig_mref_a); } - //... + kfree(wb); } +/* Generic endio() for writeback_info + */ static noinline void wb_endio(struct generic_callback *cb) { @@ -940,6 +910,10 @@ void wb_endio(struct generic_callback *cb) atomic_dec(&output->wb_balance_count); + if (cb->cb_error < 0) { + wb->w_error = cb->cb_error; + } + rw = sub_mref->ref_rw; dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count; CHECK_ATOMIC(dec, 1); @@ -948,7 +922,7 @@ void wb_endio(struct generic_callback *cb) } endio = rw ? wb->write_endio : wb->read_endio; - if (endio) { + if (likely(endio)) { endio(cb); } return; @@ -957,6 +931,12 @@ err: MARS_FAT("hanging up....\n"); } +/* Atomically create writeback info, based on "snapshot" of current hash + * state. + * Notice that the hash can change during writeback IO, thus we need + * struct writeback_info to precisely catch that information at a single + * point in time. + */ static noinline struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t pos, int len) { @@ -977,6 +957,9 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t MARS_ERR("len = %d\n", len); } + /* Atomically fetch transitive closure on all requests + * overlapping with the current search region. + */ hash_extend(output, &wb->w_pos, &wb->w_len, &wb->w_collect_list); pos = wb->w_pos; @@ -986,6 +969,55 @@ struct writeback_info *make_writeback(struct trans_logger_output *output, loff_t MARS_ERR("len = %d\n", len); } + /* Create sub_mrefs for read of old disk version (phase2) + */ + if (brick->log_reads) { + while (len > 0) { + struct trans_logger_mref_aspect *sub_mref_a; + struct mref_object *sub_mref; + int this_len; + int status; + + sub_mref = trans_logger_alloc_mref((void*)output, &output->writeback_layout); + if (unlikely(!sub_mref)) { + MARS_FAT("cannot alloc sub_mref\n"); + goto err; + } + + sub_mref->ref_pos = pos; + sub_mref->ref_len = len; + sub_mref->ref_may_write = READ; + sub_mref->ref_rw = READ; + sub_mref->ref_data = NULL; + + sub_mref_a = trans_logger_mref_get_aspect((struct trans_logger_output*)output, sub_mref); + CHECK_PTR(sub_mref_a, err); + + sub_mref_a->output = output; + sub_mref_a->wb = wb; + + status = GENERIC_INPUT_CALL(sub_input, mref_get, sub_mref); + if (unlikely(status < 0)) { + MARS_FAT("cannot get sub_ref, status = %d\n", status); + goto err; + } + + list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_read_list); + atomic_inc(&wb->w_sub_read_count); + atomic_inc(&output->wb_balance_count); + + this_len = sub_mref->ref_len; + pos += this_len; + len -= this_len; + } + /* Re-init for startover + */ + pos = wb->w_pos; + len = wb->w_len; + } + + /* Create sub_mrefs for writeback (phase4) + */ while (len > 0) { struct trans_logger_mref_aspect *sub_mref_a; struct mref_object *sub_mref; @@ -1230,8 +1262,8 @@ bool phase0_startio(struct trans_logger_mref_aspect *mref_a) } // else WRITE #if 1 + CHECK_HEAD_EMPTY(&mref_a->lh.lh_head); CHECK_HEAD_EMPTY(&mref_a->hash_head); - CHECK_HEAD_EMPTY(&mref_a->q_head); if (unlikely(mref->ref_flags & (MREF_READING | MREF_WRITING))) { MARS_ERR("bad flags %d\n", mref->ref_flags); } @@ -1282,7 +1314,7 @@ err: atomic_t provisionary_count = ATOMIC_INIT(0); static noinline -void new_endio(struct generic_callback *cb) +void phase2_endio(struct generic_callback *cb) { struct trans_logger_mref_aspect *sub_mref_a; struct writeback_info *wb; @@ -1301,19 +1333,21 @@ void new_endio(struct generic_callback *cb) goto err; } - hash_put_all(wb->w_output, &wb->w_collect_list); - - free_writeback(wb); - atomic_dec(&provisionary_count); - wake_up_interruptible(&output->event); + + // queue up for the next phase + //qq_insert(&output->q_phase3, orig_mref_a); + wake_up_interruptible(&output->event); return; err: MARS_FAT("hanging up....\n"); } +static noinline +void phase4_endio(struct generic_callback *cb); + static noinline bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) { @@ -1353,7 +1387,8 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a) goto done; } - wb->write_endio = new_endio; + wb->read_endio = phase2_endio; + wb->write_endio = phase4_endio; fire_writeback(wb, &wb->w_sub_write_list); done: @@ -1497,6 +1532,40 @@ err: * Phase 4: overwrite old disk version with new version. */ +static noinline +void phase4_endio(struct generic_callback *cb) +{ + struct trans_logger_mref_aspect *sub_mref_a; + struct writeback_info *wb; + struct trans_logger_output *output; + + CHECK_PTR(cb, err); + sub_mref_a = cb->cb_private; + CHECK_PTR(sub_mref_a, err); + wb = sub_mref_a->wb; + CHECK_PTR(wb, err); + output = wb->w_output; + CHECK_PTR(output, err); + + if (unlikely(cb->cb_error < 0)) { + MARS_FAT("IO error %d\n", cb->cb_error); + goto err; + } + + hash_put_all(wb->w_output, &wb->w_collect_list); + + free_writeback(wb); + + atomic_dec(&provisionary_count); + wake_up_interruptible(&output->event); + + return; + +err: + MARS_FAT("hanging up....\n"); +} + + #ifndef NEW_CODE static noinline @@ -1739,7 +1808,7 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool ( int res; while (max-- > 0) { - mref_a = q_fetch(q); + mref_a = qq_fetch(q); res = -1; if (!mref_a) goto done; @@ -1748,7 +1817,7 @@ int run_queue(struct trans_logger_output *output, struct logger_queue *q, bool ( ok = startio(mref_a); if (unlikely(!ok)) { - q_pushback(q, mref_a); + qq_pushback(q, mref_a); output->did_pushback = true; res = 1; goto done; @@ -2224,8 +2293,9 @@ static noinline int trans_logger_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data) { struct trans_logger_mref_aspect *ini = (void*)_ini; + ini->lh.lh_pos = &ini->object->ref_pos; + INIT_LIST_HEAD(&ini->lh.lh_head); INIT_LIST_HEAD(&ini->hash_head); - INIT_LIST_HEAD(&ini->q_head); INIT_LIST_HEAD(&ini->pos_head); INIT_LIST_HEAD(&ini->replay_head); INIT_LIST_HEAD(&ini->collect_head); @@ -2238,8 +2308,8 @@ static noinline void trans_logger_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data) { struct trans_logger_mref_aspect *ini = (void*)_ini; + CHECK_HEAD_EMPTY(&ini->lh.lh_head); CHECK_HEAD_EMPTY(&ini->hash_head); - CHECK_HEAD_EMPTY(&ini->q_head); CHECK_HEAD_EMPTY(&ini->pos_head); CHECK_HEAD_EMPTY(&ini->replay_head); CHECK_HEAD_EMPTY(&ini->collect_head); @@ -2272,10 +2342,10 @@ int trans_logger_output_construct(struct trans_logger_output *output) } atomic_set(&output->hash_count, 0); init_waitqueue_head(&output->event); - q_init(&output->q_phase1, output); - q_init(&output->q_phase2, output); - q_init(&output->q_phase3, output); - q_init(&output->q_phase4, output); + qq_init(&output->q_phase1, output); + qq_init(&output->q_phase2, output); + qq_init(&output->q_phase3, output); + qq_init(&output->q_phase4, output); #if 1 output->q_phase2.q_dep = &output->q_phase3; output->q_phase3.q_dep = &output->q_phase4; diff --git a/mars_trans_logger.h b/mars_trans_logger.h index 7429fe90..1910e5ce 100644 --- a/mars_trans_logger.h +++ b/mars_trans_logger.h @@ -10,37 +10,25 @@ #include "lib_log.h" #include "lib_pairing_heap.h" +#include "lib_queue.h" //////////////////////////////////////////////////////////////////// -_PAIRING_HEAP_TYPEDEF(mref,) +_PAIRING_HEAP_TYPEDEF(logger,) struct logger_queue { - struct logger_queue *q_dep; - atomic_t *q_dep_plus; + QUEUE_ANCHOR(logger,loff_t,logger); struct trans_logger_output *q_output; - struct list_head q_anchor; - struct pairing_heap_mref *heap_high; - struct pairing_heap_mref *heap_low; - loff_t heap_margin; - loff_t last_pos; - long long q_last_insert; // jiffies - spinlock_t q_lock; - atomic_t q_queued; - atomic_t q_flying; - atomic_t q_total; const char *q_insert_info; const char *q_pushback_info; const char *q_fetch_info; - // tunables - int q_batchlen; - int q_max_queued; - int q_max_flying; - int q_max_jiffies; - int q_max_contention; - int q_over_pressure; - int q_io_prio; - bool q_ordering; + +}; + +struct logger_head { + struct list_head lh_head; + loff_t *lh_pos; + struct pairing_heap_logger ph; }; //////////////////////////////////////////////////////////////////// @@ -52,8 +40,10 @@ struct hash_anchor { struct writeback_info { struct trans_logger_output *w_output; + struct logger_head w_lh; loff_t w_pos; int w_len; + int w_error; struct list_head w_collect_list; // list of collected orig requests struct list_head w_sub_read_list; // for saving the old data before overwrite struct list_head w_sub_write_list; // for overwriting @@ -66,12 +56,13 @@ struct writeback_info { struct trans_logger_mref_aspect { GENERIC_ASPECT(mref); struct trans_logger_output *output; + struct logger_head lh; struct list_head hash_head; - struct list_head q_head; + //struct list_head q_head; struct list_head pos_head; struct list_head replay_head; struct list_head collect_head; - struct pairing_heap_mref ph; + struct pairing_heap_logger ph; struct trans_logger_mref_aspect *shadow_ref; void *shadow_data; bool do_dealloc; @@ -81,7 +72,6 @@ struct trans_logger_mref_aspect { bool is_collected; struct timespec stamp; loff_t log_pos; - loff_t fetch_margin; struct generic_callback cb; struct trans_logger_mref_aspect *orig_mref_a; struct writeback_info *wb;