mirror of
https://github.com/ceph/ceph
synced 2025-01-19 17:41:39 +00:00
rgw/dbstore: Use Object ID to handle racing writes
Create unique ID for each object upload which will be atomically updated in the head object at the end. This will prevent data corruption during concurrent writes. Incase of Multipart Uploads, upload_id is used as ObjectID. XXX: The stale or obsolete tail data needs to be deleted Also addressed invalid usage of CephContext in dbstore tests. Signed-off-by: Soumya Koduri <skoduri@redhat.com>
This commit is contained in:
parent
bad21fa497
commit
38052496c8
@ -922,6 +922,7 @@ namespace rgw::sal {
|
||||
obj->get_obj());
|
||||
DB::Object::Write obj_op(&op_target);
|
||||
|
||||
/* Create meta object */
|
||||
obj_op.meta.owner = owner.get_id();
|
||||
obj_op.meta.category = RGWObjCategory::MultiMeta;
|
||||
obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
|
||||
@ -1098,17 +1099,9 @@ namespace rgw::sal {
|
||||
/* Original head object */
|
||||
DB::Object op_target(store->getDB(),
|
||||
target_obj->get_bucket()->get_info(),
|
||||
target_obj->get_obj());
|
||||
target_obj->get_obj(), get_upload_id());
|
||||
DB::Object::Write obj_op(&op_target);
|
||||
obj_op.prepare(NULL);
|
||||
|
||||
/* Meta object */
|
||||
std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
|
||||
DB::Object meta_op_target(store->getDB(),
|
||||
meta_obj->get_bucket()->get_info(),
|
||||
meta_obj->get_obj());
|
||||
DB::Object::Write mp_op(&meta_op_target);
|
||||
mp_op.update_mp_parts(dpp, target_obj->get_obj().key);
|
||||
ret = obj_op.prepare(dpp);
|
||||
|
||||
obj_op.meta.owner = owner.get_id();
|
||||
obj_op.meta.flags = PUT_OBJ_CREATE;
|
||||
@ -1226,12 +1219,13 @@ namespace rgw::sal {
|
||||
oid(head_obj->get_name() + "." + upload_id +
|
||||
"." + std::to_string(part_num)),
|
||||
meta_obj(((DBMultipartUpload*)upload)->get_meta_obj()),
|
||||
op_target(_store->getDB(), meta_obj->get_bucket()->get_info(), meta_obj->get_obj()),
|
||||
op_target(_store->getDB(), head_obj->get_bucket()->get_info(), head_obj->get_obj(), upload_id),
|
||||
parent_op(&op_target), part_num(_part_num),
|
||||
part_num_str(_part_num_str) { parent_op.prepare(NULL);}
|
||||
part_num_str(_part_num_str) {}
|
||||
|
||||
int DBMultipartWriter::prepare(optional_yield y)
|
||||
{
|
||||
parent_op.prepare(NULL);
|
||||
parent_op.set_mp_part_str(upload_id + "." + std::to_string(part_num));
|
||||
// XXX: do we need to handle part_num_str??
|
||||
return 0;
|
||||
|
@ -366,7 +366,7 @@ int DB::remove_user(const DoutPrefixProvider *dpp,
|
||||
RGWUserInfo& uinfo, RGWObjVersionTracker *pobjv)
|
||||
{
|
||||
DBOpParams params = {};
|
||||
InitializeParams(dpp, "CreateUser", ¶ms);
|
||||
InitializeParams(dpp, "RemoveUser", ¶ms);
|
||||
int ret = 0;
|
||||
|
||||
RGWUserInfo orig_info;
|
||||
@ -750,6 +750,7 @@ int DB::raw_obj::InitializeParamsfromRawObj(const DoutPrefixProvider *dpp,
|
||||
params->op.obj.state.obj.key.name = obj_name;
|
||||
params->op.obj.state.obj.key.instance = obj_instance;
|
||||
params->op.obj.state.obj.key.ns = obj_ns;
|
||||
params->op.obj.obj_id = obj_id;
|
||||
|
||||
if (multipart_part_str != "0.0") {
|
||||
params->op.obj.is_multipart = true;
|
||||
@ -775,6 +776,7 @@ int DB::Object::InitializeParamsfromObject(const DoutPrefixProvider *dpp,
|
||||
params->objectdata_table = store->getObjectDataTable(bucket);
|
||||
params->op.bucket.info.bucket.name = bucket;
|
||||
params->op.obj.state.obj = obj;
|
||||
params->op.obj.obj_id = obj_id;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -1083,6 +1085,13 @@ int DB::raw_obj::read(const DoutPrefixProvider *dpp, int64_t ofs,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Verify if its valid obj */
|
||||
if (!params.op.obj_data.size) {
|
||||
ret = -ENOENT;
|
||||
ldpp_dout(dpp, 0)<<"In GetObjectData failed err:(" <<ret<<")" << dendl;
|
||||
return ret;
|
||||
}
|
||||
|
||||
bufferlist& read_bl = params.op.obj_data.data;
|
||||
|
||||
unsigned copy_len;
|
||||
@ -1189,6 +1198,9 @@ int DB::Object::get_obj_state(const DoutPrefixProvider *dpp,
|
||||
}
|
||||
|
||||
s = ¶ms.op.obj.state;
|
||||
/* XXX: For now use state->shadow_obj to store ObjectID string */
|
||||
s->shadow_obj = params.op.obj.obj_id;
|
||||
|
||||
**state = *s;
|
||||
|
||||
if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
|
||||
@ -1250,6 +1262,8 @@ int DB::Object::Read::prepare(const DoutPrefixProvider *dpp)
|
||||
|
||||
RGWObjState base_state;
|
||||
RGWObjState *astate = &base_state;
|
||||
|
||||
/* XXX Read obj_id too */
|
||||
int r = source->get_state(dpp, &astate, true);
|
||||
if (r < 0)
|
||||
return r;
|
||||
@ -1259,6 +1273,7 @@ int DB::Object::Read::prepare(const DoutPrefixProvider *dpp)
|
||||
}
|
||||
|
||||
state.obj = astate->obj;
|
||||
source->obj_id = astate->shadow_obj;
|
||||
|
||||
if (params.target_obj) {
|
||||
*params.target_obj = state.obj;
|
||||
@ -1382,7 +1397,7 @@ int DB::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl, const DoutP
|
||||
int part_num = (ofs / max_chunk_size);
|
||||
/* XXX: Handle multipart_str */
|
||||
raw_obj read_obj(store, source->get_bucket_info().bucket.name, astate->obj.key.name,
|
||||
astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
|
||||
astate->obj.key.instance, astate->obj.key.ns, source->obj_id, "0.0", part_num);
|
||||
|
||||
read_len = len;
|
||||
|
||||
@ -1425,7 +1440,7 @@ int DB::get_obj_iterate_cb(const DoutPrefixProvider *dpp,
|
||||
/* read entire data. So pass offset as '0' & len as '-1' */
|
||||
r = robj.read(dpp, 0, -1, bl);
|
||||
|
||||
if (r < 0) {
|
||||
if (r <= 0) {
|
||||
return r;
|
||||
}
|
||||
}
|
||||
@ -1498,7 +1513,7 @@ int DB::Object::iterate_obj(const DoutPrefixProvider *dpp,
|
||||
|
||||
/* XXX: Handle multipart_str */
|
||||
raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name,
|
||||
astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
|
||||
astate->obj.key.instance, astate->obj.key.ns, obj_id, "0.0", part_num);
|
||||
bool reading_from_head = (ofs < head_data_size);
|
||||
|
||||
r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg);
|
||||
@ -1517,38 +1532,21 @@ int DB::Object::Write::prepare(const DoutPrefixProvider* dpp)
|
||||
{
|
||||
DB *store = target->get_store();
|
||||
|
||||
DBOpParams params = {};
|
||||
int ret = -1;
|
||||
|
||||
/* XXX: handle assume_noent */
|
||||
store->InitializeParams(dpp, "GetObject", ¶ms);
|
||||
target->InitializeParamsfromObject(dpp, ¶ms);
|
||||
|
||||
ret = store->ProcessOp(dpp, "GetObject", ¶ms);
|
||||
|
||||
if (ret) {
|
||||
ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
|
||||
goto out;
|
||||
obj_state.obj = target->obj;
|
||||
|
||||
if (target->obj_id.empty()) {
|
||||
// generate obj_id
|
||||
char buf[33];
|
||||
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
|
||||
target->obj_id = target->obj.key.name + "." + buf;
|
||||
}
|
||||
|
||||
/* pick one field check if object exists */
|
||||
if (params.op.obj.state.exists) {
|
||||
ldpp_dout(dpp, 0)<<"Object(bucket:" << target->bucket_info.bucket.name << ", Object:"<< target->obj.key.name << ") exists" << dendl;
|
||||
|
||||
} else { /* create object entry in the object table */
|
||||
params.op.obj.storage_class = "STANDARD"; /* XXX: handle storage class */
|
||||
ret = store->ProcessOp(dpp, "PutObject", ¶ms);
|
||||
|
||||
if (ret) {
|
||||
ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
obj_state = params.op.obj.state;
|
||||
|
||||
ret = 0;
|
||||
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1581,7 +1579,7 @@ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
|
||||
|
||||
/* XXX: Handle multipart_str */
|
||||
raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name,
|
||||
obj_state.obj.key.instance, obj_state.obj.key.ns, mp_part_str, part_num);
|
||||
obj_state.obj.key.instance, obj_state.obj.key.ns, target->obj_id, mp_part_str, part_num);
|
||||
|
||||
|
||||
ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl;
|
||||
@ -1712,10 +1710,10 @@ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
|
||||
|
||||
/* XXX: handle multipart */
|
||||
params.op.query_str = "meta";
|
||||
ret = store->ProcessOp(dpp, "UpdateObject", ¶ms);
|
||||
ret = store->ProcessOp(dpp, "PutObject", ¶ms);
|
||||
|
||||
if (ret) {
|
||||
ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<")" << dendl;
|
||||
ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
|
||||
goto out;
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ struct DBOpObjectInfo {
|
||||
uint64_t head_size{0};
|
||||
rgw_placement_rule head_placement_rule;
|
||||
uint64_t max_head_size{0};
|
||||
std::string prefix;
|
||||
std::string obj_id;
|
||||
rgw_bucket_placement tail_placement; /* might be different than the original bucket,
|
||||
as object might have been copied across pools */
|
||||
std::map<uint64_t, RGWObjManifestRule> rules;
|
||||
@ -267,7 +267,7 @@ struct DBOpObjectPrepareInfo {
|
||||
std::string obj_attrs = ":obj_attrs";
|
||||
std::string head_size = ":head_size";
|
||||
std::string max_head_size = ":max_head_size";
|
||||
std::string prefix = ":prefix";
|
||||
std::string obj_id = ":obj_id";
|
||||
std::string tail_instance = ":tail_instance";
|
||||
std::string head_placement_rule_name = ":head_placement_rule_name";
|
||||
std::string head_placement_storage_class = ":head_placement_storage_class";
|
||||
@ -546,12 +546,12 @@ class DBOp {
|
||||
ObjAttrs BLOB, \
|
||||
HeadSize INTEGER, \
|
||||
MaxHeadSize INTEGER, \
|
||||
Prefix std::string, \
|
||||
TailInstance std::string, \
|
||||
HeadPlacementRuleName std::string, \
|
||||
HeadPlacementRuleStorageClass String, \
|
||||
TailPlacementRuleName std::string, \
|
||||
TailPlacementStorageClass String, \
|
||||
ObjID TEXT, \
|
||||
TailInstance TEXT, \
|
||||
HeadPlacementRuleName TEXT, \
|
||||
HeadPlacementRuleStorageClass TEXT, \
|
||||
TailPlacementRuleName TEXT, \
|
||||
TailPlacementStorageClass TEXT, \
|
||||
ManifestPartObjs BLOB, \
|
||||
ManifestPartRules BLOB, \
|
||||
Omap BLOB, \
|
||||
@ -577,14 +577,15 @@ class DBOp {
|
||||
ObjInstance TEXT, \
|
||||
ObjNS TEXT, \
|
||||
BucketName TEXT NOT NULL , \
|
||||
ObjID String, \
|
||||
MultipartPartStr TEXT, \
|
||||
PartNum INTEGER NOT NULL, \
|
||||
Offset INTEGER, \
|
||||
Size INTEGER, \
|
||||
Data BLOB, \
|
||||
PRIMARY KEY (ObjName, BucketName, ObjInstance, MultipartPartStr, PartNum), \
|
||||
FOREIGN KEY (BucketName, ObjName, ObjInstance) \
|
||||
REFERENCES '{}' (BucketName, ObjName, ObjInstance) ON DELETE CASCADE ON UPDATE CASCADE \n);";
|
||||
PRIMARY KEY (ObjName, BucketName, ObjInstance, ObjID, MultipartPartStr, PartNum), \
|
||||
FOREIGN KEY (BucketName) \
|
||||
REFERENCES '{}' (BucketName) ON DELETE CASCADE ON UPDATE CASCADE \n);";
|
||||
|
||||
const std::string CreateQuotaTableQ =
|
||||
"CREATE TABLE IF NOT EXISTS '{}' ( \
|
||||
@ -636,7 +637,7 @@ class DBOp {
|
||||
if (!type.compare("ObjectData"))
|
||||
return fmt::format(CreateObjectDataTableQ.c_str(),
|
||||
params->objectdata_table.c_str(),
|
||||
params->object_table.c_str());
|
||||
params->bucket_table.c_str());
|
||||
if (!type.compare("Quota"))
|
||||
return fmt::format(CreateQuotaTableQ.c_str(),
|
||||
params->quota_table.c_str());
|
||||
@ -944,7 +945,7 @@ class PutObjectOp: virtual public DBOp {
|
||||
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
|
||||
ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
|
||||
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
|
||||
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
TailPlacementRuleName, TailPlacementStorageClass, \
|
||||
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData ) \
|
||||
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, \
|
||||
@ -971,7 +972,7 @@ class PutObjectOp: virtual public DBOp {
|
||||
params.op.obj.pg_ver, params.op.obj.zone_short_id,
|
||||
params.op.obj.obj_version, params.op.obj.obj_version_tag,
|
||||
params.op.obj.obj_attrs, params.op.obj.head_size,
|
||||
params.op.obj.max_head_size, params.op.obj.prefix,
|
||||
params.op.obj.max_head_size, params.op.obj.obj_id,
|
||||
params.op.obj.tail_instance,
|
||||
params.op.obj.head_placement_rule_name,
|
||||
params.op.obj.head_placement_storage_class,
|
||||
@ -1009,7 +1010,7 @@ class GetObjectOp: virtual public DBOp {
|
||||
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
|
||||
ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
|
||||
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
|
||||
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
TailPlacementRuleName, TailPlacementStorageClass, \
|
||||
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
|
||||
where BucketName = {} and ObjName = {} and ObjInstance = {}";
|
||||
@ -1038,7 +1039,7 @@ class ListBucketObjectsOp: virtual public DBOp {
|
||||
AccountedSize, Mtime, Epoch, ObjTag, TailTag, WriteTag, FakeTag, \
|
||||
ShadowObj, HasData, IsOLH, OLHTag, PGVer, ZoneShortID, \
|
||||
ObjVersion, ObjVersionTag, ObjAttrs, HeadSize, MaxHeadSize, \
|
||||
Prefix, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
ObjID, TailInstance, HeadPlacementRuleName, HeadPlacementRuleStorageClass, \
|
||||
TailPlacementRuleName, TailPlacementStorageClass, \
|
||||
ManifestPartObjs, ManifestPartRules, Omap, IsMultipart, MPPartsList, HeadData from '{}' \
|
||||
where BucketName = {} and ObjName > {} ORDER BY ObjName ASC LIMIT {}";
|
||||
@ -1046,7 +1047,7 @@ class ListBucketObjectsOp: virtual public DBOp {
|
||||
virtual ~ListBucketObjectsOp() {}
|
||||
|
||||
std::string Schema(DBOpPrepareParams ¶ms) {
|
||||
/* XXX: Include prefix, delim */
|
||||
/* XXX: Include obj_id, delim */
|
||||
return fmt::format(Query.c_str(),
|
||||
params.object_table.c_str(),
|
||||
params.op.bucket.bucket_name.c_str(),
|
||||
@ -1076,7 +1077,7 @@ class UpdateObjectOp: virtual public DBOp {
|
||||
Epoch = {}, ObjTag = {}, TailTag = {}, WriteTag = {}, FakeTag = {}, \
|
||||
ShadowObj = {}, HasData = {}, IsOLH = {}, OLHTag = {}, PGVer = {}, \
|
||||
ZoneShortID = {}, ObjVersion = {}, ObjVersionTag = {}, ObjAttrs = {}, \
|
||||
HeadSize = {}, MaxHeadSize = {}, Prefix = {}, TailInstance = {}, \
|
||||
HeadSize = {}, MaxHeadSize = {}, ObjID = {}, TailInstance = {}, \
|
||||
HeadPlacementRuleName = {}, HeadPlacementRuleStorageClass = {}, \
|
||||
TailPlacementRuleName = {}, TailPlacementStorageClass = {}, \
|
||||
ManifestPartObjs = {}, ManifestPartRules = {}, Omap = {}, \
|
||||
@ -1127,7 +1128,7 @@ class UpdateObjectOp: virtual public DBOp {
|
||||
params.op.obj.pg_ver, params.op.obj.zone_short_id,
|
||||
params.op.obj.obj_version, params.op.obj.obj_version_tag,
|
||||
params.op.obj.obj_attrs, params.op.obj.head_size,
|
||||
params.op.obj.max_head_size, params.op.obj.prefix,
|
||||
params.op.obj.max_head_size, params.op.obj.obj_id,
|
||||
params.op.obj.tail_instance,
|
||||
params.op.obj.head_placement_rule_name,
|
||||
params.op.obj.head_placement_storage_class,
|
||||
@ -1147,8 +1148,8 @@ class PutObjectDataOp: virtual public DBOp {
|
||||
private:
|
||||
const std::string Query =
|
||||
"INSERT OR REPLACE INTO '{}' \
|
||||
(ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data) \
|
||||
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {})";
|
||||
(ObjName, ObjInstance, ObjNS, BucketName, ObjID, MultipartPartStr, PartNum, Offset, Size, Data) \
|
||||
VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {})";
|
||||
|
||||
public:
|
||||
virtual ~PutObjectDataOp() {}
|
||||
@ -1159,6 +1160,7 @@ class PutObjectDataOp: virtual public DBOp {
|
||||
params.op.obj.obj_name, params.op.obj.obj_instance,
|
||||
params.op.obj.obj_ns,
|
||||
params.op.bucket.bucket_name.c_str(),
|
||||
params.op.obj.obj_id,
|
||||
params.op.obj_data.multipart_part_str.c_str(),
|
||||
params.op.obj_data.part_num,
|
||||
params.op.obj_data.offset.c_str(),
|
||||
@ -1167,6 +1169,7 @@ class PutObjectDataOp: virtual public DBOp {
|
||||
}
|
||||
};
|
||||
|
||||
/* XXX: Recheck if this is really needed */
|
||||
class UpdateObjectDataOp: virtual public DBOp {
|
||||
private:
|
||||
const std::string Query =
|
||||
@ -1192,8 +1195,8 @@ class GetObjectDataOp: virtual public DBOp {
|
||||
private:
|
||||
const std::string Query =
|
||||
"SELECT \
|
||||
ObjName, ObjInstance, ObjNS, BucketName, MultipartPartStr, PartNum, Offset, Size, Data \
|
||||
from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} ORDER BY MultipartPartStr, PartNum";
|
||||
ObjName, ObjInstance, ObjNS, BucketName, ObjID, MultipartPartStr, PartNum, Offset, Size, Data \
|
||||
from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} and ObjID = {} ORDER BY MultipartPartStr, PartNum";
|
||||
|
||||
public:
|
||||
virtual ~GetObjectDataOp() {}
|
||||
@ -1203,14 +1206,15 @@ class GetObjectDataOp: virtual public DBOp {
|
||||
params.objectdata_table.c_str(),
|
||||
params.op.bucket.bucket_name.c_str(),
|
||||
params.op.obj.obj_name.c_str(),
|
||||
params.op.obj.obj_instance.c_str());
|
||||
params.op.obj.obj_instance.c_str(),
|
||||
params.op.obj.obj_id.c_str());
|
||||
}
|
||||
};
|
||||
|
||||
class DeleteObjectDataOp: virtual public DBOp {
|
||||
private:
|
||||
const std::string Query =
|
||||
"DELETE from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {}";
|
||||
"DELETE from '{}' where BucketName = {} and ObjName = {} and ObjInstance = {} and ObjID = {}";
|
||||
|
||||
public:
|
||||
virtual ~DeleteObjectDataOp() {}
|
||||
@ -1220,7 +1224,8 @@ class DeleteObjectDataOp: virtual public DBOp {
|
||||
params.objectdata_table.c_str(),
|
||||
params.op.bucket.bucket_name.c_str(),
|
||||
params.op.obj.obj_name.c_str(),
|
||||
params.op.obj.obj_instance.c_str());
|
||||
params.op.obj.obj_instance.c_str(),
|
||||
params.op.obj.obj_id.c_str());
|
||||
}
|
||||
};
|
||||
|
||||
@ -1512,12 +1517,12 @@ class DB {
|
||||
// "<bucketname>_<objname>_<objinstance>_<multipart-part-str>_<partnum>"
|
||||
const std::string raw_obj_oid = "{0}_{1}_{2}_{3}_{4}";
|
||||
|
||||
inline std::string to_oid(const std::string& bucket, const std::string& obj_name, const std::string& obj_instance,
|
||||
inline std::string to_oid(const std::string& bucket, const std::string& obj_name, const std::string& obj_instance, const std::string& obj_id,
|
||||
std::string mp_str, uint64_t partnum) {
|
||||
std::string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, mp_str, partnum);
|
||||
std::string s = fmt::format(raw_obj_oid.c_str(), bucket, obj_name, obj_instance, obj_id, mp_str, partnum);
|
||||
return s;
|
||||
}
|
||||
inline int from_oid(const std::string& oid, std::string& bucket, std::string& obj_name,
|
||||
inline int from_oid(const std::string& oid, std::string& bucket, std::string& obj_name, std::string& obj_id,
|
||||
std::string& obj_instance,
|
||||
std::string& mp_str, uint64_t& partnum) {
|
||||
std::vector<std::string> result;
|
||||
@ -1525,8 +1530,9 @@ class DB {
|
||||
bucket = result[0];
|
||||
obj_name = result[1];
|
||||
obj_instance = result[2];
|
||||
mp_str = result[3];
|
||||
partnum = stoi(result[4]);
|
||||
obj_id = result[3];
|
||||
mp_str = result[4];
|
||||
partnum = stoi(result[5]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -1538,6 +1544,7 @@ class DB {
|
||||
std::string obj_name;
|
||||
std::string obj_instance;
|
||||
std::string obj_ns;
|
||||
std::string obj_id;
|
||||
std::string multipart_part_str;
|
||||
uint64_t part_num;
|
||||
|
||||
@ -1549,12 +1556,13 @@ class DB {
|
||||
}
|
||||
|
||||
raw_obj(DB* _db, std::string& _bname, std::string& _obj_name, std::string& _obj_instance,
|
||||
std::string& _obj_ns, std::string _mp_part_str, int _part_num) {
|
||||
std::string& _obj_ns, std::string& _obj_id, std::string _mp_part_str, int _part_num) {
|
||||
db = _db;
|
||||
bucket_name = _bname;
|
||||
obj_name = _obj_name;
|
||||
obj_instance = _obj_instance;
|
||||
obj_ns = _obj_ns;
|
||||
obj_id = _obj_id;
|
||||
multipart_part_str = _mp_part_str;
|
||||
part_num = _part_num;
|
||||
|
||||
@ -1566,7 +1574,7 @@ class DB {
|
||||
int r;
|
||||
|
||||
db = _db;
|
||||
r = db->from_oid(oid, bucket_name, obj_name, obj_instance, multipart_part_str,
|
||||
r = db->from_oid(oid, bucket_name, obj_name, obj_instance, obj_id, multipart_part_str,
|
||||
part_num);
|
||||
if (r < 0) {
|
||||
multipart_part_str = "0.0";
|
||||
@ -1647,6 +1655,7 @@ class DB {
|
||||
rgw_obj obj;
|
||||
|
||||
RGWObjState *state;
|
||||
std::string obj_id;
|
||||
|
||||
bool versioning_disabled;
|
||||
|
||||
@ -1660,6 +1669,8 @@ class DB {
|
||||
|
||||
Object(DB *_store, const RGWBucketInfo& _bucket_info, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info), obj(_obj) {}
|
||||
|
||||
Object(DB *_store, const RGWBucketInfo& _bucket_info, const rgw_obj& _obj, const std::string& _obj_id) : store(_store), bucket_info(_bucket_info), obj(_obj), obj_id(_obj_id) {}
|
||||
|
||||
struct Read {
|
||||
DB::Object *source;
|
||||
|
||||
|
@ -146,7 +146,10 @@ int main(int argc, char *argv[])
|
||||
cout << "loglevel set to " << loglevel << "\n";
|
||||
}
|
||||
|
||||
dbsm = new DBStoreManager(logfile, loglevel);
|
||||
vector<const char*> args;
|
||||
auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
|
||||
CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_NO_MON_CONFIG, 1);
|
||||
dbsm = new DBStoreManager(cct.get(), logfile, loglevel);
|
||||
dbs = dbsm->getDB(tenant, true);
|
||||
|
||||
cout<<"No. of threads being created = "<<num_thr<<"\n";
|
||||
|
@ -32,11 +32,9 @@ public:
|
||||
cct = _cct;
|
||||
default_db = createDB(default_tenant);
|
||||
};
|
||||
DBStoreManager(std::string logfile, int loglevel): DBStoreHandles() {
|
||||
DBStoreManager(CephContext *_cct, std::string logfile, int loglevel): DBStoreHandles() {
|
||||
/* No ceph context. Create one with log args provided */
|
||||
std::vector<const char*> args;
|
||||
cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
|
||||
CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_NO_MON_CONFIG, 1)->get();
|
||||
cct = _cct;
|
||||
cct->_log->set_log_file(logfile);
|
||||
cct->_log->reopen_log_file();
|
||||
cct->_conf->subsys.set_log_level(ceph_subsys_rgw, loglevel);
|
||||
|
@ -275,6 +275,7 @@ enum GetObjectData {
|
||||
ObjDataInstance,
|
||||
ObjDataNS,
|
||||
ObjDataBucketName,
|
||||
ObjDataID,
|
||||
MultipartPartStr,
|
||||
PartNum,
|
||||
Offset,
|
||||
@ -442,7 +443,7 @@ static int list_object(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_stmt
|
||||
SQL_DECODE_BLOB_PARAM(dpp, stmt, ObjAttrs, op.obj.state.attrset, sdb);
|
||||
op.obj.head_size = sqlite3_column_int(stmt, HeadSize);
|
||||
op.obj.max_head_size = sqlite3_column_int(stmt, MaxHeadSize);
|
||||
op.obj.prefix = (const char*)sqlite3_column_text(stmt, Prefix);
|
||||
op.obj.obj_id = (const char*)sqlite3_column_text(stmt, Prefix);
|
||||
op.obj.tail_instance = (const char*)sqlite3_column_text(stmt, TailInstance);
|
||||
op.obj.head_placement_rule.name = (const char*)sqlite3_column_text(stmt, HeadPlacementRuleName);
|
||||
op.obj.head_placement_rule.storage_class = (const char*)sqlite3_column_text(stmt, HeadPlacementRuleStorageClass);
|
||||
@ -487,6 +488,7 @@ static int get_objectdata(const DoutPrefixProvider *dpp, DBOpInfo &op, sqlite3_s
|
||||
op.bucket.info.bucket.name = (const char*)sqlite3_column_text(stmt, ObjBucketName);
|
||||
op.obj.state.obj.key.instance = (const char*)sqlite3_column_text(stmt, ObjInstance);
|
||||
op.obj.state.obj.key.ns = (const char*)sqlite3_column_text(stmt, ObjNS);
|
||||
op.obj.obj_id = (const char*)sqlite3_column_text(stmt, ObjDataID);
|
||||
op.obj_data.part_num = sqlite3_column_int(stmt, PartNum);
|
||||
op.obj_data.offset = sqlite3_column_int(stmt, Offset);
|
||||
op.obj_data.size = sqlite3_column_int(stmt, ObjDataSize);
|
||||
@ -1816,8 +1818,8 @@ int SQLPutObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *params)
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.max_head_size.c_str(), sdb);
|
||||
SQL_BIND_INT(dpp, stmt, index, params->op.obj.max_head_size, sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.prefix.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.prefix.c_str(), sdb);
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_id.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.obj_id.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.tail_instance.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.tail_instance.c_str(), sdb);
|
||||
@ -2149,8 +2151,8 @@ int SQLUpdateObject::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *para
|
||||
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.max_head_size.c_str(), sdb);
|
||||
SQL_BIND_INT(dpp, *stmt, index, params->op.obj.max_head_size, sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.prefix.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.prefix.c_str(), sdb);
|
||||
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.obj_id.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.obj_id.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, *stmt, index, p_params.op.obj.tail_instance.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, *stmt, index, params->op.obj.tail_instance.c_str(), sdb);
|
||||
@ -2291,7 +2293,6 @@ int SQLPutObjectData::Prepare(const DoutPrefixProvider *dpp, struct DBOpParams *
|
||||
if (p_params.objectdata_table.empty()) {
|
||||
p_params.objectdata_table = getObjectDataTable(bucket_name);
|
||||
}
|
||||
params->bucket_table = p_params.bucket_table;
|
||||
params->object_table = p_params.object_table;
|
||||
params->objectdata_table = p_params.objectdata_table;
|
||||
(void)createObjectDataTable(dpp, params);
|
||||
@ -2324,6 +2325,9 @@ int SQLPutObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par
|
||||
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.bucket.info.bucket.name.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_id.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.obj_id.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj_data.part_num.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INT(dpp, stmt, index, params->op.obj_data.part_num, sdb);
|
||||
@ -2475,6 +2479,9 @@ int SQLGetObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *par
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_instance.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.instance.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_id.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.obj_id.c_str(), sdb);
|
||||
|
||||
out:
|
||||
return rc;
|
||||
}
|
||||
@ -2530,6 +2537,10 @@ int SQLDeleteObjectData::Bind(const DoutPrefixProvider *dpp, struct DBOpParams *
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_instance.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.state.obj.key.instance.c_str(), sdb);
|
||||
|
||||
SQL_BIND_INDEX(dpp, stmt, index, p_params.op.obj.obj_id.c_str(), sdb);
|
||||
SQL_BIND_TEXT(dpp, stmt, index, params->op.obj.obj_id.c_str(), sdb);
|
||||
|
||||
out:
|
||||
return rc;
|
||||
}
|
||||
|
@ -29,9 +29,9 @@ namespace gtest {
|
||||
|
||||
void SetUp() override {
|
||||
cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
|
||||
CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_NO_MON_CONFIG, 1)->get();
|
||||
CODE_ENVIRONMENT_DAEMON, CINIT_FLAG_NO_MON_CONFIG, 1);
|
||||
if (!db_type.compare("SQLite")) {
|
||||
db = new SQLiteDB(tenant, cct);
|
||||
db = new SQLiteDB(tenant, cct.get());
|
||||
ASSERT_TRUE(db != nullptr);
|
||||
ret = db->Initialize(logfile, loglevel);
|
||||
ASSERT_GE(ret, 0);
|
||||
@ -51,7 +51,7 @@ namespace gtest {
|
||||
int ret;
|
||||
string logfile = "rgw_dbstore_tests.log";
|
||||
int loglevel = 30;
|
||||
CephContext *cct;
|
||||
boost::intrusive_ptr<CephContext> cct;
|
||||
};
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user