Refactor teuthology.orchestra.run

RemoteProcess behaves more like subprocess.Popen, with some important
differences.

A summary of the API changes:
* RemoteProcess.exitstatus is either an int or None; it is never a callable
  nor a gevent.AsyncResult.
* New method: RemoteProcess.execute()
* New method: RemoteProcess.poll()
* New method: RemoteProcess.wait()
* New attribute: RemoteProcess.returncode - alias to exitstatus
* New property: RemoteProcess.finished - added because returncode can be None
  if the connection was interrupted
* run.execute() is removed.

Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
This commit is contained in:
Zack Cerza 2014-05-30 11:59:09 -05:00
parent 59ee17dc19
commit b386f5e5df
4 changed files with 131 additions and 125 deletions

View File

@ -16,23 +16,114 @@ log = logging.getLogger(__name__)
class RemoteProcess(object):
"""
Remote process object used to keep track of attributes of a process.
An object to begin and monitor execution of a process on a remote host
"""
__slots__ = [
'command', 'stdin', 'stdout', 'stderr', 'exitstatus', 'exited',
'client', 'args', 'check_status', 'command', 'hostname',
'stdin', 'stdout', 'stderr',
'_stdin_buf', '_stdout_buf', '_stderr_buf',
'returncode', 'exitstatus',
'greenlets',
# for orchestra.remote.Remote to place a backreference
'remote',
]
def __init__(self, command, stdin, stdout, stderr, exitstatus, exited):
self.command = command
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.exitstatus = exitstatus
self.exited = exited
def __init__(self, client, args, check_status=True, hostname=None):
"""
Create the object. Does not initiate command execution.
:param client: paramiko.SSHConnection to run the command with
:param args: Command to run.
:type args: String or list of strings
:param check_status: Whether to raise CommandFailedError on non-zero
exit status, and . Defaults to True. All signals
and connection loss are made to look like SIGHUP.
:param hostname: Name of remote host (optional)
"""
self.client = client
self.args = args
if isinstance(args, basestring):
self.command = args
else:
self.command = quote(args)
self.check_status = check_status
if hostname:
self.hostname = hostname
else:
(self.hostname, port) = client.get_transport().getpeername()
self.greenlets = []
self.stdin, self.stdout, self.stderr = (None, None, None)
self.returncode = self.exitstatus = None
def execute(self):
"""
Execute remote command
"""
log.getChild(self.hostname).info(u"Running: {cmd!r}".format(
cmd=self.command))
(self._stdin_buf, self._stdout_buf, self._stderr_buf) = \
self.client.exec_command(self.command)
(self.stdin, self.stdout, self.stderr) = \
(self._stdin_buf, self._stdout_buf, self._stderr_buf)
def add_greenlet(self, greenlet):
self.greenlets.append(greenlet)
def wait(self):
"""
Block until remote process finishes.
:returns: self.returncode
"""
for greenlet in self.greenlets:
greenlet.get()
status = self._get_exitstatus()
self.exitstatus = self.returncode = status
if self.check_status:
if status is None:
# command either died due to a signal, or the connection
# was lost
transport = self.client.get_transport()
if not transport.is_active():
# look like we lost the connection
raise ConnectionLostError(command=self.command)
# connection seems healthy still, assuming it was a
# signal; sadly SSH does not tell us which signal
raise CommandCrashedError(command=self.command)
if status != 0:
raise CommandFailedError(command=self.command,
exitstatus=status, node=self.hostname)
return status
def _get_exitstatus(self):
"""
:returns: the remote command's exit status (return code). Note that
if the connection is lost, or if the process was killed by a
signal, this returns None instead of paramiko's -1.
"""
status = self._stdout_buf.channel.recv_exit_status()
if status == -1:
status = None
return status
@property
def finished(self):
return self._stdout_buf.channel.exit_status_ready()
def poll(self):
"""
:returns: self.returncode if the process is finished; else None
"""
if self.finished:
return self.returncode
return None
class Raw(object):
@ -66,64 +157,6 @@ def quote(args):
return ' '.join(_quote(args))
def execute(client, args, name=None):
"""
Execute a command remotely.
Caller needs to handle stdin etc.
:param client: SSHConnection to run the command with
:param args: command to run
:param name: name of client (optional)
:type args: string or list of strings
Returns a RemoteProcess, where exitstatus is a callable that will
block until the exit status is available.
"""
if isinstance(args, basestring):
cmd = args
else:
cmd = quote(args)
if name:
host = name
else:
(host, port) = client.get_transport().getpeername()
log.getChild(host).info(u"Running: {cmd!r}".format(cmd=cmd))
(in_, out, err) = client.exec_command(cmd)
def get_exitstatus():
"""
Get exit status.
When -1 on connection loss *and* signals occur, this
maps to more pythonic None
"""
status = out.channel.recv_exit_status()
if status == -1:
status = None
return status
def exitstatus_ready():
"""
out.channel exit wrapper.
"""
return out.channel.exit_status_ready()
r = RemoteProcess(
command=cmd,
stdin=in_,
stdout=out,
stderr=err,
# this is a callable that will block until the status is
# available
exitstatus=get_exitstatus,
exited=exitstatus_ready,
)
return r
def copy_to_log(f, logger, loglevel=logging.INFO):
"""
Interface to older xreadlines api.
@ -316,13 +349,15 @@ def run(
if name is None:
name = host
r = execute(client, args, name=name)
r = RemoteProcess(client, args, check_status=check_status, hostname=name)
r.execute()
r.stdin = KludgeFile(wrapped=r.stdin)
g_in = None
if stdin is not PIPE:
g_in = gevent.spawn(copy_and_close, stdin, r.stdin)
r.add_greenlet(g_in)
r.stdin = None
else:
assert not wait, \
@ -336,6 +371,7 @@ def run(
if stderr is None:
stderr = logger.getChild(name).getChild('stderr')
g_err = gevent.spawn(copy_file_to, r.stderr, stderr)
r.add_greenlet(g_err)
r.stderr = stderr
else:
assert not wait, \
@ -346,45 +382,14 @@ def run(
if stdout is None:
stdout = logger.getChild(name).getChild('stdout')
g_out = gevent.spawn(copy_file_to, r.stdout, stdout)
r.add_greenlet(g_out)
r.stdout = stdout
else:
assert not wait, \
"Using PIPE for stdout without wait=False would deadlock."
def _check_status(status):
"""
get values needed if uninitialized. Handle ssh issues when checking
the status.
"""
if g_err is not None:
g_err.get()
if g_out is not None:
g_out.get()
if g_in is not None:
g_in.get()
status = status()
if check_status:
if status is None:
# command either died due to a signal, or the connection
# was lost
transport = client.get_transport()
if not transport.is_active():
# look like we lost the connection
raise ConnectionLostError(command=r.command)
# connection seems healthy still, assuming it was a
# signal; sadly SSH does not tell us which signal
raise CommandCrashedError(command=r.command)
if status != 0:
raise CommandFailedError(
command=r.command, exitstatus=status, node=name)
return status
if wait:
r.exitstatus = _check_status(r.exitstatus)
else:
r.exitstatus = spawn_asyncresult(_check_status, r.exitstatus)
r.wait()
return r
@ -403,9 +408,8 @@ def wait(processes, timeout=None):
while len(not_ready) > 0:
check_time()
for proc in list(not_ready):
if proc.exitstatus.ready():
if proc.finished:
not_ready.remove(proc)
for proc in processes:
assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
proc.exitstatus.get()
proc.wait()

View File

@ -59,7 +59,8 @@ class TestIntegration():
r.stdin.write('bar\n')
r.stdin.close()
got = r.exitstatus.get()
r.wait()
got = r.exitstatus
assert got == 0
assert r.stdout.getvalue() == 'foo\nbar\n'

View File

@ -27,6 +27,8 @@ class TestRemote(object):
def test_run(self):
fudge.clear_expectations()
ssh = fudge.Fake('SSHConnection')
ssh.expects('get_transport').returns_fake().expects('getpeername')\
.returns(('name', 22))
run = fudge.Fake('run')
args = [
'something',
@ -34,12 +36,8 @@ class TestRemote(object):
]
foo = object()
ret = RemoteProcess(
command='fakey',
stdin=None,
stdout=None,
stderr=None,
exitstatus=None,
exited=None,
client=ssh,
args='fakey',
)
r = remote.Remote(name='jdoe@xyzzy.example.com', ssh=ssh)
run.expects_call().with_args(

View File

@ -1,7 +1,6 @@
from cStringIO import StringIO
import fudge
import gevent.event
import logging
from .. import run
@ -280,12 +279,11 @@ class TestRun(object):
wait=False,
)
assert r.command == 'foo'
assert isinstance(r.exitstatus, gevent.event.AsyncResult)
e = assert_raises(
run.CommandFailedError,
r.exitstatus.get,
r.wait,
)
assert e.exitstatus == 42
assert r.returncode == 42
assert str(e) == "Command failed on HOST with status 42: 'foo'"
@fudge.with_fakes
@ -305,6 +303,7 @@ class TestRun(object):
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
@ -315,9 +314,9 @@ class TestRun(object):
)
r.stdin.write('bar')
assert r.command == 'foo'
assert isinstance(r.exitstatus, gevent.event.AsyncResult)
assert r.exitstatus.ready() == False
got = r.exitstatus.get()
assert r.poll() is None
got = r.wait()
assert isinstance(r.returncode, int)
assert got == 0
@fudge.with_fakes
@ -339,6 +338,7 @@ class TestRun(object):
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
@ -347,13 +347,14 @@ class TestRun(object):
stdout=run.PIPE,
wait=False,
)
assert r.exitstatus is None
assert r.command == 'foo'
assert isinstance(r.exitstatus, gevent.event.AsyncResult)
assert r.exitstatus.ready() == False
assert r.poll() is None
assert r.stdout.read() == 'one'
assert r.stdout.read() == 'two'
assert r.stdout.read() == ''
got = r.exitstatus.get()
got = r.wait()
assert isinstance(r.exitstatus, int)
assert got == 0
@fudge.with_fakes
@ -375,6 +376,7 @@ class TestRun(object):
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
@ -383,13 +385,14 @@ class TestRun(object):
stderr=run.PIPE,
wait=False,
)
assert r.exitstatus is None
assert r.command == 'foo'
assert isinstance(r.exitstatus, gevent.event.AsyncResult)
assert r.exitstatus.ready() is False
assert r.poll() is None
assert r.stderr.read() == 'one'
assert r.stderr.read() == 'two'
assert r.stderr.read() == ''
got = r.exitstatus.get()
got = r.wait()
assert isinstance(r.exitstatus, int)
assert got == 0
def test_quote_simple(self):