From ba5b7331e7f7fdea03d72db045f50b42d5281abd Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Mon, 20 Jun 2011 11:43:06 -0700 Subject: [PATCH] tcp.cc: de-globalize Signed-off-by: Colin McCabe --- src/msg/SimpleMessenger.cc | 52 +++++++++++++++++++------------------- src/msg/tcp.cc | 32 +++++++++++------------ src/msg/tcp.h | 8 +++--- 3 files changed, 47 insertions(+), 45 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index ee4d4766df6..20c2e06a08c 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -590,7 +590,7 @@ int SimpleMessenger::Pipe::accept() assert(state == STATE_ACCEPTING); // announce myself. - int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER)); + int rc = tcp_write(msgr->cct, sd, CEPH_BANNER, strlen(CEPH_BANNER)); if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; state = STATE_CLOSED; @@ -613,7 +613,7 @@ int SimpleMessenger::Pipe::accept() } ::encode(socket_addr, addrs); - rc = tcp_write(sd, addrs.c_str(), addrs.length()); + rc = tcp_write(msgr->cct, sd, addrs.c_str(), addrs.length()); if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; state = STATE_CLOSED; @@ -624,7 +624,7 @@ int SimpleMessenger::Pipe::accept() // identify peer char banner[strlen(CEPH_BANNER)+1]; - rc = tcp_read(sd, banner, strlen(CEPH_BANNER), msgr->timeout); + rc = tcp_read(msgr->cct, sd, banner, strlen(CEPH_BANNER), msgr->timeout); if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; state = STATE_CLOSED; @@ -641,7 +641,7 @@ int SimpleMessenger::Pipe::accept() bufferptr tp(sizeof(peer_addr)); addrbl.push_back(tp); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout); + rc = tcp_read(msgr->cct, sd, addrbl.c_str(), addrbl.length(), msgr->timeout); if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; @@ -677,7 +677,7 @@ int SimpleMessenger::Pipe::accept() bool replace = false; uint64_t existing_seq = -1; while (1) { - rc = tcp_read(sd, (char*)&connect, sizeof(connect), msgr->timeout); + rc = tcp_read(msgr->cct, sd, (char*)&connect, sizeof(connect), msgr->timeout); if (rc < 0) { ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; goto fail_unlocked; @@ -687,7 +687,7 @@ int SimpleMessenger::Pipe::accept() authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); - if (tcp_read(sd, bp.c_str(), connect.authorizer_len, msgr->timeout) < 0) { + if (tcp_read(msgr->cct, sd, bp.c_str(), connect.authorizer_len, msgr->timeout) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl; goto fail_unlocked; } @@ -856,11 +856,11 @@ int SimpleMessenger::Pipe::accept() reply: reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); - rc = tcp_write(sd, (char*)&reply, sizeof(reply)); + rc = tcp_write(msgr->cct, sd, (char*)&reply, sizeof(reply)); if (rc < 0) goto fail_unlocked; if (reply.authorizer_len) { - rc = tcp_write(sd, authorizer_reply.c_str(), authorizer_reply.length()); + rc = tcp_write(msgr->cct, sd, authorizer_reply.c_str(), authorizer_reply.length()); if (rc < 0) goto fail_unlocked; } @@ -921,13 +921,13 @@ int SimpleMessenger::Pipe::accept() register_pipe(); msgr->lock.Unlock(); - rc = tcp_write(sd, (char*)&reply, sizeof(reply)); + rc = tcp_write(msgr->cct, sd, (char*)&reply, sizeof(reply)); if (rc < 0) { goto fail_unlocked; } if (reply.authorizer_len) { - rc = tcp_write(sd, authorizer_reply.c_str(), authorizer_reply.length()); + rc = tcp_write(msgr->cct, sd, authorizer_reply.c_str(), authorizer_reply.length()); if (rc < 0) { goto fail_unlocked; } @@ -935,11 +935,11 @@ int SimpleMessenger::Pipe::accept() if (reply_tag == CEPH_MSGR_TAG_SEQ) { uint64_t newly_acked_seq = 0; - if(tcp_write(sd, (char*)&existing_seq, sizeof(existing_seq)) < 0) { + if(tcp_write(msgr->cct, sd, (char*)&existing_seq, sizeof(existing_seq)) < 0) { ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_unlocked; } - if(tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { + if(tcp_read(msgr->cct, sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; goto fail_unlocked; } @@ -1030,7 +1030,7 @@ int SimpleMessenger::Pipe::connect() // verify banner // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. - rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), msgr->timeout); + rc = tcp_read(msgr->cct, sd, (char*)&banner, strlen(CEPH_BANNER), msgr->timeout); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; @@ -1056,7 +1056,7 @@ int SimpleMessenger::Pipe::connect() bufferptr p(sizeof(paddr) * 2); addrbl.push_back(p); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout); + rc = tcp_read(msgr->cct, sd, addrbl.c_str(), addrbl.length(), msgr->timeout); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; @@ -1142,7 +1142,7 @@ int SimpleMessenger::Pipe::connect() ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl; ceph_msg_connect_reply reply; - if (tcp_read(sd, (char*)&reply, sizeof(reply), msgr->timeout) < 0) { + if (tcp_read(msgr->cct, sd, (char*)&reply, sizeof(reply), msgr->timeout) < 0) { ldout(msgr->cct,2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } @@ -1158,7 +1158,7 @@ int SimpleMessenger::Pipe::connect() if (reply.authorizer_len) { ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; bufferptr bp = buffer::create(reply.authorizer_len); - if (tcp_read(sd, bp.c_str(), reply.authorizer_len, msgr->timeout) < 0) { + if (tcp_read(msgr->cct, sd, bp.c_str(), reply.authorizer_len, msgr->timeout) < 0) { ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << dendl; goto fail; } @@ -1243,12 +1243,12 @@ int SimpleMessenger::Pipe::connect() if (reply.tag == CEPH_MSGR_TAG_SEQ) { ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; uint64_t newly_acked_seq = 0; - if (tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { + if (tcp_read(msgr->cct, sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl; goto fail_locked; } handle_ack(newly_acked_seq); - if (tcp_write(sd, (char*)&in_seq, sizeof(in_seq)) < 0) { + if (tcp_write(msgr->cct, sd, (char*)&in_seq, sizeof(in_seq)) < 0) { ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; goto fail_locked; } @@ -1538,7 +1538,7 @@ void SimpleMessenger::Pipe::reader() char buf[80]; char tag = -1; ldout(msgr->cct,20) << "reader reading tag..." << dendl; - int rc = tcp_read(sd, (char*)&tag, 1, msgr->timeout); + int rc = tcp_read(msgr->cct, sd, (char*)&tag, 1, msgr->timeout); if (rc < 0) { pipe_lock.Lock(); ldout(msgr->cct,2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1556,7 +1556,7 @@ void SimpleMessenger::Pipe::reader() if (tag == CEPH_MSGR_TAG_ACK) { ldout(msgr->cct,20) << "reader got ACK" << dendl; ceph_le64 seq; - int rc = tcp_read( sd, (char*)&seq, sizeof(seq), msgr->timeout); + int rc = tcp_read(msgr->cct, sd, (char*)&seq, sizeof(seq), msgr->timeout); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1810,12 +1810,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm) __u32 header_crc; if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { - if (tcp_read( sd, (char*)&header, sizeof(header), msgr->timeout ) < 0) + if (tcp_read(msgr->cct, sd, (char*)&header, sizeof(header), msgr->timeout ) < 0) return -1; header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } else { ceph_msg_header_old oldheader; - if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), msgr->timeout ) < 0) + if (tcp_read(msgr->cct, sd, (char*)&oldheader, sizeof(oldheader), msgr->timeout ) < 0) return -1; // this is fugly memcpy(&header, &oldheader, sizeof(header)); @@ -1867,7 +1867,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) front_len = header.front_len; if (front_len) { bufferptr bp = buffer::create(front_len); - if (tcp_read( sd, bp.c_str(), front_len, msgr->timeout ) < 0) + if (tcp_read(msgr->cct, sd, bp.c_str(), front_len, msgr->timeout ) < 0) goto out_dethrottle; front.push_back(bp); ldout(msgr->cct,20) << "reader got front " << front.length() << dendl; @@ -1877,7 +1877,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) middle_len = header.middle_len; if (middle_len) { bufferptr bp = buffer::create(middle_len); - if (tcp_read( sd, bp.c_str(), middle_len, msgr->timeout ) < 0) + if (tcp_read(msgr->cct, sd, bp.c_str(), middle_len, msgr->timeout ) < 0) goto out_dethrottle; middle.push_back(bp); ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl; @@ -1927,7 +1927,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) bufferptr bp = blp.get_current_ptr(); int read = MIN(bp.length(), left); ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; - int got = tcp_read_nonblocking(sd, bp.c_str(), read); + int got = tcp_read_nonblocking(msgr->cct, sd, bp.c_str(), read); ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl; connection_state->lock.Unlock(); if (got < 0) @@ -1942,7 +1942,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) } // footer - if (tcp_read(sd, (char*)&footer, sizeof(footer), msgr->timeout) < 0) + if (tcp_read(msgr->cct, sd, (char*)&footer, sizeof(footer), msgr->timeout) < 0) goto out_dethrottle; aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc index 275ab18dbf1..8938320cd6f 100644 --- a/src/msg/tcp.cc +++ b/src/msg/tcp.cc @@ -8,16 +8,16 @@ /****************** * tcp crap */ -int tcp_read(int sd, char *buf, int len, int timeout) +int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout) { if (sd < 0) return -1; while (len > 0) { - if (g_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % g_conf->ms_inject_socket_failures == 0) { - generic_dout(0) << "injecting socket failure" << dendl; + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + lgeneric_dout(cct, 0) << "injecting socket failure" << dendl; ::shutdown(sd, SHUT_RDWR); } } @@ -25,14 +25,14 @@ int tcp_read(int sd, char *buf, int len, int timeout) if (tcp_read_wait(sd, timeout) < 0) return -1; - int got = tcp_read_nonblocking(sd, buf, len); + int got = tcp_read_nonblocking(cct, sd, buf, len); if (got < 0) return -1; len -= got; buf += got; - //generic_dout(DBL) << "tcp_read got " << got << ", " << len << " left" << dendl; + //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl; } return len; } @@ -61,7 +61,7 @@ int tcp_read_wait(int sd, int timeout) * data available. Otherwise we cannot properly interpret a * read of 0 bytes. */ -int tcp_read_nonblocking(int sd, char *buf, int len) +int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len) { again: int got = ::recv( sd, buf, len, MSG_DONTWAIT ); @@ -70,7 +70,7 @@ again: goto again; } else { char buf[100]; - generic_dout(10) << "tcp_read_nonblocking socket " << sd << " returned " + lgeneric_dout(cct, 10) << "tcp_read_nonblocking socket " << sd << " returned " << got << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; return -1; } @@ -84,7 +84,7 @@ again: return got; } -int tcp_write(int sd, const char *buf, int len) +int tcp_write(CephContext *cct, int sd, const char *buf, int len) { if (sd < 0) return -1; @@ -92,9 +92,9 @@ int tcp_write(int sd, const char *buf, int len) pfd.fd = sd; pfd.events = POLLOUT | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR; - if (g_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % g_conf->ms_inject_socket_failures == 0) { - generic_dout(0) << "injecting socket failure" << dendl; + if (cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + lgeneric_dout(cct, 0) << "injecting socket failure" << dendl; ::shutdown(sd, SHUT_RDWR); } } @@ -105,18 +105,18 @@ int tcp_write(int sd, const char *buf, int len) if (!(pfd.revents & POLLOUT)) return -1; - //generic_dout(DBL) << "tcp_write writing " << len << dendl; + //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl; assert(len > 0); while (len > 0) { int did = ::send( sd, buf, len, MSG_NOSIGNAL ); if (did < 0) { - //generic_dout(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; - //generic_derr(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; + //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; + //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl; return did; } len -= did; buf += did; - //generic_dout(DBL) << "tcp_write did " << did << ", " << len << " left" << dendl; + //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl; } return 0; } diff --git a/src/msg/tcp.h b/src/msg/tcp.h index bccdbda213d..8673ce28e40 100644 --- a/src/msg/tcp.h +++ b/src/msg/tcp.h @@ -12,6 +12,8 @@ using std::ostream; +class CephContext; + inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) { char buf[NI_MAXHOST] = { 0 }; @@ -25,10 +27,10 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) << buf << ':' << serv; } -extern int tcp_read(int sd, char *buf, int len, int timeout=-1); +extern int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout=-1); extern int tcp_read_wait(int sd, int timeout); -extern int tcp_read_nonblocking(int sd, char *buf, int len); -extern int tcp_write(int sd, const char *buf, int len); +extern int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len); +extern int tcp_write(CephContext *cct, int sd, const char *buf, int len); inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) { return strncmp((const char*)&a, (const char*)&b, sizeof(a)) == 0;