From 62f0140d80a7564d3db377153a3cd87b3816d497 Mon Sep 17 00:00:00 2001 From: Vicente Cheng Date: Thu, 16 Dec 2021 03:20:05 +0000 Subject: [PATCH 1/2] test/msgr: add unittest to simulate network block temporarily Add a new test case to verify a temporary network block; that case would make outgoing_bl overflow, so add an overflow assertion to claim_append. Just use 2 connections, because otherwise we could not generate a data set large enough to verify it. Simulate the EAGAIN situation by skipping the cs.send() call, because EAGAIN would return size 0 and keep the outgoing_bl. Signed-off-by: Vicente Cheng --- src/common/buffer.cc | 2 + src/common/options/global.yaml.in | 6 +++ src/msg/async/AsyncConnection.cc | 12 +++++- src/msg/async/AsyncConnection.h | 1 + src/test/msgr/test_msgr.cc | 63 +++++++++++++++++++++++++++++-- 5 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 4b1bc08936c..0efa576d76a 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -1287,6 +1287,8 @@ static ceph::spinlock debug_lock; void buffer::list::claim_append(list& bl) { + // check overflow + assert(_len + bl._len >= _len); // steal the other guy's buffers _len += bl._len; _num += bl._num; diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 3e9454db216..e586eb4e0df 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1262,6 +1262,12 @@ options: desc: Inject various internal delays to induce races (seconds) default: 0 with_legacy: true +- name: ms_inject_network_congestion + type: uint + level: dev + desc: Inject network congestion that stalls N-1 out of every N send operations + default: 0 + with_legacy: true - name: ms_blackhole_osd type: bool level: dev diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5769c580e07..78f19b8e2f6 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -327,7
+327,12 @@ ssize_t AsyncConnection::_try_send(bool more) ceph_assert(center->in_thread()); ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length() << " bytes" << dendl; - ssize_t r = cs.send(outgoing_bl, more); + // a network block would make ::send return EAGAIN; simulate that here by + // skipping the cs.send() call so that r stays 0 and outgoing_bl is kept + ssize_t r = 0; + if (likely(!inject_network_congestion())) { + r = cs.send(outgoing_bl, more); + } if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; return r; @@ -362,6 +367,11 @@ void AsyncConnection::inject_delay() { } } +bool AsyncConnection::inject_network_congestion() const { + return (async_msgr->cct->_conf->ms_inject_network_congestion > 0 && + rand() % async_msgr->cct->_conf->ms_inject_network_congestion != 0); +} + void AsyncConnection::process() { std::lock_guard l(lock); last_active = ceph::coarse_mono_clock::now(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index c7f0f9fe860..82c29985b18 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -64,6 +64,7 @@ class AsyncConnection : public Connection { void _stop(); void fault(); void inject_delay(); + bool inject_network_congestion() const; bool is_queued() const; void shutdown_socket(); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 17f679b0904..fd7b30fdc7f 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -1775,16 +1775,20 @@ class SyntheticWorkload { DummyAuthClientServer dummy_auth; public: - static const unsigned max_in_flight = 64; - static const unsigned max_connections = 128; + const unsigned max_in_flight = 0; + const unsigned max_connections = 0; static const unsigned max_message_len = 1024 * 1024 * 4; SyntheticWorkload(int servers, int clients, string type, int random_num, - Messenger::Policy srv_policy, Messenger::Policy cli_policy) + Messenger::Policy srv_policy,
Messenger::Policy cli_policy, + int _max_in_flight = 64, int _max_connections = 128) : client_policy(cli_policy), dispatcher(false, this), rng(time(NULL)), - dummy_auth(g_ceph_context) { + dummy_auth(g_ceph_context), + max_in_flight(_max_in_flight), + max_connections(_max_connections) { + dummy_auth.auth_registry.refresh_config(); Messenger *msgr; int base_port = 16800; @@ -1918,6 +1922,32 @@ class SyntheticWorkload { } } + void send_large_message(bool inject_network_congestion=false) { + std::lock_guard l{lock}; + ConnectionRef conn = _get_random_connection(); + uuid_d uuid; + uuid.generate_random(); + MCommand *m = new MCommand(uuid); + vector cmds; + cmds.push_back("command"); + // fill in random data to build a large message + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*256; i++) + bl.append(s); + // bl is around 6M + m->set_data(bl); + m->cmd = cmds; + m->set_priority(200); + // set up congestion only after the connection is ready + if (inject_network_congestion && conn->is_connected()) { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100"); + } else { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0"); + } + conn->send_message(m); + } + void drop_connection() { std::lock_guard l{lock}; if (available_connections.size() < 10) @@ -2196,6 +2226,31 @@ TEST_P(MessengerTest, SyntheticInjectTest4) { g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); } +// This is a test for a network block, i.e. ::send returns EAGAIN +TEST_P(MessengerTest, SyntheticInjectTest5) { + SyntheticWorkload test_msg(1, 8, GetParam(), 100, + Messenger::Policy::stateful_server(0), + Messenger::Policy::lossless_client(0), + 64, 2); + bool simulate_network_congestion = true; + for (int i = 0; i < 2; ++i) + test_msg.generate_connection(); + for (int i = 0; i < 5000; ++i) { + if (!(i % 10)) { + ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + if (i < 1600) { + // means that we would queue up to
1600 * 6M (9.6G) around with 2 connections test_msg.send_large_message(simulate_network_congestion); } else { simulate_network_congestion = false; test_msg.send_large_message(simulate_network_congestion); } } test_msg.wait_for_done(); } class MarkdownDispatcher : public Dispatcher { ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); From cfca2daf3121a58d92b64fae9b99953dc082a0a6 Mon Sep 17 00:00:00 2001 From: Vicente Cheng Date: Fri, 10 Dec 2021 06:49:55 +0000 Subject: [PATCH 2/2] msg/async: fix outgoing_bl overflow and reset state_offset - we should reset state_offset when the read is done. - check outgoing_bl before we try to write a message. In some environments, the network would temporarily block and return EAGAIN. For async msgr, we would call back the write event directly, but that still increases the outgoing_bl. Consider this case: the sender is congested or the network driver has some problems. Data is appended to outgoing_bl, but outgoing_bl is not consumed immediately. The size of outgoing_bl will then increase over time and eventually overflow. A wrong outgoing_bl would cause some problems, so we need to drain outgoing_bl before we append another message.
Signed-off-by: Vicente Cheng --- src/msg/async/AsyncConnection.cc | 1 + src/msg/async/ProtocolV1.cc | 7 +++++++ src/msg/async/ProtocolV2.cc | 7 +++++++ 3 files changed, 15 insertions(+) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 78f19b8e2f6..dbca8e74dc9 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -226,6 +226,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) << " left is " << left << " buffer still has " << recv_end - recv_start << dendl; if (left == 0) { + state_offset = 0; return 0; } state_offset += to_read; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 43363371bc3..7373bada4b4 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -315,6 +315,13 @@ void ProtocolV1::write_event() { auto start = ceph::mono_clock::now(); bool more; do { + if (connection->is_queued()) { + if (r = connection->_try_send(); r!= 0) { + // either fails to send or not all queued buffer is sent + break; + } + } + ceph::buffer::list data; Message *m = _get_next_outgoing(&data); if (!m) { diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index a176fc2c808..cf5d18b0c02 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -653,6 +653,13 @@ void ProtocolV2::write_event() { auto start = ceph::mono_clock::now(); bool more; do { + if (connection->is_queued()) { + if (r = connection->_try_send(); r!= 0) { + // either fails to send or not all queued buffer is sent + break; + } + } + const auto out_entry = _get_next_outgoing(); if (!out_entry.m) { break;