Merge pull request #2768 from ceph/wip-msgr

msgr: merge AsyncMessenger

Lightly reviewed.  Won't impact build or runtime unless explicitly enabled with ms_type = async (or random).
This commit is contained in:
Sage Weil 2014-10-24 10:58:27 -07:00
commit 3ef6fd1d0f
13 changed files with 4229 additions and 3 deletions

View File

@ -139,6 +139,7 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
OPTION(ms_async_op_threads, OPT_INT, 2)
OPTION(inject_early_sigterm, OPT_BOOL, false)

View File

@ -17,14 +17,24 @@ libmsg_la_SOURCES += \
msg/simple/DispatchQueue.cc \
msg/simple/Pipe.cc \
msg/simple/PipeConnection.cc \
msg/simple/SimpleMessenger.cc
msg/simple/SimpleMessenger.cc \
msg/async/AsyncConnection.cc \
msg/async/AsyncMessenger.cc \
msg/async/Event.cc \
msg/async/net_handler.cc \
msg/async/EventEpoll.cc
noinst_HEADERS += \
msg/simple/Accepter.h \
msg/simple/DispatchQueue.h \
msg/simple/Pipe.h \
msg/simple/PipeConnection.h \
msg/simple/SimpleMessenger.h
msg/simple/SimpleMessenger.h \
msg/async/AsyncConnection.h \
msg/async/AsyncMessenger.h \
msg/async/Event.h \
msg/async/EventEpoll.h \
msg/async/net_handler.h
noinst_LTLIBRARIES += libmsg.la

View File

@ -3,14 +3,20 @@
#include "Messenger.h"
#include "msg/simple/SimpleMessenger.h"
#include "msg/async/AsyncMessenger.h"
/**
 * Build the Messenger implementation selected by the `ms_type` config option.
 *
 * "simple" yields a SimpleMessenger, "async" an AsyncMessenger, and "random"
 * picks one of the two with rand().
 *
 * @param cct   context supplying the configuration and the log
 * @param name  entity name passed through to the messenger constructor
 * @param lname messenger name passed through to the messenger constructor
 * @param nonce nonce passed through to the messenger constructor
 * @return a newly allocated messenger, or NULL (after logging an error) when
 *         ms_type is not recognized
 */
Messenger *Messenger::create(CephContext *cct,
                             entity_name_t name,
                             string lname,
                             uint64_t nonce)
{
  // r stays -1 unless ms_type is "random", in which case it selects 0 or 1.
  int r = -1;
  if (cct->_conf->ms_type == "random")
    r = rand() % 2;
  if (r == 0 || cct->_conf->ms_type == "simple")
    return new SimpleMessenger(cct, name, lname, nonce);
  else if (r == 1 || cct->_conf->ms_type == "async")
    return new AsyncMessenger(cct, name, lname, nonce);
  lderr(cct) << "unrecognized ms_type '" << cct->_conf->ms_type << "'" << dendl;
  return NULL;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,270 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MSG_ASYNCCONNECTION_H
#define CEPH_MSG_ASYNCCONNECTION_H
#include <list>
#include <map>
using namespace std;
#include "common/Mutex.h"
#include "include/buffer.h"
#include "auth/AuthSessionHandler.h"
#include "include/buffer.h"
#include "msg/Connection.h"
#include "net_handler.h"
#include "Event.h"
#include "msg/Messenger.h"
class AsyncMessenger;
/*
 * AsyncConnection maintains a logical session between two endpoints. In other
 * words, a pair of addresses maps to exactly one AsyncConnection.
 * AsyncConnection handles network faults and read/write transactions. If a
 * file descriptor breaks, AsyncConnection keeps the message queue and
 * sequence numbers and tries to reconnect to the peer endpoint.
 */
class AsyncConnection : public Connection {
  // number of iovec slots in msgvec
  const static uint64_t IOV_LEN = 1024;

  int read_bulk(int fd, char *buf, int len);
  int do_sendmsg(struct msghdr &msg, int len, bool more);
  // if "send" is false, it will only append bl to the send buffer;
  // the main use is to avoid errors happening outside messenger threads
  int _try_send(bufferlist bl, bool send=true);
  int _send(Message *m);
  int read_until(uint64_t needed, bufferptr &p);
  int _process_connection();
  void _connect();
  void _stop();
  int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
  int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
  void was_session_reset();
  void fault();
  void discard_out_queue();
  void discard_requeued_up_to(uint64_t seq);
  void requeue_sent();
  int randomize_out_seq();
  void handle_ack(uint64_t seq);
  void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
  int write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist);

  /*
   * Fill in and send a connect reply while accepting.
   *
   * Sets the reply tag, the feature bits and the authorizer length, appends
   * the authorizer payload (if any) and hands the buffer to _try_send().
   * Returns -1 if _try_send() fails; otherwise moves the state machine to
   * STATE_ACCEPTING_WAIT_CONNECT_MSG and returns 0.
   */
  int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                    bufferlist authorizer_reply) {
    bufferlist reply_bl;
    reply.tag = tag;
    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
    reply.authorizer_len = authorizer_reply.length();
    reply_bl.append((char*)&reply, sizeof(reply));
    if (reply.authorizer_len) {
      reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
    }
    int r = _try_send(reply_bl);
    if (r < 0)
      return -1;

    state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
    return 0;
  }
  // true if there are queued messages or unsent bytes in outcoming_bl
  bool is_queued() {
    return !out_q.empty() || outcoming_bl.length();
  }
  // shut down both directions of the socket, if one is open
  void shutdown_socket() {
    if (sd >= 0)
      ::shutdown(sd, SHUT_RDWR);
  }
  /*
   * Pop the next message to send, taking from the highest-priority
   * non-empty list in out_q; priority lists that become (or are) empty are
   * removed from the map. Returns 0 if nothing is queued.
   */
  Message *_get_next_outgoing() {
    Message *m = 0;
    while (!m && !out_q.empty()) {
      map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
      if (!p->second.empty()) {
        m = p->second.front();
        p->second.pop_front();
      }
      if (p->second.empty())
        out_q.erase(p->first);
    }
    return m;
  }
 public:
  AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
  ~AsyncConnection();

  ostream& _conn_prefix(std::ostream *_dout);

  bool is_connected() {
    // FIXME?
    return true;
  }

  // Only call when the AsyncConnection is first constructed
  void connect(const entity_addr_t& addr, int type) {
    set_peer_type(type);
    set_peer_addr(addr);
    policy = msgr->get_policy(type);
    _connect();
  }
  // Only call when the AsyncConnection is first constructed
  void accept(int sd);
  int send_message(Message *m);

  void send_keepalive();
  // stop the connection (calls _stop() under the connection lock)
  void mark_down() {
    Mutex::Locker l(lock);
    _stop();
  }
  // switch the connection policy to lossy
  void mark_disposable() {
    Mutex::Locker l(lock);
    policy.lossy = true;
  }

 private:
  enum {
    STATE_NONE,
    STATE_OPEN,
    STATE_OPEN_KEEPALIVE2,
    STATE_OPEN_KEEPALIVE2_ACK,
    STATE_OPEN_TAG_ACK,
    STATE_OPEN_MESSAGE_HEADER,
    STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
    STATE_OPEN_MESSAGE_THROTTLE_BYTES,
    STATE_OPEN_MESSAGE_READ_FRONT,
    STATE_OPEN_MESSAGE_READ_MIDDLE,
    STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
    STATE_OPEN_MESSAGE_READ_DATA,
    STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
    STATE_OPEN_TAG_CLOSE,
    STATE_WAIT_SEND,
    STATE_CONNECTING,
    STATE_CONNECTING_WAIT_BANNER,
    STATE_CONNECTING_WAIT_IDENTIFY_PEER,
    STATE_CONNECTING_SEND_CONNECT_MSG,
    STATE_CONNECTING_WAIT_CONNECT_REPLY,
    STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
    STATE_CONNECTING_WAIT_ACK_SEQ,
    STATE_CONNECTING_READY,
    STATE_ACCEPTING,
    STATE_ACCEPTING_HANDLE_CONNECT,
    STATE_ACCEPTING_WAIT_BANNER_ADDR,
    STATE_ACCEPTING_WAIT_CONNECT_MSG,
    STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
    STATE_ACCEPTING_WAIT_SEQ,
    STATE_ACCEPTING_READY,
    STATE_STANDBY,
    STATE_CLOSED,
    STATE_WAIT,           // just wait for racing connection
  };

  /*
   * Map a state value to its name for logging.
   *
   * The table order must match the enum above. Values outside the table
   * yield "STATE_UNKNOWN" instead of reading past the end of the array.
   */
  static const char *get_state_name(int state) {
    // built once; the last entry ("STATE_FAULT") has no enumerator in the
    // enum above and is kept only so existing indices are unchanged
    static const char* const statenames[] = {"STATE_NONE",
                                             "STATE_OPEN",
                                             "STATE_OPEN_KEEPALIVE2",
                                             "STATE_OPEN_KEEPALIVE2_ACK",
                                             "STATE_OPEN_TAG_ACK",
                                             "STATE_OPEN_MESSAGE_HEADER",
                                             "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
                                             "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
                                             "STATE_OPEN_MESSAGE_READ_FRONT",
                                             "STATE_OPEN_MESSAGE_READ_MIDDLE",
                                             "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
                                             "STATE_OPEN_MESSAGE_READ_DATA",
                                             "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
                                             "STATE_OPEN_TAG_CLOSE",
                                             "STATE_WAIT_SEND",
                                             "STATE_CONNECTING",
                                             "STATE_CONNECTING_WAIT_BANNER",
                                             "STATE_CONNECTING_WAIT_IDENTIFY_PEER",
                                             "STATE_CONNECTING_SEND_CONNECT_MSG",
                                             "STATE_CONNECTING_WAIT_CONNECT_REPLY",
                                             "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
                                             "STATE_CONNECTING_WAIT_ACK_SEQ",
                                             "STATE_CONNECTING_READY",
                                             "STATE_ACCEPTING",
                                             "STATE_ACCEPTING_HANDLE_CONNECT",
                                             "STATE_ACCEPTING_WAIT_BANNER_ADDR",
                                             "STATE_ACCEPTING_WAIT_CONNECT_MSG",
                                             "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
                                             "STATE_ACCEPTING_WAIT_SEQ",
                                             "STATE_ACCEPTING_READY",
                                             "STATE_STANDBY",
                                             "STATE_CLOSED",
                                             "STATE_WAIT",
                                             "STATE_FAULT"};
    const int count = (int)(sizeof(statenames) / sizeof(statenames[0]));
    if (state < 0 || state >= count)
      return "STATE_UNKNOWN";
    return statenames[state];
  }

  CephContext *cc;
  AsyncMessenger *async_msgr;
  int global_seq;
  __u32 connect_seq, peer_global_seq;
  uint64_t out_seq;
  uint64_t in_seq, in_seq_acked;
  int state;
  int state_after_send;
  int sd;
  int port;
  Messenger::Policy policy;
  map<int, list<Message*> > out_q;  // priority queue for outbound msgs
  list<Message*> sent;
  Mutex lock;
  utime_t backoff;         // backoff time
  bool open_write;
  EventCallbackRef read_handler;
  EventCallbackRef write_handler;
  EventCallbackRef reset_handler;
  EventCallbackRef remote_reset_handler;
  bool keepalive;
  struct iovec msgvec[IOV_LEN];

  // This section holds temporary variables used by state transitions

  // Open state
  utime_t recv_stamp;
  utime_t throttle_stamp;
  uint64_t msg_left;
  ceph_msg_header current_header;
  bufferlist data_buf;
  bufferlist::iterator data_blp;
  bufferlist front, middle, data;
  ceph_msg_connect connect_msg;
  // Connecting state
  bool got_bad_auth;
  AuthAuthorizer *authorizer;
  ceph_msg_connect_reply connect_reply;
  // Accepting state
  entity_addr_t socket_addr;
  CryptoKey session_key;

  // used only for local state; it is overwritten on state transition
  bufferptr state_buffer;
  // used only by "read_until"
  uint64_t state_offset;
  bufferlist outcoming_bl;
  NetHandler net;
  EventCenter *center;
  ceph::shared_ptr<AuthSessionHandler> session_security;

 public:
  // used by eventcallback
  void handle_write();
  void process();
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
#endif

