infra: make brick_wait upstream independent

Author: Thomas Schoebel-Theuer, 2019-02-11 19:45:47 +01:00 (committed by Thomas Schoebel-Theuer)
parent c099e4ba25
commit e7fa3fa130
3 changed files with 48 additions and 48 deletions

brick_wait.h

@@ -24,45 +24,34 @@
 #ifndef BRICK_WAIT_H
 #define BRICK_WAIT_H
 
-/* compat to some elder kernels...
+/* Try to abstract from changes of the upstream kernel
+ * by using a hopefully stable interface.
  */
-#ifndef ___wait_cond_timeout
-#define ___wait_cond_timeout(x) (x)
-#define prepare_to_wait_event(a,b,c) (prepare_to_wait(a, b, c), 0)
-#endif
-
-/* Some code stolen from include/linux/wait.h
- */
-#define brick_wait(wq, condition, timeout) \
+#define brick_wait(wq, flag, condition, timeout) \
 ({ \
-        __label__ __out; \
-        wait_queue_t __wait; \
-        long __ret = timeout; /* explicit shadow */ \
+        unsigned long __tmout = (timeout); \
         \
-        might_sleep(); \
-        /* check in advance to avoid spinlocks in fastpath */ \
-        if (condition) \
-                goto __out; \
-        \
-        INIT_LIST_HEAD(&__wait.task_list); \
-        __wait.flags = 0; \
-        \
-        for (;;) { \
-                long __int = prepare_to_wait_event(&wq, &__wait, TASK_INTERRUPTIBLE); \
-                \
-                if (__int) { \
-                        __ret = __int; \
-                        break; \
-                } \
-                \
-                __ret = schedule_timeout(__ret); \
-                \
-                __set_current_state(TASK_RUNNING); \
-                if (___wait_cond_timeout(condition)) \
+        (flag) = false; \
+        smp_wmb(); \
+        while (!(condition)) { \
+                __tmout = wait_event_interruptible_timeout( \
+                        wq, \
+                        ({ smp_rmb(); (flag); }), \
+                        __tmout); \
+                if (__tmout <= 1) \
                         break; \
+                (flag) = false; \
+                smp_wmb(); \
         } \
-        finish_wait(&wq, &__wait); \
-__out:  __ret; \
+        __tmout; \
 })
+
+#define brick_wake(wq, flag) \
+({ \
+        (flag) = true; \
+        smp_wmb(); \
+        wake_up_interruptible_all(wq); \
+})
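
The rewritten macro depends only on wait_event_interruptible_timeout(), whose signature has been stable across kernel releases, instead of open-coding the sleep loop with wait_queue_t, prepare_to_wait_event() and schedule_timeout(), which changed upstream more than once. Each wait queue is now paired with an explicit wake-up flag: waiters clear the flag before re-checking their condition, and brick_wake() sets it together with a write barrier, so a wake-up racing with the condition check still terminates the wait. A minimal usage sketch of the new pair (the demo_* names are illustrative only, not part of this commit):

#include <linux/wait.h>
#include <linux/kthread.h>
#include <linux/atomic.h>
#include "brick_wait.h"

static DECLARE_WAIT_QUEUE_HEAD(demo_event);
static bool demo_flag;                         /* companion wake-up flag */
static atomic_t demo_pending = ATOMIC_INIT(0);

/* Consumer: sleep until work is pending or HZ jiffies have passed. */
static int demo_thread(void *data)
{
        while (!kthread_should_stop()) {
                brick_wait(demo_event,
                           demo_flag,
                           atomic_read(&demo_pending) > 0,
                           HZ);
                while (atomic_read(&demo_pending) > 0) {
                        atomic_dec(&demo_pending);
                        /* ... process one unit of work ... */
                }
        }
        return 0;
}

/* Producer: publish the work first, then set the flag and wake all waiters. */
static void demo_submit(void)
{
        atomic_inc(&demo_pending);
        brick_wake(&demo_event, demo_flag);
}

Note the asymmetry, inherited from the upstream primitives: brick_wait() takes the wait queue itself (like wait_event_interruptible_timeout()), while brick_wake() takes a pointer (like wake_up_interruptible_all()).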

mars_trans_logger.c

@@ -737,6 +737,7 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
 #ifdef DELAY_CALLERS
         // delay in case of too many master shadows / memory shortage
         brick_wait(brick->caller_event,
+                   brick->caller_flag,
                    !brick->delay_callers &&
                    (brick_global_memlimit < 1024 || atomic64_read(&global_mshadow_used) / 1024 < brick_global_memlimit),
                    HZ / 2);
@@ -979,7 +980,7 @@ void _trans_logger_endio(struct generic_callback *cb)
         atomic_dec(&brick->any_fly_count);
         atomic_inc(&brick->total_cb_count);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 err:
@@ -1021,7 +1022,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
         atomic_inc(&brick->inner_balance_count);
         qq_mref_insert(&brick->q_phase[0], mref_a);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 }
@@ -1181,7 +1182,7 @@ void wb_endio(struct generic_callback *cb)
         }
 done:
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 err:
@@ -1576,7 +1577,7 @@ void phase0_endio(void *private, int error)
         qq_deactivate(&brick->q_phase[0]);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 err:
         MARS_ERR("giving up...\n");
@@ -1784,7 +1785,7 @@ void phase1_endio(struct generic_callback *cb)
         // queue up for the next phase
         qq_wb_insert(&brick->q_phase[2], wb);
         qq_deactivate(&brick->q_phase[1]);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 err:
@@ -1852,7 +1853,7 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
 #endif
                 qq_wb_insert(&brick->q_phase[3], wb);
                 qq_deactivate(&brick->q_phase[1]);
-                wake_up_interruptible_all(&brick->worker_event);
+                brick_wake(&brick->worker_event, brick->worker_flag);
         }
 done:
@@ -1878,7 +1879,7 @@ void _phase2_endio(struct writeback_info *wb)
         // queue up for the next phase
         qq_wb_insert(&brick->q_phase[3], wb);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 }
@@ -1992,7 +1993,7 @@ bool phase2_startio(struct writeback_info *wb)
                                 ok = false;
                         }
                 }
-                wake_up_interruptible_all(&brick->worker_event);
+                brick_wake(&brick->worker_event, brick->worker_flag);
         } else {
                 _phase2_endio(wb);
         }
@@ -2035,7 +2036,7 @@ void phase3_endio(struct generic_callback *cb)
         qq_deactivate(&brick->q_phase[3]);
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
@@ -2110,7 +2111,7 @@ int run_mref_queue(struct logger_queue *q, bool (*startio)(struct trans_logger_m
 done:
         if (found) {
                 mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1);
-                wake_up_interruptible_all(&brick->worker_event);
+                brick_wake(&brick->worker_event, brick->worker_flag);
         }
         return res;
 }
@@ -2144,7 +2145,7 @@ int run_wb_queue(struct logger_queue *q, bool (*startio)(struct writeback_info *
 done:
         if (found) {
                 mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1);
-                wake_up_interruptible_all(&brick->worker_event);
+                brick_wake(&brick->worker_event, brick->worker_flag);
         }
         return res;
 }
@@ -2301,7 +2302,7 @@ int _do_ranking(struct trans_logger_brick *brick)
                 }
         } else if (brick->delay_callers) {
                 brick->delay_callers = false;
-                wake_up_interruptible_all(&brick->caller_event);
+                brick_wake(&brick->caller_event, brick->caller_flag);
         }
         // global limit for flying mrefs
@@ -2608,6 +2609,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
                 brick_wait(
                         brick->worker_event,
+                        brick->worker_flag,
                         ({
                                 winner = _do_ranking(brick);
                                 MARS_IO("winner = %d\n", winner);
@@ -2733,7 +2735,7 @@ void replay_endio(struct generic_callback *cb)
                 MARS_ERR("callback with empty replay_head (replay_count=%d)\n", atomic_read(&brick->replay_count));
         }
-        wake_up_interruptible_all(&brick->worker_event);
+        brick_wake(&brick->worker_event, brick->worker_flag);
         return;
 err:
         MARS_FAT("cannot handle replay IO\n");
@@ -2773,6 +2775,7 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
         bool was_empty;
         brick_wait(brick->worker_event,
+                   brick->worker_flag,
                    atomic_read(&brick->replay_count) < max
                    && (_has_conflict(brick, mref_a) ? conflicts++ : (ok = true), ok),
                    60 * HZ);
@@ -3001,7 +3004,10 @@ void trans_logger_replay(struct trans_logger_brick *brick)
              ((long long)jiffies) - old_jiffies >= HZ * 3) &&
             finished_pos >= 0) {
                 // for safety, wait until the IO queue has drained.
-                wait_event_interruptible_timeout(brick->worker_event, atomic_read(&brick->replay_count) <= 0, 30 * HZ);
+                brick_wait(brick->worker_event,
+                           brick->worker_flag,
+                           atomic_read(&brick->replay_count) <= 0,
+                           30 * HZ);
                 if (unlikely(brick->disk_io_error)) {
@@ -3023,7 +3029,10 @@ void trans_logger_replay(struct trans_logger_brick *brick)
         MARS_INF("waiting for finish...\n");
-        wait_event_interruptible_timeout(brick->worker_event, atomic_read(&brick->replay_count) <= 0, 60 * HZ);
+        brick_wait(brick->worker_event,
+                   brick->worker_flag,
+                   atomic_read(&brick->replay_count) <= 0,
+                   60 * HZ);
         if (unlikely(finished_pos > brick->replay_end_pos)) {
                 MARS_ERR("finished_pos too large: %lld + %d = %lld > %lld\n", input->logst.log_pos, input->logst.offset, finished_pos, brick->replay_end_pos);

mars_trans_logger.h

@@ -231,6 +231,8 @@ struct trans_logger_brick {
         struct logger_queue q_phase[LOGGER_QUEUES];
         struct rank_data rkd[LOGGER_QUEUES];
         bool delay_callers;
+        bool caller_flag;
+        bool worker_flag;
 };
 
 struct trans_logger_output {
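
Each brick_wake()/brick_wait() pair needs one of these companion flags per wait queue. The ordering contract implemented by the macros: the waker sets the flag and issues smp_wmb() before the wake-up, while the waiter polls the flag behind smp_rmb() and clears it again before every re-check of its condition. Roughly expanded for the worker queue (the demo_* helper names are illustrative, not from this commit, with atomic_read(&brick->replay_count) <= 0 standing in for the various wait conditions used above):

/* Waker side, as in brick_wake(&brick->worker_event, brick->worker_flag) */
static void demo_wake_workers(struct trans_logger_brick *brick)
{
        brick->worker_flag = true;
        smp_wmb();              /* publish the flag before waking */
        wake_up_interruptible_all(&brick->worker_event);
}

/* Waiter side, as in brick_wait(brick->worker_event, brick->worker_flag, ...) */
static unsigned long demo_wait_workers(struct trans_logger_brick *brick,
                                       unsigned long timeout)
{
        unsigned long tmout = timeout;

        brick->worker_flag = false;
        smp_wmb();              /* clear the flag before re-checking */
        while (!(atomic_read(&brick->replay_count) <= 0)) {
                tmout = wait_event_interruptible_timeout(
                        brick->worker_event,
                        ({ smp_rmb(); brick->worker_flag; }),
                        tmout);
                if (tmout <= 1)
                        break;  /* timeout (almost) expired: give up */
                brick->worker_flag = false;
                smp_wmb();
        }
        return tmout;
}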