Merge pull request #36969 from votdev/issue_46666_container_spec

cephadm: Introduce 'container' specification to deploy custom containers
Joshua Schmid 2020-09-18 10:50:30 +02:00 committed by GitHub
commit a0065d4a95
11 changed files with 804 additions and 169 deletions

View File

@ -412,6 +412,81 @@ Service Commands::
ceph orch <start|stop|restart|redeploy|reconfig> <service_name>
Deploying custom containers
===========================
The orchestrator enables custom containers to be deployed using a YAML file.
A corresponding :ref:`orchestrator-cli-service-spec` must look like:
.. code-block:: yaml
service_type: container
service_id: foo
placement:
...
image: docker.io/library/foo:latest
entrypoint: /usr/bin/foo
uid: 1000
gid: 1000
args:
- "--net=host"
- "--cpus=2"
ports:
- 8080
- 8443
envs:
- SECRET=mypassword
- PORT=8080
- PUID=1000
- PGID=1000
volume_mounts:
CONFIG_DIR: /etc/foo
bind_mounts:
- ['type=bind', 'source=lib/modules', 'destination=/lib/modules', 'ro=true']
dirs:
- CONFIG_DIR
files:
CONFIG_DIR/foo.conf:
- refresh=true
- username=xyz
where the properties of a service specification are:
* ``service_id``
A unique name of the service.
* ``image``
The name of the Docker image.
* ``uid``
The UID to use when creating directories and files in the host system.
* ``gid``
The GID to use when creating directories and files in the host system.
* ``entrypoint``
Overwrite the default ENTRYPOINT of the image.
* ``args``
A list of additional Podman/Docker command line arguments.
* ``ports``
A list of TCP ports to open in the host firewall.
* ``envs``
A list of environment variables.
* ``bind_mounts``
When you use a bind mount, a file or directory on the host machine
is mounted into the container. Relative `source=...` paths will be
located below `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
* ``volume_mounts``
When you use a volume mount, a new directory is created within
Docker's storage directory on the host machine, and Docker manages
that directory's contents. Relative source paths will be located below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
* ``dirs``
A list of directories that are created below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
* ``files``
A dictionary where the key is the relative path of the file and the
value is the file content. String content must be double quoted and
use '\n' for line breaks; alternatively, multi-line content can be
defined as a list of strings. The given files will be created below
the directory `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
.. _orchestrator-cli-service-spec:
Service Specification
@ -429,25 +504,28 @@ to specify the deployment of services. For example:
- host1
- host2
- host3
spec: ...
unmanaged: false
...
where the properties of a service specification are:
* ``service_type`` is the type of the service. Needs to be either a Ceph
service (``mon``, ``crash``, ``mds``, ``mgr``, ``osd`` or
``rbd-mirror``), a gateway (``nfs`` or ``rgw``), or part of the
monitoring stack (``alertmanager``, ``grafana``, ``node-exporter`` or
``prometheus``)
* ``service_id`` is the name of the service
* ``placement`` is a :ref:`orchestrator-cli-placement-spec`
* ``spec``: additional specifications for a specific service
* ``unmanaged``: If set to ``true``, the orchestrator will not deploy nor
remove any daemon associated with this service. Placement and all other
properties will be ignored. This is useful, if this service should not
be managed temporarily.
* ``service_type``
The type of the service. Needs to be either a Ceph
service (``mon``, ``crash``, ``mds``, ``mgr``, ``osd`` or
``rbd-mirror``), a gateway (``nfs`` or ``rgw``), part of the
monitoring stack (``alertmanager``, ``grafana``, ``node-exporter`` or
``prometheus``), or ``container`` for custom containers.
* ``service_id``
The name of the service.
* ``placement``
See :ref:`orchestrator-cli-placement-spec`.
* ``unmanaged``
If set to ``true``, the orchestrator will neither deploy nor
remove any daemon associated with this service. Placement and all
other properties will be ignored. This is useful if the service
should temporarily not be managed.
Each service type can have different requirements for the ``spec`` element.
Each service type can have additional service-specific properties.
Service specifications of type ``mon``, ``mgr``, and the monitoring
types do not require a ``service_id``.
@ -670,6 +748,7 @@ This is an overview of the current implementation status of the orchestrators.
apply osd ✔ ✔
apply rbd-mirror ✔ ✔
apply rgw ⚪ ✔
apply container ⚪ ✔
host add ⚪ ✔
host ls ✔ ✔
host rm ⚪ ✔

View File

@ -371,6 +371,23 @@ is_available "nfs" "$cond" 10
$CEPHADM shell --fsid $FSID --config $CONFIG --keyring $KEYRING -- \
ceph orch resume
# add alertmanager via custom container
alertmanager_image=$(cat ${CEPHADM_SAMPLES_DIR}/custom_container.json | jq -r '.image')
tcp_ports=$(cat ${CEPHADM_SAMPLES_DIR}/custom_container.json | jq -r '.ports | map_values(.|tostring) | join(" ")')
cat ${CEPHADM_SAMPLES_DIR}/custom_container.json | \
${CEPHADM//--image $IMAGE_MASTER/} \
--image $alertmanager_image \
deploy \
--tcp-ports "$tcp_ports" \
--name container.alertmanager.a \
--fsid $FSID \
--config-json -
cond="$CEPHADM enter --fsid $FSID --name container.alertmanager.a -- test -f \
/etc/alertmanager/alertmanager.yml"
is_available "alertmanager.yml" "$cond" 10
cond="curl 'http://localhost:9093' | grep -q 'Alertmanager'"
is_available "alertmanager" "$cond" 10
## run
# WRITE ME

View File

@ -37,7 +37,6 @@ You can invoke cephadm in two ways:
injected_stdin = '...'
"""
import argparse
import datetime
import fcntl
@ -64,6 +63,8 @@ try:
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
pass
import re
import uuid
from functools import wraps
@ -232,17 +233,12 @@ class NFSGanesha(object):
self.daemon_id = daemon_id
self.image = image
def json_get(key, default=None, require=False):
if require and not key in config_json.keys():
raise Error('{} missing from config-json'.format(key))
return config_json.get(key, default)
# config-json options
self.pool = json_get('pool', require=True)
self.namespace = json_get('namespace')
self.userid = json_get('userid')
self.extra_args = json_get('extra_args', [])
self.files = json_get('files', {})
self.pool = dict_get(config_json, 'pool', require=True)
self.namespace = dict_get(config_json, 'namespace')
self.userid = dict_get(config_json, 'userid')
self.extra_args = dict_get(config_json, 'extra_args', [])
self.files = dict_get(config_json, 'files', {})
# validate the supplied args
self.validate()
@ -312,14 +308,6 @@ class NFSGanesha(object):
# type: () -> List[str]
return self.daemon_args + self.extra_args
def get_file_content(self, fname):
# type: (str) -> str
"""Normalize the json file content into a string"""
content = self.files.get(fname)
if isinstance(content, list):
content = '\n'.join(content)
return content
def create_daemon_dirs(self, data_dir, uid, gid):
# type: (str, int, int) -> None
"""Create files under the container data dir"""
@ -335,7 +323,7 @@ class NFSGanesha(object):
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(config_dir, fname)
config_content = self.get_file_content(fname)
config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
@ -391,13 +379,8 @@ class CephIscsi(object):
self.daemon_id = daemon_id
self.image = image
def json_get(key, default=None, require=False):
if require and not key in config_json.keys():
raise Error('{} missing from config-json'.format(key))
return config_json.get(key, default)
# config-json options
self.files = json_get('files', {})
self.files = dict_get(config_json, 'files', {})
# validate the supplied args
self.validate()
@ -467,14 +450,6 @@ class CephIscsi(object):
cname = '%s-%s' % (cname, desc)
return cname
def get_file_content(self, fname):
# type: (str) -> str
"""Normalize the json file content into a string"""
content = self.files.get(fname)
if isinstance(content, list):
content = '\n'.join(content)
return content
def create_daemon_dirs(self, data_dir, uid, gid):
# type: (str, int, int) -> None
"""Create files under the container data dir"""
@ -488,7 +463,7 @@ class CephIscsi(object):
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(data_dir, fname)
config_content = self.get_file_content(fname)
config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
@ -520,12 +495,165 @@ class CephIscsi(object):
##################################
class CustomContainer(object):
"""Defines a custom container"""
daemon_type = 'container'
def __init__(self, fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
# config-json options
self.entrypoint = dict_get(config_json, 'entrypoint')
self.uid = dict_get(config_json, 'uid', 65534) # nobody
self.gid = dict_get(config_json, 'gid', 65534) # nobody
self.volume_mounts = dict_get(config_json, 'volume_mounts', {})
self.args = dict_get(config_json, 'args', [])
self.envs = dict_get(config_json, 'envs', [])
self.privileged = dict_get(config_json, 'privileged', False)
self.bind_mounts = dict_get(config_json, 'bind_mounts', [])
self.ports = dict_get(config_json, 'ports', [])
self.dirs = dict_get(config_json, 'dirs', [])
self.files = dict_get(config_json, 'files', {})
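# Construction sketch (values borrowed from the unit tests below):
# options missing from config_json fall back to their defaults, e.g.
# uid/gid default to 65534 (nobody):
#
#   cc = CustomContainer('<fsid>', 'foo',
#                        config_json={'entrypoint': 'bash', 'gid': 1000},
#                        image='docker.io/library/hello-world:latest')
#   cc.uid -> 65534, cc.gid -> 1000, cc.entrypoint -> 'bash'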
@classmethod
def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""
Create dirs/files below the container data directory.
"""
logger.info('Creating custom container configuration '
'dirs/files in {} ...'.format(data_dir))
if not os.path.isdir(data_dir):
raise OSError('data_dir is not a directory: %s' % data_dir)
for dir_path in self.dirs:
logger.info('Creating directory: {}'.format(dir_path))
dir_path = os.path.join(data_dir, dir_path.strip('/'))
makedirs(dir_path, uid, gid, 0o755)
for file_path in self.files:
logger.info('Creating file: {}'.format(file_path))
content = dict_get_join(self.files, file_path)
file_path = os.path.join(data_dir, file_path.strip('/'))
with open(file_path, 'w', encoding='utf-8') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(content)
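# Illustrative sketch (hypothetical values): given dirs=['etc/foo'] and
# files={'etc/foo/foo.conf': ['line1', 'line2']}, create_daemon_dirs()
# creates below the daemon data directory:
#   <data_dir>/etc/foo           directory, mode 0755, owned by uid:gid
#   <data_dir>/etc/foo/foo.conf  file, mode 0600, content 'line1\nline2'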
def get_daemon_args(self) -> List[str]:
return []
def get_container_args(self) -> List[str]:
return self.args
def get_container_envs(self) -> List[str]:
return self.envs
def get_container_mounts(self, data_dir: str) -> Dict[str, str]:
"""
Get the volume mounts. Relative source paths will be located below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
Example:
{
/foo/conf: /conf
foo/conf: /conf
}
becomes
{
/foo/conf: /conf
/var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf
}
"""
mounts = {}
for source, destination in self.volume_mounts.items():
source = os.path.join(data_dir, source)
mounts[source] = destination
return mounts
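# Usage sketch (paths borrowed from the unit tests below): absolute
# sources are kept as-is, because os.path.join() discards the data_dir
# prefix when the second argument is an absolute path:
#
#   cc.volume_mounts = {'/CONFIG_DIR': '/foo/conf', 'bar/config': '/bar:ro'}
#   cc.get_container_mounts('/xyz')
#   -> {'/CONFIG_DIR': '/foo/conf', '/xyz/bar/config': '/bar:ro'}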
def get_container_binds(self, data_dir: str) -> List[List[str]]:
"""
Get the bind mounts. Relative `source=...` paths will be located below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
Example:
[
'type=bind',
'source=lib/modules',
'destination=/lib/modules',
'ro=true'
]
becomes
[
...
'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules',
...
]
"""
binds = self.bind_mounts.copy()
for bind in binds:
for index, value in enumerate(bind):
match = re.match(r'^source=(.+)$', value)
if match:
bind[index] = 'source={}'.format(os.path.join(
data_dir, match.group(1)))
return binds
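# Usage sketch (values borrowed from the unit tests below): only the
# 'source=...' element of each bind is rewritten, and absolute sources
# are left untouched:
#
#   cc.bind_mounts = [['type=bind', 'source=lib/modules',
#                      'destination=/lib/modules', 'ro=true']]
#   cc.get_container_binds('/xyz')
#   -> [['type=bind', 'source=/xyz/lib/modules',
#        'destination=/lib/modules', 'ro=true']]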
##################################
def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any:
"""
Helper function to get a key from a dictionary.
:param d: The dictionary to process.
:param key: The name of the key to get.
:param default: The default value in case the key does not
exist. Default is `None`.
:param require: Set to `True` if the key is required. An
exception will be raised if the key does not exist in
the given dictionary.
:return: Returns the value of the given key.
:raises: :exc:`Error` if the given key does not exist
and `require` is set to `True`.
"""
if require and key not in d.keys():
raise Error('{} missing from dict'.format(key))
return d.get(key, default)
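# Behaviour sketch (mirroring the unit tests for this helper):
#   dict_get({'a': 1}, 'a')                -> 1
#   dict_get({'a': 1}, 'b')                -> None
#   dict_get({'a': 1}, 'b', default=2)     -> 2
#   dict_get({'a': 1}, 'b', require=True)  -> raises Error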
##################################
def dict_get_join(d: Dict, key: str) -> Any:
"""
Helper function to get the value of a given key from a dictionary.
`List` values will be converted to a string by joining them with a
line break.
:param d: The dictionary to process.
:param key: The name of the key to get.
:return: Returns the value of the given key. If it was a `list`, it
will be joined with line breaks.
"""
value = d.get(key)
if isinstance(value, list):
value = '\n'.join(map(str, value))
return value
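# Behaviour sketch (mirroring the unit tests for this helper):
#   dict_get_join({'foo': ['a', 'b']}, 'foo')  -> 'a\nb'
#   dict_get_join({'foo': [1, 2]}, 'foo')      -> '1\n2'
#   dict_get_join({'bar': 'a'}, 'bar')         -> 'a'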
##################################
def get_supported_daemons():
# type: () -> List[str]
supported_daemons = list(Ceph.daemons)
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
supported_daemons.append(CustomContainer.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
@ -1582,6 +1710,9 @@ def get_daemon_args(fsid, daemon_type, daemon_id):
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(fsid, daemon_id)
r.extend(cc.get_daemon_args())
return r
@ -1598,6 +1729,7 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config)
if keyring:
keyring_path = os.path.join(data_dir, 'keyring')
with open(keyring_path, 'w') as f:
@ -1606,7 +1738,7 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
f.write(keyring)
if daemon_type in Monitoring.components.keys():
config = get_parm(args.config_json) # type: ignore
config_json: Dict[str, Any] = get_parm(args.config_json)
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
# Set up directories specific to the monitoring component
@ -1632,25 +1764,25 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
# populate the config directory for the component from the config-json
for fname in required_files:
if 'files' in config: # type: ignore
if isinstance(config['files'][fname], list): # type: ignore
content = '\n'.join(config['files'][fname]) # type: ignore
else:
content = config['files'][fname] # type: ignore
if 'files' in config_json: # type: ignore
content = dict_get_join(config_json['files'], fname)
with open(os.path.join(data_dir_root, config_dir, fname), 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(content)
if daemon_type == NFSGanesha.daemon_type:
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
nfs_ganesha.create_daemon_dirs(data_dir, uid, gid)
if daemon_type == CephIscsi.daemon_type:
elif daemon_type == CephIscsi.daemon_type:
ceph_iscsi = CephIscsi.init(fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(fsid, daemon_id)
cc.create_daemon_dirs(data_dir, uid, gid)
def get_parm(option):
# type: (str) -> Dict[str, str]
@ -1715,8 +1847,12 @@ def get_container_binds(fsid, daemon_type, daemon_id):
binds = list()
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
binds.extend(CephIscsi.get_container_binds())
elif daemon_type == CustomContainer.daemon_type:
assert daemon_id
cc = CustomContainer.init(fsid, daemon_id)
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
binds.extend(cc.get_container_binds(data_dir))
return binds
@ -1802,14 +1938,25 @@ def get_container_mounts(fsid, daemon_type, daemon_id,
log_dir = get_log_dir(fsid)
mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
if daemon_type == CustomContainer.daemon_type:
assert daemon_id
cc = CustomContainer.init(fsid, daemon_id)
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
mounts.update(cc.get_container_mounts(data_dir))
return mounts
def get_container(fsid, daemon_type, daemon_id,
privileged=False,
ptrace=False,
container_args=None):
# type: (str, str, Union[int, str], bool, bool, Optional[List[str]]) -> CephContainer
def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str],
privileged: bool = False,
ptrace: bool = False,
container_args: Optional[List[str]] = None) -> 'CephContainer':
entrypoint: str = ''
name: str = ''
ceph_args: List[str] = []
envs: List[str] = []
host_network: bool = True
if container_args is None:
container_args = []
if daemon_type in ['mon', 'osd']:
@ -1829,21 +1976,23 @@ def get_container(fsid, daemon_type, daemon_id,
name = '%s.%s' % (daemon_type, daemon_id)
elif daemon_type in Monitoring.components:
entrypoint = ''
name = ''
elif daemon_type == NFSGanesha.daemon_type:
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(NFSGanesha.get_container_envs())
elif daemon_type == CephIscsi.daemon_type:
entrypoint = CephIscsi.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
# So the container can modprobe iscsi_target_mod and have write perms
# to configfs we need to make this a privileged container.
privileged = True
else:
entrypoint = ''
name = ''
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(fsid, daemon_id)
entrypoint = cc.entrypoint
host_network = False
envs.extend(cc.get_container_envs())
container_args.extend(cc.get_container_args())
ceph_args = [] # type: List[str]
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(daemon_type)
monitoring_args = [
@ -1858,10 +2007,6 @@ def get_container(fsid, daemon_type, daemon_id,
elif daemon_type in Ceph.daemons:
ceph_args = ['-n', name, '-f']
envs = [] # type: List[str]
if daemon_type == NFSGanesha.daemon_type:
envs.extend(NFSGanesha.get_container_envs())
# if using podman, set -d, --conmon-pidfile & --cidfile flags
# so service can have Type=Forking
if 'podman' in container_path:
@ -1884,6 +2029,7 @@ def get_container(fsid, daemon_type, daemon_id,
privileged=privileged,
ptrace=ptrace,
init=args.container_init,
host_network=host_network,
)
@ -2002,7 +2148,7 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
# type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
if comment:
# Sometimes adding a comment, espectially if there are multiple containers in one
# Sometimes adding a comment, especially if there are multiple containers in one
# unit file, makes it easier to read and grok.
file_obj.write('# ' + comment + '\n')
# Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
@ -2014,6 +2160,7 @@ def _write_container_cmd_to_bash(file_obj, container, comment=None, background=F
# container run command
file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
@ -2340,19 +2487,19 @@ WantedBy=ceph-{fsid}.target
class CephContainer:
def __init__(self,
image,
entrypoint,
args=[],
volume_mounts={},
cname='',
container_args=[],
envs=None,
privileged=False,
ptrace=False,
bind_mounts=None,
init=False,
):
# type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool, Optional[List[List[str]]], bool) -> None
image: str,
entrypoint: str,
args: List[str] = [],
volume_mounts: Dict[str, str] = {},
cname: str = '',
container_args: List[str] = [],
envs: Optional[List[str]] = None,
privileged: bool = False,
ptrace: bool = False,
bind_mounts: Optional[List[List[str]]] = None,
init: bool = False,
host_network: bool = True,
) -> None:
self.image = image
self.entrypoint = entrypoint
self.args = args
@ -2364,84 +2511,87 @@ class CephContainer:
self.ptrace = ptrace
self.bind_mounts = bind_mounts if bind_mounts else []
self.init = init
self.host_network = host_network
def run_cmd(self):
# type: () -> List[str]
vols = [] # type: List[str]
envs = [] # type: List[str]
cname = [] # type: List[str]
binds = [] # type: List[str]
entrypoint = [] # type: List[str]
if self.entrypoint:
entrypoint = ['--entrypoint', self.entrypoint]
priv = [] # type: List[str]
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk']
if self.ptrace:
priv.append('--cap-add=SYS_PTRACE')
init = ['--init'] if self.init else []
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
binds = sum([['--mount', '{}'.format(','.join(bind))]
for bind in self.bind_mounts],[])
envs = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
]
if self.envs:
for e in self.envs:
envs.extend(['-e', e])
cname = ['--name', self.cname] if self.cname else []
return [
def run_cmd(self) -> List[str]:
cmd_args: List[str] = [
str(container_path),
'run',
'--rm',
'--net=host',
'--ipc=host',
] + self.container_args + priv + \
cname + init + envs + \
vols + binds + entrypoint + \
[
self.image
] + self.args # type: ignore
]
envs: List[str] = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
]
vols: List[str] = []
binds: List[str] = []
def shell_cmd(self, cmd):
# type: (List[str]) -> List[str]
priv = [] # type: List[str]
if self.host_network:
cmd_args.append('--net=host')
if self.entrypoint:
cmd_args.extend(['--entrypoint', self.entrypoint])
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk']
vols = [] # type: List[str]
cmd_args.extend([
'--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk',
])
if self.ptrace:
cmd_args.append('--cap-add=SYS_PTRACE')
if self.init:
cmd_args.append('--init')
if self.cname:
cmd_args.extend(['--name', self.cname])
if self.envs:
for env in self.envs:
envs.extend(['-e', env])
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
binds = [] # type: List[str]
binds = sum([['--mount', '{}'.format(','.join(bind))]
for bind in self.bind_mounts], [])
envs = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
]
if self.envs:
for e in self.envs:
envs.extend(['-e', e])
cmd_args = [] # type: List[str]
if cmd:
cmd_args = ['-c'] + cmd
return [
return cmd_args + self.container_args + envs + vols + binds + [
self.image,
] + self.args # type: ignore
def shell_cmd(self, cmd: List[str]) -> List[str]:
cmd_args: List[str] = [
str(container_path),
'run',
'--rm',
'--net=host',
'--ipc=host',
] + self.container_args + priv + envs + vols + binds + [
]
envs: List[str] = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
]
vols: List[str] = []
binds: List[str] = []
if self.host_network:
cmd_args.append('--net=host')
if self.privileged:
cmd_args.extend([
'--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk',
])
if self.envs:
for env in self.envs:
envs.extend(['-e', env])
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
binds = sum([['--mount', '{}'.format(','.join(bind))]
for bind in self.bind_mounts], [])
return cmd_args + self.container_args + envs + vols + binds + [
'--entrypoint', cmd[0],
self.image
self.image,
] + cmd[1:]
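# Illustrative run_cmd() result (hypothetical image and host names) for
# a custom container with host_network=False, no entrypoint and
# cname='container.foo':
#   ['podman', 'run', '--rm', '--ipc=host',
#    '--name', 'container.foo',
#    '-e', 'CONTAINER_IMAGE=docker.io/library/foo:latest',
#    '-e', 'NODE_NAME=host1',
#    'docker.io/library/foo:latest']
# '--net=host' is omitted because host_network is False.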
def exec_cmd(self, cmd):
@ -2474,7 +2624,6 @@ class CephContainer:
def run(self, timeout=DEFAULT_TIMEOUT):
# type: (Optional[int]) -> str
logger.debug(self.run_cmd())
out, _, _ = call_throws(
self.run_cmd(), desc=self.entrypoint, timeout=timeout)
return out
@ -3256,8 +3405,22 @@ def command_deploy():
config=config, keyring=keyring,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(args.fsid, daemon_id)
if not args.reconfig and not redeploy:
daemon_ports.extend(cc.ports)
c = get_container(args.fsid, daemon_type, daemon_id,
privileged=cc.privileged,
ptrace=args.allow_ptrace)
deploy_daemon(args.fsid, daemon_type, daemon_id, c,
uid=cc.uid, gid=cc.gid, config=None,
keyring=None, reconfig=args.reconfig,
ports=daemon_ports)
else:
raise Error("{} not implemented in command_deploy function".format(daemon_type))
raise Error('daemon type {} not implemented in command_deploy function'
.format(daemon_type))
##################################
@ -3658,6 +3821,11 @@ def list_daemons(detail=True, legacy_dir=None):
err.startswith('%s, version ' % cmd):
version = err.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == CustomContainer.daemon_type:
# Because a custom container could contain
# anything, we do not know which command to
# execute to get the version.
pass
else:
logger.warning('version for unknown daemon type %s' % daemon_type)
else:

View File

@ -0,0 +1,35 @@
{
"image": "docker.io/prom/alertmanager:v0.20.0",
"ports": [9093, 9094],
"args": [
"-p 9093:9093",
"-p 9094:9094"
],
"dirs": ["etc/alertmanager"],
"files": {
"etc/alertmanager/alertmanager.yml": [
"global:",
" resolve_timeout: 5m",
"",
"route:",
" group_by: ['alertname']",
" group_wait: 10s",
" group_interval: 10s",
" repeat_interval: 1h",
" receiver: 'web.hook'",
"receivers:",
"- name: 'web.hook'",
" webhook_configs:",
" - url: 'http://127.0.0.1:5001/'",
"inhibit_rules:",
" - source_match:",
" severity: 'critical'",
" target_match:",
" severity: 'warning'",
" equal: ['alertname', 'dev', 'instance']"
]
},
"volume_mounts": {
"etc/alertmanager": "/etc/alertmanager"
}
}

View File

@ -1,5 +1,4 @@
# type: ignore
import argparse
import mock
from mock import patch
import os
@ -13,9 +12,12 @@ with patch('builtins.open', create=True):
cd = SourceFileLoader('cephadm', 'cephadm').load_module()
class TestCephAdm(object):
def test_is_fsid(self):
def test_is_not_fsid(self):
assert not cd.is_fsid('no-uuid')
def test_is_fsid(self):
assert cd.is_fsid('e863154d-33c7-4350-bca5-921e0467e55b')
def test__get_parser_image(self):
args = cd._parse_args(['--image', 'foo', 'version'])
assert args.image == 'foo'
@ -253,3 +255,109 @@ default via fe80::2480:28ec:5097:3fe2 dev wlp2s0 proto ra metric 20600 pref medi
'image_id': '16f4549cf7a8f112bbebf7946749e961fbbd1b0838627fe619aab16bc17ce552',
'repo_digest': 'quay.ceph.io/ceph-ci/ceph@sha256:4e13da36c1bd6780b312a985410ae678984c37e6a9493a74c87e4a50b9bda41f'
}
def test_dict_get(self):
result = cd.dict_get({'a': 1}, 'a', require=True)
assert result == 1
result = cd.dict_get({'a': 1}, 'b')
assert result is None
result = cd.dict_get({'a': 1}, 'b', default=2)
assert result == 2
def test_dict_get_error(self):
with pytest.raises(cd.Error):
cd.dict_get({'a': 1}, 'b', require=True)
def test_dict_get_join(self):
result = cd.dict_get_join({'foo': ['a', 'b']}, 'foo')
assert result == 'a\nb'
result = cd.dict_get_join({'foo': [1, 2]}, 'foo')
assert result == '1\n2'
result = cd.dict_get_join({'bar': 'a'}, 'bar')
assert result == 'a'
result = cd.dict_get_join({'a': 1}, 'a')
assert result == 1
class TestCustomContainer(unittest.TestCase):
cc: cd.CustomContainer
def setUp(self):
self.cc = cd.CustomContainer(
'e863154d-33c7-4350-bca5-921e0467e55b',
'container',
config_json={
'entrypoint': 'bash',
'gid': 1000,
'args': [
'--no-healthcheck',
'-p 6800:6800'
],
'envs': ['SECRET=password'],
'ports': [8080, 8443],
'volume_mounts': {
'/CONFIG_DIR': '/foo/conf',
'bar/config': '/bar:ro'
},
'bind_mounts': [
[
'type=bind',
'source=/CONFIG_DIR',
'destination=/foo/conf',
''
],
[
'type=bind',
'source=bar/config',
'destination=/bar:ro',
'ro=true'
]
]
},
image='docker.io/library/hello-world:latest'
)
def test_entrypoint(self):
self.assertEqual(self.cc.entrypoint, 'bash')
def test_uid_gid(self):
self.assertEqual(self.cc.uid, 65534)
self.assertEqual(self.cc.gid, 1000)
def test_ports(self):
self.assertEqual(self.cc.ports, [8080, 8443])
def test_get_container_args(self):
result = self.cc.get_container_args()
self.assertEqual(result, [
'--no-healthcheck',
'-p 6800:6800'
])
def test_get_container_envs(self):
result = self.cc.get_container_envs()
self.assertEqual(result, ['SECRET=password'])
def test_get_container_mounts(self):
result = self.cc.get_container_mounts('/xyz')
self.assertDictEqual(result, {
'/CONFIG_DIR': '/foo/conf',
'/xyz/bar/config': '/bar:ro'
})
def test_get_container_binds(self):
result = self.cc.get_container_binds('/xyz')
self.assertEqual(result, [
[
'type=bind',
'source=/CONFIG_DIR',
'destination=/foo/conf',
''
],
[
'type=bind',
'source=/xyz/bar/config',
'destination=/bar:ro',
'ro=true'
]
])

View File

@ -23,7 +23,8 @@ import subprocess
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host
NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
CustomContainerSpec
from cephadm.services.cephadmservice import CephadmDaemonSpec
from mgr_module import MgrModule, HandleCommandResult
@ -37,6 +38,7 @@ from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
@ -376,6 +378,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
self.node_exporter_service = NodeExporterService(self)
self.crash_service = CrashService(self)
self.iscsi_service = IscsiService(self)
self.container_service = CustomContainerService(self)
self.cephadm_services = {
'mon': self.mon_service,
'mgr': self.mgr_service,
@ -390,6 +393,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
'node-exporter': self.node_exporter_service,
'crash': self.crash_service,
'iscsi': self.iscsi_service,
'container': self.container_service,
}
self.template = TemplateMgr()
@ -653,6 +657,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
suffix = daemon_type not in [
'mon', 'crash', 'nfs',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
'container'
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
@ -1112,7 +1117,7 @@ To check that the host is reachable:
self.log.exception(ex)
raise
def _get_container_image(self, daemon_name: str) -> str:
def _get_container_image(self, daemon_name: str) -> Optional[str]:
daemon_type = daemon_name.split('.', 1)[0] # type: ignore
if daemon_type in CEPH_TYPES or \
daemon_type == 'nfs' or \
@ -1132,6 +1137,11 @@ To check that the host is reachable:
image = self.container_image_alertmanager
elif daemon_type == 'node-exporter':
image = self.container_image_node_exporter
elif daemon_type == CustomContainerService.TYPE:
# The image can't be resolved here; the necessary
# information is only available when a container is
# deployed (it is given via the spec).
image = None
else:
assert False, daemon_type
@ -1668,7 +1678,7 @@ To check that the host is reachable:
).name()):
return self._daemon_action(daemon_type, daemon_id, host, action)
def _daemon_action(self, daemon_type, daemon_id, host, action, image=None):
def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
host=host,
daemon_id=daemon_id,
@ -1680,7 +1690,7 @@ To check that the host is reachable:
if action == 'redeploy':
if self.daemon_is_self(daemon_type, daemon_id):
self.mgr_service.fail_over()
return # unreachable.
return '' # unreachable
# stop, recreate the container+unit, then restart
return self._create_daemon(daemon_spec)
elif action == 'reconfig':
@ -1964,16 +1974,35 @@ To check that the host is reachable:
hostname=daemon_spec.host,
).service_id(), overwrite=True):
image = ''
start_time = datetime.datetime.utcnow()
ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
if daemon_spec.daemon_type == 'container':
spec: Optional[CustomContainerSpec] = daemon_spec.spec
if spec is None:
# Exit here immediately because the required service
# spec to create a daemon is not provided. It is only
# provided when a service is applied via the
# 'orch apply' command.
msg = "Failed to {} daemon {} on {}: Required " \
"service specification not provided".format(
'reconfigure' if reconfig else 'deploy',
daemon_spec.name(), daemon_spec.host)
self.log.info(msg)
return msg
image = spec.image
if spec.ports:
ports.extend(spec.ports)
cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
daemon_spec)
daemon_spec.extra_args.extend(['--config-json', '-'])
# TCP ports to open in the host firewall
if daemon_spec.ports:
daemon_spec.extra_args.extend(
['--tcp-ports', ' '.join(map(str, daemon_spec.ports))])
if len(ports) > 0:
daemon_spec.extra_args.extend([
'--tcp-ports', ' '.join(map(str, ports))
])
# osd deployments needs an --osd-uuid arg
if daemon_spec.daemon_type == 'osd':
@ -1993,6 +2022,8 @@ To check that the host is reachable:
self._registry_login(daemon_spec.host, self.registry_url,
self.registry_username, self.registry_password)
daemon_spec.extra_args.extend(['--config-json', '-'])
self.log.info('%s daemon %s on %s' % (
'Reconfiguring' if reconfig else 'Deploying',
daemon_spec.name(), daemon_spec.host))
@ -2002,7 +2033,8 @@ To check that the host is reachable:
[
'--name', daemon_spec.name(),
] + daemon_spec.extra_args,
stdin=json.dumps(cephadm_config))
stdin=json.dumps(cephadm_config),
image=image)
if not code and daemon_spec.host in self.cache.daemons:
# prime cached service state with what we (should have)
# just created
@ -2422,6 +2454,7 @@ To check that the host is reachable:
'prometheus': PlacementSpec(count=1),
'node-exporter': PlacementSpec(host_pattern='*'),
'crash': PlacementSpec(host_pattern='*'),
'container': PlacementSpec(count=1),
}
spec.placement = defaults[spec.service_type]
elif spec.service_type in ['mon', 'mgr'] and \
@ -2544,6 +2577,15 @@ To check that the host is reachable:
def apply_alertmanager(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
def add_container(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('container', spec,
self.container_service.prepare_create)
@trivial_completion
def apply_container(self, spec: ServiceSpec) -> str:
return self._apply(spec)
def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
# pick a random host...
host = None

View File

@ -0,0 +1,29 @@
import logging
from typing import List, Any, Tuple, Dict
from ceph.deployment.service_spec import CustomContainerSpec
from .cephadmservice import CephadmService, CephadmDaemonSpec
logger = logging.getLogger(__name__)
class CustomContainerService(CephadmService):
TYPE = 'container'
def prepare_create(self, daemon_spec: CephadmDaemonSpec[CustomContainerSpec]) \
-> CephadmDaemonSpec:
assert self.TYPE == daemon_spec.daemon_type
return daemon_spec
def generate_config(self, daemon_spec: CephadmDaemonSpec[CustomContainerSpec]) \
-> Tuple[Dict[str, Any], List[str]]:
assert self.TYPE == daemon_spec.daemon_type
assert daemon_spec.spec
deps: List[str] = []
spec: CustomContainerSpec = daemon_spec.spec
config: Dict[str, Any] = spec.config_json()
logger.debug(
'Generated configuration for \'%s\' service: config-json=%s, dependencies=%s' %
(self.TYPE, config, deps))
return config, deps
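# Usage sketch (hypothetical spec values): the generated config-json is
# simply the serialized spec, and the deps list stays empty:
#
#   spec = CustomContainerSpec(service_id='foo',
#                              image='docker.io/library/foo:latest')
#   # generate_config(daemon_spec) -> (spec.config_json(), [])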

View File

@ -16,7 +16,7 @@ except ImportError:
from execnet.gateway_bootstrap import HostNotFound
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec
NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
@ -658,6 +658,28 @@ class TestCephadm(object):
api_user='user',
api_password='password'
), CephadmOrchestrator.apply_iscsi),
(CustomContainerSpec(
service_id='hello-world',
image='docker.io/library/hello-world:latest',
uid=65534,
gid=65534,
dirs=['foo/bar'],
files={
'foo/bar/xyz.conf': 'aaa\nbbb'
},
bind_mounts=[[
'type=bind',
'source=lib/modules',
'destination=/lib/modules',
'ro=true'
]],
volume_mounts={
'foo/bar': '/foo/bar:Z'
},
args=['--no-healthcheck'],
envs=['SECRET=password'],
ports=[8080, 8443]
), CephadmOrchestrator.apply_container),
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))

View File

@ -7,7 +7,7 @@ import json
import pytest
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec
IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec
from orchestrator import DaemonDescription, OrchestratorError
@ -544,6 +544,20 @@ def test_dd_octopus(dd_json):
),
True
),
(
CustomContainerSpec(
service_type='container',
service_id='hello-world',
image='docker.io/library/hello-world:latest',
),
DaemonDescription(
daemon_type='container',
daemon_id='hello-world.mgr0',
hostname='mgr0',
),
True
),
])
def test_daemon_description_service_name(spec: ServiceSpec,
dd: DaemonDescription,
@ -566,3 +580,56 @@ def test_alertmanager_spec_2():
spec = AlertManagerSpec(user_data={'default_webhook_urls': ['foo']})
assert isinstance(spec.user_data, dict)
assert 'default_webhook_urls' in spec.user_data.keys()
def test_custom_container_spec():
spec = CustomContainerSpec(service_id='hello-world',
image='docker.io/library/hello-world:latest',
entrypoint='/usr/bin/bash',
uid=1000,
gid=2000,
volume_mounts={'foo': '/foo'},
args=['--foo'],
envs=['FOO=0815'],
bind_mounts=[
[
'type=bind',
'source=lib/modules',
'destination=/lib/modules',
'ro=true'
]
],
ports=[8080, 8443],
dirs=['foo', 'bar'],
files={
'foo.conf': 'foo\nbar',
'bar.conf': ['foo', 'bar']
})
assert spec.service_type == 'container'
assert spec.entrypoint == '/usr/bin/bash'
assert spec.uid == 1000
assert spec.gid == 2000
assert spec.volume_mounts == {'foo': '/foo'}
assert spec.args == ['--foo']
assert spec.envs == ['FOO=0815']
assert spec.bind_mounts == [
[
'type=bind',
'source=lib/modules',
'destination=/lib/modules',
'ro=true'
]
]
assert spec.ports == [8080, 8443]
assert spec.dirs == ['foo', 'bar']
assert spec.files == {
'foo.conf': 'foo\nbar',
'bar.conf': ['foo', 'bar']
}
def test_custom_container_spec_config_json():
spec = CustomContainerSpec(service_id='foo', image='foo', dirs=None)
config_json = spec.config_json()
for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']:
assert key not in config_json

View File

@ -197,8 +197,10 @@ export class ServiceFormComponent extends CdForm implements OnInit {
ngOnInit(): void {
this.action = this.actionLabels.CREATE;
this.cephServiceService.getKnownTypes().subscribe((resp: Array<string>) => {
// Remove service type 'osd', this is deployed a different way.
this.serviceTypes = _.difference(resp, ['osd']).sort();
// Remove service types:
// osd - This is deployed a different way.
// container - This should only be used in the CLI.
this.serviceTypes = _.difference(resp, ['container', 'osd']).sort();
});
this.hostService.list().subscribe((resp: object[]) => {
const options: SelectOption[] = [];

View File

@ -379,8 +379,9 @@ class ServiceSpec(object):
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw'.split()
'node-exporter osd prometheus rbd-mirror rgw ' \
'container'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container'.split()
@classmethod
def _cls(cls, service_type):
@ -391,7 +392,8 @@ class ServiceSpec(object):
'nfs': NFSServiceSpec,
'osd': DriveGroupSpec,
'iscsi': IscsiServiceSpec,
'alertmanager': AlertManagerSpec
'alertmanager': AlertManagerSpec,
'container': CustomContainerSpec,
}.get(service_type, cls)
if ret == ServiceSpec and not service_type:
raise ServiceSpecValidationError('Spec needs a "service_type" key.')
@ -775,3 +777,67 @@ class AlertManagerSpec(ServiceSpec):
yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer)
class CustomContainerSpec(ServiceSpec):
def __init__(self,
service_type: str = 'container',
service_id: str = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
image: str = None,
entrypoint: Optional[str] = None,
uid: Optional[int] = None,
gid: Optional[int] = None,
volume_mounts: Optional[Dict[str, str]] = {},
args: Optional[List[str]] = [],
envs: Optional[List[str]] = [],
privileged: Optional[bool] = False,
bind_mounts: Optional[List[List[str]]] = None,
ports: Optional[List[int]] = [],
dirs: Optional[List[str]] = [],
files: Optional[Dict[str, Any]] = {},
):
assert service_type == 'container'
assert service_id is not None
assert image is not None
super(CustomContainerSpec, self).__init__(
service_type, service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only)
self.image = image
self.entrypoint = entrypoint
self.uid = uid
self.gid = gid
self.volume_mounts = volume_mounts
self.args = args
self.envs = envs
self.privileged = privileged
self.bind_mounts = bind_mounts
self.ports = ports
self.dirs = dirs
self.files = files
def config_json(self) -> Dict[str, Any]:
"""
Helper function to get the value of the `--config-json` cephadm
command line option. It will contain all specification properties
that do not have a `None` value. Properties that are `None`
will get default values in cephadm.
:return: Returns a dictionary containing all specification
properties.
"""
config_json = {}
for prop in ['image', 'entrypoint', 'uid', 'gid', 'args',
'envs', 'volume_mounts', 'privileged',
'bind_mounts', 'ports', 'dirs', 'files']:
value = getattr(self, prop)
if value is not None:
config_json[prop] = value
return config_json
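# Sketch of the resulting config-json (hypothetical values): properties
# that are None (here entrypoint, uid, gid and bind_mounts) are omitted
# and get their defaults in cephadm:
#
#   CustomContainerSpec(service_id='foo',
#                       image='docker.io/library/foo:latest').config_json()
#   -> {'image': 'docker.io/library/foo:latest', 'args': [], 'envs': [],
#       'volume_mounts': {}, 'privileged': False, 'ports': [],
#       'dirs': [], 'files': {}}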
yaml.add_representer(CustomContainerSpec, ServiceSpec.yaml_representer)