Merge pull request from pebenito/refine-descriptors

Refine descriptors
This commit is contained in:
Chris PeBenito 2023-02-07 09:28:56 -05:00 committed by GitHub
commit 4fb753e31a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 51 additions and 63 deletions

View File

@ -28,7 +28,7 @@ class ConfigDescriptor(CriteriaDescriptor):
def __set__(self, obj, value):
if not value:
self.instances[obj] = None
setattr(obj, self.name, None)
else:
try:
super().__set__(obj, value.strip())
@ -69,7 +69,7 @@ class ConfigSetDescriptor(CriteriaDescriptor):
def __set__(self, obj, value):
if not value:
self.instances[obj] = frozenset()
setattr(obj, self.name, frozenset())
else:
log = obj.log
if callable(self.lookup_function):
@ -94,7 +94,7 @@ class ConfigSetDescriptor(CriteriaDescriptor):
log.info("{}: Invalid {} item: {}".format(
obj.checkname, self.name, e))
self.instances[obj] = frozenset(ret)
setattr(obj, self.name, frozenset(ret))
class ConfigPermissionSetDescriptor(CriteriaPermissionSetDescriptor):
@ -114,7 +114,7 @@ class ConfigPermissionSetDescriptor(CriteriaPermissionSetDescriptor):
def __set__(self, obj, value):
if not value:
self.instances[obj] = frozenset()
setattr(obj, self.name, frozenset())
else:
try:
super().__set__(obj, (v for v in value.split(" ") if v))

View File

