mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
rgw: easy data access helper class
Only does put object right now. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
parent
702f7f42b4
commit
fad214eb4b
@ -13,6 +13,8 @@
|
||||
#include "rgw_common.h"
|
||||
#include "rgw_rados.h"
|
||||
#include "rgw_tools.h"
|
||||
#include "rgw_acl_s3.h"
|
||||
#include "rgw_op.h"
|
||||
|
||||
#include "services/svc_sys_obj.h"
|
||||
|
||||
@ -270,6 +272,152 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin
|
||||
}
|
||||
}
|
||||
|
||||
RGWDataAccess::RGWDataAccess(RGWRados *_store) : store(_store)
|
||||
{
|
||||
obj_ctx = std::make_unique<RGWObjectCtx>(store);
|
||||
}
|
||||
|
||||
|
||||
int RGWDataAccess::Bucket::finish_init()
|
||||
{
|
||||
auto iter = attrs.find(RGW_ATTR_ACL);
|
||||
if (iter == attrs.end()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bufferlist::const_iterator bliter = iter->second.begin();
|
||||
try {
|
||||
policy.decode(bliter);
|
||||
} catch (buffer::error& err) {
|
||||
return -EIO;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWDataAccess::Bucket::init()
|
||||
{
|
||||
int ret = sd->store->get_bucket_info(*sd->obj_ctx,
|
||||
tenant, name,
|
||||
bucket_info,
|
||||
&mtime,
|
||||
&attrs);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
return finish_init();
|
||||
}
|
||||
|
||||
int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
|
||||
const map<string, bufferlist>& _attrs)
|
||||
{
|
||||
bucket_info = _bucket_info;
|
||||
attrs = _attrs;
|
||||
|
||||
return finish_init();
|
||||
}
|
||||
|
||||
int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
|
||||
ObjectRef *obj) {
|
||||
obj->reset(new Object(sd, self_ref, key));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWDataAccess::Object::put(bufferlist& data,
|
||||
map<string, bufferlist>& attrs)
|
||||
{
|
||||
RGWRados *store = sd->store;
|
||||
CephContext *cct = store->ctx();
|
||||
|
||||
string tag;
|
||||
append_rand_alpha(cct, tag, tag, 32);
|
||||
|
||||
RGWBucketInfo& bucket_info = bucket->bucket_info;
|
||||
|
||||
RGWPutObjProcessor_Atomic processor(*sd->obj_ctx,
|
||||
bucket_info,
|
||||
bucket_info.bucket,
|
||||
key.name,
|
||||
cct->_conf->rgw_obj_stripe_size, tag,
|
||||
bucket_info.versioning_enabled());
|
||||
if (key.instance.empty()) {
|
||||
processor.set_version_id(key.instance);
|
||||
}
|
||||
|
||||
if (olh_epoch) {
|
||||
processor.set_olh_epoch(*olh_epoch);
|
||||
}
|
||||
int ret = processor.prepare(store, NULL);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
off_t ofs = 0;
|
||||
auto obj_size = data.length();
|
||||
|
||||
RGWMD5Etag etag_calc;
|
||||
|
||||
do {
|
||||
size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
|
||||
|
||||
bufferlist bl;
|
||||
|
||||
data.splice(0, read_len, &bl);
|
||||
etag_calc.update(bl);
|
||||
|
||||
bool again;
|
||||
|
||||
do {
|
||||
void *handle;
|
||||
rgw_raw_obj obj;
|
||||
|
||||
ret = processor.handle_data(bl, ofs, &handle, &obj, &again);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
ret = processor.throttle_data(handle, obj, read_len, false);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
} while (again);
|
||||
|
||||
ofs += read_len;
|
||||
} while (data.length() > 0);
|
||||
|
||||
bool has_etag_attr = false;
|
||||
auto iter = attrs.find(RGW_ATTR_ETAG);
|
||||
if (iter != attrs.end()) {
|
||||
bufferlist& bl = iter->second;
|
||||
etag = bl.to_str();
|
||||
has_etag_attr = true;
|
||||
}
|
||||
|
||||
if (!aclbl) {
|
||||
RGWAccessControlPolicy_S3 policy(cct);
|
||||
|
||||
policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
|
||||
|
||||
policy.encode(aclbl.emplace());
|
||||
}
|
||||
|
||||
if (etag.empty()) {
|
||||
etag_calc.finish(&etag);
|
||||
}
|
||||
|
||||
if (!has_etag_attr) {
|
||||
bufferlist etagbl;
|
||||
etagbl.append(etag);
|
||||
attrs[RGW_ATTR_ETAG] = etagbl;
|
||||
}
|
||||
attrs[RGW_ATTR_ACL] = *aclbl;
|
||||
|
||||
return processor.complete(obj_size, etag,
|
||||
&mtime, mtime,
|
||||
attrs, delete_at);
|
||||
}
|
||||
|
||||
void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
|
||||
{
|
||||
policy.encode(aclbl.emplace());
|
||||
int rgw_tools_init(CephContext *cct)
|
||||
{
|
||||
ext_mime_map = new std::map<std::string, std::string>;
|
||||
|
@ -45,4 +45,150 @@ int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
|
||||
int rgw_tools_init(CephContext *cct);
|
||||
void rgw_tools_cleanup();
|
||||
|
||||
template<class H, size_t S>
|
||||
class RGWEtag
|
||||
{
|
||||
H hash;
|
||||
|
||||
public:
|
||||
RGWEtag() {}
|
||||
|
||||
void update(const char *buf, size_t len) {
|
||||
hash.Update((const unsigned char *)buf, len);
|
||||
}
|
||||
|
||||
void update(bufferlist& bl) {
|
||||
if (bl.length() > 0) {
|
||||
update(bl.c_str(), bl.length());
|
||||
}
|
||||
}
|
||||
|
||||
void update(const string& s) {
|
||||
if (!s.empty()) {
|
||||
update(s.c_str(), s.size());
|
||||
}
|
||||
}
|
||||
void finish(string *etag) {
|
||||
char etag_buf[S];
|
||||
char etag_buf_str[S * 2 + 16];
|
||||
|
||||
hash.Final((unsigned char *)etag_buf);
|
||||
buf_to_hex((const unsigned char *)etag_buf, S,
|
||||
etag_buf_str);
|
||||
|
||||
*etag = etag_buf_str;
|
||||
}
|
||||
};
|
||||
|
||||
using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;
|
||||
|
||||
class RGWDataAccess
|
||||
{
|
||||
RGWRados *store;
|
||||
std::unique_ptr<RGWObjectCtx> obj_ctx;
|
||||
|
||||
public:
|
||||
RGWDataAccess(RGWRados *_store);
|
||||
|
||||
class Object;
|
||||
class Bucket;
|
||||
|
||||
using BucketRef = std::shared_ptr<Bucket>;
|
||||
using ObjectRef = std::shared_ptr<Object>;
|
||||
|
||||
class Bucket {
|
||||
friend class RGWDataAccess;
|
||||
friend class Object;
|
||||
|
||||
std::shared_ptr<Bucket> self_ref;
|
||||
|
||||
RGWDataAccess *sd{nullptr};
|
||||
RGWBucketInfo bucket_info;
|
||||
string tenant;
|
||||
string name;
|
||||
string bucket_id;
|
||||
ceph::real_time mtime;
|
||||
map<std::string, bufferlist> attrs;
|
||||
|
||||
RGWAccessControlPolicy policy;
|
||||
int finish_init();
|
||||
|
||||
Bucket(RGWDataAccess *_sd,
|
||||
const string& _tenant,
|
||||
const string& _name,
|
||||
const string& _bucket_id) : sd(_sd),
|
||||
tenant(_tenant),
|
||||
name(_name),
|
||||
bucket_id(_bucket_id) {}
|
||||
Bucket(RGWDataAccess *_sd) : sd(_sd) {}
|
||||
int init();
|
||||
int init(const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _attrs);
|
||||
public:
|
||||
int get_object(const rgw_obj_key& key,
|
||||
ObjectRef *obj);
|
||||
|
||||
};
|
||||
|
||||
|
||||
class Object {
|
||||
RGWDataAccess *sd{nullptr};
|
||||
BucketRef bucket;
|
||||
rgw_obj_key key;
|
||||
|
||||
ceph::real_time mtime;
|
||||
string etag;
|
||||
std::optional<uint64_t> olh_epoch;
|
||||
ceph::real_time delete_at;
|
||||
|
||||
std::optional<bufferlist> aclbl;
|
||||
|
||||
Object(RGWDataAccess *_sd,
|
||||
BucketRef& _bucket,
|
||||
const rgw_obj_key& _key) : sd(_sd),
|
||||
bucket(_bucket),
|
||||
key(_key) {}
|
||||
public:
|
||||
int put(bufferlist& data, map<string, bufferlist>& attrs); /* might modify attrs */
|
||||
|
||||
void set_mtime(const ceph::real_time& _mtime) {
|
||||
mtime = _mtime;
|
||||
}
|
||||
|
||||
void set_etag(const string& _etag) {
|
||||
etag = _etag;
|
||||
}
|
||||
|
||||
void set_olh_epoch(uint64_t epoch) {
|
||||
olh_epoch = epoch;
|
||||
}
|
||||
|
||||
void set_delete_at(ceph::real_time _delete_at) {
|
||||
delete_at = _delete_at;
|
||||
}
|
||||
|
||||
void set_policy(const RGWAccessControlPolicy& policy);
|
||||
|
||||
friend class Bucket;
|
||||
};
|
||||
|
||||
int get_bucket(const string& tenant,
|
||||
const string name,
|
||||
const string bucket_id,
|
||||
BucketRef *bucket) {
|
||||
bucket->reset(new Bucket(this, tenant, name, bucket_id));
|
||||
(*bucket)->self_ref = *bucket;
|
||||
return (*bucket)->init();
|
||||
}
|
||||
|
||||
int get_bucket(const RGWBucketInfo& bucket_info,
|
||||
const map<string, bufferlist>& attrs,
|
||||
BucketRef *bucket) {
|
||||
bucket->reset(new Bucket(this));
|
||||
(*bucket)->self_ref = *bucket;
|
||||
return (*bucket)->init(bucket_info, attrs);
|
||||
}
|
||||
friend class Bucket;
|
||||
friend class Object;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user