View File

@ -0,0 +1,691 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <errno.h>
#include <iostream>
#include <fstream>
#include <poll.h>
#include "AsyncMessenger.h"
#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"
#include "auth/Crypto.h"
#include "include/Spinlock.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
// Log prefix helpers. dout_prefix above expands to _prefix(_dout, this), so
// the overload is chosen by the type of the object doing the logging.

// AsyncMessenger: prefix each line with "-- <messenger address> ".
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
return *_dout << "-- " << m->get_myaddr() << " ";
}
// Processor: fixed prefix; the Processor argument is not used.
static ostream& _prefix(std::ostream *_dout, Processor *p) {
return *_dout << " Processor -- ";
}
// Worker: fixed prefix; the Worker argument is not used.
static ostream& _prefix(std::ostream *_dout, Worker *w) {
return *_dout << "--";
}
// Event callback that hands an accepted socket to its connection.
// add_accept() queues one of these on the worker's EventCenter.
class C_handle_accept : public EventCallback {
// connection that will take over the socket (held by reference count)
AsyncConnectionRef conn;
// accepted socket descriptor
int fd;
public:
C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
// Calls conn->accept(fd); the id argument is ignored.
void do_request(int id) {
conn->accept(fd);
}
};
// Event callback that starts an outgoing connection to a peer.
// NOTE(review): nothing in the visible part of this file instantiates it;
// create_connect() calls conn->connect() directly.
class C_handle_connect : public EventCallback {
// connection to start (held by reference count)
AsyncConnectionRef conn;
// peer address, copied at construction
const entity_addr_t addr;
// peer entity type
int type;
public:
C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t)
:conn(c), addr(d), type(t) {}
// Calls conn->connect(addr, type); the id argument is ignored.
void do_request(int id) {
conn->connect(addr, type);
}
};
/*******************
* Processor
*/
int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
const md_config_t *conf = msgr->cct->_conf;
// bind to a socket
ldout(msgr->cct, 10) << __func__ << dendl;
int family;
switch (bind_addr.get_family()) {
case AF_INET:
case AF_INET6:
family = bind_addr.get_family();
break;
default:
// bind_addr is empty
family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
}
/* socket creation */
listen_sd = ::socket(family, SOCK_STREAM, 0);
if (listen_sd < 0) {
lderr(msgr->cct) << __func__ << " unable to create socket: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
// use whatever user specified (if anything)
entity_addr_t listen_addr = bind_addr;
listen_addr.set_family(family);
/* bind to port */
int rc = -1;
if (listen_addr.get_port()) {
// specific port
// reuse addr+port when possible
int on = 1;
rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (rc < 0) {
lderr(msgr->cct) << __func__ << " unable to setsockopt: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());
if (rc < 0) {
lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr.ss_addr()
<< ": " << cpp_strerror(errno) << dendl;
return -errno;
}
} else {
// try a range of ports
for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
if (avoid_ports.count(port))
continue;
listen_addr.set_port(port);
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());
if (rc == 0)
break;
}
if (rc < 0) {
lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr.ss_addr()
<< " on any port in range " << msgr->cct->_conf->ms_bind_port_min
<< "-" << msgr->cct->_conf->ms_bind_port_max
<< ": " << cpp_strerror(errno) << dendl;
return -errno;
}
ldout(msgr->cct,10) << __func__ << " bound on random port " << listen_addr << dendl;
}
// what port did we get?
socklen_t llen = sizeof(listen_addr.ss_addr());
rc = getsockname(listen_sd, (sockaddr*)&listen_addr.ss_addr(), &llen);
if (rc < 0) {
rc = -errno;
lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
return rc;
}
ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
// listen!
rc = ::listen(listen_sd, 128);
if (rc < 0) {
rc = -errno;
lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
<< ": " << cpp_strerror(rc) << dendl;
return rc;
}
msgr->set_myaddr(bind_addr);
if (bind_addr != entity_addr_t())
msgr->learned_addr(bind_addr);
if (msgr->get_myaddr().get_port() == 0) {
msgr->set_myaddr(listen_addr);
}
entity_addr_t addr = msgr->get_myaddr();
addr.nonce = nonce;
msgr->set_myaddr(addr);
msgr->init_local_connection();
ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl;
return 0;
}
int Processor::rebind(const set<int>& avoid_ports)
{
ldout(msgr->cct, 1) << __func__ << " rebind avoid " << avoid_ports << dendl;
entity_addr_t addr = msgr->get_myaddr();
set<int> new_avoid = avoid_ports;
new_avoid.insert(addr.get_port());
addr.set_port(0);
// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
msgr->my_inst.addr.nonce = nonce;
ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
int r = bind(addr, new_avoid);
if (r == 0)
start();
return r;
}
/**
 * Start the accept thread if a listening socket exists.
 *
 * listen_sd is -1 when there is no socket (see the constructor and stop()),
 * so any value >= 0 -- including descriptor 0 -- is a valid socket.
 *
 * @return always 0
 */
int Processor::start()
{
  ldout(msgr->cct, 1) << __func__ << " start" << dendl;

  // start thread
  if (listen_sd >= 0)
    create();

  return 0;
}
/**
 * Accept-thread body: poll the listening socket and pass every accepted
 * descriptor to the messenger via add_accept().
 *
 * The loop ends when `done` is set, when poll() fails with anything other
 * than EINTR, when the socket reports POLLERR/POLLNVAL/POLLHUP, or after five
 * consecutive accept() failures. The listening socket is closed on exit.
 *
 * @return always 0
 */
void *Processor::entry()
{
  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
  int errors = 0;

  struct pollfd pfd;
  pfd.fd = listen_sd;
  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
  while (!done) {
    ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl;
    int r = poll(&pfd, 1, -1);
    if (r < 0) {
      // a signal interrupting poll() is not a socket failure; re-check
      // `done` and poll again instead of ending the accept thread
      if (errno == EINTR)
        continue;
      break;
    }
    ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl;

    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
      break;

    ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl;
    if (done) break;

    // accept
    entity_addr_t addr;
    socklen_t slen = sizeof(addr.ss_addr());
    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
    if (sd >= 0) {
      errors = 0;
      ldout(msgr->cct,10) << __func__ << " accepted incoming on sd " << sd << dendl;

      msgr->add_accept(sd);
    } else {
      ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
      // give up after five consecutive failures
      if (++errors > 4)
        break;
    }
  }

  ldout(msgr->cct,20) << __func__ << " closing" << dendl;
  // the thread owns the socket while it runs; close it on the way out
  if (listen_sd >= 0) {
    ::close(listen_sd);
    listen_sd = -1;
  }
  ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
  return 0;
}
// Stop the accept thread and release the listening socket.
// Leaves `done` false again so the Processor can be started once more
// (AsyncMessenger::rebind() calls stop() and then Processor::rebind()).
void Processor::stop()
{
// ask entry() to leave its loop
done = true;
ldout(msgr->cct,10) << __func__ << dendl;

// presumably this is what wakes a poll() blocked in entry() -- TODO confirm
if (listen_sd >= 0) {
::shutdown(listen_sd, SHUT_RDWR);
}

// wait for thread to stop before closing the socket, to avoid
// racing against fd re-use.
if (is_started()) {
join();
}

// entry() normally closes the socket itself; this covers the case where the
// thread was never started
if (listen_sd >= 0) {
::close(listen_sd);
listen_sd = -1;
}
done = false;
}
// Ask the worker thread to exit: set the done flag, then call
// center.wakeup() so the loop in entry() gets to re-check the flag.
// Does not join the thread.
void Worker::stop()
{
ldout(msgr->cct, 10) << __func__ << dendl;
done = true;
center.wakeup();
}
/**
 * Worker-thread body: keep asking the EventCenter to process events until
 * stop() sets `done`. A failed round is only logged.
 *
 * @return always 0
 */
void *Worker::entry()
{
  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
  while (!done) {
    ldout(msgr->cct, 20) << __func__ << " calling event process" << dendl;

    const int rc = center.process_events(30000000);
    if (rc >= 0)
      continue;

    ldout(msgr->cct,20) << __func__ << " process events failed: "
                        << cpp_strerror(errno) << dendl;
    // TODO do something?
  }

  return 0;
}
/*******************
* AsyncMessenger
*/
/**
 * Construct the messenger, its worker pool and its loopback connection.
 *
 * ms_async_op_threads workers are created (they are started later, in
 * start()). At least one worker is always created because the loopback
 * connection and every new connection need a worker's EventCenter, and
 * connections are assigned with `conn_id % workers.size()`.
 *
 * @param cct    context supplying configuration and logging
 * @param name   entity name of this messenger
 * @param mname  messenger name passed to SimplePolicyMessenger
 * @param _nonce nonce used in this messenger's entity address
 */
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name,mname, _nonce),
    conn_id(0),
    processor(this, _nonce),
    lock("AsyncMessenger::lock"),
    nonce(_nonce), listen_sd(-1), did_bind(false),
    global_seq(0),
    cluster_protocol(0), stopped(true)
{
  ceph_spin_init(&global_seq_lock);

  // a zero or negative setting would leave `workers` empty and make
  // workers[0] below (and every later modulo) invalid
  int nworkers = cct->_conf->ms_async_op_threads;
  if (nworkers < 1)
    nworkers = 1;
  for (int i = 0; i < nworkers; ++i) {
    Worker *w = new Worker(this, cct);
    workers.push_back(w);
  }
  local_connection = new AsyncConnection(cct, this, &workers[0]->center);
  init_local_connection();
}
/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
* elsewhere.
*
* Asserts that did_bind is false, i.e. the messenger either never bound or
* wait() has already cleared the flag.
* NOTE(review): the Worker objects allocated in the constructor are not
* deleted here -- confirm whether that is intentional.
*/
AsyncMessenger::~AsyncMessenger()
{
assert(!did_bind); // either we didn't bind or we shut down the Processor
}
void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
lock.Lock();
processor.start();
lock.Unlock();
}
int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
(*it)->stop();
mark_down_all();
// break ref cycles on the loopback connection
processor.stop();
local_connection->set_priv(NULL);
stop_cond.Signal();
stopped = true;
return 0;
}
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (started) {
ldout(cct,10) << __func__ << " already started" << dendl;
lock.Unlock();
return -1;
}
ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;
lock.Unlock();
// bind to a socket
set<int> avoid_ports;
int r = processor.bind(bind_addr, avoid_ports);
if (r >= 0)
did_bind = true;
return r;
}
int AsyncMessenger::rebind(const set<int>& avoid_ports)
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
assert(did_bind);
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
(*it)->stop();
if ((*it)->is_started())
(*it)->join();
}
processor.stop();
mark_down_all();
return processor.rebind(avoid_ports);
}
int AsyncMessenger::start()
{
lock.Lock();
ldout(cct,1) << __func__ << " start" << dendl;
// register at least one entity, first!
assert(my_inst.name.type() >= 0);
assert(!started);
started = true;
stopped = false;
if (!did_bind) {
my_inst.addr.nonce = nonce;
_init_local_connection();
}
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
(*it)->create();
lock.Unlock();
return 0;
}
void AsyncMessenger::wait()
{
lock.Lock();
if (!started) {
lock.Unlock();
return;
}
if (!stopped)
stop_cond.Wait(lock);
for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
(*it)->join();
lock.Unlock();
// done! clean up.
ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
processor.stop();
did_bind = false;
ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
// close all pipes
lock.Lock();
{
ldout(cct, 10) << __func__ << ": closing pipes" << dendl;
while (!conns.empty()) {
AsyncConnectionRef p = conns.begin()->second;
_stop_conn(p);
}
}
lock.Unlock();
ldout(cct, 10) << __func__ << ": done." << dendl;
ldout(cct, 1) << __func__ << " complete." << dendl;
started = false;
}
/**
 * Wrap a freshly accepted socket in a new AsyncConnection.
 *
 * The connection is assigned to a worker round-robin (conn_id modulo the
 * number of workers), the accept handling is queued on that worker's
 * EventCenter, and the connection is tracked in accepting_conns.
 *
 * @param sd accepted socket descriptor
 * @return the new connection
 */
AsyncConnectionRef AsyncMessenger::add_accept(int sd)
{
  Mutex::Locker l(lock);

  Worker *worker = workers[conn_id % workers.size()];
  AsyncConnectionRef accepted = new AsyncConnection(cct, this, &worker->center);
  EventCallbackRef on_accept(new C_handle_accept(accepted, sd));
  worker->center.dispatch_event_external(on_accept);
  accepting_conns.insert(accepted);
  conn_id++;
  return accepted;
}
/**
 * Create a connection to a remote address, start connecting and register it
 * in `conns`. The caller must hold `lock`; the address must not be our own
 * and must not be registered yet (both are asserted).
 *
 * @param addr peer address
 * @param type peer entity type
 * @return the new connection
 */
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
  assert(lock.is_locked());
  assert(addr != my_inst.addr);

  ldout(cct, 10) << __func__ << " " << addr
                 << ", creating connection and registering" << dendl;

  // pick the worker round-robin and create the connection on its EventCenter
  Worker *worker = workers[conn_id % workers.size()];
  AsyncConnectionRef fresh = new AsyncConnection(cct, this, &worker->center);
  fresh->connect(addr, type);

  assert(!conns.count(addr));
  conns[addr] = fresh;
  conn_id++;

  return fresh;
}
/**
 * Return the connection for a destination: the loopback connection when the
 * address is our own, an already registered connection if there is one, or a
 * newly created one otherwise.
 *
 * @param dest destination entity (address and name)
 */
ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
  Mutex::Locker l(lock);

  // local
  if (my_inst.addr == dest.addr)
    return local_connection;

  AsyncConnectionRef found = _lookup_conn(dest.addr);
  if (!found) {
    found = create_connect(dest.addr, dest.name.type());
    ldout(cct, 10) << __func__ << " " << dest << " new " << found << dendl;
    return found;
  }

  ldout(cct, 10) << __func__ << " " << dest << " existing " << found << dendl;
  return found;
}
// Return the connection used for sending messages to ourselves
// (created in the constructor). No lock is taken.
ConnectionRef AsyncMessenger::get_loopback_connection()
{
return local_connection;
}
/**
 * Send a message to a destination entity. Uses _lookup_conn(), which
 * asserts that `lock` is held.
 *
 * @param m    message to send; its reference is consumed
 * @param dest destination entity
 * @return -EINVAL (after dropping the message) when the destination address
 *         is empty, otherwise 0
 */
int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
  ldout(cct, 1) << __func__ << "--> " << dest.name << " "
                << dest.addr << " -- " << *m << " -- ?+"
                << m->get_data().length() << " " << m << dendl;

  const bool no_address = (dest.addr == entity_addr_t());
  if (no_address) {
    ldout(cct,0) << __func__ <<  " message " << *m
                 << " with empty dest " << dest.addr << dendl;
    m->put();
    return -EINVAL;
  }

  AsyncConnectionRef existing = _lookup_conn(dest.addr);
  submit_message(m, existing, dest.addr, dest.name.type());
  return 0;
}
/**
 * Deliver a message over an existing connection, locally, or over a newly
 * created connection.
 *
 * In order:
 *  - with ms_dump_on_send set, the encoded message is hex-dumped to the log;
 *  - if `con` is set, the message is queued on it;
 *  - if the destination is our own address, the message is dispatched
 *    locally through the loopback connection;
 *  - otherwise, a server policy for the destination type drops the message,
 *    and any other policy creates a connection and queues the message on it.
 *
 * The new-connection path uses create_connect(), which asserts that `lock`
 * is held; the visible caller (_send_message via send_message) holds it.
 *
 * @param m         message to send; its reference is consumed on every path
 * @param con       existing connection to the destination, or NULL
 * @param dest_addr destination address
 * @param dest_type destination entity type
 */
void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
                                    const entity_addr_t& dest_addr, int dest_type)
{
  if (cct->_conf->ms_dump_on_send) {
    m->encode(-1, true);
    ldout(cct, 0) << __func__ << "submit_message " << *m << "\n";
    m->get_payload().hexdump(*_dout);
    if (m->get_data().length() > 0) {
      *_dout << " data:\n";
      m->get_data().hexdump(*_dout);
    }
    *_dout << dendl;
    m->clear_payload();
  }

  // existing connection?
  if (con) {
    con->send_message(m);
    return ;
  }

  // local?
  if (my_inst.addr == dest_addr) {
    // local
    ldout(cct, 20) << __func__ << " " << *m << " local" << dendl;
    m->set_connection(local_connection.get());
    m->set_recv_stamp(ceph_clock_now(cct));
    ms_fast_preprocess(m);
    if (ms_can_fast_dispatch(m)) {
      ms_fast_dispatch(m);
    } else {
      if (m->get_priority() >= CEPH_MSG_PRIO_LOW) {
        ms_fast_dispatch(m);
      } else {
        ms_deliver_dispatch(m);
      }
    }

    return;
  }

  // remote, no existing pipe.
  const Policy& policy = get_policy(dest_type);
  if (policy.server) {
    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addr
                   << ", lossy server for target type "
                   << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
    m->put();
  } else {
    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
    // actually open the connection and queue the message; previously the
    // message was neither sent nor released on this path
    AsyncConnectionRef conn = create_connect(dest_addr, dest_type);
    conn->send_message(m);
  }
}
/**
 * Fill in our IP address from `addr` if we do not have one yet.
 *
 * When my_inst.addr has a blank IP, the address part is copied from `addr`,
 * our own port is kept, and the local connection is re-initialized.
 * Otherwise nothing happens.
 */
void AsyncMessenger::set_addr_unknowns(entity_addr_t &addr)
{
  Mutex::Locker l(lock);
  if (!my_inst.addr.is_blank_ip())
    return;

  // copying the address also overwrites the port, so restore ours afterwards
  const int own_port = my_inst.addr.get_port();
  my_inst.addr.addr = addr.addr;
  my_inst.addr.set_port(own_port);
  _init_local_connection();
}
// Forward a keepalive request to the given connection.
// con is dereferenced without a NULL check. Always returns 0.
int AsyncMessenger::send_keepalive(Connection *con)
{
con->send_keepalive();
return 0;
}
void AsyncMessenger::mark_down_all()
{
ldout(cct,1) << __func__ << " " << dendl;
lock.Lock();
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
p->get();
ms_deliver_handle_reset(p.get());
}
accepting_conns.clear();
while (!conns.empty()) {
ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
conns.erase(it);
p->mark_down();
p->get();
ms_deliver_handle_reset(p.get());
}
lock.Unlock();
}
void AsyncMessenger::mark_down(const entity_addr_t& addr)
{
lock.Lock();
AsyncConnectionRef p = _lookup_conn(addr);
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
_stop_conn(p);
p->get();
ms_deliver_handle_reset(p.get());
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl;
}
lock.Unlock();
}
/**
 * Pick the protocol version to use with a peer.
 *
 * Peers of our own type use cluster_protocol. For other peers the version is
 * chosen from the peer's type when we are connecting, and from our own type
 * otherwise.
 *
 * @param peer_type entity type of the peer
 * @param connect   true when we initiate the connection
 * @return the protocol version, or 0 when the relevant type is not OSD, MDS
 *         or MON
 */
int AsyncMessenger::get_proto_version(int peer_type, bool connect)
{
  const int my_type = my_inst.name.type();

  // internal
  if (peer_type == my_type)
    return cluster_protocol;

  // public: the side being connected to decides
  const int service_type = connect ? peer_type : my_type;
  if (service_type == CEPH_ENTITY_TYPE_OSD)
    return CEPH_OSDC_PROTOCOL;
  if (service_type == CEPH_ENTITY_TYPE_MDS)
    return CEPH_MDSC_PROTOCOL;
  if (service_type == CEPH_ENTITY_TYPE_MON)
    return CEPH_MONC_PROTOCOL;
  return 0;
}
void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
// my_inst.addr do NOT hold any lock.
// this always goes from true -> false under the protection of the
// mutex. if it is already false, we need not retake the mutex at
// all.
lock.Lock();
entity_addr_t t = peer_addr_for_me;
t.set_port(my_inst.addr.get_port());
my_inst.addr.addr = t.addr;
ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
_init_local_connection();
lock.Unlock();
}

View File

