diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 89a94106b43..ae253107cb0 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -419,7 +419,10 @@ class KafkaReceiver(object): port = 9093 while remaining_retries > 0: try: - self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type) + self.consumer = KafkaConsumer(topic, + bootstrap_servers = kafka_server+':'+str(port), + security_protocol=security_type, + consumer_timeout_ms=16000) print('Kafka consumer created on topic: '+topic) break except Exception as error: @@ -448,7 +451,7 @@ def kafka_receiver_thread_runner(receiver): while not receiver.stop: for msg in receiver.consumer: receiver.events.append(json.loads(msg.value)) - timer.sleep(0.1) + time.sleep(0.1) log.info('Kafka receiver ended') print('Kafka receiver ended') except Exception as error: @@ -468,6 +471,7 @@ def stop_kafka_receiver(receiver, task): receiver.stop = True task.join(1) try: + receiver.consumer.unsubscribe() receiver.consumer.close() except Exception as error: log.info('failed to gracefuly stop Kafka receiver: %s', str(error))