mirror of
https://github.com/ceph/ceph
synced 2025-01-20 18:21:57 +00:00
Merge pull request #13111 from cbodley/wip-rgw-mdlog-trim
rgw multisite: automated mdlog trimming Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
commit
404cee744f
@ -421,6 +421,7 @@ enum {
|
||||
OPT_METADATA_SYNC_INIT,
|
||||
OPT_METADATA_SYNC_RUN,
|
||||
OPT_MDLOG_LIST,
|
||||
OPT_MDLOG_AUTOTRIM,
|
||||
OPT_MDLOG_TRIM,
|
||||
OPT_MDLOG_FETCH,
|
||||
OPT_MDLOG_STATUS,
|
||||
@ -819,6 +820,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
|
||||
} else if (strcmp(prev_cmd, "mdlog") == 0) {
|
||||
if (strcmp(cmd, "list") == 0)
|
||||
return OPT_MDLOG_LIST;
|
||||
if (strcmp(cmd, "autotrim") == 0)
|
||||
return OPT_MDLOG_AUTOTRIM;
|
||||
if (strcmp(cmd, "trim") == 0)
|
||||
return OPT_MDLOG_TRIM;
|
||||
if (strcmp(cmd, "fetch") == 0)
|
||||
@ -6190,6 +6193,26 @@ next:
|
||||
formatter->flush(cout);
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT_MDLOG_AUTOTRIM) {
|
||||
// need a full history for purging old mdlog periods
|
||||
store->meta_mgr->init_oldest_log_period();
|
||||
|
||||
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
|
||||
RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
|
||||
int ret = http.set_threaded();
|
||||
if (ret < 0) {
|
||||
cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
|
||||
auto num_shards = g_conf->rgw_md_log_max_shards;
|
||||
ret = crs.run(create_admin_meta_log_trim_cr(store, &http, num_shards));
|
||||
if (ret < 0) {
|
||||
cerr << "automated mdlog trim failed with " << cpp_strerror(ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT_MDLOG_TRIM) {
|
||||
utime_t start_time, end_time;
|
||||
|
||||
|
@ -231,7 +231,8 @@ public:
|
||||
bufferlist& data,
|
||||
RGWObjVersionTracker *objv_tracker,
|
||||
real_time set_mtime) override;
|
||||
int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive) override;
|
||||
int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker = nullptr) override;
|
||||
|
||||
int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
|
||||
RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
|
||||
@ -422,7 +423,8 @@ int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time
|
||||
}
|
||||
|
||||
template <class T>
|
||||
int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive)
|
||||
int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker)
|
||||
{
|
||||
rgw_pool pool;
|
||||
string oid;
|
||||
@ -436,7 +438,11 @@ int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& da
|
||||
info.status = 0;
|
||||
info.flags = CACHE_FLAG_DATA;
|
||||
}
|
||||
int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive);
|
||||
if (objv_tracker) {
|
||||
info.version = objv_tracker->write_version;
|
||||
info.flags |= CACHE_FLAG_OBJV;
|
||||
}
|
||||
int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
|
||||
if (cacheable) {
|
||||
string name = normal_name(pool, oid);
|
||||
if (ret >= 0) {
|
||||
|
@ -116,14 +116,14 @@ int RGWSimpleRadosReadAttrsCR::request_complete()
|
||||
|
||||
int RGWAsyncPutSystemObj::_send_request()
|
||||
{
|
||||
return store->put_system_obj_data(NULL, obj, bl, -1, exclusive);
|
||||
return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
|
||||
}
|
||||
|
||||
RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
|
||||
const rgw_raw_obj& _obj, bool _exclusive,
|
||||
bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
|
||||
obj(_obj), exclusive(_exclusive),
|
||||
bl(_bl)
|
||||
RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
|
||||
bool _exclusive, bufferlist& _bl)
|
||||
: RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
|
||||
obj(_obj), exclusive(_exclusive), bl(_bl)
|
||||
{
|
||||
}
|
||||
|
||||
@ -315,6 +315,40 @@ int RGWRadosRemoveOmapKeysCR::send_request() {
|
||||
return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
|
||||
}
|
||||
|
||||
RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
|
||||
: RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
|
||||
{
|
||||
set_description() << "remove dest=" << obj;
|
||||
}
|
||||
|
||||
int RGWRadosRemoveCR::send_request()
|
||||
{
|
||||
auto rados = store->get_rados_handle();
|
||||
int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
|
||||
if (r < 0) {
|
||||
lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
|
||||
return r;
|
||||
}
|
||||
ioctx.locator_set_key(obj.loc);
|
||||
|
||||
set_status() << "send request";
|
||||
|
||||
librados::ObjectWriteOperation op;
|
||||
op.remove();
|
||||
|
||||
cn = stack->create_completion_notifier();
|
||||
return ioctx.aio_operate(obj.oid, cn->completion(), &op);
|
||||
}
|
||||
|
||||
int RGWRadosRemoveCR::request_complete()
|
||||
{
|
||||
int r = cn->completion()->get_return_value();
|
||||
|
||||
set_status() << "request complete; ret=" << r;
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
|
||||
const rgw_raw_obj& _obj,
|
||||
const string& _lock_name,
|
||||
@ -722,6 +756,29 @@ int RGWRadosTimelogTrimCR::request_complete()
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
|
||||
const std::string& to_marker,
|
||||
std::string *last_trim_marker)
|
||||
: RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
|
||||
std::string{}, to_marker),
|
||||
cct(store->ctx()), last_trim_marker(last_trim_marker)
|
||||
{
|
||||
}
|
||||
|
||||
int RGWSyncLogTrimCR::request_complete()
|
||||
{
|
||||
int r = RGWRadosTimelogTrimCR::request_complete();
|
||||
if (r < 0 && r != -ENODATA) {
|
||||
return r;
|
||||
}
|
||||
if (*last_trim_marker < to_marker) {
|
||||
*last_trim_marker = to_marker;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int RGWAsyncStatObj::_send_request()
|
||||
{
|
||||
rgw_raw_obj raw_obj;
|
||||
|
@ -1,6 +1,8 @@
|
||||
#ifndef CEPH_RGW_CR_RADOS_H
|
||||
#define CEPH_RGW_CR_RADOS_H
|
||||
|
||||
#include <boost/intrusive_ptr.hpp>
|
||||
#include "include/assert.h"
|
||||
#include "rgw_coroutine.h"
|
||||
#include "rgw_rados.h"
|
||||
#include "common/WorkQueue.h"
|
||||
@ -119,6 +121,7 @@ public:
|
||||
|
||||
class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
|
||||
RGWRados *store;
|
||||
RGWObjVersionTracker *objv_tracker;
|
||||
rgw_raw_obj obj;
|
||||
bool exclusive;
|
||||
bufferlist bl;
|
||||
@ -127,8 +130,8 @@ protected:
|
||||
int _send_request() override;
|
||||
public:
|
||||
RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
|
||||
const rgw_raw_obj& _obj, bool _exclusive,
|
||||
bufferlist& _bl);
|
||||
RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
|
||||
bool _exclusive, bufferlist& _bl);
|
||||
};
|
||||
|
||||
class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
|
||||
@ -189,16 +192,18 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
|
||||
T *result;
|
||||
/// on ENOENT, call handle_data() with an empty object instead of failing
|
||||
const bool empty_on_enoent;
|
||||
RGWObjVersionTracker *objv_tracker;
|
||||
|
||||
RGWAsyncGetSystemObj *req{nullptr};
|
||||
|
||||
public:
|
||||
RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
|
||||
const rgw_raw_obj& _obj,
|
||||
T *_result, bool empty_on_enoent = true)
|
||||
T *_result, bool empty_on_enoent = true,
|
||||
RGWObjVersionTracker *objv_tracker = nullptr)
|
||||
: RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
|
||||
obj_ctx(store), obj(_obj), result(_result),
|
||||
empty_on_enoent(empty_on_enoent) {}
|
||||
empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
|
||||
~RGWSimpleRadosReadCR() override {
|
||||
request_cleanup();
|
||||
}
|
||||
@ -222,7 +227,7 @@ template <class T>
|
||||
int RGWSimpleRadosReadCR<T>::send_request()
|
||||
{
|
||||
req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
|
||||
store, &obj_ctx, NULL,
|
||||
store, &obj_ctx, objv_tracker,
|
||||
obj,
|
||||
&bl, 0, -1);
|
||||
if (pattrs) {
|
||||
@ -305,17 +310,16 @@ class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
|
||||
bufferlist bl;
|
||||
|
||||
rgw_raw_obj obj;
|
||||
RGWObjVersionTracker *objv_tracker;
|
||||
|
||||
RGWAsyncPutSystemObj *req;
|
||||
RGWAsyncPutSystemObj *req{nullptr};
|
||||
|
||||
public:
|
||||
RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
|
||||
const rgw_raw_obj& _obj,
|
||||
const T& _data) : RGWSimpleCoroutine(_store->ctx()),
|
||||
async_rados(_async_rados),
|
||||
store(_store),
|
||||
obj(_obj),
|
||||
req(NULL) {
|
||||
const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
|
||||
: RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
|
||||
store(_store), obj(_obj), objv_tracker(objv_tracker) {
|
||||
::encode(_data, bl);
|
||||
}
|
||||
|
||||
@ -332,7 +336,7 @@ public:
|
||||
|
||||
int send_request() override {
|
||||
req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
|
||||
store, obj, false, bl);
|
||||
store, objv_tracker, obj, false, bl);
|
||||
async_rados->queue(req);
|
||||
return 0;
|
||||
}
|
||||
@ -464,6 +468,19 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class RGWRadosRemoveCR : public RGWSimpleCoroutine {
|
||||
RGWRados *store;
|
||||
librados::IoCtx ioctx;
|
||||
const rgw_raw_obj obj;
|
||||
boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
|
||||
|
||||
public:
|
||||
RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
|
||||
|
||||
int send_request();
|
||||
int request_complete();
|
||||
};
|
||||
|
||||
class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
|
||||
RGWAsyncRadosProcessor *async_rados;
|
||||
RGWRados *store;
|
||||
@ -1093,6 +1110,16 @@ class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
|
||||
int request_complete() override;
|
||||
};
|
||||
|
||||
// wrapper to update last_trim_marker on success
|
||||
class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
|
||||
CephContext *cct;
|
||||
std::string *last_trim_marker;
|
||||
public:
|
||||
RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
|
||||
const std::string& to_marker, std::string *last_trim_marker);
|
||||
int request_complete() override;
|
||||
};
|
||||
|
||||
class RGWAsyncStatObj : public RGWAsyncRadosRequest {
|
||||
RGWRados *store;
|
||||
RGWBucketInfo bucket_info;
|
||||
|
@ -2891,6 +2891,7 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
|
||||
}
|
||||
|
||||
|
||||
// TODO: move into rgw_data_sync_trim.cc
|
||||
#undef dout_prefix
|
||||
#define dout_prefix (*_dout << "data trim: ")
|
||||
|
||||
@ -2936,28 +2937,7 @@ void take_min_markers(IterIn first, IterIn last, IterOut dest)
|
||||
}
|
||||
}
|
||||
|
||||
// wrapper to update last_trim_marker on success
|
||||
class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
|
||||
CephContext *cct;
|
||||
std::string *last_trim_marker;
|
||||
public:
|
||||
LastTimelogTrimCR(RGWRados *store, const std::string& oid,
|
||||
const std::string& to_marker, std::string *last_trim_marker)
|
||||
: RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
|
||||
std::string{}, to_marker),
|
||||
cct(store->ctx()), last_trim_marker(last_trim_marker)
|
||||
{}
|
||||
int request_complete() override {
|
||||
int r = RGWRadosTimelogTrimCR::request_complete();
|
||||
if (r < 0 && r != -ENODATA) {
|
||||
ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl;
|
||||
return r;
|
||||
}
|
||||
ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl;
|
||||
*last_trim_marker = to_marker;
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
} // anonymous namespace
|
||||
|
||||
class DataLogTrimCR : public RGWCoroutine {
|
||||
RGWRados *store;
|
||||
@ -3036,7 +3016,7 @@ int DataLogTrimCR::operate()
|
||||
ldout(cct, 10) << "trimming log shard " << i
|
||||
<< " at marker=" << stable
|
||||
<< " last_trim=" << last_trim[i] << dendl;
|
||||
using TrimCR = LastTimelogTrimCR;
|
||||
using TrimCR = RGWSyncLogTrimCR;
|
||||
spawn(new TrimCR(store, store->data_log->get_oid(i),
|
||||
stable, &last_trim[i]),
|
||||
true);
|
||||
@ -3100,8 +3080,6 @@ int DataLogTrimPollCR::operate()
|
||||
return 0;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
|
||||
RGWHTTPManager *http,
|
||||
int num_shards, utime_t interval)
|
||||
|
@ -313,28 +313,6 @@ public:
|
||||
static RGWMetadataTopHandler md_top_handler;
|
||||
|
||||
|
||||
static const std::string mdlog_history_oid = "meta.history";
|
||||
|
||||
struct RGWMetadataLogHistory {
|
||||
epoch_t oldest_realm_epoch;
|
||||
std::string oldest_period_id;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(oldest_realm_epoch, bl);
|
||||
::encode(oldest_period_id, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
void decode(bufferlist::iterator& p) {
|
||||
DECODE_START(1, p);
|
||||
::decode(oldest_realm_epoch, p);
|
||||
::decode(oldest_period_id, p);
|
||||
DECODE_FINISH(p);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(RGWMetadataLogHistory)
|
||||
|
||||
|
||||
RGWMetadataManager::RGWMetadataManager(CephContext *_cct, RGWRados *_store)
|
||||
: cct(_cct), store(_store)
|
||||
{
|
||||
@ -351,15 +329,18 @@ RGWMetadataManager::~RGWMetadataManager()
|
||||
handlers.clear();
|
||||
}
|
||||
|
||||
const std::string RGWMetadataLogHistory::oid = "meta.history";
|
||||
|
||||
namespace {
|
||||
|
||||
int read_history(RGWRados *store, RGWMetadataLogHistory *state)
|
||||
int read_history(RGWRados *store, RGWMetadataLogHistory *state,
|
||||
RGWObjVersionTracker *objv_tracker)
|
||||
{
|
||||
RGWObjectCtx ctx{store};
|
||||
auto& pool = store->get_zone_params().log_pool;
|
||||
const auto& oid = mdlog_history_oid;
|
||||
const auto& oid = RGWMetadataLogHistory::oid;
|
||||
bufferlist bl;
|
||||
int ret = rgw_get_system_obj(store, ctx, pool, oid, bl, nullptr, nullptr);
|
||||
int ret = rgw_get_system_obj(store, ctx, pool, oid, bl, objv_tracker, nullptr);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
@ -375,19 +356,141 @@ int read_history(RGWRados *store, RGWMetadataLogHistory *state)
|
||||
}
|
||||
|
||||
int write_history(RGWRados *store, const RGWMetadataLogHistory& state,
|
||||
bool exclusive = false)
|
||||
RGWObjVersionTracker *objv_tracker, bool exclusive = false)
|
||||
{
|
||||
bufferlist bl;
|
||||
state.encode(bl);
|
||||
|
||||
auto& pool = store->get_zone_params().log_pool;
|
||||
const auto& oid = mdlog_history_oid;
|
||||
const auto& oid = RGWMetadataLogHistory::oid;
|
||||
return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
|
||||
exclusive, nullptr, real_time{});
|
||||
exclusive, objv_tracker, real_time{});
|
||||
}
|
||||
|
||||
using Cursor = RGWPeriodHistory::Cursor;
|
||||
|
||||
/// read the mdlog history and use it to initialize the given cursor
|
||||
class ReadHistoryCR : public RGWCoroutine {
|
||||
RGWRados *store;
|
||||
Cursor *cursor;
|
||||
RGWObjVersionTracker *objv_tracker;
|
||||
RGWMetadataLogHistory state;
|
||||
public:
|
||||
ReadHistoryCR(RGWRados *store, Cursor *cursor,
|
||||
RGWObjVersionTracker *objv_tracker)
|
||||
: RGWCoroutine(store->ctx()), store(store), cursor(cursor),
|
||||
objv_tracker(objv_tracker)
|
||||
{}
|
||||
|
||||
int operate() {
|
||||
reenter(this) {
|
||||
yield {
|
||||
rgw_raw_obj obj{store->get_zone_params().log_pool,
|
||||
RGWMetadataLogHistory::oid};
|
||||
constexpr bool empty_on_enoent = false;
|
||||
|
||||
using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
|
||||
call(new ReadCR(store->get_async_rados(), store, obj,
|
||||
&state, empty_on_enoent, objv_tracker));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
ldout(cct, 1) << "failed to read mdlog history: "
|
||||
<< cpp_strerror(retcode) << dendl;
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
*cursor = store->period_history->lookup(state.oldest_realm_epoch);
|
||||
if (!*cursor) {
|
||||
return set_cr_error(cursor->get_error());
|
||||
}
|
||||
|
||||
ldout(cct, 10) << "read mdlog history with oldest period id="
|
||||
<< state.oldest_period_id << " realm_epoch="
|
||||
<< state.oldest_realm_epoch << dendl;
|
||||
return set_cr_done();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
/// write the given cursor to the mdlog history
|
||||
class WriteHistoryCR : public RGWCoroutine {
|
||||
RGWRados *store;
|
||||
Cursor cursor;
|
||||
RGWObjVersionTracker *objv;
|
||||
RGWMetadataLogHistory state;
|
||||
public:
|
||||
WriteHistoryCR(RGWRados *store, const Cursor& cursor,
|
||||
RGWObjVersionTracker *objv)
|
||||
: RGWCoroutine(store->ctx()), store(store), cursor(cursor), objv(objv)
|
||||
{}
|
||||
|
||||
int operate() {
|
||||
reenter(this) {
|
||||
state.oldest_period_id = cursor.get_period().get_id();
|
||||
state.oldest_realm_epoch = cursor.get_epoch();
|
||||
|
||||
yield {
|
||||
rgw_raw_obj obj{store->get_zone_params().log_pool,
|
||||
RGWMetadataLogHistory::oid};
|
||||
|
||||
using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
|
||||
call(new WriteCR(store->get_async_rados(), store, obj, state, objv));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
ldout(cct, 1) << "failed to write mdlog history: "
|
||||
<< cpp_strerror(retcode) << dendl;
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
ldout(cct, 10) << "wrote mdlog history with oldest period id="
|
||||
<< state.oldest_period_id << " realm_epoch="
|
||||
<< state.oldest_realm_epoch << dendl;
|
||||
return set_cr_done();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
/// update the mdlog history to reflect trimmed logs
|
||||
class TrimHistoryCR : public RGWCoroutine {
|
||||
RGWRados *store;
|
||||
const Cursor cursor; //< cursor to trimmed period
|
||||
RGWObjVersionTracker *objv; //< to prevent racing updates
|
||||
Cursor next; //< target cursor for oldest log period
|
||||
Cursor existing; //< existing cursor read from disk
|
||||
|
||||
public:
|
||||
TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv)
|
||||
: RGWCoroutine(store->ctx()),
|
||||
store(store), cursor(cursor), objv(objv), next(cursor)
|
||||
{
|
||||
next.next(); // advance past cursor
|
||||
}
|
||||
|
||||
int operate() {
|
||||
reenter(this) {
|
||||
// read an existing history, and write the new history if it's newer
|
||||
yield call(new ReadHistoryCR(store, &existing, objv));
|
||||
if (retcode < 0) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
// reject older trims with ECANCELED
|
||||
if (cursor.get_epoch() < existing.get_epoch()) {
|
||||
ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch()
|
||||
<< ", rejecting trim at epoch=" << cursor.get_epoch() << dendl;
|
||||
return set_cr_error(-ECANCELED);
|
||||
}
|
||||
// overwrite with updated history
|
||||
yield call(new WriteHistoryCR(store, next, objv));
|
||||
if (retcode < 0) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
return set_cr_done();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// traverse all the way back to the beginning of the period history, and
|
||||
// return a cursor to the first period in a fully attached history
|
||||
Cursor find_oldest_period(RGWRados *store)
|
||||
@ -431,7 +534,8 @@ Cursor RGWMetadataManager::init_oldest_log_period()
|
||||
{
|
||||
// read the mdlog history
|
||||
RGWMetadataLogHistory state;
|
||||
int ret = read_history(store, &state);
|
||||
RGWObjVersionTracker objv;
|
||||
int ret = read_history(store, &state, &objv);
|
||||
|
||||
if (ret == -ENOENT) {
|
||||
// initialize the mdlog history and write it
|
||||
@ -446,7 +550,7 @@ Cursor RGWMetadataManager::init_oldest_log_period()
|
||||
state.oldest_period_id = cursor.get_period().get_id();
|
||||
|
||||
constexpr bool exclusive = true; // don't overwrite
|
||||
int ret = write_history(store, state, exclusive);
|
||||
int ret = write_history(store, state, &objv, exclusive);
|
||||
if (ret < 0 && ret != -EEXIST) {
|
||||
ldout(cct, 1) << "failed to write mdlog history: "
|
||||
<< cpp_strerror(ret) << dendl;
|
||||
@ -486,7 +590,7 @@ Cursor RGWMetadataManager::init_oldest_log_period()
|
||||
Cursor RGWMetadataManager::read_oldest_log_period() const
|
||||
{
|
||||
RGWMetadataLogHistory state;
|
||||
int ret = read_history(store, &state);
|
||||
int ret = read_history(store, &state, nullptr);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 1) << "failed to read mdlog history: "
|
||||
<< cpp_strerror(ret) << dendl;
|
||||
@ -500,6 +604,18 @@ Cursor RGWMetadataManager::read_oldest_log_period() const
|
||||
return store->period_history->lookup(state.oldest_realm_epoch);
|
||||
}
|
||||
|
||||
RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period,
|
||||
RGWObjVersionTracker *objv) const
|
||||
{
|
||||
return new ReadHistoryCR(store, period, objv);
|
||||
}
|
||||
|
||||
RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period,
|
||||
RGWObjVersionTracker *objv) const
|
||||
{
|
||||
return new TrimHistoryCR(store, period, objv);
|
||||
}
|
||||
|
||||
int RGWMetadataManager::init(const std::string& current_period)
|
||||
{
|
||||
// open a log for the current period
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
|
||||
class RGWRados;
|
||||
class RGWCoroutine;
|
||||
class JSONObj;
|
||||
struct RGWObjVersionTracker;
|
||||
|
||||
@ -265,6 +266,27 @@ struct RGWMetadataLogData {
|
||||
};
|
||||
WRITE_CLASS_ENCODER(RGWMetadataLogData)
|
||||
|
||||
struct RGWMetadataLogHistory {
|
||||
epoch_t oldest_realm_epoch;
|
||||
std::string oldest_period_id;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(oldest_realm_epoch, bl);
|
||||
::encode(oldest_period_id, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
void decode(bufferlist::iterator& p) {
|
||||
DECODE_START(1, p);
|
||||
::decode(oldest_realm_epoch, p);
|
||||
::decode(oldest_period_id, p);
|
||||
DECODE_FINISH(p);
|
||||
}
|
||||
|
||||
static const std::string oid;
|
||||
};
|
||||
WRITE_CLASS_ENCODER(RGWMetadataLogHistory)
|
||||
|
||||
class RGWMetadataManager {
|
||||
map<string, RGWMetadataHandler *> handlers;
|
||||
CephContext *cct;
|
||||
@ -303,6 +325,16 @@ public:
|
||||
/// period history
|
||||
RGWPeriodHistory::Cursor read_oldest_log_period() const;
|
||||
|
||||
/// read the oldest log period asynchronously and write its result to the
|
||||
/// given cursor pointer
|
||||
RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period,
|
||||
RGWObjVersionTracker *objv) const;
|
||||
|
||||
/// try to advance the oldest log period when the given period is trimmed,
|
||||
/// using a rados lock to provide atomicity
|
||||
RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period,
|
||||
RGWObjVersionTracker *objv) const;
|
||||
|
||||
/// find or create the metadata log for the given period
|
||||
RGWMetadataLog* get_log(const std::string& period);
|
||||
|
||||
|
@ -3229,9 +3229,20 @@ public:
|
||||
return http.set_threaded();
|
||||
}
|
||||
int process() override {
|
||||
crs.run(create_data_log_trim_cr(store, &http,
|
||||
cct->_conf->rgw_data_log_num_shards,
|
||||
trim_interval));
|
||||
list<RGWCoroutinesStack*> stacks;
|
||||
auto meta = new RGWCoroutinesStack(store->ctx(), &crs);
|
||||
meta->call(create_meta_log_trim_cr(store, &http,
|
||||
cct->_conf->rgw_md_log_max_shards,
|
||||
trim_interval));
|
||||
stacks.push_back(meta);
|
||||
|
||||
auto data = new RGWCoroutinesStack(store->ctx(), &crs);
|
||||
data->call(create_data_log_trim_cr(store, &http,
|
||||
cct->_conf->rgw_data_log_num_shards,
|
||||
trim_interval));
|
||||
stacks.push_back(data);
|
||||
|
||||
crs.run(stacks);
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
@ -4164,7 +4175,8 @@ int RGWRados::init_complete()
|
||||
|
||||
/* no point of running sync thread if we don't have a master zone configured
|
||||
or there is no rest_master_conn */
|
||||
if (get_zonegroup().master_zone.empty() || !rest_master_conn) {
|
||||
if (get_zonegroup().master_zone.empty() || !rest_master_conn
|
||||
|| current_period.get_id().empty()) {
|
||||
run_sync_thread = false;
|
||||
}
|
||||
|
||||
@ -6709,7 +6721,8 @@ int RGWRados::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mt
|
||||
}
|
||||
|
||||
int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
|
||||
off_t ofs, bool exclusive)
|
||||
off_t ofs, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker)
|
||||
{
|
||||
rgw_rados_ref ref;
|
||||
rgw_pool pool;
|
||||
@ -6723,6 +6736,9 @@ int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
|
||||
if (exclusive)
|
||||
op.create(true);
|
||||
|
||||
if (objv_tracker) {
|
||||
objv_tracker->prepare_op_for_write(&op);
|
||||
}
|
||||
if (ofs == -1) {
|
||||
op.write_full(bl);
|
||||
} else {
|
||||
@ -6732,6 +6748,9 @@ int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
if (objv_tracker) {
|
||||
objv_tracker->apply_write();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2924,7 +2924,8 @@ public:
|
||||
ceph::real_time set_mtime /* 0 for don't set */);
|
||||
|
||||
virtual int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
|
||||
off_t ofs, bool exclusive);
|
||||
off_t ofs, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker = nullptr);
|
||||
int aio_put_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
|
||||
off_t ofs, bool exclusive, void **handle);
|
||||
|
||||
|
@ -2313,3 +2313,690 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
|
||||
}
|
||||
|
||||
|
||||
// TODO: move into rgw_sync_trim.cc
|
||||
#undef dout_prefix
|
||||
#define dout_prefix (*_dout << "meta trim: ")
|
||||
|
||||
/// purge all log shards for the given mdlog
|
||||
class PurgeLogShardsCR : public RGWShardCollectCR {
|
||||
RGWRados *const store;
|
||||
const RGWMetadataLog* mdlog;
|
||||
const int num_shards;
|
||||
rgw_raw_obj obj;
|
||||
int i{0};
|
||||
|
||||
static constexpr int max_concurrent = 16;
|
||||
|
||||
public:
|
||||
PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
|
||||
const rgw_pool& pool, int num_shards)
|
||||
: RGWShardCollectCR(store->ctx(), max_concurrent),
|
||||
store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
|
||||
{}
|
||||
|
||||
bool spawn_next() override {
|
||||
if (i == num_shards) {
|
||||
return false;
|
||||
}
|
||||
mdlog->get_shard_oid(i++, obj.oid);
|
||||
spawn(new RGWRadosRemoveCR(store, obj), false);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
using Cursor = RGWPeriodHistory::Cursor;
|
||||
|
||||
/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
|
||||
class PurgePeriodLogsCR : public RGWCoroutine {
|
||||
RGWRados *const store;
|
||||
RGWMetadataManager *const metadata;
|
||||
RGWObjVersionTracker objv;
|
||||
Cursor cursor;
|
||||
epoch_t realm_epoch;
|
||||
epoch_t *last_trim_epoch; //< update last trim on success
|
||||
|
||||
public:
|
||||
PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
|
||||
: RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
|
||||
realm_epoch(realm_epoch), last_trim_epoch(last_trim)
|
||||
{}
|
||||
|
||||
int operate();
|
||||
};
|
||||
|
||||
int PurgePeriodLogsCR::operate()
|
||||
{
|
||||
reenter(this) {
|
||||
// read our current oldest log period
|
||||
yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
|
||||
if (retcode < 0) {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
assert(cursor);
|
||||
ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
|
||||
<< " period=" << cursor.get_period().get_id() << dendl;
|
||||
|
||||
// trim -up to- the given realm_epoch
|
||||
while (cursor.get_epoch() < realm_epoch) {
|
||||
ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
|
||||
<< " period=" << cursor.get_period().get_id() << dendl;
|
||||
yield {
|
||||
const auto mdlog = metadata->get_log(cursor.get_period().get_id());
|
||||
const auto& pool = store->get_zone_params().log_pool;
|
||||
auto num_shards = cct->_conf->rgw_md_log_max_shards;
|
||||
call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
|
||||
}
|
||||
if (retcode < 0) {
|
||||
ldout(cct, 1) << "failed to remove log shards: "
|
||||
<< cpp_strerror(retcode) << dendl;
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
|
||||
<< " period=" << cursor.get_period().get_id() << dendl;
|
||||
|
||||
// update our mdlog history
|
||||
yield call(metadata->trim_log_period_cr(cursor, &objv));
|
||||
if (retcode == -ENOENT) {
|
||||
// must have raced to update mdlog history. return success and allow the
|
||||
// winner to continue purging
|
||||
ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
|
||||
<< " period=" << cursor.get_period().get_id() << dendl;
|
||||
return set_cr_done();
|
||||
} else if (retcode < 0) {
|
||||
ldout(cct, 1) << "failed to remove log shards for realm_epoch="
|
||||
<< cursor.get_epoch() << " period=" << cursor.get_period().get_id()
|
||||
<< " with: " << cpp_strerror(retcode) << dendl;
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
if (*last_trim_epoch < cursor.get_epoch()) {
|
||||
*last_trim_epoch = cursor.get_epoch();
|
||||
}
|
||||
|
||||
assert(cursor.has_next()); // get_current() should always come after
|
||||
cursor.next();
|
||||
}
|
||||
return set_cr_done();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
|
||||
|
||||
/// construct a RGWRESTConn for each zone in the realm
|
||||
template <typename Zonegroups>
|
||||
connection_map make_peer_connections(RGWRados *store,
|
||||
const Zonegroups& zonegroups)
|
||||
{
|
||||
connection_map connections;
|
||||
for (auto& g : zonegroups) {
|
||||
for (auto& z : g.second.zones) {
|
||||
std::unique_ptr<RGWRESTConn> conn{
|
||||
new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
|
||||
connections.emplace(z.first, std::move(conn));
|
||||
}
|
||||
}
|
||||
return connections;
|
||||
}
|
||||
|
||||
/// return the marker that it's safe to trim up to
|
||||
const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
|
||||
{
|
||||
return m.state == m.FullSync ? m.next_step_marker : m.marker;
|
||||
}
|
||||
|
||||
/// comparison operator for take_min_status()
|
||||
bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
|
||||
{
|
||||
// sort by stable marker
|
||||
return get_stable_marker(lhs) < get_stable_marker(rhs);
|
||||
}
|
||||
|
||||
/// populate the status with the minimum stable marker of each shard for any
|
||||
/// peer whose realm_epoch matches the minimum realm_epoch in the input
|
||||
template <typename Iter>
|
||||
int take_min_status(CephContext *cct, Iter first, Iter last,
|
||||
rgw_meta_sync_status *status)
|
||||
{
|
||||
if (first == last) {
|
||||
return -EINVAL;
|
||||
}
|
||||
const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
|
||||
|
||||
status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
|
||||
for (auto p = first; p != last; ++p) {
|
||||
// validate peer's shard count
|
||||
if (p->sync_markers.size() != num_shards) {
|
||||
ldout(cct, 1) << "take_min_status got peer status with "
|
||||
<< p->sync_markers.size() << " shards, expected "
|
||||
<< num_shards << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
|
||||
// earlier epoch, take its entire status
|
||||
*status = std::move(*p);
|
||||
} else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
|
||||
// same epoch, take any earlier markers
|
||||
auto m = status->sync_markers.begin();
|
||||
for (auto& shard : p->sync_markers) {
|
||||
if (shard.second < m->second) {
|
||||
m->second = std::move(shard.second);
|
||||
}
|
||||
++m;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// state shared by the mdlog trim coroutines; kept across trim passes so
// last_trim_epoch persists between runs
struct TrimEnv {
  RGWRados *const store;
  RGWHTTPManager *const http;
  int num_shards; //< number of mdlog shards to consider for trimming
  const std::string& zone; //< this zone's id
  Cursor current; //< cursor to current period
  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged

  TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
    : store(store), http(http), num_shards(num_shards),
      zone(store->get_zone_params().get_id()),
      current(store->period_history->get_current())
  {}
};
|
||||
|
||||
// trim state for the metadata master zone, which trims based on the sync
// status reported by each of its peer zones
struct MasterTrimEnv : public TrimEnv {
  connection_map connections; //< peer connections
  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
  /// last trim marker for each shard, only applies to current period's mdlog
  std::vector<std::string> last_trim_markers;

  MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(store, http, num_shards),
      last_trim_markers(num_shards)
  {
    auto& period = current.get_period();
    connections = make_peer_connections(store, period.get_map().zonegroups);
    connections.erase(zone); // exclude our own zone from the peer set
    peer_status.resize(connections.size());
  }
};
|
||||
|
||||
// trim state for a non-master zone, which trims based on the master's mdlog
struct PeerTrimEnv : public TrimEnv {
  /// last trim timestamp for each shard, only applies to current period's mdlog
  std::vector<ceph::real_time> last_trim_timestamps;

  PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(store, http, num_shards),
      last_trim_timestamps(num_shards)
  {}

  /// adopt the master's shard count, resizing the per-shard timestamp vector
  void set_num_shards(int num_shards) {
    this->num_shards = num_shards;
    last_trim_timestamps.resize(num_shards);
  }
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
|
||||
/// spawn a trim cr for each shard that needs it, while limiting the number
/// of concurrent shards
class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
 private:
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  RGWMetadataLog *mdlog;
  int shard_id{0}; //< next shard to consider in spawn_next()
  std::string oid; //< oid of the shard object being trimmed
  const rgw_meta_sync_status& sync_status; //< minimum status over all peers

 public:
  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
                               const rgw_meta_sync_status& sync_status)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), sync_status(sync_status)
  {}

  bool spawn_next() override;
};
|
||||
|
||||
bool MetaMasterTrimShardCollectCR::spawn_next()
|
||||
{
|
||||
while (shard_id < env.num_shards) {
|
||||
auto m = sync_status.sync_markers.find(shard_id);
|
||||
if (m == sync_status.sync_markers.end()) {
|
||||
shard_id++;
|
||||
continue;
|
||||
}
|
||||
auto& stable = get_stable_marker(m->second);
|
||||
auto& last_trim = env.last_trim_markers[shard_id];
|
||||
|
||||
if (stable <= last_trim) {
|
||||
// already trimmed
|
||||
ldout(cct, 20) << "skipping log shard " << shard_id
|
||||
<< " at marker=" << stable
|
||||
<< " last_trim=" << last_trim
|
||||
<< " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
|
||||
shard_id++;
|
||||
continue;
|
||||
}
|
||||
|
||||
mdlog->get_shard_oid(shard_id, oid);
|
||||
|
||||
ldout(cct, 10) << "trimming log shard " << shard_id
|
||||
<< " at marker=" << stable
|
||||
<< " last_trim=" << last_trim
|
||||
<< " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
|
||||
spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
|
||||
shard_id++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// spawn rest requests to read each peer's sync status
class MetaMasterStatusCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  connection_map::iterator c; //< next peer connection to query
  std::vector<rgw_meta_sync_status>::iterator s; //< where to store its reply
 public:
  MetaMasterStatusCollectCR(MasterTrimEnv& env)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), c(env.connections.begin()), s(env.peer_status.begin())
  {}

  bool spawn_next() override {
    if (c == env.connections.end()) {
      return false;
    }
    // same query for every peer, so the param array can be static
    static rgw_http_param_pair params[] = {
      { "type", "metadata" },
      { "status", nullptr },
      { nullptr, nullptr }
    };

    ldout(cct, 20) << "query sync status from " << c->first << dendl;
    auto conn = c->second.get();
    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
          false);
    ++c;
    ++s;
    return true;
  }
};
|
||||
|
||||
/// trim the master's mdlog down to the minimum position synced by its peers
class MetaMasterTrimCR : public RGWCoroutine {
  MasterTrimEnv& env;
  rgw_meta_sync_status min_status; //< minimum sync status of all peers
  // NOTE(review): only the take_min_status() call in operate() assigns this;
  // the status-collect child coroutine reports through retcode. confirm
  // whether the `if (ret < 0)` check after that yield should read retcode.
  int ret{0};

 public:
  MetaMasterTrimCR(MasterTrimEnv& env)
    : RGWCoroutine(env.store->ctx()), env(env)
  {}

  int operate();
};
|
||||
|
||||
// one trim pass on the master: gather peer sync status, purge mdlogs from
// periods every peer has moved past, then trim the current period's shards
int MetaMasterTrimCR::operate()
{
  reenter(this) {
    // TODO: detect this and fail before we spawn the trim thread?
    if (env.connections.empty()) {
      ldout(cct, 4) << "no peers, exiting" << dendl;
      return set_cr_done();
    }

    ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
    // query mdlog sync status from peers
    yield call(new MetaMasterStatusCollectCR(env));

    // must get a successful reply from all peers to consider trimming
    // NOTE(review): `ret` is still 0 here -- the child's result arrives in
    // retcode; verify whether this check was meant to use retcode instead
    if (ret < 0) {
      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(ret);
    }

    // determine the minimum epoch and markers
    ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
                          env.peer_status.end(), &min_status);
    if (ret < 0) {
      ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
      return set_cr_error(ret);
    }
    yield {
      auto store = env.store;
      auto epoch = min_status.sync_info.realm_epoch;
      ldout(cct, 4) << "realm epoch min=" << epoch
          << " current=" << env.current.get_epoch()<< dendl;
      if (epoch > env.last_trim_epoch + 1) {
        // delete any prior mdlog periods
        spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
      } else {
        ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
            << env.last_trim_epoch << dendl;
      }

      // if realm_epoch == current, trim mdlog based on markers
      if (epoch == env.current.get_epoch()) {
        auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
        spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
      }
    }
    // ignore any errors during purge/trim because we want to hold the lock open
    return set_cr_done();
  }
  return 0;
}
|
||||
|
||||
|
||||
/// read the first entry of the master's mdlog shard and trim to that position
class MetaPeerTrimShardCR : public RGWCoroutine {
  RGWMetaSyncEnv& env;
  RGWMetadataLog *mdlog;
  const std::string& period_id;
  const int shard_id;
  RGWMetadataLogInfo info; //< shard info read from the master
  ceph::real_time stable; //< safe timestamp to trim, according to master
  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
  rgw_mdlog_shard_data result; //< result from master's mdlog listing

 public:
  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
                      const std::string& period_id, int shard_id,
                      ceph::real_time *last_trim)
    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
  {}

  int operate() override;
};
|
||||
|
||||
// determine a stable trim timestamp from the master's mdlog shard, then trim
// the local shard up to (but not including) that timestamp
int MetaPeerTrimShardCR::operate()
{
  reenter(this) {
    // query master's first mdlog entry for this shard
    yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
                                             "", 1, &result));
    if (retcode < 0) {
      ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
          << shard_id << " for period " << period_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    if (result.entries.empty()) {
      // if there are no mdlog entries, we don't have a timestamp to compare. we
      // can't just trim everything, because there could be racing updates since
      // this empty reply. query the mdlog shard info to read its max timestamp,
      // then retry the listing to make sure it's still empty before trimming to
      // that
      ldout(cct, 10) << "empty master mdlog shard " << shard_id
          << ", reading last timestamp from shard info" << dendl;
      // read the mdlog shard info for the last timestamp
      using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
      yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
      if (retcode < 0) {
        ldout(cct, 5) << "failed to read info from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (ceph::real_clock::is_zero(info.last_update)) {
        return set_cr_done(); // nothing to trim
      }
      ldout(cct, 10) << "got mdlog shard info with last update="
          << info.last_update << dendl;
      // re-read the master's first mdlog entry to make sure it hasn't changed
      yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
                                               "", 1, &result));
      if (retcode < 0) {
        ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      // if the mdlog is still empty, trim to max marker
      if (result.entries.empty()) {
        stable = info.last_update;
      } else {
        stable = result.entries.front().timestamp;

        // can only trim -up to- master's first timestamp, so subtract a second.
        // (this is why we use timestamps instead of markers for the peers)
        stable -= std::chrono::seconds(1);
      }
    } else {
      stable = result.entries.front().timestamp;
      stable -= std::chrono::seconds(1);
    }

    if (stable <= *last_trim) {
      // nothing new to trim since the last pass
      ldout(cct, 10) << "skipping log shard " << shard_id
          << " at timestamp=" << stable
          << " last_trim=" << *last_trim << dendl;
      return set_cr_done();
    }

    ldout(cct, 10) << "trimming log shard " << shard_id
        << " at timestamp=" << stable
        << " last_trim=" << *last_trim << dendl;
    yield {
      std::string oid;
      mdlog->get_shard_oid(shard_id, oid);
      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
    }
    // -ENODATA means the shard object had nothing to trim; treat as success
    if (retcode < 0 && retcode != -ENODATA) {
      ldout(cct, 1) << "failed to trim mdlog shard " << shard_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // remember the position so the next pass can skip if nothing advanced
    *last_trim = stable;
    return set_cr_done();
  }
  return 0;
}
|
||||
|
||||
/// spawn a MetaPeerTrimShardCR per shard, limiting concurrency
class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  PeerTrimEnv& env;
  RGWMetadataLog *mdlog;
  const std::string& period_id;
  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
  int shard_id{0}; //< next shard to spawn

 public:
  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
  {
    meta_env.init(cct, env.store, env.store->rest_master_conn,
                  env.store->get_async_rados(), env.http, nullptr);
  }

  bool spawn_next() override;
};
|
||||
|
||||
bool MetaPeerTrimShardCollectCR::spawn_next()
|
||||
{
|
||||
if (shard_id >= env.num_shards) {
|
||||
return false;
|
||||
}
|
||||
auto& last_trim = env.last_trim_timestamps[shard_id];
|
||||
spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
|
||||
false);
|
||||
shard_id++;
|
||||
return true;
|
||||
}
|
||||
|
||||
/// trim this peer's mdlogs according to the master's mdlog position
class MetaPeerTrimCR : public RGWCoroutine {
  PeerTrimEnv& env;
  rgw_mdlog_info mdlog_info; //< master's mdlog info

 public:
  MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}

  int operate();
};
|
||||
|
||||
// one trim pass on a peer: read the master's mdlog info, purge mdlogs from
// periods before the master's realm_epoch, then trim the current period's
// shards against the master's mdlog
int MetaPeerTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching master mdlog info" << dendl;
    yield {
      // query mdlog_info from master for oldest_log_period
      rgw_http_param_pair params[] = {
        { "type", "metadata" },
        { nullptr, nullptr }
      };

      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
      call(new LogInfoCR(cct, env.store->rest_master_conn, env.http,
                         "/admin/log/", params, &mdlog_info));
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
      return set_cr_error(retcode);
    }
    // use master's shard count instead
    env.set_num_shards(mdlog_info.num_shards);

    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
      // delete any prior mdlog periods
      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
                                       &env.last_trim_epoch));
    } else {
      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
          << env.last_trim_epoch << dendl;
    }

    // if realm_epoch == current, trim mdlog based on master's markers
    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
      yield {
        auto meta_mgr = env.store->meta_mgr;
        auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
        call(new MetaPeerTrimShardCollectCR(env, mdlog));
        // ignore any errors during purge/trim because we want to hold the lock open
      }
    }
    return set_cr_done();
  }
  return 0;
}
|
||||
|
||||
/// polling loop that periodically takes a rados lock and runs one trim pass
/// (the concrete trim coroutine is supplied by alloc_cr())
class MetaTrimPollCR : public RGWCoroutine {
  RGWRados *const store;
  const utime_t interval; //< polling interval
  const rgw_raw_obj obj; //< rados object used for the trim lock
  const std::string name{"meta_trim"}; //< lock name
  const std::string cookie; //< random lock cookie

 protected:
  /// allocate the coroutine to run within the lease
  virtual RGWCoroutine* alloc_cr() = 0;

 public:
  MetaTrimPollCR(RGWRados *store, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), interval(interval),
      obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate();
};
|
||||
|
||||
// endless poll loop: sleep, take the trim lock, run one trim pass
int MetaTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          obj, name, cookie, interval.sec()));
      if (retcode < 0) {
        // couldn't get the lock; another gateway may be trimming. retry later
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(alloc_cr());

      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
                                              obj, name, cookie));
      }
      // on success the lock is left to expire on its own, covering the
      // remainder of the wait interval
    }
  }
  return 0;
}
|
||||
|
||||
/// polling loop for the master zone's trim logic
class MetaMasterTrimPollCR : public MetaTrimPollCR  {
  MasterTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaMasterTrimCR(env);
  }
 public:
  MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                       int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(store, http, num_shards)
  {}
};
|
||||
|
||||
/// polling loop for a peer zone's trim logic
class MetaPeerTrimPollCR : public MetaTrimPollCR {
  PeerTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaPeerTrimCR(env);
  }
 public:
  MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                     int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(store, http, num_shards)
  {}
};
|
||||
|
||||
// factory for the background mdlog trim poll loop: the metadata master trims
// against peer sync status, all other zones follow the master's mdlog
RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  const bool is_master = store->is_meta_master();
  if (!is_master) {
    return new MetaPeerTrimPollCR(store, http, num_shards, interval);
  }
  return new MetaMasterTrimPollCR(store, http, num_shards, interval);
}
|
||||
|
||||
|
||||
// single-pass master trim for 'radosgw-admin mdlog autotrim'; owns its trim
// env (via private inheritance) instead of sharing one across poll iterations
struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
  MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
    : MasterTrimEnv(store, http, num_shards),
      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
  {}
};
|
||||
|
||||
// single-pass peer trim for 'radosgw-admin mdlog autotrim'; owns its trim
// env (via private inheritance) instead of sharing one across poll iterations
struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
  MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
    : PeerTrimEnv(store, http, num_shards),
      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
  {}
};
|
||||
|
||||
// factory for a one-shot mdlog trim run by radosgw-admin; picks the master
// or peer variant based on this zone's role
RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards)
{
  const bool is_master = store->is_meta_master();
  if (!is_master) {
    return new MetaPeerAdminTrimCR(store, http, num_shards);
  }
  return new MetaMasterAdminTrimCR(store, http, num_shards);
}
|
||||
|
@ -452,5 +452,13 @@ public:
|
||||
int operate() override;
|
||||
};
|
||||
|
||||
// factory function for the background mdlog trim polling coroutine
RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
                                      int num_shards, utime_t interval);

