mirror of
https://github.com/ceph/ceph
synced 2025-01-29 14:34:40 +00:00
rgw/pubsub: remove pusub sync module functionality
also update the documentation Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
This commit is contained in:
parent
f5fc9e690f
commit
0989166a63
@ -10,9 +10,6 @@ Bucket notifications provide a mechanism for sending information out of radosgw
|
||||
when certain events happen on the bucket. Notifications can be sent to HTTP
|
||||
endpoints, AMQP0.9.1 endpoints, and Kafka endpoints.
|
||||
|
||||
The `PubSub Module`_ (and *not* the bucket-notification mechanism) should be
|
||||
used for events stored in Ceph.
|
||||
|
||||
A user can create topics. A topic entity is defined by its name and is "per
|
||||
tenant". A user can associate its topics (via notification configuration) only
|
||||
with buckets it owns.
|
||||
@ -27,8 +24,7 @@ to create filters. There can be multiple notifications for any specific topic,
|
||||
and the same topic can used for multiple notifications.
|
||||
|
||||
REST API has been defined so as to provide configuration and control interfaces
|
||||
for the bucket notification mechanism. This API is similar to the one defined
|
||||
as the S3-compatible API of the `PubSub Module`_.
|
||||
for the bucket notification mechanism.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
@ -99,7 +95,6 @@ Remove a topic by running the following command:
|
||||
|
||||
Notification Performance Statistics
|
||||
-----------------------------------
|
||||
Counters are shared between the `PubSub Module`_ and the bucket notification mechanism:
|
||||
|
||||
- ``pubsub_event_triggered``: a running counter of events that have at least one topic associated with them
|
||||
- ``pubsub_event_lost``: a running counter of events that had topics associated with them, but that were not pushed to any of the endpoints
|
||||
@ -126,6 +121,8 @@ Topics
|
||||
``application/x-www-form-urlencoded``.
|
||||
|
||||
|
||||
.. _Create a Topic:
|
||||
|
||||
Create a Topic
|
||||
``````````````
|
||||
|
||||
@ -203,7 +200,7 @@ Request parameters:
|
||||
- "broker": The message is considered "delivered" if it is acked by the broker (default).
|
||||
- "routable": The message is considered "delivered" if the broker can route to a consumer.
|
||||
|
||||
.. tip:: The topic-name (see :ref:`radosgw-create-a-topic`) is used for the
|
||||
.. tip:: The topic-name (see :ref:`Create a Topic`) is used for the
|
||||
AMQP topic ("routing key" for a topic exchange).
|
||||
|
||||
- Kafka endpoint
|
||||
@ -456,9 +453,8 @@ Detailed under: `Bucket Operations`_.
|
||||
Events
|
||||
~~~~~~
|
||||
|
||||
Events are in JSON format (regardless of the actual endpoint), and share the
|
||||
same structure as S3-compatible events that are pushed or pulled using the
|
||||
pubsub sync module. For example:
|
||||
Events are in JSON format (regardless of the actual endpoint), and are S3-compatible.
|
||||
For example:
|
||||
|
||||
::
|
||||
|
||||
@ -539,7 +535,6 @@ pubsub sync module. For example:
|
||||
and is added to all notifications triggered by the topic. (This is an
|
||||
extension to the S3 notification API.)
|
||||
|
||||
.. _PubSub Module : ../pubsub-module
|
||||
.. _S3 Notification Compatibility: ../s3-notification-compatibility
|
||||
.. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
|
||||
.. _Bucket Operations: ../s3/bucketops
|
||||
|
@ -1,646 +0,0 @@
|
||||
==================
|
||||
PubSub Sync Module
|
||||
==================
|
||||
|
||||
.. versionadded:: Nautilus
|
||||
|
||||
.. contents::
|
||||
|
||||
This sync module provides a publish and subscribe mechanism for the object store modification
|
||||
events. Events are published into predefined topics. Topics can be subscribed to, and events
|
||||
can be pulled from them. Events need to be acked. Also, events will expire and disappear
|
||||
after a period of time.
|
||||
|
||||
A push notification mechanism exists too, currently supporting HTTP,
|
||||
AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
|
||||
and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module.
|
||||
|
||||
A user can create different topics. A topic entity is defined by its name and is per tenant. A
|
||||
user can only associate its topics (via notification configuration) with buckets it owns.
|
||||
|
||||
In order to publish events for specific bucket a notification entity needs to be created. A
|
||||
notification can be created on a subset of event types, or for all event types (default).
|
||||
There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
|
||||
|
||||
A subscription to a topic can also be defined. There can be multiple subscriptions for any
|
||||
specific topic.
|
||||
|
||||
REST API has been defined to provide configuration and control interfaces for the pubsub
|
||||
mechanisms. This API has two flavors, one is S3-compatible and one is not. The two flavors can be used
|
||||
together, although it is recommended to use the S3-compatible one.
|
||||
The S3-compatible API is similar to the one used in the bucket notification mechanism.
|
||||
|
||||
Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
|
||||
be accessed directly, but need to be pulled and acked using the new REST API.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
S3 Bucket Notification Compatibility <s3-notification-compatibility>
|
||||
|
||||
.. note:: To enable bucket notifications API, the `rgw_enable_apis` configuration parameter should contain: "notifications".
|
||||
|
||||
PubSub Zone Configuration
|
||||
-------------------------
|
||||
|
||||
The pubsub sync module requires the creation of a new zone in a :ref:`multisite` environment...
|
||||
First, a master zone must exist (see: :ref:`master-zone-label`),
|
||||
then a secondary zone should be created (see :ref:`secondary-zone-label`).
|
||||
In the creation of the secondary zone, its tier type must be set to ``pubsub``:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin zone create --rgw-zonegroup={zone-group-name} \
|
||||
--rgw-zone={zone-name} \
|
||||
--endpoints={http://fqdn}[,{http://fqdn}] \
|
||||
--sync-from-all=0 \
|
||||
--sync-from={master-zone-name} \
|
||||
--tier-type=pubsub
|
||||
|
||||
|
||||
PubSub Zone Configuration Parameters
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
::
|
||||
|
||||
{
|
||||
"tenant": <tenant>, # default: <empty>
|
||||
"uid": <uid>, # default: "pubsub"
|
||||
"data_bucket_prefix": <prefix> # default: "pubsub-"
|
||||
"data_oid_prefix": <prefix> #
|
||||
"events_retention_days": <days> # default: 7
|
||||
}
|
||||
|
||||
* ``tenant`` (string)
|
||||
|
||||
The tenant of the pubsub control user.
|
||||
|
||||
* ``uid`` (string)
|
||||
|
||||
The uid of the pubsub control user.
|
||||
|
||||
* ``data_bucket_prefix`` (string)
|
||||
|
||||
The prefix of the bucket name that will be created to store events for specific topic.
|
||||
|
||||
* ``data_oid_prefix`` (string)
|
||||
|
||||
The oid prefix for the stored events.
|
||||
|
||||
* ``events_retention_days`` (integer)
|
||||
|
||||
How many days to keep events that weren't acked.
|
||||
|
||||
Configuring Parameters via CLI
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The tier configuration could be set using the following command:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin zone modify --rgw-zonegroup={zone-group-name} \
|
||||
--rgw-zone={zone-name} \
|
||||
--tier-config={key}={val}[,{key}={val}]
|
||||
|
||||
Where the ``key`` in the configuration specifies the configuration variable that needs to be updated (from the list above), and
|
||||
the ``val`` specifies its new value. For example, setting the pubsub control user ``uid`` to ``user_ps``:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin zone modify --rgw-zonegroup={zone-group-name} \
|
||||
--rgw-zone={zone-name} \
|
||||
--tier-config=uid=pubsub
|
||||
|
||||
A configuration field can be removed by using ``--tier-config-rm={key}``.
|
||||
|
||||
|
||||
Topic and Subscription Management via CLI
|
||||
-----------------------------------------
|
||||
|
||||
Configuration of all topics, associated with a tenant, could be fetched using the following command:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic list [--tenant={tenant}]
|
||||
|
||||
|
||||
Configuration of a specific topic could be fetched using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
|
||||
|
||||
|
||||
And removed using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
|
||||
|
||||
|
||||
Configuration of a subscription could be fetched using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription get --subscription={topic-name} [--tenant={tenant}]
|
||||
|
||||
And removed using:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription rm --subscription={topic-name} [--tenant={tenant}]
|
||||
|
||||
|
||||
To fetch all of the events stored in a subscription, use:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription pull --subscription={topic-name} [--marker={last-marker}] [--tenant={tenant}]
|
||||
|
||||
|
||||
To ack (and remove) an event from a subscription, use:
|
||||
|
||||
::
|
||||
|
||||
# radosgw-admin subscription ack --subscription={topic-name} --event-id={event-id} [--tenant={tenant}]
|
||||
|
||||
|
||||
PubSub Performance Stats
|
||||
-------------------------
|
||||
Same counters are shared between the pubsub sync module and the notification mechanism.
|
||||
|
||||
- ``pubsub_event_triggered``: running counter of events with at lease one topic associated with them
|
||||
- ``pubsub_event_lost``: running counter of events that had topics and subscriptions associated with them but that were not stored or pushed to any of the subscriptions
|
||||
- ``pubsub_store_ok``: running counter, for all subscriptions, of stored events
|
||||
- ``pubsub_store_fail``: running counter, for all subscriptions, of events failed to be stored
|
||||
- ``pubsub_push_ok``: running counter, for all subscriptions, of events successfully pushed to their endpoint
|
||||
- ``pubsub_push_fail``: running counter, for all subscriptions, of events failed to be pushed to their endpoint
|
||||
- ``pubsub_push_pending``: gauge value of events pushed to an endpoint but not acked or nacked yet
|
||||
|
||||
.. note::
|
||||
|
||||
``pubsub_event_triggered`` and ``pubsub_event_lost`` are incremented per event, while:
|
||||
``pubsub_store_ok``, ``pubsub_store_fail``, ``pubsub_push_ok``, ``pubsub_push_fail``, are incremented per store/push action on each subscriptions.
|
||||
|
||||
PubSub REST API
|
||||
---------------
|
||||
|
||||
.. tip:: PubSub REST calls, and only them, should be sent to an RGW which belong to a PubSub zone
|
||||
|
||||
Topics
|
||||
~~~~~~
|
||||
|
||||
.. _radosgw-create-a-topic:
|
||||
|
||||
Create a Topic
|
||||
``````````````
|
||||
|
||||
This will create a new topic. Topic creation is needed both for both flavors of the API.
|
||||
Optionally the topic could be provided with push endpoint parameters that would be used later
|
||||
when an S3-compatible notification is created.
|
||||
Upon successful request, the response will include the topic ARN that could be later used to reference this topic in an S3-compatible notification request.
|
||||
To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.
|
||||
|
||||
.. tip:: Any S3-compatible notification already associated with the topic needs to be re-created for the topic update to take effect
|
||||
|
||||
::
|
||||
|
||||
PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
|
||||
|
||||
Request parameters:
|
||||
|
||||
- push-endpoint: URI of an endpoint to send push notification to
|
||||
- OpaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic
|
||||
|
||||
The endpoint URI may include parameters depending with the type of endpoint:
|
||||
|
||||
- HTTP endpoint
|
||||
|
||||
- URI: ``http[s]://<fqdn>[:<port]``
|
||||
- port defaults to: 80/443 for HTTP/S accordingly
|
||||
- verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
|
||||
|
||||
- AMQP0.9.1 endpoint
|
||||
|
||||
- URI: ``amqp[s]://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
|
||||
- user/password defaults to: guest/guest
|
||||
- user/password may only be provided over HTTPS. Topic creation request will be rejected if not
|
||||
- port defaults to: 5672/5671 for unencrypted/SSL-encrypted connections
|
||||
- vhost defaults to: "/"
|
||||
- verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
|
||||
- if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
|
||||
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1). Different topics pointing to the same endpoint must use the same exchange
|
||||
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
|
||||
|
||||
- "none": message is considered "delivered" if sent to broker
|
||||
- "broker": message is considered "delivered" if acked by broker (default)
|
||||
- "routable": message is considered "delivered" if broker can route to a consumer
|
||||
|
||||
.. tip:: The topic-name (see :ref:`radosgw-create-a-topic`) is used for the AMQP topic ("routing key" for a topic exchange)
|
||||
|
||||
- Kafka endpoint
|
||||
|
||||
- URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
|
||||
- if ``use-ssl`` is set to "true", secure connection will be used for connecting with the broker ("false" by default)
|
||||
- if ``ca-location`` is provided, and secure connection is used, the specified CA will be used, instead of the default one, to authenticate the broker
|
||||
- user/password may only be provided over HTTPS. Topic creation request will be rejected if not
|
||||
- user/password may only be provided together with ``use-ssl``, connection to the broker would fail if not
|
||||
- port defaults to: 9092
|
||||
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
|
||||
|
||||
- "none": message is considered "delivered" if sent to broker
|
||||
- "broker": message is considered "delivered" if acked by broker (default)
|
||||
|
||||
The topic ARN in the response will have the following format:
|
||||
|
||||
::
|
||||
|
||||
arn:aws:sns:<zone-group>:<tenant>:<topic>
|
||||
|
||||
Get Topic Information
|
||||
`````````````````````
|
||||
|
||||
Returns information about specific topic. This includes subscriptions to that topic, and push-endpoint information, if provided.
|
||||
|
||||
::
|
||||
|
||||
GET /topics/<topic-name>
|
||||
|
||||
Response will have the following format (JSON):
|
||||
|
||||
::
|
||||
|
||||
{
|
||||
"topic":{
|
||||
"user":"",
|
||||
"name":"",
|
||||
"dest":{
|
||||
"bucket_name":"",
|
||||
"oid_prefix":"",
|
||||
"push_endpoint":"",
|
||||
"push_endpoint_args":"",
|
||||
"push_endpoint_topic":"",
|
||||
"stored_secret":false,
|
||||
"persistent":true,
|
||||
},
|
||||
"arn":""
|
||||
"opaqueData":""
|
||||
},
|
||||
"subs":[]
|
||||
}
|
||||
|
||||
- topic.user: name of the user that created the topic
|
||||
- name: name of the topic
|
||||
- dest.bucket_name: not used
|
||||
- dest.oid_prefix: not used
|
||||
- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
|
||||
- if push-endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not
|
||||
- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
|
||||
- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
|
||||
- topic.arn: topic ARN
|
||||
- subs: list of subscriptions associated with this topic
|
||||
|
||||
Delete Topic
|
||||
````````````
|
||||
|
||||
::
|
||||
|
||||
DELETE /topics/<topic-name>
|
||||
|
||||
Delete the specified topic.
|
||||
|
||||
List Topics
|
||||
```````````
|
||||
|
||||
List all topics associated with a tenant.
|
||||
|
||||
::
|
||||
|
||||
GET /topics
|
||||
|
||||
- if push-endpoint URL contain user/password information, in any of the topic, request must be made over HTTPS. Topic list request will be rejected if not
|
||||
|
||||
S3-Compliant Notifications
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Detailed under: `Bucket Operations`_.
|
||||
|
||||
.. note::
|
||||
|
||||
- Notification creation will also create a subscription for pushing/pulling events
|
||||
- The generated subscription's name will have the same as the notification Id, and could be used later to fetch and ack events with the subscription API.
|
||||
- Notification deletion will deletes all generated subscriptions
|
||||
- In case that bucket deletion implicitly deletes the notification,
|
||||
the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
|
||||
and will have to be deleted explicitly with the subscription deletion API
|
||||
- Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
|
||||
- Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored
|
||||
|
||||
|
||||
Non S3-Compliant Notifications
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Create a Notification
|
||||
`````````````````````
|
||||
|
||||
This will create a publisher for a specific bucket into a topic.
|
||||
|
||||
::
|
||||
|
||||
PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
|
||||
|
||||
Request parameters:
|
||||
|
||||
- topic-name: name of topic
|
||||
- event: event type (string), one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
|
||||
|
||||
Delete Notification Information
|
||||
```````````````````````````````
|
||||
|
||||
Delete publisher from a specific bucket into a specific topic.
|
||||
|
||||
::
|
||||
|
||||
DELETE /notifications/bucket/<bucket>?topic=<topic-name>
|
||||
|
||||
Request parameters:
|
||||
|
||||
- topic-name: name of topic
|
||||
|
||||
.. note:: When the bucket is deleted, any notification defined on it is also deleted
|
||||
|
||||
List Notifications
|
||||
``````````````````
|
||||
|
||||
List all topics with associated events defined on a bucket.
|
||||
|
||||
::
|
||||
|
||||
GET /notifications/bucket/<bucket>
|
||||
|
||||
Response will have the following format (JSON):
|
||||
|
||||
::
|
||||
|
||||
{"topics":[
|
||||
{
|
||||
"topic":{
|
||||
"user":"",
|
||||
"name":"",
|
||||
"dest":{
|
||||
"bucket_name":"",
|
||||
"oid_prefix":"",
|
||||
"push_endpoint":"",
|
||||
"push_endpoint_args":"",
|
||||
"push_endpoint_topic":""
|
||||
}
|
||||
"arn":""
|
||||
},
|
||||
"events":[]
|
||||
}
|
||||
]}
|
||||
|
||||
Subscriptions
|
||||
~~~~~~~~~~~~~
|
||||
|
||||
Create a Subscription
|
||||
`````````````````````
|
||||
|
||||
Creates a new subscription.
|
||||
|
||||
::
|
||||
|
||||
PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
|
||||
|
||||
Request parameters:
|
||||
|
||||
- topic-name: name of topic
|
||||
- push-endpoint: URI of endpoint to send push notification to
|
||||
|
||||
The endpoint URI may include parameters depending with the type of endpoint:
|
||||
|
||||
- HTTP endpoint
|
||||
|
||||
- URI: ``http[s]://<fqdn>[:<port]``
|
||||
- port defaults to: 80/443 for HTTP/S accordingly
|
||||
- verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
|
||||
|
||||
- AMQP0.9.1 endpoint
|
||||
|
||||
- URI: ``amqp://[<user>:<password>@]<fqdn>[:<port>][/<vhost>]``
|
||||
- user/password defaults to : guest/guest
|
||||
- port defaults to: 5672
|
||||
- vhost defaults to: "/"
|
||||
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
|
||||
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
|
||||
|
||||
- "none": message is considered "delivered" if sent to broker
|
||||
- "broker": message is considered "delivered" if acked by broker (default)
|
||||
- "routable": message is considered "delivered" if broker can route to a consumer
|
||||
|
||||
- Kafka endpoint
|
||||
|
||||
- URI: ``kafka://[<user>:<password>@]<fqdn>[:<port]``
|
||||
- if ``ca-location`` is provided, secure connection will be used for connection with the broker
|
||||
- user/password may only be provided over HTTPS. Topic creation request will be rejected if not
|
||||
- user/password may only be provided together with ``ca-location``. Topic creation request will be rejected if not
|
||||
- port defaults to: 9092
|
||||
- kafka-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
|
||||
|
||||
- "none": message is considered "delivered" if sent to broker
|
||||
- "broker": message is considered "delivered" if acked by broker (default)
|
||||
|
||||
|
||||
Get Subscription Information
|
||||
````````````````````````````
|
||||
|
||||
Returns information about specific subscription.
|
||||
|
||||
::
|
||||
|
||||
GET /subscriptions/<sub-name>
|
||||
|
||||
Response will have the following format (JSON):
|
||||
|
||||
::
|
||||
|
||||
{
|
||||
"user":"",
|
||||
"name":"",
|
||||
"topic":"",
|
||||
"dest":{
|
||||
"bucket_name":"",
|
||||
"oid_prefix":"",
|
||||
"push_endpoint":"",
|
||||
"push_endpoint_args":"",
|
||||
"push_endpoint_topic":""
|
||||
}
|
||||
"s3_id":""
|
||||
}
|
||||
|
||||
- user: name of the user that created the subscription
|
||||
- name: name of the subscription
|
||||
- topic: name of the topic the subscription is associated with
|
||||
- dest.bucket_name: name of the bucket storing the events
|
||||
- dest.oid_prefix: oid prefix for the events stored in the bucket
|
||||
- dest.push_endpoint: in case of S3-compliant notifications, this value will be used as the push-endpoint URL
|
||||
- if push-endpoint URL contain user/password information, request must be made over HTTPS. Topic get request will be rejected if not
|
||||
- dest.push_endpoint_args: in case of S3-compliant notifications, this value will be used as the push-endpoint args
|
||||
- dest.push_endpoint_topic: in case of S3-compliant notifications, this value will hold the topic name as sent to the endpoint (may be different than the internal topic name)
|
||||
- s3_id: in case of S3-compliant notifications, this will hold the notification name that created the subscription
|
||||
|
||||
Delete Subscription
|
||||
```````````````````
|
||||
|
||||
Removes a subscription.
|
||||
|
||||
::
|
||||
|
||||
DELETE /subscriptions/<sub-name>
|
||||
|
||||
Events
|
||||
~~~~~~
|
||||
|
||||
Pull Events
|
||||
```````````
|
||||
|
||||
Pull events sent to a specific subscription.
|
||||
|
||||
::
|
||||
|
||||
GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
|
||||
|
||||
Request parameters:
|
||||
|
||||
- marker: pagination marker for list of events, if not specified will start from the oldest
|
||||
- max-entries: max number of events to return
|
||||
|
||||
The response will hold information on the current marker and whether there are more events not fetched:
|
||||
|
||||
::
|
||||
|
||||
{"next_marker":"","is_truncated":false,...}
|
||||
|
||||
|
||||
The actual content of the response is depended with how the subscription was created.
|
||||
In case that the subscription was created via an S3-compatible notification,
|
||||
the events will have an S3-compatible record format (JSON):
|
||||
|
||||
::
|
||||
|
||||
{"Records":[
|
||||
{
|
||||
"eventVersion":"2.1"
|
||||
"eventSource":"aws:s3",
|
||||
"awsRegion":"",
|
||||
"eventTime":"",
|
||||
"eventName":"",
|
||||
"userIdentity":{
|
||||
"principalId":""
|
||||
},
|
||||
"requestParameters":{
|
||||
"sourceIPAddress":""
|
||||
},
|
||||
"responseElements":{
|
||||
"x-amz-request-id":"",
|
||||
"x-amz-id-2":""
|
||||
},
|
||||
"s3":{
|
||||
"s3SchemaVersion":"1.0",
|
||||
"configurationId":"",
|
||||
"bucket":{
|
||||
"name":"",
|
||||
"ownerIdentity":{
|
||||
"principalId":""
|
||||
},
|
||||
"arn":"",
|
||||
"id":""
|
||||
},
|
||||
"object":{
|
||||
"key":"",
|
||||
"size":"0",
|
||||
"eTag":"",
|
||||
"versionId":"",
|
||||
"sequencer":"",
|
||||
"metadata":[],
|
||||
"tags":[]
|
||||
}
|
||||
},
|
||||
"eventId":"",
|
||||
"opaqueData":"",
|
||||
}
|
||||
]}
|
||||
|
||||
- awsRegion: zonegroup
|
||||
- eventTime: timestamp indicating when the event was triggered
|
||||
- eventName: either ``s3:ObjectCreated:``, or ``s3:ObjectRemoved:``
|
||||
- userIdentity: not supported
|
||||
- requestParameters: not supported
|
||||
- responseElements: not supported
|
||||
- s3.configurationId: notification ID that created the subscription for the event
|
||||
- s3.bucket.name: name of the bucket
|
||||
- s3.bucket.ownerIdentity.principalId: owner of the bucket
|
||||
- s3.bucket.arn: ARN of the bucket
|
||||
- s3.bucket.id: Id of the bucket (an extension to the S3 notification API)
|
||||
- s3.object.key: object key
|
||||
- s3.object.size: not supported
|
||||
- s3.object.eTag: object etag
|
||||
- s3.object.version: object version in case of versioned bucket
|
||||
- s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
|
||||
- s3.object.metadata: not supported (an extension to the S3 notification API)
|
||||
- s3.object.tags: not supported (an extension to the S3 notification API)
|
||||
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
|
||||
- s3.opaqueData: opaque data is set in the topic configuration and added to all notifications triggered by the topic (an extension to the S3 notification API)
|
||||
|
||||
In case that the subscription was not created via a non S3-compatible notification,
|
||||
the events will have the following event format (JSON):
|
||||
|
||||
::
|
||||
|
||||
{"events":[
|
||||
{
|
||||
"id":"",
|
||||
"event":"",
|
||||
"timestamp":"",
|
||||
"info":{
|
||||
"attrs":{
|
||||
"mtime":""
|
||||
},
|
||||
"bucket":{
|
||||
"bucket_id":"",
|
||||
"name":"",
|
||||
"tenant":""
|
||||
},
|
||||
"key":{
|
||||
"instance":"",
|
||||
"name":""
|
||||
}
|
||||
}
|
||||
}
|
||||
]}
|
||||
|
||||
- id: unique ID of the event, that could be used for acking
|
||||
- event: one of: ``OBJECT_CREATE``, ``OBJECT_DELETE``, ``DELETE_MARKER_CREATE``
|
||||
- timestamp: timestamp indicating when the event was sent
|
||||
- info.attrs.mtime: timestamp indicating when the event was triggered
|
||||
- info.bucket.bucket_id: id of the bucket
|
||||
- info.bucket.name: name of the bucket
|
||||
- info.bucket.tenant: tenant the bucket belongs to
|
||||
- info.key.instance: object version in case of versioned bucket
|
||||
- info.key.name: object key
|
||||
|
||||
Ack Event
|
||||
`````````
|
||||
|
||||
Ack event so that it can be removed from the subscription history.
|
||||
|
||||
::
|
||||
|
||||
POST /subscriptions/<sub-name>?ack&event-id=<event-id>
|
||||
|
||||
Request parameters:
|
||||
|
||||
- event-id: id of event to be acked
|
||||
|
||||
.. _Bucket Notification : ../notifications
|
||||
.. _Bucket Operations: ../s3/bucketops
|
@ -2,7 +2,7 @@
|
||||
S3 Bucket Notifications Compatibility
|
||||
=====================================
|
||||
|
||||
Ceph's `Bucket Notifications`_ and `PubSub Module`_ APIs follow `AWS S3 Bucket Notifications API`_. However, some differences exist, as listed below.
|
||||
Ceph's `Bucket Notifications`_ API follow `AWS S3 Bucket Notifications API`_. However, some differences exist, as listed below.
|
||||
|
||||
|
||||
.. note::
|
||||
@ -60,70 +60,57 @@ Unsupported Fields in the Event Record
|
||||
--------------------------------------
|
||||
|
||||
The records sent for bucket notification follow format described in: `Event Message Structure`_.
|
||||
However, the following fields may be sent empty, under the different deployment options (Notification/PubSub):
|
||||
However, the ``requestParameters.sourceIPAddress`` field will be sent empty.
|
||||
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
| Field | Notification | PubSub | Description |
|
||||
+========================================+==============+===============+============================================================+
|
||||
| ``userIdentity.principalId`` | Supported | Not Supported | The identity of the user that triggered the event |
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
| ``requestParameters.sourceIPAddress`` | Not Supported | The IP address of the client that triggered the event |
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
| ``requestParameters.x-amz-request-id`` | Supported | Not Supported | The request id that triggered the event |
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
| ``requestParameters.x-amz-id-2`` | Supported | Not Supported | The IP address of the RGW on which the event was triggered |
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
| ``s3.object.size`` | Supported | Not Supported | The size of the object |
|
||||
+----------------------------------------+--------------+---------------+------------------------------------------------------------+
|
||||
|
||||
Event Types
|
||||
-----------
|
||||
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| Event | Notification | PubSub |
|
||||
+================================================+=================+===========================================+
|
||||
| ``s3:ObjectCreated:*`` | Supported |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectCreated:Put`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectCreated:Post`` | Supported | Not Supported |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectCreated:Copy`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported | Supported at ``s3:ObjectCreated:*`` level |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectRemoved:*`` | Supported | Supported only the specific events below |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectRemoved:Delete`` | Supported |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:Current`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:NonCurrent`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:DeleteMarker`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Defined, Ceph extension (not generated) |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Transition:Current`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Transition:NonCurrent`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectSynced:*`` | Supported, Ceph extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectSynced:Create`` | Supported, Ceph Extension |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectSynced:Delete`` | Defined, Ceph extension (not generated) |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectSynced:DeletionMarkerCreated`` | Defined, Ceph extension (not generated) |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ObjectRestore:Complete`` | Not applicable to Ceph |
|
||||
+------------------------------------------------+-----------------+-------------------------------------------+
|
||||
| ``s3:ReducedRedundancyLostObject`` | Not applicable to Ceph |
|
||||
+----------------------------------------------+-----------------+---------------------------------------------+
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| Event | Note |
|
||||
+========================================================+=========================================+
|
||||
| ``s3:ObjectCreated:*`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectCreated:Put`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectCreated:Post`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectCreated:Copy`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectCreated:CompleteMultipartUpload`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectRemoved:*`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectRemoved:Delete`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectRemoved:DeleteMarkerCreated`` | Supported |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:Current`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:NonCurrent`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:DeleteMarker`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Defined, Ceph extension (not generated) |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Transition:Current`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectLifecycle:Transition:NonCurrent`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectSynced:*`` | Ceph extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectSynced:Create`` | Ceph Extension |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectSynced:Delete`` | Defined, Ceph extension (not generated) |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectSynced:DeletionMarkerCreated`` | Defined, Ceph extension (not generated) |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectRestore:Post`` | Not applicable |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ObjectRestore:Complete`` | Not applicable |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
| ``s3:ReducedRedundancyLostObject`` | Not applicable |
|
||||
+--------------------------------------------------------+-----------------------------------------+
|
||||
|
||||
.. note::
|
||||
|
||||
@ -158,6 +145,5 @@ We also have the following extensions to topic configuration:
|
||||
.. _AWS Simple Notification Service API: https://docs.aws.amazon.com/sns/latest/api/API_Operations.html
|
||||
.. _AWS S3 Bucket Notifications API: https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
|
||||
.. _Event Message Structure: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
|
||||
.. _`PubSub Module`: ../pubsub-module
|
||||
.. _`Bucket Notifications`: ../notifications
|
||||
.. _`boto3 SDK filter extensions`: https://github.com/ceph/ceph/tree/master/examples/boto3
|
||||
|
@ -29,7 +29,6 @@ for configuring any sync plugin
|
||||
|
||||
ElasticSearch Sync Module <elastic-sync-module>
|
||||
Cloud Sync Module <cloud-sync-module>
|
||||
PubSub Module <pubsub-module>
|
||||
Archive Sync Module <archive-sync-module>
|
||||
|
||||
.. note ``rgw`` is the default sync plugin and there is no need to explicitly
|
||||
@ -95,5 +94,4 @@ Now start the radosgw in the zone
|
||||
.. _`elasticsearch sync module`: ../elastic-sync-module
|
||||
.. _`elasticsearch`: ../elastic-sync-module
|
||||
.. _`cloud sync module`: ../cloud-sync-module
|
||||
.. _`pubsub module`: ../pubsub-module
|
||||
.. _`archive sync module`: ../archive-sync-module
|
||||
|
@ -84,7 +84,6 @@ set(librgw_common_srcs
|
||||
rgw_sync_policy.cc
|
||||
rgw_pubsub_push.cc
|
||||
rgw_notify_event_type.cc
|
||||
rgw_sync_module_pubsub_rest.cc
|
||||
rgw_period_history.cc
|
||||
rgw_period_puller.cc
|
||||
rgw_reshard.cc
|
||||
@ -177,7 +176,6 @@ set(librgw_common_srcs
|
||||
store/rados/rgw_sync_module_es.cc
|
||||
store/rados/rgw_sync_module_es_rest.cc
|
||||
store/rados/rgw_sync_module_log.cc
|
||||
store/rados/rgw_sync_module_pubsub.cc
|
||||
store/rados/rgw_sync_trace.cc
|
||||
store/rados/rgw_tools.cc
|
||||
store/rados/rgw_trim_bilog.cc
|
||||
|
@ -1,532 +0,0 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#include <algorithm>
|
||||
#include "rgw_rest_pubsub_common.h"
|
||||
#include "rgw_rest_pubsub.h"
|
||||
#include "rgw_sync_module_pubsub.h"
|
||||
#include "rgw_pubsub_push.h"
|
||||
#include "rgw_sync_module_pubsub_rest.h"
|
||||
#include "rgw_pubsub.h"
|
||||
#include "rgw_op.h"
|
||||
#include "rgw_rest.h"
|
||||
#include "rgw_rest_s3.h"
|
||||
#include "rgw_arn.h"
|
||||
#include "rgw_zone.h"
|
||||
#include "services/svc_zone.h"
|
||||
#include "rgw_sal_rados.h"
|
||||
|
||||
#define dout_context g_ceph_context
|
||||
#define dout_subsys ceph_subsys_rgw
|
||||
|
||||
using namespace std;
|
||||
|
||||
// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
|
||||
class RGWPSCreateTopic_ObjStore : public RGWPSCreateTopicOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
|
||||
topic_name = s->object->get_name();
|
||||
|
||||
opaque_data = s->info.args.get("OpaqueData");
|
||||
dest.push_endpoint = s->info.args.get("push-endpoint");
|
||||
|
||||
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
|
||||
return -EINVAL;
|
||||
}
|
||||
dest.push_endpoint_args = s->info.args.get_str();
|
||||
// dest object only stores endpoint info
|
||||
// bucket to store events/records will be set only when subscription is created
|
||||
dest.bucket_name = "";
|
||||
dest.oid_prefix = "";
|
||||
dest.arn_topic = topic_name;
|
||||
// the topic ARN will be sent in the reply
|
||||
const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
|
||||
store->get_zone()->get_zonegroup().get_name(),
|
||||
s->user->get_tenant(), topic_name);
|
||||
topic_arn = arn.to_string();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
Formatter::ObjectSection section(*s->formatter, "result");
|
||||
encode_json("arn", topic_arn, s->formatter);
|
||||
}
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
};
|
||||
|
||||
// command: GET /topics
|
||||
class RGWPSListTopics_ObjStore : public RGWPSListTopicsOp {
|
||||
public:
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
encode_json("result", result, s->formatter);
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
};
|
||||
|
||||
// command: GET /topics/<topic-name>
|
||||
class RGWPSGetTopic_ObjStore : public RGWPSGetTopicOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
topic_name = s->object->get_name();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
encode_json("result", result, s->formatter);
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
};
|
||||
|
||||
// command: DELETE /topics/<topic-name>
|
||||
class RGWPSDeleteTopic_ObjStore : public RGWPSDeleteTopicOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
topic_name = s->object->get_name();
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// ceph specifc topics handler factory
|
||||
class RGWHandler_REST_PSTopic : public RGWHandler_REST_S3 {
|
||||
protected:
|
||||
int init_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int read_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool supports_quota() override {
|
||||
return false;
|
||||
}
|
||||
|
||||
RGWOp *op_get() override {
|
||||
if (s->init_state.url_bucket.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
if (s->object == nullptr || s->object->empty()) {
|
||||
return new RGWPSListTopics_ObjStore();
|
||||
}
|
||||
return new RGWPSGetTopic_ObjStore();
|
||||
}
|
||||
RGWOp *op_put() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSCreateTopic_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
RGWOp *op_delete() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSDeleteTopic_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
public:
|
||||
explicit RGWHandler_REST_PSTopic(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
|
||||
virtual ~RGWHandler_REST_PSTopic() = default;
|
||||
};
|
||||
|
||||
// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
|
||||
class RGWPSCreateSub_ObjStore : public RGWPSCreateSubOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
sub_name = s->object->get_name();
|
||||
|
||||
bool exists;
|
||||
topic_name = s->info.args.get("topic", &exists);
|
||||
if (!exists) {
|
||||
ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
const auto psmodule = static_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
|
||||
const auto& conf = psmodule->get_effective_conf();
|
||||
|
||||
dest.push_endpoint = s->info.args.get("push-endpoint");
|
||||
if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
|
||||
return -EINVAL;
|
||||
}
|
||||
dest.push_endpoint_args = s->info.args.get_str();
|
||||
dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + topic_name;
|
||||
dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
|
||||
dest.arn_topic = topic_name;
|
||||
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// command: GET /subscriptions/<sub-name>
|
||||
class RGWPSGetSub_ObjStore : public RGWPSGetSubOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
sub_name = s->object->get_name();
|
||||
return 0;
|
||||
}
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
encode_json("result", result, s->formatter);
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
};
|
||||
|
||||
// command: DELETE /subscriptions/<sub-name>
|
||||
class RGWPSDeleteSub_ObjStore : public RGWPSDeleteSubOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
sub_name = s->object->get_name();
|
||||
topic_name = s->info.args.get("topic");
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
|
||||
class RGWPSAckSubEvent_ObjStore : public RGWPSAckSubEventOp {
|
||||
public:
|
||||
explicit RGWPSAckSubEvent_ObjStore() {}
|
||||
|
||||
int get_params() override {
|
||||
sub_name = s->object->get_name();
|
||||
|
||||
bool exists;
|
||||
|
||||
event_id = s->info.args.get("event-id", &exists);
|
||||
if (!exists) {
|
||||
ldpp_dout(this, 1) << "missing required param 'event-id'" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
|
||||
class RGWPSPullSubEvents_ObjStore : public RGWPSPullSubEventsOp {
|
||||
public:
|
||||
int get_params() override {
|
||||
sub_name = s->object->get_name();
|
||||
marker = s->info.args.get("marker");
|
||||
const int ret = s->info.args.get_int("max-entries", &max_entries,
|
||||
RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
|
||||
if (ret < 0) {
|
||||
ldpp_dout(this, 1) << "failed to parse 'max-entries' param" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
encode_json("result", *sub, s->formatter);
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
};
|
||||
|
||||
// subscriptions handler factory
|
||||
class RGWHandler_REST_PSSub : public RGWHandler_REST_S3 {
|
||||
protected:
|
||||
int init_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int read_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
bool supports_quota() override {
|
||||
return false;
|
||||
}
|
||||
RGWOp *op_get() override {
|
||||
if (s->object->empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
if (s->info.args.exists("events")) {
|
||||
return new RGWPSPullSubEvents_ObjStore();
|
||||
}
|
||||
return new RGWPSGetSub_ObjStore();
|
||||
}
|
||||
RGWOp *op_put() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSCreateSub_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
RGWOp *op_delete() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSDeleteSub_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
RGWOp *op_post() override {
|
||||
if (s->info.args.exists("ack")) {
|
||||
return new RGWPSAckSubEvent_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
public:
|
||||
explicit RGWHandler_REST_PSSub(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
|
||||
virtual ~RGWHandler_REST_PSSub() = default;
|
||||
};
|
||||
|
||||
namespace {
|
||||
// extract bucket name from ceph specific notification command, with the format:
|
||||
// /notifications/<bucket-name>
|
||||
int notif_bucket_path(const string& path, std::string& bucket_name) {
|
||||
if (path.empty()) {
|
||||
return -EINVAL;
|
||||
}
|
||||
size_t pos = path.find('/');
|
||||
if (pos == string::npos) {
|
||||
return -EINVAL;
|
||||
}
|
||||
if (pos >= path.size()) {
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
string type = path.substr(0, pos);
|
||||
if (type != "bucket") {
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
bucket_name = path.substr(pos + 1);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// command (ceph specific): PUT /notification/bucket/<bucket name>?topic=<topic name>
|
||||
class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
|
||||
private:
|
||||
std::string topic_name;
|
||||
rgw::notify::EventTypeList events;
|
||||
|
||||
int get_params() override {
|
||||
bool exists;
|
||||
topic_name = s->info.args.get("topic", &exists);
|
||||
if (!exists) {
|
||||
ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
std::string events_str = s->info.args.get("events", &exists);
|
||||
if (!exists) {
|
||||
// if no events are provided, we notify on all of them
|
||||
events_str =
|
||||
"OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
|
||||
}
|
||||
rgw::notify::from_string_list(events_str, events);
|
||||
if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
|
||||
ldpp_dout(this, 1) << "invalid event type in list: " << events_str << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
return notif_bucket_path(s->object->get_name(), bucket_name);
|
||||
}
|
||||
|
||||
public:
|
||||
const char* name() const override { return "pubsub_notification_create"; }
|
||||
void execute(optional_yield y) override;
|
||||
};
|
||||
|
||||
void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
|
||||
{
|
||||
ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
|
||||
|
||||
auto b = ps->get_bucket(bucket_info.bucket);
|
||||
op_ret = b->create_notification(this, topic_name, events, y);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(this, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
|
||||
return;
|
||||
}
|
||||
ldpp_dout(this, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
|
||||
}
|
||||
|
||||
// command: DELETE /notifications/bucket/<bucket>?topic=<topic-name>
|
||||
class RGWPSDeleteNotif_ObjStore : public RGWPSDeleteNotifOp {
|
||||
private:
|
||||
std::string topic_name;
|
||||
|
||||
int get_params() override {
|
||||
bool exists;
|
||||
topic_name = s->info.args.get("topic", &exists);
|
||||
if (!exists) {
|
||||
ldpp_dout(this, 1) << "missing required param 'topic'" << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
return notif_bucket_path(s->object->get_name(), bucket_name);
|
||||
}
|
||||
|
||||
public:
|
||||
void execute(optional_yield y) override;
|
||||
const char* name() const override { return "pubsub_notification_delete"; }
|
||||
};
|
||||
|
||||
void RGWPSDeleteNotif_ObjStore::execute(optional_yield y) {
|
||||
op_ret = get_params();
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
|
||||
auto b = ps->get_bucket(bucket_info.bucket);
|
||||
op_ret = b->remove_notification(this, topic_name, y);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(s, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
|
||||
return;
|
||||
}
|
||||
ldpp_dout(this, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
|
||||
}
|
||||
|
||||
// command: GET /notifications/bucket/<bucket>
|
||||
class RGWPSListNotifs_ObjStore : public RGWPSListNotifsOp {
|
||||
private:
|
||||
rgw_pubsub_bucket_topics result;
|
||||
|
||||
int get_params() override {
|
||||
return notif_bucket_path(s->object->get_name(), bucket_name);
|
||||
}
|
||||
|
||||
public:
|
||||
void execute(optional_yield y) override;
|
||||
void send_response() override {
|
||||
if (op_ret) {
|
||||
set_req_state_err(s, op_ret);
|
||||
}
|
||||
dump_errno(s);
|
||||
end_header(s, this, "application/json");
|
||||
|
||||
if (op_ret < 0) {
|
||||
return;
|
||||
}
|
||||
encode_json("result", result, s->formatter);
|
||||
rgw_flush_formatter_and_reset(s, s->formatter);
|
||||
}
|
||||
const char* name() const override { return "pubsub_notifications_list"; }
|
||||
};
|
||||
|
||||
void RGWPSListNotifs_ObjStore::execute(optional_yield y)
|
||||
{
|
||||
ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
|
||||
auto b = ps->get_bucket(bucket_info.bucket);
|
||||
op_ret = b->get_topics(&result);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// ceph specific notification handler factory
|
||||
class RGWHandler_REST_PSNotifs : public RGWHandler_REST_S3 {
|
||||
protected:
|
||||
int init_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int read_permissions(RGWOp* op, optional_yield) override {
|
||||
return 0;
|
||||
}
|
||||
bool supports_quota() override {
|
||||
return false;
|
||||
}
|
||||
RGWOp *op_get() override {
|
||||
if (s->object->empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
return new RGWPSListNotifs_ObjStore();
|
||||
}
|
||||
RGWOp *op_put() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSCreateNotif_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
RGWOp *op_delete() override {
|
||||
if (!s->object->empty()) {
|
||||
return new RGWPSDeleteNotif_ObjStore();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
public:
|
||||
explicit RGWHandler_REST_PSNotifs(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
|
||||
virtual ~RGWHandler_REST_PSNotifs() = default;
|
||||
};
|
||||
|
||||
// factory for ceph specific PubSub REST handlers
|
||||
RGWHandler_REST* RGWRESTMgr_PubSub::get_handler(rgw::sal::Store* store,
|
||||
req_state* const s,
|
||||
const rgw::auth::StrategyRegistry& auth_registry,
|
||||
const std::string& frontend_prefix)
|
||||
{
|
||||
if (RGWHandler_REST_S3::init_from_header(store, s, RGWFormat::JSON, true) < 0) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
RGWHandler_REST* handler{nullptr};
|
||||
|
||||
// ceph specific PubSub API: topics/subscriptions/notification are reserved bucket names
|
||||
// this API is available only on RGW that belong to a pubsub zone
|
||||
if (s->init_state.url_bucket == "topics") {
|
||||
handler = new RGWHandler_REST_PSTopic(auth_registry);
|
||||
} else if (s->init_state.url_bucket == "subscriptions") {
|
||||
handler = new RGWHandler_REST_PSSub(auth_registry);
|
||||
} else if (s->init_state.url_bucket == "notifications") {
|
||||
handler = new RGWHandler_REST_PSNotifs(auth_registry);
|
||||
} else if (s->info.args.exists("notification")) {
|
||||
const int ret = RGWHandler_REST::allocate_formatter(s, RGWFormat::XML, true);
|
||||
if (ret == 0) {
|
||||
handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
|
||||
}
|
||||
}
|
||||
|
||||
ldpp_dout(s, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
@ -1,14 +0,0 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "rgw_rest.h"
|
||||
|
||||
class RGWRESTMgr_PubSub : public RGWRESTMgr {
|
||||
public:
|
||||
virtual RGWHandler_REST* get_handler(rgw::sal::Store* store,
|
||||
req_state* s,
|
||||
const rgw::auth::StrategyRegistry& auth_registry,
|
||||
const std::string& frontend_prefix) override;
|
||||
};
|
@ -8,7 +8,6 @@
|
||||
#include "rgw_rest_pubsub.h"
|
||||
#include "rgw_pubsub_push.h"
|
||||
#include "rgw_pubsub.h"
|
||||
#include "rgw_sync_module_pubsub.h"
|
||||
#include "rgw_op.h"
|
||||
#include "rgw_rest.h"
|
||||
#include "rgw_rest_s3.h"
|
||||
@ -546,16 +545,6 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
|
||||
std::string data_bucket_prefix = "";
|
||||
std::string data_oid_prefix = "";
|
||||
bool push_only = true;
|
||||
if (store->get_sync_module()) {
|
||||
const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
|
||||
if (psmodule) {
|
||||
const auto& conf = psmodule->get_effective_conf();
|
||||
data_bucket_prefix = conf["data_bucket_prefix"];
|
||||
data_oid_prefix = conf["data_oid_prefix"];
|
||||
// TODO: allow "push-only" on PS zone as well
|
||||
push_only = false;
|
||||
}
|
||||
}
|
||||
|
||||
if(configurations.list.empty()) {
|
||||
// get all topics on a bucket
|
||||
|
@ -11,7 +11,6 @@
|
||||
#include "rgw_sync_module_log.h"
|
||||
#include "rgw_sync_module_es.h"
|
||||
#include "rgw_sync_module_aws.h"
|
||||
#include "rgw_sync_module_pubsub.h"
|
||||
|
||||
#include <boost/asio/yield.hpp>
|
||||
|
||||
@ -85,7 +84,4 @@ void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager)
|
||||
|
||||
RGWSyncModuleRef aws_module(std::make_shared<RGWAWSSyncModule>());
|
||||
modules_manager->register_module("cloud", aws_module);
|
||||
|
||||
RGWSyncModuleRef pubsub_module(std::make_shared<RGWPSSyncModule>());
|
||||
modules_manager->register_module("pubsub", pubsub_module);
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,43 +0,0 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#ifndef CEPH_RGW_SYNC_MODULE_PUBSUB_H
|
||||
#define CEPH_RGW_SYNC_MODULE_PUBSUB_H
|
||||
|
||||
#include "rgw_sync_module.h"
|
||||
|
||||
class RGWPSSyncModule : public RGWSyncModule {
|
||||
public:
|
||||
RGWPSSyncModule() {}
|
||||
bool supports_data_export() override {
|
||||
return false;
|
||||
}
|
||||
bool supports_writes() override {
|
||||
return true;
|
||||
}
|
||||
int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
|
||||
};
|
||||
|
||||
class RGWPSDataSyncModule;
|
||||
class RGWRESTConn;
|
||||
|
||||
class RGWPSSyncModuleInstance : public RGWSyncModuleInstance {
|
||||
std::unique_ptr<RGWPSDataSyncModule> data_handler;
|
||||
JSONFormattable effective_conf;
|
||||
public:
|
||||
RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config);
|
||||
~RGWPSSyncModuleInstance() = default;
|
||||
RGWDataSyncModule *get_data_handler() override;
|
||||
RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
|
||||
bool supports_user_writes() override {
|
||||
return true;
|
||||
}
|
||||
const JSONFormattable& get_effective_conf() {
|
||||
return effective_conf;
|
||||
}
|
||||
// start with full sync based on configuration
|
||||
// default to incremental only
|
||||
virtual bool should_full_sync() const override;
|
||||
};
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user