rgw: pubsub: trivial push notifications

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2018-08-03 15:48:53 -07:00
parent eb213bb302
commit 31b352fcfb

View File

@ -6,6 +6,7 @@
#include "rgw_sync_module_pubsub_rest.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_op.h"
#include "rgw_pubsub.h"
@ -545,6 +546,11 @@ class PSSubscription;
using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
class PSSubscription {
class InitCR;
class StoreEventCR;
friend class InitCR;
friend class StoreEventCR;
RGWDataSyncEnv *sync_env;
PSEnvRef env;
PSSubConfigRef sub_conf;
@ -553,6 +559,11 @@ class PSSubscription {
RGWDataAccessRef data_access;
RGWDataAccess::BucketRef bucket;
struct push_endpoint_info {
shared_ptr<RGWRESTConn> conn;
string path;
} push;
class InitCR;
InitCR *init_cr{nullptr};
@ -564,6 +575,7 @@ class PSSubscription {
int retention_days;
rgw_bucket_lifecycle_config_params lc_config;
public:
InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
PSConfigRef& _conf,
@ -621,6 +633,7 @@ class PSSubscription {
return 0;
}
};
class InitCR : public RGWSingletonCR<bool> {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
@ -629,6 +642,36 @@ class PSSubscription {
PSConfigRef& conf;
PSSubConfigRef& sub_conf;
int i;
bool split_endpoint(const string& push_endpoint, string *addr, string *path) {
if (push_endpoint.size() < 9) { /* http://x/ */
return false;
}
size_t pos = push_endpoint.find(':');
if (pos == string::npos || pos >= push_endpoint.size() - 1) {
return false;
}
string protocol = push_endpoint.substr(0, pos);
string s = push_endpoint.substr(pos + 1);
if (s.size() < 4) { /* //x/ */
return false;
}
size_t slash_pos = s.find('/', 2);
if (slash_pos == string::npos) {
return false;
}
pos += slash_pos;
*addr = push_endpoint.substr(0, pos + 1);
*path = push_endpoint.substr(pos + 1);
return true;
}
public:
InitCR(RGWDataSyncEnv *_sync_env,
PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
@ -671,6 +714,18 @@ class PSSubscription {
ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
return set_cr_error(retcode);
}
if (!sub_conf->push_endpoint.empty()) {
string remote_id = string("pubsub:sub:") + sub->get_bucket_info_result->bucket_info.owner.to_str() + ":" + sub_conf->name;
string addr;
if (split_endpoint(sub_conf->push_endpoint, &addr, &sub->push.path)) {
list<string> endpoints{addr};
sub->push.conn = std::make_shared<RGWRESTConn>(sync_env->cct, sync_env->store, remote_id, endpoints);
} else {
ldout(sync_env->cct, 20) << "failed to split push endpoint: " << sub_conf->push_endpoint << dendl;
}
}
return set_cr_done();
}
@ -698,9 +753,12 @@ class PSSubscription {
}
};
using PushCR = RGWPostRESTResourceCR<rgw_pubsub_event, int>;
class StoreEventCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
EventRef event;
PSEvent pse;
PSConfigRef& conf;
PSSubConfigRef& sub_conf;
@ -713,6 +771,7 @@ class PSSubscription {
EventRef& _event) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
sub(_sub),
event(_event),
pse(_event),
conf(sub->env->conf),
sub_conf(sub->sub_conf) {
@ -743,6 +802,19 @@ class PSSubscription {
return set_cr_error(retcode);
}
if (sub->push.conn) {
yield {
rgw_http_param_pair params[] = {
{ nullptr, nullptr }
};
call(new PushCR(sync_env->cct, sub->push.conn.get(),
sync_env->http_manager,
sub->push.path,
params, *event, nullptr));
}
}
return set_cr_done();
}
return 0;