msg/async: avoid unnecessary costly wakeups for outbound messages

If a wakeup for an outbound message has already been scheduled or is
currently executing within the worker thread, avoid re-adding a wakeup.
For small IO sizes under high queue depths, these extra syscalls start
to add up. For larger IO sizes or small queue depths, it doesn't hurt
performance.

fio --ioengine=rbd results:

IOPS	pre-change	post-change
4K:	84.9k		98.3k
32K:	58.4k		59.5k
256K:	12.1k		12.2k
4M:	803		802

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2019-06-04 13:48:57 -04:00
parent f6b022bdbe
commit 294c41f18a
4 changed files with 16 additions and 2 deletions

View File

@ -240,7 +240,8 @@ void ProtocolV1::send_message(Message *m) {
out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
if (can_write != WriteStatus::REPLACING) {
if (can_write != WriteStatus::REPLACING && !write_in_progress) {
write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
@ -348,6 +349,7 @@ void ProtocolV1::write_event() {
} else if (r > 0)
break;
} while (can_write == WriteStatus::CANWRITE);
write_in_progress = false;
connection->write_lock.unlock();
// if r > 0 mean data still lefted, so no need _try_send.
@ -378,6 +380,7 @@ void ProtocolV1::write_event() {
return;
}
} else {
write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
@ -1174,6 +1177,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
}
void ProtocolV1::requeue_sent() {
write_in_progress = false;
if (sent.empty()) {
return;
}
@ -1233,6 +1237,7 @@ void ProtocolV1::discard_out_queue() {
}
}
out_q.clear();
write_in_progress = false;
}
void ProtocolV1::reset_recv_state()
@ -2305,6 +2310,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
<< __func__ << " stop myself to swap existing" << dendl;
exproto->can_write = WriteStatus::REPLACING;
exproto->replacing = true;
exproto->write_in_progress = false;
existing->state_offset = 0;
// avoid previous thread modify event
exproto->state = NONE;

View File

@ -108,6 +108,7 @@ protected:
// priority queue for outbound msgs
std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
bool keepalive;
bool write_in_progress = false;
__u32 connect_seq, peer_global_seq;
std::atomic<uint64_t> in_seq{0};

View File

@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() {
}
}
out_queue.clear();
write_in_progress = false;
}
void ProtocolV2::reset_session() {
@ -181,6 +182,7 @@ void ProtocolV2::stop() {
void ProtocolV2::fault() { _fault(); }
void ProtocolV2::requeue_sent() {
write_in_progress = false;
if (sent.empty()) {
return;
}
@ -424,7 +426,8 @@ void ProtocolV2::send_message(Message *m) {
out_queue_entry_t{is_prepared, m});
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
if ((!replacing && can_write) || state == STANDBY) {
if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
@ -629,6 +632,7 @@ void ProtocolV2::write_event() {
} else if (r > 0)
break;
} while (can_write);
write_in_progress = false;
// if r > 0 mean data still lefted, so no need _try_send.
if (r == 0) {
@ -657,6 +661,7 @@ void ProtocolV2::write_event() {
return;
}
} else {
write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
@ -2673,6 +2678,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
connection->dispatch_queue->queue_reset(connection);
exproto->can_write = false;
exproto->write_in_progress = false;
exproto->reconnecting = reconnecting;
exproto->replacing = true;
std::swap(exproto->session_stream_handlers, session_stream_handlers);

View File

@ -113,6 +113,7 @@ private:
} pre_auth;
bool keepalive;
bool write_in_progress = false;
ostream &_conn_prefix(std::ostream *_dout);
void run_continuation(Ct<ProtocolV2> *pcontinuation);