// factory function for mdlog trim via radosgw-admin
RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards);
|
||||
|
||||
#endif
|
||||
|
@ -53,6 +53,17 @@ def get_zone_connection(zone, credentials):
|
||||
credentials = credentials[0]
|
||||
return get_gateway_connection(zone.gateways[0], credentials)
|
||||
|
||||
def mdlog_list(zone, period = None):
    """Return the decoded 'mdlog list' output for `zone`, optionally scoped
    to a single period id."""
    cmd = ['mdlog', 'list'] + (['--period', period] if period else [])
    # admin() returns (stdout_bytes, retcode); we only need the output
    output, _ = zone.cluster.admin(cmd, read_only=True)
    return json.loads(output.decode('utf-8'))
|
||||
|
||||
def mdlog_autotrim(zone):
    """Run 'mdlog autotrim' against the given zone's cluster."""
    cmd = ['mdlog', 'autotrim']
    zone.cluster.admin(cmd)
|
||||
|
||||
def meta_sync_status(zone):
|
||||
while True:
|
||||
cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
|
||||
@ -654,6 +665,9 @@ def test_multi_period_incremental_sync():
|
||||
if len(zonegroup.zones) < 3:
|
||||
raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
|
||||
|
||||
# periods to include in mdlog comparison
|
||||
mdlog_periods = [realm.current_period.id]
|
||||
|
||||
# create a bucket in each zone
|
||||
buckets = []
|
||||
for zone in zonegroup.zones:
|
||||
@ -673,6 +687,7 @@ def test_multi_period_incremental_sync():
|
||||
|
||||
# change master to zone 2 -> period 2
|
||||
set_master_zone(z2)
|
||||
mdlog_periods += [realm.current_period.id]
|
||||
|
||||
# create another bucket in each zone, except for z3
|
||||
for zone in zonegroup.zones:
|
||||
@ -689,6 +704,7 @@ def test_multi_period_incremental_sync():
|
||||
|
||||
# change master back to zone 1 -> period 3
|
||||
set_master_zone(z1)
|
||||
mdlog_periods += [realm.current_period.id]
|
||||
|
||||
# create another bucket in each zone, except for z3
|
||||
for zone in zonegroup.zones:
|
||||
@ -709,6 +725,31 @@ def test_multi_period_incremental_sync():
|
||||
for source_zone, target_zone in combinations(zonegroup.zones, 2):
|
||||
check_bucket_eq(source_zone, target_zone, bucket_name)
|
||||
|
||||
# verify that mdlogs are not empty and match for each period
|
||||
for period in mdlog_periods:
|
||||
master_mdlog = mdlog_list(z1, period)
|
||||
assert len(master_mdlog) > 0
|
||||
for zone in zonegroup.zones:
|
||||
if zone == z1:
|
||||
continue
|
||||
mdlog = mdlog_list(zone, period)
|
||||
assert len(mdlog) == len(master_mdlog)
|
||||
|
||||
# autotrim mdlogs for master zone
|
||||
mdlog_autotrim(z1)
|
||||
|
||||
# autotrim mdlogs for peers
|
||||
for zone in zonegroup.zones:
|
||||
if zone == z1:
|
||||
continue
|
||||
mdlog_autotrim(zone)
|
||||
|
||||
# verify that mdlogs are empty for each period
|
||||
for period in mdlog_periods:
|
||||
for zone in zonegroup.zones:
|
||||
mdlog = mdlog_list(zone, period)
|
||||
assert len(mdlog) == 0
|
||||
|
||||
def test_zonegroup_remove():
|
||||
zonegroup = realm.master_zonegroup()
|
||||
if len(zonegroup.zones) < 2:
|
||||
|
Loading…
Reference in New Issue
Block a user