mirror of https://github.com/ceph/ceph
crimson/osd/osd_operations: run peering_state related operations in a seastar::thread
Handling some peering events may involve seastar::future::wait(), so we need to run the peering state machine in a dedicated seastar::thread.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
commit bc23cf816f (parent 19d4aebace)
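The fix hinges on seastar::async(), which runs its body inside a seastar::thread: a stackful context that may wait on futures synchronously without stalling the reactor. A minimal sketch of the pattern, independent of the Ceph code below (do_async_work is a hypothetical stand-in):

  #include <seastar/core/future.hh>
  #include <seastar/core/sleep.hh>
  #include <seastar/core/thread.hh>
  #include <chrono>

  // Hypothetical stand-in for work that completes asynchronously.
  seastar::future<int> do_async_work() {
    return seastar::sleep(std::chrono::milliseconds(1)).then([] { return 42; });
  }

  seastar::future<int> compute() {
    // seastar::async() spawns a seastar::thread; inside it, .get() suspends
    // only this thread until the future resolves. Ordinary continuation
    // code must never block like this.
    return seastar::async([] {
      auto f = do_async_work();
      return f.get();
    });
  }

This is the reason handlers such as PG::on_removal, which issue I/O, can only be invoked from a context like this.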
@@ -89,9 +89,11 @@ seastar::future<> PeeringEvent<T>::with_pg(
     return this->template enter_stage<interruptor>(
       BackfillRecovery::bp(*pg).process);
   }).then_interruptible([this, pg, &shard_services] {
-    pg->do_peering_event(evt, ctx);
-    that()->get_handle().exit();
-    return complete_rctx(shard_services, pg);
+    return pg->do_peering_event(evt, ctx
+    ).then_interruptible([this, pg, &shard_services] {
+      that()->get_handle().exit();
+      return complete_rctx(shard_services, pg);
+    });
   }).then_interruptible([pg, &shard_services]()
     -> typename T::template interruptible_future<> {
     if (!pg->get_need_up_thru()) {
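Since do_peering_event() now returns a future, the pipeline above has to chain the follow-up work on it instead of calling it and falling through. A plain-Seastar sketch of the same shape (do_peering and complete_ctx are hypothetical; crimson's then_interruptible() plays the role then() plays here):

  #include <seastar/core/future.hh>

  // Hypothetical stand-ins for the two stages chained in the hunk above.
  seastar::future<> do_peering()   { return seastar::make_ready_future<>(); }
  seastar::future<> complete_ctx() { return seastar::make_ready_future<>(); }

  seastar::future<> process() {
    // Once do_peering() can suspend, sequencing must be expressed through
    // the future chain rather than through statement order.
    return do_peering().then([] {
      return complete_ctx();
    });
  }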
@@ -63,39 +63,45 @@ seastar::future<> PGAdvanceMap::start()
       pg->peering_request_pg_pipeline.process
   ).then([this] {
     from = pg->get_osdmap_epoch();
+    auto fut = seastar::now();
     if (do_init) {
-      pg->handle_initialize(rctx);
-      pg->handle_activate_map(rctx);
-    }
-    return seastar::do_for_each(
-      boost::make_counting_iterator(*from + 1),
-      boost::make_counting_iterator(to + 1),
-      [this](epoch_t next_epoch) {
-        return shard_services.get_map(next_epoch).then(
-          [this] (cached_map_t&& next_map) {
-            logger().debug("{}: advancing map to {}",
-                           *this, next_map->get_epoch());
-            pg->handle_advance_map(next_map, rctx);
-          });
-      }).then([this] {
-        pg->handle_activate_map(rctx);
-        logger().debug("{}: map activated", *this);
-        if (do_init) {
-          shard_services.pg_created(pg->get_pgid(), pg);
-          logger().info("PGAdvanceMap::start new pg {}", *pg);
-        }
-        return seastar::when_all_succeed(
-          pg->get_need_up_thru()
-          ? shard_services.send_alive(
-            pg->get_same_interval_since())
-          : seastar::now(),
-          shard_services.dispatch_context(
-            pg->get_collection_ref(),
-            std::move(rctx)));
-      }).then_unpack([this] {
-        logger().debug("{}: sending pg temp", *this);
-        return shard_services.send_pg_temp();
-      });
+      fut = pg->handle_initialize(rctx
+      ).then([this] {
+        return pg->handle_activate_map(rctx);
+      });
+    }
+    return fut.then([this] {
+      return seastar::do_for_each(
+        boost::make_counting_iterator(*from + 1),
+        boost::make_counting_iterator(to + 1),
+        [this](epoch_t next_epoch) {
+          return shard_services.get_map(next_epoch).then(
+            [this] (cached_map_t&& next_map) {
+              logger().debug("{}: advancing map to {}",
+                             *this, next_map->get_epoch());
+              return pg->handle_advance_map(next_map, rctx);
+            });
+        }).then([this] {
+          return pg->handle_activate_map(rctx).then([this] {
+            logger().debug("{}: map activated", *this);
+            if (do_init) {
+              shard_services.pg_created(pg->get_pgid(), pg);
+              logger().info("PGAdvanceMap::start new pg {}", *pg);
+            }
+            return seastar::when_all_succeed(
+              pg->get_need_up_thru()
+              ? shard_services.send_alive(
+                pg->get_same_interval_since())
+              : seastar::now(),
+              shard_services.dispatch_context(
+                pg->get_collection_ref(),
+                std::move(rctx)));
+          });
+        }).then_unpack([this] {
+          logger().debug("{}: sending pg temp", *this);
+          return shard_services.send_pg_temp();
+        });
+    });
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
   });
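The per-epoch loop above keeps its strict ordering because seastar::do_for_each() waits for each returned future before starting the next iteration. Sketched generically (epoch_t and advance_one_epoch are hypothetical stand-ins):

  #include <seastar/core/future.hh>
  #include <seastar/core/loop.hh>
  #include <boost/iterator/counting_iterator.hpp>

  using epoch_t = unsigned;

  // Hypothetical per-epoch step; the real code fetches the map and calls
  // pg->handle_advance_map() here.
  seastar::future<> advance_one_epoch(epoch_t e) {
    return seastar::make_ready_future<>();
  }

  seastar::future<> advance(epoch_t from, epoch_t to) {
    // do_for_each resolves each iteration's future before beginning the
    // next, so epochs are applied one at a time, in order.
    return seastar::do_for_each(
      boost::make_counting_iterator(from + 1),
      boost::make_counting_iterator(to + 1),
      [](epoch_t next) { return advance_one_epoch(next); });
  }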
@@ -494,49 +494,61 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
   });
 }
 
-void PG::do_peering_event(
+PG::interruptible_future<> PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
       peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+    return interruptor::now();
   } else {
     logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    peering_state.handle_event(
-      evt.get_event(),
-      &rctx);
-    peering_state.write_if_dirty(rctx.transaction);
+    // all peering event handling needs to be run in a dedicated seastar::thread,
+    // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
+    // PG::on_new_interval
+    return interruptor::async([this, &evt, &rctx] {
+      peering_state.handle_event(
+        evt.get_event(),
+        &rctx);
+      peering_state.write_if_dirty(rctx.transaction);
+    });
   }
 }
 
-void PG::handle_advance_map(
+seastar::future<> PG::handle_advance_map(
   cached_map_t next_map, PeeringCtx &rctx)
 {
-  vector<int> newup, newacting;
-  int up_primary, acting_primary;
-  next_map->pg_to_up_acting_osds(
-    pgid.pgid,
-    &newup, &up_primary,
-    &newacting, &acting_primary);
-  peering_state.advance_map(
-    next_map,
-    peering_state.get_osdmap(),
-    newup,
-    up_primary,
-    newacting,
-    acting_primary,
-    rctx);
-  osdmap_gate.got_map(next_map->get_epoch());
+  return seastar::async([this, next_map=std::move(next_map), &rctx] {
+    vector<int> newup, newacting;
+    int up_primary, acting_primary;
+    next_map->pg_to_up_acting_osds(
+      pgid.pgid,
+      &newup, &up_primary,
+      &newacting, &acting_primary);
+    peering_state.advance_map(
+      next_map,
+      peering_state.get_osdmap(),
+      newup,
+      up_primary,
+      newacting,
+      acting_primary,
+      rctx);
+    osdmap_gate.got_map(next_map->get_epoch());
+  });
 }
 
-void PG::handle_activate_map(PeeringCtx &rctx)
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
 {
-  peering_state.activate_map(rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.activate_map(rctx);
+  });
 }
 
-void PG::handle_initialize(PeeringCtx &rctx)
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
 {
-  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  });
 }
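The recurring shape in this hunk is the void-to-future refactor: each handler keeps its body but hoists it into seastar::async(), so that anything it calls may block on I/O. A self-contained sketch of the pattern (StateMachine and Handler are hypothetical):

  #include <seastar/core/future.hh>
  #include <seastar/core/thread.hh>

  // Hypothetical stand-in for PeeringState: handle_event() may now wait on
  // futures internally, which is only legal inside a seastar::thread.
  struct StateMachine {
    void handle_event() { /* may call future::get() when run in a thread */ }
  };

  struct Handler {
    StateMachine sm;
    // Before: void handle() { sm.handle_event(); }
    // After: the same body, wrapped so callers get a future to chain on.
    seastar::future<> handle() {
      return seastar::async([this] {
        sm.handle_event();
      });
    }
  };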
@@ -497,12 +497,12 @@ public:
 
   seastar::future<> read_state(crimson::os::FuturizedStore* store);
 
-  void do_peering_event(
+  interruptible_future<> do_peering_event(
     PGPeeringEvent& evt, PeeringCtx &rctx);
 
-  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
-  void handle_activate_map(PeeringCtx &rctx);
-  void handle_initialize(PeeringCtx &rctx);
+  seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+  seastar::future<> handle_activate_map(PeeringCtx &rctx);
+  seastar::future<> handle_initialize(PeeringCtx &rctx);
 
   static hobject_t get_oid(const hobject_t& hobj);
   static RWState::State get_lock_type(const OpInfo &op_info);