Merge pull request #4511 from yuyuyu101/wip-async-fix-7

Wip async fix 7

Reviewed-by: Sage Weil <sage@redhat.com>
Reviewed-by: Greg Farnum <gfarnum@redhat.com>
This commit is contained in:
Gregory Farnum 2015-05-01 13:55:12 -07:00
commit 62d5b39abc
3 changed files with 28 additions and 13 deletions

View File

@ -70,6 +70,17 @@ namespace ceph {
ceph_spin_unlock(&lock);
return ret;
}
bool compare_and_swap(T o, T n) {
bool success = false;
ceph_spin_lock(&lock);
if (val == o) {
success = true;
val = n;
}
ceph_spin_unlock(&lock);
return success;
}
private:
// forbid copying
atomic_spinlock_t(const atomic_spinlock_t<T> &other);
@ -113,6 +124,10 @@ namespace ceph {
// at some point. this hack can go away someday...
return AO_load_full((AO_t *)&val);
}
bool compare_and_swap(AO_t o, AO_t n) {
return AO_compare_and_swap(&val, o, n);
}
private:
// forbid copying
atomic_t(const atomic_t &other);

View File

@ -812,7 +812,7 @@ void AsyncConnection::process()
ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
} else {
if (session_security->check_message_signature(message)) {
ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl;
ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl;
message->put();
goto fail;
}
@ -1113,9 +1113,9 @@ int AsyncConnection::_process_connection()
connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
if (authorizer)
ldout(async_msgr->cct, 10) << __func__ << "connect_msg.authorizer_len="
<< connect_msg.authorizer_len << " protocol="
<< connect_msg.authorizer_protocol << dendl;
ldout(async_msgr->cct, 10) << __func__ << " connect_msg.authorizer_len="
<< connect_msg.authorizer_len << " protocol="
<< connect_msg.authorizer_protocol << dendl;
connect_msg.flags = 0;
if (policy.lossy)
connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
@ -1474,7 +1474,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
state = STATE_CONNECTING_SEND_CONNECT_MSG;
}
if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
ldout(async_msgr->cct, 0) << __func__ << "connect got RESETSESSION" << dendl;
ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
was_session_reset();
state = STATE_CONNECTING_SEND_CONNECT_MSG;
}
@ -1676,7 +1676,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
goto replace;
} else {
// our existing outgoing wins
ldout(async_msgr->cct,10) << __func__ << "accept connection race, existing "
ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
<< existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
assert(peer_addr > async_msgr->get_myaddr());
@ -2043,11 +2043,12 @@ void AsyncConnection::fault()
recv_start = recv_end = 0;
state_offset = 0;
replacing = false;
is_reset_from_peer = false;
outcoming_bl.clear();
if (!once_ready && !is_queued() &&
state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
<< "accept state just closed, state="
<< " accept state just closed, state="
<< get_state_name(state) << dendl;
center->dispatch_event_external(reset_handler);
_stop();
@ -2255,14 +2256,14 @@ int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg
void AsyncConnection::handle_ack(uint64_t seq)
{
lsubdout(async_msgr->cct, ms, 15) << __func__ << " got ack seq " << seq << dendl;
ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
// trim sent list
while (!sent.empty() && sent.front()->get_seq() <= seq) {
Message *m = sent.front();
sent.pop_front();
lsubdout(async_msgr->cct, ms, 10) << __func__ << "reader got ack seq "
<< seq << " >= " << m->get_seq() << " on "
<< m << " " << *m << dendl;
ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
<< seq << " >= " << m->get_seq() << " on "
<< m << " " << *m << dendl;
m->put();
}
}

View File

@ -246,7 +246,7 @@ void EventCenter::delete_time_event(uint64_t id)
void EventCenter::wakeup()
{
if (!already_wakeup.read()) {
if (already_wakeup.compare_and_swap(0, 1)) {
ldout(cct, 1) << __func__ << dendl;
char buf[1];
buf[0] = 'c';
@ -254,7 +254,6 @@ void EventCenter::wakeup()
int n = write(notify_send_fd, buf, 1);
// FIXME ?
assert(n == 1);
already_wakeup.set(1);
}
}