mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
msg/async: avoid unnecessary costly wakeups for outbound messages
If a wakeup for an outbound message has already been scheduled or is currently executing within the worker thread, avoid re-adding a wakeup. For small IO sizes under high queue depths, these extra syscalls start to add up. For larger IO sizes or small queue depths, it doesn't hurt performance. fio --ioengine=rbd results: IOPS pre-change post-change 4K: 84.9k 98.3k 32K: 58.4k 59.5k 256K: 12.1k 12.2k 4M: 803 802 Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
parent
f6b022bdbe
commit
294c41f18a
@ -240,7 +240,8 @@ void ProtocolV1::send_message(Message *m) {
|
||||
out_q[m->get_priority()].emplace_back(std::move(bl), m);
|
||||
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
|
||||
<< dendl;
|
||||
if (can_write != WriteStatus::REPLACING) {
|
||||
if (can_write != WriteStatus::REPLACING && !write_in_progress) {
|
||||
write_in_progress = true;
|
||||
connection->center->dispatch_event_external(connection->write_handler);
|
||||
}
|
||||
}
|
||||
@ -348,6 +349,7 @@ void ProtocolV1::write_event() {
|
||||
} else if (r > 0)
|
||||
break;
|
||||
} while (can_write == WriteStatus::CANWRITE);
|
||||
write_in_progress = false;
|
||||
connection->write_lock.unlock();
|
||||
|
||||
// if r > 0 mean data still lefted, so no need _try_send.
|
||||
@ -378,6 +380,7 @@ void ProtocolV1::write_event() {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
write_in_progress = false;
|
||||
connection->write_lock.unlock();
|
||||
connection->lock.lock();
|
||||
connection->write_lock.lock();
|
||||
@ -1174,6 +1177,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
|
||||
}
|
||||
|
||||
void ProtocolV1::requeue_sent() {
|
||||
write_in_progress = false;
|
||||
if (sent.empty()) {
|
||||
return;
|
||||
}
|
||||
@ -1233,6 +1237,7 @@ void ProtocolV1::discard_out_queue() {
|
||||
}
|
||||
}
|
||||
out_q.clear();
|
||||
write_in_progress = false;
|
||||
}
|
||||
|
||||
void ProtocolV1::reset_recv_state()
|
||||
@ -2305,6 +2310,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
|
||||
<< __func__ << " stop myself to swap existing" << dendl;
|
||||
exproto->can_write = WriteStatus::REPLACING;
|
||||
exproto->replacing = true;
|
||||
exproto->write_in_progress = false;
|
||||
existing->state_offset = 0;
|
||||
// avoid previous thread modify event
|
||||
exproto->state = NONE;
|
||||
|
@ -108,6 +108,7 @@ protected:
|
||||
// priority queue for outbound msgs
|
||||
std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
|
||||
bool keepalive;
|
||||
bool write_in_progress = false;
|
||||
|
||||
__u32 connect_seq, peer_global_seq;
|
||||
std::atomic<uint64_t> in_seq{0};
|
||||
|
@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() {
|
||||
}
|
||||
}
|
||||
out_queue.clear();
|
||||
write_in_progress = false;
|
||||
}
|
||||
|
||||
void ProtocolV2::reset_session() {
|
||||
@ -181,6 +182,7 @@ void ProtocolV2::stop() {
|
||||
void ProtocolV2::fault() { _fault(); }
|
||||
|
||||
void ProtocolV2::requeue_sent() {
|
||||
write_in_progress = false;
|
||||
if (sent.empty()) {
|
||||
return;
|
||||
}
|
||||
@ -424,7 +426,8 @@ void ProtocolV2::send_message(Message *m) {
|
||||
out_queue_entry_t{is_prepared, m});
|
||||
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
|
||||
<< dendl;
|
||||
if ((!replacing && can_write) || state == STANDBY) {
|
||||
if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
|
||||
write_in_progress = true;
|
||||
connection->center->dispatch_event_external(connection->write_handler);
|
||||
}
|
||||
}
|
||||
@ -629,6 +632,7 @@ void ProtocolV2::write_event() {
|
||||
} else if (r > 0)
|
||||
break;
|
||||
} while (can_write);
|
||||
write_in_progress = false;
|
||||
|
||||
// if r > 0 mean data still lefted, so no need _try_send.
|
||||
if (r == 0) {
|
||||
@ -657,6 +661,7 @@ void ProtocolV2::write_event() {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
write_in_progress = false;
|
||||
connection->write_lock.unlock();
|
||||
connection->lock.lock();
|
||||
connection->write_lock.lock();
|
||||
@ -2673,6 +2678,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
|
||||
connection->dispatch_queue->queue_reset(connection);
|
||||
|
||||
exproto->can_write = false;
|
||||
exproto->write_in_progress = false;
|
||||
exproto->reconnecting = reconnecting;
|
||||
exproto->replacing = true;
|
||||
std::swap(exproto->session_stream_handlers, session_stream_handlers);
|
||||
|
@ -113,6 +113,7 @@ private:
|
||||
} pre_auth;
|
||||
|
||||
bool keepalive;
|
||||
bool write_in_progress = false;
|
||||
|
||||
ostream &_conn_prefix(std::ostream *_dout);
|
||||
void run_continuation(Ct<ProtocolV2> *pcontinuation);
|
||||
|
Loading…
Reference in New Issue
Block a user