@ -0,0 +1,385 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_ASYNCMESSENGER_H
#define CEPH_ASYNCMESSENGER_H
#include "include/types.h"
#include "include/xlist.h"
#include <list>
#include <map>
using namespace std;
#include "include/unordered_map.h"
#include "include/unordered_set.h"
#include "common/Mutex.h"
#include "include/atomic.h"
#include "common/Cond.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "msg/SimplePolicyMessenger.h"
#include "include/assert.h"
#include "AsyncConnection.h"
#include "Event.h"
class AsyncMessenger;
/**
* If the Messenger binds to a specific address, the Processor runs
* and listens for incoming connections.
*
* It is a Thread: entry() is the accept loop, start() launches it and
* stop() ends it.
*/
class Processor : public Thread {
// messenger that receives accepted sockets
AsyncMessenger *msgr;
// set by stop() to end the loop in entry()
// NOTE(review): plain bool written and read from different threads
bool done;
// listening socket, -1 when there is none
int listen_sd;
// nonce written into the messenger's address by bind()/rebind()
uint64_t nonce;
public:
Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
// thread body: poll + accept loop
void *entry();
// stop the thread and close the listening socket
void stop();
// create, bind and listen on the socket; returns 0 or a negative errno
int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
// bind again on a different port and restart the thread
int rebind(const set<int>& avoid_port);
// launch the thread if a listening socket exists
int start();
// NOTE(review): declared here; no definition is visible in this chunk
void accept();
};
// A worker thread that owns an EventCenter and runs its event loop.
// AsyncConnections are created with a pointer to one worker's center.
class Worker : public Thread {
// owning messenger (used for logging in the .cc file)
AsyncMessenger *msgr;
// set by stop() to end the loop in entry()
// NOTE(review): plain bool written and read from different threads
bool done;
public:
// event loop state shared with the connections assigned to this worker
EventCenter center;
Worker(AsyncMessenger *m, CephContext *c): msgr(m), done(false), center(c) {
// 5000 is passed to EventCenter::init(); presumably a capacity -- TODO confirm
center.init(5000);
}
// thread body: process events until stopped
void *entry();
// request the thread to exit and wake the event loop
void stop();
};
/*
* AsyncMessenger is represented for maintaining a set of asynchronous connections,
* it may own a bind address and the accepted connections will be managed by
* AsyncMessenger.
*
*/
class AsyncMessenger : public SimplePolicyMessenger {
// First we have the public Messenger interface implementation...
public:
/**
* Initialize the AsyncMessenger!
*
* @param cct The CephContext to use
* @param name The name to assign ourselves
* _nonce A unique ID to use for this AsyncMessenger. It should not
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce);
/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
* elsewhere.
*/
virtual ~AsyncMessenger();
/** @defgroup Accessors
* @{
*/
void set_addr_unknowns(entity_addr_t& addr);
// Dispatch-queue length; this implementation always reports 0.
int get_dispatch_queue_len() {
return 0;
}
// Maximum age of queued dispatches; this implementation always reports 0
// and ignores `now`.
double get_dispatch_queue_max_age(utime_t now) {
return 0;
}
/** @} Accessors */
/**
* @defgroup Configuration functions
* @{
*/
// Set the protocol version used with peers of our own entity type
// (see get_proto_version()). Asserts that the messenger has neither been
// started nor bound yet.
void set_cluster_protocol(int p) {
assert(!started && !did_bind);
cluster_protocol = p;
}
int bind(const entity_addr_t& bind_addr);
int rebind(const set<int>& avoid_ports);
/** @} Configuration functions */
/**
* @defgroup Startup/Shutdown
* @{
*/
virtual int start();
virtual void wait();
virtual int shutdown();
/** @} // Startup/Shutdown */
/**
* @defgroup Messaging
* @{
*/
// Send a message to `dest`: takes the messenger lock and forwards to
// _send_message(), returning its result.
virtual int send_message(Message *m, const entity_inst_t& dest) {
Mutex::Locker l(lock);
return _send_message(m, dest);
}
/** @} // Messaging */
/**
* @defgroup Connection Management
* @{
*/
virtual ConnectionRef get_connection(const entity_inst_t& dest);
virtual ConnectionRef get_loopback_connection();
int send_keepalive(Connection *con);
virtual void mark_down(const entity_addr_t& addr);
virtual void mark_down_all();
/** @} // Connection Management */
/**
* @defgroup Inner classes
* @{
*/
// Create a connection that is not registered in `conns`. It is assigned to a
// worker round-robin, like every other connection. The caller receives a raw
// pointer to the newly allocated object.
Connection *create_anon_connection() {
  Mutex::Locker l(lock);
  // post-increment: the current conn_id picks the worker, then it advances
  Worker *worker = workers[conn_id++ % workers.size()];
  return new AsyncConnection(cct, this, &worker->center);
}
/**
* @} // Inner classes
*/
protected:
/**
* @defgroup Messenger Interfaces
* @{
*/
/**
* Start up the Processor thread (which accepts incoming connections) once we
* have somebody to dispatch to.
*/
virtual void ready();
/** @} // Messenger Interfaces */
private:
/**
* @defgroup Utility functions
* @{
*/
/**
* Create a connection associated with the given entity (of the given type).
* Initiate the connection. (This function returning does not guarantee
* connection success.)
*
* The caller must hold the messenger lock, and no connection to addr may be
* registered yet.
*
* @param addr The address of the entity to connect to.
* @param type The peer type of the entity at the address.
*
* @return a reference to the newly-created connection, which has also been
* registered in the conns map.
*/
AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
/**
* Queue up a Message for delivery to the entity specified
* by addr and dest_type.
* submit_message() is responsible for creating
* new AsyncConnection (and closing old ones) as necessary.
*
* @param m The Message to queue up. This function eats a reference.
* @param con The existing Connection to use, or NULL if you don't know of one.
* @param addr The address to send the Message to.
* @param dest_type The peer type of the address we're sending to
* just drop silently under failure.
*/
void submit_message(Message *m, AsyncConnectionRef con,
const entity_addr_t& dest_addr, int dest_type);
int _send_message(Message *m, const entity_inst_t& dest);
private:
vector<Worker*> workers;
int conn_id;
Processor processor;
friend class Processor;
/// overall lock used for AsyncMessenger data structures
Mutex lock;
// AsyncMessenger stuff
/// approximately unique ID set by the Constructor for use in entity_addr_t
uint64_t nonce;
/**
* The following aren't lock-protected since you shouldn't be able to race
* the only writers.
*/
int listen_sd;
/**
* false; set to true if the AsyncMessenger bound to a specific address;
* and set false again by AsyncMessenger::wait().
*/
bool did_bind;
/// counter for the global seq our connection protocol uses
__u32 global_seq;
/// lock to protect the global_seq
ceph_spinlock_t global_seq_lock;
/**
* hash map of addresses to AsyncConnection
*
* NOTE: an AsyncConnection with state CLOSED may still be in the map but is considered
* invalid and can be replaced by anyone holding the msgr lock
*/
ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
/**
* set of connections that are in the process of accepting
*
* These are not yet in the conns map.
*/
// FIXME clear up
set<AsyncConnectionRef> accepting_conns;
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
Cond stop_cond;
bool stopped;
/**
 * Look up the connection registered for address k in the conns map.
 * The caller must already hold `lock`.
 *
 * @return the mapped connection, or NULL if the address has no entry.
 */
AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
  assert(lock.is_locked());
  ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator found = conns.find(k);
  if (found != conns.end())
    return found->second;
  return NULL;
}
/**
 * Mark the given connection down and remove its peer address from the
 * conns map. A null reference is ignored. The caller must hold `lock`.
 */
void _stop_conn(AsyncConnectionRef c) {
  assert(lock.is_locked());
  if (!c)
    return;
  c->mark_down();
  conns.erase(c->peer_addr);
}
void _init_local_connection() {
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
ms_deliver_handle_fast_connect(local_connection.get());
}
public:
/// con used for sending messages to ourselves
ConnectionRef local_connection;
/**
* @defgroup AsyncMessenger internals
* @{
*/
/**
* This wraps _lookup_conn.
*/
AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
  // Take the messenger lock for the duration of the map lookup.
  Mutex::Locker guard(lock);
  AsyncConnectionRef found = _lookup_conn(k);
  return found;
}
/**
 * Register conn in the conns map under its peer address (replacing any
 * existing entry for that address) and drop it from accepting_conns.
 * Takes `lock` internally.
 */
void accept_conn(AsyncConnectionRef conn) {
  Mutex::Locker guard(lock);
  const entity_addr_t& peer = conn->peer_addr;
  conns[peer] = conn;
  accepting_conns.erase(conn);
}
void learned_addr(const entity_addr_t &peer_addr_for_me);
AsyncConnectionRef add_accept(int sd);
/**
* This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
*/
AuthAuthorizer *get_authorizer(int peer_type, bool force_new) {
  // Pure pass-through to the dispatcher-facing helper.
  AuthAuthorizer *authorizer = ms_deliver_get_authorizer(peer_type, force_new);
  return authorizer;
}
/**
* This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection.
*/
bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
                       bool& isvalid, CryptoKey& session_key) {
  // Pure pass-through; all arguments are forwarded unchanged.
  const bool handled = ms_deliver_verify_authorizer(con, peer_type, protocol, auth,
                                                    auth_reply, isvalid, session_key);
  return handled;
}
/**
* Increment the global sequence for this AsyncMessenger and return it.
* This is for the connect protocol, although it doesn't hurt if somebody
* else calls it.
*
* @return a global sequence ID that nobody else has seen.
*/
__u32 get_global_seq(__u32 old=0) {
  ceph_spin_lock(&global_seq_lock);
  // Never hand out a value at or below one the caller has already seen.
  if (global_seq < old)
    global_seq = old;
  global_seq += 1;
  const __u32 next = global_seq;
  ceph_spin_unlock(&global_seq_lock);
  return next;
}
/**
* Get the protocol version we support for the given peer type: either
* a peer protocol (if it matches our own), the protocol version for the
* peer (if we're connecting), or our protocol version (if we're accepting).
*/
int get_proto_version(int peer_type, bool connect);
/**
* Fill in the address and peer type for the local connection, which
* is used for delivering messages back to ourself.
*/
void init_local_connection() {
  // Locked wrapper around _init_local_connection().
  Mutex::Locker guard(lock);
  _init_local_connection();
}
/**
* @} // AsyncMessenger Internals
*/
} ;
#endif /* CEPH_ASYNCMESSENGER_H */

