Merge pull request #25912 from sebastian-philipp/orchestrator-drive-group

mgr/orchestrator: Extend DriveGroupSpec
This commit is contained in:
Sebastian Wagner 2019-01-18 14:59:48 +01:00 committed by GitHub
commit ad20b27034
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 260 additions and 57 deletions

View File

@ -144,7 +144,7 @@ OSD management
.. automethod:: Orchestrator.create_osds
.. automethod:: Orchestrator.replace_osds
.. automethod:: Orchestrator.remove_osds
.. autoclass:: OsdCreationSpec
.. autoclass:: DeviceSelection
.. autoclass:: DriveGroupSpec
Upgrades

View File

@ -1,3 +1,4 @@
add_subdirectory(dashboard)
add_subdirectory(insights)
add_subdirectory(ansible)
add_subdirectory(orchestrator_cli)

View File

@ -276,7 +276,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
return ansible_operation
def create_osds(self, osd_spec):
def create_osds(self, drive_group, all_hosts):
"""
Create one or more OSDs within a single Drive Group.
@ -285,7 +285,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
finer-grained OSD feature enablement (choice of backing store,
compression/encryption, etc).
:param osd_spec: OsdCreationSpec
:param osd_spec: DriveGroupSpec
"""
def verify_config(self):

View File

@ -4,15 +4,15 @@ ceph-mgr orchestrator interface
Please see the ceph-mgr module developer's guide for more information.
"""
import time
try:
from typing import TypeVar, Generic, List
from typing import TypeVar, Generic, List, Optional, Union
T = TypeVar('T')
G = Generic[T]
except ImportError:
T, G = object, object
import time
class _Completion(G):
@property
@ -183,6 +183,32 @@ class Orchestrator(object):
"""
raise NotImplementedError()
def add_host(self, host):
# type: (str) -> WriteCompletion
"""
Add a host to the orchestrator inventory.
:param host: hostname
"""
raise NotImplementedError()
def remote_host(self, host):
# type: (str) -> WriteCompletion
"""
Remove a host from the orchestrator inventory.
:param host: hostname
"""
raise NotImplementedError()
def get_hosts(self):
# type: () -> ReadCompletion[List[InventoryNode]]
"""
Report the hosts in the cluster.
The default implementation is extra slow.
:return: list of InventoryNodes
"""
return self.get_inventory()
def get_inventory(self, node_filter=None):
# type: (InventoryFilter) -> ReadCompletion[List[InventoryNode]]
"""
@ -229,8 +255,8 @@ class Orchestrator(object):
assert not (service_name and service_id)
raise NotImplementedError()
def create_osds(self, osd_spec):
# type: (OsdCreationSpec) -> WriteCompletion
def create_osds(self, drive_group, all_hosts):
# type: (DriveGroupSpec, List[str]) -> WriteCompletion
"""
Create one or more OSDs within a single Drive Group.
@ -239,12 +265,13 @@ class Orchestrator(object):
finer-grained OSD feature enablement (choice of backing store,
compression/encryption, etc).
:param osd_spec: OsdCreationSpec
:param drive_group: DriveGroupSpec
:param all_hosts: TODO, this is required because the orchestrator methods are not composable
"""
raise NotImplementedError()
def replace_osds(self, osd_spec):
# type: (OsdCreationSpec) -> WriteCompletion
def replace_osds(self, drive_group):
# type: (DriveGroupSpec) -> WriteCompletion
"""
Like create_osds, but the osd_id_claims must be fully
populated.
@ -459,38 +486,94 @@ class ServiceDescription(object):
return {k: v for (k, v) in out.items() if v is not None}
class DeviceSelection(object):
def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
# type: (List[str], str, str, bool, int) -> None
"""
ephemeral drive group device specification
:param paths: abs paths to the devices.
:param id_model: A wildcard string. e.g: "SDD*"
:param size: Size specification of format LOW:HIGH.
Can also take the the form :HIGH, LOW:
or an exact value (as ceph-volume inventory reports)
:param rotates: is the drive rotating or not
:param count: if this is present limit the number of drives to this number.
Any attributes (even none) can be included in the device
specification structure.
TODO: translate from the user interface (Drive Groups) to an actual list of devices.
"""
if paths is None:
paths = []
self.paths = paths # type: List[str]
if self.paths and any(p is not None for p in [id_model, size, rotates, count]):
raise TypeError('`paths` and other parameters are mutually exclusive')
self.id_model = id_model
self.size = size
self.rotates = rotates
self.count = count
@classmethod
def from_json(cls, device_spec):
return cls(**device_spec)
class DriveGroupSpec(object):
"""
Describe a drive group in the same form that ceph-volume
understands.
"""
def __init__(self, devices):
self.devices = devices
def __init__(self, host_pattern, data_devices, db_devices=None, wal_devices=None, journal_devices=None,
osds_per_device=None, objectstore='bluestore', encrypted=False, db_slots=None,
wal_slots=None):
# type: (str, DeviceSelection, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], int, str, bool, int, int) -> ()
# concept of applying a drive group to a (set) of hosts is tightly
# linked to the drive group itself
#
# An fnmatch pattern to select hosts. Can also be a single host.
self.host_pattern = host_pattern
class OsdCreationSpec(object):
"""
Used during OSD creation.
self.data_devices = data_devices
self.db_devices = db_devices
self.wal_devices = wal_devices
self.journal_devices = journal_devices
The drive names used here may be ephemeral.
"""
def __init__(self):
self.format = None # filestore, bluestore
# Number of osd daemons per "DATA" device.
# To fully utilize nvme devices multiple osds are required.
self.osds_per_device = osds_per_device
self.node = None # name of a node
assert objectstore in ('filestore', 'bluestore')
self.objectstore = objectstore
# List of device names
self.drive_group = None
self.encrypted = encrypted
self.db_slots = db_slots
self.wal_slots = wal_slots
# FIXME: needs ceph-volume support
# Optional: mapping of drive to OSD ID, used when the
# created OSDs are meant to replace previous OSDs on
# the same node.
self.osd_id_claims = {}
# Arbitrary JSON-serializable object.
# Maybe your orchestrator knows how to do something
# special like encrypting drives
self.extended = {}
@classmethod
def from_json(self, json_drive_group):
"""
Initialize and verify 'Drive group' structure
:param json_drive_group: A valid json string with a Drive Group
specification
"""
args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
json_drive_group.items()}
return DriveGroupSpec(**args)
def hosts(self, all_hosts):
import fnmatch
return fnmatch.filter(all_hosts, self.host_pattern)
class StatelessServiceSpec(object):

