mirror of
https://github.com/ceph/ceph
synced 2025-02-19 08:57:27 +00:00
Merge pull request #29637 from liuchang0812/rgw-es7
rgw: sync with elastic search v7 Reviewed-by: Abhishek Lekshmanan <abhishek@suse.com>
This commit is contained in:
commit
125ab97d88
@ -112,6 +112,7 @@ public:
|
||||
|
||||
using ESVersion = std::pair<int,int>;
|
||||
static constexpr ESVersion ES_V5{5,0};
|
||||
static constexpr ESVersion ES_V7{7,0};
|
||||
|
||||
struct ESInfo {
|
||||
std::string name;
|
||||
@ -171,6 +172,7 @@ struct ElasticConfig {
|
||||
uint32_t num_shards{0};
|
||||
uint32_t num_replicas{0};
|
||||
std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
|
||||
ESInfo es_info;
|
||||
|
||||
void init(CephContext *cct, const JSONFormattable& config) {
|
||||
string elastic_endpoint = config["endpoint"];
|
||||
@ -216,7 +218,12 @@ struct ElasticConfig {
|
||||
}
|
||||
|
||||
string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
|
||||
return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
|
||||
if (es_info.version >= ES_V7) {
|
||||
return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
|
||||
;
|
||||
} else {
|
||||
return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
|
||||
}
|
||||
}
|
||||
|
||||
bool should_handle_operation(RGWBucketInfo& bucket_info) {
|
||||
@ -328,8 +335,12 @@ struct es_type : public T {
|
||||
|
||||
template <class T>
|
||||
struct es_index_mappings {
|
||||
ESVersion es_version;
|
||||
ESType string_type {ESType::String};
|
||||
|
||||
es_index_mappings(ESVersion esv):es_version(esv) {
|
||||
}
|
||||
|
||||
es_type<T> est(ESType t) const {
|
||||
return es_type<T>(t);
|
||||
}
|
||||
@ -345,7 +356,8 @@ struct es_index_mappings {
|
||||
}
|
||||
|
||||
void dump(Formatter *f) const {
|
||||
f->open_object_section("object");
|
||||
if (es_version <= ES_V7)
|
||||
f->open_object_section("object");
|
||||
f->open_object_section("properties");
|
||||
encode_json("bucket", est(string_type), f);
|
||||
encode_json("name", est(string_type), f);
|
||||
@ -370,6 +382,8 @@ struct es_index_mappings {
|
||||
f->close_section(); // properties
|
||||
f->close_section(); // meta
|
||||
f->close_section(); // properties
|
||||
|
||||
if (es_version <= ES_V7)
|
||||
f->close_section(); // object
|
||||
}
|
||||
};
|
||||
@ -396,7 +410,8 @@ struct es_index_config : public es_index_config_base {
|
||||
es_index_settings settings;
|
||||
es_index_mappings<T> mappings;
|
||||
|
||||
es_index_config(es_index_settings& _s) : settings(_s) {}
|
||||
es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
|
||||
}
|
||||
|
||||
void dump(Formatter *f) const {
|
||||
encode_json("settings", settings, f);
|
||||
@ -679,10 +694,10 @@ public:
|
||||
|
||||
if (es_info.version >= ES_V5) {
|
||||
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
|
||||
index_conf.reset(new es_index_config<es_type_v5>(settings));
|
||||
index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
|
||||
} else {
|
||||
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
|
||||
index_conf.reset(new es_index_config<es_type_v2>(settings));
|
||||
index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
|
||||
}
|
||||
call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
|
||||
conf->conn.get(),
|
||||
@ -694,7 +709,8 @@ public:
|
||||
if (retcode < 0) {
|
||||
ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
|
||||
|
||||
if (err_response.error.type != "index_already_exists_exception") {
|
||||
if (err_response.error.type != "index_already_exists_exception" &&
|
||||
err_response.error.type != "resource_already_exists_exception") {
|
||||
return set_cr_error(retcode);
|
||||
}
|
||||
|
||||
@ -801,6 +817,25 @@ public:
|
||||
|
||||
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
|
||||
conf->init_instance(sync_env->store->svc()->zone->get_realm(), instance_id);
|
||||
// try to get elastic search version
|
||||
RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->getRados()->get_cr_registry());
|
||||
RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr());
|
||||
int ret = http_manager.start();
|
||||
if (ret < 0) {
|
||||
return;
|
||||
}
|
||||
ret = crs.run(new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
|
||||
conf->conn.get(),
|
||||
&http_manager,
|
||||
"/", nullptr,
|
||||
&(conf->default_headers),
|
||||
&(conf->es_info)));
|
||||
http_manager.stop();
|
||||
if (ret < 0) {
|
||||
ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl;
|
||||
} else {
|
||||
ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
|
||||
|
Loading…
Reference in New Issue
Block a user