mirror of
https://github.com/ceph/ceph
synced 2025-02-19 00:47:49 +00:00
Merge pull request #10558 from cbodley/wip-rgw-mdlog-history
rgw: store oldest mdlog period in rados Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
commit
69b11838fe
@ -3,6 +3,7 @@
|
||||
|
||||
#include <boost/intrusive_ptr.hpp>
|
||||
#include "common/ceph_json.h"
|
||||
#include "common/errno.h"
|
||||
#include "rgw_metadata.h"
|
||||
#include "rgw_coroutine.h"
|
||||
#include "cls/version/cls_version_types.h"
|
||||
@ -345,6 +346,29 @@ public:
|
||||
|
||||
static RGWMetadataTopHandler md_top_handler;
|
||||
|
||||
|
||||
static const std::string mdlog_history_oid = "meta.history";
|
||||
|
||||
struct RGWMetadataLogHistory {
|
||||
epoch_t oldest_realm_epoch;
|
||||
std::string oldest_period_id;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(1, 1, bl);
|
||||
::encode(oldest_realm_epoch, bl);
|
||||
::encode(oldest_period_id, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
void decode(bufferlist::iterator& p) {
|
||||
DECODE_START(1, p);
|
||||
::decode(oldest_realm_epoch, p);
|
||||
::decode(oldest_period_id, p);
|
||||
DECODE_FINISH(p);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(RGWMetadataLogHistory)
|
||||
|
||||
|
||||
RGWMetadataManager::RGWMetadataManager(CephContext *_cct, RGWRados *_store)
|
||||
: cct(_cct), store(_store)
|
||||
{
|
||||
@ -363,93 +387,55 @@ RGWMetadataManager::~RGWMetadataManager()
|
||||
|
||||
namespace {
|
||||
|
||||
class FindAnyShardCR : public RGWCoroutine {
|
||||
RGWRados *const store;
|
||||
const RGWMetadataLog& mdlog;
|
||||
const int num_shards;
|
||||
int ret = 0;
|
||||
public:
|
||||
FindAnyShardCR(RGWRados *store, const RGWMetadataLog& mdlog, int num_shards)
|
||||
: RGWCoroutine(store->ctx()), store(store), mdlog(mdlog),
|
||||
num_shards(num_shards) {}
|
||||
|
||||
int operate() {
|
||||
reenter(this) {
|
||||
// send stat requests for each shard in parallel
|
||||
yield {
|
||||
auto async_rados = store->get_async_rados();
|
||||
auto& pool = store->get_zone_params().log_pool;
|
||||
auto oid = std::string{};
|
||||
|
||||
for (int i = 0; i < num_shards; i++) {
|
||||
mdlog.get_shard_oid(i, oid);
|
||||
auto obj = rgw_obj{pool, oid};
|
||||
spawn(new RGWStatObjCR(async_rados, store, obj), true);
|
||||
}
|
||||
}
|
||||
drain_all();
|
||||
// if any shards were found, return success
|
||||
while (collect_next(&ret)) {
|
||||
if (ret == 0) {
|
||||
// TODO: cancel instead of waiting for the rest
|
||||
return set_cr_done();
|
||||
}
|
||||
ret = 0; // collect_next() won't modify &ret unless it's a failure
|
||||
}
|
||||
// no shards found
|
||||
set_retcode(-ENOENT);
|
||||
return set_cr_error(-ENOENT);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// return true if any log shards exist for the given period
|
||||
int find_shards_for_period(RGWRados *store, const std::string& period_id)
|
||||
int read_history(RGWRados *store, RGWMetadataLogHistory *state)
|
||||
{
|
||||
auto cct = store->ctx();
|
||||
RGWMetadataLog mdlog(cct, store, period_id);
|
||||
auto num_shards = cct->_conf->rgw_md_log_max_shards;
|
||||
|
||||
using FindAnyShardCRRef = boost::intrusive_ptr<FindAnyShardCR>;
|
||||
auto cr = FindAnyShardCRRef{new FindAnyShardCR(store, mdlog, num_shards)};
|
||||
|
||||
RGWCoroutinesManager mgr(cct, nullptr);
|
||||
int r = mgr.run(cr.get());
|
||||
if (r < 0) {
|
||||
return r;
|
||||
RGWObjectCtx ctx{store};
|
||||
auto& pool = store->get_zone_params().log_pool;
|
||||
const auto& oid = mdlog_history_oid;
|
||||
bufferlist bl;
|
||||
int ret = rgw_get_system_obj(store, ctx, pool, oid, bl, nullptr, nullptr);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
return cr->get_ret_status();
|
||||
try {
|
||||
auto p = bl.begin();
|
||||
state->decode(p);
|
||||
} catch (buffer::error& e) {
|
||||
ldout(store->ctx(), 1) << "failed to decode the mdlog history: "
|
||||
<< e.what() << dendl;
|
||||
return -EIO;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados *store)
|
||||
int write_history(RGWRados *store, const RGWMetadataLogHistory& state,
|
||||
bool exclusive = false)
|
||||
{
|
||||
// search backwards through the period history for the first period with no
|
||||
// log shard objects, and return its successor (some shards may be missing
|
||||
// if they contain no metadata yet, so we need to check all shards)
|
||||
bufferlist bl;
|
||||
state.encode(bl);
|
||||
|
||||
auto& pool = store->get_zone_params().log_pool;
|
||||
const auto& oid = mdlog_history_oid;
|
||||
return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
|
||||
exclusive, nullptr, real_time{});
|
||||
}
|
||||
|
||||
using Cursor = RGWPeriodHistory::Cursor;
|
||||
|
||||
// traverse all the way back to the beginning of the period history, and
|
||||
// return a cursor to the first period in a fully attached history
|
||||
Cursor find_oldest_period(RGWRados *store)
|
||||
{
|
||||
auto cct = store->ctx();
|
||||
auto cursor = store->period_history->get_current();
|
||||
auto oldest_log = cursor;
|
||||
|
||||
while (cursor) {
|
||||
// search for an existing log shard object for this period
|
||||
int r = find_shards_for_period(store, cursor.get_period().get_id());
|
||||
if (r == -ENOENT) {
|
||||
ldout(store->ctx(), 10) << "find_oldest_log_period found no log shards "
|
||||
"for period " << cursor.get_period().get_id() << "; returning "
|
||||
"period " << oldest_log.get_period().get_id() << dendl;
|
||||
return oldest_log;
|
||||
}
|
||||
if (r < 0) {
|
||||
return RGWPeriodHistory::Cursor{r};
|
||||
}
|
||||
oldest_log = cursor;
|
||||
|
||||
// advance to the period's predecessor
|
||||
if (!cursor.has_prev()) {
|
||||
auto& predecessor = cursor.get_period().get_predecessor();
|
||||
if (predecessor.empty()) {
|
||||
// this is the first period, so our logs must start here
|
||||
ldout(store->ctx(), 10) << "find_oldest_log_period returning first "
|
||||
ldout(cct, 10) << "find_oldest_period returning first "
|
||||
"period " << cursor.get_period().get_id() << dendl;
|
||||
return cursor;
|
||||
}
|
||||
@ -457,30 +443,99 @@ RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados *store)
|
||||
RGWPeriod period;
|
||||
int r = store->period_puller->pull(predecessor, period);
|
||||
if (r < 0) {
|
||||
return RGWPeriodHistory::Cursor{r};
|
||||
return Cursor{r};
|
||||
}
|
||||
auto prev = store->period_history->insert(std::move(period));
|
||||
if (!prev) {
|
||||
return prev;
|
||||
}
|
||||
ldout(store->ctx(), 10) << "find_oldest_log_period advancing to "
|
||||
ldout(cct, 20) << "find_oldest_period advancing to "
|
||||
"predecessor period " << predecessor << dendl;
|
||||
assert(cursor.has_prev());
|
||||
}
|
||||
cursor.prev();
|
||||
}
|
||||
ldout(store->ctx(), 10) << "find_oldest_log_period returning empty cursor" << dendl;
|
||||
ldout(cct, 10) << "find_oldest_period returning empty cursor" << dendl;
|
||||
return cursor;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
Cursor RGWMetadataManager::init_oldest_log_period()
|
||||
{
|
||||
// read the mdlog history
|
||||
RGWMetadataLogHistory state;
|
||||
int ret = read_history(store, &state);
|
||||
|
||||
if (ret == -ENOENT) {
|
||||
// initialize the mdlog history and write it
|
||||
ldout(cct, 10) << "initializing mdlog history" << dendl;
|
||||
auto cursor = find_oldest_period(store);
|
||||
if (!cursor) {
|
||||
return cursor;
|
||||
}
|
||||
|
||||
// write the initial history
|
||||
state.oldest_realm_epoch = cursor.get_epoch();
|
||||
state.oldest_period_id = cursor.get_period().get_id();
|
||||
|
||||
constexpr bool exclusive = true; // don't overwrite
|
||||
int ret = write_history(store, state, exclusive);
|
||||
if (ret < 0 && ret != -EEXIST) {
|
||||
ldout(cct, 1) << "failed to write mdlog history: "
|
||||
<< cpp_strerror(ret) << dendl;
|
||||
return Cursor{ret};
|
||||
}
|
||||
return cursor;
|
||||
} else if (ret < 0) {
|
||||
ldout(cct, 1) << "failed to read mdlog history: "
|
||||
<< cpp_strerror(ret) << dendl;
|
||||
return Cursor{ret};
|
||||
}
|
||||
|
||||
// if it's already in the history, return it
|
||||
auto cursor = store->period_history->lookup(state.oldest_realm_epoch);
|
||||
if (cursor) {
|
||||
return cursor;
|
||||
}
|
||||
// pull the oldest period by id
|
||||
RGWPeriod period;
|
||||
ret = store->period_puller->pull(state.oldest_period_id, period);
|
||||
if (ret < 0) {
|
||||
ldout(cct, 1) << "failed to read period id=" << state.oldest_period_id
|
||||
<< " for mdlog history: " << cpp_strerror(ret) << dendl;
|
||||
return Cursor{ret};
|
||||
}
|
||||
// verify its realm_epoch
|
||||
if (period.get_realm_epoch() != state.oldest_realm_epoch) {
|
||||
ldout(cct, 1) << "inconsistent mdlog history: read period id="
|
||||
<< period.get_id() << " with realm_epoch=" << period.get_realm_epoch()
|
||||
<< ", expected realm_epoch=" << state.oldest_realm_epoch << dendl;
|
||||
return Cursor{-EINVAL};
|
||||
}
|
||||
// attach the period to our history
|
||||
return store->period_history->attach(std::move(period));
|
||||
}
|
||||
|
||||
Cursor RGWMetadataManager::read_oldest_log_period() const
|
||||
{
|
||||
RGWMetadataLogHistory state;
|
||||
int ret = read_history(store, &state);
|
||||
if (ret < 0) {
|
||||
ldout(store->ctx(), 1) << "failed to read mdlog history: "
|
||||
<< cpp_strerror(ret) << dendl;
|
||||
return Cursor{ret};
|
||||
}
|
||||
|
||||
ldout(store->ctx(), 10) << "read mdlog history with oldest period id="
|
||||
<< state.oldest_period_id << " realm_epoch="
|
||||
<< state.oldest_realm_epoch << dendl;
|
||||
|
||||
return store->period_history->lookup(state.oldest_realm_epoch);
|
||||
}
|
||||
|
||||
int RGWMetadataManager::init(const std::string& current_period)
|
||||
{
|
||||
if (store->is_meta_master()) {
|
||||
// find our oldest log so we can tell other zones where to start their sync
|
||||
oldest_log_period = find_oldest_log_period(store);
|
||||
}
|
||||
// open a log for the current period
|
||||
current_log = get_log(current_period);
|
||||
return 0;
|
||||
|
@ -232,8 +232,6 @@ class RGWMetadataManager {
|
||||
std::map<std::string, RGWMetadataLog> md_logs;
|
||||
// use the current period's log for mutating operations
|
||||
RGWMetadataLog* current_log = nullptr;
|
||||
// oldest log's position in the period history
|
||||
RGWPeriodHistory::Cursor oldest_log_period;
|
||||
|
||||
void parse_metadata_key(const string& metadata_key, string& type, string& entry);
|
||||
|
||||
@ -255,9 +253,13 @@ public:
|
||||
|
||||
int init(const std::string& current_period);
|
||||
|
||||
RGWPeriodHistory::Cursor get_oldest_log_period() const {
|
||||
return oldest_log_period;
|
||||
}
|
||||
/// initialize the oldest log period if it doesn't exist, and attach it to
|
||||
/// our current history
|
||||
RGWPeriodHistory::Cursor init_oldest_log_period();
|
||||
|
||||
/// read the oldest log period, and return a cursor to it in our existing
|
||||
/// period history
|
||||
RGWPeriodHistory::Cursor read_oldest_log_period() const;
|
||||
|
||||
/// find or create the metadata log for the given period
|
||||
RGWMetadataLog* get_log(const std::string& period);
|
||||
|
@ -3843,7 +3843,14 @@ int RGWRados::init_complete()
|
||||
obj_expirer->start_processor();
|
||||
}
|
||||
|
||||
/* not point of running sync thread if we don't have a master zone configured
|
||||
if (run_sync_thread) {
|
||||
// initialize the log period history. we want to do this any time we're not
|
||||
// running under radosgw-admin, so we check run_sync_thread here before
|
||||
// disabling it based on the zone/zonegroup setup
|
||||
meta_mgr->init_oldest_log_period();
|
||||
}
|
||||
|
||||
/* no point of running sync thread if we don't have a master zone configured
|
||||
or there is no rest_master_conn */
|
||||
if (get_zonegroup().master_zone.empty() || !rest_master_conn) {
|
||||
run_sync_thread = false;
|
||||
|
@ -133,7 +133,7 @@ void RGWOp_MDLog_List::send_response() {
|
||||
|
||||
void RGWOp_MDLog_Info::execute() {
|
||||
num_objects = s->cct->_conf->rgw_md_log_max_shards;
|
||||
period = store->meta_mgr->get_oldest_log_period();
|
||||
period = store->meta_mgr->read_oldest_log_period();
|
||||
http_ret = period.get_error();
|
||||
}
|
||||
|
||||
|
@ -1888,8 +1888,8 @@ int RGWRemoteMetaLog::run_sync()
|
||||
return 0;
|
||||
}
|
||||
r = read_log_info(&mdlog_info);
|
||||
if (r == -EIO) {
|
||||
// keep retrying if master isn't alive
|
||||
if (r == -EIO || r == -ENOENT) {
|
||||
// keep retrying if master isn't alive or hasn't initialized the log
|
||||
ldout(store->ctx(), 10) << __func__ << "(): waiting for master.." << dendl;
|
||||
backoff.backoff_sleep();
|
||||
continue;
|
||||
@ -1920,6 +1920,9 @@ int RGWRemoteMetaLog::run_sync()
|
||||
if (sync_status.sync_info.period.empty() ||
|
||||
sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
|
||||
sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
|
||||
ldout(store->ctx(), 1) << "epoch=" << sync_status.sync_info.realm_epoch
|
||||
<< " in sync status comes before remote's oldest mdlog epoch="
|
||||
<< mdlog_info.realm_epoch << ", restarting sync" << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@
|
||||
|
||||
static map<string, string> ext_mime_map;
|
||||
|
||||
int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive,
|
||||
int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, const string& oid, const char *data, size_t size, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
|
||||
{
|
||||
map<string,bufferlist> no_attrs;
|
||||
|
@ -16,7 +16,7 @@ struct RGWObjVersionTracker;
|
||||
|
||||
struct obj_version;
|
||||
|
||||
int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive,
|
||||
int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, const string& oid, const char *data, size_t size, bool exclusive,
|
||||
RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs = NULL);
|
||||
int rgw_get_system_obj(RGWRados *rgwstore, RGWObjectCtx& obj_ctx, rgw_bucket& bucket, const string& key, bufferlist& bl,
|
||||
RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs = NULL,
|
||||
|
Loading…
Reference in New Issue
Block a user