View File

@ -0,0 +1,7 @@
set(MGR_ORCHESTRATOR_CLI_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-orchestrator_cli-virtualenv)
add_custom_target(mgr-orchestrator_cli-test-venv
COMMAND ${CMAKE_SOURCE_DIR}/src/tools/setup-virtualenv.sh --python=${MGR_PYTHON_EXECUTABLE} ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/src/pybind/mgr/orchestrator_cli
COMMENT "orchestrator_cli tests virtualenv is being created")
add_dependencies(tests mgr-orchestrator_cli-test-venv)

View File

@ -1,2 +1,10 @@
from __future__ import absolute_import
import os
if 'UNITTEST' not in os.environ:
from .module import OrchestratorCli
else:
import sys
import mock
sys.path.append("..")
sys.modules['ceph_module'] = mock.Mock()

View File

@ -224,12 +224,15 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
return HandleCommandResult(-errno.EINVAL,
stderr="Invalid device spec, should be <node>:<device>")
spec = orchestrator.OsdCreationSpec()
spec.node = node_name
spec.format = "bluestore"
spec.drive_group = orchestrator.DriveGroupSpec([block_device])
devs = orchestrator.DeviceSelection(paths=block_device)
spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
completion = self.create_osds(spec)
# TODO: Remove this and make the orchestrator composable
host_completion = self.get_hosts()
self.wait([host_completion])
all_hosts = [h.name for h in host_completion.result]
completion = self.create_osds(spec, all_hosts)
self._orchestrator_wait([completion])
return HandleCommandResult()

