mirror of
https://github.com/ceph/ceph
synced 2025-01-01 08:32:24 +00:00
rgw: Pull lock out of RGWInitDataSyncStatusCoroutine
RGWDataSyncCR manages the lock instead, holding it through StateInit and StateBuildingFullSyncMaps but releasing it by StateSync.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
This commit is contained in:
parent
2b3fbcf9b1
commit
b0d401a0f4
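The change moves lease management up into the callers: the sync-status lock is taken once, held across StateInit and StateBuildingFullSyncMaps, and released once the sync reaches StateSync, since only the first two phases rewrite the shared status objects. Below is a minimal, self-contained sketch of that lifecycle. The Lease type, SyncState enum, and run_data_sync() are hypothetical stand-ins for illustration only, not the actual RGW coroutine classes.

// Minimal sketch of the lease lifecycle described in the commit message.
// "Lease", "SyncState", and "run_data_sync" are invented stand-ins, not RGW API.
#include <cassert>

enum class SyncState { Init, BuildingFullSyncMaps, Sync };

// Stand-in for RGWContinuousLeaseCR: just tracks whether the named lock is held.
struct Lease {
  bool locked = false;
  void acquire() { locked = true; }   // the real coroutine renews periodically
  void release() { locked = false; }  // the real coroutine calls go_down()
};

// Take the lease before Init, keep it through BuildingFullSyncMaps,
// drop it once we reach steady-state Sync.
bool run_data_sync(SyncState state) {
  Lease lease;
  if (state != SyncState::Sync) {
    lease.acquire();
    if (!lease.locked) return false;  // early abort, mirrors the -ECANCELED paths
  }
  if (state == SyncState::Init) {
    // init_sync_status() work happens here, under the lease
    state = SyncState::BuildingFullSyncMaps;
  }
  if (state == SyncState::BuildingFullSyncMaps) {
    // full sync map building happens here, still under the lease
    state = SyncState::Sync;
  }
  // steady-state incremental sync does not need this exclusive lease
  lease.release();
  assert(state == SyncState::Sync && !lease.locked);
  return true;
}

int main() { return run_data_sync(SyncState::Init) ? 0 : 1; }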
@@ -521,57 +521,52 @@ bool RGWListRemoteDataLogCR::spawn_next() {
 }
 
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
-  static constexpr uint32_t lock_duration = 30;
-  RGWDataSyncCtx *sc;
-  RGWDataSyncEnv *sync_env;
-  rgw::sal::RadosStore* driver; // RGWDataSyncEnv also has a pointer to driver
-  const rgw_pool& pool;
+  static constexpr auto lock_name{ "sync_lock"sv };
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env{ sc->env };
   const uint32_t num_shards;
+  rgw_data_sync_status* const status;
+  RGWSyncTraceNodeRef tn;
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
 
-  string sync_status_oid;
+  const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
+  const string sync_status_oid{
+    RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };
 
-  string lock_name;
-  string cookie;
-  rgw_data_sync_status *status;
   std::vector<RGWObjVersionTracker>& objvs;
   map<int, RGWDataChangesLogInfo> shards_info;
 
-  RGWSyncTraceNodeRef tn;
-
 public:
-  RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
-                                 uint64_t instance_id,
-                                 RGWSyncTraceNodeRef& _tn_parent,
-                                 rgw_data_sync_status *status,
-                                 std::vector<RGWObjVersionTracker>& objvs)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
-      pool(sync_env->svc->zone->get_zone_params().log_pool),
-      num_shards(num_shards), status(status), objvs(objvs),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
-    lock_name = "sync_lock";
-
+  RGWInitDataSyncStatusCoroutine(
+    RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
+    const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
+    boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
+    std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
+      tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
+      lease_cr(std::move(lease_cr)), objvs(objvs) {
     status->sync_info.instance_id = instance_id;
-
-#define COOKIE_LEN 16
-    char buf[COOKIE_LEN + 1];
-
-    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
-    cookie = buf;
-
-    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone);
   }
 
+  static auto continuous_lease_cr(RGWDataSyncCtx* const sc,
+                                  RGWCoroutine* const caller) {
+    auto lock_duration = sc->cct->_conf->rgw_sync_lease_period;
+    return new RGWContinuousLeaseCR(
+      sc->env->async_rados, sc->env->driver,
+      { sc->env->svc->zone->get_zone_params().log_pool,
+        RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
+      string(lock_name), lock_duration, caller);
+  }
+
   int operate(const DoutPrefixProvider *dpp) override {
     int ret;
     reenter(this) {
-      using LockCR = RGWSimpleRadosLockCR;
-      yield call(new LockCR(sync_env->async_rados, driver,
-                            rgw_raw_obj{pool, sync_status_oid},
-                            lock_name, cookie, lock_duration));
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
-        return set_cr_error(retcode);
+      if (!lease_cr->is_locked()) {
+        drain_all();
+        return set_cr_error(-ECANCELED);
       }
 
       using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
       yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                  rgw_raw_obj{pool, sync_status_oid},
@@ -581,16 +576,11 @@ public:
         return set_cr_error(retcode);
       }
 
-      /* take lock again, we just recreated the object */
-      yield call(new LockCR(sync_env->async_rados, driver,
-                            rgw_raw_obj{pool, sync_status_oid},
-                            lock_name, cookie, lock_duration));
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
-        return set_cr_error(retcode);
-      }
-
-      tn->log(10, "took lease");
+      // In the original code we reacquired the lock. Since
+      // RGWSimpleRadosWriteCR doesn't appear to touch the attributes
+      // and cls_version works across it, this should be unnecessary.
+      // Putting a note here just in case. If we see ECANCELED where
+      // we expect EBUSY, we can revisit this.
 
       /* fetch current position in logs */
       yield {
@@ -641,9 +631,6 @@ public:
         tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
         return set_cr_error(retcode);
       }
-      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, driver,
-                                            rgw_raw_obj{pool, sync_status_oid},
-                                            lock_name, cookie));
       return set_cr_done();
     }
     return 0;
@@ -781,6 +768,65 @@ int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, cons
   return ret;
 }
 
+namespace RGWRDL {
+class DataSyncInitCR : public RGWCoroutine {
+  RGWDataSyncCtx* const sc;
+  const uint32_t num_shards;
+  uint64_t instance_id;
+  const RGWSyncTraceNodeRef& tn;
+  rgw_data_sync_status* const sync_status;
+  std::vector<RGWObjVersionTracker>& objvs;
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+
+public:
+
+  DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
+                 const RGWSyncTraceNodeRef& tn,
+                 rgw_data_sync_status* sync_status,
+                 std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(sc->cct), sc(sc), num_shards(num_shards),
+      instance_id(instance_id), tn(tn),
+      sync_status(sync_status), objvs(objvs) {}
+
+  ~DataSyncInitCR() override {
+    if (lease_cr) {
+      lease_cr->abort();
+    }
+  }
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      lease_cr.reset(
+        RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+
+      yield spawn(lease_cr.get(), false);
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          tn->log(5, "ERROR: failed to take data sync status lease");
+          set_status("lease lock failed, early abort");
+          drain_all();
+          return set_cr_error(lease_cr->get_ret_status());
+        }
+        tn->log(5, "waiting on data sync status lease");
+        yield set_sleeping(true);
+      }
+      tn->log(5, "acquired data sync status lease");
+      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
+                                                    tn, sync_status, lease_cr, objvs));
+      lease_cr->go_down();
+      lease_cr.reset();
+      drain_all();
+      if (retcode < 0) {
+        set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+}
+
 int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
 {
   rgw_data_sync_status sync_status;
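DataSyncInitCR spawns the lease coroutine and then parks itself until the lease is either held or has definitively failed. The toy model below shows that wait loop; LeaseProbe is a hypothetical stand-in for RGWContinuousLeaseCR, and where the real coroutine yields with set_sleeping(true) and is woken by the coroutine manager, this sketch simply advances a counter.

// Toy model of the "spawn lease CR, then wait until locked or done" loop above.
// "LeaseProbe" and "wait_for_lease" are invented for illustration, not RGW API.
#include <iostream>

struct LeaseProbe {
  int ticks_until_locked;   // simulates asynchronous acquisition
  bool failed = false;
  bool is_locked() const { return !failed && ticks_until_locked <= 0; }
  bool is_done() const { return failed; }  // finished without taking the lock
  void tick() { if (ticks_until_locked > 0) --ticks_until_locked; }
};

// Returns 0 once the lease is held, or a negative error if acquisition failed.
int wait_for_lease(LeaseProbe& lease) {
  while (!lease.is_locked()) {
    if (lease.is_done()) {
      std::cerr << "ERROR: failed to take data sync status lease\n";
      return -1;                 // mirrors set_cr_error(...) after drain_all()
    }
    lease.tick();                // real code: yield set_sleeping(true)
  }
  std::cout << "acquired data sync status lease\n";
  return 0;
}

int main() {
  LeaseProbe lease{3};
  return wait_for_lease(lease) == 0 ? 0 : 1;
}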
@@ -799,7 +845,8 @@ int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_sh
   auto instance_id = ceph::util::generate_random_number<uint64_t>();
   RGWDataSyncCtx sc_local = sc;
   sc_local.env = &sync_env_local;
-  ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs));
+  ret = crs.run(dpp, new RGWRDL::DataSyncInitCR(&sc_local, num_shards,
+                                                instance_id, tn, &sync_status, objvs));
   http_manager.stop();
   return ret;
 }
@@ -2192,6 +2239,10 @@ class RGWDataSyncCR : public RGWCoroutine {
 
   RGWDataSyncModule *data_sync_module{nullptr};
   RGWObjVersionTracker objv;
 
+  boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
+  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+
 public:
   RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
                                                       sc(_sc), sync_env(_sc->env),
@@ -2204,6 +2255,9 @@ public:
     for (auto iter : shard_crs) {
       iter.second->put();
     }
+    if (init_lease) {
+      init_lease->abort();
+    }
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -2219,15 +2273,37 @@ public:
         return set_cr_error(retcode);
       }
 
+      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state !=
+          rgw_data_sync_info::StateSync) {
+        init_lease.reset(
+          RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+        yield lease_stack.reset(spawn(init_lease.get(), false));
+
+        while (!init_lease->is_locked()) {
+          if (init_lease->is_done()) {
+            tn->log(5, "ERROR: failed to take data sync status lease");
+            set_status("lease lock failed, early abort");
+            drain_all();
+            return set_cr_error(init_lease->get_ret_status());
+          }
+          tn->log(5, "waiting on data sync status lease");
+          yield set_sleeping(true);
+        }
+        tn->log(5, "acquired data sync status lease");
+      }
+
       /* state: init status */
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
         tn->log(20, SSTR("init"));
         sync_status.sync_info.num_shards = num_shards;
         uint64_t instance_id;
         instance_id = ceph::util::generate_random_number<uint64_t>();
-        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
+        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
+                                                      &sync_status, init_lease, objvs));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
+          init_lease->go_down();
+          drain_all();
           return set_cr_error(retcode);
         }
         // sets state = StateBuildingFullSyncMaps
@@ -2246,6 +2322,12 @@ public:
           tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
           return set_cr_error(retcode);
         }
+
+        if (!init_lease->is_locked()) {
+          init_lease->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         /* state: building full sync maps */
         yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
         if (retcode < 0) {
@@ -2254,6 +2336,11 @@ public:
         }
         sync_status.sync_info.state = rgw_data_sync_info::StateSync;
 
+        if (!init_lease->is_locked()) {
+          init_lease->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         /* update new state */
         yield call(set_sync_info_cr());
         if (retcode < 0) {
@@ -2269,9 +2356,15 @@ public:
         tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
         return set_cr_error(retcode);
       }
 
-      yield {
-        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+        if (init_lease) {
+          init_lease->go_down();
+          drain_all();
+          init_lease.reset();
+          lease_stack.reset();
+        }
+        yield {
           tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
           for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
                iter != sync_status.sync_markers.end(); ++iter) {