mirror of
https://github.com/ceph/ceph
synced 2024-12-17 00:46:05 +00:00
rgw: add preliminary support for sync update policies on metadata sync
We want to be able to conditionally apply new updates: 1) if we already have a newer version than the sync is applying for some reason (replay of logs?), we don't want to go back in time. 2) If both zones were active at the same time, then we'd like to be able to do a merge based on timestamps. In order to support this, we add a sync_type flag to the implementations of RGWMetadataHandler::put, and then check the version or the mtime of the incoming put to what we have on disk, and refuse the update if needed. We return the 204 NoContent success code when refusing sync; for the moment the conversion is automatic but we're going to pull it out in the next couple commits. This commit does not complete the feature: we don't provide an interface for specifying a different sync protocol. Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
parent
934ad88142
commit
4f9855e470
@ -1394,7 +1394,8 @@ public:
|
||||
return 0;
|
||||
}
|
||||
|
||||
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) {
|
||||
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
|
||||
time_t mtime, JSONObj *obj, sync_type_t sync_type) {
|
||||
RGWBucketEntryPoint be, old_be;
|
||||
decode_json_obj(be, obj);
|
||||
|
||||
@ -1406,6 +1407,12 @@ public:
|
||||
if (ret < 0 && ret != -ENOENT)
|
||||
return ret;
|
||||
|
||||
// are we actually going to perform this put, or is it too old?
|
||||
if (!check_versions(old_ot.read_version, orig_mtime,
|
||||
objv_tracker.write_version, mtime, sync_type)) {
|
||||
return STATUS_NO_APPLY;
|
||||
}
|
||||
|
||||
objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
|
||||
|
||||
ret = store->put_bucket_entrypoint_info(entry, be, false, objv_tracker, mtime);
|
||||
@ -1536,7 +1543,8 @@ public:
|
||||
return 0;
|
||||
}
|
||||
|
||||
int put(RGWRados *store, string& oid, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) {
|
||||
int put(RGWRados *store, string& oid, RGWObjVersionTracker& objv_tracker,
|
||||
time_t mtime, JSONObj *obj, sync_type_t sync_type) {
|
||||
RGWBucketCompleteInfo bci, old_bci;
|
||||
decode_json_obj(bci, obj);
|
||||
|
||||
@ -1562,6 +1570,12 @@ public:
|
||||
bci.info.bucket.index_pool = old_bci.info.bucket.index_pool;
|
||||
}
|
||||
|
||||
// are we actually going to perform this put, or is it too old?
|
||||
if (!check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
|
||||
objv_tracker.write_version, mtime, sync_type)) {
|
||||
return STATUS_NO_APPLY;
|
||||
}
|
||||
|
||||
/* record the read version (if any), store the new version */
|
||||
bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
|
||||
bci.info.objv_tracker.write_version = objv_tracker.write_version;
|
||||
|
@ -99,6 +99,7 @@ using ceph::crypto::MD5;
|
||||
#define STATUS_NO_CONTENT 1902
|
||||
#define STATUS_PARTIAL_CONTENT 1903
|
||||
#define STATUS_REDIRECT 1904
|
||||
#define STATUS_NO_APPLY 1905
|
||||
|
||||
#define ERR_INVALID_BUCKET_NAME 2000
|
||||
#define ERR_INVALID_OBJECT_NAME 2001
|
||||
|
@ -15,6 +15,7 @@ const static struct rgw_http_errors RGW_HTTP_ERRORS[] = {
|
||||
{ STATUS_ACCEPTED, 202, "Accepted" },
|
||||
{ STATUS_NO_CONTENT, 204, "NoContent" },
|
||||
{ STATUS_PARTIAL_CONTENT, 206, "" },
|
||||
{ STATUS_NO_APPLY, 204, "NoContent" },
|
||||
{ ERR_PERMANENT_REDIRECT, 301, "PermanentRedirect" },
|
||||
{ STATUS_REDIRECT, 303, "" },
|
||||
{ ERR_NOT_MODIFIED, 304, "NotModified" },
|
||||
|
@ -194,7 +194,8 @@ public:
|
||||
virtual string get_type() { return string(); }
|
||||
|
||||
virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) { return -ENOTSUP; }
|
||||
virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) { return -ENOTSUP; }
|
||||
virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
|
||||
time_t mtime, JSONObj *obj, sync_type_t sync_type) { return -ENOTSUP; }
|
||||
|
||||
virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_bucket& bucket, string& oid) {}
|
||||
|
||||
@ -357,7 +358,10 @@ int RGWMetadataManager::put(string& metadata_key, bufferlist& bl)
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
return handler->put(store, entry, objv_tracker, mtime, jo);
|
||||
RGWMetadataHandler::sync_type_t sync_type;
|
||||
sync_type = RGWMetadataHandler::APPLY_ALWAYS;
|
||||
|
||||
return handler->put(store, entry, objv_tracker, mtime, jo, sync_type);
|
||||
}
|
||||
|
||||
int RGWMetadataManager::remove(string& metadata_key)
|
||||
|
@ -44,14 +44,18 @@ class RGWMetadataManager;
|
||||
class RGWMetadataHandler {
|
||||
friend class RGWMetadataManager;
|
||||
|
||||
protected:
|
||||
virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_bucket& bucket, string& oid) = 0;
|
||||
public:
|
||||
enum sync_type_t {
|
||||
APPLY_ALWAYS,
|
||||
APPLY_UPDATES,
|
||||
APPLY_NEWER
|
||||
};
|
||||
virtual ~RGWMetadataHandler() {}
|
||||
virtual string get_type() = 0;
|
||||
|
||||
virtual int get(RGWRados *store, string& entry, RGWMetadataObject **obj) = 0;
|
||||
virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) = 0;
|
||||
virtual int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
|
||||
time_t mtime, JSONObj *obj, sync_type_t type) = 0;
|
||||
virtual int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) = 0;
|
||||
|
||||
virtual int list_keys_init(RGWRados *store, void **phandle) = 0;
|
||||
@ -62,6 +66,33 @@ public:
|
||||
virtual void get_hash_key(const string& section, const string& key, string& hash_key) {
|
||||
hash_key = section + ":" + key;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void get_pool_and_oid(RGWRados *store, const string& key, rgw_bucket& bucket, string& oid) = 0;
|
||||
/**
|
||||
* Compare an incoming versus on-disk tag/version+mtime combo against
|
||||
* the sync mode to see if the new one should replace the on-disk one.
|
||||
*
|
||||
* @return true if the update should proceed, false otherwise.
|
||||
*/
|
||||
bool check_versions(const obj_version& ondisk, const time_t& ondisk_time,
|
||||
const obj_version& incoming, const time_t& incoming_time,
|
||||
sync_type_t sync_mode) {
|
||||
switch (sync_mode) {
|
||||
case APPLY_UPDATES:
|
||||
if ((ondisk.tag != incoming.tag) ||
|
||||
(ondisk.ver >= incoming.ver))
|
||||
return false;
|
||||
break;
|
||||
case APPLY_NEWER:
|
||||
if (ondisk_time >= incoming_time)
|
||||
return false;
|
||||
break;
|
||||
case APPLY_ALWAYS: //deliberate fall-thru -- we always apply!
|
||||
default: break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
#define META_LOG_OBJ_PREFIX "meta.log."
|
||||
|
@ -2295,7 +2295,8 @@ public:
|
||||
return 0;
|
||||
}
|
||||
|
||||
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, time_t mtime, JSONObj *obj) {
|
||||
int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
|
||||
time_t mtime, JSONObj *obj, sync_type_t sync_mode) {
|
||||
RGWUserInfo info;
|
||||
|
||||
decode_json_obj(info, obj);
|
||||
@ -2306,6 +2307,11 @@ public:
|
||||
if (ret < 0 && ret != -ENOENT)
|
||||
return ret;
|
||||
|
||||
// are we actually going to perform this put, or is it too old?
|
||||
if (!check_versions(objv_tracker.read_version, orig_mtime,
|
||||
objv_tracker.write_version, mtime, sync_mode)) {
|
||||
return STATUS_NO_APPLY;
|
||||
}
|
||||
|
||||
ret = rgw_store_user_info(store, info, &old_info, &objv_tracker, mtime, false);
|
||||
if (ret < 0)
|
||||
|
Loading…
Reference in New Issue
Block a user