Merge pull request #49803 from soumyakoduri/wip-skoduri-sync-flow

rgw/sync-policy: Support disabling per-bucket replication

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Casey Bodley 2023-04-06 10:26:41 -04:00 committed by GitHub
commit 7d4c7b6987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 7 deletions

View File

@ -467,6 +467,12 @@ RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSync
}
void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
/* Ensure this pipe doesn't match with any disabled pipes */
for (auto p: disabled_pipe_map) {
if (p.second.source.match(pipe.source) && p.second.dest.match(pipe.dest)) {
return;
}
}
pipe_map.insert(make_pair(pipe.id, pipe));
auto& rules_ref = rules[endpoints_pair(pipe)];
@ -482,6 +488,32 @@ void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe
handlers.insert(h);
}
void RGWBucketSyncFlowManager::pipe_set::remove_all() {
pipe_map.clear();
disabled_pipe_map.clear();
rules.clear();
handlers.clear();
}
void RGWBucketSyncFlowManager::pipe_set::disable(const rgw_sync_bucket_pipe& pipe) {
/* This pipe is disabled. Add it to disabled pipes & remove any
* matching pipes already inserted
*/
disabled_pipe_map.insert(make_pair(pipe.id, pipe));
for (auto iter_p = pipe_map.begin(); iter_p != pipe_map.end(); ) {
auto p = iter_p++;
if (p->second.source.match(pipe.source) && p->second.dest.match(pipe.dest)) {
auto& rules_ref = rules[endpoints_pair(p->second)];
if (rules_ref) {
pipe_handler h(rules_ref, p->second);
handlers.erase(h);
}
rules.erase(endpoints_pair(p->second));
pipe_map.erase(p);
}
}
}
void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
{
encode_json("pipes", pipe_map, f);
@ -557,6 +589,30 @@ void RGWBucketSyncFlowManager::init(const DoutPrefixProvider *dpp, const rgw_syn
}
}
/*
* These are the semantics to be followed while resolving the policy
* conflicts -
*
* ==================================================
* zonegroup bucket Result
* ==================================================
* enabled enabled enabled
* allowed enabled
* forbidden disabled
* allowed enabled enabled
* allowed disabled
* forbidden disabled
* forbidden enabled disabled
* allowed disabled
* forbidden disabled
*
* In case multiple group policies are set to reflect for any sync pair
* (<source-zone,source-bucket>, <dest-zone,dest-bucket>), the following
* rules are applied in the order-
* 1) Even if one policy status is FORBIDDEN, the sync will be disabled
* 2) Atleast one policy should be ENABLED for the sync to be allowed.
*
*/
void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
@ -565,6 +621,7 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
{
string effective_bucket_key;
bool is_forbidden = false;
if (effective_bucket) {
effective_bucket_key = effective_bucket->get_key();
}
@ -574,10 +631,16 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
for (auto& item : flow_groups) {
auto& flow_group_map = item.second;
is_forbidden = false;
/* only return enabled groups */
if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
if (flow_group_map.status == rgw_sync_policy_group::Status::FORBIDDEN) {
/* FORBIDDEN takes precedence over all the other rules.
* Remove any other pipes which may allow access.
*/
is_forbidden = true;
} else if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
(only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
/* only return enabled groups */
continue;
}
@ -590,8 +653,13 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
source_pipes->insert(pipe);
if (is_forbidden) {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing source pipe: " << pipe << dendl;
source_pipes->disable(pipe);
} else {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
source_pipes->insert(pipe);
}
}
for (auto& entry : flow_group_map.dests) {
@ -604,8 +672,13 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
dest_pipes->insert(pipe);
if (is_forbidden) {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing dest pipe: " << pipe << dendl;
dest_pipes->disable(pipe);
} else {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
dest_pipes->insert(pipe);
}
}
}
}

View File

@ -31,7 +31,7 @@ struct rgw_sync_group_pipe_map {
rgw_zone_id zone;
std::optional<rgw_bucket> bucket;
rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::UNKNOWN};
using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>;
@ -209,6 +209,7 @@ public:
struct pipe_set {
std::map<endpoints_pair, pipe_rules_ref> rules;
std::multimap<std::string, rgw_sync_bucket_pipe> pipe_map;
std::multimap<std::string, rgw_sync_bucket_pipe> disabled_pipe_map;
std::set<pipe_handler> handlers;
@ -217,10 +218,13 @@ public:
void clear() {
rules.clear();
pipe_map.clear();
disabled_pipe_map.clear();
handlers.clear();
}
void insert(const rgw_sync_bucket_pipe& pipe);
void remove_all();
void disable(const rgw_sync_bucket_pipe& pipe);
iterator begin() const {
return handlers.begin();