From e8a4b307f9ccdd076d4e403655d55aecdb9123db Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 16 Dec 2013 17:05:18 -0800 Subject: [PATCH] rgw: loadgen frontend read uid, init access key Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_main.cc | 106 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 88 insertions(+), 18 deletions(-) diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index f28272c6212..3990950bed7 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -233,6 +233,7 @@ public: virtual ~RGWProcess() {} virtual void run() = 0; virtual void handle_request(RGWRequest *req) = 0; + virtual void close_fd() {} }; @@ -321,35 +322,47 @@ void RGWFCGXProcess::run() } struct RGWLoadGenRequest : public RGWRequest { + const char *method; + int content_length; + + RGWLoadGenRequest(const char *_m, int _cl) : method(_m), content_length(_cl) {} }; class RGWLoadGenProcess : public RGWProcess { - int sock_fd; + RGWAccessKey access_key; public: RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : - RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {} + RGWProcess(cct, pe, num_threads, _conf) {} void run(); void handle_request(RGWRequest *req); + void gen_request(const char *method, int content_length); void close_fd() { } + + void set_access_key(RGWAccessKey& key) { access_key = key; } }; void RGWLoadGenProcess::run() { m_tp.start(); /* start thread pool */ - for (;;) { - RGWLoadGenRequest *req = new RGWLoadGenRequest; - req->id = ++max_req_id; - dout(10) << "allocated request req=" << hex << req << dec << dendl; - req_throttle.get(1); - req_wq.queue(req); + for (int i = 0; i < 1000; i++) { + gen_request("PUT", 4096); } - m_tp.drain(); + m_tp.stop(); } +void RGWLoadGenProcess::gen_request(const char *method, int content_length) +{ + RGWLoadGenRequest *req = new RGWLoadGenRequest(method, content_length); + req->id = ++max_req_id; + dout(10) << "allocated request req=" << hex << req << dec << dendl; + req_throttle.get(1); + req_wq.queue(req); +} + static void signal_shutdown() { if (!disable_signal_fd.read()) { @@ -561,9 +574,9 @@ void RGWLoadGenProcess::handle_request(RGWRequest *r) RGWLoadGenRequestEnv env; env.port = 80; - env.content_length = 0; + env.content_length = req->content_length; env.content_type = "binary/octet-stream"; - env.request_method = "GET"; + env.request_method = req->method; env.uri = "/foo/bar"; RGWLoadGenIO client_io(&env); @@ -712,6 +725,9 @@ bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out) class RGWFrontend { public: virtual ~RGWFrontend() {} + + virtual int init() = 0; + virtual int run() = 0; virtual void stop() = 0; virtual void join() = 0; @@ -728,17 +744,15 @@ public: }; }; -template class RGWProcessFrontend : public RGWFrontend { +protected: RGWFrontendConfig *conf; - T *pprocess; + RGWProcess *pprocess; RGWProcessEnv env; RGWProcessControlThread *thread; public: - RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) { - pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); - thread = new RGWProcessControlThread(pprocess); + RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) { } ~RGWProcessFrontend() { @@ -746,6 +760,8 @@ public: } int run() { + assert(pprocess); /* should have initialized by init() */ + thread = new RGWProcessControlThread(pprocess); thread->create(); return 0; } @@ -760,6 +776,51 @@ public: } }; +class RGWFCGXFrontend : public RGWProcessFrontend { +public: + RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {} + + int init() { + pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); + return 0; + } +}; + +class RGWLoadGenFrontend : public RGWProcessFrontend { +public: + RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {} + + int init() { + RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); + + pprocess = pp; + + string uid; + conf->get_val("uid", "", &uid); + if (uid.empty()) { + derr << "ERROR: uid param must be specified for loadgen frontend" << dendl; + return EINVAL; + } + + RGWUserInfo user_info; + int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL); + if (ret < 0) { + derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl; + return ret; + } + + map::iterator aiter = user_info.access_keys.begin(); + if (aiter == user_info.access_keys.end()) { + derr << "ERROR: user has no S3 access keys set" << dendl; + return -EINVAL; + } + + pp->set_access_key(aiter->second); + + return 0; + } +}; + class RGWMongooseFrontend : public RGWFrontend { RGWFrontendConfig *conf; struct mg_context *ctx; @@ -769,6 +830,10 @@ public: RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) { } + int init() { + return 0; + } + int run() { char thread_pool_buf[32]; snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size); @@ -971,7 +1036,7 @@ int main(int argc, const char **argv) if (framework == "fastcgi" || framework == "fcgi") { RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 }; - fe = new RGWProcessFrontend(fcgi_pe, config); + fe = new RGWFCGXFrontend(fcgi_pe, config); } else if (framework == "mongoose") { string err; @@ -987,12 +1052,17 @@ int main(int argc, const char **argv) RGWProcessEnv env = { store, &rest, olog, port }; - fe = new RGWProcessFrontend(env, config); + fe = new RGWLoadGenFrontend(env, config); } else { dout(0) << "WARNING: skipping unknown framework: " << framework << dendl; continue; } dout(0) << "starting handler: " << fiter->first << dendl; + int r = fe->init(); + if (r < 0) { + derr << "ERROR: failed initializing frontend" << dendl; + return -r; + } fe->run(); fes.push_back(fe);