messenger: introduce timeouts on pipes.

With this change, a read on a pipe fails with an error if no data
arrives within the configured timeout (ms_tcp_read_timeout, given in
seconds; default 900, i.e. 15 minutes). In a stateful session the
Connection stays around and the next write initiates the standard
reconnect, so things keep working but we don't rack up hundreds of
useless threads!
This commit is contained in:
Greg Farnum 2010-10-15 11:21:02 -07:00
parent 6e1eeac3b3
commit 8528ebb0c6
6 changed files with 25 additions and 21 deletions

View File

@ -353,6 +353,7 @@ static struct config_option config_optionsp[] = {
OPTION(ms_dispatch_throttle_bytes, 0, OPT_INT, 100 << 20),
OPTION(ms_bind_ipv6, 0, OPT_BOOL, false),
OPTION(ms_rwthread_stack_bytes, 0, OPT_INT, 1024 << 10),
OPTION(ms_tcp_read_timeout, 0, OPT_LONGLONG, 900),
OPTION(mon_data, 0, OPT_STR, ""),
OPTION(mon_tick_interval, 0, OPT_INT, 5),
OPTION(mon_subscribe_interval, 0, OPT_DOUBLE, 300),

View File

@ -155,6 +155,7 @@ struct md_config_t {
uint64_t ms_dispatch_throttle_bytes;
bool ms_bind_ipv6;
uint64_t ms_rwthread_stack_bytes;
uint64_t ms_tcp_read_timeout;
// mon
const char *mon_data;

View File

@ -559,7 +559,7 @@ int SimpleMessenger::Pipe::accept()
// identify peer
char banner[strlen(CEPH_BANNER)+1];
rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
rc = tcp_read(sd, banner, strlen(CEPH_BANNER), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read banner" << dendl;
state = STATE_CLOSED;
@ -576,7 +576,7 @@ int SimpleMessenger::Pipe::accept()
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
@ -611,7 +611,7 @@ int SimpleMessenger::Pipe::accept()
int reply_tag = 0;
bool replace = false;
while (1) {
rc = tcp_read(sd, (char*)&connect, sizeof(connect));
rc = tcp_read(sd, (char*)&connect, sizeof(connect), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read connect" << dendl;
goto fail_unlocked;
@ -621,7 +621,7 @@ int SimpleMessenger::Pipe::accept()
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
if (tcp_read(sd, bp.c_str(), connect.authorizer_len) < 0) {
if (tcp_read(sd, bp.c_str(), connect.authorizer_len, messenger->timeout) < 0) {
dout(10) << "accept couldn't read connect authorizer" << dendl;
goto fail_unlocked;
}
@ -957,7 +957,7 @@ int SimpleMessenger::Pipe::connect()
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), messenger->timeout);
if (rc < 0) {
dout(2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
@ -983,7 +983,7 @@ int SimpleMessenger::Pipe::connect()
bufferptr p(sizeof(paddr) * 2);
addrbl.push_back(p);
}
rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
if (rc < 0) {
dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
@ -1069,7 +1069,7 @@ int SimpleMessenger::Pipe::connect()
dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
ceph_msg_connect_reply reply;
if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
if (tcp_read(sd, (char*)&reply, sizeof(reply), messenger->timeout) < 0) {
dout(2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
@ -1085,7 +1085,7 @@ int SimpleMessenger::Pipe::connect()
if (reply.authorizer_len) {
dout(10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
bufferptr bp = buffer::create(reply.authorizer_len);
if (tcp_read(sd, bp.c_str(), reply.authorizer_len) < 0) {
if (tcp_read(sd, bp.c_str(), reply.authorizer_len, messenger->timeout) < 0) {
dout(10) << "connect couldn't read connect authorizer_reply" << dendl;
goto fail;
}
@ -1460,7 +1460,7 @@ void SimpleMessenger::Pipe::reader()
char buf[80];
char tag = -1;
dout(20) << "reader reading tag..." << dendl;
int rc = tcp_read(sd, (char*)&tag, 1);
int rc = tcp_read(sd, (char*)&tag, 1, messenger->timeout);
if (rc < 0) {
pipe_lock.Lock();
dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@ -1478,7 +1478,7 @@ void SimpleMessenger::Pipe::reader()
if (tag == CEPH_MSGR_TAG_ACK) {
dout(20) << "reader got ACK" << dendl;
ceph_le64 seq;
int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
int rc = tcp_read( sd, (char*)&seq, sizeof(seq), messenger->timeout);
pipe_lock.Lock();
if (rc < 0) {
dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@ -1708,12 +1708,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
__u32 header_crc;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
if (tcp_read( sd, (char*)&header, sizeof(header), messenger->timeout ) < 0)
return -1;
header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), messenger->timeout ) < 0)
return -1;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
@ -1765,7 +1765,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
front_len = header.front_len;
if (front_len) {
bufferptr bp = buffer::create(front_len);
if (tcp_read( sd, bp.c_str(), front_len ) < 0)
if (tcp_read( sd, bp.c_str(), front_len, messenger->timeout ) < 0)
goto out_dethrottle;
front.push_back(bp);
dout(20) << "reader got front " << front.length() << dendl;
@ -1775,7 +1775,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
middle_len = header.middle_len;
if (middle_len) {
bufferptr bp = buffer::create(middle_len);
if (tcp_read( sd, bp.c_str(), middle_len ) < 0)
if (tcp_read( sd, bp.c_str(), middle_len, messenger->timeout ) < 0)
goto out_dethrottle;
middle.push_back(bp);
dout(20) << "reader got middle " << middle.length() << dendl;
@ -1792,7 +1792,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
(unsigned)left);
bufferptr bp = buffer::create(head);
if (tcp_read( sd, bp.c_str(), head ) < 0)
if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
left -= head;
@ -1803,7 +1803,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
int middle = left & PAGE_MASK;
if (middle > 0) {
bufferptr bp = buffer::create_page_aligned(middle);
if (tcp_read( sd, bp.c_str(), middle ) < 0)
if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
left -= middle;
@ -1812,7 +1812,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
if (left) {
bufferptr bp = buffer::create(left);
if (tcp_read( sd, bp.c_str(), left ) < 0)
if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
dout(20) << "reader got data tail " << left << dendl;
@ -1820,7 +1820,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
}
// footer
if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
if (tcp_read(sd, (char*)&footer, sizeof(footer), messenger->timeout) < 0)
goto out_dethrottle;
aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;

View File

@ -214,6 +214,7 @@ private:
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) {
connection_state->pipe = get();
messenger->timeout = g_conf.ms_tcp_read_timeout * 1000; //convert to ms
}
~Pipe() {
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
@ -564,6 +565,7 @@ private:
void dispatch_entry();
SimpleMessenger *messenger; //hack to make dout macro work, will fix
int timeout;
public:
SimpleMessenger() :

View File

@ -8,14 +8,14 @@
* tcp crap
*/
int tcp_read(int sd, char *buf, int len) {
int tcp_read(int sd, char *buf, int len, int timeout) {
if (sd < 0)
return -1;
struct pollfd pfd;
pfd.fd = sd;
pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
while (len > 0) {
if (poll(&pfd, 1, -1) < 0)
if (poll(&pfd, 1, timeout) <= 0)
return -1;
if (!(pfd.revents & POLLIN))

View File

@ -25,7 +25,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
<< buf << ':' << serv;
}
extern int tcp_read(int sd, char *buf, int len);
extern int tcp_read(int sd, char *buf, int len, int timeout=0);
extern int tcp_write(int sd, const char *buf, int len);
inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {