rgw: paginate ListTopics

rename read_topics()/write_topics() to 'v1' and only call them from
internal v1 call paths

public get_topics() now calls read_topics_v1() for the v1 case, and does
the paginated listing with driver->meta_list_keys_next() for v2

RGWPSListTopicsOp now uses the NextToken request/response params with
the paginated get_topics(), limiting responses to 100 entries like AWS

'radosgw-admin topic list' also paginates the listing according to
--max-entries to avoid reading everything into memory at once

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2024-01-09 18:55:40 -05:00
parent 7827b2527f
commit db6c73a0cd
4 changed files with 114 additions and 81 deletions

View File

@ -1169,7 +1169,7 @@ static void show_reshard_status(
}
static void show_topics_info_v2(const rgw_pubsub_topic& topic,
std::set<std::string> subscribed_buckets,
const std::set<std::string>& subscribed_buckets,
Formatter* formatter) {
formatter->open_object_section("topic");
topic.dump(formatter);
@ -10650,39 +10650,47 @@ next:
if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
RGWPubSub ps(driver, tenant, *site);
rgw_pubsub_topics result;
ret = ps.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
if (!rgw::sal::User::empty(user)) {
for (auto it = result.topics.cbegin(); it != result.topics.cend();) {
const auto& topic = it->second;
if (user->get_id() != topic.user) {
result.topics.erase(it++);
} else {
++it;
}
std::string next_token = marker;
formatter->open_object_section("result");
formatter->open_array_section("topics");
do {
rgw_pubsub_topics result;
int ret = ps.get_topics(dpp(), next_token, max_entries,
result, next_token, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
}
if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
Formatter::ObjectSection top_section(*formatter, "result");
Formatter::ArraySection s(*formatter, "topics");
for (const auto& [_, topic] : result.topics) {
if (!rgw::sal::User::empty(user) && user->get_id() != topic.user) {
continue;
}
std::set<std::string> subscribed_buckets;
ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
null_yield, dpp());
if (ret < 0) {
cerr << "failed to fetch bucket topic mapping info for topic: "
<< topic.name << ", ret=" << ret << std::endl;
} else {
if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
null_yield, dpp());
if (ret < 0) {
cerr << "failed to fetch bucket topic mapping info for topic: "
<< topic.name << ", ret=" << ret << std::endl;
}
show_topics_info_v2(topic, subscribed_buckets, formatter.get());
} else {
encode_json("result", result, formatter.get());
}
if (max_entries_specified) {
--max_entries;
}
}
} else {
encode_json("result", result, formatter.get());
} while (!next_token.empty() && max_entries > 0);
formatter->close_section(); // topics
if (max_entries_specified) {
encode_json("truncated", !next_token.empty(), formatter.get());
if (!next_token.empty()) {
encode_json("marker", next_token, formatter.get());
}
}
formatter->close_section(); // result
formatter->flush(cout);
}

View File

@ -510,45 +510,61 @@ RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
{
}
int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
int RGWPubSub::get_topics(const DoutPrefixProvider* dpp,
const std::string& start_marker, int max_items,
rgw_pubsub_topics& result, std::string& next_marker,
optional_yield y) const
{
if (use_notification_v2) {
void* handle = NULL;
auto ret =
driver->meta_list_keys_init(dpp, "topic", std::string(), &handle);
if (ret < 0) {
return ret;
}
bool truncated;
int max = 1000;
do {
std::list<std::string> topics;
ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
break;
}
for (auto& topic_entry : topics) {
std::string topic_name;
std::string topic_tenant;
parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
if (tenant != topic_tenant) {
continue;
}
rgw_pubsub_topic topic;
const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
if (op_ret < 0) {
ret = op_ret;
continue;
}
result.topics[topic_name] = std::move(topic);
}
} while (truncated);
driver->meta_list_keys_complete(handle);
if (!use_notification_v2) {
// v1 returns all topics, ignoring marker/max_items
return read_topics_v1(dpp, result, nullptr, y);
}
// TODO: prefix filter on 'tenant:'
void* handle = NULL;
int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle);
if (ret < 0) {
return ret;
}
auto g = make_scope_guard(
[this, handle] { driver->meta_list_keys_complete(handle); });
if (max_items > 1000) {
max_items = 1000;
}
std::list<std::string> topics;
bool truncated = false;
ret = driver->meta_list_keys_next(dpp, handle, max_items, topics, &truncated);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
return ret;
}
for (auto& topic_entry : topics) {
std::string topic_name;
std::string topic_tenant;
parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
if (tenant != topic_tenant) {
continue;
}
rgw_pubsub_topic topic;
int r = get_topic(dpp, topic_name, topic, y, nullptr);
if (r < 0) {
continue;
}
result.topics[topic_name] = std::move(topic);
}
if (truncated) {
next_marker = driver->meta_get_marker(handle);
} else {
next_marker.clear();
}
return ret;
}
int RGWPubSub::read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@ -557,8 +573,8 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res
return 0;
}
int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
int RGWPubSub::write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp);
if (ret < 0 && ret != -ENOENT) {
@ -616,7 +632,7 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
return ret;
}
rgw_pubsub_topics topics;
const int ret = read_topics(dpp, topics, nullptr, y);
const int ret = read_topics_v1(dpp, topics, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
@ -932,7 +948,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
int ret = read_topics(dpp, topics, &objv_tracker, y);
int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
// its not an error if not topics exist, we create one
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
@ -947,7 +963,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
new_topic.opaque_data = opaque_data;
new_topic.policy_text = policy_text;
ret = write_topics(dpp, topics, &objv_tracker, y);
ret = write_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
@ -989,7 +1005,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
int ret = read_topics(dpp, topics, &objv_tracker, y);
int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
@ -1001,7 +1017,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
topics.topics.erase(name);
ret = write_topics(dpp, topics, &objv_tracker, y);
ret = write_topics_v1(dpp, topics, &objv_tracker, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
return ret;

View File

@ -562,10 +562,10 @@ class RGWPubSub
const std::string tenant;
bool use_notification_v2 = false;
int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
int read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
int write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
@ -620,11 +620,13 @@ public:
int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
};
// get the list of topics
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
return read_topics(dpp, result, nullptr, y);
}
// get a paginated list of topics
// return 0 on success, error code otherwise
int get_topics(const DoutPrefixProvider* dpp,
const std::string& start_marker, int max_items,
rgw_pubsub_topics& result, std::string& next_marker,
optional_yield y) const;
// get a topic with by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise.

View File

@ -293,6 +293,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
class RGWPSListTopicsOp : public RGWOp {
private:
rgw_pubsub_topics result;
std::string next_token;
public:
int verify_permission(optional_yield) override {
@ -325,15 +326,21 @@ public:
f->close_section(); // ListTopicsResult
f->open_object_section("ResponseMetadata");
encode_xml("RequestId", s->req_id, f);
f->close_section(); // ResponseMetadat
f->close_section(); // ResponseMetadata
if (!next_token.empty()) {
encode_xml("NextToken", next_token, f);
}
f->close_section(); // ListTopicsResponse
rgw_flush_formatter_and_reset(s, f);
}
};
void RGWPSListTopicsOp::execute(optional_yield y) {
const std::string start_token = s->info.args.get("NextToken");
const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.get_topics(this, result, y);
constexpr int max_items = 100;
op_ret = ps.get_topics(this, start_token, max_items, result, next_token, y);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {