rgw: distribution of events to subscriptions

more work required

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Yehuda Sadeh 2018-06-29 21:10:10 -07:00
parent bdd1a690a6
commit 6cff97f871

View File

@ -165,10 +165,10 @@ struct PSConfig {
sync_instance = instance_id;
}
void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSNotificationConfig *> *notifs) {
void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
string path = bucket_info.bucket.name + "/" + key.name;
notifs->clear();
result->clear();
auto iter = notifications.upper_bound(path);
if (iter == notifications.begin()) {
@ -184,14 +184,20 @@ struct PSConfig {
break;
}
PSNotificationConfig *target = &iter->second;
PSNotificationConfig& target = iter->second;
if (!target->is_prefix &&
if (!target.is_prefix &&
path.size() != iter->first.size()) {
continue;
}
notifs->push_back(target);
auto topic = topics.find(target.topic);
if (topic == topics.end()) {
continue;
}
ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
result->push_back(&topic->second);
} while (iter != notifications.begin());
}
@ -208,6 +214,10 @@ struct PSConfig {
using PSConfigRef = std::shared_ptr<PSConfig>;
using EventRef = std::shared_ptr<rgw_pubsub_event>;
static void make_event_ref(EventRef *event) {
*event = std::make_shared<rgw_pubsub_event>();
}
class PSManager;
using PSManagerRef = std::shared_ptr<PSManager>;
@ -530,39 +540,57 @@ public:
};
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
uint64_t versioned_epoch;
vector<PSNotificationConfig *> notifs;
vector<PSNotificationConfig *>::iterator niter;
vector<PSTopicConfig *> topics;
vector<PSTopicConfig *>::iterator titer;
set<string>::iterator siter;
PSSubscriptionRef sub;
EventRef event;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env),
versioned_epoch(_versioned_epoch) {
PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
sync_env(_sync_env),
env(_env),
versioned_epoch(_versioned_epoch) {
#warning this will need to change obviously
env->conf->get_notifs(_bucket_info, _key, &notifs);
env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics);
}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
make_event_ref(&event);
event->bucket = bucket_info.bucket;
event->key = key;
event->event = OBJECT_CREATE;
event->timestamp = real_clock::now();
ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
#warning more event init
for (niter = notifs.begin(); niter != notifs.end(); ++niter) {
yield {
ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl;
for (titer = topics.begin(); titer != topics.end(); ++titer) {
ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
yield call(env->manager->get_subscription_cr(*siter, &sub));
if (retcode < 0) {
ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
continue;
}
yield call(sub->store_event_cr(event));
if (retcode < 0) {
ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
continue;
}
#warning publish notification
#if 0
string path = conf->get_obj_path(bucket_info, key);
es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
path, nullptr /* params */,
doc, nullptr /* result */));
#endif
}
if (retcode < 0) {
return set_cr_error(retcode);