325
src/msg/async/Event.cc Normal file
View File

@ -0,0 +1,325 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <time.h>
#include "common/errno.h"
#include "Event.h"
#ifdef HAVE_EPOLL
#include "EventEpoll.h"
#else
#ifdef HAVE_KQUEUE
#include "EventKqueue.h"
#else
#include "EventSelect.h"
#endif
#endif
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "Event "
/**
 * Read callback for the receive end of the EventCenter notify pipe.
 *
 * wakeup() writes one byte per call into the pipe; those bytes have to be
 * consumed here, otherwise the pipe buffer eventually fills up and the
 * (blocking) write in wakeup() stalls.
 */
class C_handle_notify : public EventCallback {
 public:
  C_handle_notify() {}
  void do_request(int fd_or_id) {
    // This callback runs only after the fd was reported readable, so a
    // single read returns immediately with whatever is pending. We do not
    // loop: this block cannot tell whether the fd is non-blocking, and a
    // second read on a blocking pipe could stall the event loop.
    char drained[256];
    ssize_t r = ::read(fd_or_id, drained, sizeof(drained));
    (void)r;  // the payload is meaningless; only the wakeup matters
  }
};
/**
 * Set up the event center: create the platform event driver, the notify
 * pipe used by wakeup(), and the per-fd file event table.
 *
 * Must be called exactly once (asserts that nevent is still 0).
 *
 * @param n initial number of slots in the file event table; also handed to
 *          the driver's init().
 * @return 0 on success, a negative value on failure.
 */
int EventCenter::init(int n)
{
  // can't init multiple times
  assert(nevent == 0);
#ifdef HAVE_EPOLL
  driver = new EpollDriver(cct);
#else
#ifdef HAVE_KQUEUE
  driver = new KqueueDriver(cct);
#else
  driver = new SelectDriver(cct);
#endif
#endif

  if (!driver) {
    lderr(cct) << __func__ << " failed to create event driver " << dendl;
    return -1;
  }

  int r = driver->init(n);
  if (r < 0) {
    lderr(cct) << __func__ << " failed to init event driver." << dendl;
    return r;
  }

  int fds[2];
  if (pipe(fds) < 0) {
    lderr(cct) << __func__ << " can't create notify pipe" << dendl;
    return -1;
  }

  // Stored in members right away so the destructor closes them even if a
  // later step fails.
  notify_receive_fd = fds[0];
  notify_send_fd = fds[1];

  file_events = (FileEvent *)malloc(sizeof(FileEvent)*n);
  if (!file_events) {
    // Previously the NULL result went straight into memset().
    lderr(cct) << __func__ << " failed to allocate file_events" << dendl;
    return -ENOMEM;
  }
  // A zeroed slot has mask == 0, which _get_file_event() treats as "unused".
  memset(file_events, 0, sizeof(FileEvent)*n);

  nevent = n;
  r = create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify()));
  if (r < 0) {
    // Without this registration wakeup() could never interrupt event_wait.
    lderr(cct) << __func__ << " failed to register notify pipe" << dendl;
    return r;
  }
  return 0;
}
/**
 * Release the driver, both ends of the notify pipe and the file event table.
 */
EventCenter::~EventCenter()
{
  if (driver)
    delete driver;

  // The fds start out as -1; 0 is a valid descriptor (e.g. when stdin was
  // closed before the pipe was created), so test with >= 0.
  if (notify_receive_fd >= 0)
    ::close(notify_receive_fd);
  if (notify_send_fd >= 0)
    ::close(notify_send_fd);

  // nevent becomes non-zero only after init() allocated the table, so it
  // doubles as the "file_events is valid" flag.
  // NOTE(review): this frees the raw array only; callbacks still held by
  // registered slots are not released here.
  if (nevent > 0)
    free(file_events);
}
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
int r;
if (fd > nevent) {
int new_size = nevent << 2;
while (fd > new_size)
new_size <<= 2;
ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
r = driver->resize_events(new_size);
if (r < 0) {
lderr(cct) << __func__ << " event count is exceed." << dendl;
return -ERANGE;
}
FileEvent *new_events = (FileEvent *)realloc(file_events, sizeof(FileEvent)*new_size);
if (!new_events) {
lderr(cct) << __func__ << " failed to realloc file_events" << cpp_strerror(errno) << dendl;
return -errno;
}
file_events = new_events;
nevent = new_size;
}
EventCenter::FileEvent *event = _get_file_event(fd);
r = driver->add_event(fd, event->mask, mask);
if (r < 0)
return r;
event->mask |= mask;
if (mask & EVENT_READABLE) {
event->read_cb = ctxt;
}
if (mask & EVENT_WRITABLE) {
event->write_cb = ctxt;
}
ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
<< " now mask is " << event->mask << dendl;
return 0;
}
void EventCenter::delete_file_event(int fd, int mask)
{
EventCenter::FileEvent *event = _get_file_event(fd);
if (!event->mask)
return ;
driver->del_event(fd, event->mask, mask);
if (mask & EVENT_READABLE && event->read_cb) {
event->read_cb.reset();
}
if (mask & EVENT_WRITABLE && event->write_cb) {
event->write_cb.reset();
}
event->mask = event->mask & (~mask);
ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask
<< " now mask is " << event->mask << dendl;
}
/**
 * Schedule ctxt to run once, `microseconds` from now.
 *
 * @param microseconds delay before the callback fires; values below 5 are
 *                     stored with a timestamp of (0s, N us), i.e. already
 *                     in the past, so they fire on the next pass.
 * @param ctxt         callback; do_request() receives the returned id.
 * @return the id assigned to the new time event.
 */
uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
{
  uint64_t id = time_event_next_id++;

  ldout(cct, 10) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
  EventCenter::TimeEvent event;
  utime_t expire;
  struct timeval tv;

  if (microseconds < 5) {
    tv.tv_sec = 0;
    tv.tv_usec = microseconds;
  } else {
    expire = ceph_clock_now(cct);
    expire.copy_to_timeval(&tv);
    tv.tv_sec += microseconds / 1000000;
    tv.tv_usec += microseconds % 1000000;
    // Both addends are below one second, so at most one carry is needed.
    // Without it the sub-second part can exceed a full second and the
    // resulting key sorts incorrectly in time_events.
    if (tv.tv_usec >= 1000000) {
      tv.tv_sec += 1;
      tv.tv_usec -= 1000000;
    }
  }
  expire.set_from_timeval(&tv);

  event.id = id;
  event.time_cb = ctxt;
  time_events[expire].push_back(event);

  return id;
}
/**
 * Interrupt a pending driver->event_wait() by writing one byte into the
 * notify pipe, whose read end is registered as a readable file event.
 */
void EventCenter::wakeup()
{
  ldout(cct, 1) << __func__ << dendl;
  char buf[1];
  buf[0] = 'c';
  // wake up "event_wait"
  int n;
  do {
    n = write(notify_send_fd, buf, 1);
    // A signal may interrupt the write; that is not a failure, so retry
    // instead of tripping the assert below.
  } while (n < 0 && errno == EINTR);
  // FIXME ?
  assert(n == 1);
}
int EventCenter::process_time_events()
{
int processed = 0;
time_t now = time(NULL);
utime_t cur = ceph_clock_now(cct);
ldout(cct, 10) << __func__ << " cur time is " << cur << dendl;
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < last_time) {
map<utime_t, list<TimeEvent> > changed;
for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
it != time_events.end(); ++it) {
changed[utime_t()].swap(it->second);
}
time_events.swap(changed);
}
last_time = now;
map<utime_t, list<TimeEvent> >::iterator prev;
for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
it != time_events.end(); ) {
prev = it;
if (cur >= it->first) {
for (list<TimeEvent>::iterator j = it->second.begin();
j != it->second.end(); ++j) {
ldout(cct, 10) << __func__ << " process time event: id=" << j->id << " time is "
<< it->first << dendl;
j->time_cb->do_request(j->id);
}
processed++;
++it;
time_events.erase(prev);
} else {
break;
}
}
return processed;
}
/**
 * Run one iteration of the event loop: wait for file events (for at most
 * timeout_microseconds, or until the earliest time event is due), dispatch
 * the fired read/write callbacks, run due time events, then drain the
 * queue of externally dispatched events.
 *
 * @param timeout_microseconds upper bound on the wait.
 * @return number of fired file events plus the value returned by
 *         process_time_events() when a time event was due.
 */
