msgr: release bytes reserved from throttlers in failure paths

If we don't release those bytes, the throttler count eventually fills up
with bytes we were going to read but didn't (due to socket errors, etc)
until we can't read anything.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-06-30 12:08:18 -07:00
parent 3c80b9dafe
commit 7cedafd3e3
2 changed files with 63 additions and 32 deletions

View File

@ -1500,12 +1500,12 @@ void SimpleMessenger::Pipe::reader()
else if (tag == CEPH_MSGR_TAG_MSG) {
dout(20) << "reader got MSG" << dendl;
Message *m = read_message();
Message *m = 0;
int r = read_message(&m);
pipe_lock.Lock();
if (!m) {
derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
continue;
}
@ -1701,8 +1701,9 @@ void SimpleMessenger::Pipe::unlock_maybe_reap()
}
Message *SimpleMessenger::Pipe::read_message()
int SimpleMessenger::Pipe::read_message(Message **pm)
{
int ret = -1;
// envelope
//dout(10) << "receiver.read_message from sd " << sd << dendl;
@ -1712,12 +1713,12 @@ Message *SimpleMessenger::Pipe::read_message()
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
return 0;
return -1;
header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
return 0;
return -1;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
@ -1736,51 +1737,58 @@ Message *SimpleMessenger::Pipe::read_message()
// verify header crc
if (header_crc != header.crc) {
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return 0;
return -1;
}
dout(10) << "getting message bytes now, currently using "
<< messenger->message_throttler.get_current() << "/"
<< messenger->message_throttler.get_max() << dendl;
uint64_t message_size = header.front_len + header.middle_len
+ header.data_len;
bufferlist front, middle, data;
int front_len, middle_len;
unsigned data_len, data_off;
int aborted;
Message *message;
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
if (policy.throttler)
if (policy.throttler) {
dout(10) << "reader wants " << message_size << " from policy throttler "
<< policy.throttler->get_current() << "/"
<< policy.throttler->get_max() << dendl;
policy.throttler->get(message_size);
}
// throttle total bytes waiting for dispatch. do this _after_ the
// policy throttle, as this one does not deadlock (unless dispatch
// blocks indefinitely, which it shouldn't). in contrast, the
// policy throttle carries for the lifetime of the message.
dout(10) << "reader wants " << message_size << " from dispatch throttler "
<< messenger->message_throttler.get_current() << "/"
<< messenger->message_throttler.get_max() << dendl;
messenger->message_throttler.get(message_size);
}
// read front
bufferlist front;
int front_len = header.front_len;
front_len = header.front_len;
if (front_len) {
bufferptr bp = buffer::create(front_len);
if (tcp_read( sd, bp.c_str(), front_len ) < 0)
return 0;
goto out_dethrottle;
front.push_back(bp);
dout(20) << "reader got front " << front.length() << dendl;
}
// read middle
bufferlist middle;
int middle_len = header.middle_len;
middle_len = header.middle_len;
if (middle_len) {
bufferptr bp = buffer::create(middle_len);
if (tcp_read( sd, bp.c_str(), middle_len ) < 0)
return 0;
goto out_dethrottle;
middle.push_back(bp);
dout(20) << "reader got middle " << middle.length() << dendl;
}
// read data
bufferlist data;
unsigned data_len = le32_to_cpu(header.data_len);
unsigned data_off = le32_to_cpu(header.data_off);
data_len = le32_to_cpu(header.data_len);
data_off = le32_to_cpu(header.data_off);
if (data_len) {
int left = data_len;
if (data_off & ~PAGE_MASK) {
@ -1789,7 +1797,7 @@ Message *SimpleMessenger::Pipe::read_message()
(unsigned)left);
bufferptr bp = buffer::create(head);
if (tcp_read( sd, bp.c_str(), head ) < 0)
return 0;
goto out_dethrottle;
data.push_back(bp);
left -= head;
dout(20) << "reader got data head " << head << dendl;
@ -1800,7 +1808,7 @@ Message *SimpleMessenger::Pipe::read_message()
if (middle > 0) {
bufferptr bp = buffer::create_page_aligned(middle);
if (tcp_read( sd, bp.c_str(), middle ) < 0)
return 0;
goto out_dethrottle;
data.push_back(bp);
left -= middle;
dout(20) << "reader got data page-aligned middle " << middle << dendl;
@ -1809,7 +1817,7 @@ Message *SimpleMessenger::Pipe::read_message()
if (left) {
bufferptr bp = buffer::create(left);
if (tcp_read( sd, bp.c_str(), left ) < 0)
return 0;
goto out_dethrottle;
data.push_back(bp);
dout(20) << "reader got data tail " << left << dendl;
}
@ -1817,25 +1825,48 @@ Message *SimpleMessenger::Pipe::read_message()
// footer
if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
return 0;
goto out_dethrottle;
int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
dout(10) << "aborted = " << aborted << dendl;
if (aborted) {
dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message.. ABORTED" << dendl;
// MEH FIXME
Message *m = new MGenericMessage(CEPH_MSG_PING);
*pm = new MGenericMessage(CEPH_MSG_PING);
header.type = CEPH_MSG_PING;
m->set_header(header);
return m;
(*pm)->set_header(header);
ret = 0;
goto out_dethrottle;
}
dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message" << dendl;
Message *message = decode_message(header, footer, front, middle, data);
message = decode_message(header, footer, front, middle, data);
message->set_throttler(policy.throttler);
return message;
*pm = message;
return 0;
out_dethrottle:
// release bytes reserved from the throttlers on failure
if (message_size) {
if (policy.throttler) {
dout(10) << "reader releasing " << message_size << " from policy throttler "
<< policy.throttler->get_current() << "/"
<< policy.throttler->get_max() << dendl;
policy.throttler->put(message_size);
}
// throttle total bytes waiting for dispatch. do this _after_ the
// policy throttle, as this one does not deadlock (unless dispatch
// blocks indefinitely, which it shouldn't). in contrast, the
// policy throttle carries for the lifetime of the message.
dout(10) << "reader releasing " << message_size << " from dispatch throttler "
<< messenger->message_throttler.get_current() << "/"
<< messenger->message_throttler.get_max() << dendl;
messenger->message_throttler.put(message_size);
}
return ret;
}

View File

@ -149,7 +149,7 @@ private:
void writer();
void unlock_maybe_reap();
Message *read_message();
int read_message(Message **pm);
int write_message(Message *m);
int do_sendmsg(int sd, struct msghdr *msg, int len, bool more=false);
int write_ack(uint64_t s);