From 8ea6da9c2f6ffa1130d2cf9804b9ffa85d1c6e2c Mon Sep 17 00:00:00 2001 From: Chris PeBenito Date: Thu, 17 Jul 2014 09:39:06 -0400 Subject: [PATCH] Add user implementation and user query. TODOL: MLS default level and range criteria --- libapol/__init__.py | 1 + libapol/policyrep/__init__.py | 9 ++ libapol/policyrep/user.py | 54 +++++++++- libapol/userquery.py | 104 ++++++++++++++++++++ seinfo | 12 +++ tests/__init__.py | 1 + tests/userquery.conf | 179 ++++++++++++++++++++++++++++++++++ tests/userquery.py | 101 +++++++++++++++++++ 8 files changed, 460 insertions(+), 1 deletion(-) create mode 100644 libapol/userquery.py create mode 100644 tests/userquery.conf create mode 100644 tests/userquery.py diff --git a/libapol/__init__.py b/libapol/__init__.py index 10f20eb..e040d31 100644 --- a/libapol/__init__.py +++ b/libapol/__init__.py @@ -26,6 +26,7 @@ from policyrep import SELinuxPolicy # Component Queries import typequery +import userquery import boolquery import polcapquery diff --git a/libapol/policyrep/__init__.py b/libapol/policyrep/__init__.py index a1dd918..563cb13 100644 --- a/libapol/policyrep/__init__.py +++ b/libapol/policyrep/__init__.py @@ -91,6 +91,15 @@ class SELinuxPolicy(object): yield t qiter.next() + def users(self): + """Generator which yields all users.""" + + qiter = self.policy.get_user_iter() + while not qiter.end(): + yield user.User(self.policy, qpol.qpol_user_from_void(qiter.get_item())) + qiter.next() + + def bools(self): """Generator which yields all Booleans.""" diff --git a/libapol/policyrep/user.py b/libapol/policyrep/user.py index ccd8637..ac029f2 100644 --- a/libapol/policyrep/user.py +++ b/libapol/policyrep/user.py @@ -16,11 +16,63 @@ # License along with SETools. If not, see # . # + +import string + import setools.qpol as qpol + +import role +import mls import symbol class User(symbol.PolicySymbol): """A user.""" - pass + + @property + def roles(self): + """The user's set of roles.""" + + r = set() + + aiter = self.qpol_symbol.get_role_iter(self.policy) + while not aiter.end(): + item = role.Role( + self.policy, qpol.qpol_role_from_void(aiter.get_item())) + + # object_r is implicitly added to all roles by the compiler. + # technically it is incorrect to skip it, but policy writers + # and analysts don't expect to see it in results, and it + # will confuse, especially for set equality user queries. + if item != "object_r": + r.add(item) + + aiter.next() + + return r + + @property + def mls_default(self): + """The user's default MLS level.""" + return mls.MLSRange(self.policy, self.qpol_symbol.get_range(self.policy)) + + @property + def mls_range(self): + """The user's MLS range.""" + return mls.MLSLevel(self.policy, self.qpol_symbol.get_dfltlevel(self.policy)) + + def statement(self): + roles = list(self.roles) + stmt = "user {0} ".format(self) + if (len(roles) > 1): + stmt += "{{ {0} }}".format(string.join(str(r) for r in roles)) + else: + stmt += str(roles[0]) + + try: + stmt += " level {0.mls_default} range {0.mls_range};".format(self) + except AttributeError: + stmt += ";" + + return stmt diff --git a/libapol/userquery.py b/libapol/userquery.py new file mode 100644 index 0000000..1201d37 --- /dev/null +++ b/libapol/userquery.py @@ -0,0 +1,104 @@ +# Copyright 2014, Tresys Technology, LLC +# +# This file is part of SETools. +# +# SETools is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 2.1 of +# the License, or (at your option) any later version. +# +# SETools is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with SETools. If not, see +# . +# +import re + +import compquery + + +class UserQuery(compquery.ComponentQuery): + + """Query SELinux policy users.""" + + def __init__(self, policy, + name="", name_regex=False, + roles=set(), roles_equal=False, roles_regex=False): + """ + Parameter: + policy The policy to query. + name The user name to match. + name_regex If true, regular expression matching + will be used on the user names. + roles The attribute to match. + roles_equal If true, only types with role sets + that are equal to the criteria will + match. Otherwise, any intersection + will match. + roles_regex If true, regular expression matching + will be used on the role names. + """ + + self.policy = policy + self.set_name(name, regex=name_regex) + self.set_roles(roles, regex=roles_regex, equal=roles_equal) + + def results(self): + """Generator which yields all matching users.""" + + for u in self.policy.users(): + if self.name and not self._match_regex( + u, + self.name, + self.name_regex, + self.name_cmp): + continue + + if self.roles and not self._match_regex_or_set( + set(str(r) for r in u.roles), + self.roles, + self.roles_equal, + self.roles_regex, + self.roles_cmp): + continue + + # TODO: default level and range + + yield u + + def set_roles(self, roles, **opts): + """ + Set the criteria for the users's roles. + + Parameter: + roles Name to match the component's attributes. + + Keyword Options: + regex If true, regular expression matching will be used. + equal If true, the role set of the user + must equal the attributes criteria to + match. If false, any intersection in the + critera will cause a rule match. + + Exceptions: + NameError Invalid keyword option. + """ + + self.roles = roles + + for k in opts.keys(): + if k == "regex": + self.roles_regex = opts[k] + elif k == "equal": + self.roles_equal = opts[k] + else: + raise NameError("Invalid roles option: {0}".format(k)) + + if self.roles_regex: + self.roles_cmp = re.compile(self.roles) + else: + self.roles_cmp = None diff --git a/seinfo b/seinfo index 5fc8f86..109c941 100755 --- a/seinfo +++ b/seinfo @@ -108,3 +108,15 @@ if args.polcapquery: print(cap.statement()) else: print(cap) + +if args.userquery: + if isinstance(args.userquery, str): + q = libapol.userquery.UserQuery(p, args.userquery) + else: + q = libapol.userquery.UserQuery(p) + + for u in sorted(q.results()): + if args.expand: + print(u.statement()) + else: + print(u) diff --git a/tests/__init__.py b/tests/__init__.py index 3fd517e..e7dce8f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -21,3 +21,4 @@ import polcapquery import infoflow import terulequery import typequery +import userquery diff --git a/tests/userquery.conf b/tests/userquery.conf new file mode 100644 index 0000000..35a4e24 --- /dev/null +++ b/tests/userquery.conf @@ -0,0 +1,179 @@ +class infoflow +class infoflow2 +class infoflow3 +class infoflow4 +class infoflow5 +class infoflow6 +class infoflow7 + +sid kernel +sid security + +common infoflow +{ + low_w + med_w + hi_w + low_r + med_r + hi_r +} + +class infoflow +inherits infoflow + +class infoflow2 +inherits infoflow +{ + super_w + super_r +} + +class infoflow3 +{ + null +} + +class infoflow4 +inherits infoflow + +class infoflow5 +inherits infoflow + +class infoflow6 +inherits infoflow + +class infoflow7 +inherits infoflow +{ + super_w + super_r + super_none + super_both + super_unmapped +} + +sensitivity low_s; +sensitivity medium_s alias med; +sensitivity high_s; + +dominance { low_s med high_s } + +category here; +category there; +category elsewhere alias lost; + +#level decl +level low_s:here.there; +level med:here, elsewhere; +level high_s:here.lost; + +#some constraints +mlsconstrain infoflow hi_r ((l1 dom l2) or (t1 == mls_exempt)); + +attribute mls_exempt; + +type system; +role system; +role system types system; + +################################################################################ +# Type enforcement declarations and rules + +######################################## +# +# User Query +# + +role test1_r; + +role test2a_r; +role test2b_r; + +role test10a_r; +role test10b_r; +role test10c_r; + +role test11a_r; +role test11b_r; +role test11c_r; + +role test12a_r; +role test12b_r; +role test12c_r; + +# test 1 +# name: test1_u +# roles: unset +user test1_u roles test1_r level med range med; + +# test 2 +# name: test2_u(1|2) regex +# roles: unset +user test2_u1 roles test2a_r level med range med; +user test2_u2 roles test2b_r level med range med; + +# test 10 +# name: unset +# roles: test10a_r,test10b_r +user test10_u1 roles test10a_r level med range med; +user test10_u2 roles { test10a_r test10b_r } level med range med; +user test10_u3 roles { test10a_r test10b_r test10c_r } level med range med; +user test10_u4 roles { test10b_r test10c_r } level med range med; +user test10_u5 roles { test10a_r test10c_r } level med range med; +user test10_u6 roles test10b_r level med range med; +user test10_u7 roles test10c_r level med range med; + +# test 11 +# name: unset +# roles: test11a_r,test11b_r equal +user test11_u1 roles test11a_r level med range med; +user test11_u2 roles { test11a_r test11b_r } level med range med; +user test11_u3 roles { test11a_r test11b_r test11c_r } level med range med; +user test11_u4 roles { test11b_r test11c_r } level med range med; +user test11_u5 roles { test11a_r test11c_r } level med range med; +user test11_u6 roles test11b_r level med range med; +user test11_u7 roles test11c_r level med range med; + +# test 12 +# name: unset +# roles: test12(a|b)_r regex +user test12_u1 roles test12a_r level med range med; +user test12_u2 roles { test12a_r test12b_r } level med range med; +user test12_u3 roles { test12a_r test12b_r test12c_r } level med range med; +user test12_u4 roles { test12b_r test12c_r } level med range med; +user test12_u5 roles { test12a_r test12c_r } level med range med; +user test12_u6 roles test12b_r level med range med; +user test12_u7 roles test12c_r level med range med; + + + +################################################################################ + +#users +user system roles system level med range low_s - high_s:here.lost; + +#normal constraints +constrain infoflow hi_w (u1 == u2); + +#isids +sid kernel system:system:system:medium_s:here +sid security system:system:system:high_s:lost + +#fs_use +fs_use_trans devpts system:object_r:system:low_s; +fs_use_xattr ext3 system:object_r:system:low_s; +fs_use_task pipefs system:object_r:system:low_s; + +#genfscon +genfscon proc / system:object_r:system:med +genfscon proc /sys system:object_r:system:low_s +genfscon selinuxfs / system:object_r:system:high_s:here.there + +portcon tcp 80 system:object_r:system:low_s + +netifcon eth0 system:object_r:system:low_s system:object_r:system:low_s + +nodecon 127.0.0.1 255.255.255.255 system:object_r:system:low_s:here +nodecon ::1 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff system:object_r:system:low_s:here + diff --git a/tests/userquery.py b/tests/userquery.py new file mode 100644 index 0000000..5b669ea --- /dev/null +++ b/tests/userquery.py @@ -0,0 +1,101 @@ +# Copyright 2014, Tresys Technology, LLC +# +# This file is part of SETools. +# +# SETools is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# SETools is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with SETools. If not, see . +# +import unittest + +from libapol import SELinuxPolicy +from libapol.userquery import UserQuery + + +class UserQueryTest(unittest.TestCase): + + def setUp(self): + self.p = SELinuxPolicy("tests/userquery.conf") + + def test_000_unset(self): + """User query with no criteria.""" + # query with no parameters gets all types. + for numusers, u in enumerate(self.p.users(), start=1): + pass + + q = UserQuery(self.p) + for q_numusers, u in enumerate(q.results(), start=1): + pass + + self.assertEqual(numusers, q_numusers) + + def test_001_name_exact(self): + """User query with exact name match.""" + q = UserQuery(self.p, name="test1_u") + + # manually consume the generator: + u = sorted(q.results()) + self.assertEqual(len(u), 1) + + self.assertEqual(u[0], "test1_u") + + def test_002_name_regex(self): + """User query with regex name match.""" + q = UserQuery(self.p, name="test2_u(1|2)", name_regex=True) + + # manually consume the generator: + u = sorted(q.results()) + self.assertEqual(len(u), 2) + + self.assertEqual(u[0], "test2_u1") + self.assertEqual(u[1], "test2_u2") + + def test_010_role_intersect(self): + """User query with role set intersection.""" + q = UserQuery(self.p, roles=["test10a_r", "test10b_r"]) + + # manually consume the generator: + u = sorted(q.results()) + self.assertEqual(len(u), 6) + + self.assertEqual(u[0], "test10_u1") + self.assertEqual(u[1], "test10_u2") + self.assertEqual(u[2], "test10_u3") + self.assertEqual(u[3], "test10_u4") + self.assertEqual(u[4], "test10_u5") + self.assertEqual(u[5], "test10_u6") + + def test_011_role_equality(self): + """User query with role set equality.""" + q = UserQuery( + self.p, roles=["test11a_r", "test11b_r"], roles_equal=True) + + # manually consume the generator: + u = sorted(q.results()) + self.assertEqual(len(u), 1) + + self.assertEqual(u[0], "test11_u2") + + def test_012_role_regex(self): + """User query with role regex match.""" + q = UserQuery(self.p, roles="test12(a|b)_r", roles_regex=True) + + # manually consume the generator: + u = sorted(q.results()) + self.assertEqual(len(u), 6) + + self.assertEqual(u[0], "test12_u1") + self.assertEqual(u[1], "test12_u2") + self.assertEqual(u[2], "test12_u3") + self.assertEqual(u[3], "test12_u4") + self.assertEqual(u[4], "test12_u5") + self.assertEqual(u[5], "test12_u6")