Merge pull request #51862 from yuvalif/wip-61520-reef

reef: test/rgw/notifications: fix kafka consumer shutdown issue

Reviewed-by: Shilpa Jagannath <smanjara@redhat.com>
This commit is contained in:
Yuri Weinstein 2023-06-08 12:06:54 -04:00 committed by GitHub
commit 54dcabdd65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -419,7 +419,10 @@ class KafkaReceiver(object):
port = 9093
while remaining_retries > 0:
try:
self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
self.consumer = KafkaConsumer(topic,
bootstrap_servers = kafka_server+':'+str(port),
security_protocol=security_type,
consumer_timeout_ms=16000)
print('Kafka consumer created on topic: '+topic)
break
except Exception as error:
@ -448,7 +451,7 @@ def kafka_receiver_thread_runner(receiver):
while not receiver.stop:
for msg in receiver.consumer:
receiver.events.append(json.loads(msg.value))
timer.sleep(0.1)
time.sleep(0.1)
log.info('Kafka receiver ended')
print('Kafka receiver ended')
except Exception as error:
@ -468,6 +471,7 @@ def stop_kafka_receiver(receiver, task):
receiver.stop = True
task.join(1)
try:
receiver.consumer.unsubscribe()
receiver.consumer.close()
except Exception as error:
log.info('failed to gracefuly stop Kafka receiver: %s', str(error))