mirror of
https://github.com/SELinuxProject/setools
synced 2025-03-19 17:53:56 +00:00
Conditional: Convert to direct sepol structure access. Add iterators.
This commit is contained in:
parent
71cfccce06
commit
aeecacc896
@ -23,65 +23,8 @@ from collections import namedtuple
|
||||
|
||||
truth_table_row = namedtuple("truth_table_row", ["values", "result"])
|
||||
|
||||
|
||||
#
|
||||
# Conditional expression factory functions
|
||||
#
|
||||
cdef dict _cond_cache = {}
|
||||
|
||||
cdef inline Conditional conditional_factory_iter(SELinuxPolicy policy, QpolIteratorItem symbol):
|
||||
"""Factory function variant for iterating over Conditional objects."""
|
||||
return conditional_factory(policy, <const qpol_cond_t *> symbol.obj)
|
||||
|
||||
|
||||
cdef inline Conditional conditional_factory(SELinuxPolicy policy, const qpol_cond_t *symbol):
|
||||
"""Factory function for creating Conditional objects."""
|
||||
try:
|
||||
return _cond_cache[<uintptr_t>symbol]
|
||||
except KeyError:
|
||||
c = Conditional()
|
||||
c.policy = policy
|
||||
c.handle = symbol
|
||||
_cond_cache[<uintptr_t>symbol] = c
|
||||
return c
|
||||
|
||||
#
|
||||
# Conditional node factory functions
|
||||
#
|
||||
cdef inline object conditional_node_factory_iter(SELinuxPolicy policy, QpolIteratorItem item):
|
||||
"""Factory function variant for iterating over conditional node objects."""
|
||||
cdef const qpol_cond_expr_node_t *symbol = <const qpol_cond_expr_node_t *> item.obj
|
||||
cdef uint32_t et
|
||||
cdef qpol_bool_t *b
|
||||
|
||||
# Determine if this node is a Boolean or an operator
|
||||
if qpol_cond_expr_node_get_expr_type(policy.handle, symbol, &et):
|
||||
ex = LowLevelPolicyError("Error reading conditional expression node type: {}".format(
|
||||
strerror(errno)))
|
||||
ex.errno = errno
|
||||
raise ex
|
||||
|
||||
if (et == QPOL_COND_EXPR_BOOL):
|
||||
if qpol_cond_expr_node_get_bool(policy.handle, symbol, &b):
|
||||
ex = LowLevelPolicyError("Error reading boolean from conditional expression node: {}".
|
||||
format(strerror(errno)))
|
||||
ex.errno = errno
|
||||
raise ex
|
||||
|
||||
return Boolean.factory(policy, <sepol.cond_bool_datum_t *>b)
|
||||
|
||||
else:
|
||||
return conditional_op_factory(policy, symbol)
|
||||
|
||||
|
||||
cdef inline ConditionalOperator conditional_op_factory(SELinuxPolicy policy, const qpol_cond_expr_node_t *symbol):
|
||||
"""Factory function for creating conditional node objects."""
|
||||
op = ConditionalOperator()
|
||||
op.policy = policy
|
||||
op.handle = symbol
|
||||
return op
|
||||
|
||||
|
||||
#
|
||||
# Classes
|
||||
#
|
||||
@ -120,7 +63,19 @@ cdef class Conditional(PolicySymbol):
|
||||
|
||||
"""A conditional policy block."""
|
||||
|
||||
cdef const qpol_cond_t *handle
|
||||
cdef sepol.cond_node_t *handle
|
||||
|
||||
@staticmethod
|
||||
cdef factory(SELinuxPolicy policy, sepol.cond_node_t *symbol):
|
||||
"""Factory function for creating Conditional objects."""
|
||||
try:
|
||||
return _cond_cache[<uintptr_t>symbol]
|
||||
except KeyError:
|
||||
c = Conditional()
|
||||
c.policy = policy
|
||||
c.handle = symbol
|
||||
_cond_cache[<uintptr_t>symbol] = c
|
||||
return c
|
||||
|
||||
def __contains__(self, other):
|
||||
for b in self.booleans:
|
||||
@ -247,52 +202,18 @@ cdef class Conditional(PolicySymbol):
|
||||
|
||||
def expression(self):
|
||||
"""Iterator over The conditional expression."""
|
||||
cdef qpol_iterator_t *iter;
|
||||
if qpol_cond_get_expr_node_iter(self.policy.handle, self.handle, &iter):
|
||||
raise MemoryError
|
||||
|
||||
return qpol_iterator_factory(self.policy, iter, conditional_node_factory_iter)
|
||||
return ConditionalExprIterator.factory(self.policy, <sepol.cond_expr_t *>self.handle.expr)
|
||||
|
||||
def false_rules(self):
|
||||
"""An iterator over the rules in the false (else) block of the conditional."""
|
||||
cdef qpol_iterator_t *av_iter
|
||||
cdef qpol_iterator_t *te_iter
|
||||
|
||||
cdef uint32_t av_rule_types = QPOL_RULE_ALLOW | QPOL_RULE_AUDITALLOW | QPOL_RULE_DONTAUDIT \
|
||||
| QPOL_RULE_XPERMS_ALLOW | QPOL_RULE_XPERMS_AUDITALLOW | QPOL_RULE_XPERMS_DONTAUDIT
|
||||
|
||||
cdef uint32_t te_rule_types = QPOL_RULE_TYPE_TRANS | QPOL_RULE_TYPE_CHANGE | QPOL_RULE_TYPE_MEMBER
|
||||
|
||||
if qpol_cond_get_av_false_iter(self.policy.handle, self.handle, av_rule_types, &av_iter):
|
||||
raise MemoryError
|
||||
|
||||
if qpol_cond_get_te_false_iter(self.policy.handle, self.handle, te_rule_types, &te_iter):
|
||||
raise MemoryError
|
||||
|
||||
return chain(qpol_iterator_factory(self.policy, av_iter, avrule_factory_iter),
|
||||
qpol_iterator_factory(self.policy, te_iter, terule_factory_iter))
|
||||
pass
|
||||
|
||||
def statement(self):
|
||||
raise NoStatement
|
||||
|
||||
def true_rules(self):
|
||||
"""An iterator over the rules in the true block of the conditional."""
|
||||
cdef qpol_iterator_t *av_iter
|
||||
cdef qpol_iterator_t *te_iter
|
||||
|
||||
cdef uint32_t av_rule_types = QPOL_RULE_ALLOW | QPOL_RULE_AUDITALLOW | QPOL_RULE_DONTAUDIT \
|
||||
| QPOL_RULE_XPERMS_ALLOW | QPOL_RULE_XPERMS_AUDITALLOW | QPOL_RULE_XPERMS_DONTAUDIT
|
||||
|
||||
cdef uint32_t te_rule_types = QPOL_RULE_TYPE_TRANS | QPOL_RULE_TYPE_CHANGE | QPOL_RULE_TYPE_MEMBER
|
||||
|
||||
if qpol_cond_get_av_true_iter(self.policy.handle, self.handle, av_rule_types, &av_iter):
|
||||
raise MemoryError
|
||||
|
||||
if qpol_cond_get_te_true_iter(self.policy.handle, self.handle, te_rule_types, &te_iter):
|
||||
raise MemoryError
|
||||
|
||||
return chain(qpol_iterator_factory(self.policy, av_iter, avrule_factory_iter),
|
||||
qpol_iterator_factory(self.policy, te_iter, terule_factory_iter))
|
||||
pass
|
||||
|
||||
def truth_table(self):
|
||||
"""
|
||||
@ -327,52 +248,48 @@ cdef class ConditionalOperator(PolicySymbol):
|
||||
|
||||
"""A conditional expression operator"""
|
||||
|
||||
cdef const qpol_cond_expr_node_t *handle
|
||||
cdef sepol.cond_expr_t *handle
|
||||
|
||||
_cond_expr_val_to_text = {
|
||||
QPOL_COND_EXPR_NOT: "!",
|
||||
QPOL_COND_EXPR_OR: "||",
|
||||
QPOL_COND_EXPR_AND: "&&",
|
||||
QPOL_COND_EXPR_XOR: "^",
|
||||
QPOL_COND_EXPR_EQ: "==",
|
||||
QPOL_COND_EXPR_NEQ: "!="}
|
||||
sepol.COND_NOT: "!",
|
||||
sepol.COND_OR: "||",
|
||||
sepol.COND_AND: "&&",
|
||||
sepol.COND_XOR: "^",
|
||||
sepol.COND_EQ: "==",
|
||||
sepol.COND_NEQ: "!="}
|
||||
|
||||
_cond_expr_val_to_precedence = {
|
||||
QPOL_COND_EXPR_NOT: 5,
|
||||
QPOL_COND_EXPR_OR: 1,
|
||||
QPOL_COND_EXPR_AND: 3,
|
||||
QPOL_COND_EXPR_XOR: 2,
|
||||
QPOL_COND_EXPR_EQ: 4,
|
||||
QPOL_COND_EXPR_NEQ: 4}
|
||||
sepol.COND_NOT: 5,
|
||||
sepol.COND_OR: 1,
|
||||
sepol.COND_AND: 3,
|
||||
sepol.COND_XOR: 2,
|
||||
sepol.COND_EQ: 4,
|
||||
sepol.COND_NEQ: 4}
|
||||
|
||||
@staticmethod
|
||||
cdef factory(SELinuxPolicy policy, sepol.cond_expr_t *symbol):
|
||||
"""Factory function for conditional operators."""
|
||||
op = ConditionalOperator()
|
||||
op.policy = policy
|
||||
op.handle = symbol
|
||||
return op
|
||||
|
||||
def __str__(self):
|
||||
return self._cond_expr_val_to_text[self._type]
|
||||
return self._cond_expr_val_to_text[self.handle.expr_type]
|
||||
|
||||
def _eq(self, ConditionalOperator other):
|
||||
"""Low-level equality check (C pointers)."""
|
||||
return self.handle == other.handle
|
||||
|
||||
@property
|
||||
def _type(self):
|
||||
"""The type of operator."""
|
||||
cdef uint32_t et
|
||||
if qpol_cond_expr_node_get_expr_type(self.policy.handle, self.handle, &et):
|
||||
ex = LowLevelPolicyError("Error reading conditional expression node type: {}".format(
|
||||
strerror(errno)))
|
||||
ex.errno = errno
|
||||
raise ex
|
||||
|
||||
return et
|
||||
|
||||
@property
|
||||
def precedence(self):
|
||||
"""The precedence of this operator."""
|
||||
return self._cond_expr_val_to_precedence[self._type]
|
||||
return self._cond_expr_val_to_precedence[self.handle.expr_type]
|
||||
|
||||
@property
|
||||
def unary(self):
|
||||
"""T/F the operator is unary"""
|
||||
return self._type == QPOL_COND_EXPR_NOT
|
||||
return self.handle.expr_type == sepol.COND_NOT
|
||||
|
||||
|
||||
#
|
||||
@ -394,3 +311,92 @@ cdef class BooleanHashtabIterator(HashtabIterator):
|
||||
def __next__(self):
|
||||
super().__next__()
|
||||
return Boolean.factory(self.policy, <sepol.cond_bool_datum_t *>self.curr.datum)
|
||||
|
||||
|
||||
cdef class ConditionalIterator(PolicyIterator):
|
||||
|
||||
"""Conditionals iterator."""
|
||||
|
||||
cdef:
|
||||
sepol.cond_node_t *head
|
||||
sepol.cond_node_t *curr
|
||||
|
||||
@staticmethod
|
||||
cdef factory(SELinuxPolicy policy, sepol.cond_node_t *head):
|
||||
"""Constraint iterator factory."""
|
||||
c = ConditionalIterator()
|
||||
c.policy = policy
|
||||
c.head = head
|
||||
c.reset()
|
||||
return c
|
||||
|
||||
def __next__(self):
|
||||
if self.curr == NULL:
|
||||
raise StopIteration
|
||||
|
||||
item = Conditional.factory(self.policy, self.curr)
|
||||
self.curr = self.curr.next
|
||||
return item
|
||||
|
||||
def __len__(self):
|
||||
cdef:
|
||||
sepol.cond_node_t *curr
|
||||
size_t count = 0
|
||||
|
||||
curr = self.head
|
||||
while curr != NULL:
|
||||
count += 1
|
||||
curr = curr.next
|
||||
|
||||
return count
|
||||
|
||||
def reset(self):
|
||||
"""Reset the iterator back to the start."""
|
||||
self.curr = self.head
|
||||
|
||||
|
||||
cdef class ConditionalExprIterator(PolicyIterator):
|
||||
|
||||
"""Conditional expression iterator."""
|
||||
|
||||
cdef:
|
||||
sepol.cond_expr_t *head
|
||||
sepol.cond_expr_t *curr
|
||||
|
||||
@staticmethod
|
||||
cdef factory(SELinuxPolicy policy, sepol.cond_expr_t *head):
|
||||
"""Conditional expression iterator factory."""
|
||||
e = ConditionalExprIterator()
|
||||
e.policy = policy
|
||||
e.head = head
|
||||
e.reset()
|
||||
return e
|
||||
|
||||
def __next__(self):
|
||||
if self.curr == NULL:
|
||||
raise StopIteration
|
||||
|
||||
if self.curr.expr_type == sepol.COND_BOOL:
|
||||
item = Boolean.factory(self.policy,
|
||||
self.policy.handle.p.p.bool_val_to_struct[self.curr.bool - 1])
|
||||
else:
|
||||
item = ConditionalOperator.factory(self.policy, self.curr)
|
||||
|
||||
self.curr = self.curr.next
|
||||
return item
|
||||
|
||||
def __len__(self):
|
||||
cdef:
|
||||
sepol.cond_expr_t *curr
|
||||
size_t count = 0
|
||||
|
||||
curr = self.head
|
||||
while curr != NULL:
|
||||
count += 1
|
||||
curr = curr.next
|
||||
|
||||
return count
|
||||
|
||||
def reset(self):
|
||||
"""Reset the iterator back to the start."""
|
||||
self.curr = self.head
|
||||
|
@ -116,24 +116,6 @@ cdef extern from "include/qpol/cond_query.h":
|
||||
pass
|
||||
ctypedef struct qpol_cond_expr_node_t:
|
||||
pass
|
||||
cdef int QPOL_COND_RULE_LIST
|
||||
cdef int QPOL_COND_RULE_ENABLED
|
||||
int qpol_policy_get_cond_iter(const qpol_policy_t * policy, qpol_iterator_t ** iter)
|
||||
int qpol_cond_get_expr_node_iter(const qpol_policy_t * policy, const qpol_cond_t * cond, qpol_iterator_t ** iter)
|
||||
int qpol_cond_get_av_true_iter(const qpol_policy_t * policy, const qpol_cond_t * cond, uint32_t rule_type_mask, qpol_iterator_t ** iter)
|
||||
int qpol_cond_get_te_true_iter(const qpol_policy_t * policy, const qpol_cond_t * cond, uint32_t rule_type_mask, qpol_iterator_t ** iter)
|
||||
int qpol_cond_get_av_false_iter(const qpol_policy_t * policy, const qpol_cond_t * cond, uint32_t rule_type_mask, qpol_iterator_t ** iter)
|
||||
int qpol_cond_get_te_false_iter(const qpol_policy_t * policy, const qpol_cond_t * cond, uint32_t rule_type_mask, qpol_iterator_t ** iter)
|
||||
int qpol_cond_eval(const qpol_policy_t * policy, const qpol_cond_t * cond, uint32_t * is_true)
|
||||
cdef int QPOL_COND_EXPR_BOOL
|
||||
cdef int QPOL_COND_EXPR_NOT
|
||||
cdef int QPOL_COND_EXPR_OR
|
||||
cdef int QPOL_COND_EXPR_AND
|
||||
cdef int QPOL_COND_EXPR_XOR
|
||||
cdef int QPOL_COND_EXPR_EQ
|
||||
cdef int QPOL_COND_EXPR_NEQ
|
||||
int qpol_cond_expr_node_get_expr_type(const qpol_policy_t * policy, const qpol_cond_expr_node_t * node, uint32_t * expr_type)
|
||||
int qpol_cond_expr_node_get_bool(const qpol_policy_t * policy, const qpol_cond_expr_node_t * node, qpol_bool_t ** cond_bool)
|
||||
|
||||
cdef extern from "include/qpol/constraint_query.h":
|
||||
ctypedef struct qpol_constraint_t:
|
||||
|
@ -589,11 +589,7 @@ cdef class SELinuxPolicy:
|
||||
#
|
||||
def conditionals(self):
|
||||
"""Iterator over all conditional rule blocks."""
|
||||
cdef qpol_iterator_t *iter
|
||||
if qpol_policy_get_cond_iter(self.handle, &iter):
|
||||
raise MemoryError
|
||||
|
||||
return qpol_iterator_factory(self, iter, conditional_factory_iter)
|
||||
return ConditionalIterator.factory(self, self.handle.p.p.cond_list)
|
||||
|
||||
def mlsrules(self):
|
||||
"""Iterator over all MLS rules."""
|
||||
|
@ -274,7 +274,7 @@ cdef class AVRule(PolicyRule):
|
||||
raise ex
|
||||
|
||||
if c:
|
||||
return conditional_factory(self.policy, c)
|
||||
return Conditional.factory(self.policy, <sepol.cond_node_t *>c)
|
||||
else:
|
||||
raise RuleNotConditional
|
||||
|
||||
@ -567,7 +567,7 @@ cdef class TERule(PolicyRule):
|
||||
raise ex
|
||||
|
||||
if c:
|
||||
return conditional_factory(self.policy, c)
|
||||
return Conditional.factory(self.policy, <sepol.cond_node_t *>c)
|
||||
else:
|
||||
raise RuleNotConditional
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user