diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 8995aff0b3c..663ac3ad724 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -68,6 +68,7 @@ radosgw_SOURCES = \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ rgw/rgw_swift_auth.cc \ + rgw/rgw_loadgen.cc \ rgw/rgw_mongoose.cc \ mongoose/mongoose.c \ rgw/rgw_main.cc diff --git a/src/rgw/rgw_client_io.cc b/src/rgw/rgw_client_io.cc index 41cd6b98b18..193f44e96e0 100644 --- a/src/rgw/rgw_client_io.cc +++ b/src/rgw/rgw_client_io.cc @@ -5,6 +5,21 @@ #include "rgw_client_io.h" +#define dout_subsys ceph_subsys_rgw + +void RGWClientIO::init(CephContext *cct) { + init_env(cct); + + if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) { + std::map& env_map = env.get_map(); + std::map::iterator iter = env_map.begin(); + + for (iter = env_map.begin(); iter != env_map.end(); ++iter) { + ldout(cct, 20) << iter->first << "=" << iter->second << dendl; + } + } +} + int RGWClientIO::print(const char *format, ...) { diff --git a/src/rgw/rgw_client_io.h b/src/rgw/rgw_client_io.h index 64705fd20cb..2e8720e502d 100644 --- a/src/rgw/rgw_client_io.h +++ b/src/rgw/rgw_client_io.h @@ -25,10 +25,7 @@ public: virtual ~RGWClientIO() {} RGWClientIO() : account(false), bytes_sent(0), bytes_received(0) {} - void init(CephContext *cct) { - init_env(cct); - } - + void init(CephContext *cct); int print(const char *format, ...); int write(const char *buf, int len); virtual void flush() = 0; diff --git a/src/rgw/rgw_loadgen.cc b/src/rgw/rgw_loadgen.cc new file mode 100644 index 00000000000..01cf1a35f09 --- /dev/null +++ b/src/rgw/rgw_loadgen.cc @@ -0,0 +1,111 @@ + +#include + +#include "rgw_loadgen.h" +#include "rgw_auth_s3.h" + + +#define dout_subsys ceph_subsys_rgw + +void RGWLoadGenRequestEnv::set_date(utime_t& tm) +{ + stringstream s; + tm.asctime(s); + date_str = s.str(); +} + +int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key) +{ + map meta_map; + map sub_resources; + + string canonical_header; + string digest; + + rgw_create_s3_canonical_header(request_method.c_str(), + NULL, /* const char *content_md5 */ + content_type.c_str(), + date_str.c_str(), + meta_map, + uri.c_str(), + sub_resources, + canonical_header); + + int ret = rgw_get_s3_header_digest(canonical_header, access_key.key, digest); + if (ret < 0) { + return ret; + } + + headers["HTTP_DATE"] = date_str; + headers["HTTP_AUTHORIZATION"] = string("AWS ") + access_key.id + ":" + digest; + + return 0; +} + +int RGWLoadGenIO::write_data(const char *buf, int len) +{ + return len; +} + +int RGWLoadGenIO::read_data(char *buf, int len) +{ + int read_len = MIN(left_to_read, (uint64_t)len); + left_to_read -= read_len; + return read_len; +} + +void RGWLoadGenIO::flush() +{ +} + +int RGWLoadGenIO::complete_request() +{ + return 0; +} + +void RGWLoadGenIO::init_env(CephContext *cct) +{ + env.init(cct); + + left_to_read = req->content_length; + + char buf[32]; + snprintf(buf, sizeof(buf), "%lld", (long long)req->content_length); + env.set("CONTENT_LENGTH", buf); + + env.set("CONTENT_TYPE", req->content_type.c_str()); + env.set("HTTP_DATE", req->date_str.c_str()); + + for (map::iterator iter = req->headers.begin(); iter != req->headers.end(); ++iter) { + env.set(iter->first.c_str(), iter->second.c_str()); + } + + env.set("REQUEST_METHOD", req->request_method.c_str()); + env.set("REQUEST_URI", req->uri.c_str()); + env.set("QUERY_STRING", req->query_string.c_str()); + env.set("SCRIPT_URI", req->uri.c_str()); + + char port_buf[16]; + snprintf(port_buf, sizeof(port_buf), "%d", req->port); + env.set("SERVER_PORT", port_buf); +} + +int RGWLoadGenIO::send_status(const char *status, const char *status_name) +{ + return 0; +} + +int RGWLoadGenIO::send_100_continue() +{ + return 0; +} + +int RGWLoadGenIO::complete_header() +{ + return 0; +} + +int RGWLoadGenIO::send_content_length(uint64_t len) +{ + return 0; +} diff --git a/src/rgw/rgw_loadgen.h b/src/rgw/rgw_loadgen.h new file mode 100644 index 00000000000..5459330ce38 --- /dev/null +++ b/src/rgw/rgw_loadgen.h @@ -0,0 +1,45 @@ +#ifndef CEPH_RGW_LOADGEN_H +#define CEPH_RGW_LOADGEN_H + +#include "rgw_client_io.h" + + +struct RGWLoadGenRequestEnv { + int port; + uint64_t content_length; + string content_type; + string request_method; + string uri; + string query_string; + string date_str; + + map headers; + + RGWLoadGenRequestEnv() : port(0), content_length(0) {} + + void set_date(utime_t& tm); + int sign(RGWAccessKey& access_key); +}; + +class RGWLoadGenIO : public RGWClientIO +{ + uint64_t left_to_read; + RGWLoadGenRequestEnv *req; +public: + void init_env(CephContext *cct); + + int write_data(const char *buf, int len); + int read_data(char *buf, int len); + + int send_status(const char *status, const char *status_name); + int send_100_continue(); + int complete_header(); + int complete_request(); + int send_content_length(uint64_t len); + + RGWLoadGenIO(RGWLoadGenRequestEnv *_re) : left_to_read(0), req(_re) {} + void flush(); +}; + + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index e27efd6f7d6..c8ed9146d82 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -52,6 +52,7 @@ #include "rgw_log.h" #include "rgw_tools.h" #include "rgw_resolve.h" +#include "rgw_loadgen.h" #include "rgw_mongoose.h" #include "mongoose/mongoose.h" @@ -76,6 +77,8 @@ class RGWProcess; static int signal_fd[2] = {0, 0}; static atomic_t disable_signal_fd; +static void signal_shutdown(); + #define SOCKET_BACKLOG 1024 @@ -159,53 +162,52 @@ struct RGWProcessEnv { }; class RGWProcess { + deque m_req_queue; +protected: RGWRados *store; OpsLogSocket *olog; - deque m_req_queue; ThreadPool m_tp; Throttle req_throttle; RGWREST *rest; - int sock_fd; RGWFrontendConfig *conf; + int sock_fd; - RGWProcessEnv *process_env; - - struct RGWWQ : public ThreadPool::WorkQueue { + struct RGWWQ : public ThreadPool::WorkQueue { RGWProcess *process; RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), process(p) {} + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), process(p) {} - bool _enqueue(RGWFCGXRequest *req) { + bool _enqueue(RGWRequest *req) { process->m_req_queue.push_back(req); perfcounter->inc(l_rgw_qlen); dout(20) << "enqueued request req=" << hex << req << dec << dendl; _dump_queue(); return true; } - void _dequeue(RGWFCGXRequest *req) { + void _dequeue(RGWRequest *req) { assert(0); } bool _empty() { return process->m_req_queue.empty(); } - RGWFCGXRequest *_dequeue() { + RGWRequest *_dequeue() { if (process->m_req_queue.empty()) return NULL; - RGWFCGXRequest *req = process->m_req_queue.front(); + RGWRequest *req = process->m_req_queue.front(); process->m_req_queue.pop_front(); dout(20) << "dequeued request req=" << hex << req << dec << dendl; _dump_queue(); perfcounter->inc(l_rgw_qlen, -1); return req; } - void _process(RGWFCGXRequest *req) { + void _process(RGWRequest *req) { perfcounter->inc(l_rgw_qactive); process->handle_request(req); process->req_throttle.put(1); perfcounter->inc(l_rgw_qactive, -1); } void _dump_queue() { - deque::iterator iter; + deque::iterator iter; if (process->m_req_queue.empty()) { dout(20) << "RGWWQ: empty" << dendl; return; @@ -226,24 +228,35 @@ public: RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads), req_throttle(cct, "rgw_ops", num_threads * 2), - rest(pe->rest), sock_fd(-1), + rest(pe->rest), conf(_conf), + sock_fd(-1), req_wq(this, g_conf->rgw_op_thread_timeout, g_conf->rgw_op_thread_suicide_timeout, &m_tp), max_req_id(0) {} - void run(); - void handle_request(RGWFCGXRequest *req); + virtual ~RGWProcess() {} + virtual void run() = 0; + virtual void handle_request(RGWRequest *req) = 0; void close_fd() { - if (sock_fd >= 0) - close(sock_fd); + if (sock_fd >= 0) { + ::close(sock_fd); + sock_fd = -1; + } } }; -void RGWProcess::run() -{ - sock_fd = 0; +class RGWFCGXProcess : public RGWProcess { +public: + RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : + RGWProcess(cct, pe, num_threads, _conf) {} + void run(); + void handle_request(RGWRequest *req); +}; + +void RGWFCGXProcess::run() +{ string socket_path; string socket_port; string socket_host; @@ -306,10 +319,126 @@ void RGWProcess::run() req_wq.queue(req); } - m_tp.drain(); + m_tp.drain(&req_wq); m_tp.stop(); } +struct RGWLoadGenRequest : public RGWRequest { + string method; + string resource; + int content_length; + atomic_t *fail_flag; + + + RGWLoadGenRequest(const string& _m, const string& _r, int _cl, + atomic_t *ff) : method(_m), resource(_r), content_length(_cl), fail_flag(ff) {} +}; + +class RGWLoadGenProcess : public RGWProcess { + RGWAccessKey access_key; +public: + RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : + RGWProcess(cct, pe, num_threads, _conf) {} + void run(); + void checkpoint(); + void handle_request(RGWRequest *req); + void gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag); + + void set_access_key(RGWAccessKey& key) { access_key = key; } +}; + +void RGWLoadGenProcess::checkpoint() +{ + m_tp.drain(&req_wq); +} + +void RGWLoadGenProcess::run() +{ + m_tp.start(); /* start thread pool */ + + int i; + + int num_objs; + + conf->get_val("num_objs", 1000, &num_objs); + + int num_buckets; + conf->get_val("num_buckets", 1, &num_buckets); + + string buckets[num_buckets]; + + atomic_t failed; + + for (i = 0; i < num_buckets; i++) { + buckets[i] = "/loadgen"; + string& bucket = buckets[i]; + append_rand_alpha(NULL, bucket, bucket, 16); + + /* first create a bucket */ + gen_request("PUT", bucket, 0, &failed); + checkpoint(); + } + + string *objs = new string[num_objs]; + + if (failed.read()) { + derr << "ERROR: bucket creation failed" << dendl; + goto done; + } + + for (i = 0; i < num_objs; i++) { + char buf[16 + 1]; + gen_rand_alphanumeric(NULL, buf, sizeof(buf)); + buf[16] = '\0'; + objs[i] = buckets[i % num_buckets] + "/" + buf; + } + + for (i = 0; i < num_objs; i++) { + gen_request("PUT", objs[i], 4096, &failed); + } + + checkpoint(); + + if (failed.read()) { + derr << "ERROR: bucket creation failed" << dendl; + goto done; + } + + for (i = 0; i < num_objs; i++) { + gen_request("GET", objs[i], 4096, NULL); + } + + checkpoint(); + + for (i = 0; i < num_objs; i++) { + gen_request("DELETE", objs[i], 0, NULL); + } + + checkpoint(); + + for (i = 0; i < num_buckets; i++) { + gen_request("DELETE", buckets[i], 0, NULL); + } + +done: + checkpoint(); + + m_tp.stop(); + + delete[] objs; + + signal_shutdown(); +} + +void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag) +{ + RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length, fail_flag); + req->id = ++max_req_id; + dout(10) << "allocated request req=" << hex << req << dec << dendl; + req_throttle.get(1); + req_wq.queue(req); +} + static void signal_shutdown() { if (!disable_signal_fd.read()) { @@ -491,11 +620,12 @@ done: dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl; - return ret; + return (ret < 0 ? ret : s->err.ret); } -void RGWProcess::handle_request(RGWFCGXRequest *req) +void RGWFCGXProcess::handle_request(RGWRequest *r) { + RGWFCGXRequest *req = static_cast(r); FCGX_Request *fcgx = &req->fcgx; RGWFCGX client_io(fcgx); @@ -511,6 +641,37 @@ void RGWProcess::handle_request(RGWFCGXRequest *req) delete req; } +void RGWLoadGenProcess::handle_request(RGWRequest *r) +{ + RGWLoadGenRequest *req = static_cast(r); + + RGWLoadGenRequestEnv env; + + utime_t tm = ceph_clock_now(NULL); + + env.port = 80; + env.content_length = req->content_length; + env.content_type = "binary/octet-stream"; + env.request_method = req->method; + env.uri = req->resource; + env.set_date(tm); + env.sign(access_key); + + RGWLoadGenIO client_io(&env); + + int ret = process_request(store, rest, req, &client_io, olog); + if (ret < 0) { + /* we don't really care about return code */ + dout(20) << "process_request() returned " << ret << dendl; + + if (req->fail_flag) { + req->fail_flag->inc(); + } + } + + delete req; +} + static int mongoose_callback(struct mg_connection *conn) { struct mg_request_info *req_info = mg_get_request_info(conn); @@ -646,15 +807,18 @@ bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out) class RGWFrontend { public: virtual ~RGWFrontend() {} + + virtual int init() = 0; + virtual int run() = 0; virtual void stop() = 0; virtual void join() = 0; }; -class RGWFCGXControlThread : public Thread { +class RGWProcessControlThread : public Thread { RGWProcess *pprocess; public: - RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} + RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} void *entry() { pprocess->run(); @@ -662,24 +826,25 @@ public: }; }; -class RGWFCGXFrontend : public RGWFrontend { +class RGWProcessFrontend : public RGWFrontend { +protected: RGWFrontendConfig *conf; RGWProcess *pprocess; RGWProcessEnv env; - RGWFCGXControlThread *thread; + RGWProcessControlThread *thread; public: - RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) { - pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); - thread = new RGWFCGXControlThread(pprocess); + RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) { } - ~RGWFCGXFrontend() { + ~RGWProcessFrontend() { delete thread; delete pprocess; } int run() { + assert(pprocess); /* should have initialized by init() */ + thread = new RGWProcessControlThread(pprocess); thread->create(); return 0; } @@ -694,6 +859,53 @@ public: } }; +class RGWFCGXFrontend : public RGWProcessFrontend { +public: + RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {} + + int init() { + pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); + return 0; + } +}; + +class RGWLoadGenFrontend : public RGWProcessFrontend { +public: + RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {} + + int init() { + int num_threads; + conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads); + RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, num_threads, conf); + + pprocess = pp; + + string uid; + conf->get_val("uid", "", &uid); + if (uid.empty()) { + derr << "ERROR: uid param must be specified for loadgen frontend" << dendl; + return EINVAL; + } + + RGWUserInfo user_info; + int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL); + if (ret < 0) { + derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl; + return ret; + } + + map::iterator aiter = user_info.access_keys.begin(); + if (aiter == user_info.access_keys.end()) { + derr << "ERROR: user has no S3 access keys set" << dendl; + return -EINVAL; + } + + pp->set_access_key(aiter->second); + + return 0; + } +}; + class RGWMongooseFrontend : public RGWFrontend { RGWFrontendConfig *conf; struct mg_context *ctx; @@ -703,6 +915,10 @@ public: RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) { } + int init() { + return 0; + } + int run() { char thread_pool_buf[32]; snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size); @@ -915,11 +1131,23 @@ int main(int argc, const char **argv) RGWProcessEnv env = { store, &rest, olog, port }; fe = new RGWMongooseFrontend(env, config); + } else if (framework == "loadgen") { + int port; + config->get_val("port", 80, &port); + + RGWProcessEnv env = { store, &rest, olog, port }; + + fe = new RGWLoadGenFrontend(env, config); } else { dout(0) << "WARNING: skipping unknown framework: " << framework << dendl; continue; } dout(0) << "starting handler: " << fiter->first << dendl; + int r = fe->init(); + if (r < 0) { + derr << "ERROR: failed initializing frontend" << dendl; + return -r; + } fe->run(); fes.push_back(fe); diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 53282c1c2f8..aca7873f88a 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -202,7 +202,7 @@ void set_req_state_err(struct req_state *s, int err_no) if (err_no < 0) err_no = -err_no; - s->err.ret = err_no; + s->err.ret = -err_no; if (s->prot_flags & RGW_REST_SWIFT) { r = search_err(err_no, RGW_HTTP_SWIFT_ERRORS, ARRAY_LEN(RGW_HTTP_SWIFT_ERRORS)); if (r) {