rgw/dbstore: Object APIs

Support for simple Put, Get, Delete, List Ops of Regular Objects on dbstore.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
This commit is contained in:
Soumya Koduri 2021-09-01 17:40:59 +05:30
parent 5907703adb
commit e05daafdb3
11 changed files with 3711 additions and 309 deletions

View File

@ -539,3 +539,47 @@ public:
};
};
WRITE_CLASS_ENCODER(RGWObjManifest)
struct RGWObjState {
rgw_obj obj;
bool is_atomic{false};
bool has_attrs{false};
bool exists{false};
uint64_t size{0}; //< size of raw object
uint64_t accounted_size{0}; //< size before compression, encryption
ceph::real_time mtime;
uint64_t epoch{0};
bufferlist obj_tag;
bufferlist tail_tag;
std::string write_tag;
bool fake_tag{false};
std::optional<RGWObjManifest> manifest;
std::string shadow_obj;
bool has_data{false};
bufferlist data;
bool prefetch_data{false};
bool keep_tail{false};
bool is_olh{false};
bufferlist olh_tag;
uint64_t pg_ver{false};
uint32_t zone_short_id{0};
/* important! don't forget to update copy constructor */
RGWObjVersionTracker objv_tracker;
std::map<std::string, bufferlist> attrset;
RGWObjState();
RGWObjState(const RGWObjState& rhs);
~RGWObjState();
bool get_attr(std::string name, bufferlist& dest) {
std::map<std::string, bufferlist>::iterator iter = attrset.find(name);
if (iter != attrset.end()) {
dest = iter->second;
return true;
}
return false;
}
};

View File

@ -155,51 +155,6 @@ struct RGWCloneRangeInfo {
uint64_t len;
};
struct RGWObjState {
rgw_obj obj;
bool is_atomic{false};
bool has_attrs{false};
bool exists{false};
uint64_t size{0}; //< size of raw object
uint64_t accounted_size{0}; //< size before compression, encryption
ceph::real_time mtime;
uint64_t epoch{0};
bufferlist obj_tag;
bufferlist tail_tag;
std::string write_tag;
bool fake_tag{false};
std::optional<RGWObjManifest> manifest;
std::string shadow_obj;
bool has_data{false};
bufferlist data;
bool prefetch_data{false};
bool keep_tail{false};
bool is_olh{false};
bufferlist olh_tag;
uint64_t pg_ver{false};
uint32_t zone_short_id{0};
/* important! don't forget to update copy constructor */
RGWObjVersionTracker objv_tracker;
std::map<std::string, bufferlist> attrset;
RGWObjState();
RGWObjState(const RGWObjState& rhs);
~RGWObjState();
bool get_attr(std::string name, bufferlist& dest) {
auto iter = attrset.find(name);
if (iter != attrset.end()) {
dest = iter->second;
return true;
}
return false;
}
};
class RGWFetchObjFilter {
public:
virtual ~RGWFetchObjFilter() {}

View File

@ -130,3 +130,27 @@ void StoreManager::close_storage(rgw::sal::Store* store)
delete store;
}
namespace rgw::sal {
int Object::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
{
if (ofs < 0) {
ofs += obj_size;
if (ofs < 0)
ofs = 0;
end = obj_size - 1;
} else if (end < 0) {
end = obj_size - 1;
}
if (obj_size > 0) {
if (ofs >= (off_t)obj_size) {
return -ERANGE;
}
if (end >= (off_t)obj_size) {
end = obj_size - 1;
}
}
return 0;
}
}

View File

