mirror of
https://github.com/ceph/ceph
synced 2025-01-20 01:51:34 +00:00
150 lines
4.2 KiB
Python
150 lines
4.2 KiB
Python
"""
|
|
Handle parallel execution on remote hosts
|
|
"""
|
|
import logging
|
|
|
|
from teuthology import misc as teuthology
|
|
from teuthology.parallel import parallel
|
|
from teuthology.orchestra import run as tor
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
from gevent import queue as queue
|
|
from gevent import event as event
|
|
|
|
def _init_barrier(barrier_queue, remote):
|
|
"""current just queues a remote host"""
|
|
barrier_queue.put(remote)
|
|
|
|
def _do_barrier(barrier, barrier_queue, remote):
|
|
"""special case for barrier"""
|
|
barrier_queue.get()
|
|
if barrier_queue.empty():
|
|
barrier.set()
|
|
barrier.clear()
|
|
else:
|
|
barrier.wait()
|
|
|
|
barrier_queue.put(remote)
|
|
if barrier_queue.full():
|
|
barrier.set()
|
|
barrier.clear()
|
|
else:
|
|
barrier.wait()
|
|
|
|
def _exec_host(barrier, barrier_queue, remote, sudo, testdir, ls):
    """Feed a list of shell lines to a ``bash -s`` process on one remote.

    The shell is started with ``TESTDIR`` set to ``testdir`` and with
    ``set -e`` as its first input line.  Each entry of ``ls`` is then
    written to the shell's stdin, one per line, except for the literal
    entry ``barrier``: that one is not sent to the shell but makes this
    function synchronise with the other hosts via :func:`_do_barrier`.

    :param barrier: event object shared by all hosts, passed to _do_barrier
    :param barrier_queue: queue shared by all hosts, passed to _do_barrier
    :param remote: remote object to run on (its ``name`` and ``run`` are used)
    :param sudo: if true, run the shell through ``sudo``
    :param testdir: test directory; exported as ``TESTDIR`` and substituted
                    for every literal ``$TESTDIR`` in the command lines
    :param ls: list of command lines (strings)
    """
    log.info('Running commands on host %s', remote.name)
    args = [
        'TESTDIR={tdir}'.format(tdir=testdir),
        'bash',
        '-s',
        ]
    if sudo:
        args.insert(0, 'sudo')

    # wait=False: the process keeps running while we stream commands to it.
    r = remote.run(args=args, stdin=tor.PIPE, wait=False)
    r.stdin.writelines(['set -e\n'])
    r.stdin.flush()
    for line in ls:
        if line == "barrier":
            # Handled here rather than by the remote shell.
            _do_barrier(barrier, barrier_queue, remote)
            continue

        # str.replace returns a new string; the result must be kept for
        # the substitution to have any effect.
        line = line.replace('$TESTDIR', testdir)
        r.stdin.writelines([line, '\n'])
        r.stdin.flush()
    r.stdin.writelines(['\n'])
    r.stdin.flush()
    # Closing stdin signals end of input to ``bash -s``.
    r.stdin.close()
    tor.wait([r])
|
|
|
|
def _generate_remotes(ctx, config):
|
|
"""Return remote roles and the type of role specified in config"""
|
|
if 'all' in config and len(config) == 1:
|
|
ls = config['all']
|
|
for remote in ctx.cluster.remotes.iterkeys():
|
|
yield (remote, ls)
|
|
elif 'clients' in config:
|
|
ls = config['clients']
|
|
for role in teuthology.all_roles_of_type(ctx.cluster, 'client'):
|
|
(remote,) = ctx.cluster.only('client.{r}'.format(r=role)).remotes.iterkeys()
|
|
yield (remote, ls)
|
|
del config['clients']
|
|
for role, ls in config.iteritems():
|
|
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
|
|
yield (remote, ls)
|
|
else:
|
|
for role, ls in config.iteritems():
|
|
(remote,) = ctx.cluster.only(role).remotes.iterkeys()
|
|
yield (remote, ls)
|
|
|
|
def task(ctx, config):
    """
    Execute commands on multiple hosts in parallel

        tasks:
        - ceph:
        - ceph-fuse: [client.0, client.1]
        - pexec:
            client.0:
              - while true; do echo foo >> bar; done
            client.1:
              - sleep 1
              - tail -f bar
        - interactive:

    Execute commands on all hosts in the cluster in parallel.  This
    is useful if there are many hosts and you want to run the same
    command on all:

        tasks:
        - pexec:
            all:
              - grep FAIL /var/log/ceph/*

    Or if you want to run in parallel on all clients:

        tasks:
        - pexec:
            clients:
              - dd if=/dev/zero of=$TESTDIR/mnt.* count=1024 bs=1024

    The commands are fed to a shell that has TESTDIR set to the test
    directory, so refer to it as $TESTDIR.

    An optional boolean 'sudo' key runs the shell on every host through
    sudo:

        tasks:
        - pexec:
            sudo: true
            all:
              - grep FAIL /var/log/ceph/*

    You can also ensure that parallel commands are synchronized with the
    special 'barrier' statement:

        tasks:
        - pexec:
            clients:
              - cd $TESTDIR/mnt.*
              - dd if=/dev/zero of=./foo count=1024 bs=1024
              - barrier
              - dd if=/dev/zero of=./bar count=1024 bs=1024

    The above does not feed the second dd command to any client until
    every client has been fed the first one and has reached the barrier.
    A 'barrier' entry is consumed by this task while it feeds the command
    list to the shell; it is never sent to the shell itself, so it
    synchronises once per occurrence in the list and cannot be repeated
    by a shell loop.  Every host of the pexec invocation takes part in
    the barrier, so each command list should contain the same number of
    'barrier' entries.

    :param ctx: context object, passed on to the helper functions
    :param config: dict as shown above; it is not modified
    """
    log.info('Executing custom commands...')
    assert isinstance(config, dict), "task pexec got invalid config"

    # Work on a shallow copy so that stripping the 'sudo' option does not
    # alter the dict owned by the caller.
    config = dict(config)
    sudo = config.pop('sudo', False)

    testdir = teuthology.get_testdir(ctx)

    # List of (remote, commands) pairs; its length is the number of
    # barrier participants.
    remotes = list(_generate_remotes(ctx, config))
    barrier_queue = queue.Queue(len(remotes))
    barrier = event.Event()

    # The barrier starts out with one queued entry per participant.
    for remote, _ in remotes:
        _init_barrier(barrier_queue, remote)
    with parallel() as p:
        for remote, ls in remotes:
            p.spawn(_exec_host, barrier, barrier_queue, remote, sudo, testdir, ls)
|