diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 33cfd21e1ab..ce84b38724a 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -478,6 +478,7 @@ void rgw_pubsub_dest::dump(Formatter *f) const encode_json("push_endpoint_topic", arn_topic, f); encode_json("stored_secret", stored_secret, f); encode_json("persistent", persistent, f); + encode_json("persistent_queue", persistent_queue, f); encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f); encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f); encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f); @@ -525,6 +526,7 @@ void rgw_pubsub_dest::decode_json(JSONObj* f) { JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f); JSONDecoder::decode_json("stored_secret", stored_secret, f); JSONDecoder::decode_json("persistent", persistent, f); + JSONDecoder::decode_json("persistent_queue", persistent_queue, f); std::string ttl; JSONDecoder::decode_json("time_to_live", ttl, f); time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl); diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 1ae88bdbe26..3835407eb45 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -342,12 +342,14 @@ struct rgw_pubsub_dest { std::string arn_topic; bool stored_secret = false; bool persistent = false; + // rados object name of the persistent queue in the 'notif' pool + std::string persistent_queue; uint32_t time_to_live; uint32_t max_retries; uint32_t retry_sleep_duration; void encode(bufferlist& bl) const { - ENCODE_START(6, 1, bl); + ENCODE_START(7, 1, bl); encode("", bl); encode("", bl); encode(push_endpoint, bl); @@ -358,6 +360,7 @@ struct rgw_pubsub_dest { encode(time_to_live, bl); encode(max_retries, bl); encode(retry_sleep_duration, bl); + encode(persistent_queue, bl); ENCODE_FINISH(bl); } @@ -384,6 +387,13 @@ struct rgw_pubsub_dest { decode(max_retries, bl); decode(retry_sleep_duration, bl); } + if (struct_v >= 7) { + decode(persistent_queue, bl); + } else if (persistent) { + // persistent topics created before v7 did not support tenant namespacing. + // continue to use 'arn_topic' alone as the queue's rados object name + persistent_queue = arn_topic; + } DECODE_FINISH(bl); } diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index a64c55ad10b..7e1b0491a19 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -306,6 +306,12 @@ class RGWPSCreateTopicOp : public RGWOp { ret = ps.get_topic(this, topic_name, result, y, nullptr); if (ret == -ENOENT) { // topic not present + + // initialize the persistent queue's location. this cannot change for + // existing topics. use ':' as the namespace delimiter because its + // inclusion in a TopicName would break ARNs + dest.persistent_queue = string_cat_reserve( + get_account_or_tenant(s->owner.id), ":", topic_name); } else if (ret < 0) { ldpp_dout(this, 1) << "failed to read topic '" << topic_name << "', with error:" << ret << dendl;