@ -46,7 +46,7 @@ namespace rgw::sal {
buckets.set_truncated(is_truncated);
for (const auto& ent : ulist.get_buckets()) {
buckets.add(std::unique_ptr<Bucket>(new DBBucket(this->store, ent.second, this)));
buckets.add(std::make_unique<DBBucket>(this->store, ent.second, this));
}
return 0;
@ -330,13 +330,37 @@ namespace rgw::sal {
std::unique_ptr<Object> DBBucket::get_object(const rgw_obj_key& k)
{
return nullptr;
return std::make_unique<DBObject>(this->store, k, this);
}
int DBBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y)
{
/* XXX: Objects */
return 0;
int ret = 0;
results.objs.clear();
DB::Bucket target(store->getDB(), get_info());
DB::Bucket::List list_op(&target);
list_op.params.prefix = params.prefix;
list_op.params.delim = params.delim;
list_op.params.marker = params.marker;
list_op.params.ns = params.ns;
list_op.params.end_marker = params.end_marker;
list_op.params.ns = params.ns;
list_op.params.enforce_ns = params.enforce_ns;
list_op.params.filter = params.filter;
list_op.params.list_versions = params.list_versions;
list_op.params.allow_unordered = params.allow_unordered;
results.objs.clear();
ret = list_op.list_objects(dpp, max, &results.objs, &results.common_prefixes, &results.is_truncated);
if (ret >= 0) {
results.next_marker = list_op.get_next_marker();
params.marker = results.next_marker;
}
return ret;
}
int DBBucket::list_multiparts(const DoutPrefixProvider *dpp,
@ -416,9 +440,465 @@ namespace rgw::sal {
std::unique_ptr<LuaScriptManager> DBStore::get_lua_script_manager()
{
return std::unique_ptr<LuaScriptManager>(new DBLuaScriptManager(this));
return std::make_unique<DBLuaScriptManager>(this);
}
int DBObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh)
{
if (!*state) {
*state = new RGWObjState();
}
DB::Object op_target(store->getDB(), get_bucket()->get_info(), get_obj());
return op_target.get_obj_state(dpp, get_bucket()->get_info(), get_obj(), follow_olh, state);
}
int DBObject::read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj)
{
read_op.params.attrs = &attrs;
read_op.params.target_obj = target_obj;
read_op.params.obj_size = &obj_size;
read_op.params.lastmod = &mtime;
return read_op.prepare(dpp);
}
int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj)
{
Attrs empty;
DB::Object op_target(store->getDB(),
get_bucket()->get_info(), target_obj ? *target_obj : get_obj());
return op_target.set_attrs(dpp, setattrs ? *setattrs : empty, delattrs);
}
int DBObject::get_obj_attrs(RGWObjectCtx* rctx, optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
{
DB::Object op_target(store->getDB(), get_bucket()->get_info(), get_obj());
DB::Object::Read read_op(&op_target);
return read_attrs(dpp, read_op, y, target_obj);
}
int DBObject::modify_obj_attrs(RGWObjectCtx* rctx, const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp)
{
rgw_obj target = get_obj();
int r = get_obj_attrs(rctx, y, dpp, &target);
if (r < 0) {
return r;
}
set_atomic(rctx);
attrs[attr_name] = attr_val;
return set_obj_attrs(dpp, rctx, &attrs, nullptr, y, &target);
}
int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, const char* attr_name, optional_yield y)
{
rgw_obj target = get_obj();
Attrs rmattr;
bufferlist bl;
set_atomic(rctx);
rmattr[attr_name] = bl;
return set_obj_attrs(dpp, rctx, nullptr, &rmattr, y, &target);
}
int DBObject::copy_obj_data(RGWObjectCtx& rctx, Bucket* dest_bucket,
Object* dest_obj,
uint16_t olh_epoch,
std::string* petag,
const DoutPrefixProvider* dpp,
optional_yield y)
{
return 0;
}
/* RGWObjectCtx will be moved out of sal */
/* XXX: Placeholder. Should not be needed later after Dan's patch */
void DBObject::set_atomic(RGWObjectCtx* rctx) const
{
return;
}
/* RGWObjectCtx will be moved out of sal */
/* XXX: Placeholder. Should not be needed later after Dan's patch */
void DBObject::set_prefetch_data(RGWObjectCtx* rctx)
{
return;
}
bool DBObject::is_expired() {
return false;
}
void DBObject::gen_rand_obj_instance_name()
{
store->getDB()->gen_rand_obj_instance_name(&key);
}
int DBObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count,
std::map<std::string, bufferlist> *m,
bool* pmore, optional_yield y)
{
DB::Object op_target(store->getDB(),
get_bucket()->get_info(), get_obj());
return op_target.obj_omap_get_vals(dpp, marker, count, m, pmore);
}
int DBObject::omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m,
optional_yield y)
{
DB::Object op_target(store->getDB(),
get_bucket()->get_info(), get_obj());
return op_target.obj_omap_get_all(dpp, m);
}
int DBObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid,
const std::set<std::string>& keys,
Attrs* vals)
{
DB::Object op_target(store->getDB(),
get_bucket()->get_info(), get_obj());
return op_target.obj_omap_get_vals_by_keys(dpp, oid, keys, vals);
}
int DBObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val,
bool must_exist, optional_yield y)
{
DB::Object op_target(store->getDB(),
get_bucket()->get_info(), get_obj());
return op_target.obj_omap_set_val_by_key(dpp, key, val, must_exist);
}
MPSerializer* DBObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name)
{
return nullptr;
}
int DBObject::transition(RGWObjectCtx& rctx,
Bucket* bucket,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider* dpp,
optional_yield y)
{
return 0;
}
bool DBObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
{
/* XXX: support single default zone and zonegroup for now */
return true;
}
int DBObject::get_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f, RGWObjectCtx* obj_ctx)
{
return 0;
}
std::unique_ptr<Object::ReadOp> DBObject::get_read_op(RGWObjectCtx* ctx)
{
return std::make_unique<DBObject::DBReadOp>(this, ctx);
}
DBObject::DBReadOp::DBReadOp(DBObject *_source, RGWObjectCtx *_rctx) :
source(_source),
rctx(_rctx),
op_target(_source->store->getDB(),
_source->get_bucket()->get_info(),
_source->get_obj()),
parent_op(&op_target)
{ }
int DBObject::DBReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
{
uint64_t obj_size;
parent_op.conds.mod_ptr = params.mod_ptr;
parent_op.conds.unmod_ptr = params.unmod_ptr;
parent_op.conds.high_precision_time = params.high_precision_time;
parent_op.conds.mod_zone_id = params.mod_zone_id;
parent_op.conds.mod_pg_ver = params.mod_pg_ver;
parent_op.conds.if_match = params.if_match;
parent_op.conds.if_nomatch = params.if_nomatch;
parent_op.params.lastmod = params.lastmod;
parent_op.params.target_obj = params.target_obj;
parent_op.params.obj_size = &obj_size;
parent_op.params.attrs = &source->get_attrs();
int ret = parent_op.prepare(dpp);
if (ret < 0)
return ret;
source->set_key(parent_op.state.obj.key);
source->set_obj_size(obj_size);
return ret;
}
int DBObject::DBReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp)
{
return parent_op.read(ofs, end, bl, dpp);
}
int DBObject::DBReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y)
{
return parent_op.get_attr(dpp, name, dest);
}
std::unique_ptr<Object::DeleteOp> DBObject::get_delete_op(RGWObjectCtx* ctx)
{
return std::make_unique<DBObject::DBDeleteOp>(this, ctx);
}
DBObject::DBDeleteOp::DBDeleteOp(DBObject *_source, RGWObjectCtx *_rctx) :
source(_source),
rctx(_rctx),
op_target(_source->store->getDB(),
_source->get_bucket()->get_info(),
_source->get_obj()),
parent_op(&op_target)
{ }
int DBObject::DBDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y)
{
parent_op.params.bucket_owner = params.bucket_owner.get_id();
parent_op.params.versioning_status = params.versioning_status;
parent_op.params.obj_owner = params.obj_owner;
parent_op.params.olh_epoch = params.olh_epoch;
parent_op.params.marker_version_id = params.marker_version_id;
parent_op.params.bilog_flags = params.bilog_flags;
parent_op.params.remove_objs = params.remove_objs;
parent_op.params.expiration_time = params.expiration_time;
parent_op.params.unmod_since = params.unmod_since;
parent_op.params.mtime = params.mtime;
parent_op.params.high_precision_time = params.high_precision_time;
parent_op.params.zones_trace = params.zones_trace;
parent_op.params.abortmp = params.abortmp;
parent_op.params.parts_accounted_size = params.parts_accounted_size;
int ret = parent_op.delete_obj(dpp);
if (ret < 0)
return ret;
result.delete_marker = parent_op.result.delete_marker;
result.version_id = parent_op.result.version_id;
return ret;
}
int DBObject::delete_object(const DoutPrefixProvider* dpp, RGWObjectCtx* obj_ctx, optional_yield y, bool prevent_versioning)
{
DB::Object del_target(store->getDB(), bucket->get_info(), *obj_ctx, get_obj());
DB::Object::Delete del_op(&del_target);
del_op.params.bucket_owner = bucket->get_info().owner;
del_op.params.versioning_status = bucket->get_info().versioning_status();
return del_op.delete_obj(dpp);
}
int DBObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate,
Completions* aio, bool keep_index_consistent,
optional_yield y)
{
/* XXX: Make it async */
return 0;
}
int DBObject::copy_object(RGWObjectCtx& obj_ctx,
User* user,
req_info* info,
const rgw_zone_id& source_zone,
rgw::sal::Object* dest_object,
rgw::sal::Bucket* dest_bucket,
rgw::sal::Bucket* src_bucket,
const rgw_placement_rule& dest_placement,
ceph::real_time* src_mtime,
ceph::real_time* mtime,
const ceph::real_time* mod_ptr,
const ceph::real_time* unmod_ptr,
bool high_precision_time,
const char* if_match,
const char* if_nomatch,
AttrsMod attrs_mod,
bool copy_if_newer,
Attrs& attrs,
RGWObjCategory category,
uint64_t olh_epoch,
boost::optional<ceph::real_time> delete_at,
std::string* version_id,
std::string* tag,
std::string* etag,
void (*progress_cb)(off_t, void *),
void* progress_data,
const DoutPrefixProvider* dpp,
optional_yield y)
{
return 0;
}
int DBObject::DBReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y)
{
return parent_op.iterate(dpp, ofs, end, cb);
}
int DBObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored,
const DoutPrefixProvider* dpp)
{
return 0;
}
int DBObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
const DoutPrefixProvider* dpp,
optional_yield y)
{
return 0;
}
DBAtomicWriter::DBAtomicWriter(const DoutPrefixProvider *dpp,
optional_yield y,
std::unique_ptr<rgw::sal::Object> _head_obj,
DBStore* _store,
const rgw_user& _owner, RGWObjectCtx& obj_ctx,
const rgw_placement_rule *_ptail_placement_rule,
uint64_t _olh_epoch,
const std::string& _unique_tag) :
Writer(dpp, y),
store(_store),
owner(_owner),
ptail_placement_rule(_ptail_placement_rule),
olh_epoch(_olh_epoch),
unique_tag(_unique_tag),
obj(_store, _head_obj->get_key(), _head_obj->get_bucket()),
op_target(_store->getDB(), obj.get_bucket()->get_info(), obj.get_obj()),
parent_op(&op_target) {}
int DBAtomicWriter::prepare(optional_yield y)
{
return parent_op.prepare(NULL); /* send dpp */
}
int DBAtomicWriter::process(bufferlist&& data, uint64_t offset)
{
total_data_size += data.length();
/* XXX: Optimize all bufferlist copies in this function */
/* copy head_data into meta. */
uint64_t head_size = store->getDB()->get_max_head_size();
unsigned head_len = 0;
uint64_t max_chunk_size = store->getDB()->get_max_chunk_size();
int excess_size = 0;
/* Accumulate tail_data till max_chunk_size or flush op */
bufferlist tail_data;
if (data.length() != 0) {
if (offset < head_size) {
/* XXX: handle case (if exists) where offset > 0 & < head_size */
head_len = std::min((uint64_t)data.length(),
head_size - offset);
bufferlist tmp;
data.begin(0).copy(head_len, tmp);
head_data.append(tmp);
parent_op.meta.data = &head_data;
if (head_len == data.length()) {
return 0;
}
/* Move offset by copy_len */
offset = head_len;
}
/* handle tail parts.
* First accumulate and write data into dbstore in its chunk_size
* parts
*/
if (!tail_part_size) { /* new tail part */
tail_part_offset = offset;
}
data.begin(head_len).copy(data.length() - head_len, tail_data);
tail_part_size += tail_data.length();
tail_part_data.append(tail_data);
if (tail_part_size < max_chunk_size) {
return 0;
} else {
int write_ofs = 0;
while (tail_part_size >= max_chunk_size) {
excess_size = tail_part_size - max_chunk_size;
bufferlist tmp;
tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp);
/* write tail objects data */
int ret = parent_op.write_data(dpp, tmp, tail_part_offset);
if (ret < 0) {
return ret;
}
tail_part_size -= max_chunk_size;
write_ofs += max_chunk_size;
tail_part_offset += max_chunk_size;
}
/* reset tail parts or update if excess data */
if (excess_size > 0) { /* wrote max_chunk_size data */
tail_part_size = excess_size;
bufferlist tmp;
tail_part_data.begin(write_ofs).copy(excess_size, tmp);
tail_part_data = tmp;
} else {
tail_part_size = 0;
tail_part_data.clear();
tail_part_offset = 0;
}
}
} else {
if (tail_part_size == 0) {
return 0; /* nothing more to write */
}
/* flush watever tail data is present */
int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset);
if (ret < 0) {
return ret;
}
tail_part_size = 0;
tail_part_data.clear();
tail_part_offset = 0;
}
return 0;
}
int DBAtomicWriter::complete(size_t accounted_size, const std::string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
std::map<std::string, bufferlist>& attrs,
ceph::real_time delete_at,
const char *if_match, const char *if_nomatch,
const std::string *user_data,
rgw_zone_set *zones_trace, bool *canceled,
optional_yield y)
{
parent_op.meta.mtime = mtime;
parent_op.meta.delete_at = delete_at;
parent_op.meta.if_match = if_match;
parent_op.meta.if_nomatch = if_nomatch;
parent_op.meta.user_data = user_data;
parent_op.meta.zones_trace = zones_trace;
/* XXX: handle accounted size */
accounted_size = total_data_size;
int ret = parent_op.write_meta(dpp, total_data_size, accounted_size, attrs);
if (canceled) {
*canceled = parent_op.meta.canceled;
}
return ret;
}
std::unique_ptr<RGWRole> DBStore::get_role(std::string name,
std::string tenant,
@ -481,12 +961,14 @@ namespace rgw::sal {
const rgw_placement_rule *ptail_placement_rule,
uint64_t olh_epoch,
const std::string& unique_tag) {
return nullptr;
return std::make_unique<DBAtomicWriter>(dpp, y,
std::move(_head_obj), this, owner, obj_ctx,
ptail_placement_rule, olh_epoch, unique_tag);
}
std::unique_ptr<User> DBStore::get_user(const rgw_user &u)
{
return std::unique_ptr<User>(new DBUser(this, u));
return std::make_unique<DBUser>(this, u);
}
int DBStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string& key, optional_yield y, std::unique_ptr<User>* user)
@ -545,7 +1027,7 @@ namespace rgw::sal {
std::unique_ptr<Object> DBStore::get_object(const rgw_obj_key& k)
{
return NULL;
return std::make_unique<DBObject>(this, k);
}
@ -633,7 +1115,7 @@ namespace rgw::sal {
return -EEXIST;
}*/
} else {
bucket = std::unique_ptr<Bucket>(new DBBucket(this, b, u));
bucket = std::make_unique<DBBucket>(this, b, u);
*existed = false;
bucket->set_attrs(attrs);
// XXX: For now single default zone and STANDARD storage class
@ -732,7 +1214,7 @@ namespace rgw::sal {
struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name)
{
return 0;
return std::make_unique<DBNotification>(obj, event_type);
}
int DBStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
@ -875,6 +1357,7 @@ extern "C" {
store->setDBStoreManager(dbsm);
store->setDB(db);
db->set_store((rgw::sal::Store*)store);
}
return store;

View File

@ -26,6 +26,20 @@ namespace rgw { namespace sal {
class DBStore;
class DBNotification : public Notification {
protected:
Object* obj;
rgw::notify::EventType event_type;
public:
DBNotification(Object* _obj, rgw::notify::EventType _type) : Notification(_obj, _type), obj(_obj), event_type(_type) {}
~DBNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version) override { return 0; }
};
class DBUser : public User {
private:
DBStore *store;
@ -245,6 +259,179 @@ namespace rgw { namespace sal {
}
};
class DBObject : public Object {
private:
DBStore* store;
RGWAccessControlPolicy acls;
/* XXX: to be removed. Till Dan's patch comes, a placeholder
* for RGWObjState
*/
RGWObjState* state;
public:
struct DBReadOp : public ReadOp {
private:
DBObject* source;
RGWObjectCtx* rctx;
DB::Object op_target;
DB::Object::Read parent_op;
public:
DBReadOp(DBObject *_source, RGWObjectCtx *_rctx);
virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y) override;
virtual int get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) override;
};
struct DBDeleteOp : public DeleteOp {
private:
DBObject* source;
RGWObjectCtx* rctx;
DB::Object op_target;
DB::Object::Delete parent_op;
public:
DBDeleteOp(DBObject* _source, RGWObjectCtx* _rctx);
virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y) override;
};
DBObject() = default;
DBObject(DBStore *_st, const rgw_obj_key& _k)
: Object(_k),
store(_st),
acls() {
}
DBObject(DBStore *_st, const rgw_obj_key& _k, Bucket* _b)
: Object(_k, _b),
store(_st),
acls() {
}
DBObject(DBObject& _o) = default;
virtual int delete_object(const DoutPrefixProvider* dpp,
RGWObjectCtx* obj_ctx,
optional_yield y,
bool prevent_versioning = false) override;
virtual int delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate, Completions* aio,
bool keep_index_consistent, optional_yield y) override;
virtual int copy_object(RGWObjectCtx& obj_ctx, User* user,
req_info* info, const rgw_zone_id& source_zone,
rgw::sal::Object* dest_object, rgw::sal::Bucket* dest_bucket,
rgw::sal::Bucket* src_bucket,
const rgw_placement_rule& dest_placement,
ceph::real_time* src_mtime, ceph::real_time* mtime,
const ceph::real_time* mod_ptr, const ceph::real_time* unmod_ptr,
bool high_precision_time,
const char* if_match, const char* if_nomatch,
AttrsMod attrs_mod, bool copy_if_newer, Attrs& attrs,
RGWObjCategory category, uint64_t olh_epoch,
boost::optional<ceph::real_time> delete_at,
std::string* version_id, std::string* tag, std::string* etag,
void (*progress_cb)(off_t, void *), void* progress_data,
const DoutPrefixProvider* dpp, optional_yield y) override;
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual void set_atomic(RGWObjectCtx* rctx) const override;
virtual void set_prefetch_data(RGWObjectCtx* rctx) override;
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj = NULL) override;
virtual int get_obj_attrs(RGWObjectCtx* rctx, optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(RGWObjectCtx* rctx, const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, const char* attr_name, optional_yield y) override;
virtual int copy_obj_data(RGWObjectCtx& rctx, Bucket* dest_bucket, Object* dest_obj, uint16_t olh_epoch, std::string* petag, const DoutPrefixProvider* dpp, optional_yield y) override;
virtual bool is_expired() override;
virtual void gen_rand_obj_instance_name() override;
virtual std::unique_ptr<Object> clone() override {
return std::unique_ptr<Object>(new DBObject(*this));
}
virtual MPSerializer* get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name) override;
virtual int transition(RGWObjectCtx& rctx,
Bucket* bucket,
const rgw_placement_rule& placement_rule,
const real_time& mtime,
uint64_t olh_epoch,
const DoutPrefixProvider* dpp,
optional_yield y) override;
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
virtual int get_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f, RGWObjectCtx* obj_ctx) override;
/* Swift versioning */
virtual int swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored,
const DoutPrefixProvider* dpp) override;
virtual int swift_versioning_copy(RGWObjectCtx* obj_ctx,
const DoutPrefixProvider* dpp,
optional_yield y) override;
/* OPs */
virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
virtual std::unique_ptr<DeleteOp> get_delete_op(RGWObjectCtx*) override;
/* OMAP */
virtual int omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count,
std::map<std::string, bufferlist> *m,
bool* pmore, optional_yield y) override;
virtual int omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m,
optional_yield y) override;
virtual int omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid,
const std::set<std::string>& keys,
Attrs* vals) override;
virtual int omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val,
bool must_exist, optional_yield y) override;
private:
int read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj = nullptr);
};
class DBAtomicWriter : public Writer {
protected:
rgw::sal::DBStore* store;
const rgw_user& owner;
const rgw_placement_rule *ptail_placement_rule;
uint64_t olh_epoch;
const std::string& unique_tag;
DBObject obj;
DB::Object op_target;
DB::Object::Write parent_op;
uint64_t total_data_size = 0; /* for total data being uploaded */
bufferlist head_data;
bufferlist tail_part_data;
uint64_t tail_part_offset;
uint64_t tail_part_size = 0; /* corresponds to each tail part being
written to dbstore */
public:
DBAtomicWriter(const DoutPrefixProvider *dpp,
optional_yield y,
std::unique_ptr<rgw::sal::Object> _head_obj,
DBStore* _store,
const rgw_user& _owner, RGWObjectCtx& obj_ctx,
const rgw_placement_rule *_ptail_placement_rule,
uint64_t _olh_epoch,
const std::string& _unique_tag);
~DBAtomicWriter() = default;
// prepare to start processing object data
virtual int prepare(optional_yield y) override;
// Process a bufferlist
virtual int process(bufferlist&& data, uint64_t offset) override;
// complete the operation and make its result visible to clients
virtual int complete(size_t accounted_size, const std::string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
std::map<std::string, bufferlist>& attrs,
ceph::real_time delete_at,
const char *if_match, const char *if_nomatch,
const std::string *user_data,
rgw_zone_set *zones_trace, bool *canceled,
optional_yield y) override;
};
class DBStore : public Store {
private:
/* DBStoreManager is used in case multiple

View File

@ -1484,28 +1484,6 @@ int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucke
return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
}
int Object::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
{
if (ofs < 0) {
ofs += obj_size;
if (ofs < 0)
ofs = 0;
end = obj_size - 1;
} else if (end < 0) {
end = obj_size - 1;
}
if (obj_size > 0) {
if (ofs >= (off_t)obj_size) {
return -ERANGE;
}
if (end >= (off_t)obj_size) {
end = obj_size - 1;
}
}
return 0;
}
RadosObject::~RadosObject() {}
int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -24,15 +24,16 @@ class SQLiteDB : public DB, public DBOp{
sqlite3_stmt *stmt = NULL;
DBOpPrepareParams PrepareParams;
SQLiteDB(string db_name, CephContext *_cct) : DB(db_name, _cct), cct(_cct) {
SQLiteDB(sqlite3 *dbi, string db_name, CephContext *_cct) : DB(db_name, _cct), cct(_cct) {
db = (void*)dbi;
InitPrepareParams(get_def_dpp(), PrepareParams);
}
SQLiteDB(sqlite3 *dbi, CephContext *_cct) : DB(_cct), cct(_cct) {
db = (void*)dbi;
SQLiteDB(string db_name, CephContext *_cct) : DB(db_name, _cct), cct(_cct) {
InitPrepareParams(get_def_dpp(), PrepareParams);
}
~SQLiteDB() {}
uint64_t get_blob_limit() override { return SQLITE_LIMIT_LENGTH; }
void *openDB(const DoutPrefixProvider *dpp) override;
int closeDB(const DoutPrefixProvider *dpp) override;
int InitializeDBOps(const DoutPrefixProvider *dpp) override;
@ -73,7 +74,7 @@ class SQLObjectOp : public ObjectOp {
SQLObjectOp(sqlite3 **sdbi, CephContext *_cct) : sdb(sdbi), cct(_cct) {};
~SQLObjectOp() {}
int InitializeObjectOps(const DoutPrefixProvider *dpp);
int InitializeObjectOps(string db_name, const DoutPrefixProvider *dpp);
int FreeObjectOps(const DoutPrefixProvider *dpp);
};
@ -83,7 +84,7 @@ class SQLInsertUser : public SQLiteDB, public InsertUserOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLInsertUser(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLInsertUser(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLInsertUser() {
if (stmt)
sqlite3_finalize(stmt);
@ -99,7 +100,7 @@ class SQLRemoveUser : public SQLiteDB, public RemoveUserOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLRemoveUser(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLRemoveUser(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLRemoveUser() {
if (stmt)
sqlite3_finalize(stmt);
@ -118,7 +119,7 @@ class SQLGetUser : public SQLiteDB, public GetUserOp {
sqlite3_stmt *userid_stmt = NULL; // Prepared statement to query by user_id
public:
SQLGetUser(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLGetUser(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLGetUser() {
if (stmt)
sqlite3_finalize(stmt);
@ -140,7 +141,7 @@ class SQLInsertBucket : public SQLiteDB, public InsertBucketOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLInsertBucket(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLInsertBucket(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLInsertBucket() {
if (stmt)
sqlite3_finalize(stmt);
@ -158,7 +159,7 @@ class SQLUpdateBucket : public SQLiteDB, public UpdateBucketOp {
sqlite3_stmt *owner_stmt = NULL; // Prepared statement
public:
SQLUpdateBucket(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLUpdateBucket(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLUpdateBucket() {
if (info_stmt)
sqlite3_finalize(info_stmt);
@ -178,7 +179,7 @@ class SQLRemoveBucket : public SQLiteDB, public RemoveBucketOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLRemoveBucket(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLRemoveBucket(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLRemoveBucket() {
if (stmt)
sqlite3_finalize(stmt);
@ -194,7 +195,7 @@ class SQLGetBucket : public SQLiteDB, public GetBucketOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLGetBucket(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLGetBucket(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLGetBucket() {
if (stmt)
sqlite3_finalize(stmt);
@ -210,7 +211,7 @@ class SQLListUserBuckets : public SQLiteDB, public ListUserBucketsOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLListUserBuckets(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLListUserBuckets(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
~SQLListUserBuckets() {
if (stmt)
sqlite3_finalize(stmt);
@ -220,16 +221,16 @@ class SQLListUserBuckets : public SQLiteDB, public ListUserBucketsOp {
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
class SQLInsertObject : public SQLiteDB, public InsertObjectOp {
class SQLPutObject : public SQLiteDB, public PutObjectOp {
private:
sqlite3 **sdb = NULL;
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLInsertObject(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLInsertObject(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLPutObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLPutObject(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLInsertObject() {
~SQLPutObject() {
if (stmt)
sqlite3_finalize(stmt);
}
@ -238,16 +239,16 @@ class SQLInsertObject : public SQLiteDB, public InsertObjectOp {
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
class SQLRemoveObject : public SQLiteDB, public RemoveObjectOp {
class SQLDeleteObject : public SQLiteDB, public DeleteObjectOp {
private:
sqlite3 **sdb = NULL;
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLRemoveObject(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLRemoveObject(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLDeleteObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLDeleteObject(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLRemoveObject() {
~SQLDeleteObject() {
if (stmt)
sqlite3_finalize(stmt);
}
@ -256,16 +257,59 @@ class SQLRemoveObject : public SQLiteDB, public RemoveObjectOp {
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
class SQLListObject : public SQLiteDB, public ListObjectOp {
class SQLGetObject : public SQLiteDB, public GetObjectOp {
private:
sqlite3 **sdb = NULL;
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLListObject(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLListObject(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLGetObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLGetObject(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLListObject() {
~SQLGetObject() {
if (stmt)
sqlite3_finalize(stmt);
}
int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
class SQLUpdateObject : public SQLiteDB, public UpdateObjectOp {
private:
sqlite3 **sdb = NULL;
sqlite3_stmt *omap_stmt = NULL; // Prepared statement
sqlite3_stmt *attrs_stmt = NULL; // Prepared statement
sqlite3_stmt *meta_stmt = NULL; // Prepared statement
public:
SQLUpdateObject(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLUpdateObject(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLUpdateObject() {
if (omap_stmt)
sqlite3_finalize(omap_stmt);
if (attrs_stmt)
sqlite3_finalize(attrs_stmt);
if (meta_stmt)
sqlite3_finalize(meta_stmt);
}
int Prepare(const DoutPrefixProvider *dpp, DBOpParams *params);
int Execute(const DoutPrefixProvider *dpp, DBOpParams *params);
int Bind(const DoutPrefixProvider *dpp, DBOpParams *params);
};
class SQLListBucketObjects : public SQLiteDB, public ListBucketObjectsOp {
private:
sqlite3 **sdb = NULL;
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLListBucketObjects(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLListBucketObjects(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLListBucketObjects() {
if (stmt)
sqlite3_finalize(stmt);
}
@ -280,8 +324,8 @@ class SQLPutObjectData : public SQLiteDB, public PutObjectDataOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLPutObjectData(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLPutObjectData(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLPutObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLPutObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLPutObjectData() {
if (stmt)
@ -298,8 +342,8 @@ class SQLGetObjectData : public SQLiteDB, public GetObjectDataOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLGetObjectData(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLGetObjectData(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLGetObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLGetObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLGetObjectData() {
if (stmt)
@ -316,8 +360,8 @@ class SQLDeleteObjectData : public SQLiteDB, public DeleteObjectDataOp {
sqlite3_stmt *stmt = NULL; // Prepared statement
public:
SQLDeleteObjectData(void **db, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), cct), sdb((sqlite3 **)db) {}
SQLDeleteObjectData(sqlite3 **sdbi, CephContext *cct) : SQLiteDB(*sdbi, cct), sdb(sdbi) {}
SQLDeleteObjectData(void **db, string db_name, CephContext *cct) : SQLiteDB((sqlite3 *)(*db), db_name, cct), sdb((sqlite3 **)db) {}
SQLDeleteObjectData(sqlite3 **sdbi, string db_name, CephContext *cct) : SQLiteDB(*sdbi, db_name, cct), sdb(sdbi) {}
~SQLDeleteObjectData() {
if (stmt)

View File

@ -58,6 +58,19 @@ namespace gtest {
ceph::real_time bucket_mtime = real_clock::now();
string marker1;
class DBGetDataCB : public RGWGetDataCB {
public:
bufferlist data_bl;
off_t data_ofs, data_len;
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
data_bl = bl;
data_ofs = bl_ofs;
data_len = bl_len;
return 0;
}
};
namespace {
class DBStoreTest : public ::testing::Test {
@ -82,10 +95,11 @@ namespace {
GlobalParams.op.user.uinfo.display_name = user1;
GlobalParams.op.user.uinfo.user_id.id = user_id1;
GlobalParams.op.bucket.info.bucket.name = bucket1;
GlobalParams.object = object1;
GlobalParams.offset = 0;
GlobalParams.data = data;
GlobalParams.datalen = data.length();
GlobalParams.op.obj.state.obj.bucket = GlobalParams.op.bucket.info.bucket;
GlobalParams.op.obj.state.obj.key.name = object1;
GlobalParams.op.obj.state.obj.key.instance = "inst1";
GlobalParams.op.obj_data.part_num = 0;
/* As of now InitializeParams doesnt do anything
* special based on fop. Hence its okay to do
@ -581,19 +595,25 @@ TEST_F(DBStoreTest, ListAllBuckets) {
ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, InsertObject) {
TEST_F(DBStoreTest, PutObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
ret = db->ProcessOp(dpp, "InsertObject", &params);
params.op.obj.category = RGWObjCategory::Main;
params.op.obj.storage_class = "STANDARD";
bufferlist b1;
encode("HELLO WORLD", b1);
cout<<"XXXXXXXXX Insert b1.length " << b1.length() << "\n";
params.op.obj.head_data = b1;
params.op.obj.state.size = 12;
params.op.obj.state.is_olh = false;
ret = db->ProcessOp(dpp, "PutObject", &params);
ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, ListObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
ret = db->ProcessOp(dpp, "ListObject", &params);
/* Insert another object */
params.op.obj.state.obj.key.name = "object2";
params.op.obj.state.obj.key.instance = "inst2";
ret = db->ProcessOp(dpp, "PutObject", &params);
ASSERT_EQ(ret, 0);
}
@ -602,13 +622,347 @@ TEST_F(DBStoreTest, ListAllObjects) {
int ret = -1;
ret = db->ListAllObjects(dpp, &params);
ASSERT_GE(ret, 0);
}
TEST_F(DBStoreTest, GetObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
ret = db->ProcessOp(dpp, "GetObject", &params);
ASSERT_EQ(ret, 0);
ASSERT_EQ(params.op.obj.category, RGWObjCategory::Main);
ASSERT_EQ(params.op.obj.storage_class, "STANDARD");
string data;
decode(data, params.op.obj.head_data);
ASSERT_EQ(data, "HELLO WORLD");
ASSERT_EQ(params.op.obj.state.size, 12);
}
TEST_F(DBStoreTest, GetObjectState) {
struct DBOpParams params = GlobalParams;
int ret = -1;
RGWObjState state;
RGWObjState *s = &state;
params.op.obj.state.obj.key.name = "object2";
params.op.obj.state.obj.key.instance = "inst2";
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
ret = op_target.get_obj_state(dpp, params.op.bucket.info, params.op.obj.state.obj,
false, &s);
ASSERT_EQ(ret, 0);
ASSERT_EQ(state.size, 12);
ASSERT_EQ(state.is_olh, false);
/* Recheck with get_state API */
ret = op_target.get_state(dpp, &s, false);
ASSERT_EQ(ret, 0);
ASSERT_EQ(state.size, 12);
ASSERT_EQ(state.is_olh, false);
}
TEST_F(DBStoreTest, ObjAttrs) {
struct DBOpParams params = GlobalParams;
int ret = -1;
map<string, bufferlist> setattrs;
map<string, bufferlist> rmattrs;
map<string, bufferlist> readattrs;
bufferlist b1, b2, b3;
encode("ACL", b1);
setattrs[RGW_ATTR_ACL] = b1;
encode("LC", b2);
setattrs[RGW_ATTR_LC] = b2;
encode("ETAG", b3);
setattrs[RGW_ATTR_ETAG] = b3;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
/* Set some attrs */
ret = op_target.set_attrs(dpp, setattrs, nullptr);
ASSERT_EQ(ret, 0);
/* read those attrs */
DB::Object::Read read_op(&op_target);
read_op.params.attrs = &readattrs;
ret = read_op.prepare(dpp);
ASSERT_EQ(ret, 0);
string val;
decode(val, readattrs[RGW_ATTR_ACL]);
ASSERT_EQ(val, "ACL");
decode(val, readattrs[RGW_ATTR_LC]);
ASSERT_EQ(val, "LC");
decode(val, readattrs[RGW_ATTR_ETAG]);
ASSERT_EQ(val, "ETAG");
/* Remove some attrs */
rmattrs[RGW_ATTR_ACL] = b1;
map<string, bufferlist> empty;
ret = op_target.set_attrs(dpp, empty, &rmattrs);
ASSERT_EQ(ret, 0);
/* read those attrs */
ret = read_op.prepare(dpp);
ASSERT_EQ(ret, 0);
ASSERT_EQ(readattrs.count(RGW_ATTR_ACL), 0);
decode(val, readattrs[RGW_ATTR_LC]);
ASSERT_EQ(val, "LC");
decode(val, readattrs[RGW_ATTR_ETAG]);
ASSERT_EQ(val, "ETAG");
}
TEST_F(DBStoreTest, WriteObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
map<string, bufferlist> setattrs;
params.op.obj.state.obj.key.name = "object3";
params.op.obj.state.obj.key.instance = "inst3";
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
DB::Object::Write write_op(&op_target);
ret = write_op.prepare(dpp);
ASSERT_EQ(ret, 0);
write_op.meta.mtime = &bucket_mtime;
write_op.meta.category = RGWObjCategory::Main;
write_op.meta.owner = params.op.user.uinfo.user_id;
bufferlist b1;
encode("HELLO WORLD - Object3", b1);
cout<<"XXXXXXXXX Insert b1.length " << b1.length() << "\n";
write_op.meta.data = &b1;
bufferlist b2;
encode("ACL", b2);
setattrs[RGW_ATTR_ACL] = b2;
ret = write_op.write_meta(0, 22, 25, setattrs);
ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, ReadObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
map<string, bufferlist> readattrs;
params.op.obj.state.obj.key.name = "object3";
params.op.obj.state.obj.key.instance = "inst3";
uint64_t obj_size;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
DB::Object::Read read_op(&op_target);
read_op.params.attrs = &readattrs;
read_op.params.obj_size = &obj_size;
ret = read_op.prepare(dpp);
ASSERT_EQ(ret, 0);
bufferlist bl;
ret = read_op.read(0, 25, bl, dpp);
cout<<"XXXXXXXXX Insert bl.length " << bl.length() << "\n";
ASSERT_EQ(ret, 25);
string data;
decode(data, bl);
ASSERT_EQ(data, "HELLO WORLD - Object3");
ASSERT_EQ(obj_size, 22);
}
TEST_F(DBStoreTest, IterateObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
map<string, bufferlist> readattrs;
uint64_t obj_size;
DBGetDataCB cb;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
DB::Object::Read read_op(&op_target);
read_op.params.attrs = &readattrs;
read_op.params.obj_size = &obj_size;
ret = read_op.prepare(dpp);
ASSERT_EQ(ret, 0);
bufferlist bl;
ret = read_op.iterate(dpp, 0, 15, &cb);
ASSERT_EQ(ret, 0);
string data;
decode(data, cb.data_bl);
cout << "XXXXXXXXXX iterate data is " << data << ", bl_ofs = " << cb.data_ofs << ", bl_len = " << cb.data_len << "\n";
ASSERT_EQ(data, "HELLO WORLD");
ASSERT_EQ(cb.data_ofs, 0);
ASSERT_EQ(cb.data_len, 15);
}
TEST_F(DBStoreTest, ListBucketObjects) {
struct DBOpParams params = GlobalParams;
int ret = -1;
int max = 2;
bool is_truncated = false;
rgw_obj_key marker1;
DB::Bucket target(db, params.op.bucket.info);
DB::Bucket::List list_op(&target);
vector<rgw_bucket_dir_entry> dir_list;
marker1.name = "";
do {
is_truncated = false;
list_op.params.marker = marker1;
ret = list_op.list_objects(dpp, max, &dir_list, nullptr, &is_truncated);
ASSERT_EQ(ret, 0);
cout << "marker1 :" << marker1.name << "\n";
cout << "is_truncated :" << is_truncated << "\n";
for (const auto& ent: dir_list) {
cls_rgw_obj_key key = ent.key;
cout << "###################### \n";
cout << "key.name : " << key.name << "\n";
cout << "key.instance : " << key.instance << "\n";
marker1 = list_op.get_next_marker();
}
dir_list.clear();
} while(is_truncated);
}
TEST_F(DBStoreTest, DeleteObj) {
struct DBOpParams params = GlobalParams;
int ret = -1;
RGWObjState state;
RGWObjState *s = &state;
/* delete object2 */
params.op.obj.state.obj.key.name = "object2";
params.op.obj.state.obj.key.instance = "inst2";
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
DB::Object::Delete delete_op(&op_target);
ret = delete_op.delete_obj(dpp);
ASSERT_EQ(ret, 0);
/* Should return ENOENT */
ret = op_target.get_state(dpp, &s, false);
ASSERT_EQ(ret, -2);
}
TEST_F(DBStoreTest, ObjectOmapSetVal) {
struct DBOpParams params = GlobalParams;
int ret = -1;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
string val = "part1_val";
bufferlist bl;
encode(val, bl);
ret = op_target.obj_omap_set_val_by_key(dpp, "part1", bl, false);
ASSERT_EQ(ret, 0);
val = "part2_val";
bl.clear();
encode(val, bl);
ret = op_target.obj_omap_set_val_by_key(dpp, "part2", bl, false);
ASSERT_EQ(ret, 0);
val = "part3_val";
bl.clear();
encode(val, bl);
ret = op_target.obj_omap_set_val_by_key(dpp, "part3", bl, false);
ASSERT_EQ(ret, 0);
val = "part4_val";
bl.clear();
encode(val, bl);
ret = op_target.obj_omap_set_val_by_key(dpp, "part4", bl, false);
ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, ObjectOmapGetValsByKeys) {
struct DBOpParams params = GlobalParams;
int ret = -1;
std::set<std::string> keys;
std::map<std::string, bufferlist> vals;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
keys.insert("part2");
keys.insert("part4");
ret = op_target.obj_omap_get_vals_by_keys(dpp, "", keys, &vals);
ASSERT_EQ(ret, 0);
ASSERT_EQ(vals.size(), 2);
string val;
decode(val, vals["part2"]);
ASSERT_EQ(val, "part2_val");
decode(val, vals["part4"]);
ASSERT_EQ(val, "part4_val");
}
TEST_F(DBStoreTest, ObjectOmapGetAll) {
struct DBOpParams params = GlobalParams;
int ret = -1;
std::map<std::string, bufferlist> vals;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
ret = op_target.obj_omap_get_all(dpp, &vals);
ASSERT_EQ(ret, 0);
ASSERT_EQ(vals.size(), 4);
string val;
decode(val, vals["part1"]);
ASSERT_EQ(val, "part1_val");
decode(val, vals["part2"]);
ASSERT_EQ(val, "part2_val");
decode(val, vals["part3"]);
ASSERT_EQ(val, "part3_val");
decode(val, vals["part4"]);
ASSERT_EQ(val, "part4_val");
}
TEST_F(DBStoreTest, ObjectOmapGetVals) {
struct DBOpParams params = GlobalParams;
int ret = -1;
std::set<std::string> keys;
std::map<std::string, bufferlist> vals;
bool pmore;
DB::Object op_target(db, params.op.bucket.info,
params.op.obj.state.obj);
ret = op_target.obj_omap_get_vals(dpp, "part3", 10, &vals, &pmore);
ASSERT_EQ(ret, 0);
ASSERT_EQ(vals.size(), 2);
string val;
decode(val, vals["part3"]);
ASSERT_EQ(val, "part3_val");
decode(val, vals["part4"]);
ASSERT_EQ(val, "part4_val");
}
TEST_F(DBStoreTest, PutObjectData) {
struct DBOpParams params = GlobalParams;
int ret = -1;
params.op.obj_data.part_num = 1;
params.op.obj_data.offset = 10;
params.op.obj_data.multipart_part_num = 2;
bufferlist b1;
encode("HELLO WORLD", b1);
params.op.obj_data.data = b1;
params.op.obj_data.size = 12;
ret = db->ProcessOp(dpp, "PutObjectData", &params);
ASSERT_EQ(ret, 0);
}
@ -619,6 +973,12 @@ TEST_F(DBStoreTest, GetObjectData) {
ret = db->ProcessOp(dpp, "GetObjectData", &params);
ASSERT_EQ(ret, 0);
ASSERT_EQ(params.op.obj_data.part_num, 1);
ASSERT_EQ(params.op.obj_data.offset, 10);
ASSERT_EQ(params.op.obj_data.multipart_part_num, 2);
string data;
decode(data, params.op.obj_data.data);
ASSERT_EQ(data, "HELLO WORLD");
}
TEST_F(DBStoreTest, DeleteObjectData) {
@ -629,11 +989,11 @@ TEST_F(DBStoreTest, DeleteObjectData) {
ASSERT_EQ(ret, 0);
}
TEST_F(DBStoreTest, RemoveObject) {
TEST_F(DBStoreTest, DeleteObject) {
struct DBOpParams params = GlobalParams;
int ret = -1;
ret = db->ProcessOp(dpp, "RemoveObject", &params);
ret = db->ProcessOp(dpp, "DeleteObject", &params);
ASSERT_EQ(ret, 0);
}
@ -678,9 +1038,9 @@ int main(int argc, char **argv)
// format: ./dbstore-tests logfile loglevel
if (argc == 3) {
c_logfile = argv[1];
c_loglevel = (atoi)(argv[2]);
cout << "logfile:" << c_logfile << ", loglevel set to " << c_loglevel << "\n";
c_logfile = argv[1];
c_loglevel = (atoi)(argv[2]);
cout << "logfile:" << c_logfile << ", loglevel set to " << c_loglevel << "\n";
}
::testing::InitGoogleTest(&argc, argv);