mirror of
https://github.com/ceph/ceph
synced 2025-01-19 17:41:39 +00:00
6135747a06
Adding a sleep after running ./kafka-server-stop.sh and ./zookeeper-server-stop.sh scripts so that nothing gets logged into the kafka logs after the sleep time. And finally killing the process. This resolves: https://tracker.ceph.com/issues/53220 Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
205 lines
6.6 KiB
Python
205 lines
6.6 KiB
Python
"""
|
|
Deploy and configure Kafka for Teuthology
|
|
"""
|
|
import contextlib
|
|
import logging
|
|
import time
|
|
|
|
from teuthology import misc as teuthology
|
|
from teuthology import contextutil
|
|
from teuthology.orchestra import run
|
|
|
|
# Module-level logger, named after this module, used by every step of the task.
log = logging.getLogger(__name__)
|
|
|
|
def get_kafka_version(config):
    """
    Return the Kafka version requested in the task configuration.

    ``config`` maps client roles to their per-client settings. A client's
    settings may be ``None`` (``task`` builds the mapping with
    ``dict.fromkeys`` when it is given a list of clients or no
    configuration at all); such clients are skipped instead of raising
    ``TypeError``. When several clients set ``kafka_version`` the value of
    the last one encountered is returned.

    :param config: dict of ``{role: settings dict or None}``
    :returns: the configured ``kafka_version`` value
    :raises ValueError: if no client sets ``kafka_version``
    """
    kafka_version = None
    for client_config in config.values():
        # Clients listed without any settings carry None (or an empty dict).
        if client_config and 'kafka_version' in client_config:
            kafka_version = client_config['kafka_version']
    if kafka_version is None:
        # Fail with a clear message rather than an unbound local or a
        # None that only blows up later during string concatenation.
        raise ValueError(
            "kafka task: no 'kafka_version' found in configuration %r"
            % (config,)
        )
    return kafka_version
|
|
|
|
def get_kafka_dir(ctx, config):
    """
    Build the path of the unpacked Kafka source tree.

    The tree lives directly under the test directory reported by
    ``teuthology.get_testdir(ctx)`` and is named ``kafka-<version>-src``,
    where ``<version>`` is the value returned by
    ``get_kafka_version(config)``.
    """
    source_tree = 'kafka-' + get_kafka_version(config) + '-src'
    return '{}/{}'.format(teuthology.get_testdir(ctx), source_tree)
|
|
|
|
|
|
@contextlib.contextmanager
def install_kafka(ctx, config):
    """
    Download and unpack the Kafka source tarball on every configured client.

    On entry each client fetches ``kafka-<version>-src.tgz`` from the Apache
    archive into the test directory and extracts it there. On exit the
    extracted tree (its ``logs`` directory first, then the tree itself) and
    the downloaded tarball are removed from each client.
    """
    assert isinstance(config, dict)
    log.info('Installing Kafka...')

    for role in config:
        node = ctx.cluster.only(role)
        # Single-element unpacking fails unless exactly one remote matches.
        (remote,) = node.remotes.keys()
        workdir = teuthology.get_testdir(ctx)
        version = get_kafka_version(config)

        tarball = 'kafka-' + version + '-src.tgz'
        url = 'https://archive.apache.org/dist/kafka/' + version + '/' + tarball

        # Both commands run from inside the test directory.
        node.run(args=['cd', workdir, run.Raw('&&'), 'wget', url])
        node.run(args=['cd', workdir, run.Raw('&&'), 'tar', '-xvzf', tarball])

    try:
        yield
    finally:
        log.info('Removing packaged dependencies of Kafka...')
        kafka_dir = get_kafka_dir(ctx, config)
        version = get_kafka_version(config)
        for role in config:
            node = ctx.cluster.only(role)

            logs_dir = '{tdir}/logs'.format(tdir=kafka_dir)
            node.run(args=['rm', '-rf', logs_dir])
            node.run(args=['rm', '-rf', kafka_dir])

            tarball = 'kafka-' + version + '-src.tgz'
            tarball_path = '{tdir}/{doc}'.format(
                tdir=teuthology.get_testdir(ctx),
                doc=tarball,
            )
            node.run(args=['rm', '-rf', tarball_path])
