mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
msg/Pipe: goto fail_unlocked on early failures in accept()
Instead of duplicating an incomplete cleanup sequence (that does not clear_pipe()), goto fail_unlocked and do the cleanup in a generic way. s/rc/r/ while we are here. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
afafb87e84
commit
ec612a5bda
161
src/msg/Pipe.cc
161
src/msg/Pipe.cc
@ -216,86 +216,13 @@ int Pipe::accept()
|
||||
// my creater gave me sd via accept()
|
||||
assert(state == STATE_ACCEPTING);
|
||||
|
||||
// announce myself.
|
||||
int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
|
||||
if (rc < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// and my addr
|
||||
// vars
|
||||
bufferlist addrs;
|
||||
::encode(msgr->my_inst.addr, addrs);
|
||||
|
||||
port = msgr->my_inst.addr.get_port();
|
||||
|
||||
// and peer's socket addr (they might not know their ip)
|
||||
entity_addr_t socket_addr;
|
||||
socklen_t len = sizeof(socket_addr.ss_addr());
|
||||
int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
|
||||
if (r < 0) {
|
||||
char buf[80];
|
||||
ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
::encode(socket_addr, addrs);
|
||||
|
||||
rc = tcp_write(addrs.c_str(), addrs.length());
|
||||
if (rc < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
|
||||
|
||||
// identify peer
|
||||
socklen_t len;
|
||||
int r;
|
||||
char banner[strlen(CEPH_BANNER)+1];
|
||||
if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
|
||||
banner[strlen(CEPH_BANNER)] = 0;
|
||||
ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
bufferlist addrbl;
|
||||
{
|
||||
bufferptr tp(sizeof(peer_addr));
|
||||
addrbl.push_back(tp);
|
||||
}
|
||||
if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
|
||||
state = STATE_CLOSED;
|
||||
state_closed.set(1);
|
||||
return -1;
|
||||
}
|
||||
{
|
||||
bufferlist::iterator ti = addrbl.begin();
|
||||
::decode(peer_addr, ti);
|
||||
}
|
||||
|
||||
ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
|
||||
if (peer_addr.is_blank_ip()) {
|
||||
// peer apparently doesn't know what ip they have; figure it out for them.
|
||||
int port = peer_addr.get_port();
|
||||
peer_addr.addr = socket_addr.addr;
|
||||
peer_addr.set_port(port);
|
||||
ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
|
||||
<< " (socket is " << socket_addr << ")" << dendl;
|
||||
}
|
||||
set_peer_addr(peer_addr); // so that connection_state gets set up
|
||||
|
||||
ceph_msg_connect connect;
|
||||
ceph_msg_connect_reply reply;
|
||||
Pipe *existing = 0;
|
||||
@ -314,6 +241,70 @@ int Pipe::accept()
|
||||
// used for reading in the remote acked seq on connect
|
||||
uint64_t newly_acked_seq = 0;
|
||||
|
||||
// announce myself.
|
||||
r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
|
||||
if (r < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
|
||||
// and my addr
|
||||
::encode(msgr->my_inst.addr, addrs);
|
||||
|
||||
port = msgr->my_inst.addr.get_port();
|
||||
|
||||
// and peer's socket addr (they might not know their ip)
|
||||
len = sizeof(socket_addr.ss_addr());
|
||||
r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
|
||||
if (r < 0) {
|
||||
char buf[80];
|
||||
ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
::encode(socket_addr, addrs);
|
||||
|
||||
r = tcp_write(addrs.c_str(), addrs.length());
|
||||
if (r < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
|
||||
ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
|
||||
|
||||
// identify peer
|
||||
if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
|
||||
banner[strlen(CEPH_BANNER)] = 0;
|
||||
ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
{
|
||||
bufferptr tp(sizeof(peer_addr));
|
||||
addrbl.push_back(tp);
|
||||
}
|
||||
if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
|
||||
goto fail_unlocked;
|
||||
}
|
||||
{
|
||||
bufferlist::iterator ti = addrbl.begin();
|
||||
::decode(peer_addr, ti);
|
||||
}
|
||||
|
||||
ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
|
||||
if (peer_addr.is_blank_ip()) {
|
||||
// peer apparently doesn't know what ip they have; figure it out for them.
|
||||
int port = peer_addr.get_port();
|
||||
peer_addr.addr = socket_addr.addr;
|
||||
peer_addr.set_port(port);
|
||||
ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
|
||||
<< " (socket is " << socket_addr << ")" << dendl;
|
||||
}
|
||||
set_peer_addr(peer_addr); // so that connection_state gets set up
|
||||
|
||||
while (1) {
|
||||
if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
|
||||
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
|
||||
@ -544,12 +535,12 @@ int Pipe::accept()
|
||||
reply:
|
||||
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
|
||||
reply.authorizer_len = authorizer_reply.length();
|
||||
rc = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (rc < 0)
|
||||
r = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (r < 0)
|
||||
goto fail_unlocked;
|
||||
if (reply.authorizer_len) {
|
||||
rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
|
||||
if (rc < 0)
|
||||
r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
|
||||
if (r < 0)
|
||||
goto fail_unlocked;
|
||||
}
|
||||
}
|
||||
@ -631,20 +622,20 @@ int Pipe::accept()
|
||||
register_pipe();
|
||||
msgr->lock.Unlock();
|
||||
|
||||
rc = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (rc < 0) {
|
||||
r = tcp_write((char*)&reply, sizeof(reply));
|
||||
if (r < 0) {
|
||||
goto fail_registered;
|
||||
}
|
||||
|
||||
if (reply.authorizer_len) {
|
||||
rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
|
||||
if (rc < 0) {
|
||||
r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
|
||||
if (r < 0) {
|
||||
goto fail_registered;
|
||||
}
|
||||
}
|
||||
|
||||
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
|
||||
if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
|
||||
if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
|
||||
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
|
||||
goto fail_registered;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user