cls/rgw: only promote instance entries if they sort after current instance

To support bidirectional replication, where each zone could create its
own instances with the same olh epoch, the sort order of those instances
should be used to determine whether an entry should be promoted over the
current instance.

Fixes: https://tracker.ceph.com/issues/39142

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2019-10-30 11:21:06 -04:00
parent a078120512
commit 6f52900c81

View File

@ -1463,6 +1463,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
if (ret < 0) {
return ret;
}
const uint64_t prev_epoch = olh.get_epoch();
if (!olh.start_modify(op.olh_epoch)) {
ret = obj.write(op.olh_epoch, false);
@ -1475,6 +1476,12 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
return 0;
}
// promote this version to current if it's a newer epoch, or if it matches the
// current epoch and sorts after the current instance
const bool promote = (olh.get_epoch() > prev_epoch) ||
(olh.get_epoch() == prev_epoch &&
olh.get_entry().key.instance > op.key.instance);
if (olh_found) {
const string& olh_tag = olh.get_tag();
if (op.olh_tag != olh_tag) {
@ -1485,7 +1492,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
/* if pending removal, this is a new olh instance */
olh.set_tag(op.olh_tag);
}
if (olh.exists()) {
if (promote && olh.exists()) {
rgw_bucket_olh_entry& olh_entry = olh.get_entry();
/* found olh, previous instance is no longer the latest, need to update */
if (!(olh_entry.key == op.key)) {
@ -1502,7 +1509,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
} else {
bool instance_only = (op.key.instance.empty() && op.delete_marker);
cls_rgw_obj_key key(op.key.name);
ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only);
ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only);
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
@ -1516,8 +1523,9 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
}
olh.update(op.key, op.delete_marker);
if (promote) {
olh.update(op.key, op.delete_marker);
}
olh.set_exists(true);
ret = olh.write();
@ -1527,7 +1535,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
}
/* write the instance and list entries */
ret = obj.write(olh.get_epoch(), true);
ret = obj.write(olh.get_epoch(), promote);
if (ret < 0) {
return ret;
}