mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
rgw: generalize container type for concurrent IO base class
Turned the ConcurrentIO class a template, so that we could use different kind of containers that are needed for the different operations. Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
parent
04441f2878
commit
30d0a49c84
@ -120,54 +120,22 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
|
||||
return r;
|
||||
}
|
||||
|
||||
int CLSRGWConcurrentIO::operator()() {
|
||||
int ret = 0;
|
||||
vector<string>::const_iterator iter = bucket_objs.begin();
|
||||
BucketIndexAioManager manager;
|
||||
for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
|
||||
ret = issue_op(*iter);
|
||||
if (ret < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
int num_completions, r = 0;
|
||||
while (manager.wait_for_completions(0, &num_completions, &r)) {
|
||||
if (r >= 0 && ret >= 0) {
|
||||
for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
|
||||
int issue_ret = issue_op(*iter);
|
||||
if(issue_ret < 0) {
|
||||
ret = issue_ret;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (ret >= 0) {
|
||||
ret = r;
|
||||
}
|
||||
}
|
||||
|
||||
if (ret < 0) {
|
||||
cleanup();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int CLSRGWIssueBucketIndexInit::issue_op(const string& obj)
|
||||
int CLSRGWIssueBucketIndexInit::issue_op()
|
||||
{
|
||||
issued_objs.push_back(obj);
|
||||
return issue_bucket_index_init_op(io_ctx, obj, &manager);
|
||||
return issue_bucket_index_init_op(io_ctx, *iter, &manager);
|
||||
}
|
||||
|
||||
void CLSRGWIssueBucketIndexInit::cleanup()
|
||||
{
|
||||
// Do best effort removal
|
||||
for (vector<string>::iterator iter = issued_objs.begin(); iter != issued_objs.end(); ++iter) {
|
||||
io_ctx.remove(*iter);
|
||||
for (vector<string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
|
||||
io_ctx.remove(*citer);
|
||||
}
|
||||
}
|
||||
|
||||
int CLSRGWIssueSetTagTimeout::issue_op(const string& obj)
|
||||
int CLSRGWIssueSetTagTimeout::issue_op()
|
||||
{
|
||||
return issue_bucket_set_tag_timeout_op(io_ctx, obj, tag_timeout, &manager);
|
||||
return issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
|
||||
}
|
||||
|
||||
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
|
||||
|
@ -111,30 +111,59 @@ public:
|
||||
/* bucket index */
|
||||
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
|
||||
|
||||
template<class T>
|
||||
class CLSRGWConcurrentIO {
|
||||
protected:
|
||||
librados::IoCtx& io_ctx;
|
||||
vector<string>& bucket_objs;
|
||||
T& objs_container;
|
||||
typename T::const_iterator iter;
|
||||
uint32_t max_aio;
|
||||
BucketIndexAioManager manager;
|
||||
|
||||
virtual int issue_op(const string& obj) = 0;
|
||||
virtual int issue_op() = 0;
|
||||
|
||||
virtual void cleanup() {}
|
||||
virtual int valid_ret_code() { return 0; }
|
||||
|
||||
public:
|
||||
CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _bucket_objs,
|
||||
uint32_t _max_aio) : io_ctx(ioc), bucket_objs(_bucket_objs), max_aio(_max_aio) {}
|
||||
CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _objs_container,
|
||||
uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
|
||||
virtual ~CLSRGWConcurrentIO() {}
|
||||
|
||||
int operator()();
|
||||
int operator()() {
|
||||
int ret = 0;
|
||||
iter = objs_container.begin();
|
||||
for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
|
||||
ret = issue_op();
|
||||
if (ret < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
int num_completions, r = 0;
|
||||
while (manager.wait_for_completions(0, &num_completions, &r)) {
|
||||
if (r >= 0 && ret >= 0) {
|
||||
for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
|
||||
int issue_ret = issue_op();
|
||||
if(issue_ret < 0) {
|
||||
ret = issue_ret;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (ret >= 0) {
|
||||
ret = r;
|
||||
}
|
||||
}
|
||||
|
||||
if (ret < 0) {
|
||||
cleanup();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
|
||||
vector<string> issued_objs;
|
||||
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO<vector<string> > {
|
||||
protected:
|
||||
int issue_op(const string& obj);
|
||||
int issue_op();
|
||||
int valid_ret_code() { return -EEXIST; }
|
||||
void cleanup();
|
||||
public:
|
||||
@ -142,10 +171,10 @@ public:
|
||||
uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
|
||||
};
|
||||
|
||||
class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
|
||||
class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO<vector<string> > {
|
||||
uint64_t tag_timeout;
|
||||
protected:
|
||||
int issue_op(const string& obj);
|
||||
int issue_op();
|
||||
public:
|
||||
CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
|
||||
uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),
|
||||
|
Loading…
Reference in New Issue
Block a user