Merge pull request #15324 from yuyuyu101/wip-20093

msg/async: avoid requeue racing with handle_write

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2017-05-29 14:49:57 -05:00 committed by GitHub
commit 676fc82ced

View File

@ -1679,7 +1679,6 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
existing->delay_state->flush();
assert(!delay_state);
}
existing->requeue_sent();
existing->reset_recv_state();
auto temp_cs = std::move(cs);
@ -1706,8 +1705,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
existing->write_lock.lock();
existing->requeue_sent();
existing->outcoming_bl.clear();
existing->open_write = false;
existing->write_lock.unlock();
if (existing->state == STATE_NONE) {
existing->shutdown_socket();
existing->cs = std::move(cs);
@ -2431,6 +2433,7 @@ void AsyncConnection::handle_write()
}
auto start = ceph::mono_clock::now();
bool more;
do {
bufferlist data;
Message *m = _get_next_outgoing(&data);
@ -2442,13 +2445,14 @@ void AsyncConnection::handle_write()
sent.push_back(m);
m->get();
}
more = _has_next_outgoing();
write_lock.unlock();
// send_message or requeue messages may not encode message
if (!data.length())
prepare_send_message(get_features(), m, data);
r = write_message(m, data, _has_next_outgoing());
r = write_message(m, data, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;