View File

@ -0,0 +1,32 @@
#!/usr/bin/env bash
# run from ./ or from ../
: ${MGR_ORCHESTRATOR_CLI_VIRTUALENV:=/tmp/mgr-orchestrator_cli-virtualenv}
: ${WITH_PYTHON2:=ON}
: ${WITH_PYTHON3:=ON}
: ${CEPH_BUILD_DIR:=$PWD/.tox}
test -d orchestrator_cli && cd orchestrator_cli
if [ -e tox.ini ]; then
TOX_PATH=$(readlink -f tox.ini)
else
TOX_PATH=$(readlink -f $(dirname $0)/tox.ini)
fi
# tox.ini will take care of this.
unset PYTHONPATH
export CEPH_BUILD_DIR=$CEPH_BUILD_DIR
if [ -f ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}/bin/activate ]
then
source ${MGR_ORCHESTRATOR_CLI_VIRTUALENV}/bin/activate
fi
if [ "$WITH_PYTHON2" = "ON" ]; then
ENV_LIST+="py27"
fi
if [ "$WITH_PYTHON3" = "ON" ]; then
ENV_LIST+=",py3"
fi
tox -c ${TOX_PATH} -e ${ENV_LIST}

View File

@ -0,0 +1,36 @@
from __future__ import absolute_import
import pytest
from orchestrator import DriveGroupSpec, DeviceSelection
def test_DriveGroup():
dg_json = {
'host_pattern': 'hostname',
'data_devices': {'paths': ['/dev/sda']}
}
dg = DriveGroupSpec.from_json(dg_json)
assert dg.hosts(['hostname']) == ['hostname']
assert dg.data_devices.paths == ['/dev/sda']
def test_DriveGroup_fail():
with pytest.raises(TypeError):
DriveGroupSpec.from_json({})
def test_drivegroup_pattern():
dg = DriveGroupSpec('node[1-3]', DeviceSelection())
assert dg.hosts(['node{}'.format(i) for i in range(10)]) == ['node1', 'node2', 'node3']
def test_drive_selection():
devs = DeviceSelection(paths=['/dev/sda'])
spec = DriveGroupSpec('node_name', data_devices=devs)
assert spec.data_devices.paths == ['/dev/sda']
with pytest.raises(TypeError, match='exclusive'):
DeviceSelection(paths=['/dev/sda'], rotates=False)

View File

@ -0,0 +1,18 @@
[tox]
envlist = py27,py3
skipsdist = true
toxworkdir = {env:CEPH_BUILD_DIR}/orchestrator_cli
minversion = 2.5
[testenv]
deps =
pytest
mock
requests-mock
setenv=
UNITTEST = true
py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2
py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3
commands=
{envbindir}/py.test .

View File