int EventCenter::process_events(int timeout_microseconds)
{
  struct timeval tv;
  int numevents;
  bool trigger_time = false;

  utime_t period, shortest, now = ceph_clock_now(cct);
  now.copy_to_timeval(&tv);
  if (timeout_microseconds > 0) {
    tv.tv_sec += timeout_microseconds / 1000000;
    tv.tv_usec += timeout_microseconds % 1000000;
    // Keep tv_usec below one second so the deadline compares correctly.
    if (tv.tv_usec >= 1000000) {
      tv.tv_sec += 1;
      tv.tv_usec -= 1000000;
    }
  }
  shortest.set_from_timeval(&tv);

  {
    map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
    if (it != time_events.end() && shortest >= it->first) {
      ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
      shortest = it->first;
      trigger_time = true;
      if (shortest > now) {
        // Time remaining until the earliest timer. This used to be
        // "now - shortest", i.e. the smaller value minus the larger one.
        period = shortest - now;
        period.copy_to_timeval(&tv);
      } else {
        // Timer already due: poll without waiting.
        tv.tv_sec = 0;
        tv.tv_usec = 0;
      }
    } else {
      tv.tv_sec = timeout_microseconds / 1000000;
      tv.tv_usec = timeout_microseconds % 1000000;
    }
  }

  ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
  vector<FiredFileEvent> fired_events;
  numevents = driver->event_wait(fired_events, &tv);
  for (int j = 0; j < numevents; j++) {
    int rfired = 0;
    FileEvent *event = _get_file_event(fired_events[j].fd);
    if (!event)
      continue;

    /* note the event->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    if (event->mask & fired_events[j].mask & EVENT_READABLE) {
      rfired = 1;
      event->read_cb->do_request(fired_events[j].fd);
    }
    // Look the slot up again: the read callback may have changed the
    // registration (or caused the table to be reallocated).
    event = _get_file_event(fired_events[j].fd);
    if (!event)
      continue;

    if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
      // Avoid invoking the same callback twice for one fd in one pass.
      if (!rfired || event->read_cb != event->write_cb)
        event->write_cb->do_request(fired_events[j].fd);
    }

    ldout(cct, 20) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
  }

  if (trigger_time)
    numevents += process_time_events();

  {
    lock.Lock();
    while (!external_events.empty()) {
      EventCallbackRef e = external_events.front();
      external_events.pop_front();
      // Run the callback without holding the lock.
      lock.Unlock();
      e->do_request(0);
      lock.Lock();
    }
    lock.Unlock();
  }
  return numevents;
}
/**
 * Queue a callback from another thread and wake the event loop so that
 * process_events() picks it up.
 */
void EventCenter::dispatch_event_external(EventCallbackRef e)
{
  {
    // Hold the lock only while touching the queue; it is released before
    // wakeup() is called.
    Mutex::Locker guard(lock);
    external_events.push_back(e);
  }
  wakeup();
}

143
src/msg/async/Event.h Normal file
View File

@ -0,0 +1,143 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MSG_EVENT_H
#define CEPH_MSG_EVENT_H
#ifdef __APPLE__
#include <AvailabilityMacros.h>
#endif
// We use epoll, kqueue, evport, select in descending order by performance.
#if defined(__linux__)
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif
#include "include/Context.h"
#include "include/unordered_map.h"
#include "common/WorkQueue.h"
#define EVENT_NONE 0
#define EVENT_READABLE 1
#define EVENT_WRITABLE 2
class EventCenter;
/**
 * Interface for everything the EventCenter can invoke: file event
 * handlers, time event handlers and externally dispatched callbacks.
 */
class EventCallback {
 public:
  virtual ~EventCallback() {}       // we want a virtual destructor!!!

  /// Invoked with the fd (file events) or the event id (time events).
  virtual void do_request(int fd_or_id) = 0;
};
typedef ceph::shared_ptr<EventCallback> EventCallbackRef;
// One ready file descriptor, as filled in by EventDriver::event_wait().
struct FiredFileEvent {
int fd; // descriptor the event fired on
int mask; // EVENT_READABLE / EVENT_WRITABLE bits that fired
};
/*
* EventDriver wraps the event mechanism provided by the underlying OS.
* For example, Linux will use epoll(2), BSD will use kqueue(2), and select(2)
* is used as the fallback when neither is available.
*/
class EventDriver {
public:
virtual ~EventDriver() {} // we want a virtual destructor!!!
// Prepare the driver; nevent is the initial event capacity. Negative return means failure.
virtual int init(int nevent) = 0;
// Start watching `mask` on fd; cur_mask is what is already being watched. Negative return means failure.
virtual int add_event(int fd, int cur_mask, int mask) = 0;
// Stop watching del_mask on fd; cur_mask is what is being watched before the call.
virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
// Wait up to *tp for events, store them in fired_events and return how many fired.
virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
// Called when the fd table grows to newsize. Negative return means the driver cannot grow.
virtual int resize_events(int newsize) = 0;
};
/*
* EventCenter maintains a set of file descriptors and handles registered events.
*
* EventCenter is meant to be used by a single thread; other threads may only
* call dispatch_event_external, which is protected by a lock.
*/
class EventCenter {
  /// Per-fd registration: which directions are watched and their callbacks.
  struct FileEvent {
    int mask;
    EventCallbackRef read_cb;
    EventCallbackRef write_cb;
    FileEvent(): mask(0) {}
  };

  /// A one-shot timer entry stored in time_events.
  struct TimeEvent {
    uint64_t id;
    EventCallbackRef time_cb;

    TimeEvent(): id(0) {}
  };

  CephContext *cct;
  int nevent;                    // number of slots in file_events
  // Used only to external event
  Mutex lock;
  deque<EventCallbackRef> external_events;
  FileEvent *file_events;        // malloc'ed table indexed by fd
  EventDriver *driver;
  map<utime_t, list<TimeEvent> > time_events;   // keyed by expiry time
  uint64_t time_event_next_id;
  time_t last_time; // last time process time event
  int notify_receive_fd;
  int notify_send_fd;

  int process_time_events();
  /// Return the slot for fd, constructing it in place if it is unused
  /// (mask == 0). No bounds check: fd must be below nevent.
  FileEvent *_get_file_event(int fd) {
    FileEvent *p = &file_events[fd];
    if (!p->mask)
      new(p) FileEvent();
    return p;
  }

 public:
  EventCenter(CephContext *c):
    cct(c), nevent(0),
    lock("AsyncMessenger::lock"),
    file_events(NULL),   // was left uninitialised until init() ran
    driver(NULL), time_event_next_id(0),
    notify_receive_fd(-1), notify_send_fd(-1) {
    last_time = time(NULL);
  }
  ~EventCenter();
  int init(int nevent);
  // Used by internal thread
  int create_file_event(int fd, int mask, EventCallbackRef ctxt);
  // The delay is in microseconds, matching the definition in Event.cc.
  uint64_t create_time_event(uint64_t microseconds, EventCallbackRef ctxt);
  void delete_file_event(int fd, int mask);
  int process_events(int timeout_microseconds);
  void wakeup();

  // Used by external thread
  void dispatch_event_external(EventCallbackRef e);
};
#endif

130
src/msg/async/EventEpoll.cc Normal file
View File

@ -0,0 +1,130 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "common/errno.h"
#include "EventEpoll.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "EpollDriver."
int EpollDriver::init(int nevent)
{
events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent);
if (!events) {
lderr(cct) << __func__ << " unable to malloc memory: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
memset(events, 0, sizeof(struct epoll_event)*nevent);
epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
if (epfd == -1) {
lderr(cct) << __func__ << " unable to do epoll_create: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
size = nevent;
return 0;
}
int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
{
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op;
op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
ee.events = EPOLLET;
add_mask |= cur_mask; /* Merge old events */
if (add_mask & EVENT_READABLE)
ee.events |= EPOLLIN;
if (add_mask & EVENT_WRITABLE)
ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(epfd, op, fd, &ee) == -1) {
lderr(cct) << __func__ << " unable to add event: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << add_mask
<< dendl;
return 0;
}
/**
 * Stop watching delmask on fd. If some directions remain after removing
 * delmask from cur_mask the registration is modified, otherwise the fd is
 * removed from the epoll set. Failures are logged, not returned.
 */
void EpollDriver::del_event(int fd, int cur_mask, int delmask)
{
  const int mask = cur_mask & (~delmask);

  // NOTE(review): unlike add_event(), EPOLLET is not set here, so a MOD
  // leaves the remaining directions level-triggered. Confirm whether that
  // is intended.
  struct epoll_event ee;
  ee.events = 0;
  if (mask & EVENT_READABLE)
    ee.events |= EPOLLIN;
  if (mask & EVENT_WRITABLE)
    ee.events |= EPOLLOUT;
  ee.data.u64 = 0; /* avoid valgrind warning */
  ee.data.fd = fd;

  if (mask == EVENT_NONE) {
    /* Note, Kernel < 2.6.9 requires a non null event pointer even for
     * EPOLL_CTL_DEL. */
    if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee) < 0) {
      lderr(cct) << __func__ << " epoll_ctl: delete fd=" << fd
                 << " failed." << cpp_strerror(errno) << dendl;
    }
  } else {
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee) < 0) {
      lderr(cct) << __func__ << " epoll_ctl: modify fd=" << fd << " mask=" << mask
                 << " failed." << cpp_strerror(errno) << dendl;
    }
  }
  ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << mask
                 << dendl;
}
// No-op for the epoll driver: always reports success and leaves `events`
// and `size` (the buffer handed to epoll_wait) unchanged.
int EpollDriver::resize_events(int newsize)
{
return 0;
}
/**
 * Wait for events on the registered fds.
 *
 * @param fired_events resized to the number of ready fds and filled with
 *                     their fd and EVENT_* mask; left untouched when
 *                     nothing fired.
 * @param tvp          maximum time to wait, or NULL to wait indefinitely.
 * @return number of fired events; 0 on timeout or if epoll_wait() fails.
 */
