Merge pull request #57394 from phlogistonjohn/jjm-smb-module-testing

smb: improve smb module test coverage

Reviewed-by: Adam King <adking@redhat.com>
Reviewed-by: Avan Thakkar <athakkar@redhat.com>
This commit is contained in:
Adam King 2024-07-01 15:52:29 -04:00 committed by GitHub
commit 0434c44554
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 734 additions and 88 deletions

View File

@ -54,9 +54,11 @@ class CephFSPathResolver:
map to real paths in the cephfs volume and determine if those paths exist.
"""
def __init__(self, mgr: Module_T) -> None:
def __init__(
self, mgr: Module_T, *, client: Optional[CephfsClient] = None
) -> None:
self._mgr = mgr
self._cephfs_client = CephfsClient(mgr)
self._cephfs_client = client or CephfsClient(mgr)
def resolve(
self, volume: str, subvolumegroup: str, subvolume: str, path: str

View File

@ -13,8 +13,6 @@ from typing import (
)
import logging
import random
import string
import time
from ceph.deployment.service_spec import SMBSpec
@ -46,10 +44,10 @@ from .proto import (
OrchSubmitter,
PathResolver,
Simplified,
checked,
)
from .resources import SMBResource
from .results import ErrorResult, Result, ResultGroup
from .utils import checked, ynbool
ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
ShareRef = Union[resources.Share, resources.RemovedShare]
@ -956,11 +954,6 @@ def _ug_refs(cluster: resources.Cluster) -> Collection[str]:
}
def _ynbool(value: bool) -> str:
"""Convert a bool to an smb.conf compatible string."""
return 'Yes' if value else 'No'
def _generate_share(
share: resources.Share, resolver: PathResolver, cephx_entity: str
) -> Dict[str, Dict[str, str]]:
@ -988,8 +981,8 @@ def _generate_share(
'ceph:config_file': '/etc/ceph/ceph.conf',
'ceph:filesystem': share.cephfs.volume,
'ceph:user_id': cephx_entity,
'read only': _ynbool(share.readonly),
'browseable': _ynbool(share.browseable),
'read only': ynbool(share.readonly),
'browseable': ynbool(share.browseable),
'kernel share modes': 'no',
'x:ceph:id': f'{share.cluster_id}.{share.share_id}',
}
@ -1251,11 +1244,3 @@ def _cephx_data_entity(cluster_id: str) -> str:
use for data access.
"""
return f'client.smb.fs.cluster.{cluster_id}'
def rand_name(prefix: str, max_len: int = 18, suffix_len: int = 8) -> str:
trunc = prefix[: (max_len - suffix_len)]
suffix = ''.join(
random.choice(string.ascii_lowercase) for _ in range(suffix_len)
)
return f'{trunc}{suffix}'

View File

@ -12,10 +12,10 @@ from .proto import (
EntryKey,
Self,
Simplifiable,
one,
)
from .resources import SMBResource
from .results import ErrorResult
from .utils import one
T = TypeVar('T')

View File

