Switch DTA to use TypeAttr objects in the graph instead of strings.

This commit is contained in:
Chris PeBenito 2014-11-09 13:57:35 -05:00
parent eec812b997
commit 7b617bf938
2 changed files with 67 additions and 49 deletions

View File

@ -115,12 +115,15 @@ class DomainTransitionAnalysis(object):
source, target, and rules for each
domain transition.
"""
s = self.policy.lookup_type(source)
t = self.policy.lookup_type(target)
if self.rebuildgraph:
self._build_graph()
if source in self.G and target in self.G:
if s in self.G and t in self.G:
try:
path = nx.shortest_path(self.G, source, target)
path = nx.shortest_path(self.G, s, t)
except nx.exception.NetworkXNoPath:
pass
else:
@ -143,12 +146,15 @@ class DomainTransitionAnalysis(object):
source, target, and rules for each
domain transition.
"""
s = self.policy.lookup_type(source)
t = self.policy.lookup_type(target)
if self.rebuildgraph:
self._build_graph()
if source in self.G and target in self.G:
if s in self.G and t in self.G:
try:
paths = nx.all_simple_paths(self.G, source, target, maxlen)
paths = nx.all_simple_paths(self.G, s, t, maxlen)
except nx.exception.NetworkXNoPath:
pass
else:
@ -170,25 +176,28 @@ class DomainTransitionAnalysis(object):
source, target, and rules for each
domain transition.
"""
s = self.policy.lookup_type(source)
t = self.policy.lookup_type(target)
if self.rebuildgraph:
self._build_graph()
if source in self.G and target in self.G:
if s in self.G and t in self.G:
try:
paths = nx.all_shortest_paths(self.G, source, target)
paths = nx.all_shortest_paths(self.G, s, t)
except nx.exception.NetworkXNoPath:
pass
else:
for p in paths:
yield self.__get_steps(p)
def transitions(self, source):
def transitions(self, type_):
"""
Generator which yields all domain transitions out of a
specified source type.
Parameters:
source The starting type.
type_ The starting type.
Yield: generator(steps)
@ -196,10 +205,12 @@ class DomainTransitionAnalysis(object):
source, target, and rules for each
domain transition.
"""
s = self.policy.lookup_type(type_)
if self.rebuildgraph:
self._build_graph()
for source, target in self.G.out_edges_iter(source):
for source, target in self.G.out_edges_iter(s):
yield source, target, \
self.G.edge[source][target]['transition'], \
self.__get_entrypoints(source, target), \
@ -289,8 +300,6 @@ class DomainTransitionAnalysis(object):
# 3. if the edge has an invalid dyntrans, clear the related
# lists on the edge.
#
# Note: strings are used for node names temporarily, until the
# string->TypeAttr object lookup code is implemented.
def _build_graph(self):
self.G.clear()
@ -310,7 +319,7 @@ class DomainTransitionAnalysis(object):
for r in self.policy.terules():
if r.ruletype == "allow":
if str(r.tclass) not in ["process", "file"]:
if r.tclass not in ["process", "file"]:
continue
perms = r.perms
@ -318,47 +327,47 @@ class DomainTransitionAnalysis(object):
if r.tclass == "process":
if "transition" in perms:
for s, t in itertools.product(
(str(s) for s in r.source.expand()),
(str(t) for t in r.target.expand())):
r.source.expand(),
r.target.expand()):
self.__add_edge(s, t)
self.G[s][t]['transition'].append(r)
if "dyntransition" in perms:
for s, t in itertools.product(
(str(s) for s in r.source.expand()),
(str(t) for t in r.target.expand())):
r.source.expand(),
r.target.expand()):
self.__add_edge(s, t)
self.G[s][t]['dyntransition'].append(r)
if "setexec" in perms:
for s in r.source.expand():
setexec[str(s)].append(r)
setexec[s].append(r)
if "setcurrent" in perms:
for s in r.source.expand():
setcurrent[str(s)].append(r)
setcurrent[s].append(r)
else:
if "execute" in perms:
for s, t in itertools.product(
(str(s) for s in r.source.expand()),
(str(t) for t in r.target.expand())):
r.source.expand(),
r.target.expand()):
execute[s][t].append(r)
if "entrypoint" in perms:
for s, t in itertools.product(
(str(s) for s in r.source.expand()),
(str(t) for t in r.target.expand())):
r.source.expand(),
r.target.expand()):
entrypoint[s][t].append(r)
elif r.ruletype == "type_transition":
if r.tclass != "process":
continue
d = str(r.default)
d = r.default
for s, t in itertools.product(
(str(s) for s in r.source.expand()),
(str(t) for t in r.target.expand())):
r.source.expand(),
r.target.expand()):
type_trans[s][t][d].append(r)
invalid_edge = []

View File

@ -36,20 +36,29 @@ class InfoFlowAnalysisTest(unittest.TestCase):
# don't check node list since the disconnected nodes are not
# removed after removing invalid domain transitions
edges = sorted(self.a.G.out_edges_iter())
self.assertListEqual([("dyntrans100", "bothtrans200"),
("start", "dyntrans100"),
("start", "trans1"),
("trans1", "trans2"),
("trans2", "trans3"),
("trans3", "trans5")], edges)
start = self.p.lookup_type("start")
trans1 = self.p.lookup_type("trans1")
trans2 = self.p.lookup_type("trans2")
trans3 = self.p.lookup_type("trans3")
trans4 = self.p.lookup_type("trans4")
trans5 = self.p.lookup_type("trans5")
dyntrans100 = self.p.lookup_type("dyntrans100")
bothtrans200 = self.p.lookup_type("bothtrans200")
edges = set(self.a.G.out_edges_iter())
self.assertSetEqual(set([(dyntrans100, bothtrans200),
(start, dyntrans100),
(start, trans1),
(trans1, trans2),
(trans2, trans3),
(trans3, trans5)]), edges)
def test_001_bothtrans(self):
"""DTA: type_transition, setexeccon(), and setcon() transitions."""
s = "dyntrans100"
t = "bothtrans200"
e = "bothtrans200_exec"
s = self.p.lookup_type("dyntrans100")
t = self.p.lookup_type("bothtrans200")
e = self.p.lookup_type("bothtrans200_exec")
# regular transition
r = self.a.G.edge[s][t]["transition"]
@ -133,8 +142,8 @@ class InfoFlowAnalysisTest(unittest.TestCase):
def test_010_dyntrans(self):
"""DTA: setcon() transition."""
s = "start"
t = "dyntrans100"
s = self.p.lookup_type("start")
t = self.p.lookup_type("dyntrans100")
# regular transition
r = self.a.G.edge[s][t]["transition"]
@ -179,9 +188,9 @@ class InfoFlowAnalysisTest(unittest.TestCase):
def test_020_trans(self):
"""DTA: type_transition transition."""
s = "start"
t = "trans1"
e = "trans1_exec"
s = self.p.lookup_type("start")
t = self.p.lookup_type("trans1")
e = self.p.lookup_type("trans1_exec")
# regular transition
r = self.a.G.edge[s][t]["transition"]
@ -247,9 +256,9 @@ class InfoFlowAnalysisTest(unittest.TestCase):
def test_030_setexec(self):
"""DTA: setexec() transition."""
s = "trans1"
t = "trans2"
e = "trans2_exec"
s = self.p.lookup_type("trans1")
t = self.p.lookup_type("trans2")
e = self.p.lookup_type("trans2_exec")
# regular transition
r = self.a.G.edge[s][t]["transition"]
@ -312,9 +321,9 @@ class InfoFlowAnalysisTest(unittest.TestCase):
def test_040_two_entrypoint(self):
"""DTA: 2 entrypoints, only one by type_transition."""
s = "trans2"
t = "trans3"
e = ["trans3_exec1", "trans3_exec2"]
s = self.p.lookup_type("trans2")
t = self.p.lookup_type("trans3")
e = [self.p.lookup_type("trans3_exec1"), self.p.lookup_type("trans3_exec2")]
# regular transition
r = self.a.G.edge[s][t]["transition"]
@ -404,9 +413,9 @@ class InfoFlowAnalysisTest(unittest.TestCase):
def test_050_cond_type_trans(self):
"""DTA: conditional type_transition."""
s = "trans3"
t = "trans5"
e = "trans5_exec"
s = self.p.lookup_type("trans3")
t = self.p.lookup_type("trans5")
e = self.p.lookup_type("trans5_exec")
# regular transition
r = self.a.G.edge[s][t]["transition"]