Merge pull request #41882 from tchaikov/wip-crimson-int-safty

crimson/osd: guard non-pg-op handling with with_sequencer()

Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
Kefu Chai 2021-06-16 22:09:28 +08:00 committed by GitHub
commit 08661e6f57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 34 deletions

View File

@ -87,42 +87,42 @@ seastar::future<> ClientRequest::start()
return interruptor::with_interruption([this, pgref]() mutable {
epoch_t same_interval_since = pgref->get_interval_start_epoch();
logger().debug("{} same_interval_since: {}", *this, same_interval_since);
may_set_prev_op();
return sequencer.start_op(
*this,
handle,
interruptor::wrap_function(
[this, pgref]() mutable -> interruptible_future<> {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return osd.send_incremental_map(conn, m->get_map_epoch());
}
const bool has_pg_op = is_pg_op();
auto interruptible_do_op = interruptor::wrap_function([=] {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return interruptible_future<>(
osd.send_incremental_map(conn, m->get_map_epoch()));
}
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(pg).await_map)
).then_interruptible([this, &pg] {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(pg).await_map)
).then_interruptible([this, &pg] {
return with_blocking_future_interruptible<IOInterruptCondition>(
pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
}).then_interruptible([this, &pg](auto map) {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(pg).wait_for_active));
}).then_interruptible([this, &pg]() {
return with_blocking_future_interruptible<IOInterruptCondition>(
pg.wait_for_active_blocker.wait());
}).then_interruptible([this, pgref=std::move(pgref)]() mutable {
if (m->finish_decode()) {
m->clear_payload();
}
if (is_pg_op()) {
return process_pg_op(pgref);
} else {
return process_op(pgref);
}
});
})
).then_interruptible([this, pgref]() {
sequencer.finish_op(*this);
return seastar::stop_iteration::yes;
pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
}).then_interruptible([this, &pg](auto map) {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(pg).wait_for_active));
}).then_interruptible([this, &pg]() {
return with_blocking_future_interruptible<IOInterruptCondition>(
pg.wait_for_active_blocker.wait());
}).then_interruptible([this,
has_pg_op,
pgref=std::move(pgref)]() mutable {
if (m->finish_decode()) {
m->clear_payload();
}
return (has_pg_op ?
process_pg_op(pgref) :
process_op(pgref));
});
});
// keep the ordering of non-pg ops when across pg internvals
return (has_pg_op ?
interruptible_do_op() :
with_sequencer(std::move(interruptible_do_op)))
.then_interruptible([pgref]() {
return seastar::stop_iteration::yes;
});
}, [this, pgref](std::exception_ptr eptr) {
if (should_abort_request(*this, std::move(eptr))) {
sequencer.abort();

View File

@ -63,6 +63,14 @@ public:
}
private:
template <typename FuncT>
interruptible_future<> with_sequencer(FuncT&& func) {
may_set_prev_op();
return sequencer.start_op(*this, handle, std::forward<FuncT>(func))
.then_interruptible([this] {
sequencer.finish_op(*this);
});
}
interruptible_future<> do_process(
Ref<PG>& pg,
crimson::osd::ObjectContextRef obc);