rgw: loadgen frontend read uid, init access key

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
This commit is contained in:
Yehuda Sadeh 2013-12-16 17:05:18 -08:00
parent 57137cb4b0
commit e8a4b307f9

View File

@ -233,6 +233,7 @@ public:
virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
virtual void close_fd() {}
};
@ -321,35 +322,47 @@ void RGWFCGXProcess::run()
}
struct RGWLoadGenRequest : public RGWRequest {
const char *method;
int content_length;
RGWLoadGenRequest(const char *_m, int _cl) : method(_m), content_length(_cl) {}
};
class RGWLoadGenProcess : public RGWProcess {
int sock_fd;
RGWAccessKey access_key;
public:
RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
RGWProcess(cct, pe, num_threads, _conf) {}
void run();
void handle_request(RGWRequest *req);
void gen_request(const char *method, int content_length);
void close_fd() { }
void set_access_key(RGWAccessKey& key) { access_key = key; }
};
void RGWLoadGenProcess::run()
{
m_tp.start(); /* start thread pool */
for (;;) {
RGWLoadGenRequest *req = new RGWLoadGenRequest;
req->id = ++max_req_id;
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
req_wq.queue(req);
for (int i = 0; i < 1000; i++) {
gen_request("PUT", 4096);
}
m_tp.drain();
m_tp.stop();
}
void RGWLoadGenProcess::gen_request(const char *method, int content_length)
{
RGWLoadGenRequest *req = new RGWLoadGenRequest(method, content_length);
req->id = ++max_req_id;
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
req_wq.queue(req);
}
static void signal_shutdown()
{
if (!disable_signal_fd.read()) {
@ -561,9 +574,9 @@ void RGWLoadGenProcess::handle_request(RGWRequest *r)
RGWLoadGenRequestEnv env;
env.port = 80;
env.content_length = 0;
env.content_length = req->content_length;
env.content_type = "binary/octet-stream";
env.request_method = "GET";
env.request_method = req->method;
env.uri = "/foo/bar";
RGWLoadGenIO client_io(&env);
@ -712,6 +725,9 @@ bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
class RGWFrontend {
public:
virtual ~RGWFrontend() {}
virtual int init() = 0;
virtual int run() = 0;
virtual void stop() = 0;
virtual void join() = 0;
@ -728,17 +744,15 @@ public:
};
};
template <class T>
class RGWProcessFrontend : public RGWFrontend {
protected:
RGWFrontendConfig *conf;
T *pprocess;
RGWProcess *pprocess;
RGWProcessEnv env;
RGWProcessControlThread *thread;
public:
RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
thread = new RGWProcessControlThread(pprocess);
RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
}
~RGWProcessFrontend() {
@ -746,6 +760,8 @@ public:
}
int run() {
assert(pprocess); /* should have initialized by init() */
thread = new RGWProcessControlThread(pprocess);
thread->create();
return 0;
}
@ -760,6 +776,51 @@ public:
}
};
class RGWFCGXFrontend : public RGWProcessFrontend {
public:
RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
int init() {
pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
return 0;
}
};
class RGWLoadGenFrontend : public RGWProcessFrontend {
public:
RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
int init() {
RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
pprocess = pp;
string uid;
conf->get_val("uid", "", &uid);
if (uid.empty()) {
derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
return EINVAL;
}
RGWUserInfo user_info;
int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
if (ret < 0) {
derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
return ret;
}
map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
if (aiter == user_info.access_keys.end()) {
derr << "ERROR: user has no S3 access keys set" << dendl;
return -EINVAL;
}
pp->set_access_key(aiter->second);
return 0;
}
};
class RGWMongooseFrontend : public RGWFrontend {
RGWFrontendConfig *conf;
struct mg_context *ctx;
@ -769,6 +830,10 @@ public:
RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
}
int init() {
return 0;
}
int run() {
char thread_pool_buf[32];
snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
@ -971,7 +1036,7 @@ int main(int argc, const char **argv)
if (framework == "fastcgi" || framework == "fcgi") {
RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
fe = new RGWProcessFrontend<RGWFCGXProcess>(fcgi_pe, config);
fe = new RGWFCGXFrontend(fcgi_pe, config);
} else if (framework == "mongoose") {
string err;
@ -987,12 +1052,17 @@ int main(int argc, const char **argv)
RGWProcessEnv env = { store, &rest, olog, port };
fe = new RGWProcessFrontend<RGWLoadGenProcess>(env, config);
fe = new RGWLoadGenFrontend(env, config);
} else {
dout(0) << "WARNING: skipping unknown framework: " << framework << dendl;
continue;
}
dout(0) << "starting handler: " << fiter->first << dendl;
int r = fe->init();
if (r < 0) {
derr << "ERROR: failed initializing frontend" << dendl;
return -r;
}
fe->run();
fes.push_back(fe);