int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
{
  int retval, numevents = 0;

  // epoll_wait() takes milliseconds. Round the sub-millisecond part up:
  // truncating turned waits shorter than 1ms into a timeout of 0, which
  // makes the caller spin until its timer becomes due.
  int timeout_ms = -1;
  if (tvp)
    timeout_ms = tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000;

  retval = epoll_wait(epfd, events, size, timeout_ms);
  if (retval > 0) {
    int j;

    numevents = retval;
    fired_events.resize(numevents);
    for (j = 0; j < numevents; j++) {
      int mask = 0;
      struct epoll_event *e = events + j;

      if (e->events & EPOLLIN) mask |= EVENT_READABLE;
      if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
      // Errors and hang-ups are delivered to the write handler.
      if (e->events & EPOLLERR) mask |= EVENT_WRITABLE;
      if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE;
      fired_events[j].fd = e->data.fd;
      fired_events[j].mask = mask;
    }
  }
  return numevents;
}

View File

@ -0,0 +1,48 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MSG_EVENTEPOLL_H
#define CEPH_MSG_EVENTEPOLL_H
#include <unistd.h>
#include <sys/epoll.h>
#include "Event.h"
/**
 * EventDriver implementation on top of epoll(7).
 */
class EpollDriver : public EventDriver {
  int epfd;                     // epoll instance, -1 until init() succeeds
  struct epoll_event *events;   // result buffer passed to epoll_wait()
  CephContext *cct;
  int size;                     // capacity of `events`

 public:
  // size starts at 0 so that the object never carries an indeterminate
  // buffer length before init() has run.
  EpollDriver(CephContext *c): epfd(-1), events(NULL), cct(c), size(0) {}
  virtual ~EpollDriver() {
    if (epfd != -1)
      close(epfd);

    if (events)
      free(events);
  }

  int init(int nevent);
  int add_event(int fd, int cur_mask, int add_mask);
  void del_event(int fd, int cur_mask, int del_mask);
  int resize_events(int newsize);
  int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp);
};
#endif

View File

@ -0,0 +1,142 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "net_handler.h"
#include "common/errno.h"
#include "common/debug.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "net_handler: "
namespace ceph{
int NetHandler::create_socket(int domain, bool reuse_addr)
{
int s, on = 1;
if ((s = ::socket(domain, SOCK_STREAM, 0)) == -1) {
lderr(cct) << __func__ << " couldn't created socket " << cpp_strerror(errno) << dendl;
return -errno;
}
/* Make sure connection-intensive things like the benckmark
* will be able to close/open sockets a zillion of times */
if (reuse_addr) {
if (::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: %s"
<< strerror(errno) << dendl;
return -errno;
}
}
return s;
}
int NetHandler::set_nonblock(int sd)
{
int flags;
/* Set the socket nonblocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
lderr(cct) << __func__ << " fcntl(F_GETFL) failed: %s" << strerror(errno) << dendl;
return -errno;
}
if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): %s" << strerror(errno) << dendl;
return -errno;
}
return 0;
}
/**
 * Apply the configured TCP options to a socket: TCP_NODELAY when
 * ms_tcp_nodelay is set, SO_RCVBUF when ms_tcp_rcvbuf is non-zero, and
 * SO_NOSIGPIPE where CEPH_USE_SO_NOSIGPIPE is defined. Failures are only
 * logged.
 */
void NetHandler::set_socket_options(int sd)
{
  // disable Nagle algorithm?
  if (cct->_conf->ms_tcp_nodelay) {
    int enable = 1;
    if (::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&enable, sizeof(enable)) < 0) {
      int err = -errno;
      ldout(cct, 0) << "couldn't set TCP_NODELAY: " << cpp_strerror(err) << dendl;
    }
  }

  if (cct->_conf->ms_tcp_rcvbuf) {
    int size = cct->_conf->ms_tcp_rcvbuf;
    if (::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)) < 0) {
      int err = -errno;
      ldout(cct, 0) << "couldn't set SO_RCVBUF to " << size << ": " << cpp_strerror(err) << dendl;
    }
  }

  // block ESIGPIPE
#ifdef CEPH_USE_SO_NOSIGPIPE
  int val = 1;
  if (::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)) != 0) {
    int err = -errno;
    ldout(cct,0) << "couldn't set SO_NOSIGPIPE: " << cpp_strerror(err) << dendl;
  }
#endif
}
int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
{
int ret;
int s = create_socket(addr.get_family());
if (s < 0)
return s;
if (nonblock) {
ret = set_nonblock(s);
if (ret < 0)
return ret;
}
ret = ::connect(s, (sockaddr*)&addr.addr, addr.addr_size());
if (ret < 0) {
if (errno == EINPROGRESS && nonblock)
return s;
lderr(cct) << __func__ << " connect: %s " << strerror(errno) << dendl;
close(s);
return -errno;
}
set_socket_options(s);
return s;
}
/// Blocking connect to addr; see generic_connect() for the return value.
int NetHandler::connect(const entity_addr_t &addr)
{
  const bool nonblock = false;
  return generic_connect(addr, nonblock);
}
/// Non-blocking connect to addr; see generic_connect() for the return value.
int NetHandler::nonblock_connect(const entity_addr_t &addr)
{
  const bool nonblock = true;
  return generic_connect(addr, nonblock);
}
}

View File

@ -0,0 +1,37 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_COMMON_NET_UTILS_H
#define CEPH_COMMON_NET_UTILS_H
#include "common/config.h"
namespace ceph {
/**
 * Small helper around the socket calls used by the async messenger:
 * creating, configuring and connecting TCP sockets.
 */
class NetHandler {
 public:
  NetHandler(CephContext *c): cct(c) {}

  /// Put sd into non-blocking mode; 0 on success, negative errno on failure.
  int set_nonblock(int sd);
  /// Apply the configured TCP options to sd; failures are only logged.
  void set_socket_options(int sd);

  /// Blocking connect; returns the socket or a negative errno.
  int connect(const entity_addr_t &addr);
  /// Non-blocking connect; returns the socket or a negative errno.
  int nonblock_connect(const entity_addr_t &addr);

 private:
  int create_socket(int domain, bool reuse_addr=false);
  int generic_connect(const entity_addr_t& addr, bool nonblock);

  CephContext *cct;
};
}
#endif