ceph/qa/tasks/kafka.py

"""
Deploy and configure Kafka for Teuthology
"""
import contextlib
import logging
import time
from teuthology import misc as teuthology
from teuthology import contextutil
from teuthology.orchestra import run
log = logging.getLogger(__name__)
def get_kafka_version(config):
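    """
    Return the kafka_version requested in the client configuration
    (at least one client entry is expected to set it).
    """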
for client, client_config in config.items():
if 'kafka_version' in client_config:
kafka_version = client_config.get('kafka_version')
return kafka_version
def get_kafka_dir(ctx, config):
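    """
    Return the path of the unpacked Kafka source tree,
    i.e. <testdir>/kafka-<version>-src.
    """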
kafka_version = get_kafka_version(config)
current_version = 'kafka-' + kafka_version + '-src'
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
@contextlib.contextmanager
def install_kafka(ctx, config):
"""
Downloading the kafka tar file.
"""
assert isinstance(config, dict)
log.info('Installing Kafka...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
        test_dir = teuthology.get_testdir(ctx)
current_version = get_kafka_version(config)
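        # Fetch the Kafka source tarball for the requested version from the Apache archive.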
link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
)
file1 = 'kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1],
)
try:
yield
finally:
log.info('Removing packaged dependencies of Kafka...')
        test_dir = get_kafka_dir(ctx, config)
current_version = get_kafka_version(config)
for (client,_) in config.items():
ctx.cluster.only(client).run(
args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
)
ctx.cluster.only(client).run(
args=['rm', '-rf', test_dir],
)
rmfile1 = 'kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=rmfile1)],
)
@contextlib.contextmanager
def run_kafka(ctx,config):
"""
This includes two parts:
1. Starting Zookeeper service
2. Starting Kafka service
"""
assert isinstance(config, dict)
log.info('Bringing up Zookeeper and Kafka services...')
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
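        # Build the Kafka jars from the unpacked sources (the gradle build pins the Scala version).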
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./gradlew', 'jar',
'-PscalaVersion=2.13.2'
],
)
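        # Start ZooKeeper in the background using the sample config shipped in the source tree.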
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-start.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
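        # Start the Kafka broker in the background with the sample server.properties.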
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-start.sh',
'{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
try:
yield
finally:
log.info('Stopping Zookeeper and Kafka Services...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
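            # The stop scripts return before the daemons have fully shut down; sleep
            # after each one so nothing more is written to the Kafka logs, then kill
            # whatever is left over (see https://tracker.ceph.com/issues/53220).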
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-stop.sh',
'{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
],
)
time.sleep(5)
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-stop.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
],
)
time.sleep(5)
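            # killall is a last resort: any java process still alive here is assumed
            # to be a stray Kafka/ZooKeeper daemon.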
ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
@contextlib.contextmanager
def run_admin_cmds(ctx,config):
"""
Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic.
"""
assert isinstance(config, dict)
log.info('Checking kafka server through producer/consumer commands...')
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
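        # Smoke-test the broker: create a topic, produce a single message, then consume it back.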
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
],
)
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'echo', "First", run.Raw('|'),
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
],
)
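        # Consume the topic from the beginning (in the background) to confirm the message round-trips.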
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
'--from-beginning',
'--bootstrap-server', 'localhost:9092',
run.Raw('&'), 'exit'
],
)
try:
yield
finally:
pass
@contextlib.contextmanager
def task(ctx,config):
"""
Following is the way how to run kafka::
tasks:
- kafka:
client.0:
kafka_version: 2.6.0
"""
assert config is None or isinstance(config, list) \
or isinstance(config, dict), \
"task kafka only supports a list or dictionary for configuration"
all_clients = ['client.{id}'.format(id=id_)
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
if config is None:
config = all_clients
if isinstance(config, list):
config = dict.fromkeys(config)
log.debug('Kafka config is %s', config)
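    # install -> run -> admin checks; the nested context managers tear down in reverse order.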
with contextutil.nested(
lambda: install_kafka(ctx=ctx, config=config),
lambda: run_kafka(ctx=ctx, config=config),
lambda: run_admin_cmds(ctx=ctx, config=config),
):
yield