Merge pull request #44289 from Vicente-Cheng/fix-outgoing-bl-overflow

msg/async: fix outgoing_bl overflow and reset state_offset

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Yuri Weinstein 2022-02-11 14:48:18 -08:00 committed by GitHub
commit 0549f3b07b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 94 additions and 5 deletions

View File

@ -1287,6 +1287,8 @@ static ceph::spinlock debug_lock;
void buffer::list::claim_append(list& bl)
{
// check overflow
assert(_len + bl._len >= _len);
// steal the other guy's buffers
_len += bl._len;
_num += bl._num;

View File

@ -1262,6 +1262,12 @@ options:
desc: Inject various internal delays to induce races (seconds)
default: 0
with_legacy: true
- name: ms_inject_network_congestion
type: uint
level: dev
desc: Inject network congestion that blocks roughly N-1 out of every N send operations
default: 0
with_legacy: true
- name: ms_blackhole_osd
type: bool
level: dev

View File

@ -226,6 +226,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
<< " left is " << left << " buffer still has "
<< recv_end - recv_start << dendl;
if (left == 0) {
state_offset = 0;
return 0;
}
state_offset += to_read;
@ -327,7 +328,12 @@ ssize_t AsyncConnection::_try_send(bool more)
ceph_assert(center->in_thread());
ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
<< " bytes" << dendl;
ssize_t r = cs.send(outgoing_bl, more);
// A blocked network would make ::send return EAGAIN, which looks the same
// here as not calling cs.send() at all and leaving r = 0.
ssize_t r = 0;
if (likely(!inject_network_congestion())) {
r = cs.send(outgoing_bl, more);
}
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
return r;
@ -362,6 +368,11 @@ void AsyncConnection::inject_delay() {
}
}
// Decide whether the current send attempt should be skipped in order to
// simulate network congestion (fault injection driven by the
// ms_inject_network_congestion option).
//
// Returns true when the caller should skip the send. With the option set to
// N > 0 this is the case whenever rand() % N != 0; with the option at 0 the
// result is always false.
bool AsyncConnection::inject_network_congestion() const {
  // Read the option exactly once. It can be changed while connections are
  // active, and re-reading it for the modulo could observe 0 after the
  // "> 0" check already passed, which would be a modulo by zero.
  const auto congestion = async_msgr->cct->_conf->ms_inject_network_congestion;
  return congestion > 0 && rand() % congestion != 0;
}
void AsyncConnection::process() {
std::lock_guard<std::mutex> l(lock);
last_active = ceph::coarse_mono_clock::now();

View File

@ -64,6 +64,7 @@ class AsyncConnection : public Connection {
void _stop();
void fault();
void inject_delay();
bool inject_network_congestion() const;
bool is_queued() const;
void shutdown_socket();

View File

@ -315,6 +315,13 @@ void ProtocolV1::write_event() {
auto start = ceph::mono_clock::now();
bool more;
do {
if (connection->is_queued()) {
if (r = connection->_try_send(); r!= 0) {
// either fails to send or not all queued buffer is sent
break;
}
}
ceph::buffer::list data;
Message *m = _get_next_outgoing(&data);
if (!m) {

View File

@ -653,6 +653,13 @@ void ProtocolV2::write_event() {
auto start = ceph::mono_clock::now();
bool more;
do {
if (connection->is_queued()) {
if (r = connection->_try_send(); r!= 0) {
// either fails to send or not all queued buffer is sent
break;
}
}
const auto out_entry = _get_next_outgoing();
if (!out_entry.m) {
break;

View File

@ -1775,16 +1775,20 @@ class SyntheticWorkload {
DummyAuthClientServer dummy_auth;
public:
static const unsigned max_in_flight = 64;
static const unsigned max_connections = 128;
const unsigned max_in_flight = 0;
const unsigned max_connections = 0;
static const unsigned max_message_len = 1024 * 1024 * 4;
SyntheticWorkload(int servers, int clients, string type, int random_num,
Messenger::Policy srv_policy, Messenger::Policy cli_policy)
Messenger::Policy srv_policy, Messenger::Policy cli_policy,
int _max_in_flight = 64, int _max_connections = 128)
: client_policy(cli_policy),
dispatcher(false, this),
rng(time(NULL)),
dummy_auth(g_ceph_context) {
dummy_auth(g_ceph_context),
max_in_flight(_max_in_flight),
max_connections(_max_connections) {
dummy_auth.auth_registry.refresh_config();
Messenger *msgr;
int base_port = 16800;
@ -1918,6 +1922,32 @@ class SyntheticWorkload {
}
}
void send_large_message(bool inject_network_congestion=false) {
std::lock_guard l{lock};
ConnectionRef conn = _get_random_connection();
uuid_d uuid;
uuid.generate_random();
MCommand *m = new MCommand(uuid);
vector<string> cmds;
cmds.push_back("command");
// set the random data to make the large message
bufferlist bl;
string s("abcdefghijklmnopqrstuvwxyz");
for (int i = 0; i < 1024*256; i++)
bl.append(s);
// bl is around 6M
m->set_data(bl);
m->cmd = cmds;
m->set_priority(200);
// setup after connection is ready
if (inject_network_congestion && conn->is_connected()) {
g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
} else {
g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
}
conn->send_message(m);
}
void drop_connection() {
std::lock_guard l{lock};
if (available_connections.size() < 10)
@ -2196,6 +2226,31 @@ TEST_P(MessengerTest, SyntheticInjectTest4) {
g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
}
// Test for a blocked network, i.e. the case where ::send returns EAGAIN.
TEST_P(MessengerTest, SyntheticInjectTest5) {
  SyntheticWorkload test_msg(1, 8, GetParam(), 100,
                             Messenger::Policy::stateful_server(0),
                             Messenger::Policy::lossless_client(0),
                             64, 2);

  constexpr int connection_count = 2;
  constexpr int total_ops = 5000;
  // The first 1600 sends request congestion injection, so about
  // 1600 * 6M (9.6G) would be stuck across the 2 connections.
  constexpr int congested_ops = 1600;

  for (int c = 0; c < connection_count; ++c) {
    test_msg.generate_connection();
  }

  for (int op = 0; op < total_ops; ++op) {
    if (op % 10 == 0) {
      ldout(g_ceph_context, 0) << "Op " << op << ": " << dendl;
      test_msg.print_internal_state();
    }
    const bool simulate_network_congestion = op < congested_ops;
    test_msg.send_large_message(simulate_network_congestion);
  }

  test_msg.wait_for_done();
}
class MarkdownDispatcher : public Dispatcher {
ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");