ceph/qa/tasks/cephfs/test_nfs.py

84 lines
2.7 KiB
Python
Raw Normal View History

import os
import json
import time
import errno
import logging
from io import BytesIO
from tasks.mgr.mgr_test_case import MgrTestCase
from teuthology.exceptions import CommandFailedError
log = logging.getLogger(__name__)
class TestNFS(MgrTestCase):
def _nfs_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("nfs", *args)
def _orch_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("orch", *args)
def _sys_cmd(self, cmd):
cmd[0:0] = ['sudo']
ret = self.ctx.cluster.run(args=cmd, check_status=False, stdout=BytesIO(), stderr=BytesIO())
stdout = ret[0].stdout
if stdout:
# It's RemoteProcess defined in teuthology/orchestra/run.py
return stdout.getvalue()
def setUp(self):
super(TestNFS, self).setUp()
self._load_module("cephadm")
self._orch_cmd("set", "backend", "cephadm")
self.cluster_id = "test"
self.export_type = "cephfs"
self.pseudo_path = "/cephfs"
self.expected_name = 'nfs.ganesha-test'
def _check_port_status(self):
log.info("NETSTAT")
self._sys_cmd(['netstat', '-tnlp'])
def _check_nfs_server_status(self):
res = self._sys_cmd(['systemctl', 'status', 'nfs-server'])
if isinstance(res, bytes) and b'Active: active' in res:
self._disable_nfs()
def _disable_nfs(self):
log.info("Disabling NFS")
self._sys_cmd(['systemctl', 'disable', 'nfs-server', '--now'])
def _check_nfs_status(self):
return self._orch_cmd('ls', 'nfs')
def _check_idempotency(self, *args):
for _ in range(2):
self._nfs_cmd(*args)
def test_create_cluster(self):
self._check_nfs_server_status()
self._nfs_cmd("cluster", "create", self.export_type, self.cluster_id)
time.sleep(8)
orch_output = self._check_nfs_status()
expected_status = '1/1'
try:
if self.expected_name not in orch_output or expected_status not in orch_output:
raise CommandFailedError("NFS Ganesha cluster could not be deployed")
except (TypeError, CommandFailedError):
raise
def test_create_cluster_idempotent(self):
self._check_nfs_server_status()
self._check_idempotency("cluster", "create", self.export_type, self.cluster_id)
def test_delete_cluster(self):
self.test_create_cluster()
self._nfs_cmd("cluster", "delete", self.cluster_id)
time.sleep(8)
orch_output = self._check_nfs_status()
self.assertEqual("No services reported\n", orch_output)
def test_delete_cluster_idempotent(self):
self._check_idempotency("cluster", "delete", self.cluster_id)