diff --git a/src/config.cc b/src/config.cc index 95809e14cbb..1925e777747 100644 --- a/src/config.cc +++ b/src/config.cc @@ -353,6 +353,7 @@ static struct config_option config_optionsp[] = { OPTION(ms_dispatch_throttle_bytes, 0, OPT_INT, 100 << 20), OPTION(ms_bind_ipv6, 0, OPT_BOOL, false), OPTION(ms_rwthread_stack_bytes, 0, OPT_INT, 1024 << 10), + OPTION(ms_tcp_read_timeout, 0, OPT_LONGLONG, 900), OPTION(mon_data, 0, OPT_STR, ""), OPTION(mon_tick_interval, 0, OPT_INT, 5), OPTION(mon_subscribe_interval, 0, OPT_DOUBLE, 300), diff --git a/src/config.h b/src/config.h index d04f6be7937..abec2b9d890 100644 --- a/src/config.h +++ b/src/config.h @@ -155,6 +155,7 @@ struct md_config_t { uint64_t ms_dispatch_throttle_bytes; bool ms_bind_ipv6; uint64_t ms_rwthread_stack_bytes; + uint64_t ms_tcp_read_timeout; // mon const char *mon_data; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index ddf65927347..463226777db 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -559,7 +559,7 @@ int SimpleMessenger::Pipe::accept() // identify peer char banner[strlen(CEPH_BANNER)+1]; - rc = tcp_read(sd, banner, strlen(CEPH_BANNER)); + rc = tcp_read(sd, banner, strlen(CEPH_BANNER), messenger->timeout); if (rc < 0) { dout(10) << "accept couldn't read banner" << dendl; state = STATE_CLOSED; @@ -576,7 +576,7 @@ int SimpleMessenger::Pipe::accept() bufferptr tp(sizeof(peer_addr)); addrbl.push_back(tp); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length()); + rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout); if (rc < 0) { dout(10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; @@ -611,7 +611,7 @@ int SimpleMessenger::Pipe::accept() int reply_tag = 0; bool replace = false; while (1) { - rc = tcp_read(sd, (char*)&connect, sizeof(connect)); + rc = tcp_read(sd, (char*)&connect, sizeof(connect), messenger->timeout); if (rc < 0) { dout(10) << "accept couldn't read connect" << dendl; goto fail_unlocked; @@ -621,7 
+621,7 @@ int SimpleMessenger::Pipe::accept() authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); - if (tcp_read(sd, bp.c_str(), connect.authorizer_len) < 0) { + if (tcp_read(sd, bp.c_str(), connect.authorizer_len, messenger->timeout) < 0) { dout(10) << "accept couldn't read connect authorizer" << dendl; goto fail_unlocked; } @@ -957,7 +957,7 @@ int SimpleMessenger::Pipe::connect() // verify banner // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. - rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER)); + rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), messenger->timeout); if (rc < 0) { dout(2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; @@ -983,7 +983,7 @@ int SimpleMessenger::Pipe::connect() bufferptr p(sizeof(paddr) * 2); addrbl.push_back(p); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length()); + rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout); if (rc < 0) { dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; @@ -1069,7 +1069,7 @@ int SimpleMessenger::Pipe::connect() dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl; ceph_msg_connect_reply reply; - if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) { + if (tcp_read(sd, (char*)&reply, sizeof(reply), messenger->timeout) < 0) { dout(2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } @@ -1085,7 +1085,7 @@ int SimpleMessenger::Pipe::connect() if (reply.authorizer_len) { dout(10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; bufferptr bp = buffer::create(reply.authorizer_len); - if (tcp_read(sd, bp.c_str(), reply.authorizer_len) < 0) { + if (tcp_read(sd, bp.c_str(), reply.authorizer_len, messenger->timeout) < 0) { dout(10) << "connect couldn't read connect authorizer_reply" << dendl; goto fail; } @@ -1460,7 +1460,7 @@ 
void SimpleMessenger::Pipe::reader() char buf[80]; char tag = -1; dout(20) << "reader reading tag..." << dendl; - int rc = tcp_read(sd, (char*)&tag, 1); + int rc = tcp_read(sd, (char*)&tag, 1, messenger->timeout); if (rc < 0) { pipe_lock.Lock(); dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1478,7 +1478,7 @@ void SimpleMessenger::Pipe::reader() if (tag == CEPH_MSGR_TAG_ACK) { dout(20) << "reader got ACK" << dendl; ceph_le64 seq; - int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); + int rc = tcp_read( sd, (char*)&seq, sizeof(seq), messenger->timeout); pipe_lock.Lock(); if (rc < 0) { dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1708,12 +1708,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm) __u32 header_crc; if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { - if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) + if (tcp_read( sd, (char*)&header, sizeof(header), messenger->timeout ) < 0) return -1; header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } else { ceph_msg_header_old oldheader; - if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0) + if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), messenger->timeout ) < 0) return -1; // this is fugly memcpy(&header, &oldheader, sizeof(header)); @@ -1765,7 +1765,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) front_len = header.front_len; if (front_len) { bufferptr bp = buffer::create(front_len); - if (tcp_read( sd, bp.c_str(), front_len ) < 0) + if (tcp_read( sd, bp.c_str(), front_len, messenger->timeout ) < 0) goto out_dethrottle; front.push_back(bp); dout(20) << "reader got front " << front.length() << dendl; @@ -1775,7 +1775,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) middle_len = header.middle_len; if (middle_len) { bufferptr bp = buffer::create(middle_len); - if (tcp_read( sd, bp.c_str(), middle_len ) < 0) + if 
(tcp_read( sd, bp.c_str(), middle_len, messenger->timeout ) < 0) goto out_dethrottle; middle.push_back(bp); dout(20) << "reader got middle " << middle.length() << dendl; @@ -1792,7 +1792,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), (unsigned)left); bufferptr bp = buffer::create(head); - if (tcp_read( sd, bp.c_str(), head ) < 0) + if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0) goto out_dethrottle; data.push_back(bp); left -= head; @@ -1803,7 +1803,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) int middle = left & PAGE_MASK; if (middle > 0) { bufferptr bp = buffer::create_page_aligned(middle); - if (tcp_read( sd, bp.c_str(), middle ) < 0) + if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0) goto out_dethrottle; data.push_back(bp); left -= middle; @@ -1812,7 +1812,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) if (left) { bufferptr bp = buffer::create(left); - if (tcp_read( sd, bp.c_str(), left ) < 0) + if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0) goto out_dethrottle; data.push_back(bp); dout(20) << "reader got data tail " << left << dendl; @@ -1820,7 +1820,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) } // footer - if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) + if (tcp_read(sd, (char*)&footer, sizeof(footer), messenger->timeout) < 0) goto out_dethrottle; aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 0b9b46ee765..b4a0ef334cf 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -214,6 +214,7 @@ private: out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { connection_state->pipe = get(); + messenger->timeout = g_conf.ms_tcp_read_timeout ? static_cast<int>(g_conf.ms_tcp_read_timeout * 1000) : -1; /* config is in seconds, poll() wants ms; 0 means "no timeout", which poll() spells -1 (a literal 0 would make every read fail unless data is already queued) */ } ~Pipe() { for (map::item* >::iterator i = queue_items.begin(); @@ -564,6 +565,7 @@ private: void dispatch_entry(); 
SimpleMessenger *messenger; //hack to make dout macro work, will fix + int timeout; /* poll() timeout in ms that Pipe passes to tcp_read */ public: SimpleMessenger() : diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index b0dbf6073a9..bab1a7651ff 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -8,14 +8,14 @@ * tcp crap */ -int tcp_read(int sd, char *buf, int len) { +int tcp_read(int sd, char *buf, int len, int timeout) { if (sd < 0) return -1; struct pollfd pfd; pfd.fd = sd; pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; while (len > 0) { - if (poll(&pfd, 1, -1) < 0) + if (poll(&pfd, 1, timeout) <= 0) return -1; if (!(pfd.revents & POLLIN)) diff --git a/src/msg/tcp.h b/src/msg/tcp.h index b1c636cba7a..97ef3a90a86 100644 --- a/src/msg/tcp.h +++ b/src/msg/tcp.h @@ -25,7 +25,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) << buf << ':' << serv; } -extern int tcp_read(int sd, char *buf, int len); +extern int tcp_read(int sd, char *buf, int len, int timeout=-1); /* timeout is the poll() timeout in ms; the default -1 blocks indefinitely, so 3-argument callers keep the pre-patch behaviour (a default of 0 would make poll() return at once and tcp_read fail) */ extern int tcp_write(int sd, const char *buf, int len); inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {