mirror of
https://github.com/ceph/ceph
synced 2025-02-23 19:17:37 +00:00
rgw/pubsub: remove deprecated radogw-admin pubsub creation commands
make topic and subscription read commands an official feature Fixes: https://tracker.ceph.com/issues/43536 Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
This commit is contained in:
parent
d900743d72
commit
4c486c32c6
@ -457,6 +457,28 @@ which are as follows:
|
||||
:command:`reshard cancel`
|
||||
Cancel resharding a bucket
|
||||
|
||||
:command:`topic list`
|
||||
List bucket notifications/pubsub topics
|
||||
|
||||
:command:`topic get`
|
||||
Get a bucket notifications/pubsub topic
|
||||
|
||||
:command:`topic rm`
|
||||
Remove a bucket notifications/pubsub topic
|
||||
|
||||
:command:`subscription get`
|
||||
Get a pubsub subscription definition
|
||||
|
||||
:command:`subscription rm`
|
||||
Remove a pubsub subscription
|
||||
|
||||
:command:`subscription pull`
|
||||
Show events in a pubsub subscription
|
||||
|
||||
:command:`subscription ack`
|
||||
Ack (remove) an events in a pubsub subscription
|
||||
|
||||
|
||||
Options
|
||||
=======
|
||||
|
||||
@ -898,6 +920,22 @@ Role Options
|
||||
|
||||
The path prefix for filtering the roles.
|
||||
|
||||
|
||||
Bucket Notifications/PubSub Options
|
||||
===================================
|
||||
.. option:: --topic
|
||||
|
||||
The bucket notifications/pubsub topic name.
|
||||
|
||||
.. option:: --subscription
|
||||
|
||||
The pubsub subscription name.
|
||||
|
||||
.. option:: --event-id
|
||||
|
||||
The event id in a pubsub subscription.
|
||||
|
||||
|
||||
Examples
|
||||
========
|
||||
|
||||
|
@ -29,6 +29,31 @@ mechanism. This API is similar to the one defined as the S3-compatible API of th
|
||||
|
||||
S3 Bucket Notification Compatibility <s3-notification-compatibility>
|
||||
|
||||
|
||||
Topic Management via CLI
|
||||
------------------------
|
||||
|
||||
Configuration of all topics of a user could be fetched using the following command:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic list --uid={user-id}
|
||||
|
||||
|
||||
Configuration of a specific topic could be fetched using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic get --uid={user-id} --topic={topic-name}
|
||||
|
||||
|
||||
And removed using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic rm --uid={user-id} --topic={topic-name}
|
||||
|
||||
|
||||
Notification Performance Stats
|
||||
------------------------------
|
||||
The same counters are shared between the pubsub sync module and the bucket notification mechanism.
|
||||
|
@ -112,6 +112,58 @@ the ``val`` specifies its new value. For example, setting the pubsub control use
|
||||
|
||||
A configuration field can be removed by using ``--tier-config-rm={key}``.
|
||||
|
||||
|
||||
Topic and Subscription Management via CLI
|
||||
-----------------------------------------
|
||||
|
||||
Configuration of all topics of a user could be fetched using the following command:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic list --uid={user-id}
|
||||
|
||||
|
||||
Configuration of a specific topic could be fetched using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic get --uid={user-id} --topic={topic-name}
|
||||
|
||||
|
||||
And removed using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic rm --uid={user-id} --topic={topic-name}
|
||||
|
||||
|
||||
Configuration of a subscription could be fetched using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription get --uid={user-id} --subscription={topic-name}
|
||||
|
||||
And removed using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription rm --uid={user-id} --subscription={topic-name}
|
||||
|
||||
|
||||
To fetch all of the events stored in a subcription, use:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription pull --uid={user-id} --subscription={topic-name} [--marker={last-marker}]
|
||||
|
||||
|
||||
To ack (and remove) an event from a subscription, use:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription ack --uid={user-id} --subscription={topic-name} --event-id={event-id}
|
||||
|
||||
|
||||
PubSub Performance Stats
|
||||
-------------------------
|
||||
Same counters are shared between the pubsub sync module and the notification mechanism.
|
||||
|
@ -271,6 +271,13 @@ void usage()
|
||||
cout << " mfa remove delete MFA TOTP token\n";
|
||||
cout << " mfa check check MFA TOTP token\n";
|
||||
cout << " mfa resync re-sync MFA TOTP token\n";
|
||||
cout << " topic list list bucket notifications/pubsub topics\n";
|
||||
cout << " topic get get a bucket notifications/pubsub topic\n";
|
||||
cout << " topic rm remove a bucket notifications/pubsub topic\n";
|
||||
cout << " subscription get get a pubsub subscription definition\n";
|
||||
cout << " subscription rm remove a pubsub subscription\n";
|
||||
cout << " subscription pull show events in a pubsub subscription\n";
|
||||
cout << " subscription ack ack (remove) an events in a pubsub subscription\n";
|
||||
cout << "options:\n";
|
||||
cout << " --tenant=<tenant> tenant name\n";
|
||||
cout << " --uid=<id> user id\n";
|
||||
@ -415,6 +422,10 @@ void usage()
|
||||
cout << " --totp-seconds the time resolution that is being used for TOTP generation\n";
|
||||
cout << " --totp-window the number of TOTP tokens that are checked before and after the current token when validating token\n";
|
||||
cout << " --totp-pin the valid value of a TOTP token at a certain time\n";
|
||||
cout << "\nBucket notifications/pubsub options:\n";
|
||||
cout << " --topic bucket notifications/pubsub topic name\n";
|
||||
cout << " --subscription pubsub subscription name\n";
|
||||
cout << " --event-id event id in a pubsub subscription\n";
|
||||
cout << "\n";
|
||||
generic_client_usage();
|
||||
}
|
||||
@ -738,13 +749,10 @@ enum class OPT {
|
||||
RESHARD_STALE_INSTANCES_LIST,
|
||||
RESHARD_STALE_INSTANCES_DELETE,
|
||||
PUBSUB_TOPICS_LIST,
|
||||
PUBSUB_TOPIC_CREATE,
|
||||
// TODO add "subscription list" command
|
||||
PUBSUB_TOPIC_GET,
|
||||
PUBSUB_TOPIC_RM,
|
||||
PUBSUB_NOTIFICATION_CREATE,
|
||||
PUBSUB_NOTIFICATION_RM,
|
||||
PUBSUB_SUB_GET,
|
||||
PUBSUB_SUB_CREATE,
|
||||
PUBSUB_SUB_RM,
|
||||
PUBSUB_SUB_PULL,
|
||||
PUBSUB_EVENT_RM,
|
||||
@ -953,17 +961,13 @@ static SimpleCmd::Commands all_cmds = {
|
||||
{ "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
|
||||
{ "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
|
||||
{ "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
|
||||
{ "pubsub topics list", OPT::PUBSUB_TOPICS_LIST },
|
||||
{ "pubsub topic create", OPT::PUBSUB_TOPIC_CREATE },
|
||||
{ "pubsub topic get", OPT::PUBSUB_TOPIC_GET },
|
||||
{ "pubsub topic rm", OPT::PUBSUB_TOPIC_RM },
|
||||
{ "pubsub notification create", OPT::PUBSUB_NOTIFICATION_CREATE },
|
||||
{ "pubsub notification rm", OPT::PUBSUB_NOTIFICATION_RM },
|
||||
{ "pubsub sub get", OPT::PUBSUB_SUB_GET },
|
||||
{ "pubsub sub create", OPT::PUBSUB_SUB_CREATE },
|
||||
{ "pubsub sub rm", OPT::PUBSUB_SUB_RM },
|
||||
{ "pubsub sub pull", OPT::PUBSUB_SUB_PULL },
|
||||
{ "pubsub event rm", OPT::PUBSUB_EVENT_RM },
|
||||
{ "topic list", OPT::PUBSUB_TOPICS_LIST },
|
||||
{ "topic get", OPT::PUBSUB_TOPIC_GET },
|
||||
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
|
||||
{ "subscription get", OPT::PUBSUB_SUB_GET },
|
||||
{ "subscription rm", OPT::PUBSUB_SUB_RM },
|
||||
{ "subscription pull", OPT::PUBSUB_SUB_PULL },
|
||||
{ "subscription ack", OPT::PUBSUB_EVENT_RM },
|
||||
};
|
||||
|
||||
static SimpleCmd::Aliases cmd_aliases = {
|
||||
@ -3203,9 +3207,6 @@ int main(int argc, const char **argv)
|
||||
|
||||
string topic_name;
|
||||
string sub_name;
|
||||
string sub_oid_prefix;
|
||||
string sub_dest_bucket;
|
||||
string sub_push_endpoint;
|
||||
string event_id;
|
||||
|
||||
std::optional<string> opt_group_id;
|
||||
@ -3245,8 +3246,6 @@ int main(int argc, const char **argv)
|
||||
std::optional<string> opt_mode;
|
||||
std::optional<rgw_user> opt_dest_owner;
|
||||
|
||||
rgw::notify::EventTypeList event_types;
|
||||
|
||||
SimpleCmd cmd(all_cmds, cmd_aliases);
|
||||
|
||||
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
|
||||
@ -3599,18 +3598,10 @@ int main(int argc, const char **argv)
|
||||
trim_delay_ms = atoi(val.c_str());
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
|
||||
topic_name = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--sub-name", (char*)NULL)) {
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) {
|
||||
sub_name = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--sub-oid-prefix", (char*)NULL)) {
|
||||
sub_oid_prefix = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--sub-dest-bucket", (char*)NULL)) {
|
||||
sub_dest_bucket = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
|
||||
sub_push_endpoint = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
|
||||
event_id = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
|
||||
rgw::notify::from_string_list(val, event_types);
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--group-id", (char*)NULL)) {
|
||||
opt_group_id = val;
|
||||
} else if (ceph_argparse_witharg(args, i, &val, "--status", (char*)NULL)) {
|
||||
@ -3874,6 +3865,10 @@ int main(int argc, const char **argv)
|
||||
OPT::ROLE_POLICY_GET,
|
||||
OPT::RESHARD_LIST,
|
||||
OPT::RESHARD_STATUS,
|
||||
OPT::PUBSUB_TOPICS_LIST,
|
||||
OPT::PUBSUB_TOPIC_GET,
|
||||
OPT::PUBSUB_SUB_GET,
|
||||
OPT::PUBSUB_SUB_PULL,
|
||||
};
|
||||
|
||||
|
||||
@ -9012,10 +9007,6 @@ next:
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (user_id.empty()) {
|
||||
cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
|
||||
return EINVAL;
|
||||
@ -9054,34 +9045,7 @@ next:
|
||||
formatter->flush(cout);
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_TOPIC_CREATE) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (user_id.empty()) {
|
||||
cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
RGWUserPubSub ups(store, user_info.user_id);
|
||||
|
||||
ret = ups.create_topic(topic_name);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_TOPIC_GET) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
@ -9096,92 +9060,14 @@ next:
|
||||
rgw_pubsub_topic_subs topic;
|
||||
ret = ups.get_topic(topic_name, &topic);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
|
||||
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
encode_json("topic", topic, formatter);
|
||||
formatter->flush(cout);
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_NOTIFICATION_CREATE) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (user_id.empty()) {
|
||||
cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (bucket_name.empty()) {
|
||||
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
RGWUserPubSub ups(store, user_info.user_id);
|
||||
|
||||
rgw_bucket bucket;
|
||||
|
||||
RGWBucketInfo bucket_info;
|
||||
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
|
||||
auto b = ups.get_bucket(bucket_info.bucket);
|
||||
ret = b->create_notification(topic_name, event_types);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (user_id.empty()) {
|
||||
cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (bucket_name.empty()) {
|
||||
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
RGWUserPubSub ups(store, user_info.user_id);
|
||||
|
||||
rgw_bucket bucket;
|
||||
|
||||
RGWBucketInfo bucket_info;
|
||||
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
|
||||
auto b = ups.get_bucket(bucket_info.bucket);
|
||||
ret = b->remove_notification(topic_name);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_TOPIC_RM) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
@ -9210,7 +9096,7 @@ next:
|
||||
return EINVAL;
|
||||
}
|
||||
if (sub_name.empty()) {
|
||||
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
|
||||
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
@ -9228,55 +9114,6 @@ next:
|
||||
formatter->flush(cout);
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_SUB_CREATE) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (user_id.empty()) {
|
||||
cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (sub_name.empty()) {
|
||||
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (topic_name.empty()) {
|
||||
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
RGWUserPubSub ups(store, user_info.user_id);
|
||||
|
||||
rgw_pubsub_topic_subs topic;
|
||||
int ret = ups.get_topic(topic_name, &topic);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: topic not found" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
rgw_pubsub_sub_dest dest_config;
|
||||
dest_config.bucket_name = sub_dest_bucket;
|
||||
dest_config.oid_prefix = sub_oid_prefix;
|
||||
dest_config.push_endpoint = sub_push_endpoint;
|
||||
|
||||
auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->getRados()->get_sync_module().get());
|
||||
auto conf = psmodule->get_effective_conf();
|
||||
|
||||
if (dest_config.bucket_name.empty()) {
|
||||
dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
|
||||
}
|
||||
if (dest_config.oid_prefix.empty()) {
|
||||
dest_config.oid_prefix = conf["data_oid_prefix"];
|
||||
}
|
||||
auto sub = ups.get_sub(sub_name);
|
||||
ret = sub->subscribe(topic_name, dest_config);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
|
||||
return -ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::PUBSUB_SUB_RM) {
|
||||
if (get_tier_type(store) != "pubsub") {
|
||||
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
|
||||
@ -9287,7 +9124,7 @@ next:
|
||||
return EINVAL;
|
||||
}
|
||||
if (sub_name.empty()) {
|
||||
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
|
||||
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
@ -9311,7 +9148,7 @@ next:
|
||||
return EINVAL;
|
||||
}
|
||||
if (sub_name.empty()) {
|
||||
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
|
||||
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
@ -9320,7 +9157,7 @@ next:
|
||||
if (!max_entries_specified) {
|
||||
max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
|
||||
}
|
||||
auto sub = ups.get_sub(sub_name);
|
||||
auto sub = ups.get_sub_with_events(sub_name);
|
||||
ret = sub->list_events(marker, max_entries);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
|
||||
@ -9340,7 +9177,7 @@ next:
|
||||
return EINVAL;
|
||||
}
|
||||
if (sub_name.empty()) {
|
||||
cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
|
||||
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
if (event_id.empty()) {
|
||||
@ -9350,7 +9187,7 @@ next:
|
||||
RGWUserInfo& user_info = user_op.get_user_info();
|
||||
RGWUserPubSub ups(store, user_info.user_id);
|
||||
|
||||
auto sub = ups.get_sub(sub_name);
|
||||
auto sub = ups.get_sub_with_events(sub_name);
|
||||
ret = sub->remove_event(event_id);
|
||||
if (ret < 0) {
|
||||
cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
|
||||
|
@ -43,28 +43,6 @@ config:
|
||||
"data_oid_prefix": <prefix> #
|
||||
"events_retention_days": <int> # default: 7
|
||||
"start_with_full_sync" <bool> # default: false
|
||||
|
||||
# non-dynamic config
|
||||
"notifications": [
|
||||
{
|
||||
"path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
|
||||
# or a prefix if it ends with a wildcard
|
||||
"topic": <topic-name>
|
||||
},
|
||||
...
|
||||
],
|
||||
"subscriptions": [
|
||||
{
|
||||
"name": <subscription-name>,
|
||||
"topic": <topic>,
|
||||
"push_endpoint": <endpoint>,
|
||||
"push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args)
|
||||
"data_bucket": <bucket>, # override name of bucket where subscription data will be store
|
||||
"data_oid_prefix": <prefix> # set prefix for subscription data object ids
|
||||
"s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
*/
|
||||
@ -119,28 +97,6 @@ struct PSSubConfig {
|
||||
encode_json("s3_id", s3_id, f);
|
||||
}
|
||||
|
||||
void init(CephContext *cct, const JSONFormattable& config,
|
||||
const string& data_bucket_prefix,
|
||||
const string& default_oid_prefix) {
|
||||
name = config["name"];
|
||||
topic = config["topic"];
|
||||
push_endpoint_name = config["push_endpoint"];
|
||||
string default_bucket_name = data_bucket_prefix + name;
|
||||
data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
|
||||
data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
|
||||
s3_id = config["s3_id"];
|
||||
arn_topic = config["arn_topic"];
|
||||
if (!push_endpoint_name.empty()) {
|
||||
push_endpoint_args = config["push_endpoint_args"];
|
||||
try {
|
||||
push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
|
||||
ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
|
||||
} catch (const RGWPubSubEndpoint::configuration_error& e) {
|
||||
ldout(cct, 1) << "ERROR: failed to create push endpoint: "
|
||||
<< push_endpoint_name << " due to: " << e.what() << dendl;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
|
||||
@ -196,22 +152,14 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
|
||||
using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
|
||||
using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
|
||||
|
||||
// global pubsub configuration
|
||||
struct PSConfig {
|
||||
const std::string id{"pubsub"};
|
||||
rgw_user user;
|
||||
std::string data_bucket_prefix;
|
||||
std::string data_oid_prefix;
|
||||
|
||||
int events_retention_days{0};
|
||||
|
||||
uint64_t sync_instance{0};
|
||||
uint64_t max_id{0};
|
||||
|
||||
/* FIXME: no hard coded buckets, we'll have configurable topics */
|
||||
std::map<std::string, PSSubConfigRef> subs;
|
||||
std::map<std::string, PSTopicConfigRef> topics;
|
||||
std::multimap<std::string, PSNotificationConfig> notifications;
|
||||
|
||||
bool start_with_full_sync{false};
|
||||
|
||||
void dump(Formatter *f) const {
|
||||
@ -221,37 +169,6 @@ struct PSConfig {
|
||||
encode_json("data_oid_prefix", data_oid_prefix, f);
|
||||
encode_json("events_retention_days", events_retention_days, f);
|
||||
encode_json("sync_instance", sync_instance, f);
|
||||
encode_json("max_id", max_id, f);
|
||||
{
|
||||
Formatter::ArraySection section(*f, "subs");
|
||||
for (auto& sub : subs) {
|
||||
encode_json("sub", *sub.second, f);
|
||||
}
|
||||
}
|
||||
{
|
||||
Formatter::ArraySection section(*f, "topics");
|
||||
for (auto& topic : topics) {
|
||||
encode_json("topic", *topic.second, f);
|
||||
}
|
||||
}
|
||||
{
|
||||
Formatter::ObjectSection section(*f, "notifications");
|
||||
std::string last;
|
||||
for (auto& notif : notifications) {
|
||||
const string& n = notif.first;
|
||||
if (n != last) {
|
||||
if (!last.empty()) {
|
||||
f->close_section();
|
||||
}
|
||||
f->open_array_section(n.c_str());
|
||||
}
|
||||
last = n;
|
||||
encode_json("notifications", notif.second, f);
|
||||
}
|
||||
if (!last.empty()) {
|
||||
f->close_section();
|
||||
}
|
||||
}
|
||||
encode_json("start_with_full_sync", start_with_full_sync, f);
|
||||
}
|
||||
|
||||
@ -261,77 +178,14 @@ struct PSConfig {
|
||||
data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
|
||||
data_oid_prefix = config["data_oid_prefix"];
|
||||
events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
|
||||
|
||||
for (auto& c : config["notifications"].array()) {
|
||||
PSNotificationConfig nc;
|
||||
nc.id = ++max_id;
|
||||
nc.init(cct, c);
|
||||
notifications.insert(std::make_pair(nc.path, nc));
|
||||
|
||||
PSTopicConfig topic_config = { .name = nc.topic };
|
||||
topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
|
||||
}
|
||||
for (auto& c : config["subscriptions"].array()) {
|
||||
auto sc = std::make_shared<PSSubConfig>();
|
||||
sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
|
||||
subs[sc->name] = sc;
|
||||
auto iter = topics.find(sc->topic);
|
||||
if (iter != topics.end()) {
|
||||
iter->second->subs.insert(sc->name);
|
||||
}
|
||||
}
|
||||
start_with_full_sync = config["start_with_full_sync"](false);
|
||||
|
||||
ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
|
||||
ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
|
||||
}
|
||||
|
||||
void init_instance(const RGWRealm& realm, uint64_t instance_id) {
|
||||
sync_instance = instance_id;
|
||||
}
|
||||
|
||||
void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
|
||||
const std::string path = bucket.name + "/" + key.name;
|
||||
|
||||
auto iter = notifications.upper_bound(path);
|
||||
if (iter == notifications.begin()) {
|
||||
return;
|
||||
}
|
||||
|
||||
do {
|
||||
--iter;
|
||||
if (iter->first.size() > path.size()) {
|
||||
break;
|
||||
}
|
||||
if (path.compare(0, iter->first.size(), iter->first) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
PSNotificationConfig& target = iter->second;
|
||||
|
||||
if (!target.is_prefix &&
|
||||
path.size() != iter->first.size()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto topic = topics.find(target.topic);
|
||||
if (topic == topics.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
|
||||
" target_path=" << target.path << ", topic=" << target.topic << dendl;
|
||||
(*result)->push_back(topic->second);
|
||||
} while (iter != notifications.begin());
|
||||
}
|
||||
|
||||
bool find_sub(const string& name, PSSubConfigRef *ref) {
|
||||
auto iter = subs.find(name);
|
||||
if (iter != subs.end()) {
|
||||
*ref = iter->second;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
using PSConfigRef = std::shared_ptr<PSConfig>;
|
||||
@ -920,13 +774,9 @@ class PSManager
|
||||
int operate() override {
|
||||
reenter(this) {
|
||||
if (owner.empty()) {
|
||||
if (!conf->find_sub(sub_name, &sub_conf)) {
|
||||
ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
|
||||
ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
|
||||
mgr->remove_get_sub(owner, sub_name);
|
||||
return set_cr_error(-ENOENT);
|
||||
}
|
||||
|
||||
*ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
|
||||
return set_cr_error(-EINVAL);
|
||||
} else {
|
||||
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
|
||||
yield {
|
||||
@ -948,14 +798,11 @@ class PSManager
|
||||
|
||||
yield (*ref)->call_init_cr(this);
|
||||
if (retcode < 0) {
|
||||
ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
|
||||
ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
|
||||
mgr->remove_get_sub(owner, sub_name);
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
if (owner.empty()) {
|
||||
mgr->subs[sub_name] = *ref;
|
||||
}
|
||||
mgr->remove_get_sub(owner, sub_name);
|
||||
|
||||
return set_cr_done();
|
||||
@ -1164,7 +1011,6 @@ public:
|
||||
(*topics)->push_back(tc);
|
||||
}
|
||||
|
||||
env->conf->get_topics(sync_env->cct, bucket, key, topics);
|
||||
return set_cr_done();
|
||||
}
|
||||
return 0;
|
||||
@ -1174,19 +1020,16 @@ public:
|
||||
class RGWPSHandleObjEventCR : public RGWCoroutine {
|
||||
RGWDataSyncCtx* const sc;
|
||||
const PSEnvRef env;
|
||||
const rgw_user& owner;
|
||||
const rgw_user owner;
|
||||
const EventRef<rgw_pubsub_event> event;
|
||||
const EventRef<rgw_pubsub_s3_record> record;
|
||||
const TopicsRef topics;
|
||||
const std::array<rgw_user, 2> owners;
|
||||
bool has_subscriptions;
|
||||
bool event_handled;
|
||||
bool sub_conf_found;
|
||||
PSSubscriptionRef sub;
|
||||
std::array<rgw_user, 2>::const_iterator oiter;
|
||||
std::vector<PSTopicConfigRef>::const_iterator titer;
|
||||
std::set<std::string>::const_iterator siter;
|
||||
int last_sub_conf_error;
|
||||
|
||||
public:
|
||||
RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
|
||||
@ -1201,7 +1044,6 @@ public:
|
||||
event(_event),
|
||||
record(_record),
|
||||
topics(_topics),
|
||||
owners({owner, rgw_user{}}),
|
||||
has_subscriptions(false),
|
||||
event_handled(false) {}
|
||||
|
||||
@ -1226,78 +1068,66 @@ public:
|
||||
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
|
||||
ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
|
||||
has_subscriptions = true;
|
||||
sub_conf_found = false;
|
||||
// try to read subscription configuration from global/user cond
|
||||
// configuration is considered missing only if does not exist in either
|
||||
for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
|
||||
yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
|
||||
if (retcode < 0) {
|
||||
if (sub_conf_found) {
|
||||
// not a real issue, sub conf already found
|
||||
retcode = 0;
|
||||
}
|
||||
last_sub_conf_error = retcode;
|
||||
continue;
|
||||
}
|
||||
sub_conf_found = true;
|
||||
if (sub->sub_conf->s3_id.empty()) {
|
||||
// subscription was not made by S3 compatible API
|
||||
ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::store_event_cr(sc, sub, event));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
if (sub->sub_conf->push_endpoint) {
|
||||
ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::push_event_cr(sc, sub, event));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// subscription was made by S3 compatible API
|
||||
ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
|
||||
record->configurationId = sub->sub_conf->s3_id;
|
||||
record->opaque_data = (*titer)->opaque_data;
|
||||
yield call(PSSubscription::store_event_cr(sc, sub, record));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
if (sub->sub_conf->push_endpoint) {
|
||||
ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::push_event_cr(sc, sub, record));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!sub_conf_found) {
|
||||
// could not find conf for subscription at user or global levels
|
||||
// try to read subscription configuration
|
||||
yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
|
||||
<< " ret=" << last_sub_conf_error << dendl;
|
||||
if (retcode == -ENOENT) {
|
||||
// missing subscription info should be reflected back as invalid argument
|
||||
// and not as missing object
|
||||
retcode = -EINVAL;
|
||||
<< " ret=" << retcode << dendl;
|
||||
if (retcode == -ENOENT) {
|
||||
// missing subscription info should be reflected back as invalid argument
|
||||
// and not as missing object
|
||||
retcode = -EINVAL;
|
||||
}
|
||||
// try the next subscription
|
||||
continue;
|
||||
}
|
||||
if (sub->sub_conf->s3_id.empty()) {
|
||||
// subscription was not made by S3 compatible API
|
||||
ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::store_event_cr(sc, sub, event));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
if (sub->sub_conf->push_endpoint) {
|
||||
ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::push_event_cr(sc, sub, event));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// subscription was made by S3 compatible API
|
||||
ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
|
||||
record->configurationId = sub->sub_conf->s3_id;
|
||||
record->opaque_data = (*titer)->opaque_data;
|
||||
yield call(PSSubscription::store_event_cr(sc, sub, record));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
if (sub->sub_conf->push_endpoint) {
|
||||
ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
|
||||
yield call(PSSubscription::push_event_cr(sc, sub, record));
|
||||
if (retcode < 0) {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
|
||||
ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
|
||||
} else {
|
||||
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
|
||||
event_handled = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -166,6 +166,13 @@
|
||||
mfa remove delete MFA TOTP token
|
||||
mfa check check MFA TOTP token
|
||||
mfa resync re-sync MFA TOTP token
|
||||
topic list list bucket notifications/pubsub topics
|
||||
topic get get a bucket notifications/pubsub topic
|
||||
topic rm remove a bucket notifications/pubsub topic
|
||||
subscription get get a pubsub subscription definition
|
||||
subscription rm remove a pubsub subscription
|
||||
subscription pull show events in a pubsub subscription
|
||||
subscription ack ack (remove) an events in a pubsub subscription
|
||||
options:
|
||||
--tenant=<tenant> tenant name
|
||||
--uid=<id> user id
|
||||
@ -316,6 +323,11 @@
|
||||
--totp-window the number of TOTP tokens that are checked before and after the current token when validating token
|
||||
--totp-pin the valid value of a TOTP token at a certain time
|
||||
|
||||
Bucket notifications/pubsub options:
|
||||
--topic bucket notifications/pubsub topic name
|
||||
--subscription pubsub subscription name
|
||||
--event-id event id in a pubsub subscription
|
||||
|
||||
--conf/-c FILE read configuration from the given configuration file
|
||||
--id ID set ID portion of my name
|
||||
--name/-n TYPE.ID set name
|
||||
|
@ -177,6 +177,7 @@ class AMQPReceiver(object):
|
||||
remaining_retries -= 1
|
||||
print('failed to connect to rabbitmq (remaining retries '
|
||||
+ str(remaining_retries) + '): ' + str(error))
|
||||
time.sleep(1)
|
||||
|
||||
if remaining_retries == 0:
|
||||
raise Exception('failed to connect to rabbitmq - no retries left')
|
||||
@ -2221,6 +2222,113 @@ def test_ps_subscription():
|
||||
topic_conf.del_config()
|
||||
master_zone.delete_bucket(bucket_name)
|
||||
|
||||
|
||||
def test_ps_admin():
|
||||
""" test radosgw-admin commands """
|
||||
master_zone, ps_zone = init_env()
|
||||
bucket_name = gen_bucket_name()
|
||||
topic_name = bucket_name+TOPIC_SUFFIX
|
||||
realm = get_realm()
|
||||
zonegroup = realm.master_zonegroup()
|
||||
|
||||
# create topic
|
||||
topic_conf = PSTopic(ps_zone.conn, topic_name)
|
||||
topic_conf.set_config()
|
||||
result, status = topic_conf.get_config()
|
||||
assert_equal(status, 200)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(parsed_result['topic']['name'], topic_name)
|
||||
result, status = ps_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert len(parsed_result['topics']) > 0
|
||||
result, status = ps_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(parsed_result['topic']['name'], topic_name)
|
||||
|
||||
# create s3 topics
|
||||
endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
|
||||
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
|
||||
topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
|
||||
topic_conf_s3.set_config()
|
||||
result, status = topic_conf_s3.get_config()
|
||||
assert_equal(status, 200)
|
||||
assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name)
|
||||
result, status = master_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + master_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert len(parsed_result['topics']) > 0
|
||||
result, status = master_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(parsed_result['topic']['name'], topic_name)
|
||||
|
||||
# create bucket on the first of the rados zones
|
||||
bucket = master_zone.create_bucket(bucket_name)
|
||||
# wait for sync
|
||||
zone_meta_checkpoint(ps_zone.zone)
|
||||
# create notifications
|
||||
notification_conf = PSNotification(ps_zone.conn, bucket_name,
|
||||
topic_name)
|
||||
_, status = notification_conf.set_config()
|
||||
assert_equal(status/100, 2)
|
||||
# create subscription
|
||||
sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
|
||||
topic_name)
|
||||
_, status = sub_conf.set_config()
|
||||
assert_equal(status/100, 2)
|
||||
result, status = ps_zone.zone.cluster.admin(['subscription', 'get', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(parsed_result['name'], bucket_name+SUB_SUFFIX)
|
||||
# create objects in the bucket
|
||||
number_of_objects = 110
|
||||
for i in range(number_of_objects):
|
||||
key = bucket.new_key(str(i))
|
||||
key.set_contents_from_string('bar')
|
||||
# wait for sync
|
||||
# get events from subscription
|
||||
zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
|
||||
result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(len(parsed_result['events']), 100)
|
||||
marker = parsed_result['next_marker']
|
||||
result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--marker', marker]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
parsed_result = json.loads(result)
|
||||
assert_equal(len(parsed_result['events']), 10)
|
||||
event_id = parsed_result['events'][0]['id']
|
||||
|
||||
# ack an event in the subscription
|
||||
result, status = ps_zone.zone.cluster.admin(['subscription', 'ack', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--event-id', event_id]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
|
||||
# remove the subscription
|
||||
result, status = ps_zone.zone.cluster.admin(['subscription', 'rm', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
|
||||
# remove the topics
|
||||
result, status = ps_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
|
||||
+ ps_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
result, status = master_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
|
||||
+ master_zone.zone.zone_arg())
|
||||
assert_equal(status, 0)
|
||||
|
||||
# cleanup
|
||||
for key in bucket.list():
|
||||
key.delete()
|
||||
notification_conf.del_config()
|
||||
master_zone.delete_bucket(bucket_name)
|
||||
|
||||
|
||||
def test_ps_incremental_sync():
|
||||
""" test that events are only sent on incremental sync """
|
||||
master_zone, ps_zone = init_env()
|
||||
|
Loading…
Reference in New Issue
Block a user