|
|
|
|
|
|
@contextlib.contextmanager
def run_kafka(ctx,config):
    """
    Build Kafka and keep Zookeeper and Kafka running inside the context.

    On entry, for every configured client:

    1. build the jars with ``./gradlew jar -PscalaVersion=2.13.2``
    2. start the Zookeeper service in the background
    3. start the Kafka service in the background

    On exit, for every configured client, the Kafka and Zookeeper stop
    scripts are run in that order, each followed by a 5 second pause, and
    then every ``java`` process is killed with ``killall -9``.
    """
    assert isinstance(config, dict)
    log.info('Bringing up Zookeeper and Kafka services...')
    for role in config:
        node = ctx.cluster.only(role)
        # Single-element unpacking fails unless exactly one remote matches.
        (remote,) = node.remotes.keys()

        kafka_dir = get_kafka_dir(ctx, config)
        bin_dir = '{tdir}/bin'.format(tdir=kafka_dir)
        zookeeper_props = '{tir}/config/zookeeper.properties'.format(tir=kafka_dir)
        server_props = '{tir}/config/server.properties'.format(tir=kafka_dir)

        node.run(
            args=[
                'cd', kafka_dir, run.Raw('&&'),
                './gradlew', 'jar', '-PscalaVersion=2.13.2',
            ],
        )

        # "& exit" backgrounds the server so the remote command returns.
        node.run(
            args=[
                'cd', bin_dir, run.Raw('&&'),
                './zookeeper-server-start.sh', zookeeper_props,
                run.Raw('&'), 'exit',
            ],
        )

        node.run(
            args=[
                'cd', bin_dir, run.Raw('&&'),
                './kafka-server-start.sh', server_props,
                run.Raw('&'), 'exit',
            ],
        )

    try:
        yield
    finally:
        log.info('Stopping Zookeeper and Kafka Services...')

        for role in config:
            node = ctx.cluster.only(role)
            (remote,) = node.remotes.keys()

            kafka_dir = get_kafka_dir(ctx, config)
            bin_dir = '{tdir}/bin'.format(tdir=kafka_dir)

            # NOTE(review): the start command above uses
            # config/server.properties while this passes
            # config/kafka.properties -- confirm whether the stop script
            # actually reads this argument.
            node.run(
                args=[
                    'cd', bin_dir, run.Raw('&&'),
                    './kafka-server-stop.sh',
                    '{tir}/config/kafka.properties'.format(tir=kafka_dir),
                ],
            )

            # Pause after the stop script before moving on.
            time.sleep(5)

            node.run(
                args=[
                    'cd', bin_dir, run.Raw('&&'),
                    './zookeeper-server-stop.sh',
                    '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
                ],
            )

            time.sleep(5)

            # Force-kill whatever java processes are still around.
            node.run(args=['killall', '-9', 'java'])
|
|
|
|
|
|
@contextlib.contextmanager
def run_admin_cmds(ctx,config):
    """
    Run Kafka admin commands to check that topic creation, the producer and
    the consumer all work.

    For every configured client this creates the ``quickstart-events``
    topic, pipes one message ("First") into the console producer, and
    starts a console consumer on that topic in the background. Nothing is
    done on exit.
    """
    assert isinstance(config, dict)
    log.info('Checking kafka server through producer/consumer commands...')
    for role in config:
        node = ctx.cluster.only(role)
        # Single-element unpacking fails unless exactly one remote matches.
        (remote,) = node.remotes.keys()

        bin_dir = '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config))
        topic_args = ['--topic', 'quickstart-events']
        broker_args = ['--bootstrap-server', 'localhost:9092']

        # Create the topic.
        node.run(
            args=['cd', bin_dir, run.Raw('&&'),
                  './kafka-topics.sh', '--create'] + topic_args + broker_args,
        )

        # Produce a single message by piping it into the console producer.
        node.run(
            args=['cd', bin_dir, run.Raw('&&'),
                  'echo', "First", run.Raw('|'),
                  './kafka-console-producer.sh'] + topic_args + broker_args,
        )

        # Start the consumer in the background so the command returns.
        node.run(
            args=['cd', bin_dir, run.Raw('&&'),
                  './kafka-console-consumer.sh'] + topic_args
                 + ['--from-beginning'] + broker_args
                 + [run.Raw('&'), 'exit'],
        )

    # No teardown is needed for the admin commands.
    yield
|
|
|
|
|
|
@contextlib.contextmanager
def task(ctx,config):
    """
    Install Kafka, start Zookeeper and Kafka, and run the admin checks for
    the duration of the task.

    Following is the way how to run kafka::

        tasks:
        - kafka:
            client.0:
              kafka_version: 2.6.0

    ``config`` may be ``None`` (all client roles are used), a list of roles,
    or a dict mapping roles to their settings. Lists are turned into a dict
    whose values are ``None``.
    """
    assert config is None or isinstance(config, (list, dict)), \
        "task kafka only supports a list or dictionary for configuration"

    all_clients = [
        'client.{id}'.format(id=client_id)
        for client_id in teuthology.all_roles_of_type(ctx.cluster, 'client')
    ]
    roles = all_clients if config is None else config
    if isinstance(roles, list):
        roles = dict.fromkeys(roles)
    config = roles

    log.debug('Kafka config is %s', config)

    # Entered in this order: install, start services, admin checks.
    stages = (
        lambda: install_kafka(ctx=ctx, config=config),
        lambda: run_kafka(ctx=ctx, config=config),
        lambda: run_admin_cmds(ctx=ctx, config=config),
    )
    with contextutil.nested(*stages):
        yield
|