1
0
mirror of https://github.com/ceph/ceph synced 2024-12-17 17:05:42 +00:00

Merge pull request from ceph/wip-rgw-loadgen

Wip rgw loadgen

Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Yehuda Sadeh 2014-01-17 10:24:18 -08:00
commit 184e64d073
7 changed files with 433 additions and 36 deletions

View File

@ -68,6 +68,7 @@ radosgw_SOURCES = \
rgw/rgw_http_client.cc \
rgw/rgw_swift.cc \
rgw/rgw_swift_auth.cc \
rgw/rgw_loadgen.cc \
rgw/rgw_mongoose.cc \
mongoose/mongoose.c \
rgw/rgw_main.cc

View File

@ -5,6 +5,21 @@
#include "rgw_client_io.h"
#define dout_subsys ceph_subsys_rgw
void RGWClientIO::init(CephContext *cct) {
init_env(cct);
if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
std::map<string, string, ltstr_nocase>& env_map = env.get_map();
std::map<string, string, ltstr_nocase>::iterator iter = env_map.begin();
for (iter = env_map.begin(); iter != env_map.end(); ++iter) {
ldout(cct, 20) << iter->first << "=" << iter->second << dendl;
}
}
}
int RGWClientIO::print(const char *format, ...)
{

View File

@ -25,10 +25,7 @@ public:
virtual ~RGWClientIO() {}
RGWClientIO() : account(false), bytes_sent(0), bytes_received(0) {}
void init(CephContext *cct) {
init_env(cct);
}
void init(CephContext *cct);
int print(const char *format, ...);
int write(const char *buf, int len);
virtual void flush() = 0;

111
src/rgw/rgw_loadgen.cc Normal file
View File

@ -0,0 +1,111 @@
#include <string.h>
#include "rgw_loadgen.h"
#include "rgw_auth_s3.h"
#define dout_subsys ceph_subsys_rgw
void RGWLoadGenRequestEnv::set_date(utime_t& tm)
{
stringstream s;
tm.asctime(s);
date_str = s.str();
}
int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
{
map<string, string> meta_map;
map<string, string> sub_resources;
string canonical_header;
string digest;
rgw_create_s3_canonical_header(request_method.c_str(),
NULL, /* const char *content_md5 */
content_type.c_str(),
date_str.c_str(),
meta_map,
uri.c_str(),
sub_resources,
canonical_header);
int ret = rgw_get_s3_header_digest(canonical_header, access_key.key, digest);
if (ret < 0) {
return ret;
}
headers["HTTP_DATE"] = date_str;
headers["HTTP_AUTHORIZATION"] = string("AWS ") + access_key.id + ":" + digest;
return 0;
}
int RGWLoadGenIO::write_data(const char *buf, int len)
{
return len;
}
int RGWLoadGenIO::read_data(char *buf, int len)
{
int read_len = MIN(left_to_read, (uint64_t)len);
left_to_read -= read_len;
return read_len;
}
void RGWLoadGenIO::flush()
{
}
int RGWLoadGenIO::complete_request()
{
return 0;
}
void RGWLoadGenIO::init_env(CephContext *cct)
{
env.init(cct);
left_to_read = req->content_length;
char buf[32];
snprintf(buf, sizeof(buf), "%lld", (long long)req->content_length);
env.set("CONTENT_LENGTH", buf);
env.set("CONTENT_TYPE", req->content_type.c_str());
env.set("HTTP_DATE", req->date_str.c_str());
for (map<string, string>::iterator iter = req->headers.begin(); iter != req->headers.end(); ++iter) {
env.set(iter->first.c_str(), iter->second.c_str());
}
env.set("REQUEST_METHOD", req->request_method.c_str());
env.set("REQUEST_URI", req->uri.c_str());
env.set("QUERY_STRING", req->query_string.c_str());
env.set("SCRIPT_URI", req->uri.c_str());
char port_buf[16];
snprintf(port_buf, sizeof(port_buf), "%d", req->port);
env.set("SERVER_PORT", port_buf);
}
int RGWLoadGenIO::send_status(const char *status, const char *status_name)
{
return 0;
}
int RGWLoadGenIO::send_100_continue()
{
return 0;
}
int RGWLoadGenIO::complete_header()
{
return 0;
}
int RGWLoadGenIO::send_content_length(uint64_t len)
{
return 0;
}

45
src/rgw/rgw_loadgen.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef CEPH_RGW_LOADGEN_H
#define CEPH_RGW_LOADGEN_H
#include "rgw_client_io.h"
struct RGWLoadGenRequestEnv {
int port;
uint64_t content_length;
string content_type;
string request_method;
string uri;
string query_string;
string date_str;
map<string, string> headers;
RGWLoadGenRequestEnv() : port(0), content_length(0) {}
void set_date(utime_t& tm);
int sign(RGWAccessKey& access_key);
};
class RGWLoadGenIO : public RGWClientIO
{
uint64_t left_to_read;
RGWLoadGenRequestEnv *req;
public:
void init_env(CephContext *cct);
int write_data(const char *buf, int len);
int read_data(char *buf, int len);
int send_status(const char *status, const char *status_name);
int send_100_continue();
int complete_header();
int complete_request();
int send_content_length(uint64_t len);
RGWLoadGenIO(RGWLoadGenRequestEnv *_re) : left_to_read(0), req(_re) {}
void flush();
};
#endif

View File

@ -52,6 +52,7 @@
#include "rgw_log.h"
#include "rgw_tools.h"
#include "rgw_resolve.h"
#include "rgw_loadgen.h"
#include "rgw_mongoose.h"
#include "mongoose/mongoose.h"
@ -76,6 +77,8 @@ class RGWProcess;
static int signal_fd[2] = {0, 0};
static atomic_t disable_signal_fd;
static void signal_shutdown();
#define SOCKET_BACKLOG 1024
@ -159,53 +162,52 @@ struct RGWProcessEnv {
};
class RGWProcess {
deque<RGWRequest *> m_req_queue;
protected:
RGWRados *store;
OpsLogSocket *olog;
deque<RGWFCGXRequest *> m_req_queue;
ThreadPool m_tp;
Throttle req_throttle;
RGWREST *rest;
int sock_fd;
RGWFrontendConfig *conf;
int sock_fd;
RGWProcessEnv *process_env;
struct RGWWQ : public ThreadPool::WorkQueue<RGWFCGXRequest> {
struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess *process;
RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
: ThreadPool::WorkQueue<RGWFCGXRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
: ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
bool _enqueue(RGWFCGXRequest *req) {
bool _enqueue(RGWRequest *req) {
process->m_req_queue.push_back(req);
perfcounter->inc(l_rgw_qlen);
dout(20) << "enqueued request req=" << hex << req << dec << dendl;
_dump_queue();
return true;
}
void _dequeue(RGWFCGXRequest *req) {
void _dequeue(RGWRequest *req) {
assert(0);
}
bool _empty() {
return process->m_req_queue.empty();
}
RGWFCGXRequest *_dequeue() {
RGWRequest *_dequeue() {
if (process->m_req_queue.empty())
return NULL;
RGWFCGXRequest *req = process->m_req_queue.front();
RGWRequest *req = process->m_req_queue.front();
process->m_req_queue.pop_front();
dout(20) << "dequeued request req=" << hex << req << dec << dendl;
_dump_queue();
perfcounter->inc(l_rgw_qlen, -1);
return req;
}
void _process(RGWFCGXRequest *req) {
void _process(RGWRequest *req) {
perfcounter->inc(l_rgw_qactive);
process->handle_request(req);
process->req_throttle.put(1);
perfcounter->inc(l_rgw_qactive, -1);
}
void _dump_queue() {
deque<RGWFCGXRequest *>::iterator iter;
deque<RGWRequest *>::iterator iter;
if (process->m_req_queue.empty()) {
dout(20) << "RGWWQ: empty" << dendl;
return;
@ -226,24 +228,35 @@ public:
RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
: store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
rest(pe->rest), sock_fd(-1),
rest(pe->rest),
conf(_conf),
sock_fd(-1),
req_wq(this, g_conf->rgw_op_thread_timeout,
g_conf->rgw_op_thread_suicide_timeout, &m_tp),
max_req_id(0) {}
void run();
void handle_request(RGWFCGXRequest *req);
virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
void close_fd() {
if (sock_fd >= 0)
close(sock_fd);
if (sock_fd >= 0) {
::close(sock_fd);
sock_fd = -1;
}
}
};
void RGWProcess::run()
{
sock_fd = 0;
class RGWFCGXProcess : public RGWProcess {
public:
RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
RGWProcess(cct, pe, num_threads, _conf) {}
void run();
void handle_request(RGWRequest *req);
};
void RGWFCGXProcess::run()
{
string socket_path;
string socket_port;
string socket_host;
@ -306,10 +319,126 @@ void RGWProcess::run()
req_wq.queue(req);
}
m_tp.drain();
m_tp.drain(&req_wq);
m_tp.stop();
}
struct RGWLoadGenRequest : public RGWRequest {
string method;
string resource;
int content_length;
atomic_t *fail_flag;
RGWLoadGenRequest(const string& _m, const string& _r, int _cl,
atomic_t *ff) : method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
};
class RGWLoadGenProcess : public RGWProcess {
RGWAccessKey access_key;
public:
RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
RGWProcess(cct, pe, num_threads, _conf) {}
void run();
void checkpoint();
void handle_request(RGWRequest *req);
void gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag);
void set_access_key(RGWAccessKey& key) { access_key = key; }
};
void RGWLoadGenProcess::checkpoint()
{
m_tp.drain(&req_wq);
}
void RGWLoadGenProcess::run()
{
m_tp.start(); /* start thread pool */
int i;
int num_objs;
conf->get_val("num_objs", 1000, &num_objs);
int num_buckets;
conf->get_val("num_buckets", 1, &num_buckets);
string buckets[num_buckets];
atomic_t failed;
for (i = 0; i < num_buckets; i++) {
buckets[i] = "/loadgen";
string& bucket = buckets[i];
append_rand_alpha(NULL, bucket, bucket, 16);
/* first create a bucket */
gen_request("PUT", bucket, 0, &failed);
checkpoint();
}
string *objs = new string[num_objs];
if (failed.read()) {
derr << "ERROR: bucket creation failed" << dendl;
goto done;
}
for (i = 0; i < num_objs; i++) {
char buf[16 + 1];
gen_rand_alphanumeric(NULL, buf, sizeof(buf));
buf[16] = '\0';
objs[i] = buckets[i % num_buckets] + "/" + buf;
}
for (i = 0; i < num_objs; i++) {
gen_request("PUT", objs[i], 4096, &failed);
}
checkpoint();
if (failed.read()) {
derr << "ERROR: bucket creation failed" << dendl;
goto done;
}
for (i = 0; i < num_objs; i++) {
gen_request("GET", objs[i], 4096, NULL);
}
checkpoint();
for (i = 0; i < num_objs; i++) {
gen_request("DELETE", objs[i], 0, NULL);
}
checkpoint();
for (i = 0; i < num_buckets; i++) {
gen_request("DELETE", buckets[i], 0, NULL);
}
done:
checkpoint();
m_tp.stop();
delete[] objs;
signal_shutdown();
}
void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
{
RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length, fail_flag);
req->id = ++max_req_id;
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
req_wq.queue(req);
}
static void signal_shutdown()
{
if (!disable_signal_fd.read()) {
@ -491,11 +620,12 @@ done:
dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl;
return ret;
return (ret < 0 ? ret : s->err.ret);
}
void RGWProcess::handle_request(RGWFCGXRequest *req)
void RGWFCGXProcess::handle_request(RGWRequest *r)
{
RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
FCGX_Request *fcgx = &req->fcgx;
RGWFCGX client_io(fcgx);
@ -511,6 +641,37 @@ void RGWProcess::handle_request(RGWFCGXRequest *req)
delete req;
}
void RGWLoadGenProcess::handle_request(RGWRequest *r)
{
RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
RGWLoadGenRequestEnv env;
utime_t tm = ceph_clock_now(NULL);
env.port = 80;
env.content_length = req->content_length;
env.content_type = "binary/octet-stream";
env.request_method = req->method;
env.uri = req->resource;
env.set_date(tm);
env.sign(access_key);
RGWLoadGenIO client_io(&env);
int ret = process_request(store, rest, req, &client_io, olog);
if (ret < 0) {
/* we don't really care about return code */
dout(20) << "process_request() returned " << ret << dendl;
if (req->fail_flag) {
req->fail_flag->inc();
}
}
delete req;
}
static int mongoose_callback(struct mg_connection *conn) {
struct mg_request_info *req_info = mg_get_request_info(conn);
@ -646,15 +807,18 @@ bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
class RGWFrontend {
public:
virtual ~RGWFrontend() {}
virtual int init() = 0;
virtual int run() = 0;
virtual void stop() = 0;
virtual void join() = 0;
};
class RGWFCGXControlThread : public Thread {
class RGWProcessControlThread : public Thread {
RGWProcess *pprocess;
public:
RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
void *entry() {
pprocess->run();
@ -662,24 +826,25 @@ public:
};
};
class RGWFCGXFrontend : public RGWFrontend {
class RGWProcessFrontend : public RGWFrontend {
protected:
RGWFrontendConfig *conf;
RGWProcess *pprocess;
RGWProcessEnv env;
RGWFCGXControlThread *thread;
RGWProcessControlThread *thread;
public:
RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
thread = new RGWFCGXControlThread(pprocess);
RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
}
~RGWFCGXFrontend() {
~RGWProcessFrontend() {
delete thread;
delete pprocess;
}
int run() {
assert(pprocess); /* should have initialized by init() */
thread = new RGWProcessControlThread(pprocess);
thread->create();
return 0;
}
@ -694,6 +859,53 @@ public:
}
};
class RGWFCGXFrontend : public RGWProcessFrontend {
public:
RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
int init() {
pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
return 0;
}
};
class RGWLoadGenFrontend : public RGWProcessFrontend {
public:
RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
int init() {
int num_threads;
conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, num_threads, conf);
pprocess = pp;
string uid;
conf->get_val("uid", "", &uid);
if (uid.empty()) {
derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
return EINVAL;
}
RGWUserInfo user_info;
int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
if (ret < 0) {
derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
return ret;
}
map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
if (aiter == user_info.access_keys.end()) {
derr << "ERROR: user has no S3 access keys set" << dendl;
return -EINVAL;
}
pp->set_access_key(aiter->second);
return 0;
}
};
class RGWMongooseFrontend : public RGWFrontend {
RGWFrontendConfig *conf;
struct mg_context *ctx;
@ -703,6 +915,10 @@ public:
RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
}
int init() {
return 0;
}
int run() {
char thread_pool_buf[32];
snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
@ -915,11 +1131,23 @@ int main(int argc, const char **argv)
RGWProcessEnv env = { store, &rest, olog, port };
fe = new RGWMongooseFrontend(env, config);
} else if (framework == "loadgen") {
int port;
config->get_val("port", 80, &port);
RGWProcessEnv env = { store, &rest, olog, port };
fe = new RGWLoadGenFrontend(env, config);
} else {
dout(0) << "WARNING: skipping unknown framework: " << framework << dendl;
continue;
}
dout(0) << "starting handler: " << fiter->first << dendl;
int r = fe->init();
if (r < 0) {
derr << "ERROR: failed initializing frontend" << dendl;
return -r;
}
fe->run();
fes.push_back(fe);

View File

@ -202,7 +202,7 @@ void set_req_state_err(struct req_state *s, int err_no)
if (err_no < 0)
err_no = -err_no;
s->err.ret = err_no;
s->err.ret = -err_no;
if (s->prot_flags & RGW_REST_SWIFT) {
r = search_err(err_no, RGW_HTTP_SWIFT_ERRORS, ARRAY_LEN(RGW_HTTP_SWIFT_ERRORS));
if (r) {