@ -18,8 +18,7 @@ from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Collection
from enum import Enum
from typing import Any, Callable, MutableMapping, Optional, Type, Union
from weakref import WeakKeyDictionary
from typing import Callable, Optional, Type, Union
from .util import validate_perms_any
@ -67,37 +66,31 @@ class CriteriaDescriptor:
self.default_value = default_value
self.lookup_function: Optional[Union[Callable, str]] = lookup_function
self.enum_class = enum_class
self.name: str = ""
# use weak references so instances can be
# garbage collected, rather than unnecessarily
# kept around due to this descriptor.
self.instances: MutableMapping = WeakKeyDictionary()
def __set_name__(self, owner, name):
self.name = name
self.name = f"_internal_{name}"
def __get__(self, obj, objtype=None):
if obj is None:
return self
return self.instances.setdefault(obj, self.default_value)
return getattr(obj, self.name, self.default_value)
def __set__(self, obj, value):
if not value:
self.instances[obj] = self.default_value
setattr(obj, self.name, self.default_value)
elif self.regex and getattr(obj, self.regex, False):
self.instances[obj] = re.compile(value)
setattr(obj, self.name, re.compile(value))
elif self.lookup_function:
if callable(self.lookup_function):
lookup = self.lookup_function
else:
lookup = getattr(obj.policy, self.lookup_function)
self.instances[obj] = lookup(value)
setattr(obj, self.name, lookup(value))
elif self.enum_class:
self.instances[obj] = self.enum_class.lookup(value)
setattr(obj, self.name, self.enum_class.lookup(value))
else:
self.instances[obj] = value
setattr(obj, self.name, value)
class CriteriaSetDescriptor(CriteriaDescriptor):
@ -106,19 +99,19 @@ class CriteriaSetDescriptor(CriteriaDescriptor):
def __set__(self, obj, value):
if not value:
self.instances[obj] = self.default_value
setattr(obj, self.name, self.default_value)
elif self.regex and getattr(obj, self.regex, False):
self.instances[obj] = re.compile(value)
setattr(obj, self.name, re.compile(value))
elif self.lookup_function:
if callable(self.lookup_function):
lookup = self.lookup_function
else:
lookup = getattr(obj.policy, self.lookup_function)
self.instances[obj] = frozenset(lookup(v) for v in value)
setattr(obj, self.name, frozenset(lookup(v) for v in value))
elif self.enum_class:
self.instances[obj] = frozenset(self.enum_class.lookup(v) for v in value)
setattr(obj, self.name, frozenset(self.enum_class.lookup(v) for v in value))
else:
self.instances[obj] = frozenset(value)
setattr(obj, self.name, frozenset(value))
class CriteriaPermissionSetDescriptor(CriteriaDescriptor):
@ -146,18 +139,12 @@ class CriteriaPermissionSetDescriptor(CriteriaDescriptor):
def __init__(self, name_regex: Optional[str] = None, default_value=None) -> None:
self.regex: Optional[str] = name_regex
self.default_value = default_value
self.name: str = ""
# use weak references so instances can be
# garbage collected, rather than unnecessarily
# kept around due to this descriptor.
self.instances: MutableMapping = WeakKeyDictionary()
def __set__(self, obj, value):
if not value:
self.instances[obj] = self.default_value
setattr(obj, self.name, self.default_value)
elif self.regex and getattr(obj, self.regex, False):
self.instances[obj] = re.compile(value)
setattr(obj, self.name, re.compile(value))
else:
perms = frozenset(v for v in value)
@ -173,7 +160,7 @@ class CriteriaPermissionSetDescriptor(CriteriaDescriptor):
tclass=tclass,
policy=obj.policy)
self.instances[obj] = perms
setattr(obj, self.name, perms)
#
@ -189,8 +176,8 @@ class NetworkXGraphEdgeDescriptor(ABC):
"""
Descriptor abstract base class for NetworkX graph edge attributes.
Parameter:
name The edge property name
Keyword Parameter:
name Override the graph edge property name.
Instance class attribute use (obj parameter):
G The NetworkX graph
@ -198,8 +185,11 @@ class NetworkXGraphEdgeDescriptor(ABC):
target The edge's target node
"""
def __init__(self, propname: str) -> None:
self.name = propname
def __init__(self, propname: Optional[str] = None) -> None:
self.override_name = propname
def __set_name__(self, owner, name):
self.name = self.override_name if self.override_name else name
def __get__(self, obj, objtype=None):
if obj is None:
@ -278,7 +268,6 @@ class PermissionMapDescriptor:
Descriptor for Permission Map mappings.
Parameter:
name The map setting name.
validator A callable for validating the setting.
Instance class attribute use (obj parameter):
@ -287,10 +276,12 @@ class PermissionMapDescriptor:
perm The mapping's permission
"""
def __init__(self, propname: str, validator: Callable):
self.name: str = propname
def __init__(self, validator: Callable):
self.validator: Callable = validator
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, objtype=None):
if obj is None:
return self

View File

@ -2,8 +2,6 @@
#
# SPDX-License-Identifier: LGPL-2.1-only
#
from typing import MutableMapping
from weakref import WeakKeyDictionary
class DiffResultDescriptor:
@ -16,23 +14,21 @@ class DiffResultDescriptor:
def __init__(self, diff_function: str) -> None:
self.diff_function = diff_function
# use weak references so instances can be
# garbage collected, rather than unnecessarily
# kept around due to this descriptor.
self.instances: MutableMapping = WeakKeyDictionary()
def __set_name__(self, owner, name):
self.name = f"_internal_{name}"
def __get__(self, obj, objtype=None):
if obj is None:
return self
if self.instances.setdefault(obj, None) is None:
if getattr(obj, self.name, None) is None:
diff = getattr(obj, self.diff_function)
diff()
return self.instances[obj]
return getattr(obj, self.name)
def __set__(self, obj, value):
self.instances[obj] = value
setattr(obj, self.name, value)
def __delete__(self, obj):
self.instances[obj] = None
setattr(obj, self.name, None)

View File

@ -2,7 +2,7 @@
#
from typing import DefaultDict, Dict, List, Optional, TypeVar
from ..policyrep import PolicyObject, SELinuxPolicy
from ..policyrep import PolicyEnum, PolicyObject, SELinuxPolicy
from .difference import Wrapper, SymbolWrapper
@ -12,4 +12,5 @@ U = TypeVar("U", bound=Wrapper)
Cache = DefaultDict[SELinuxPolicy, Dict[T, U]]
SymbolCache = Cache[T, SymbolWrapper[T]]
RuleList = Optional[DefaultDict[T, List[U]]]
E = TypeVar("E", bound=PolicyEnum)
RuleList = Optional[DefaultDict[E, List[T]]]

View File

@ -595,13 +595,13 @@ class Edge:
The default is False.
"""
transition = EdgeAttrList('transition')
setexec = EdgeAttrList('setexec')
dyntransition = EdgeAttrList('dyntransition')
setcurrent = EdgeAttrList('setcurrent')
entrypoint = EdgeAttrDict('entrypoint')
execute = EdgeAttrDict('execute')
type_transition = EdgeAttrDict('type_transition')
transition = EdgeAttrList()
setexec = EdgeAttrList()
dyntransition = EdgeAttrList()
setcurrent = EdgeAttrList()
entrypoint = EdgeAttrDict()
execute = EdgeAttrDict()
type_transition = EdgeAttrDict()
def __init__(self, graph, source: Type, target: Type, create: bool = False) -> None:
self.G = graph

View File

@ -405,7 +405,7 @@ class InfoFlowStep:
The default is False.
"""
rules = EdgeAttrList('rules')
rules = EdgeAttrList()
# use capacity to store the info flow weight so
# we can use network flow algorithms naturally.

View File

@ -52,9 +52,9 @@ class Mapping:
"""A mapping for a permission in the permission map."""
weight = PermissionMapDescriptor("weight", validate_weight)
direction = PermissionMapDescriptor("direction", validate_direction)
enabled = PermissionMapDescriptor("enabled", bool)
weight = PermissionMapDescriptor(validate_weight)
direction = PermissionMapDescriptor(validate_direction)
enabled = PermissionMapDescriptor(bool)
class_: str
perm: str