infra: add suffix _flagged to historic bick waiting

This commit is contained in:
Thomas Schoebel-Theuer 2022-08-23 15:48:08 +02:00
parent 677a8b4e1e
commit 37078c9800
3 changed files with 43 additions and 27 deletions

View File

@ -24,10 +24,10 @@
#ifndef BRICK_WAIT_H
#define BRICK_WAIT_H
/* Try to abstract from changes of the upstream kernel
* by using a hopefully stable interface.
/* Historic adaptor.
* To disappear somewhen.
*/
#define brick_wait(wq, flag, condition, timeout) \
#define brick_wait_flagged(wq, flag, condition, timeout) \
({ \
long __tmout = (timeout); \
int __old_flag; \
@ -52,7 +52,7 @@
__tmout; \
})
#define brick_wake(wq, flag) \
#define brick_wake_flagged(wq, flag) \
({ \
smp_rmb(); \
(flag)++; \

View File

@ -140,7 +140,8 @@ void log_write_endio(struct generic_callback *cb)
atomic_dec(&logst->mref_flying);
atomic_dec(&global_mref_flying);
if (logst->signal_event && logst->signal_flag)
brick_wake(logst->signal_event, *(logst->signal_flag));
brick_wake_flagged(logst->signal_event,
*(logst->signal_flag));
return;

View File

@ -756,7 +756,7 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
#ifdef DELAY_CALLERS
// delay in case of too many master shadows / memory shortage
brick_wait(brick->caller_event,
brick_wait_flagged(brick->caller_event,
brick->caller_flag,
brick_global_memlimit < 1024 ||
atomic64_read(&global_mshadow_used) / 1024 < brick_global_memlimit,
@ -1018,7 +1018,8 @@ void _trans_logger_endio(struct generic_callback *cb)
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_cb_count);
#endif
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
err:
@ -1064,7 +1065,8 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
#endif
qq_mref_insert(&brick->q_phase[0], mref_a);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
}
@ -1232,7 +1234,8 @@ void wb_endio(struct generic_callback *cb)
}
done:
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
err:
@ -1632,7 +1635,8 @@ void phase0_endio(void *private, int error)
qq_deactivate(&brick->q_phase[0]);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
err:
MARS_ERR("giving up...\n");
@ -1846,7 +1850,8 @@ void phase1_endio(struct generic_callback *cb)
// queue up for the next phase
qq_wb_insert(&brick->q_phase[2], wb);
qq_deactivate(&brick->q_phase[1]);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
err:
@ -1919,7 +1924,8 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
#endif
qq_wb_insert(&brick->q_phase[3], wb);
qq_deactivate(&brick->q_phase[1]);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
}
done:
@ -1945,7 +1951,8 @@ void _phase2_endio(struct writeback_info *wb)
// queue up for the next phase
qq_wb_insert(&brick->q_phase[3], wb);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
}
@ -2058,7 +2065,8 @@ bool phase2_startio(struct writeback_info *wb)
ok = false;
}
}
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
} else {
_phase2_endio(wb);
}
@ -2103,7 +2111,8 @@ void phase3_endio(struct generic_callback *cb)
qq_deactivate(&brick->q_phase[3]);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
@ -2178,7 +2187,8 @@ int run_mref_queue(struct logger_queue *q, bool (*startio)(struct trans_logger_m
done:
if (found) {
mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
}
return res;
}
@ -2212,7 +2222,8 @@ int run_wb_queue(struct logger_queue *q, bool (*startio)(struct writeback_info *
done:
if (found) {
mars_limit(&global_writeback.limiter, (total_len - 1) / 1024 + 1);
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
}
return res;
}
@ -2373,7 +2384,8 @@ int _do_ranking(struct trans_logger_brick *brick)
}
} else if (brick->delay_callers) {
brick->delay_callers = false;
brick_wake(&brick->caller_event, brick->caller_flag);
brick_wake_flagged(&brick->caller_event,
brick->caller_flag);
}
// global limit for flying mrefs
@ -2714,7 +2726,7 @@ void trans_logger_log(struct trans_logger_brick *brick)
int winner;
int nr;
brick_wait(
brick_wait_flagged(
brick->worker_event,
brick->worker_flag,
({
@ -2847,7 +2859,8 @@ void replay_endio(struct generic_callback *cb)
MARS_ERR("callback with empty replay_head (replay_count=%d)\n", atomic_read(&brick->replay_count));
}
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
return;
err:
MARS_FAT("cannot handle replay IO\n");
@ -2886,8 +2899,8 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
bool ok = false;
bool was_empty;
brick_wait(brick->worker_event,
brick->worker_flag,
brick_wait_flagged(brick->worker_event,
brick->worker_flag,
atomic_read(&brick->replay_count) < max
&& (_has_conflict(brick, mref_a) ? conflicts++ : (ok = true), ok),
60 * HZ);
@ -3167,7 +3180,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
((long long)jiffies) - old_jiffies >= HZ * 3) &&
finished_pos >= 0) {
// for safety, wait until the IO queue has drained.
brick_wait(brick->worker_event,
brick_wait_flagged(brick->worker_event,
brick->worker_flag,
atomic_read(&brick->replay_count) <= 0,
30 * HZ);
@ -3197,7 +3210,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
MARS_INF("waiting for finish...\n");
brick_wait(brick->worker_event,
brick_wait_flagged(brick->worker_event,
brick->worker_flag,
atomic_read(&brick->replay_count) <= 0,
60 * HZ);
@ -3260,7 +3273,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
mars_trigger();
while (!brick_thread_should_stop()) {
brick_wait(brick->worker_event,
brick_wait_flagged(brick->worker_event,
brick->worker_flag,
brick_thread_should_stop(),
HZ / 10);
@ -3311,8 +3324,10 @@ int trans_logger_switch(struct trans_logger_brick *brick)
mars_power_led_on((void*)brick, false);
if (brick->thread) {
brick->terminate = true;
brick_wake(&brick->worker_event, brick->worker_flag);
brick_wake(&brick->caller_event, brick->caller_flag);
brick_wake_flagged(&brick->worker_event,
brick->worker_flag);
brick_wake_flagged(&brick->caller_event,
brick->caller_flag);
if (brick->terminated) {
MARS_INF("stopping thread...\n");
brick_thread_stop(brick->thread);