diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index d7c11e5b881..a776d040f33 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -421,6 +421,7 @@ enum { OPT_METADATA_SYNC_INIT, OPT_METADATA_SYNC_RUN, OPT_MDLOG_LIST, + OPT_MDLOG_AUTOTRIM, OPT_MDLOG_TRIM, OPT_MDLOG_FETCH, OPT_MDLOG_STATUS, @@ -819,6 +820,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ } else if (strcmp(prev_cmd, "mdlog") == 0) { if (strcmp(cmd, "list") == 0) return OPT_MDLOG_LIST; + if (strcmp(cmd, "autotrim") == 0) + return OPT_MDLOG_AUTOTRIM; if (strcmp(cmd, "trim") == 0) return OPT_MDLOG_TRIM; if (strcmp(cmd, "fetch") == 0) @@ -6190,6 +6193,26 @@ next: formatter->flush(cout); } + if (opt_cmd == OPT_MDLOG_AUTOTRIM) { + // need a full history for purging old mdlog periods + store->meta_mgr->init_oldest_log_period(); + + RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); + RGWHTTPManager http(store->ctx(), crs.get_completion_mgr()); + int ret = http.set_threaded(); + if (ret < 0) { + cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl; + return -ret; + } + + auto num_shards = g_conf->rgw_md_log_max_shards; + ret = crs.run(create_admin_meta_log_trim_cr(store, &http, num_shards)); + if (ret < 0) { + cerr << "automated mdlog trim failed with " << cpp_strerror(ret) << std::endl; + return -ret; + } + } + if (opt_cmd == OPT_MDLOG_TRIM) { utime_t start_time, end_time; diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 32068d39073..6bc9ef14903 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -231,7 +231,8 @@ public: bufferlist& data, RGWObjVersionTracker *objv_tracker, real_time set_mtime) override; - int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive) override; + int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker = nullptr) override; int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state, RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj, @@ -422,7 +423,8 @@ int RGWCache::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time } template -int RGWCache::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive) +int RGWCache::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker) { rgw_pool pool; string oid; @@ -436,7 +438,11 @@ int RGWCache::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& da info.status = 0; info.flags = CACHE_FLAG_DATA; } - int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive); + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker); if (cacheable) { string name = normal_name(pool, oid); if (ret >= 0) { diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 9316d620398..353deaa67ec 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -116,14 +116,14 @@ int RGWSimpleRadosReadAttrsCR::request_complete() int RGWAsyncPutSystemObj::_send_request() { - return store->put_system_obj_data(NULL, obj, bl, -1, exclusive); + return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker); } RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, - const 
rgw_raw_obj& _obj, bool _exclusive, - bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store), - obj(_obj), exclusive(_exclusive), - bl(_bl) + RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj, + bool _exclusive, bufferlist& _bl) + : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker), + obj(_obj), exclusive(_exclusive), bl(_bl) { } @@ -315,6 +315,40 @@ int RGWRadosRemoveOmapKeysCR::send_request() { return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op); } +RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj) + : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj) +{ + set_description() << "remove dest=" << obj; +} + +int RGWRadosRemoveCR::send_request() +{ + auto rados = store->get_rados_handle(); + int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx); + if (r < 0) { + lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl; + return r; + } + ioctx.locator_set_key(obj.loc); + + set_status() << "send request"; + + librados::ObjectWriteOperation op; + op.remove(); + + cn = stack->create_completion_notifier(); + return ioctx.aio_operate(obj.oid, cn->completion(), &op); +} + +int RGWRadosRemoveCR::request_complete() +{ + int r = cn->completion()->get_return_value(); + + set_status() << "request complete; ret=" << r; + + return r; +} + RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, const string& _lock_name, @@ -722,6 +756,29 @@ int RGWRadosTimelogTrimCR::request_complete() return r; } + +RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid, + const std::string& to_marker, + std::string *last_trim_marker) + : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{}, + std::string{}, to_marker), + cct(store->ctx()), last_trim_marker(last_trim_marker) +{ +} + +int RGWSyncLogTrimCR::request_complete() +{ + int r = RGWRadosTimelogTrimCR::request_complete(); + if (r < 0 && r != -ENODATA) { + return r; + } + if (*last_trim_marker < to_marker) { + *last_trim_marker = to_marker; + } + return 0; +} + + int RGWAsyncStatObj::_send_request() { rgw_raw_obj raw_obj; diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index ddc9524c8cd..a892b2a6db0 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -1,6 +1,8 @@ #ifndef CEPH_RGW_CR_RADOS_H #define CEPH_RGW_CR_RADOS_H +#include +#include "include/assert.h" #include "rgw_coroutine.h" #include "rgw_rados.h" #include "common/WorkQueue.h" @@ -119,6 +121,7 @@ public: class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { RGWRados *store; + RGWObjVersionTracker *objv_tracker; rgw_raw_obj obj; bool exclusive; bufferlist bl; @@ -127,8 +130,8 @@ protected: int _send_request() override; public: RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, - const rgw_raw_obj& _obj, bool _exclusive, - bufferlist& _bl); + RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj, + bool _exclusive, bufferlist& _bl); }; class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest { @@ -189,16 +192,18 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { T *result; /// on ENOENT, call handle_data() with an empty object instead of failing const bool empty_on_enoent; + RGWObjVersionTracker *objv_tracker; RGWAsyncGetSystemObj *req{nullptr}; public: RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, - T *_result, bool empty_on_enoent = true) + T *_result, 
bool empty_on_enoent = true, + RGWObjVersionTracker *objv_tracker = nullptr) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj_ctx(store), obj(_obj), result(_result), - empty_on_enoent(empty_on_enoent) {} + empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {} ~RGWSimpleRadosReadCR() override { request_cleanup(); } @@ -222,7 +227,7 @@ template int RGWSimpleRadosReadCR::send_request() { req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), - store, &obj_ctx, NULL, + store, &obj_ctx, objv_tracker, obj, &bl, 0, -1); if (pattrs) { @@ -305,17 +310,16 @@ class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { bufferlist bl; rgw_raw_obj obj; + RGWObjVersionTracker *objv_tracker; - RGWAsyncPutSystemObj *req; + RGWAsyncPutSystemObj *req{nullptr}; public: RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, - const T& _data) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), - store(_store), - obj(_obj), - req(NULL) { + const T& _data, RGWObjVersionTracker *objv_tracker = nullptr) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), + store(_store), obj(_obj), objv_tracker(objv_tracker) { ::encode(_data, bl); } @@ -332,7 +336,7 @@ public: int send_request() override { req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(), - store, obj, false, bl); + store, objv_tracker, obj, false, bl); async_rados->queue(req); return 0; } @@ -464,6 +468,19 @@ public: } }; +class RGWRadosRemoveCR : public RGWSimpleCoroutine { + RGWRados *store; + librados::IoCtx ioctx; + const rgw_raw_obj obj; + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj); + + int send_request(); + int request_complete(); +}; + class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; @@ -1093,6 +1110,16 @@ class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { int request_complete() override; }; +// wrapper to update last_trim_marker on success +class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR { + CephContext *cct; + std::string *last_trim_marker; + public: + RGWSyncLogTrimCR(RGWRados *store, const std::string& oid, + const std::string& to_marker, std::string *last_trim_marker); + int request_complete() override; +}; + class RGWAsyncStatObj : public RGWAsyncRadosRequest { RGWRados *store; RGWBucketInfo bucket_info; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index b1383065558..e3f3b079365 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2891,6 +2891,7 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone, } +// TODO: move into rgw_data_sync_trim.cc #undef dout_prefix #define dout_prefix (*_dout << "data trim: ") @@ -2936,28 +2937,7 @@ void take_min_markers(IterIn first, IterIn last, IterOut dest) } } -// wrapper to update last_trim_marker on success -class LastTimelogTrimCR : public RGWRadosTimelogTrimCR { - CephContext *cct; - std::string *last_trim_marker; - public: - LastTimelogTrimCR(RGWRados *store, const std::string& oid, - const std::string& to_marker, std::string *last_trim_marker) - : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{}, - std::string{}, to_marker), - cct(store->ctx()), last_trim_marker(last_trim_marker) - {} - int request_complete() override { - int r = RGWRadosTimelogTrimCR::request_complete(); - if (r < 0 && r != -ENODATA) { - ldout(cct, 1) << "failed to trim datalog: " << 
cpp_strerror(r) << dendl; - return r; - } - ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl; - *last_trim_marker = to_marker; - return 0; - } -}; +} // anonymous namespace class DataLogTrimCR : public RGWCoroutine { RGWRados *store; @@ -3036,7 +3016,7 @@ int DataLogTrimCR::operate() ldout(cct, 10) << "trimming log shard " << i << " at marker=" << stable << " last_trim=" << last_trim[i] << dendl; - using TrimCR = LastTimelogTrimCR; + using TrimCR = RGWSyncLogTrimCR; spawn(new TrimCR(store, store->data_log->get_oid(i), stable, &last_trim[i]), true); @@ -3100,8 +3080,6 @@ int DataLogTrimPollCR::operate() return 0; } -} // anonymous namespace - RGWCoroutine* create_data_log_trim_cr(RGWRados *store, RGWHTTPManager *http, int num_shards, utime_t interval) diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index e26201bc3be..0a0f3dec324 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -313,28 +313,6 @@ public: static RGWMetadataTopHandler md_top_handler; -static const std::string mdlog_history_oid = "meta.history"; - -struct RGWMetadataLogHistory { - epoch_t oldest_realm_epoch; - std::string oldest_period_id; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - ::encode(oldest_realm_epoch, bl); - ::encode(oldest_period_id, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::iterator& p) { - DECODE_START(1, p); - ::decode(oldest_realm_epoch, p); - ::decode(oldest_period_id, p); - DECODE_FINISH(p); - } -}; -WRITE_CLASS_ENCODER(RGWMetadataLogHistory) - - RGWMetadataManager::RGWMetadataManager(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) { @@ -351,15 +329,18 @@ RGWMetadataManager::~RGWMetadataManager() handlers.clear(); } +const std::string RGWMetadataLogHistory::oid = "meta.history"; + namespace { -int read_history(RGWRados *store, RGWMetadataLogHistory *state) +int read_history(RGWRados *store, RGWMetadataLogHistory *state, + RGWObjVersionTracker *objv_tracker) { RGWObjectCtx ctx{store}; auto& pool = store->get_zone_params().log_pool; - const auto& oid = mdlog_history_oid; + const auto& oid = RGWMetadataLogHistory::oid; bufferlist bl; - int ret = rgw_get_system_obj(store, ctx, pool, oid, bl, nullptr, nullptr); + int ret = rgw_get_system_obj(store, ctx, pool, oid, bl, objv_tracker, nullptr); if (ret < 0) { return ret; } @@ -375,19 +356,141 @@ int read_history(RGWRados *store, RGWMetadataLogHistory *state) } int write_history(RGWRados *store, const RGWMetadataLogHistory& state, - bool exclusive = false) + RGWObjVersionTracker *objv_tracker, bool exclusive = false) { bufferlist bl; state.encode(bl); auto& pool = store->get_zone_params().log_pool; - const auto& oid = mdlog_history_oid; + const auto& oid = RGWMetadataLogHistory::oid; return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), - exclusive, nullptr, real_time{}); + exclusive, objv_tracker, real_time{}); } using Cursor = RGWPeriodHistory::Cursor; +/// read the mdlog history and use it to initialize the given cursor +class ReadHistoryCR : public RGWCoroutine { + RGWRados *store; + Cursor *cursor; + RGWObjVersionTracker *objv_tracker; + RGWMetadataLogHistory state; + public: + ReadHistoryCR(RGWRados *store, Cursor *cursor, + RGWObjVersionTracker *objv_tracker) + : RGWCoroutine(store->ctx()), store(store), cursor(cursor), + objv_tracker(objv_tracker) + {} + + int operate() { + reenter(this) { + yield { + rgw_raw_obj obj{store->get_zone_params().log_pool, + RGWMetadataLogHistory::oid}; + constexpr bool empty_on_enoent = false; + + using ReadCR 
= RGWSimpleRadosReadCR; + call(new ReadCR(store->get_async_rados(), store, obj, + &state, empty_on_enoent, objv_tracker)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to read mdlog history: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + *cursor = store->period_history->lookup(state.oldest_realm_epoch); + if (!*cursor) { + return set_cr_error(cursor->get_error()); + } + + ldout(cct, 10) << "read mdlog history with oldest period id=" + << state.oldest_period_id << " realm_epoch=" + << state.oldest_realm_epoch << dendl; + return set_cr_done(); + } + return 0; + } +}; + +/// write the given cursor to the mdlog history +class WriteHistoryCR : public RGWCoroutine { + RGWRados *store; + Cursor cursor; + RGWObjVersionTracker *objv; + RGWMetadataLogHistory state; + public: + WriteHistoryCR(RGWRados *store, const Cursor& cursor, + RGWObjVersionTracker *objv) + : RGWCoroutine(store->ctx()), store(store), cursor(cursor), objv(objv) + {} + + int operate() { + reenter(this) { + state.oldest_period_id = cursor.get_period().get_id(); + state.oldest_realm_epoch = cursor.get_epoch(); + + yield { + rgw_raw_obj obj{store->get_zone_params().log_pool, + RGWMetadataLogHistory::oid}; + + using WriteCR = RGWSimpleRadosWriteCR; + call(new WriteCR(store->get_async_rados(), store, obj, state, objv)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to write mdlog history: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + ldout(cct, 10) << "wrote mdlog history with oldest period id=" + << state.oldest_period_id << " realm_epoch=" + << state.oldest_realm_epoch << dendl; + return set_cr_done(); + } + return 0; + } +}; + +/// update the mdlog history to reflect trimmed logs +class TrimHistoryCR : public RGWCoroutine { + RGWRados *store; + const Cursor cursor; //< cursor to trimmed period + RGWObjVersionTracker *objv; //< to prevent racing updates + Cursor next; //< target cursor for oldest log period + Cursor existing; //< existing cursor read from disk + + public: + TrimHistoryCR(RGWRados *store, Cursor cursor, RGWObjVersionTracker *objv) + : RGWCoroutine(store->ctx()), + store(store), cursor(cursor), objv(objv), next(cursor) + { + next.next(); // advance past cursor + } + + int operate() { + reenter(this) { + // read an existing history, and write the new history if it's newer + yield call(new ReadHistoryCR(store, &existing, objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + // reject older trims with ECANCELED + if (cursor.get_epoch() < existing.get_epoch()) { + ldout(cct, 4) << "found oldest log epoch=" << existing.get_epoch() + << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl; + return set_cr_error(-ECANCELED); + } + // overwrite with updated history + yield call(new WriteHistoryCR(store, next, objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } +}; + // traverse all the way back to the beginning of the period history, and // return a cursor to the first period in a fully attached history Cursor find_oldest_period(RGWRados *store) @@ -431,7 +534,8 @@ Cursor RGWMetadataManager::init_oldest_log_period() { // read the mdlog history RGWMetadataLogHistory state; - int ret = read_history(store, &state); + RGWObjVersionTracker objv; + int ret = read_history(store, &state, &objv); if (ret == -ENOENT) { // initialize the mdlog history and write it @@ -446,7 +550,7 @@ Cursor RGWMetadataManager::init_oldest_log_period() state.oldest_period_id = 
cursor.get_period().get_id(); constexpr bool exclusive = true; // don't overwrite - int ret = write_history(store, state, exclusive); + int ret = write_history(store, state, &objv, exclusive); if (ret < 0 && ret != -EEXIST) { ldout(cct, 1) << "failed to write mdlog history: " << cpp_strerror(ret) << dendl; @@ -486,7 +590,7 @@ Cursor RGWMetadataManager::init_oldest_log_period() Cursor RGWMetadataManager::read_oldest_log_period() const { RGWMetadataLogHistory state; - int ret = read_history(store, &state); + int ret = read_history(store, &state, nullptr); if (ret < 0) { ldout(store->ctx(), 1) << "failed to read mdlog history: " << cpp_strerror(ret) << dendl; @@ -500,6 +604,18 @@ Cursor RGWMetadataManager::read_oldest_log_period() const return store->period_history->lookup(state.oldest_realm_epoch); } +RGWCoroutine* RGWMetadataManager::read_oldest_log_period_cr(Cursor *period, + RGWObjVersionTracker *objv) const +{ + return new ReadHistoryCR(store, period, objv); +} + +RGWCoroutine* RGWMetadataManager::trim_log_period_cr(Cursor period, + RGWObjVersionTracker *objv) const +{ + return new TrimHistoryCR(store, period, objv); +} + int RGWMetadataManager::init(const std::string& current_period) { // open a log for the current period diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index 8b7526399a8..4d077e8f888 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -18,6 +18,7 @@ class RGWRados; +class RGWCoroutine; class JSONObj; struct RGWObjVersionTracker; @@ -265,6 +266,27 @@ struct RGWMetadataLogData { }; WRITE_CLASS_ENCODER(RGWMetadataLogData) +struct RGWMetadataLogHistory { + epoch_t oldest_realm_epoch; + std::string oldest_period_id; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(oldest_realm_epoch, bl); + ::encode(oldest_period_id, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& p) { + DECODE_START(1, p); + ::decode(oldest_realm_epoch, p); + ::decode(oldest_period_id, p); + DECODE_FINISH(p); + } + + static const std::string oid; +}; +WRITE_CLASS_ENCODER(RGWMetadataLogHistory) + class RGWMetadataManager { map handlers; CephContext *cct; @@ -303,6 +325,16 @@ public: /// period history RGWPeriodHistory::Cursor read_oldest_log_period() const; + /// read the oldest log period asynchronously and write its result to the + /// given cursor pointer + RGWCoroutine* read_oldest_log_period_cr(RGWPeriodHistory::Cursor *period, + RGWObjVersionTracker *objv) const; + + /// try to advance the oldest log period when the given period is trimmed, + /// using a rados lock to provide atomicity + RGWCoroutine* trim_log_period_cr(RGWPeriodHistory::Cursor period, + RGWObjVersionTracker *objv) const; + /// find or create the metadata log for the given period RGWMetadataLog* get_log(const std::string& period); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 10d29a4abf7..562b067beb2 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3229,9 +3229,20 @@ public: return http.set_threaded(); } int process() override { - crs.run(create_data_log_trim_cr(store, &http, - cct->_conf->rgw_data_log_num_shards, - trim_interval)); + list stacks; + auto meta = new RGWCoroutinesStack(store->ctx(), &crs); + meta->call(create_meta_log_trim_cr(store, &http, + cct->_conf->rgw_md_log_max_shards, + trim_interval)); + stacks.push_back(meta); + + auto data = new RGWCoroutinesStack(store->ctx(), &crs); + data->call(create_data_log_trim_cr(store, &http, + cct->_conf->rgw_data_log_num_shards, + trim_interval)); + stacks.push_back(data); + + 
crs.run(stacks); return 0; } }; @@ -4164,7 +4175,8 @@ int RGWRados::init_complete() /* no point of running sync thread if we don't have a master zone configured or there is no rest_master_conn */ - if (get_zonegroup().master_zone.empty() || !rest_master_conn) { + if (get_zonegroup().master_zone.empty() || !rest_master_conn + || current_period.get_id().empty()) { run_sync_thread = false; } @@ -6709,7 +6721,8 @@ int RGWRados::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mt } int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, - off_t ofs, bool exclusive) + off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker) { rgw_rados_ref ref; rgw_pool pool; @@ -6723,6 +6736,9 @@ int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, if (exclusive) op.create(true); + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } if (ofs == -1) { op.write_full(bl); } else { @@ -6732,6 +6748,9 @@ int RGWRados::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, if (r < 0) return r; + if (objv_tracker) { + objv_tracker->apply_write(); + } return 0; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 08a1b9aefe9..cbb6610235e 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2924,7 +2924,8 @@ public: ceph::real_time set_mtime /* 0 for don't set */); virtual int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, - off_t ofs, bool exclusive); + off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker = nullptr); int aio_put_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive, void **handle); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 94ead4aea29..9d8d250132b 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -2313,3 +2313,690 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete() } +// TODO: move into rgw_sync_trim.cc +#undef dout_prefix +#define dout_prefix (*_dout << "meta trim: ") + +/// purge all log shards for the given mdlog +class PurgeLogShardsCR : public RGWShardCollectCR { + RGWRados *const store; + const RGWMetadataLog* mdlog; + const int num_shards; + rgw_raw_obj obj; + int i{0}; + + static constexpr int max_concurrent = 16; + + public: + PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog, + const rgw_pool& pool, int num_shards) + : RGWShardCollectCR(store->ctx(), max_concurrent), + store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "") + {} + + bool spawn_next() override { + if (i == num_shards) { + return false; + } + mdlog->get_shard_oid(i++, obj.oid); + spawn(new RGWRadosRemoveCR(store, obj), false); + return true; + } +}; + +using Cursor = RGWPeriodHistory::Cursor; + +/// purge mdlogs from the oldest up to (but not including) the given realm_epoch +class PurgePeriodLogsCR : public RGWCoroutine { + RGWRados *const store; + RGWMetadataManager *const metadata; + RGWObjVersionTracker objv; + Cursor cursor; + epoch_t realm_epoch; + epoch_t *last_trim_epoch; //< update last trim on success + + public: + PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim) + : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr), + realm_epoch(realm_epoch), last_trim_epoch(last_trim) + {} + + int operate(); +}; + +int PurgePeriodLogsCR::operate() +{ + reenter(this) { + // read our current oldest log period + yield call(metadata->read_oldest_log_period_cr(&cursor, &objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + assert(cursor); + 
ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + + // trim -up to- the given realm_epoch + while (cursor.get_epoch() < realm_epoch) { + ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + yield { + const auto mdlog = metadata->get_log(cursor.get_period().get_id()); + const auto& pool = store->get_zone_params().log_pool; + auto num_shards = cct->_conf->rgw_md_log_max_shards; + call(new PurgeLogShardsCR(store, mdlog, pool, num_shards)); + } + if (retcode < 0) { + ldout(cct, 1) << "failed to remove log shards: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + + // update our mdlog history + yield call(metadata->trim_log_period_cr(cursor, &objv)); + if (retcode == -ENOENT) { + // must have raced to update mdlog history. return success and allow the + // winner to continue purging + ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch() + << " period=" << cursor.get_period().get_id() << dendl; + return set_cr_done(); + } else if (retcode < 0) { + ldout(cct, 1) << "failed to remove log shards for realm_epoch=" + << cursor.get_epoch() << " period=" << cursor.get_period().get_id() + << " with: " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + if (*last_trim_epoch < cursor.get_epoch()) { + *last_trim_epoch = cursor.get_epoch(); + } + + assert(cursor.has_next()); // get_current() should always come after + cursor.next(); + } + return set_cr_done(); + } + return 0; +} + +namespace { + +using connection_map = std::map>; + +/// construct a RGWRESTConn for each zone in the realm +template +connection_map make_peer_connections(RGWRados *store, + const Zonegroups& zonegroups) +{ + connection_map connections; + for (auto& g : zonegroups) { + for (auto& z : g.second.zones) { + std::unique_ptr conn{ + new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)}; + connections.emplace(z.first, std::move(conn)); + } + } + return connections; +} + +/// return the marker that it's safe to trim up to +const std::string& get_stable_marker(const rgw_meta_sync_marker& m) +{ + return m.state == m.FullSync ? 
m.next_step_marker : m.marker;
+}
+
+/// comparison operator for take_min_status()
+bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
+{
+  // sort by stable marker
+  return get_stable_marker(lhs) < get_stable_marker(rhs);
+}
+
+/// populate the status with the minimum stable marker of each shard for any
+/// peer whose realm_epoch matches the minimum realm_epoch in the input
+template <typename Iter>
+int take_min_status(CephContext *cct, Iter first, Iter last,
+                    rgw_meta_sync_status *status)
+{
+  if (first == last) {
+    return -EINVAL;
+  }
+  const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
+
+  status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
+  for (auto p = first; p != last; ++p) {
+    // validate peer's shard count
+    if (p->sync_markers.size() != num_shards) {
+      ldout(cct, 1) << "take_min_status got peer status with "
+          << p->sync_markers.size() << " shards, expected "
+          << num_shards << dendl;
+      return -EINVAL;
+    }
+    if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
+      // earlier epoch, take its entire status
+      *status = std::move(*p);
+    } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
+      // same epoch, take any earlier markers
+      auto m = status->sync_markers.begin();
+      for (auto& shard : p->sync_markers) {
+        if (shard.second < m->second) {
+          m->second = std::move(shard.second);
+        }
+        ++m;
+      }
+    }
+  }
+  return 0;
+}
+
+struct TrimEnv {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  int num_shards;
+  const std::string& zone;
+  Cursor current; //< cursor to current period
+  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
+
+  TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : store(store), http(http), num_shards(num_shards),
+      zone(store->get_zone_params().get_id()),
+      current(store->period_history->get_current())
+  {}
+};
+
+struct MasterTrimEnv : public TrimEnv {
+  connection_map connections; //< peer connections
+  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
+  /// last trim marker for each shard, only applies to current period's mdlog
+  std::vector<std::string> last_trim_markers;
+
+  MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : TrimEnv(store, http, num_shards),
+      last_trim_markers(num_shards)
+  {
+    auto& period = current.get_period();
+    connections = make_peer_connections(store, period.get_map().zonegroups);
+    connections.erase(zone);
+    peer_status.resize(connections.size());
+  }
+};
+
+struct PeerTrimEnv : public TrimEnv {
+  /// last trim timestamp for each shard, only applies to current period's mdlog
+  std::vector<ceph::real_time> last_trim_timestamps;
+
+  PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
+    : TrimEnv(store, http, num_shards),
+      last_trim_timestamps(num_shards)
+  {}
+
+  void set_num_shards(int num_shards) {
+    this->num_shards = num_shards;
+    last_trim_timestamps.resize(num_shards);
+  }
+};
+
+} // anonymous namespace
+
+
+/// spawn a trim cr for each shard that needs it, while limiting the number
+/// of concurrent shards
+class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
+ private:
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  MasterTrimEnv& env;
+  RGWMetadataLog *mdlog;
+  int shard_id{0};
+  std::string oid;
+  const rgw_meta_sync_status& sync_status;
+
+ public:
+  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
+                               const rgw_meta_sync_status& sync_status)
+    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+      env(env), mdlog(mdlog), sync_status(sync_status)
+  {}
+
+  bool spawn_next() override;
+};
+
+bool MetaMasterTrimShardCollectCR::spawn_next()
+{
+  while (shard_id < env.num_shards) {
+    auto m = sync_status.sync_markers.find(shard_id);
+    if (m == sync_status.sync_markers.end()) {
+      shard_id++;
+      continue;
+    }
+    auto& stable = get_stable_marker(m->second);
+    auto& last_trim = env.last_trim_markers[shard_id];
+
+    if (stable <= last_trim) {
+      // already trimmed
+      ldout(cct, 20) << "skipping log shard " << shard_id
+          << " at marker=" << stable
+          << " last_trim=" << last_trim
+          << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
+      shard_id++;
+      continue;
+    }
+
+    mdlog->get_shard_oid(shard_id, oid);
+
+    ldout(cct, 10) << "trimming log shard " << shard_id
+        << " at marker=" << stable
+        << " last_trim=" << last_trim
+        << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
+    spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
+    shard_id++;
+    return true;
+  }
+  return false;
+}
+
+/// spawn rest requests to read each peer's sync status
+class MetaMasterStatusCollectCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  MasterTrimEnv& env;
+  connection_map::iterator c;
+  std::vector<rgw_meta_sync_status>::iterator s;
+ public:
+  MetaMasterStatusCollectCR(MasterTrimEnv& env)
+    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
+      env(env), c(env.connections.begin()), s(env.peer_status.begin())
+  {}
+
+  bool spawn_next() override {
+    if (c == env.connections.end()) {
+      return false;
+    }
+    static rgw_http_param_pair params[] = {
+      { "type", "metadata" },
+      { "status", nullptr },
+      { nullptr, nullptr }
+    };
+
+    ldout(cct, 20) << "query sync status from " << c->first << dendl;
+    auto conn = c->second.get();
+    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
+    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
+          false);
+    ++c;
+    ++s;
+    return true;
+  }
+};
+
+class MetaMasterTrimCR : public RGWCoroutine {
+  MasterTrimEnv& env;
+  rgw_meta_sync_status min_status; //< minimum sync status of all peers
+  int ret{0};
+
+ public:
+  MetaMasterTrimCR(MasterTrimEnv& env)
+    : RGWCoroutine(env.store->ctx()), env(env)
+  {}
+
+  int operate();
+};
+
+int MetaMasterTrimCR::operate()
+{
+  reenter(this) {
+    // TODO: detect this and fail before we spawn the trim thread?
+ if (env.connections.empty()) { + ldout(cct, 4) << "no peers, exiting" << dendl; + return set_cr_done(); + } + + ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl; + // query mdlog sync status from peers + yield call(new MetaMasterStatusCollectCR(env)); + + // must get a successful reply from all peers to consider trimming + if (ret < 0) { + ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; + return set_cr_error(ret); + } + + // determine the minimum epoch and markers + ret = take_min_status(env.store->ctx(), env.peer_status.begin(), + env.peer_status.end(), &min_status); + if (ret < 0) { + ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl; + return set_cr_error(ret); + } + yield { + auto store = env.store; + auto epoch = min_status.sync_info.realm_epoch; + ldout(cct, 4) << "realm epoch min=" << epoch + << " current=" << env.current.get_epoch()<< dendl; + if (epoch > env.last_trim_epoch + 1) { + // delete any prior mdlog periods + spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true); + } else { + ldout(cct, 10) << "mdlogs already purged up to realm_epoch " + << env.last_trim_epoch << dendl; + } + + // if realm_epoch == current, trim mdlog based on markers + if (epoch == env.current.get_epoch()) { + auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id()); + spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true); + } + } + // ignore any errors during purge/trim because we want to hold the lock open + return set_cr_done(); + } + return 0; +} + + +/// read the first entry of the master's mdlog shard and trim to that position +class MetaPeerTrimShardCR : public RGWCoroutine { + RGWMetaSyncEnv& env; + RGWMetadataLog *mdlog; + const std::string& period_id; + const int shard_id; + RGWMetadataLogInfo info; + ceph::real_time stable; //< safe timestamp to trim, according to master + ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim + rgw_mdlog_shard_data result; //< result from master's mdlog listing + + public: + MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog, + const std::string& period_id, int shard_id, + ceph::real_time *last_trim) + : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog), + period_id(period_id), shard_id(shard_id), last_trim(last_trim) + {} + + int operate() override; +}; + +int MetaPeerTrimShardCR::operate() +{ + reenter(this) { + // query master's first mdlog entry for this shard + yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id, + "", 1, &result)); + if (retcode < 0) { + ldout(cct, 5) << "failed to read first entry from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (result.entries.empty()) { + // if there are no mdlog entries, we don't have a timestamp to compare. we + // can't just trim everything, because there could be racing updates since + // this empty reply. 
query the mdlog shard info to read its max timestamp, + // then retry the listing to make sure it's still empty before trimming to + // that + ldout(cct, 10) << "empty master mdlog shard " << shard_id + << ", reading last timestamp from shard info" << dendl; + // read the mdlog shard info for the last timestamp + using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR; + yield call(new ShardInfoCR(&env, period_id, shard_id, &info)); + if (retcode < 0) { + ldout(cct, 5) << "failed to read info from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (ceph::real_clock::is_zero(info.last_update)) { + return set_cr_done(); // nothing to trim + } + ldout(cct, 10) << "got mdlog shard info with last update=" + << info.last_update << dendl; + // re-read the master's first mdlog entry to make sure it hasn't changed + yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id, + "", 1, &result)); + if (retcode < 0) { + ldout(cct, 5) << "failed to read first entry from master's mdlog shard " + << shard_id << " for period " << period_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + // if the mdlog is still empty, trim to max marker + if (result.entries.empty()) { + stable = info.last_update; + } else { + stable = result.entries.front().timestamp; + + // can only trim -up to- master's first timestamp, so subtract a second. + // (this is why we use timestamps instead of markers for the peers) + stable -= std::chrono::seconds(1); + } + } else { + stable = result.entries.front().timestamp; + stable -= std::chrono::seconds(1); + } + + if (stable <= *last_trim) { + ldout(cct, 10) << "skipping log shard " << shard_id + << " at timestamp=" << stable + << " last_trim=" << *last_trim << dendl; + return set_cr_done(); + } + + ldout(cct, 10) << "trimming log shard " << shard_id + << " at timestamp=" << stable + << " last_trim=" << *last_trim << dendl; + yield { + std::string oid; + mdlog->get_shard_oid(shard_id, oid); + call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", "")); + } + if (retcode < 0 && retcode != -ENODATA) { + ldout(cct, 1) << "failed to trim mdlog shard " << shard_id + << ": " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + *last_trim = stable; + return set_cr_done(); + } + return 0; +} + +class MetaPeerTrimShardCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + PeerTrimEnv& env; + RGWMetadataLog *mdlog; + const std::string& period_id; + RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR + int shard_id{0}; + + public: + MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog) + : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS), + env(env), mdlog(mdlog), period_id(env.current.get_period().get_id()) + { + meta_env.init(cct, env.store, env.store->rest_master_conn, + env.store->get_async_rados(), env.http, nullptr); + } + + bool spawn_next() override; +}; + +bool MetaPeerTrimShardCollectCR::spawn_next() +{ + if (shard_id >= env.num_shards) { + return false; + } + auto& last_trim = env.last_trim_timestamps[shard_id]; + spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim), + false); + shard_id++; + return true; +} + +class MetaPeerTrimCR : public RGWCoroutine { + PeerTrimEnv& env; + rgw_mdlog_info mdlog_info; //< master's mdlog info + + public: + MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) 
{} + + int operate(); +}; + +int MetaPeerTrimCR::operate() +{ + reenter(this) { + ldout(cct, 10) << "fetching master mdlog info" << dendl; + yield { + // query mdlog_info from master for oldest_log_period + rgw_http_param_pair params[] = { + { "type", "metadata" }, + { nullptr, nullptr } + }; + + using LogInfoCR = RGWReadRESTResourceCR; + call(new LogInfoCR(cct, env.store->rest_master_conn, env.http, + "/admin/log/", params, &mdlog_info)); + } + if (retcode < 0) { + ldout(cct, 4) << "failed to read mdlog info from master" << dendl; + return set_cr_error(retcode); + } + // use master's shard count instead + env.set_num_shards(mdlog_info.num_shards); + + if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) { + // delete any prior mdlog periods + yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch, + &env.last_trim_epoch)); + } else { + ldout(cct, 10) << "mdlogs already purged through realm_epoch " + << env.last_trim_epoch << dendl; + } + + // if realm_epoch == current, trim mdlog based on master's markers + if (mdlog_info.realm_epoch == env.current.get_epoch()) { + yield { + auto meta_mgr = env.store->meta_mgr; + auto mdlog = meta_mgr->get_log(env.current.get_period().get_id()); + call(new MetaPeerTrimShardCollectCR(env, mdlog)); + // ignore any errors during purge/trim because we want to hold the lock open + } + } + return set_cr_done(); + } + return 0; +} + +class MetaTrimPollCR : public RGWCoroutine { + RGWRados *const store; + const utime_t interval; //< polling interval + const rgw_raw_obj obj; + const std::string name{"meta_trim"}; //< lock name + const std::string cookie; + + protected: + /// allocate the coroutine to run within the lease + virtual RGWCoroutine* alloc_cr() = 0; + + public: + MetaTrimPollCR(RGWRados *store, utime_t interval) + : RGWCoroutine(store->ctx()), store(store), interval(interval), + obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid), + cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) + {} + + int operate(); +}; + +int MetaTrimPollCR::operate() +{ + reenter(this) { + for (;;) { + set_status("sleeping"); + wait(interval); + + // prevent others from trimming for our entire wait interval + set_status("acquiring trim lock"); + yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, + obj, name, cookie, interval.sec())); + if (retcode < 0) { + ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; + continue; + } + + set_status("trimming"); + yield call(alloc_cr()); + + if (retcode < 0) { + // on errors, unlock so other gateways can try + set_status("unlocking"); + yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store, + obj, name, cookie)); + } + } + } + return 0; +} + +class MetaMasterTrimPollCR : public MetaTrimPollCR { + MasterTrimEnv env; //< trim state to share between calls + RGWCoroutine* alloc_cr() override { + return new MetaMasterTrimCR(env); + } + public: + MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : MetaTrimPollCR(store, interval), + env(store, http, num_shards) + {} +}; + +class MetaPeerTrimPollCR : public MetaTrimPollCR { + PeerTrimEnv env; //< trim state to share between calls + RGWCoroutine* alloc_cr() override { + return new MetaPeerTrimCR(env); + } + public: + MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : MetaTrimPollCR(store, interval), + env(store, http, num_shards) + {} +}; + +RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http, + int 
num_shards, utime_t interval) +{ + if (store->is_meta_master()) { + return new MetaMasterTrimPollCR(store, http, num_shards, interval); + } + return new MetaPeerTrimPollCR(store, http, num_shards, interval); +} + + +struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR { + MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards) + : MasterTrimEnv(store, http, num_shards), + MetaMasterTrimCR(*static_cast(this)) + {} +}; + +struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR { + MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards) + : PeerTrimEnv(store, http, num_shards), + MetaPeerTrimCR(*static_cast(this)) + {} +}; + +RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards) +{ + if (store->is_meta_master()) { + return new MetaMasterAdminTrimCR(store, http, num_shards); + } + return new MetaPeerAdminTrimCR(store, http, num_shards); +} diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 40f15e10f64..c651f7a9ad1 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -452,5 +452,13 @@ public: int operate() override; }; +// MetaLogTrimCR factory function +RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval); + +// factory function for mdlog trim via radosgw-admin +RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store, + RGWHTTPManager *http, + int num_shards); #endif diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index beacdc88cad..4b19a5ca256 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -53,6 +53,17 @@ def get_zone_connection(zone, credentials): credentials = credentials[0] return get_gateway_connection(zone.gateways[0], credentials) +def mdlog_list(zone, period = None): + cmd = ['mdlog', 'list'] + if period: + cmd += ['--period', period] + (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True) + mdlog_json = mdlog_json.decode('utf-8') + return json.loads(mdlog_json) + +def mdlog_autotrim(zone): + zone.cluster.admin(['mdlog', 'autotrim']) + def meta_sync_status(zone): while True: cmd = ['metadata', 'sync', 'status'] + zone.zone_args() @@ -654,6 +665,9 @@ def test_multi_period_incremental_sync(): if len(zonegroup.zones) < 3: raise SkipTest("test_multi_period_incremental_sync skipped. 
Requires 3 or more zones in master zonegroup.")
 
+    # periods to include in mdlog comparison
+    mdlog_periods = [realm.current_period.id]
+
     # create a bucket in each zone
     buckets = []
     for zone in zonegroup.zones:
@@ -673,6 +687,7 @@ def test_multi_period_incremental_sync():
 
     # change master to zone 2 -> period 2
     set_master_zone(z2)
+    mdlog_periods += [realm.current_period.id]
 
     # create another bucket in each zone, except for z3
     for zone in zonegroup.zones:
@@ -689,6 +704,7 @@ def test_multi_period_incremental_sync():
 
     # change master back to zone 1 -> period 3
     set_master_zone(z1)
+    mdlog_periods += [realm.current_period.id]
 
     # create another bucket in each zone, except for z3
     for zone in zonegroup.zones:
@@ -709,6 +725,31 @@ def test_multi_period_incremental_sync():
 
     for source_zone, target_zone in combinations(zonegroup.zones, 2):
         check_bucket_eq(source_zone, target_zone, bucket_name)
 
+    # verify that mdlogs are not empty and match for each period
+    for period in mdlog_periods:
+        master_mdlog = mdlog_list(z1, period)
+        assert len(master_mdlog) > 0
+        for zone in zonegroup.zones:
+            if zone == z1:
+                continue
+            mdlog = mdlog_list(zone, period)
+            assert len(mdlog) == len(master_mdlog)
+
+    # autotrim mdlogs for master zone
+    mdlog_autotrim(z1)
+
+    # autotrim mdlogs for peers
+    for zone in zonegroup.zones:
+        if zone == z1:
+            continue
+        mdlog_autotrim(zone)
+
+    # verify that mdlogs are empty for each period
+    for period in mdlog_periods:
+        for zone in zonegroup.zones:
+            mdlog = mdlog_list(zone, period)
+            assert len(mdlog) == 0
+
 def test_zonegroup_remove():
     zonegroup = realm.master_zonegroup()
     if len(zonegroup.zones) < 2:
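
For reference, a minimal usage sketch of the test helpers introduced above (mdlog_list, mdlog_autotrim). It assumes the multisite fixture from tests.py (a `realm` with a current period and zone objects exposing the admin CLI); the `show_autotrim` function itself is hypothetical and not part of the patch:

# Minimal sketch, assuming the tests.py fixture: run 'mdlog autotrim' on one zone
# and report how many entries remain for the current period.
def show_autotrim(zone, realm):
    period = realm.current_period.id
    before = len(mdlog_list(zone, period))
    mdlog_autotrim(zone)  # invokes 'radosgw-admin mdlog autotrim' for this zone
    after = len(mdlog_list(zone, period))
    # entries are only removed up to the point that every peer has synced past,
    # so 'after' may still be non-zero if other zones are behind
    print('mdlog entries for period %s: before=%d after=%d' % (period, before, after))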