ceph/qa/tasks/kafka.py

196 lines
6.4 KiB
Python
Raw Normal View History

"""
Deploy and configure Kafka for Teuthology
"""
import contextlib
import logging
from teuthology import misc as teuthology
from teuthology import contextutil
from teuthology.orchestra import run
log = logging.getLogger(__name__)
def get_kafka_version(config):
for client, client_config in config.items():
if 'kafka_version' in client_config:
kafka_version = client_config.get('kafka_version')
return kafka_version
def get_kafka_dir(ctx, config):
kafka_version = get_kafka_version(config)
current_version = 'kafka-' + kafka_version + '-src'
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
@contextlib.contextmanager
def install_kafka(ctx, config):
"""
Downloading the kafka tar file.
"""
assert isinstance(config, dict)
log.info('Installing Kafka...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
test_dir=teuthology.get_testdir(ctx)
current_version = get_kafka_version(config)
link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
)
file1 = 'kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1],
)
try:
yield
finally:
log.info('Removing packaged dependencies of Kafka...')
test_dir=get_kafka_dir(ctx, config)
current_version = get_kafka_version(config)
for (client,_) in config.items():
ctx.cluster.only(client).run(
args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
)
ctx.cluster.only(client).run(
args=['rm', '-rf', test_dir],
)
rmfile1 = 'kafka-' + current_version + '-src.tgz'
ctx.cluster.only(client).run(
args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=rmfile1)],
)
@contextlib.contextmanager
def run_kafka(ctx,config):
"""
This includes two parts:
1. Starting Zookeeper service
2. Starting Kafka service
"""
assert isinstance(config, dict)
log.info('Bringing up Zookeeper and Kafka services...')
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./gradlew', 'jar',
'-PscalaVersion=2.13.2'
],
)
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-start.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-start.sh',
'{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
try:
yield
finally:
log.info('Stopping Zookeeper and Kafka Services...')
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-stop.sh',
'{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
],
)
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-stop.sh',
'{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
],
)
@contextlib.contextmanager
def run_admin_cmds(ctx,config):
"""
Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic.
"""
assert isinstance(config, dict)
log.info('Checking kafka server through producer/consumer commands...')
for (client,_) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
],
)
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'echo', "First", run.Raw('|'),
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
],
)
ctx.cluster.only(client).run(
args=[
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
'--from-beginning',
'--bootstrap-server', 'localhost:9092',
run.Raw('&'), 'exit'
],
)
try:
yield
finally:
pass
@contextlib.contextmanager
def task(ctx,config):
"""
Following is the way how to run kafka::
tasks:
- kafka:
client.0:
"""
assert config is None or isinstance(config, list) \
or isinstance(config, dict), \
"task kafka only supports a list or dictionary for configuration"
all_clients = ['client.{id}'.format(id=id_)
for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
if config is None:
config = all_clients
if isinstance(config, list):
config = dict.fromkeys(config)
log.debug('Kafka config is %s', config)
with contextutil.nested(
lambda: install_kafka(ctx=ctx, config=config),
lambda: run_kafka(ctx=ctx, config=config),
lambda: run_admin_cmds(ctx=ctx, config=config),
):
yield