mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
rgw/pubsub: prevent kafka thread from spinning when there are no
messages Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
This commit is contained in:
parent
0423dae9be
commit
9cce2381cb
@ -372,9 +372,9 @@ private:
|
||||
while (!stopped) {
|
||||
|
||||
// publish all messages in the queue
|
||||
auto event_count = 0U;
|
||||
const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
|
||||
dequeued += count;
|
||||
auto reply_count = 0U;
|
||||
const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
|
||||
dequeued += send_count;
|
||||
ConnectionList::iterator conn_it;
|
||||
ConnectionList::const_iterator end_it;
|
||||
{
|
||||
@ -412,14 +412,14 @@ private:
|
||||
INCREMENT_AND_CONTINUE(conn_it);
|
||||
}
|
||||
|
||||
event_count += rd_kafka_poll(conn->producer, read_timeout_ms);
|
||||
reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
|
||||
|
||||
// just increment the iterator
|
||||
++conn_it;
|
||||
}
|
||||
// if no messages were received or published
|
||||
// across all connection, sleep for 100ms
|
||||
if (count == 0 && event_count) {
|
||||
if (send_count == 0 && reply_count == 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ from nose.tools import assert_not_equal, assert_equal
|
||||
# configure logging for the tests module
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
skip_push_tests = False
|
||||
skip_push_tests = True
|
||||
|
||||
####################################
|
||||
# utility functions for pubsub tests
|
||||
|
Loading…
Reference in New Issue
Block a user