SELinuxPolicy: Move methods.

No code changes.
This commit is contained in:
Chris PeBenito 2018-08-15 11:01:23 -04:00
parent be87732a65
commit 47f2f59dbc

View File

@ -95,131 +95,9 @@ cdef class SELinuxPolicy:
memo[id(self)] = self
return self
#
# Policy loading functions
#
cdef _load_policy(self, str filename):
"""Load the specified policy."""
cdef:
sepol.sepol_policy_file_t *pfile = NULL
FILE *infile = NULL
self.log.info("Opening SELinux policy \"{0}\"".format(filename))
self.sh = sepol.sepol_handle_create()
if self.sh == NULL:
raise MemoryError
sepol.sepol_msg_set_callback(self.sh, sepol_logging_callback, self.handle)
if sepol.sepol_policydb_create(&self.handle) < 0:
raise MemoryError
if sepol.sepol_policy_file_create(&pfile) < 0:
raise MemoryError
infile = fopen(filename, "rb")
if infile == NULL:
PyErr_SetFromErrnoWithFilename(OSError, filename)
sepol.sepol_policy_file_set_handle(pfile, self.sh)
sepol.sepol_policy_file_set_fp(pfile, infile)
if sepol.sepol_policydb_read(self.handle, pfile) < 0:
raise InvalidPolicy("Invalid policy: {}. A binary policy must be specified. "
"(use e.g. policy.{} or sepolicy) Source policies are not "
"supported.".format(filename,
sepol.sepol_policy_kern_vers_max()))
fclose(infile)
sepol.sepol_policy_file_free(pfile)
#
# Load policy properties
#
self.handle_unknown = HandleUnknown(self.handle.p.handle_unknown)
self.target_platform = PolicyTarget(self.handle.p.target_platform)
self.version = self.handle.p.policyvers
self.mls = <bint>self.handle.p.mls
#
# (Re)create data structures
#
if self.handle.p.attr_type_map != NULL:
self._rebuild_attrs_from_map()
# if source policies are supported in the
# future this should only run on the
# kernel policy:
#self._synthesize_attrs()
self._set_permissive_flags()
if self.mls:
self._create_mls_val_to_struct()
self.log.info("Successfully opened SELinux policy \"{0}\"".format(filename))
self.path = filename
cdef _load_running_policy(self):
"""Try to load the current running policy."""
cdef:
int min_ver = sepol.sepol_policy_kern_vers_min()
int max_ver = sepol.sepol_policy_kern_vers_max()
const char *base_policy_path = selinux.selinux_binary_policy_path()
const char *current_policy_path = selinux.selinux_current_policy_path()
list potential_policies = []
self.log.info("Attempting to locate current running policy.")
self.log.debug("SELinuxfs exists: {}".format(selinux.selinuxfs_exists()))
self.log.debug("Sepol version range: {}-{}".format(min_ver, max_ver))
self.log.debug("Current policy path: {}".format(current_policy_path
if current_policy_path != NULL else None))
self.log.debug("Binary policy path: {}".format(base_policy_path
if base_policy_path != NULL else None))
# first try libselinux for current policy
if current_policy_path != NULL:
potential_policies.append(current_policy_path)
# look through the supported policy versions
if base_policy_path != NULL:
for version in range(max_ver, min_ver - 1, -1):
potential_policies.append("{0}.{1}".format(base_policy_path, version))
self.log.debug("Potential policies: {}".format(potential_policies))
for filename in potential_policies:
try:
self._load_policy(filename)
except OSError as err:
if err.errno != ENOENT:
raise
else:
break
else:
raise RuntimeError("Unable to locate an SELinux policy to load.")
#
# Policy statistics
#
cdef cache_constraint_counts(self):
"""Count all constraints in one iteration."""
if not self.constraint_counts:
self.constraint_counts = collections.Counter(r.ruletype for r in self.constraints())
cdef cache_terule_counts(self):
"""Count all TE rules in one iteration."""
if not self.terule_counts:
self.terule_counts = TERuleIterator.factory(self, &self.handle.p.te_avtab).ruletype_count()
self.terule_counts[TERuletype.type_transition.value] += \
len(FileNameTERuleIterator.factory(self, &self.handle.p.filename_trans))
for c in self.conditionals():
self.terule_counts.update(c.true_rules().ruletype_count())
self.terule_counts.update(c.false_rules().ruletype_count())
self.terule_counts
@property
def allow_count(self):
"""The number of (type) allow rules."""
@ -783,6 +661,106 @@ cdef class SELinuxPolicy:
#
# Internal methods
#
cdef _load_policy(self, str filename):
"""Load the specified policy."""
cdef:
sepol.sepol_policy_file_t *pfile = NULL
FILE *infile = NULL
self.log.info("Opening SELinux policy \"{0}\"".format(filename))
self.sh = sepol.sepol_handle_create()
if self.sh == NULL:
raise MemoryError
sepol.sepol_msg_set_callback(self.sh, sepol_logging_callback, self.handle)
if sepol.sepol_policydb_create(&self.handle) < 0:
raise MemoryError
if sepol.sepol_policy_file_create(&pfile) < 0:
raise MemoryError
infile = fopen(filename, "rb")
if infile == NULL:
PyErr_SetFromErrnoWithFilename(OSError, filename)
sepol.sepol_policy_file_set_handle(pfile, self.sh)
sepol.sepol_policy_file_set_fp(pfile, infile)
if sepol.sepol_policydb_read(self.handle, pfile) < 0:
raise InvalidPolicy("Invalid policy: {}. A binary policy must be specified. "
"(use e.g. policy.{} or sepolicy) Source policies are not "
"supported.".format(filename,
sepol.sepol_policy_kern_vers_max()))
fclose(infile)
sepol.sepol_policy_file_free(pfile)
#
# Load policy properties
#
self.handle_unknown = HandleUnknown(self.handle.p.handle_unknown)
self.target_platform = PolicyTarget(self.handle.p.target_platform)
self.version = self.handle.p.policyvers
self.mls = <bint>self.handle.p.mls
#
# (Re)create data structures
#
if self.handle.p.attr_type_map != NULL:
self._rebuild_attrs_from_map()
# if source policies are supported in the
# future this should only run on the
# kernel policy:
#self._synthesize_attrs()
self._set_permissive_flags()
if self.mls:
self._create_mls_val_to_struct()
self.log.info("Successfully opened SELinux policy \"{0}\"".format(filename))
self.path = filename
cdef _load_running_policy(self):
"""Try to load the current running policy."""
cdef:
int min_ver = sepol.sepol_policy_kern_vers_min()
int max_ver = sepol.sepol_policy_kern_vers_max()
const char *base_policy_path = selinux.selinux_binary_policy_path()
const char *current_policy_path = selinux.selinux_current_policy_path()
list potential_policies = []
self.log.info("Attempting to locate current running policy.")
self.log.debug("SELinuxfs exists: {}".format(selinux.selinuxfs_exists()))
self.log.debug("Sepol version range: {}-{}".format(min_ver, max_ver))
self.log.debug("Current policy path: {}".format(current_policy_path
if current_policy_path != NULL else None))
self.log.debug("Binary policy path: {}".format(base_policy_path
if base_policy_path != NULL else None))
# first try libselinux for current policy
if current_policy_path != NULL:
potential_policies.append(current_policy_path)
# look through the supported policy versions
if base_policy_path != NULL:
for version in range(max_ver, min_ver - 1, -1):
potential_policies.append("{0}.{1}".format(base_policy_path, version))
self.log.debug("Potential policies: {}".format(potential_policies))
for filename in potential_policies:
try:
self._load_policy(filename)
except OSError as err:
if err.errno != ENOENT:
raise
else:
break
else:
raise RuntimeError("Unable to locate an SELinux policy to load.")
cdef _set_permissive_flags(self):
"""
Set permissive flag in type datums.
@ -994,3 +972,19 @@ cdef class SELinuxPolicy:
# memory now owned by policydb, do not free
tmp_name = NULL
tmp_type = NULL
cdef cache_constraint_counts(self):
"""Count all constraints in one iteration."""
if not self.constraint_counts:
self.constraint_counts = collections.Counter(r.ruletype for r in self.constraints())
cdef cache_terule_counts(self):
"""Count all TE rules in one iteration."""
if not self.terule_counts:
self.terule_counts = TERuleIterator.factory(self, &self.handle.p.te_avtab).ruletype_count()
self.terule_counts[TERuletype.type_transition.value] += \
len(FileNameTERuleIterator.factory(self, &self.handle.p.filename_trans))
for c in self.conditionals():
self.terule_counts.update(c.true_rules().ruletype_count())
self.terule_counts.update(c.false_rules().ruletype_count())