mirror of
https://github.com/ceph/ceph
synced 2025-04-11 04:02:04 +00:00
msgr: do not reopen failed lossy Connections
There was a race where: - sending stuff to a lossy Connection - it fails, and queues itself for reap, queues a RESET event - reaper clears the Pipe - some thread queues new messages and the Pipe is reopened, messages sent - RESET event delivered to dispatch, connection is closed and reopened. The result was that messages got sent to the OSD out of order during the window between the fault() and ms_handle_reset() getting called. This will prevent that. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
9a4e702795
commit
2e67b7a383
@ -199,6 +199,18 @@ public:
|
|||||||
return pipe->get();
|
return pipe->get();
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
bool try_get_pipe(RefCountedObject **p) {
|
||||||
|
Mutex::Locker l(lock);
|
||||||
|
if (failed) {
|
||||||
|
*p = NULL;
|
||||||
|
} else {
|
||||||
|
if (pipe)
|
||||||
|
*p = pipe->get();
|
||||||
|
else
|
||||||
|
*p = NULL;
|
||||||
|
}
|
||||||
|
return !failed;
|
||||||
|
}
|
||||||
void clear_pipe(RefCountedObject *old_p) {
|
void clear_pipe(RefCountedObject *old_p) {
|
||||||
if (old_p == pipe) {
|
if (old_p == pipe) {
|
||||||
Mutex::Locker l(lock);
|
Mutex::Locker l(lock);
|
||||||
|
@ -382,8 +382,12 @@ void SimpleMessenger::submit_message(Message *m, Connection *con, const entity_a
|
|||||||
{
|
{
|
||||||
Pipe *pipe = NULL;
|
Pipe *pipe = NULL;
|
||||||
if (con) {
|
if (con) {
|
||||||
pipe = con ? (Pipe *)con->pipe : NULL;
|
bool ok = con->try_get_pipe((RefCountedObject**)&pipe);
|
||||||
// we don't want to deal with ref-counting here, so we don't use get_pipe()
|
if (!ok) {
|
||||||
|
ldout(cct,0) << "submit_message " << *m << " on failed lossy con, dropping message " << m << dendl;
|
||||||
|
m->put();
|
||||||
|
return;
|
||||||
|
}
|
||||||
con->get();
|
con->get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -395,23 +399,10 @@ void SimpleMessenger::submit_message(Message *m, Connection *con, const entity_a
|
|||||||
} else {
|
} else {
|
||||||
// remote pipe.
|
// remote pipe.
|
||||||
if (pipe) {
|
if (pipe) {
|
||||||
pipe->pipe_lock.Lock();
|
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
|
||||||
if (pipe->state == Pipe::STATE_CLOSED) {
|
pipe->send(m);
|
||||||
ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe, dropping message " << m << dendl;
|
pipe->put();
|
||||||
pipe->unregister_pipe();
|
} else {
|
||||||
pipe->pipe_lock.Unlock();
|
|
||||||
pipe = 0;
|
|
||||||
assert(con);
|
|
||||||
con->put();
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
|
|
||||||
|
|
||||||
pipe->_send(m);
|
|
||||||
pipe->pipe_lock.Unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!pipe) {
|
|
||||||
const Policy& policy = get_policy(dest_type);
|
const Policy& policy = get_policy(dest_type);
|
||||||
if (policy.server) {
|
if (policy.server) {
|
||||||
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
|
ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
|
||||||
|
Loading…
Reference in New Issue
Block a user