@ -2,10 +2,10 @@ import threading
import functools
import os
import uuid
from mgr_module import MgrModule
import orchestrator
try:
from typing import List
except ImportError:
pass # just for type checking
try:
from kubernetes import client, config
@ -17,6 +17,9 @@ except ImportError:
client = None
config = None
from mgr_module import MgrModule
import orchestrator
from .rook_cluster import RookCluster
@ -390,11 +393,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
lambda: self.rook_cluster.rm_service(service_type, service_id), None,
"Removing {0} services for {1}".format(service_type, service_id))
def create_osds(self, spec):
# Validate spec.node
if not self.rook_cluster.node_exists(spec.node):
def create_osds(self, drive_group, all_hosts):
# type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
assert len(drive_group.hosts(all_hosts)) == 1
if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
raise RuntimeError("Node '{0}' is not in the Kubernetes "
"cluster".format(spec.node))
"cluster".format(drive_group.hosts(all_hosts)))
# Validate whether cluster CRD can accept individual OSD
# creations (i.e. not useAllDevices)
@ -403,7 +408,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
"support OSD creation.")
def execute():
self.rook_cluster.add_osds(spec)
self.rook_cluster.add_osds(drive_group, all_hosts)
def is_complete():
# Find OSD pods on this host
@ -411,7 +416,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
pods = self._k8s.list_namespaced_pod("rook-ceph",
label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd",
field_selector="spec.nodeName={0}".format(
spec.node
drive_group.hosts(all_hosts)[0]
)).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
@ -426,7 +431,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
continue
metadata = self.get_metadata('osd', "%s" % osd_id)
if metadata and metadata['devices'] in spec.drive_group.devices:
if metadata and metadata['devices'] in drive_group.data_devices.paths:
found.append(osd_id)
else:
self.log.info("ignoring osd {0} {1}".format(
@ -437,6 +442,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
return RookWriteCompletion(execute, is_complete,
"Creating OSD on {0}:{1}".format(
spec.node,
spec.drive_group.devices
drive_group.hosts(all_hosts)[0],
drive_group.data_devices.paths
))

View File

@ -19,6 +19,12 @@ try:
except ImportError:
ApiException = None
try:
import orchestrator
except ImportError:
pass # just used for type checking.
ROOK_SYSTEM_NS = "rook-ceph-system"
ROOK_API_VERSION = "v1"
ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION
@ -351,16 +357,15 @@ class RookCluster(object):
else:
return True
def add_osds(self, spec):
def add_osds(self, drive_group, all_hosts):
# type: (orchestrator.DriveGroupSpec, List[str]) -> None
"""
Rook currently (0.8) can only do single-drive OSDs, so we
treat all drive groups as just a list of individual OSDs.
"""
# assert isinstance(spec, orchestrator.OsdSpec)
block_devices = drive_group.data_devices
block_devices = spec.drive_group.devices
assert spec.format in ("bluestore", "filestore")
assert drive_group.objectstore in ("bluestore", "filestore")
# The CRD looks something like this:
# nodes:
@ -386,13 +391,13 @@ class RookCluster(object):
current_nodes = current_cluster['spec']['storage'].get('nodes', [])
if spec.node not in [n['name'] for n in current_nodes]:
if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
patch.append({
"op": "add", "path": "/spec/storage/nodes/-", "value": {
"name": spec.node,
"name": drive_group.hosts(all_hosts)[0],
"devices": [{'name': d} for d in block_devices],
"storeConfig": {
"storeType": spec.format
"storeType": drive_group.objectstore
}
}
})
@ -401,7 +406,7 @@ class RookCluster(object):
node_idx = None
current_node = None
for i, c in enumerate(current_nodes):
if c['name'] == spec.node:
if c['name'] == drive_group.hosts(all_hosts)[0]:
current_node = c
node_idx = i
break

View File

@ -246,8 +246,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
def add_stateless_service(self, service_type, spec):
raise NotImplementedError(service_type)
def create_osds(self, spec):
raise NotImplementedError(str(spec))
def create_osds(self, drive_group, all_hosts):
raise NotImplementedError(str(drive_group))
def service_action(self, action, service_type, service_name=None, service_id=None):
return TestWriteCompletion(

View File

@ -564,6 +564,11 @@ if(WITH_MGR)
list(APPEND tox_tests run-tox-mgr-ansible)
set(MGR_ANSIBLE_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-ansible-virtualenv)
list(APPEND env_vars_for_tox_tests MGR_ANSIBLE_VIRTUALENV=${MGR_ANSIBLE_VIRTUALENV})
add_test(NAME run-tox-mgr-orchestrator_cli COMMAND bash ${CMAKE_SOURCE_DIR}/src/pybind/mgr/orchestrator_cli/run-tox.sh)
list(APPEND tox_tests run-tox-mgr-orchestrator_cli)
set(MGR_ORCHESTRATOR_CLI_VIRTUALENV ${CEPH_BUILD_VIRTUALENV}/mgr-orchestrator_cli-virtualenv)
list(APPEND env_vars_for_tox_tests MGR_ORCHESTRATOR_CLI_VIRTUALENV=${MGR_ORCHESTRATOR_CLI_VIRTUALENV})
endif()
set_property(