tcp.cc: de-globalize

Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
This commit is contained in:
Colin Patrick McCabe 2011-06-20 11:43:06 -07:00
parent 0fc6ef2ef3
commit ba5b7331e7
3 changed files with 47 additions and 45 deletions

View File

@ -590,7 +590,7 @@ int SimpleMessenger::Pipe::accept()
assert(state == STATE_ACCEPTING);
// announce myself.
int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
int rc = tcp_write(msgr->cct, sd, CEPH_BANNER, strlen(CEPH_BANNER));
if (rc < 0) {
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
state = STATE_CLOSED;
@ -613,7 +613,7 @@ int SimpleMessenger::Pipe::accept()
}
::encode(socket_addr, addrs);
rc = tcp_write(sd, addrs.c_str(), addrs.length());
rc = tcp_write(msgr->cct, sd, addrs.c_str(), addrs.length());
if (rc < 0) {
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
state = STATE_CLOSED;
@ -624,7 +624,7 @@ int SimpleMessenger::Pipe::accept()
// identify peer
char banner[strlen(CEPH_BANNER)+1];
rc = tcp_read(sd, banner, strlen(CEPH_BANNER), msgr->timeout);
rc = tcp_read(msgr->cct, sd, banner, strlen(CEPH_BANNER), msgr->timeout);
if (rc < 0) {
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
state = STATE_CLOSED;
@ -641,7 +641,7 @@ int SimpleMessenger::Pipe::accept()
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout);
rc = tcp_read(msgr->cct, sd, addrbl.c_str(), addrbl.length(), msgr->timeout);
if (rc < 0) {
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
@ -677,7 +677,7 @@ int SimpleMessenger::Pipe::accept()
bool replace = false;
uint64_t existing_seq = -1;
while (1) {
rc = tcp_read(sd, (char*)&connect, sizeof(connect), msgr->timeout);
rc = tcp_read(msgr->cct, sd, (char*)&connect, sizeof(connect), msgr->timeout);
if (rc < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
goto fail_unlocked;
@ -687,7 +687,7 @@ int SimpleMessenger::Pipe::accept()
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
if (tcp_read(sd, bp.c_str(), connect.authorizer_len, msgr->timeout) < 0) {
if (tcp_read(msgr->cct, sd, bp.c_str(), connect.authorizer_len, msgr->timeout) < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
goto fail_unlocked;
}
@ -856,11 +856,11 @@ int SimpleMessenger::Pipe::accept()
reply:
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
rc = tcp_write(sd, (char*)&reply, sizeof(reply));
rc = tcp_write(msgr->cct, sd, (char*)&reply, sizeof(reply));
if (rc < 0)
goto fail_unlocked;
if (reply.authorizer_len) {
rc = tcp_write(sd, authorizer_reply.c_str(), authorizer_reply.length());
rc = tcp_write(msgr->cct, sd, authorizer_reply.c_str(), authorizer_reply.length());
if (rc < 0)
goto fail_unlocked;
}
@ -921,13 +921,13 @@ int SimpleMessenger::Pipe::accept()
register_pipe();
msgr->lock.Unlock();
rc = tcp_write(sd, (char*)&reply, sizeof(reply));
rc = tcp_write(msgr->cct, sd, (char*)&reply, sizeof(reply));
if (rc < 0) {
goto fail_unlocked;
}
if (reply.authorizer_len) {
rc = tcp_write(sd, authorizer_reply.c_str(), authorizer_reply.length());
rc = tcp_write(msgr->cct, sd, authorizer_reply.c_str(), authorizer_reply.length());
if (rc < 0) {
goto fail_unlocked;
}
@ -935,11 +935,11 @@ int SimpleMessenger::Pipe::accept()
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
uint64_t newly_acked_seq = 0;
if(tcp_write(sd, (char*)&existing_seq, sizeof(existing_seq)) < 0) {
if(tcp_write(msgr->cct, sd, (char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_unlocked;
}
if(tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
if(tcp_read(msgr->cct, sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
goto fail_unlocked;
}
@ -1030,7 +1030,7 @@ int SimpleMessenger::Pipe::connect()
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), msgr->timeout);
rc = tcp_read(msgr->cct, sd, (char*)&banner, strlen(CEPH_BANNER), msgr->timeout);
if (rc < 0) {
ldout(msgr->cct,2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
@ -1056,7 +1056,7 @@ int SimpleMessenger::Pipe::connect()
bufferptr p(sizeof(paddr) * 2);
addrbl.push_back(p);
}
rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout);
rc = tcp_read(msgr->cct, sd, addrbl.c_str(), addrbl.length(), msgr->timeout);
if (rc < 0) {
ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
@ -1142,7 +1142,7 @@ int SimpleMessenger::Pipe::connect()
ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
ceph_msg_connect_reply reply;
if (tcp_read(sd, (char*)&reply, sizeof(reply), msgr->timeout) < 0) {
if (tcp_read(msgr->cct, sd, (char*)&reply, sizeof(reply), msgr->timeout) < 0) {
ldout(msgr->cct,2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
@ -1158,7 +1158,7 @@ int SimpleMessenger::Pipe::connect()
if (reply.authorizer_len) {
ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
bufferptr bp = buffer::create(reply.authorizer_len);
if (tcp_read(sd, bp.c_str(), reply.authorizer_len, msgr->timeout) < 0) {
if (tcp_read(msgr->cct, sd, bp.c_str(), reply.authorizer_len, msgr->timeout) < 0) {
ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << dendl;
goto fail;
}
@ -1243,12 +1243,12 @@ int SimpleMessenger::Pipe::connect()
if (reply.tag == CEPH_MSGR_TAG_SEQ) {
ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
uint64_t newly_acked_seq = 0;
if (tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
if (tcp_read(msgr->cct, sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl;
goto fail_locked;
}
handle_ack(newly_acked_seq);
if (tcp_write(sd, (char*)&in_seq, sizeof(in_seq)) < 0) {
if (tcp_write(msgr->cct, sd, (char*)&in_seq, sizeof(in_seq)) < 0) {
ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
goto fail_locked;
}
@ -1538,7 +1538,7 @@ void SimpleMessenger::Pipe::reader()
char buf[80];
char tag = -1;
ldout(msgr->cct,20) << "reader reading tag..." << dendl;
int rc = tcp_read(sd, (char*)&tag, 1, msgr->timeout);
int rc = tcp_read(msgr->cct, sd, (char*)&tag, 1, msgr->timeout);
if (rc < 0) {
pipe_lock.Lock();
ldout(msgr->cct,2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@ -1556,7 +1556,7 @@ void SimpleMessenger::Pipe::reader()
if (tag == CEPH_MSGR_TAG_ACK) {
ldout(msgr->cct,20) << "reader got ACK" << dendl;
ceph_le64 seq;
int rc = tcp_read( sd, (char*)&seq, sizeof(seq), msgr->timeout);
int rc = tcp_read(msgr->cct, sd, (char*)&seq, sizeof(seq), msgr->timeout);
pipe_lock.Lock();
if (rc < 0) {
ldout(msgr->cct,2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@ -1810,12 +1810,12 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
__u32 header_crc;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read( sd, (char*)&header, sizeof(header), msgr->timeout ) < 0)
if (tcp_read(msgr->cct, sd, (char*)&header, sizeof(header), msgr->timeout ) < 0)
return -1;
header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), msgr->timeout ) < 0)
if (tcp_read(msgr->cct, sd, (char*)&oldheader, sizeof(oldheader), msgr->timeout ) < 0)
return -1;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
@ -1867,7 +1867,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
front_len = header.front_len;
if (front_len) {
bufferptr bp = buffer::create(front_len);
if (tcp_read( sd, bp.c_str(), front_len, msgr->timeout ) < 0)
if (tcp_read(msgr->cct, sd, bp.c_str(), front_len, msgr->timeout ) < 0)
goto out_dethrottle;
front.push_back(bp);
ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
@ -1877,7 +1877,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
middle_len = header.middle_len;
if (middle_len) {
bufferptr bp = buffer::create(middle_len);
if (tcp_read( sd, bp.c_str(), middle_len, msgr->timeout ) < 0)
if (tcp_read(msgr->cct, sd, bp.c_str(), middle_len, msgr->timeout ) < 0)
goto out_dethrottle;
middle.push_back(bp);
ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
@ -1927,7 +1927,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
bufferptr bp = blp.get_current_ptr();
int read = MIN(bp.length(), left);
ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
int got = tcp_read_nonblocking(sd, bp.c_str(), read);
int got = tcp_read_nonblocking(msgr->cct, sd, bp.c_str(), read);
ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
connection_state->lock.Unlock();
if (got < 0)
@ -1942,7 +1942,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
}
// footer
if (tcp_read(sd, (char*)&footer, sizeof(footer), msgr->timeout) < 0)
if (tcp_read(msgr->cct, sd, (char*)&footer, sizeof(footer), msgr->timeout) < 0)
goto out_dethrottle;
aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;

View File

@ -8,16 +8,16 @@
/******************
* tcp crap
*/
int tcp_read(int sd, char *buf, int len, int timeout)
int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout)
{
if (sd < 0)
return -1;
while (len > 0) {
if (g_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % g_conf->ms_inject_socket_failures == 0) {
generic_dout(0) << "injecting socket failure" << dendl;
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
lgeneric_dout(cct, 0) << "injecting socket failure" << dendl;
::shutdown(sd, SHUT_RDWR);
}
}
@ -25,14 +25,14 @@ int tcp_read(int sd, char *buf, int len, int timeout)
if (tcp_read_wait(sd, timeout) < 0)
return -1;
int got = tcp_read_nonblocking(sd, buf, len);
int got = tcp_read_nonblocking(cct, sd, buf, len);
if (got < 0)
return -1;
len -= got;
buf += got;
//generic_dout(DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
//lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
}
return len;
}
@ -61,7 +61,7 @@ int tcp_read_wait(int sd, int timeout)
* data available. Otherwise we cannot properly interpret a
* read of 0 bytes.
*/
int tcp_read_nonblocking(int sd, char *buf, int len)
int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len)
{
again:
int got = ::recv( sd, buf, len, MSG_DONTWAIT );
@ -70,7 +70,7 @@ again:
goto again;
} else {
char buf[100];
generic_dout(10) << "tcp_read_nonblocking socket " << sd << " returned "
lgeneric_dout(cct, 10) << "tcp_read_nonblocking socket " << sd << " returned "
<< got << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
return -1;
}
@ -84,7 +84,7 @@ again:
return got;
}
int tcp_write(int sd, const char *buf, int len)
int tcp_write(CephContext *cct, int sd, const char *buf, int len)
{
if (sd < 0)
return -1;
@ -92,9 +92,9 @@ int tcp_write(int sd, const char *buf, int len)
pfd.fd = sd;
pfd.events = POLLOUT | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
if (g_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % g_conf->ms_inject_socket_failures == 0) {
generic_dout(0) << "injecting socket failure" << dendl;
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
lgeneric_dout(cct, 0) << "injecting socket failure" << dendl;
::shutdown(sd, SHUT_RDWR);
}
}
@ -105,18 +105,18 @@ int tcp_write(int sd, const char *buf, int len)
if (!(pfd.revents & POLLOUT))
return -1;
//generic_dout(DBL) << "tcp_write writing " << len << dendl;
//lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
assert(len > 0);
while (len > 0) {
int did = ::send( sd, buf, len, MSG_NOSIGNAL );
if (did < 0) {
//generic_dout(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
//generic_derr(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
//lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
//lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
return did;
}
len -= did;
buf += did;
//generic_dout(DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
//lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
}
return 0;
}

View File

@ -12,6 +12,8 @@
using std::ostream;
class CephContext;
inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
{
char buf[NI_MAXHOST] = { 0 };
@ -25,10 +27,10 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
<< buf << ':' << serv;
}
extern int tcp_read(int sd, char *buf, int len, int timeout=-1);
extern int tcp_read(CephContext *cct, int sd, char *buf, int len, int timeout=-1);
extern int tcp_read_wait(int sd, int timeout);
extern int tcp_read_nonblocking(int sd, char *buf, int len);
extern int tcp_write(int sd, const char *buf, int len);
extern int tcp_read_nonblocking(CephContext *cct, int sd, char *buf, int len);
extern int tcp_write(CephContext *cct, int sd, const char *buf, int len);
inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {
return strncmp((const char*)&a, (const char*)&b, sizeof(a)) == 0;