rgw_admin: show more data sync info

in the radosgw-admin sync status command

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Yehuda Sadeh 2016-03-09 15:41:13 -08:00
parent 03f8e624d9
commit eea64de61c
7 changed files with 219 additions and 41 deletions

View File

@@ -1582,9 +1582,12 @@ void flush_ss(stringstream& ss, list<string>& l)
   ss.str("");
 }
 
-stringstream& push_ss(stringstream& ss, list<string>& l)
+stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
 {
   flush_ss(ss, l);
+  if (tab > 0) {
+    ss << setw(tab) << "" << setw(1);
+  }
   return ss;
 }
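A note on the setw idiom above: streaming an empty string with a nonzero field width emits that many fill characters (spaces by default), which is how push_ss() indents continuation lines; the trailing setw(1) is effectively a no-op, since a width only applies to the next insertion anyway. A standalone sketch (plain C++, separate from the RGW sources) confirming the behavior:

#include <iomanip>
#include <iostream>
#include <sstream>

int main() {
  std::stringstream ss;
  // setw(8) + "" pads the empty string to 8 columns: an 8-space indent.
  ss << std::setw(8) << "" << "full sync: 0/128 shards";
  std::cout << '[' << ss.str() << "]\n";  // [        full sync: 0/128 shards]
}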
@@ -1724,6 +1727,116 @@ static void get_md_sync_status(list<string>& status)
 
   flush_ss(ss, status);
 }
 
+static void get_data_sync_status(const string& source_zone, list<string>& status, int tab)
+{
+  RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
+
+  stringstream ss;
+
+  int ret = sync.init();
+  if (ret < 0) {
+    push_ss(ss, status, tab) << string("failed to retrieve sync info: ") + cpp_strerror(-ret);
+    flush_ss(ss, status);
+    return;
+  }
+
+  ret = sync.read_sync_status();
+  if (ret < 0) {
+    status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  const rgw_data_sync_status& sync_status = sync.get_sync_status();
+
+  string status_str;
+  switch (sync_status.sync_info.state) {
+    case rgw_data_sync_info::StateInit:
+      status_str = "init";
+      break;
+    case rgw_data_sync_info::StateBuildingFullSyncMaps:
+      status_str = "preparing for full sync";
+      break;
+    case rgw_data_sync_info::StateSync:
+      status_str = "syncing";
+      break;
+    default:
+      status_str = "unknown";
+  }
+
+  push_ss(ss, status, tab) << status_str;
+
+  uint64_t full_total = 0;
+  uint64_t full_complete = 0;
+
+  int num_full = 0;
+  int num_inc = 0;
+  int total_shards = 0;
+
+  for (auto marker_iter : sync_status.sync_markers) {
+    full_total += marker_iter.second.total_entries;
+    total_shards++;
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::FullSync) {
+      num_full++;
+      full_complete += marker_iter.second.pos;
+    } else {
+      full_complete += marker_iter.second.total_entries;
+    }
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::IncrementalSync) {
+      num_inc++;
+    }
+  }
+
+  push_ss(ss, status, tab) << "full sync: " << num_full << "/" << total_shards << " shards";
+
+  if (num_full > 0) {
+    push_ss(ss, status, tab) << "full sync: " << full_total - full_complete << " buckets to sync";
+  }
+
+  push_ss(ss, status, tab) << "incremental sync: " << num_inc << "/" << total_shards << " shards";
+
+  rgw_datalog_info log_info;
+  ret = sync.read_log_info(&log_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  map<int, RGWDataChangesLogInfo> source_shards_info;
+  ret = sync.read_source_log_shards_info(&source_shards_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  map<int, string> shards_behind;
+
+  for (auto local_iter : sync_status.sync_markers) {
+    int shard_id = local_iter.first;
+    auto iter = source_shards_info.find(shard_id);
+    if (iter == source_shards_info.end()) {
+      /* huh? */
+      derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
+      continue;
+    }
+    auto master_marker = iter->second.marker;
+    if (master_marker > local_iter.second.marker) {
+      shards_behind[shard_id] = local_iter.second.marker;
+    }
+  }
+
+  int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
+  if (total_behind == 0) {
+    status.push_back("data is caught up with master");
+  } else {
+    push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+  }
+
+  flush_ss(ss, status);
+}
+
 static void tab_dump(const string& header, int width, const list<string>& entries)
 {
   string s = header;
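The full/incremental bookkeeping in the loop above is easy to misread, so here is a worked sketch with stand-in types (State and Marker below only mimic the rgw_data_sync_marker fields used here; they are not the real RGW types):

#include <cstdint>
#include <cstdio>
#include <map>

enum class State { FullSync, IncrementalSync };
struct Marker { State state; uint64_t total_entries; uint64_t pos; };

int main() {
  // 4 shards: shard 0 is mid full-sync (20 of 50 entries done), the rest incremental.
  std::map<int, Marker> markers = {
    {0, {State::FullSync, 50, 20}},
    {1, {State::IncrementalSync, 10, 10}},
    {2, {State::IncrementalSync, 0, 0}},
    {3, {State::IncrementalSync, 5, 5}},
  };
  uint64_t full_total = 0, full_complete = 0;
  int num_full = 0, num_inc = 0;
  for (const auto& kv : markers) {
    const Marker& m = kv.second;
    full_total += m.total_entries;
    if (m.state == State::FullSync) {
      num_full++;
      full_complete += m.pos;            // partial progress inside this shard
    } else {
      full_complete += m.total_entries;  // shards past full sync count as fully done
    }
    if (m.state == State::IncrementalSync) {
      num_inc++;
    }
  }
  // full sync: 1/4 shards; 65 - 35 = 30 buckets still to sync; incremental sync: 3/4 shards
  std::printf("full=%d/%zu behind=%llu inc=%d/%zu\n", num_full, markers.size(),
              (unsigned long long)(full_total - full_complete), num_inc, markers.size());
}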
@@ -1754,6 +1867,23 @@ static void sync_status(Formatter *formatter)
   }
 
   tab_dump("metadata sync", width, md_status);
 
+  list<string> data_status;
+
+  for (auto iter : store->zone_conn_map) {
+    const string& source_id = iter.first;
+    string zone_name;
+    string source_str = "source: ";
+    string s = source_str + source_id;
+    auto siter = store->zone_name_by_id.find(source_id);
+    if (siter != store->zone_name_by_id.end()) {
+      s += string(" (") + siter->second + ")";
+    }
+    data_status.push_back(s);
+
+    get_data_sync_status(source_id, data_status, source_str.size());
+  }
+
+  tab_dump("data sync", width, data_status);
 }
 
 int main(int argc, char **argv)
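With these pieces in place, the data sync section of the radosgw-admin sync status output looks roughly like the following (illustrative only: the zone id, zone name, and shard counts are invented for the example):

      data sync source: 1b48a1f4-92ae-4a2a-a78c-2ce4d4ac2d61 (us-east)
                        syncing
                        full sync: 0/128 shards
                        incremental sync: 128/128 shards
                        data is caught up with master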

View File

@@ -234,6 +234,33 @@ public:
   }
 };
 
+class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
+  RGWDataSyncEnv *sync_env;
+
+  int num_shards;
+  map<int, RGWDataChangesLogInfo> *datalog_info;
+
+  int shard_id;
+#define READ_DATALOG_MAX_CONCURRENT 10
+
+public:
+  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
+                             int _num_shards,
+                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
+                                                                               sync_env(_sync_env), num_shards(_num_shards),
+                                                                               datalog_info(_datalog_info), shard_id(0) {}
+  bool spawn_next();
+};
+
+bool RGWReadRemoteDataLogInfoCR::spawn_next() {
+  if (shard_id >= num_shards) {
+    return false;
+  }
+  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
+  shard_id++;
+  return true;
+}
+
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
@@ -353,6 +380,17 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
 
   return 0;
 }
 
+int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
+{
+  rgw_datalog_info log_info;
+  int ret = read_log_info(&log_info);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
+}
+
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
 {
   if (initialized) {
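A detail worth calling out in spawn_next() above: &(*datalog_info)[shard_id] default-constructs the map entry before the child coroutine runs and hands the child a pointer into the map. That is safe because std::map insertions never invalidate references to existing elements, so each child writes into a stable slot even while later shards are being inserted. A minimal standalone demonstration of that guarantee:

#include <cassert>
#include <map>
#include <string>

int main() {
  std::map<int, std::string> info;
  std::string* slot0 = &info[0];   // default-construct entry 0, keep a pointer to it
  for (int i = 1; i < 1000; ++i) {
    info[i];                       // many later insertions (the other shards)
  }
  *slot0 = "shard 0 result";       // still valid: std::map never relocates elements
  assert(info.at(0) == "shard 0 result");
}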

View File

@@ -3,6 +3,7 @@
 
 #include "rgw_coroutine.h"
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
+#include "common/RWLock.h"
 
@@ -191,6 +192,7 @@ public:
   void finish();
 
   int read_log_info(rgw_datalog_info *log_info);
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
@@ -236,6 +238,13 @@ public:
   int read_sync_status() { return source_log.read_sync_status(&sync_status); }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
+  int read_log_info(rgw_datalog_info *log_info) {
+    return source_log.read_log_info(log_info);
+  }
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
+    return source_log.read_source_log_shards_info(shards_info);
+  }
+
   int run() { return source_log.run_sync(num_shards, sync_status); }
 
   void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }

View File

@@ -3641,6 +3641,7 @@ int RGWRados::init_complete()
     const string& id = ziter->first;
     RGWZone& z = ziter->second;
     zone_id_by_name[z.name] = id;
+    zone_name_by_id[id] = z.name;
     if (id != zone_id()) {
       if (!z.endpoints.empty()) {
         ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;

View File

@@ -1868,6 +1868,7 @@ public:
   map<string, RGWRESTConn *> zonegroup_conn_map;
 
   map<string, string> zone_id_by_name;
+  map<string, string> zone_name_by_id;
 
   RGWRESTConn *get_zone_conn_by_id(const string& id) {
     auto citer = zone_conn_map.find(id);

View File

@@ -137,40 +137,12 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-class RGWShardCollectCR : public RGWCoroutine {
-  CephContext *cct;
-
-  int cur_shard;
-  int current_running;
-  int max_concurrent;
-  int status;
-
-public:
-  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
-                                                              current_running(0),
-                                                              max_concurrent(_max_concurrent),
-                                                              status(0) {}
-
-  virtual bool spawn_next() = 0;
-
-  int operate() {
-    reenter(this) {
-      while (spawn_next()) {
-        current_running++;
-        while (current_running >= max_concurrent) {
-          int child_ret;
-          yield wait_for_child();
-          if (collect_next(&child_ret)) {
-            current_running--;
-            if (child_ret < 0 && child_ret != -ENOENT) {
-              ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
-              status = child_ret;
-            }
-          }
-        }
-      }
-      while (current_running > 0) {
+int RGWShardCollectCR::operate() {
+  reenter(this) {
+    while (spawn_next()) {
+      current_running++;
+      while (current_running >= max_concurrent) {
         int child_ret;
         yield wait_for_child();
         if (collect_next(&child_ret)) {
           current_running--;
@@ -181,15 +153,25 @@ public:
           if (child_ret < 0 && child_ret != -ENOENT) {
             ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
-      if (status < 0) {
-        return set_cr_error(status);
-      }
-      return set_cr_done();
-    }
-    return 0;
-  }
-};
+    }
+    while (current_running > 0) {
+      int child_ret;
+      yield wait_for_child();
+      if (collect_next(&child_ret)) {
+        current_running--;
+        if (child_ret < 0 && child_ret != -ENOENT) {
+          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+          status = child_ret;
+        }
+      }
+    }
+    if (status < 0) {
+      return set_cr_error(status);
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
 
 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
   RGWMetaSyncEnv *sync_env;
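The operate() loop above is a simple bounded fan-out: keep calling spawn_next() until max_concurrent children are in flight, reap a child before spawning the next, then drain whatever is still running and report the first error seen. As a standalone analogy (plain C++20 threads, not the RGW coroutine framework; the limit of 10 mirrors READ_DATALOG_MAX_CONCURRENT):

#include <cstdio>
#include <semaphore>
#include <thread>
#include <vector>

int main() {
  constexpr int max_concurrent = 10;              // cf. READ_DATALOG_MAX_CONCURRENT
  std::counting_semaphore<max_concurrent> slots(max_concurrent);
  std::vector<std::thread> children;
  for (int shard = 0; shard < 64; ++shard) {      // the spawn_next() loop
    slots.acquire();                              // block while max_concurrent are in flight
    children.emplace_back([shard, &slots] {
      std::printf("fetch shard %d\n", shard);     // stands in for one shard-info fetch
      slots.release();                            // like collect_next() reaping a child
    });
  }
  for (auto& t : children) {                      // the final drain loop
    t.join();
  }
}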

View File

@@ -418,6 +418,23 @@ public:
 
   int operate();
 };
 
+class RGWShardCollectCR : public RGWCoroutine {
+  CephContext *cct;
+
+  int cur_shard;
+  int current_running;
+  int max_concurrent;
+  int status;
+
+public:
+  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
+                                                              current_running(0),
+                                                              max_concurrent(_max_concurrent),
+                                                              status(0) {}
+
+  virtual bool spawn_next() = 0;
+
+  int operate();
+};
+
 #endif