@ -6,7 +6,16 @@ import orchestrator
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
from mgr_module import MgrModule, Option
from . import cli, fs, handler, mon_store, rados_store, resources, results
from . import (
cli,
fs,
handler,
mon_store,
rados_store,
resources,
results,
utils,
)
from .enums import AuthMode, JoinSourceType, UserGroupSourceType
from .proto import AccessAuthorizer, Simplified
@ -116,7 +125,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
'a domain join username & password value'
' must contain a "%" separator'
)
rname = handler.rand_name(cluster_id)
rname = utils.rand_name(cluster_id)
join_sources.append(
resources.JoinSource(
source_type=JoinSourceType.RESOURCE,
@ -156,7 +165,7 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
for unpw in define_user_pass or []:
username, password = unpw.split('%', 1)
users.append({'name': username, 'password': password})
rname = handler.rand_name(cluster_id)
rname = utils.rand_name(cluster_id)
user_group_settings.append(
resources.UserGroupSource(
source_type=UserGroupSourceType.RESOURCE, ref=rname

View File

@ -9,7 +9,6 @@ from typing import (
List,
Optional,
Tuple,
TypeVar,
)
import sys
@ -155,27 +154,3 @@ class AccessAuthorizer(Protocol):
self, volume: str, entity: str, caps: str = ''
) -> None:
... # pragma: no cover
T = TypeVar('T')
# TODO: move to a utils.py
def one(lst: List[T]) -> T:
if len(lst) != 1:
raise ValueError("list does not contain exactly one element")
return lst[0]
class IsNoneError(ValueError):
pass
def checked(v: Optional[T]) -> T:
"""Ensures the provided value is not a None or raises a IsNoneError.
Intended use is similar to an `assert v is not None` but more usable in
one-liners and list/dict/etc comprehensions.
"""
if v is None:
raise IsNoneError('value is None')
return v

View File

@ -650,6 +650,7 @@ def load(data: Simplified) -> List[Any]:
"""
# Given a bare list/iterator. Assume it contains loadable objects.
if not isinstance(data, dict):
assert not isinstance(data, (str, bytes))
return list(chain.from_iterable(load(v) for v in data))
# Given a "list object"
if _RESOURCE_TYPE not in data and _RESOURCES in data:

View File

@ -1,10 +1,12 @@
from typing import Dict, List, Optional, Union, cast
from typing import Dict, List, Optional, Tuple, Union, cast
import errno
import json
import yaml
from ceph.deployment.service_spec import PlacementSpec
from object_format import ErrorResponseBase
from . import resourcelib, validation
from .enums import (
@ -16,7 +18,8 @@ from .enums import (
LoginCategory,
UserGroupSourceType,
)
from .proto import Self, Simplified, checked
from .proto import Self, Simplified
from .utils import checked
def _get_intent(data: Simplified) -> Intent:
@ -34,11 +37,22 @@ def _present(data: Simplified) -> bool:
return _get_intent(data) == Intent.PRESENT
class InvalidResourceError(ValueError):
class InvalidResourceError(ValueError, ErrorResponseBase):
def __init__(self, msg: str, data: Simplified) -> None:
super().__init__(msg)
self.resource_data = data
def to_simplified(self) -> Simplified:
return {
'resource': self.resource_data,
'msg': str(self),
'success': False,
}
def format_response(self) -> Tuple[int, str, str]:
data = json.dumps(self.to_simplified())
return -errno.EINVAL, data, "Invalid resource"
@classmethod
def wrap(cls, err: Exception, data: Simplified) -> Exception:
if isinstance(err, ValueError) and not isinstance(
@ -48,6 +62,26 @@ class InvalidResourceError(ValueError):
return err
class InvalidInputError(ValueError, ErrorResponseBase):
summary_max = 1024
def __init__(self, msg: str, content: str) -> None:
super().__init__(msg)
self.content = content
def to_simplified(self) -> Simplified:
return {
'input': self.content[: self.summary_max],
'truncated_input': len(self.content) > self.summary_max,
'msg': str(self),
'success': False,
}
def format_response(self) -> Tuple[int, str, str]:
data = json.dumps(self.to_simplified())
return -errno.EINVAL, data, "Invalid input"
class _RBase:
# mypy doesn't currently (well?) support class decorators adding methods
# so we use a base class to add this method to all our resource classes.
@ -297,8 +331,7 @@ class WrappedPlacementSpec(PlacementSpec):
# improperly typed. They are improperly typed because typing.Self
# didn't exist and the old correct way is a PITA to write (and
# remember). Thus a lot of classmethods are return the exact class
# which is technically incorrect. This fine class is guilty of the same
# sin. :-)
# which is technically incorrect.
return cast(Self, cls.from_json(data))
@classmethod
@ -424,19 +457,27 @@ SMBResource = Union[
]
def load_text(blob: str) -> List[SMBResource]:
def load_text(
blob: str, *, input_sample_max: int = 1024
) -> List[SMBResource]:
"""Given JSON or YAML return a list of SMBResource objects deserialized
from the input.
"""
json_err = None
try:
data = yaml.safe_load(blob)
except ValueError:
pass
try:
# apparently JSON is not always as strict subset of YAML
# therefore trying to parse as JSON first is not a waste:
# https://john-millikin.com/json-is-not-a-yaml-subset
data = json.loads(blob)
except ValueError:
pass
return load(data)
except ValueError as err:
json_err = err
try:
data = yaml.safe_load(blob) if json_err else data
except (ValueError, yaml.parser.ParserError) as err:
raise InvalidInputError(str(err), blob) from err
if not isinstance(data, (list, dict)):
raise InvalidInputError("input must be an object or list", blob)
return load(cast(Simplified, data))
def load(data: Simplified) -> List[SMBResource]:

View File

@ -2,8 +2,9 @@ from typing import Iterable, Iterator, List, Optional
import errno
from .proto import Simplified, one
from .proto import Simplified
from .resources import SMBResource
from .utils import one
_DOMAIN = 'domain'

View File

@ -22,3 +22,24 @@ import smb.enums
)
def test_stringified(value, strval):
assert str(value) == strval
def test_login_access_expand():
assert smb.enums.LoginAccess.ADMIN.expand() == smb.enums.LoginAccess.ADMIN
assert (
smb.enums.LoginAccess.READ_ONLY.expand()
== smb.enums.LoginAccess.READ_ONLY
)
assert (
smb.enums.LoginAccess.READ_ONLY_SHORT.expand()
== smb.enums.LoginAccess.READ_ONLY
)
assert (
smb.enums.LoginAccess.READ_WRITE.expand()
== smb.enums.LoginAccess.READ_WRITE
)
assert (
smb.enums.LoginAccess.READ_WRITE_SHORT.expand()
== smb.enums.LoginAccess.READ_WRITE
)
assert smb.enums.LoginAccess.NONE.expand() == smb.enums.LoginAccess.NONE

View File

@ -0,0 +1,67 @@
from unittest import mock
import pytest
import smb.fs
def test_mocked_fs_authorizer():
def mmcmd(cmd):
assert cmd['filesystem'] == 'cephfs'
if 'kaboom' in cmd['entity']:
return -5, 'oops', 'fail'
return 0, 'ok', 'nice'
m = mock.MagicMock()
m.mon_command.side_effect = mmcmd
fsauth = smb.fs.FileSystemAuthorizer(m)
fsauth.authorize_entity('cephfs', 'client.smb.foo')
with pytest.raises(smb.fs.AuthorizationGrantError):
fsauth.authorize_entity('cephfs', 'client.smb.kaboom')
def test_mocked_fs_path_resolver(monkeypatch):
# we have to "re-patch" whatever cephfs module gets mocked with because
# the ObjectNotFound attribute is not an exception in the test environment
monkeypatch.setattr('cephfs.ObjectNotFound', KeyError)
def mmcmd(cmd):
if cmd['prefix'] == 'fs subvolume getpath':
if cmd['vol_name'] == 'cephfs' and cmd['sub_name'] == 'beta':
return 0, '/volumes/cool/path/f00d-600d', ''
return -5, '', 'eek'
m = mock.MagicMock()
m.mon_command.side_effect = mmcmd
fspr = smb.fs.CephFSPathResolver(m, client=m)
# resolve
path = fspr.resolve('cephfs', '', '', '/zowie')
assert path == '/zowie'
path = fspr.resolve('cephfs', 'alpha', 'beta', '/zowie')
assert path == '/volumes/cool/path/f00d-600d/zowie'
with pytest.raises(smb.fs.CephFSSubvolumeResolutionError):
path = fspr.resolve('ouch', 'alpha', 'beta', '/zowie')
# resolve_exists
m.connection_pool.get_fs_handle.return_value.statx.return_value = {
'mode': 0o41777
}
path = fspr.resolve_exists('cephfs', 'alpha', 'beta', '/zowie')
assert path == '/volumes/cool/path/f00d-600d/zowie'
m.connection_pool.get_fs_handle.return_value.statx.return_value = {
'mode': 0o101777
}
with pytest.raises(NotADirectoryError):
fspr.resolve_exists('cephfs', 'alpha', 'beta', '/zowie')
m.connection_pool.get_fs_handle.return_value.statx.side_effect = (
mock.MagicMock(side_effect=OSError('nope'))
)
with pytest.raises(FileNotFoundError):
fspr.resolve_exists('cephfs', 'alpha', 'beta', '/zowie')

View File

@ -574,14 +574,6 @@ def test_apply_no_matching_cluster_error(thandler):
assert not rg.success
def test_one():
assert smb.proto.one(['a']) == 'a'
with pytest.raises(ValueError):
smb.proto.one([])
with pytest.raises(ValueError):
smb.proto.one(['a', 'b'])
def test_apply_full_cluster_create(thandler):
to_apply = [
smb.resources.JoinAuth(
@ -1259,21 +1251,36 @@ def test_apply_cluster_bad_linked_auth(thandler):
assert rs['results'][1]['msg'] == 'join auth linked to different cluster'
def test_rand_name():
name = smb.handler.rand_name('bob')
assert name.startswith('bob')
assert len(name) == 11
name = smb.handler.rand_name('carla')
assert name.startswith('carla')
assert len(name) == 13
name = smb.handler.rand_name('dangeresque')
assert name.startswith('dangeresqu')
assert len(name) == 18
name = smb.handler.rand_name('fhqwhgadsfhqwhgadsfhqwhgads')
assert name.startswith('fhqwhgadsf')
assert len(name) == 18
name = smb.handler.rand_name('')
assert len(name) == 8
def test_apply_cluster_bad_linked_ug(thandler):
to_apply = [
smb.resources.UsersAndGroups(
users_groups_id='ug1',
values=smb.resources.UserGroupSettings(
users=[{"username": "foo"}],
groups=[],
),
linked_to_cluster='mycluster2',
),
smb.resources.Cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
smb.resources.UserGroupSource(
source_type=smb.resources.UserGroupSourceType.RESOURCE,
ref='ug1',
),
],
),
]
results = thandler.apply(to_apply)
assert not results.success
rs = results.to_simplified()
assert len(rs['results']) == 2
assert rs['results'][0]['msg'] == 'linked_to_cluster id not valid'
assert (
rs['results'][1]['msg']
== 'users and groups linked to different cluster'
)
def test_apply_with_create_only(thandler):
@ -1362,3 +1369,167 @@ def test_apply_with_create_only(thandler):
'shares',
'mycluster1.foodirs',
) in thandler.internal_store.data
def test_remove_in_use_cluster(thandler):
thandler.internal_store.overwrite(
{
'clusters.foo': {
'resource_type': 'ceph.smb.cluster',
'cluster_id': 'foo',
'auth_mode': 'active-directory',
'intent': 'present',
'domain_settings': {
'realm': 'dom1.example.com',
'join_sources': [
{
'source_type': 'resource',
'ref': 'foo1',
}
],
},
},
'join_auths.foo1': {
'resource_type': 'ceph.smb.join.auth',
'auth_id': 'foo1',
'intent': 'present',
'auth': {
'username': 'testadmin',
'password': 'Passw0rd',
},
},
'shares.foo.s1': {
'resource_type': 'ceph.smb.share',
'cluster_id': 'foo',
'share_id': 's1',
'intent': 'present',
'name': 'Ess One',
'readonly': False,
'browseable': True,
'cephfs': {
'volume': 'cephfs',
'path': '/',
'provider': 'samba-vfs',
},
},
}
)
to_apply = [
smb.resources.RemovedCluster(
cluster_id='foo',
),
]
results = thandler.apply(to_apply)
rs = results.to_simplified()
assert not results.success
assert 'cluster in use' in rs['results'][0]['msg']
def test_remove_in_use_join_auth(thandler):
thandler.internal_store.overwrite(
{
'clusters.foo': {
'resource_type': 'ceph.smb.cluster',
'cluster_id': 'foo',
'auth_mode': 'active-directory',
'intent': 'present',
'domain_settings': {
'realm': 'dom1.example.com',
'join_sources': [
{
'source_type': 'resource',
'ref': 'foo1',
}
],
},
},
'join_auths.foo1': {
'resource_type': 'ceph.smb.join.auth',
'auth_id': 'foo1',
'intent': 'present',
'auth': {
'username': 'testadmin',
'password': 'Passw0rd',
},
},
'shares.foo.s1': {
'resource_type': 'ceph.smb.share',
'cluster_id': 'foo',
'share_id': 's1',
'intent': 'present',
'name': 'Ess One',
'readonly': False,
'browseable': True,
'cephfs': {
'volume': 'cephfs',
'path': '/',
'provider': 'samba-vfs',
},
},
}
)
to_apply = [
smb.resources.JoinAuth(
auth_id='foo1',
intent=smb.enums.Intent.REMOVED,
),
]
results = thandler.apply(to_apply)
rs = results.to_simplified()
assert not results.success
assert 'resource in use' in rs['results'][0]['msg']
def test_remove_in_use_ug(thandler):
thandler.internal_store.overwrite(
{
'clusters.foo': {
'resource_type': 'ceph.smb.cluster',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
'user_group_settings': [
{
'source_type': 'resource',
'ref': 'foo1',
}
],
},
'users_and_groups.foo1': {
'resource_type': 'ceph.smb.usersgroups',
'users_groups_id': 'foo1',
'intent': 'present',
'values': {
'users': [{"username": "foo"}],
'groups': [],
},
},
'shares.foo.s1': {
'resource_type': 'ceph.smb.share',
'cluster_id': 'foo',
'share_id': 's1',
'intent': 'present',
'name': 'Ess One',
'readonly': False,
'browseable': True,
'cephfs': {
'volume': 'cephfs',
'path': '/',
'provider': 'samba-vfs',
},
},
}
)
to_apply = [
smb.resources.UsersAndGroups(
users_groups_id='foo1',
intent=smb.enums.Intent.REMOVED,
),
]
results = thandler.apply(to_apply)
rs = results.to_simplified()
assert not results.success
assert 'resource in use' in rs['results'][0]['msg']

View File

@ -499,6 +499,112 @@ login_control:
"exc_type": ValueError,
"error": "admins",
},
# bad value in category field in login_control
{
"yaml": """
resource_type: ceph.smb.share
cluster_id: floop
share_id: ploof
cephfs:
volume: abc
path: /share1
subvolume: foo
restrict_access: true
""",
"exc_type": ValueError,
"error": "restricted access",
},
# removed share, no cluster id value
{
"yaml": """
resource_type: ceph.smb.share
cluster_id: ""
share_id: whammo
intent: removed
""",
"exc_type": ValueError,
"error": "cluster_id",
},
# removed share, no share id value
{
"yaml": """
resource_type: ceph.smb.share
cluster_id: whammo
share_id: ""
intent: removed
""",
"exc_type": ValueError,
"error": "share_id",
},
# share w/o cephfs sub-obj
{
"yaml": """
resource_type: ceph.smb.share
cluster_id: whammo
share_id: blammo
""",
"exc_type": ValueError,
"error": "cephfs",
},
# ad cluster, invalid join source, no ref
{
"yaml": """
resource_type: ceph.smb.cluster
cluster_id: whammo
auth_mode: active-directory
domain_settings:
realm: FOO.EXAMPLE.NET
join_sources:
- {}
""",
"exc_type": ValueError,
"error": "reference value",
},
# removed cluster, no cluster_id value
{
"yaml": """
resource_type: ceph.smb.cluster
cluster_id: ""
intent: removed
""",
"exc_type": ValueError,
"error": "cluster_id",
},
# u&g, missing id value
{
"yaml": """
resource_type: ceph.smb.usersgroups
users_groups_id: ""
""",
"exc_type": ValueError,
"error": "users_groups_id",
},
# u&g, bad linked_to_cluster value
{
"yaml": """
resource_type: ceph.smb.usersgroups
users_groups_id: wobble
linked_to_cluster: ~~~
values:
users:
- name: charlie
password: 7unaF1sh
- name: lucky
password: CH4rmz
groups: []
""",
"exc_type": ValueError,
"error": "not a valid",
},
# join auth, missing id value
{
"yaml": """
resource_type: ceph.smb.join.auth
auth_id: ""
""",
"exc_type": ValueError,
"error": "auth_id",
},
],
)
def test_load_error(params):
@ -642,3 +748,93 @@ login_control:
assert share.login_control[3].name == 'delbard'
assert share.login_control[3].category == enums.LoginCategory.USER
assert share.login_control[3].access == enums.LoginAccess.NONE
@pytest.mark.parametrize(
"params",
[
# single share json
{
"txt": """
{
"resource_type": "ceph.smb.share",
"cluster_id": "foo",
"share_id": "bar",
"cephfs": {"volume": "zippy", "path": "/"}
}
""",
'simplified': [
{
'resource_type': 'ceph.smb.share',
'cluster_id': 'foo',
'share_id': 'bar',
'intent': 'present',
'name': 'bar',
'cephfs': {
'volume': 'zippy',
'path': '/',
'provider': 'samba-vfs',
},
'browseable': True,
'readonly': False,
}
],
},
# single share yaml
{
"txt": """
resource_type: ceph.smb.share
cluster_id: foo
share_id: bar
cephfs: {volume: zippy, path: /}
""",
'simplified': [
{
'resource_type': 'ceph.smb.share',
'cluster_id': 'foo',
'share_id': 'bar',
'intent': 'present',
'name': 'bar',
'cephfs': {
'volume': 'zippy',
'path': '/',
'provider': 'samba-vfs',
},
'browseable': True,
'readonly': False,
}
],
},
# invalid share yaml
{
"txt": """
resource_type: ceph.smb.share
""",
'exc_type': ValueError,
'error': 'missing',
},
# invalid input
{
"txt": """
:
""",
'exc_type': ValueError,
'error': 'parsing',
},
# invalid json, but useless yaml
{
"txt": """
slithy
""",
'exc_type': ValueError,
'error': 'input',
},
],
)
def test_load_text(params):
if 'simplified' in params:
loaded = smb.resources.load_text(params['txt'])
assert params['simplified'] == [r.to_simplified() for r in loaded]
else:
with pytest.raises(params['exc_type'], match=params['error']):
smb.resources.load_text(params['txt'])

View File

@ -555,6 +555,24 @@ def test_cluster_create_user1(tmodule):
assert len(result.src.user_group_settings) == 1
def test_cluster_create_user2(tmodule):
_example_cfg_1(tmodule)
result = tmodule.cluster_create(
'dizzle',
smb.enums.AuthMode.USER,
define_user_pass=['alice%123letmein', 'bob%1n0wh4t1t15'],
)
assert result.success
assert result.status['state'] == 'created'
assert result.src.cluster_id == 'dizzle'
assert len(result.src.user_group_settings) == 1
assert (
result.src.user_group_settings[0].source_type
== smb.enums.UserGroupSourceType.RESOURCE
)
def test_cluster_create_badpass(tmodule):
_example_cfg_1(tmodule)
@ -649,3 +667,73 @@ domain_settings:
ref: foo
""".strip()
)
def test_apply_invalid_res(tmodule):
result = tmodule.apply_resources(
"""
resource_type: ceph.smb.cluster
cluster_id: ""
auth_mode: doop
"""
)
assert not result.success
assert 'doop' in result.to_simplified()['results'][0]['msg']
def test_show_all(tmodule):
_example_cfg_1(tmodule)
out = tmodule.show()
assert 'resources' in out
res = out['resources']
assert len(res) == 4
assert {r['resource_type'] for r in res} == {
'ceph.smb.cluster',
'ceph.smb.share',
'ceph.smb.join.auth',
}
def test_show_shares(tmodule):
_example_cfg_1(tmodule)
out = tmodule.show(['ceph.smb.share'])
assert 'resources' in out
res = out['resources']
assert len(res) == 2
assert {r['resource_type'] for r in res} == {
'ceph.smb.share',
}
def test_show_shares_in_cluster(tmodule):
_example_cfg_1(tmodule)
out = tmodule.show(['ceph.smb.share.foo'])
assert 'resources' in out
res = out['resources']
assert len(res) == 2
assert {r['resource_type'] for r in res} == {
'ceph.smb.share',
}
assert {r['cluster_id'] for r in res} == {'foo'}
def test_show_specific_share(tmodule):
_example_cfg_1(tmodule)
out = tmodule.show(['ceph.smb.share.foo.s1'])
assert 'resources' not in out
assert out['resource_type'] == 'ceph.smb.share'
assert out['cluster_id'] == 'foo'
assert out['share_id'] == 's1'
def test_show_nomatches(tmodule):
_example_cfg_1(tmodule)
out = tmodule.show(['ceph.smb.share.foo.whoops'])
assert 'resources' in out
assert out['resources'] == []
def test_show_invalid_input(tmodule):
_example_cfg_1(tmodule)
with pytest.raises(smb.cli.InvalidInputValue):
tmodule.show(['ceph.smb.export'])

View File

@ -0,0 +1,43 @@
import pytest
import smb.utils
def test_one():
assert smb.utils.one(['a']) == 'a'
with pytest.raises(ValueError):
smb.utils.one([])
with pytest.raises(ValueError):
smb.utils.one(['a', 'b'])
def test_rand_name():
name = smb.utils.rand_name('bob')
assert name.startswith('bob')
assert len(name) == 11
name = smb.utils.rand_name('carla')
assert name.startswith('carla')
assert len(name) == 13
name = smb.utils.rand_name('dangeresque')
assert name.startswith('dangeresqu')
assert len(name) == 18
name = smb.utils.rand_name('fhqwhgadsfhqwhgadsfhqwhgads')
assert name.startswith('fhqwhgadsf')
assert len(name) == 18
name = smb.utils.rand_name('')
assert len(name) == 8
def test_checked():
assert smb.utils.checked('foo') == 'foo'
assert smb.utils.checked(77) == 77
assert smb.utils.checked(0) == 0
with pytest.raises(smb.utils.IsNoneError):
smb.utils.checked(None)
def test_ynbool():
assert smb.utils.ynbool(True) == 'Yes'
assert smb.utils.ynbool(False) == 'No'
# for giggles
assert smb.utils.ynbool(0) == 'No'

View File

@ -0,0 +1,46 @@
"""Assorted utility functions for smb mgr module."""
from typing import List, Optional, TypeVar
import random
import string
T = TypeVar('T')
def one(lst: List[T]) -> T:
"""Given a list, ensure that the list contains exactly one item and return
it. A ValueError will be raised in the case that the list does not contain
exactly one item.
"""
if len(lst) != 1:
raise ValueError("list does not contain exactly one element")
return lst[0]
class IsNoneError(ValueError):
"""A ValueError subclass raised by ``checked`` function."""
pass
def checked(v: Optional[T]) -> T:
"""Ensures the provided value is not a None or raises a IsNoneError.
Intended use is similar to an `assert v is not None` but more usable in
one-liners and list/dict/etc comprehensions.
"""
if v is None:
raise IsNoneError('value is None')
return v
def ynbool(value: bool) -> str:
"""Convert a bool to an smb.conf-style boolean string."""
return 'Yes' if value else 'No'
def rand_name(prefix: str, max_len: int = 18, suffix_len: int = 8) -> str:
trunc = prefix[: (max_len - suffix_len)]
suffix = ''.join(
random.choice(string.ascii_lowercase) for _ in range(suffix_len)
)
return f'{trunc}{suffix}'

View File

@ -26,7 +26,7 @@ def valid_id(value: str) -> bool:
def check_id(value: str) -> None:
"""Raise ValueError if value is not a valid ID."""
if not valid_id(value):
raise ValueError(f"{value:!r} is not a valid ID")
raise ValueError(f"{value!r} is not a valid ID")
def valid_share_name(value: str) -> bool:
@ -37,7 +37,7 @@ def valid_share_name(value: str) -> bool:
def check_share_name(value: str) -> None:
"""Raise ValueError if value is not a valid share name."""
if not valid_share_name(value):
raise ValueError(f"{value:!r} is not a valid share name")
raise ValueError(f"{value!r} is not a valid share name")
# alias for normpath so other smb libs can just import validation module