mirror of
https://github.com/ceph/ceph
synced 2025-01-03 17:42:36 +00:00
Merge pull request #12605 from cbodley/wip-18300
rgw: RGWMetaSyncShardCR drops stack refs on destruction Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
commit
afa6cbf790
@ -373,8 +373,6 @@ bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* re
|
|||||||
return collect(NULL, ret, skip_stack);
|
return collect(NULL, ret, skip_stack);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
|
|
||||||
|
|
||||||
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
|
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
|
||||||
{
|
{
|
||||||
((RGWAioCompletionNotifier *)arg)->cb();
|
((RGWAioCompletionNotifier *)arg)->cb();
|
||||||
|
@ -179,72 +179,35 @@ int RGWMetadataLog::get_info(int shard_id, RGWMetadataLogInfo *info)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _mdlog_info_completion(librados::completion_t cb, void *arg);
|
|
||||||
|
|
||||||
class RGWMetadataLogInfoCompletion : public RefCountedObject {
|
|
||||||
RGWMetadataLogInfo *pinfo;
|
|
||||||
RGWCompletionManager *completion_manager;
|
|
||||||
void *user_info;
|
|
||||||
int *pret;
|
|
||||||
cls_log_header header;
|
|
||||||
librados::IoCtx io_ctx;
|
|
||||||
librados::AioCompletion *completion;
|
|
||||||
|
|
||||||
public:
|
|
||||||
RGWMetadataLogInfoCompletion(RGWMetadataLogInfo *_pinfo, RGWCompletionManager *_cm, void *_uinfo, int *_pret) :
|
|
||||||
pinfo(_pinfo), completion_manager(_cm), user_info(_uinfo), pret(_pret) {
|
|
||||||
completion = librados::Rados::aio_create_completion((void *)this, NULL,
|
|
||||||
_mdlog_info_completion);
|
|
||||||
}
|
|
||||||
|
|
||||||
~RGWMetadataLogInfoCompletion() {
|
|
||||||
completion->release();
|
|
||||||
}
|
|
||||||
|
|
||||||
void finish(librados::completion_t cb) {
|
|
||||||
*pret = completion->get_return_value();
|
|
||||||
if (*pret >= 0) {
|
|
||||||
pinfo->marker = header.max_marker;
|
|
||||||
pinfo->last_update = header.max_time.to_real_time();
|
|
||||||
}
|
|
||||||
completion_manager->complete(NULL, user_info);
|
|
||||||
put();
|
|
||||||
}
|
|
||||||
|
|
||||||
librados::IoCtx& get_io_ctx() { return io_ctx; }
|
|
||||||
|
|
||||||
cls_log_header *get_header() {
|
|
||||||
return &header;
|
|
||||||
}
|
|
||||||
|
|
||||||
librados::AioCompletion *get_completion() {
|
|
||||||
return completion;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
static void _mdlog_info_completion(librados::completion_t cb, void *arg)
|
static void _mdlog_info_completion(librados::completion_t cb, void *arg)
|
||||||
{
|
{
|
||||||
RGWMetadataLogInfoCompletion *infoc = (RGWMetadataLogInfoCompletion *)arg;
|
auto infoc = static_cast<RGWMetadataLogInfoCompletion *>(arg);
|
||||||
infoc->finish(cb);
|
infoc->finish(cb);
|
||||||
|
infoc->put(); // drop the ref from get_info_async()
|
||||||
}
|
}
|
||||||
|
|
||||||
int RGWMetadataLog::get_info_async(int shard_id, RGWMetadataLogInfo *info, RGWCompletionManager *completion_manager, void *user_info, int *pret)
|
RGWMetadataLogInfoCompletion::RGWMetadataLogInfoCompletion(info_callback_t cb)
|
||||||
|
: completion(librados::Rados::aio_create_completion((void *)this, nullptr,
|
||||||
|
_mdlog_info_completion)),
|
||||||
|
callback(cb)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
RGWMetadataLogInfoCompletion::~RGWMetadataLogInfoCompletion()
|
||||||
|
{
|
||||||
|
completion->release();
|
||||||
|
}
|
||||||
|
|
||||||
|
int RGWMetadataLog::get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion)
|
||||||
{
|
{
|
||||||
string oid;
|
string oid;
|
||||||
get_shard_oid(shard_id, oid);
|
get_shard_oid(shard_id, oid);
|
||||||
|
|
||||||
RGWMetadataLogInfoCompletion *req_completion = new RGWMetadataLogInfoCompletion(info, completion_manager, user_info, pret);
|
completion->get(); // hold a ref until the completion fires
|
||||||
|
|
||||||
req_completion->get();
|
return store->time_log_info_async(completion->get_io_ctx(), oid,
|
||||||
|
&completion->get_header(),
|
||||||
int ret = store->time_log_info_async(req_completion->get_io_ctx(), oid, req_completion->get_header(), req_completion->get_completion());
|
completion->get_completion());
|
||||||
if (ret < 0) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
req_completion->put();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int RGWMetadataLog::trim(int shard_id, const real_time& from_time, const real_time& end_time,
|
int RGWMetadataLog::trim(int shard_id, const real_time& from_time, const real_time& end_time,
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#define CEPH_RGW_METADATA_H
|
#define CEPH_RGW_METADATA_H
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <boost/optional.hpp>
|
||||||
|
|
||||||
#include "include/types.h"
|
#include "include/types.h"
|
||||||
#include "rgw_common.h"
|
#include "rgw_common.h"
|
||||||
@ -12,6 +13,7 @@
|
|||||||
#include "cls/version/cls_version_types.h"
|
#include "cls/version/cls_version_types.h"
|
||||||
#include "cls/log/cls_log_types.h"
|
#include "cls/log/cls_log_types.h"
|
||||||
#include "common/RWLock.h"
|
#include "common/RWLock.h"
|
||||||
|
#include "common/RefCountedObj.h"
|
||||||
#include "common/ceph_time.h"
|
#include "common/ceph_time.h"
|
||||||
|
|
||||||
|
|
||||||
@ -140,6 +142,35 @@ struct RGWMetadataLogInfo {
|
|||||||
|
|
||||||
class RGWCompletionManager;
|
class RGWCompletionManager;
|
||||||
|
|
||||||
|
class RGWMetadataLogInfoCompletion : public RefCountedObject {
|
||||||
|
public:
|
||||||
|
using info_callback_t = std::function<void(int, const cls_log_header&)>;
|
||||||
|
private:
|
||||||
|
cls_log_header header;
|
||||||
|
librados::IoCtx io_ctx;
|
||||||
|
librados::AioCompletion *completion;
|
||||||
|
std::mutex mutex; //< protects callback between cancel/complete
|
||||||
|
boost::optional<info_callback_t> callback; //< cleared on cancel
|
||||||
|
public:
|
||||||
|
RGWMetadataLogInfoCompletion(info_callback_t callback);
|
||||||
|
virtual ~RGWMetadataLogInfoCompletion();
|
||||||
|
|
||||||
|
librados::IoCtx& get_io_ctx() { return io_ctx; }
|
||||||
|
cls_log_header& get_header() { return header; }
|
||||||
|
librados::AioCompletion* get_completion() { return completion; }
|
||||||
|
|
||||||
|
void finish(librados::completion_t cb) {
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
if (callback) {
|
||||||
|
(*callback)(completion->get_return_value(), header);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void cancel() {
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
callback = boost::none;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
class RGWMetadataLog {
|
class RGWMetadataLog {
|
||||||
CephContext *cct;
|
CephContext *cct;
|
||||||
RGWRados *store;
|
RGWRados *store;
|
||||||
@ -193,7 +224,7 @@ public:
|
|||||||
|
|
||||||
int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker);
|
int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker);
|
||||||
int get_info(int shard_id, RGWMetadataLogInfo *info);
|
int get_info(int shard_id, RGWMetadataLogInfo *info);
|
||||||
int get_info_async(int shard_id, RGWMetadataLogInfo *info, RGWCompletionManager *completion_manager, void *user_info, int *pret);
|
int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion);
|
||||||
int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id);
|
int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id);
|
||||||
int unlock(int shard_id, string& zone_id, string& owner_id);
|
int unlock(int shard_id, string& zone_id, string& owner_id);
|
||||||
|
|
||||||
|
@ -1198,8 +1198,8 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine {
|
|||||||
int max_entries = CLONE_MAX_ENTRIES;
|
int max_entries = CLONE_MAX_ENTRIES;
|
||||||
|
|
||||||
RGWRESTReadResource *http_op = nullptr;
|
RGWRESTReadResource *http_op = nullptr;
|
||||||
|
boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
|
||||||
|
|
||||||
int req_ret = 0;
|
|
||||||
RGWMetadataLogInfo shard_info;
|
RGWMetadataLogInfo shard_info;
|
||||||
rgw_mdlog_shard_data data;
|
rgw_mdlog_shard_data data;
|
||||||
|
|
||||||
@ -1217,6 +1217,9 @@ public:
|
|||||||
if (http_op) {
|
if (http_op) {
|
||||||
http_op->put();
|
http_op->put();
|
||||||
}
|
}
|
||||||
|
if (completion) {
|
||||||
|
completion->cancel();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int operate();
|
int operate();
|
||||||
@ -1269,7 +1272,9 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
|
|||||||
|
|
||||||
bool *reset_backoff;
|
bool *reset_backoff;
|
||||||
|
|
||||||
map<RGWCoroutinesStack *, string> stack_to_pos;
|
// hold a reference to the cr stack while it's in the map
|
||||||
|
using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
|
||||||
|
map<StackRef, string> stack_to_pos;
|
||||||
map<string, string> pos_to_prev;
|
map<string, string> pos_to_prev;
|
||||||
|
|
||||||
bool can_adjust_marker = false;
|
bool can_adjust_marker = false;
|
||||||
@ -1331,7 +1336,7 @@ public:
|
|||||||
int child_ret;
|
int child_ret;
|
||||||
RGWCoroutinesStack *child;
|
RGWCoroutinesStack *child;
|
||||||
while (collect_next(&child_ret, &child)) {
|
while (collect_next(&child_ret, &child)) {
|
||||||
map<RGWCoroutinesStack *, string>::iterator iter = stack_to_pos.find(child);
|
auto iter = stack_to_pos.find(child);
|
||||||
if (iter == stack_to_pos.end()) {
|
if (iter == stack_to_pos.end()) {
|
||||||
/* some other stack that we don't care about */
|
/* some other stack that we don't care about */
|
||||||
continue;
|
continue;
|
||||||
@ -1371,8 +1376,6 @@ public:
|
|||||||
|
|
||||||
ldout(sync_env->cct, 0) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
|
ldout(sync_env->cct, 0) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
|
||||||
stack_to_pos.erase(iter);
|
stack_to_pos.erase(iter);
|
||||||
|
|
||||||
child->put();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1443,8 +1446,7 @@ public:
|
|||||||
// fetch remote and write locally
|
// fetch remote and write locally
|
||||||
yield {
|
yield {
|
||||||
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
|
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
|
||||||
stack->get();
|
// stack_to_pos holds a reference to the stack
|
||||||
|
|
||||||
stack_to_pos[stack] = iter->first;
|
stack_to_pos[stack] = iter->first;
|
||||||
pos_to_prev[iter->first] = marker;
|
pos_to_prev[iter->first] = marker;
|
||||||
}
|
}
|
||||||
@ -1592,8 +1594,7 @@ public:
|
|||||||
yield {
|
yield {
|
||||||
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
|
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
|
||||||
assert(stack);
|
assert(stack);
|
||||||
stack->get();
|
// stack_to_pos holds a reference to the stack
|
||||||
|
|
||||||
stack_to_pos[stack] = log_iter->id;
|
stack_to_pos[stack] = log_iter->id;
|
||||||
pos_to_prev[log_iter->id] = marker;
|
pos_to_prev[log_iter->id] = marker;
|
||||||
}
|
}
|
||||||
@ -2064,7 +2065,22 @@ int RGWCloneMetaLogCoroutine::state_init()
|
|||||||
|
|
||||||
int RGWCloneMetaLogCoroutine::state_read_shard_status()
|
int RGWCloneMetaLogCoroutine::state_read_shard_status()
|
||||||
{
|
{
|
||||||
int ret = mdlog->get_info_async(shard_id, &shard_info, stack->get_completion_mgr(), (void *)stack, &req_ret);
|
const bool add_ref = false; // default constructs with refs=1
|
||||||
|
|
||||||
|
completion.reset(new RGWMetadataLogInfoCompletion(
|
||||||
|
[this](int ret, const cls_log_header& header) {
|
||||||
|
if (ret < 0) {
|
||||||
|
ldout(cct, 1) << "ERROR: failed to read mdlog info with "
|
||||||
|
<< cpp_strerror(ret) << dendl;
|
||||||
|
} else {
|
||||||
|
shard_info.marker = header.max_marker;
|
||||||
|
shard_info.last_update = header.max_time.to_real_time();
|
||||||
|
}
|
||||||
|
// wake up parent stack
|
||||||
|
stack->get_completion_mgr()->complete(nullptr, stack);
|
||||||
|
}), add_ref);
|
||||||
|
|
||||||
|
int ret = mdlog->get_info_async(shard_id, completion.get());
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
|
ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
|
||||||
return set_cr_error(ret);
|
return set_cr_error(ret);
|
||||||
@ -2075,6 +2091,8 @@ int RGWCloneMetaLogCoroutine::state_read_shard_status()
|
|||||||
|
|
||||||
int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
|
int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
|
||||||
{
|
{
|
||||||
|
completion.reset();
|
||||||
|
|
||||||
ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
|
ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
|
||||||
|
|
||||||
marker = shard_info.marker;
|
marker = shard_info.marker;
|
||||||
|
Loading…
Reference in New Issue
Block a user