import mars-101.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-04-20 15:26:44 +01:00
parent 3245a9ea55
commit a67c253c10
3 changed files with 119 additions and 83 deletions

View File

@ -4,15 +4,17 @@
#define LIB_QUEUE_H
#define QUEUE_ANCHOR(PREFIX,KEYTYPE,HEAPTYPE) \
struct PREFIX##_queue *q_dep; \
atomic_t *q_dep_plus; \
spinlock_t q_lock; \
struct list_head q_anchor; \
struct pairing_heap_##HEAPTYPE *heap_high; \
struct pairing_heap_##HEAPTYPE *heap_low; \
long long q_last_insert; /* jiffies */ \
KEYTYPE heap_margin; \
KEYTYPE last_pos; \
spinlock_t q_lock; \
/* parameters */ \
atomic_t *q_contention; \
struct PREFIX##_queue *q_dep; \
bool q_barrier; \
/* readonly from outside */ \
atomic_t q_queued; \
atomic_t q_flying; \
@ -122,6 +124,83 @@ ELEM_TYPE *q_##PREFIX##_fetch(struct PREFIX##_queue *q) \
traced_unlock(&q->q_lock, flags); \
\
return elem; \
}
} \
\
static inline \
/* Decide whether this queue should start IO now.					\
 * BUG FIX: the parameter must be the generic PREFIX##_queue, not the		\
 * hardcoded logger_queue — otherwise `dep = q->q_dep` below mismatches		\
 * the declared `struct PREFIX##_queue *dep` for any PREFIX != logger.		\
 */										\
bool q_##PREFIX##_is_ready(struct PREFIX##_queue *q)			\
{									\
	struct PREFIX##_queue *dep;					\
	int queued = atomic_read(&q->q_queued);				\
	int contention;							\
	int max_contention;						\
	int over;							\
	int flying;							\
	bool res = false;						\
									\
	/* 1) when empty, there is nothing to do.			\
	 */								\
	if (queued <= 0)						\
		goto always_done;					\
									\
	/* compute some characteristic measures:			\
	 * total contention = external counter (if any) plus the	\
	 * queued + in-flight requests of all dependent queues.		\
	 */								\
	contention = 0;							\
	if (q->q_contention) {						\
		contention = atomic_read(q->q_contention);		\
	}								\
	dep = q->q_dep;							\
	while (dep) {							\
		contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying); \
		dep = dep->q_dep;					\
	}								\
	max_contention = q->q_max_contention;				\
	over = queued - q->q_max_queued;				\
	/* over-pressure relaxes the contention limit proportionally	\
	 * to how far the queue exceeds its nominal maximum length.	\
	 */								\
	if (over > 0 && q->q_over_pressure > 0) {			\
		max_contention += over / q->q_over_pressure;		\
	}								\
									\
	/* 2) check whether queue is halted (barrier semantics:		\
	 * wait until all contending IO has drained).			\
	 */								\
	if (q->q_barrier && contention > 0)				\
		goto always_done;					\
									\
	/* 3) when other queues are too much contended,			\
	 * refrain from contending the IO system even more.		\
	 */								\
	if (contention > max_contention) {				\
		goto always_done;					\
	}								\
									\
	/* 4) when the maximum queue length is reached, start IO.	\
	 */								\
	res = true;							\
	if (over > 0)							\
		goto limit;						\
									\
	/* 5) also start IO when queued requests are too old		\
	 * (measured in realtime via jiffies)				\
	 */								\
	if (q->q_max_jiffies > 0 &&					\
	   (long long)jiffies - q->q_last_insert >= q->q_max_jiffies)	\
		goto limit;						\
									\
	/* 6) when no contention, start draining the queue.		\
	 */								\
	if (contention <= 0)						\
		goto limit;						\
									\
	res = false;							\
	goto always_done;						\
									\
limit:									\
	/* Limit the number of flying requests (parallelism)		\
	 */								\
	flying = atomic_read(&q->q_flying);				\
	if (q->q_max_flying > 0 && flying >= q->q_max_flying)		\
		res = false;						\
									\
always_done:								\
	return res;							\
}									\
#endif

View File

