mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
mgr/test_orchestrator: Allow initializing dummy data
- Add test_orchestrator load_data command - Add QA tests for loading data Fixes: https://tracker.ceph.com/issues/41151 Signed-off-by: Kiefer Chang <kiefer.chang@suse.com>
This commit is contained in:
parent
f0e2c59e8c
commit
be8edd2aec
@ -26,6 +26,9 @@ class TestOrchestratorCli(MgrTestCase):
|
||||
"""
|
||||
return self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
|
||||
|
||||
def _test_orchestrator_cmd_result(self, *args, **kwargs):
|
||||
return self.mgr_cluster.mon_manager.raw_cluster_cmd_result("test_orchestrator", *args, **kwargs)
|
||||
|
||||
def setUp(self):
|
||||
super(TestOrchestratorCli, self).setUp()
|
||||
|
||||
@ -153,3 +156,79 @@ class TestOrchestratorCli(MgrTestCase):
|
||||
evs = json.loads(self._progress_cmd('json'))['completed']
|
||||
self.assertEqual(len(evs), 1)
|
||||
self.assertIn('update_mgrs', evs[0]['message'])
|
||||
|
||||
def test_load_data(self):
|
||||
data = {
|
||||
'inventory': [
|
||||
{
|
||||
'name': 'host0',
|
||||
'devices': [
|
||||
{
|
||||
'type': 'hdd',
|
||||
'id': '/dev/sda',
|
||||
'size': 1024**4 * 4,
|
||||
'rotates': True
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'name': 'host1',
|
||||
'devices': [
|
||||
{
|
||||
'type': 'hdd',
|
||||
'id': '/dev/sda',
|
||||
'size': 1024**4 * 4,
|
||||
'rotates': True
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
'services': [
|
||||
{
|
||||
'nodename': 'host0',
|
||||
'service_type': 'mon',
|
||||
'service_instance': 'a'
|
||||
},
|
||||
{
|
||||
'nodename': 'host1',
|
||||
'service_type': 'osd',
|
||||
'service_instance': '1'
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json.dumps(data))
|
||||
self.assertEqual(ret, 0)
|
||||
out = self._orch_cmd('device', 'ls', '--format=json')
|
||||
inventory = data['inventory']
|
||||
inventory_result = json.loads(out)
|
||||
self.assertEqual(len(inventory), len(inventory_result))
|
||||
|
||||
out = self._orch_cmd('device', 'ls', 'host0', '--format=json')
|
||||
inventory_result = json.loads(out)
|
||||
self.assertEqual(len(inventory_result), 1)
|
||||
self.assertEqual(inventory_result[0]['name'], 'host0')
|
||||
|
||||
out = self._orch_cmd('service', 'ls', '--format=json')
|
||||
services = data['services']
|
||||
services_result = json.loads(out)
|
||||
self.assertEqual(len(services), len(services_result))
|
||||
|
||||
out = self._orch_cmd('service', 'ls', 'host0', '--format=json')
|
||||
services_result = json.loads(out)
|
||||
self.assertEqual(len(services_result), 1)
|
||||
self.assertEqual(services_result[0]['nodename'], 'host0')
|
||||
|
||||
# test invalid input file: invalid json
|
||||
json_str = '{ "inventory: '
|
||||
ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json_str)
|
||||
self.assertEqual(ret, errno.EINVAL)
|
||||
|
||||
# test invalid input file: missing key
|
||||
json_str = '{ "inventory": [{"devices": []}] }'
|
||||
ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin=json_str)
|
||||
self.assertEqual(ret, errno.EINVAL)
|
||||
|
||||
# load empty data for other tests
|
||||
ret = self._test_orchestrator_cmd_result('load_data', '-i', '-', stdin='{}')
|
||||
self.assertEqual(ret, 0)
|
||||
|
@ -4,9 +4,11 @@ ceph-mgr orchestrator interface
|
||||
|
||||
Please see the ceph-mgr module developer's guide for more information.
|
||||
"""
|
||||
import copy
|
||||
import sys
|
||||
import time
|
||||
import fnmatch
|
||||
from functools import wraps
|
||||
import uuid
|
||||
import string
|
||||
import random
|
||||
@ -573,6 +575,17 @@ class PlacementSpec(object):
|
||||
self.label = None
|
||||
|
||||
|
||||
def handle_type_error(method):
|
||||
@wraps(method)
|
||||
def inner(cls, *args, **kwargs):
|
||||
try:
|
||||
return method(cls, *args, **kwargs)
|
||||
except TypeError as e:
|
||||
error_msg = '{}: {}'.format(cls.__name__, e)
|
||||
raise OrchestratorValidationError(error_msg)
|
||||
return inner
|
||||
|
||||
|
||||
class ServiceDescription(object):
|
||||
"""
|
||||
For responding to queries about the status of a particular service,
|
||||
@ -649,6 +662,7 @@ class ServiceDescription(object):
|
||||
return {k: v for (k, v) in out.items() if v is not None}
|
||||
|
||||
@classmethod
|
||||
@handle_type_error
|
||||
def from_json(cls, data):
|
||||
return cls(**data)
|
||||
|
||||
@ -850,6 +864,11 @@ class InventoryDevice(object):
|
||||
available=self.available, dev_id=self.dev_id,
|
||||
extended=self.extended)
|
||||
|
||||
@classmethod
|
||||
@handle_type_error
|
||||
def from_json(cls, data):
|
||||
return cls(**data)
|
||||
|
||||
@classmethod
|
||||
def from_ceph_volume_inventory(cls, data):
|
||||
# TODO: change InventoryDevice itself to mirror c-v inventory closely!
|
||||
@ -906,6 +925,21 @@ class InventoryNode(object):
|
||||
def to_json(self):
|
||||
return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data):
|
||||
try:
|
||||
_data = copy.deepcopy(data)
|
||||
name = _data.pop('name')
|
||||
devices = [InventoryDevice.from_json(device)
|
||||
for device in _data.pop('devices')]
|
||||
if _data:
|
||||
error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
|
||||
raise OrchestratorValidationError(error_msg)
|
||||
return cls(name, devices)
|
||||
except KeyError as e:
|
||||
error_msg = '{} is required for {}'.format(e, cls.__name__)
|
||||
raise OrchestratorValidationError(error_msg)
|
||||
|
||||
@classmethod
|
||||
def from_nested_items(cls, hosts):
|
||||
devs = InventoryDevice.from_ceph_volume_inventory_list
|
||||
|
@ -1,16 +1,58 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from orchestrator import ReadCompletion, raise_if_exception, RGWSpec
|
||||
from orchestrator import InventoryNode, InventoryDevice, ServiceDescription
|
||||
from orchestrator import OrchestratorValidationError
|
||||
|
||||
from orchestrator import InventoryDevice, ReadCompletion, raise_if_exception, RGWSpec
|
||||
|
||||
def test_inventory_device():
|
||||
i_d = InventoryDevice()
|
||||
s = i_d.pretty_print()
|
||||
assert len(s)
|
||||
def _test_resource(data, resource_class, extra):
|
||||
# create the instance with normal way
|
||||
rsc = resource_class(**data)
|
||||
if hasattr(rsc, 'pretty_print'):
|
||||
assert rsc.pretty_print()
|
||||
|
||||
# ensure we can deserialize and serialize
|
||||
rsc = resource_class.from_json(data)
|
||||
rsc.to_json()
|
||||
|
||||
# if there is an unexpected data provided
|
||||
data.update(extra)
|
||||
with pytest.raises(OrchestratorValidationError):
|
||||
resource_class.from_json(data)
|
||||
|
||||
|
||||
def test_inventory():
|
||||
json_data = {
|
||||
'name': 'host0',
|
||||
'devices': [
|
||||
{
|
||||
'type': 'hdd',
|
||||
'id': '/dev/sda',
|
||||
'size': 1024,
|
||||
'rotates': True
|
||||
}
|
||||
]
|
||||
}
|
||||
_test_resource(json_data, InventoryNode, {'abc': False})
|
||||
for devices in json_data['devices']:
|
||||
_test_resource(devices, InventoryDevice, {'abc': False})
|
||||
|
||||
json_data = [{}, {'name': 'host0'}, {'devices': []}]
|
||||
for data in json_data:
|
||||
with pytest.raises(OrchestratorValidationError):
|
||||
InventoryNode.from_json(data)
|
||||
|
||||
|
||||
def test_service_description():
|
||||
json_data = {
|
||||
'nodename': 'test',
|
||||
'service_type': 'mon',
|
||||
'service_instance': 'a'
|
||||
}
|
||||
_test_resource(json_data, ServiceDescription, {'abc': False})
|
||||
|
||||
|
||||
def test_raise():
|
||||
|
@ -1,3 +1,4 @@
|
||||
import errno
|
||||
import json
|
||||
import re
|
||||
import os
|
||||
@ -6,6 +7,9 @@ import functools
|
||||
import uuid
|
||||
from subprocess import check_output, CalledProcessError
|
||||
|
||||
import six
|
||||
|
||||
from mgr_module import CLICommand, HandleCommandResult
|
||||
from mgr_module import MgrModule, PersistentStoreDict
|
||||
|
||||
import orchestrator
|
||||
@ -123,6 +127,18 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
return all(c.is_complete for c in completions)
|
||||
|
||||
@CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
|
||||
def _load_data(self, inbuf):
|
||||
try:
|
||||
data = json.loads(inbuf)
|
||||
self._init_data(data)
|
||||
return HandleCommandResult()
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
msg = 'Invalid JSON file: {}'.format(e)
|
||||
return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
|
||||
except orchestrator.OrchestratorValidationError as e:
|
||||
return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
|
||||
|
||||
def available(self):
|
||||
return True, ""
|
||||
|
||||
@ -131,6 +147,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
self._initialized = threading.Event()
|
||||
self._shutdown = threading.Event()
|
||||
self._init_data({})
|
||||
|
||||
def shutdown(self):
|
||||
self._shutdown.set()
|
||||
@ -150,6 +167,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
self._shutdown.wait(5)
|
||||
|
||||
def _init_data(self, data=None):
|
||||
self._inventory = [orchestrator.InventoryNode.from_json(inventory_node)
|
||||
for inventory_node in data.get('inventory', [])]
|
||||
self._services = [orchestrator.ServiceDescription.from_json(service)
|
||||
for service in data.get('services', [])]
|
||||
|
||||
@deferred_read
|
||||
def get_inventory(self, node_filter=None, refresh=False):
|
||||
"""
|
||||
@ -157,6 +180,13 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
"""
|
||||
if node_filter and node_filter.nodes is not None:
|
||||
assert isinstance(node_filter.nodes, list)
|
||||
|
||||
if self._inventory:
|
||||
if node_filter:
|
||||
return list(filter(lambda node: node.name in node_filter.nodes,
|
||||
self._inventory))
|
||||
return self._inventory
|
||||
|
||||
try:
|
||||
c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
|
||||
except OSError:
|
||||
@ -186,7 +216,13 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
it returns the mgr we're running in.
|
||||
"""
|
||||
if service_type:
|
||||
assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
|
||||
support_services = ("mds", "osd", "mon", "rgw", "mgr", "iscsi")
|
||||
assert service_type in support_services, service_type + " unsupported"
|
||||
|
||||
if self._services:
|
||||
if node_name:
|
||||
return list(filter(lambda svc: svc.nodename == node_name, self._services))
|
||||
return self._services
|
||||
|
||||
out = map(str, check_output(['ps', 'aux']).splitlines())
|
||||
types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
|
||||
@ -245,6 +281,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
|
||||
@deferred_read
|
||||
def get_hosts(self):
|
||||
if self._inventory:
|
||||
return self._inventory
|
||||
return [orchestrator.InventoryNode('localhost', [])]
|
||||
|
||||
@deferred_write("add_host")
|
||||
@ -259,11 +297,11 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
|
||||
raise orchestrator.NoOrchestrator()
|
||||
if host == 'raise_import_error':
|
||||
raise ImportError("test_orchestrator not enabled")
|
||||
assert isinstance(host, str)
|
||||
assert isinstance(host, six.string_types)
|
||||
|
||||
@deferred_write("remove_host")
|
||||
def remove_host(self, host):
|
||||
assert isinstance(host, str)
|
||||
assert isinstance(host, six.string_types)
|
||||
|
||||
@deferred_write("update_mgrs")
|
||||
def update_mgrs(self, num, hosts):
|
||||
|
Loading…
Reference in New Issue
Block a user