diff --git a/kernel/lib_queue.h b/kernel/lib_queue.h index 750f4ca8..56b86255 100644 --- a/kernel/lib_queue.h +++ b/kernel/lib_queue.h @@ -28,6 +28,7 @@ #define QUEUE_ANCHOR(PREFIX,KEYTYPE,HEAPTYPE) \ /* parameters */ \ /* readonly from outside */ \ + int q_active; \ int q_queued; \ atomic_t q_flying; \ /* tunables */ \ @@ -61,11 +62,23 @@ void q_##PREFIX##_init(struct PREFIX##_queue *q) \ q->heap_low = NULL; \ q->heap_high = NULL; \ spin_lock_init(&q->q_lock); \ + q->q_active = 0; \ q->q_queued = 0; \ atomic_set(&q->q_flying, 0); \ } \ \ static inline \ +void __q_##PREFIX##_insert_ordered(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ +{ \ + struct pairing_heap_##HEAPTYPE **use = &q->heap_high; \ + \ + if (KEYCMP(KEYFN(elem), &q->heap_margin) <= 0) { \ + use = &q->heap_low; \ + } \ + ph_insert_##HEAPTYPE(use, &elem->ph); \ +} \ + \ +static inline \ void q_##PREFIX##_insert(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ { \ unsigned long flags; \ @@ -73,14 +86,11 @@ void q_##PREFIX##_insert(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ traced_lock(&q->q_lock, flags); \ \ if (q->q_ordering) { \ - struct pairing_heap_##HEAPTYPE **use = &q->heap_high; \ - if (KEYCMP(KEYFN(elem), &q->heap_margin) <= 0) { \ - use = &q->heap_low; \ - } \ - ph_insert_##HEAPTYPE(use, &elem->ph); \ + __q_##PREFIX##_insert_ordered(q, elem); \ } else { \ list_add_tail(&elem->HEAD, &q->q_anchor); \ } \ + q->q_active++; \ q->q_queued++; \ q->q_last_insert = jiffies; \ \ @@ -94,17 +104,18 @@ void q_##PREFIX##_pushback(struct PREFIX##_queue *q, ELEM_TYPE *elem) \ { \ unsigned long flags; \ \ - if (q->q_ordering) { \ - q_##PREFIX##_insert(q, elem); \ - return; \ - } \ - \ traced_lock(&q->q_lock, flags); \ \ - list_add(&elem->HEAD, &q->q_anchor); \ + if (q->q_ordering) { \ + __q_##PREFIX##_insert_ordered(q, elem); \ + } else { \ + list_add(&elem->HEAD, &q->q_anchor); \ + } \ q->q_queued++; \ \ traced_unlock(&q->q_lock, flags); \ + \ + q_##PREFIX##_trigger(q); \ } \ \ static inline \ @@ -164,6 +175,16 @@ void q_##PREFIX##_dec_flying(struct PREFIX##_queue *q) \ q_##PREFIX##_trigger(q); \ } \ \ +static inline \ +void q_##PREFIX##_activate(struct PREFIX##_queue *q, int count) \ +{ \ + unsigned long flags; \ + \ + traced_lock(&q->q_lock, flags); \ + q->q_active += count; \ + traced_unlock(&q->q_lock, flags); \ + q_##PREFIX##_trigger(q); \ +} \ #endif diff --git a/kernel/mars_trans_logger.c b/kernel/mars_trans_logger.c index 3939e65b..d675b50a 100644 --- a/kernel/mars_trans_logger.c +++ b/kernel/mars_trans_logger.c @@ -218,6 +218,18 @@ void qq_dec_flying(struct logger_queue *q) q_logger_dec_flying(q); } +static inline +void qq_activate(struct logger_queue *q) +{ + q_logger_activate(q, 1); +} + +static inline +void qq_deactivate(struct logger_queue *q) +{ + q_logger_activate(q, -1); +} + static inline void qq_mref_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mref_a) { @@ -1583,6 +1595,8 @@ void phase0_endio(void *private, int error) banning_reset(&brick->q_phase[0].q_banning); + qq_deactivate(&brick->q_phase[0]); + wake_up_interruptible_all(&brick->worker_event); return; err: @@ -1708,6 +1722,7 @@ bool prep_phase_startio(struct trans_logger_mref_aspect *mref_a) __trans_logger_ref_put(brick, mref_a); + qq_deactivate(&brick->q_phase[0]); return true; } // else WRITE @@ -1793,6 +1808,7 @@ void phase1_endio(struct generic_callback *cb) // queue up for the next phase qq_wb_insert(&brick->q_phase[2], wb); + qq_deactivate(&brick->q_phase[1]); wake_up_interruptible_all(&brick->worker_event); return; @@ -1820,10 +1836,12 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a) if (orig_mref_a->is_collected) { MARS_IO("already collected, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len); + qq_deactivate(&brick->q_phase[1]); goto done; } if (!orig_mref_a->is_hashed) { MARS_IO("AHA not hashed, pos = %lld len = %d\n", orig_mref->ref_pos, orig_mref->ref_len); + qq_deactivate(&brick->q_phase[1]); goto done; } @@ -1845,12 +1863,22 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a) qq_inc_flying(&brick->q_phase[1]); fire_writeback(&wb->w_sub_read_list, false); } else { // shortcut -#ifndef SHORTCUT_1_to_3 - qq_wb_insert(&brick->q_phase[3], wb); - wake_up_interruptible_all(&brick->worker_event); -#else - return phase3_startio(wb); +#ifdef SHORTCUT_1_to_3 + bool res; + + /* speculate that next phase can be immediately started */ + qq_activate(&brick->q_phase[3]); + res = phase3_startio(wb); + if (likely(res)) { + qq_deactivate(&brick->q_phase[1]); + goto done; + } + /* speculation was wrong: no shortcutting */ + qq_deactivate(&brick->q_phase[3]); #endif + qq_wb_insert(&brick->q_phase[3], wb); + qq_deactivate(&brick->q_phase[1]); + wake_up_interruptible_all(&brick->worker_event); } done: @@ -1906,6 +1934,7 @@ void phase2_endio(void *private, int error) banning_reset(&brick->q_phase[2].q_banning); _phase2_endio(wb); } + qq_deactivate(&brick->q_phase[2]); return; err: @@ -2035,6 +2064,8 @@ void phase3_endio(struct generic_callback *cb) banning_reset(&brick->q_phase[3].q_banning); + qq_deactivate(&brick->q_phase[3]); + wake_up_interruptible_all(&brick->worker_event); return;