@ -63,8 +63,8 @@ struct light_class {
#define CONF_TRANS_BATCHLEN 32
#define CONF_TRANS_FLYING 4
#define CONF_TRANS_PRIO MARS_PRIO_HIGH
#define CONF_TRANS_LOG_READS false
//#define CONF_TRANS_LOG_READS true
//#define CONF_TRANS_LOG_READS false
#define CONF_TRANS_LOG_READS true
//#define CONF_ALL_BATCHLEN 2
#define CONF_ALL_BATCHLEN 1

View File

@ -12,6 +12,7 @@
//#define USE_KMALLOC
#define HIGHER_ORDER
//#define WB_COPY
#define LATER
// changing this is dangerous for data integrity! use only for testing!
#define USE_MEMCPY
@ -69,80 +70,13 @@ void qq_init(struct logger_queue *q, struct trans_logger_output *output)
{
q_logger_init(q);
q->q_output = output;
q->q_contention = &output->fly_count;
}
static noinline
bool qq_is_ready(struct logger_queue *q)
{
struct logger_queue *dep;
int queued = atomic_read(&q->q_queued);
int contention;
int max_contention;
int over;
int flying;
bool res = false;
/* 1) when empty, there is nothing to do.
*/
if (queued <= 0)
goto always_done;
/* compute some characteristic measures
*/
contention = atomic_read(&q->q_output->fly_count);
if (q->q_dep_plus) {
contention += atomic_read(q->q_dep_plus);
}
dep = q->q_dep;
while (dep) {
contention += atomic_read(&dep->q_queued) + atomic_read(&dep->q_flying);
dep = dep->q_dep;
}
max_contention = q->q_max_contention;
over = queued - q->q_max_queued;
if (over > 0 && q->q_over_pressure > 0) {
max_contention += over / q->q_over_pressure;
}
#if 1
/* 2) when other queues are too much contended,
* refrain from contending the IO system even more.
*/
if (contention > max_contention) {
goto always_done;
}
#endif
/* 3) when the maximum queue length is reached, start IO.
*/
res = true;
if (over > 0)
goto limit;
/* 4) also start IO when queued requests are too old
* (measured in realtime)
*/
if (q->q_max_jiffies > 0 &&
(long long)jiffies - q->q_last_insert >= q->q_max_jiffies)
goto limit;
/* 5) when no contention, start draining the queue.
*/
if (contention <= 0)
goto limit;
res = false;
goto always_done;
limit:
/* Limit the number of flying requests (parallelism)
*/
flying = atomic_read(&q->q_flying);
if (q->q_max_flying > 0 && flying >= q->q_max_flying)
res = false;
always_done:
return res;
return q_logger_is_ready(q);
}
static inline
@ -1443,7 +1377,12 @@ bool phase2_startio(struct trans_logger_mref_aspect *orig_mref_a)
atomic_inc(&output->q_phase2.q_flying);
fire_writeback(wb, &wb->w_sub_read_list, false);
} else { // shortcut
#ifdef LATER
qq_wb_insert(&output->q_phase4, wb);
wake_up_interruptible(&output->event);
#else
return phase4_startio(wb);
#endif
}
done:
@ -1503,7 +1442,7 @@ err:
static noinline
bool _phase3_startio(struct trans_logger_mref_aspect *sub_mref_a)
{
struct mref_object *sub_mref;
struct mref_object *sub_mref = NULL;
struct trans_logger_output *output;
struct trans_logger_brick *brick;
void *data;
@ -1537,9 +1476,12 @@ bool _phase3_startio(struct trans_logger_mref_aspect *sub_mref_a)
if (unlikely(!ok)) {
goto err;
}
atomic_inc(&output->q_phase3.q_flying);
return true;
err:
MARS_FAT("cannot log old data, pos = %lld len = %d\n", sub_mref ? sub_mref->ref_pos : 0, sub_mref ? sub_mref->ref_len : 0);
return false;
}
@ -1564,22 +1506,18 @@ bool phase3_startio(struct writeback_info *wb)
struct list_head *tmp;
start = &wb->w_sub_read_list;
while ((tmp = start->next) != start) {
for (tmp = start->next; tmp != start; tmp = tmp->next) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
list_del_init(tmp);
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
atomic_inc(&output->q_phase3.q_flying);
mars_trace(sub_mref, "sub_log");
if (!_phase3_startio(sub_mref_a)) {
ok = false;
}
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
wake_up_interruptible(&output->event);
} else {
@ -1632,6 +1570,26 @@ err:
static noinline
bool phase4_startio(struct writeback_info *wb)
{
struct trans_logger_input *sub_input = wb->w_output->brick->inputs[0];
struct list_head *start = &wb->w_sub_read_list;
struct list_head *tmp;
/* Cleanup read requests (if they exist from previous phases)
*/
while ((tmp = start->next) != start) {
struct trans_logger_mref_aspect *sub_mref_a;
struct mref_object *sub_mref;
list_del_init(tmp);
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
sub_mref = sub_mref_a->object;
GENERIC_INPUT_CALL(sub_input, mref_put, sub_mref);
}
/* Start writeback IO
*/
atomic_inc(&wb->w_output->q_phase4.q_flying);
fire_writeback(wb, &wb->w_sub_write_list, true);
return true;
@ -2215,8 +2173,7 @@ int trans_logger_output_construct(struct trans_logger_output *output)
qq_init(&output->q_phase3, output);
qq_init(&output->q_phase4, output);
#if 1
output->q_phase2.q_dep = &output->q_phase3;
output->q_phase3.q_dep = &output->q_phase4;
output->q_phase2.q_dep = &output->q_phase4;
output->q_phase4.q_dep = &output->q_phase1;
#endif