mirror of
https://github.com/ceph/ceph
synced 2025-02-25 03:52:04 +00:00
rgw/pubsub: rgw_pubsub_dest stores persistent queue oid
Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
ba2566a5fe
commit
3ef1ab3de2
@ -478,6 +478,7 @@ void rgw_pubsub_dest::dump(Formatter *f) const
|
||||
encode_json("push_endpoint_topic", arn_topic, f);
|
||||
encode_json("stored_secret", stored_secret, f);
|
||||
encode_json("persistent", persistent, f);
|
||||
encode_json("persistent_queue", persistent_queue, f);
|
||||
encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
|
||||
encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
|
||||
encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
|
||||
@ -525,6 +526,7 @@ void rgw_pubsub_dest::decode_json(JSONObj* f) {
|
||||
JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f);
|
||||
JSONDecoder::decode_json("stored_secret", stored_secret, f);
|
||||
JSONDecoder::decode_json("persistent", persistent, f);
|
||||
JSONDecoder::decode_json("persistent_queue", persistent_queue, f);
|
||||
std::string ttl;
|
||||
JSONDecoder::decode_json("time_to_live", ttl, f);
|
||||
time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl);
|
||||
|
@ -342,12 +342,14 @@ struct rgw_pubsub_dest {
|
||||
std::string arn_topic;
|
||||
bool stored_secret = false;
|
||||
bool persistent = false;
|
||||
// rados object name of the persistent queue in the 'notif' pool
|
||||
std::string persistent_queue;
|
||||
uint32_t time_to_live;
|
||||
uint32_t max_retries;
|
||||
uint32_t retry_sleep_duration;
|
||||
|
||||
void encode(bufferlist& bl) const {
|
||||
ENCODE_START(6, 1, bl);
|
||||
ENCODE_START(7, 1, bl);
|
||||
encode("", bl);
|
||||
encode("", bl);
|
||||
encode(push_endpoint, bl);
|
||||
@ -358,6 +360,7 @@ struct rgw_pubsub_dest {
|
||||
encode(time_to_live, bl);
|
||||
encode(max_retries, bl);
|
||||
encode(retry_sleep_duration, bl);
|
||||
encode(persistent_queue, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
@ -384,6 +387,13 @@ struct rgw_pubsub_dest {
|
||||
decode(max_retries, bl);
|
||||
decode(retry_sleep_duration, bl);
|
||||
}
|
||||
if (struct_v >= 7) {
|
||||
decode(persistent_queue, bl);
|
||||
} else if (persistent) {
|
||||
// persistent topics created before v7 did not support tenant namespacing.
|
||||
// continue to use 'arn_topic' alone as the queue's rados object name
|
||||
persistent_queue = arn_topic;
|
||||
}
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
||||
|
@ -306,6 +306,12 @@ class RGWPSCreateTopicOp : public RGWOp {
|
||||
ret = ps.get_topic(this, topic_name, result, y, nullptr);
|
||||
if (ret == -ENOENT) {
|
||||
// topic not present
|
||||
|
||||
// initialize the persistent queue's location. this cannot change for
|
||||
// existing topics. use ':' as the namespace delimiter because its
|
||||
// inclusion in a TopicName would break ARNs
|
||||
dest.persistent_queue = string_cat_reserve(
|
||||
get_account_or_tenant(s->owner.id), ":", topic_name);
|
||||
} else if (ret < 0) {
|
||||
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
|
||||
<< "', with error:" << ret << dendl;
|
||||
|
Loading…
Reference in New Issue
Block a user