From 37078c9800938e27d5f6cd142073e00e08e9c48e Mon Sep 17 00:00:00 2001 From: Thomas Schoebel-Theuer Date: Tue, 23 Aug 2022 15:48:08 +0200 Subject: [PATCH] infra: add suffix _flagged to historic bick waiting --- kernel/brick_wait.h | 8 +++--- kernel/lib_log.c | 3 +- kernel/mars_trans_logger.c | 59 ++++++++++++++++++++++++-------------- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/kernel/brick_wait.h b/kernel/brick_wait.h index 9fbfa425..d1a3086b 100644 --- a/kernel/brick_wait.h +++ b/kernel/brick_wait.h @@ -24,10 +24,10 @@ #ifndef BRICK_WAIT_H #define BRICK_WAIT_H -/* Try to abstract from changes of the upstream kernel - * by using a hopefully stable interface. +/* Historic adaptor. + * To disappear somewhen. */ -#define brick_wait(wq, flag, condition, timeout) \ +#define brick_wait_flagged(wq, flag, condition, timeout) \ ({ \ long __tmout = (timeout); \ int __old_flag; \ @@ -52,7 +52,7 @@ __tmout; \ }) -#define brick_wake(wq, flag) \ +#define brick_wake_flagged(wq, flag) \ ({ \ smp_rmb(); \ (flag)++; \ diff --git a/kernel/lib_log.c b/kernel/lib_log.c index a375e783..78c6336a 100644 --- a/kernel/lib_log.c +++ b/kernel/lib_log.c @@ -140,7 +140,8 @@ void log_write_endio(struct generic_callback *cb) atomic_dec(&logst->mref_flying); atomic_dec(&global_mref_flying); if (logst->signal_event && logst->signal_flag) - brick_wake(logst->signal_event, *(logst->signal_flag)); + brick_wake_flagged(logst->signal_event, + *(logst->signal_flag)); return; diff --git a/kernel/mars_trans_logger.c b/kernel/mars_trans_logger.c index b887152a..b3b91b6d 100644 --- a/kernel/mars_trans_logger.c +++ b/kernel/mars_trans_logger.c @@ -756,7 +756,7 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_ #ifdef DELAY_CALLERS // delay in case of too many master shadows / memory shortage - brick_wait(brick->caller_event, + brick_wait_flagged(brick->caller_event, brick->caller_flag, brick_global_memlimit < 1024 || atomic64_read(&global_mshadow_used) / 1024 < brick_global_memlimit, @@ -1018,7 +1018,8 @@ void _trans_logger_endio(struct generic_callback *cb) #ifdef ADDITIONAL_COUNTERS atomic_inc(&brick->total_cb_count); #endif - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; err: @@ -1064,7 +1065,8 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object #endif qq_mref_insert(&brick->q_phase[0], mref_a); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; } @@ -1232,7 +1234,8 @@ void wb_endio(struct generic_callback *cb) } done: - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; err: @@ -1632,7 +1635,8 @@ void phase0_endio(void *private, int error) qq_deactivate(&brick->q_phase[0]); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; err: MARS_ERR("giving up...\n"); @@ -1846,7 +1850,8 @@ void phase1_endio(struct generic_callback *cb) // queue up for the next phase qq_wb_insert(&brick->q_phase[2], wb); qq_deactivate(&brick->q_phase[1]); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; err: @@ -1919,7 +1924,8 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a) #endif qq_wb_insert(&brick->q_phase[3], wb); qq_deactivate(&brick->q_phase[1]); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); } done: @@ -1945,7 +1951,8 @@ void _phase2_endio(struct writeback_info *wb) // queue up for the next phase qq_wb_insert(&brick->q_phase[3], wb); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; } @@ -2058,7 +2065,8 @@ bool phase2_startio(struct writeback_info *wb) ok = false; } } - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); } else { _phase2_endio(wb); } @@ -2103,7 +2111,8 @@ void phase3_endio(struct generic_callback *cb) qq_deactivate(&brick->q_phase[3]); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; @@ -2178,7 +2187,8 @@ int run_mref_queue(struct logger_queue *q, bool (*startio)(struct trans_logger_m done: if (found) { mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); } return res; } @@ -2212,7 +2222,8 @@ int run_wb_queue(struct logger_queue *q, bool (*startio)(struct writeback_info * done: if (found) { mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1); - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); } return res; } @@ -2373,7 +2384,8 @@ int _do_ranking(struct trans_logger_brick *brick) } } else if (brick->delay_callers) { brick->delay_callers = false; - brick_wake(&brick->caller_event, brick->caller_flag); + brick_wake_flagged(&brick->caller_event, + brick->caller_flag); } // global limit for flying mrefs @@ -2714,7 +2726,7 @@ void trans_logger_log(struct trans_logger_brick *brick) int winner; int nr; - brick_wait( + brick_wait_flagged( brick->worker_event, brick->worker_flag, ({ @@ -2847,7 +2859,8 @@ void replay_endio(struct generic_callback *cb) MARS_ERR("callback with empty replay_head (replay_count=%d)\n", atomic_read(&brick->replay_count)); } - brick_wake(&brick->worker_event, brick->worker_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); return; err: MARS_FAT("cannot handle replay IO\n"); @@ -2886,8 +2899,8 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe bool ok = false; bool was_empty; - brick_wait(brick->worker_event, - brick->worker_flag, + brick_wait_flagged(brick->worker_event, + brick->worker_flag, atomic_read(&brick->replay_count) < max && (_has_conflict(brick, mref_a) ? conflicts++ : (ok = true), ok), 60 * HZ); @@ -3167,7 +3180,7 @@ void trans_logger_replay(struct trans_logger_brick *brick) ((long long)jiffies) - old_jiffies >= HZ * 3) && finished_pos >= 0) { // for safety, wait until the IO queue has drained. - brick_wait(brick->worker_event, + brick_wait_flagged(brick->worker_event, brick->worker_flag, atomic_read(&brick->replay_count) <= 0, 30 * HZ); @@ -3197,7 +3210,7 @@ void trans_logger_replay(struct trans_logger_brick *brick) MARS_INF("waiting for finish...\n"); - brick_wait(brick->worker_event, + brick_wait_flagged(brick->worker_event, brick->worker_flag, atomic_read(&brick->replay_count) <= 0, 60 * HZ); @@ -3260,7 +3273,7 @@ void trans_logger_replay(struct trans_logger_brick *brick) mars_trigger(); while (!brick_thread_should_stop()) { - brick_wait(brick->worker_event, + brick_wait_flagged(brick->worker_event, brick->worker_flag, brick_thread_should_stop(), HZ / 10); @@ -3311,8 +3324,10 @@ int trans_logger_switch(struct trans_logger_brick *brick) mars_power_led_on((void*)brick, false); if (brick->thread) { brick->terminate = true; - brick_wake(&brick->worker_event, brick->worker_flag); - brick_wake(&brick->caller_event, brick->caller_flag); + brick_wake_flagged(&brick->worker_event, + brick->worker_flag); + brick_wake_flagged(&brick->caller_event, + brick->caller_flag); if (brick->terminated) { MARS_INF("stopping thread...\n"); brick_thread_stop(brick->thread);