Mirror of https://github.com/ceph/ceph, synced 2025-01-20 10:01:45 +00:00
Merge pull request #33193 from cbodley/wip-44068
rgw: data_sync_source_zones only contains 'exporting' zones

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
Reviewed-by: Yuval Lifshitz <yuvalif@yahoo.com>
commit 5e4582c1a2
@@ -732,8 +732,8 @@ int RGWRemoteDataLog::init_sync_status(int num_shards)
   sync_env_local.http_manager = &http_manager;
   auto instance_id = ceph::util::generate_random_number<uint64_t>();
   RGWDataSyncCtx sc_local = sc;
-  sc.env = &sync_env_local;
-  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sc, num_shards, instance_id, tn, &sync_status));
+  sc_local.env = &sync_env_local;
+  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status));
   http_manager.stop();
   return ret;
 }
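For context, this hunk matters because `sc` is the long-lived sync context while `sync_env_local` and `http_manager` live only on this stack frame. A minimal standalone sketch of the aliasing hazard the patch removes (hypothetical types, not the actual RGW classes):

// Sketch of the bug pattern fixed above: mutate a copy of the context, never the shared one.
#include <cassert>

struct Env { int* http_manager = nullptr; };
struct Ctx { Env* env = nullptr; };

Ctx g_sc;                       // long-lived context, like RGWRemoteDataLog's `sc`

void run_once() {
  int http_manager = 42;        // stack-local resource, like the local RGWHTTPManager
  Env env_local;                // stack-local copy of the environment
  env_local.http_manager = &http_manager;

  Ctx sc_local = g_sc;          // copy the context for this invocation...
  sc_local.env = &env_local;    // ...and point only the copy at the local environment.
  // The old code effectively did `g_sc.env = &env_local;`, leaving the long-lived
  // context holding a pointer into this stack frame after return.
  assert(g_sc.env == nullptr);  // the shared context stays untouched
}

int main() {
  run_once();
  run_once();
  return 0;
}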
@@ -1783,6 +1783,8 @@ void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
     state = StateFullSync;
   } else if (s == "incremental-sync") {
     state = StateIncrementalSync;
+  } else if (s == "stopped") {
+    state = StateStopped;
   } else {
     state = StateInit;
   }
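Without the new branch, a status encoded as "stopped" silently decodes back to StateInit. A tiny standalone sketch of that round-trip concern (hypothetical enum and function names; only the "stopped" branch mirrors the patch):

#include <cassert>
#include <string>

enum SyncState { StateInit, StateFullSync, StateIncrementalSync, StateStopped };

SyncState parse_state(const std::string& s) {
  if (s == "full-sync")        return StateFullSync;
  if (s == "incremental-sync") return StateIncrementalSync;
  if (s == "stopped")          return StateStopped;
  return StateInit;  // unknown strings fall back to init, as in decode_json()
}

int main() {
  assert(parse_state("stopped") == StateStopped);  // previously fell through to StateInit
  assert(parse_state("bogus") == StateInit);
  return 0;
}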
@@ -591,15 +591,17 @@ public:
                                        trim_interval));
     stacks.push_back(meta);
 
-    auto data = new RGWCoroutinesStack(store->ctx(), &crs);
-    data->call(create_data_log_trim_cr(store, &http,
-                                       cct->_conf->rgw_data_log_num_shards,
-                                       trim_interval));
-    stacks.push_back(data);
+    if (store->svc()->zone->sync_module_exports_data()) {
+      auto data = new RGWCoroutinesStack(store->ctx(), &crs);
+      data->call(create_data_log_trim_cr(store, &http,
+                                         cct->_conf->rgw_data_log_num_shards,
+                                         trim_interval));
+      stacks.push_back(data);
 
-    auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
-    bucket->call(bucket_trim->create_bucket_trim_cr(&http));
-    stacks.push_back(bucket);
+      auto bucket = new RGWCoroutinesStack(store->ctx(), &crs);
+      bucket->call(bucket_trim->create_bucket_trim_cr(&http));
+      stacks.push_back(bucket);
+    }
 
     crs.run(stacks);
     return 0;
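The intent of the gate: if the zone's sync module does not export data, no peer consumes the data or bucket-index logs, so there is nothing for these trim coroutines to maintain. An illustrative sketch of the pattern (hypothetical names, not the RGW coroutine framework):

#include <functional>
#include <iostream>
#include <vector>

struct Zone { bool exports_data; };

std::vector<std::function<void()>> build_trim_tasks(const Zone& zone) {
  std::vector<std::function<void()>> tasks;
  tasks.push_back([] { std::cout << "trim metadata log\n"; });   // always scheduled
  if (zone.exports_data) {                                       // gate added by the patch
    tasks.push_back([] { std::cout << "trim data log\n"; });
    tasks.push_back([] { std::cout << "trim bucket index logs\n"; });
  }
  return tasks;
}

int main() {
  for (auto& t : build_trim_tasks(Zone{false})) t();  // only the metadata trim runs
  for (auto& t : build_trim_tasks(Zone{true}))  t();  // all three trims run
  return 0;
}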
@@ -110,13 +110,13 @@ public:
   }
 
 
-  int supports_data_export(const string& name) {
+  bool supports_data_export(const string& name) {
     RGWSyncModuleRef module;
     if (!get_module(name, &module)) {
-      return -ENOENT;
+      return false;
     }
 
-    return module.get()->supports_data_export();
+    return module->supports_data_export();
   }
 
   int create_instance(CephContext *cct, const string& name, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
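The signature change is more than cosmetic: the old int version returned -ENOENT for an unknown module name, and a negative errno is non-zero, so a caller using the result directly as a condition would treat a missing module as "exports data". A hypothetical standalone sketch of that pitfall:

#include <cassert>
#include <cerrno>
#include <string>

int  supports_export_old(const std::string& name) { return name == "known" ? 1 : -ENOENT; }
bool supports_export_new(const std::string& name) { return name == "known"; }

int main() {
  // Old: an unknown module looks truthy in an if(), because -ENOENT != 0.
  assert(supports_export_old("missing") != 0);
  // New: an unknown module is simply reported as not exporting data.
  assert(!supports_export_new("missing"));
  return 0;
}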
@@ -364,8 +364,8 @@ int take_min_status(CephContext *cct, Iter first, Iter last,
     auto m = status->begin();
     for (auto& shard : *peer) {
       auto& marker = *m++;
-      // only consider incremental sync markers
-      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+      // if no sync has started, we can safely trim everything
+      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
         continue;
       }
       // always take the first marker, or any later marker that's smaller
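A rough sketch of the min-marker idea this hunk adjusts (hypothetical types and simplified behavior, not the real take_min_status): peers that have not started syncing at all are skipped and cannot hold back trimming, while any peer that has started contributes a position that clamps the minimum.

#include <cassert>
#include <string>
#include <vector>

enum State { Init, FullSync, IncrementalSync };
struct Shard { State state; std::string marker; };

std::string take_min_marker(const std::vector<Shard>& peers) {
  std::string min;
  bool first = true;
  for (const auto& p : peers) {
    if (p.state == Init) continue;  // nothing synced yet: safe to trim everything
    const std::string m = (p.state == IncrementalSync) ? p.marker : "";
    if (first || m < min) { min = m; first = false; }
  }
  return min;
}

int main() {
  // A peer that never started is ignored; one still mid-sync contributes an
  // empty marker and blocks trimming (assumed behavior for this sketch).
  assert(take_min_marker({{Init, ""}, {IncrementalSync, "00000000010"}}) == "00000000010");
  assert(take_min_marker({{FullSync, ""}, {IncrementalSync, "00000000010"}}) == "");
  return 0;
}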
@@ -194,13 +194,15 @@ int RGWSI_Zone::do_start()
     return ret;
   }
 
+  auto sync_modules = sync_modules_svc->get_manager();
   RGWSyncModuleRef sm;
-  if (!sync_modules_svc->get_manager()->get_module(zone_public_config->tier_type, &sm)) {
+  if (!sync_modules->get_module(zone_public_config->tier_type, &sm)) {
     lderr(cct) << "ERROR: tier type not found: " << zone_public_config->tier_type << dendl;
     return -EINVAL;
   }
 
   writeable_zone = sm->supports_writes();
+  exports_data = sm->supports_data_export();
 
   /* first build all zones index */
   for (auto ziter : zonegroup->zones) {
@@ -232,7 +234,7 @@ int RGWSI_Zone::do_start()
     bool zone_is_target = target_zones.find(z.id) != target_zones.end();
 
     if (zone_is_source || zone_is_target) {
-      if (zone_is_source) {
+      if (zone_is_source && sync_modules->supports_data_export(z.tier_type)) {
        data_sync_source_zones.push_back(&z);
       }
       if (zone_is_target) {
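This is the headline change of the PR: a zone is only recorded as a data-sync source when its tier type's sync module exports data. A standalone sketch of the selection logic (hypothetical structures; "rgw" stands for the normal data-sync tier type, and "log-only" is a made-up module name that does not export data):

#include <cassert>
#include <map>
#include <string>
#include <vector>

struct Zone { std::string id; std::string tier_type; };

bool supports_data_export(const std::string& tier_type) {
  // stand-in for looking up the sync module by tier type
  static const std::map<std::string, bool> modules = {{"rgw", true}, {"log-only", false}};
  auto it = modules.find(tier_type);
  return it != modules.end() && it->second;
}

int main() {
  std::vector<Zone> zonegroup = {{"a", "rgw"}, {"b", "log-only"}};
  std::vector<const Zone*> data_sync_source_zones;
  for (const auto& z : zonegroup) {
    bool zone_is_source = true;  // assume both zones are configured as sources
    if (zone_is_source && supports_data_export(z.tier_type)) {
      data_sync_source_zones.push_back(&z);
    }
  }
  // Only the exporting zone ends up in the source list.
  assert(data_sync_source_zones.size() == 1 && data_sync_source_zones[0]->id == "a");
  return 0;
}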
@@ -882,11 +884,6 @@ bool RGWSI_Zone::zone_is_writeable()
   return writeable_zone && !get_zone().is_read_only();
 }
 
-bool RGWSI_Zone::sync_module_supports_writes() const
-{
-  return writeable_zone;
-}
-
 uint32_t RGWSI_Zone::get_zone_short_id() const
 {
   return zone_short_id;
@@ -41,6 +41,7 @@ class RGWSI_Zone : public RGWServiceInstance
   rgw_zone_id cur_zone_id;
   uint32_t zone_short_id{0};
   bool writeable_zone{false};
+  bool exports_data{false};
 
   std::shared_ptr<RGWBucketSyncPolicyHandler> sync_policy_handler;
   std::map<rgw_zone_id, std::shared_ptr<RGWBucketSyncPolicyHandler> > sync_policy_handlers;
@@ -94,7 +95,8 @@ public:
   bool zone_is_writeable();
   bool zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const;
   bool get_redirect_zone_endpoint(string *endpoint);
-  bool sync_module_supports_writes() const;
+  bool sync_module_supports_writes() const { return writeable_zone; }
+  bool sync_module_exports_data() const { return exports_data; }
 
   RGWRESTConn *get_master_conn() {
     return rest_master_conn;
@@ -270,16 +270,12 @@ def bucket_sync_status(target_zone, source_zone, bucket_name):
         assert(retcode == 2) # ENOENT
 
     bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
     log.debug('current bucket sync markers=%s', bucket_sync_status_json)
     sync_status = json.loads(bucket_sync_status_json)
 
     markers={}
     for entry in sync_status:
         val = entry['val']
-        if val['status'] == 'incremental-sync':
-            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
-        else:
-            pos = ''
+        pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
         markers[entry['key']] = pos
 
     return markers