mirror of
https://github.com/SELinuxProject/setools
synced 2025-02-21 22:46:50 +00:00
InfoFlowAnalysis: convert output to namedtuples
This commit is contained in:
parent
d23bdd4895
commit
da8b151d5c
12
seinfoflow
12
seinfoflow
@ -90,10 +90,10 @@ try:
|
||||
flownum = 0
|
||||
for flownum, path in enumerate(paths, start=1):
|
||||
print("Flow {0}:".format(flownum))
|
||||
for stepnum, (src, tgt, rules) in enumerate(path, start=1):
|
||||
print(" Step {0}: {1} -> {2}".format(stepnum, src, tgt))
|
||||
for stepnum, step in enumerate(path, start=1):
|
||||
print(" Step {0}: {1} -> {2}".format(stepnum, step.source, step.target))
|
||||
|
||||
for rule in sorted(rules):
|
||||
for rule in sorted(step.rules):
|
||||
print(" ", rule)
|
||||
|
||||
print()
|
||||
@ -105,9 +105,9 @@ try:
|
||||
|
||||
else: # single direct info flow
|
||||
flownum = 0
|
||||
for flownum, (src, tgt, rules) in enumerate(g.infoflows(args.source), start=1):
|
||||
print("Flow {0}: {1} -> {2}".format(flownum, src, tgt))
|
||||
for rule in sorted(rules):
|
||||
for flownum, flow in enumerate(g.infoflows(args.source), start=1):
|
||||
print("Flow {0}: {1} -> {2}".format(flownum, flow.source, flow.target))
|
||||
for rule in sorted(flow.rules):
|
||||
print(" ", rule)
|
||||
|
||||
print()
|
||||
|
@ -18,13 +18,19 @@
|
||||
#
|
||||
import itertools
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
import networkx as nx
|
||||
from networkx.exception import NetworkXError, NetworkXNoPath
|
||||
|
||||
|
||||
__all__ = ['InfoFlowAnalysis']
|
||||
|
||||
# Return values for the analysis
|
||||
# are in the following tuple format:
|
||||
step_output = namedtuple("step", ["source",
|
||||
"target",
|
||||
"rules"])
|
||||
|
||||
|
||||
class InfoFlowAnalysis(object):
|
||||
|
||||
@ -246,7 +252,7 @@ class InfoFlowAnalysis(object):
|
||||
try:
|
||||
for source, target in flows:
|
||||
edge = Edge(self.subG, source, target)
|
||||
yield source, target, edge.rules
|
||||
yield step_output(source, target, edge.rules)
|
||||
except NetworkXError:
|
||||
# NetworkXError: the type is valid but not in graph, e.g.
|
||||
# excluded or disconnected due to min weight
|
||||
@ -284,7 +290,7 @@ class InfoFlowAnalysis(object):
|
||||
"""
|
||||
for s in range(1, len(path)):
|
||||
edge = Edge(self.subG, path[s - 1], path[s])
|
||||
yield edge.source, edge.target, edge.rules
|
||||
yield step_output(edge.source, edge.target, edge.rules)
|
||||
|
||||
#
|
||||
#
|
||||
|
@ -208,20 +208,20 @@ class InfoFlowAnalysisTest(mixins.ValidateRule, unittest.TestCase):
|
||||
steps = list(paths[0])
|
||||
self.assertEqual(2, len(steps))
|
||||
|
||||
s, t, rules = steps[0]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node1")
|
||||
self.assertEqual(t, "node2")
|
||||
for r in rules:
|
||||
step = steps[0]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node1")
|
||||
self.assertEqual(step.target, "node2")
|
||||
for r in steps[0].rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
s, t, rules = steps[1]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node2")
|
||||
self.assertEqual(t, "node4")
|
||||
for r in rules:
|
||||
step = steps[1]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node2")
|
||||
self.assertEqual(step.target, "node4")
|
||||
for r in step.rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
def test_301_all_shortest_paths(self):
|
||||
@ -235,20 +235,20 @@ class InfoFlowAnalysisTest(mixins.ValidateRule, unittest.TestCase):
|
||||
steps = list(paths[0])
|
||||
self.assertEqual(2, len(steps))
|
||||
|
||||
s, t, rules = steps[0]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node1")
|
||||
self.assertEqual(t, "node2")
|
||||
for r in rules:
|
||||
step = steps[0]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node1")
|
||||
self.assertEqual(step.target, "node2")
|
||||
for r in steps[0].rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
s, t, rules = steps[1]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node2")
|
||||
self.assertEqual(t, "node4")
|
||||
for r in rules:
|
||||
step = steps[1]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node2")
|
||||
self.assertEqual(step.target, "node4")
|
||||
for r in step.rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
def test_302_shortest_path(self):
|
||||
@ -262,20 +262,20 @@ class InfoFlowAnalysisTest(mixins.ValidateRule, unittest.TestCase):
|
||||
steps = list(paths[0])
|
||||
self.assertEqual(2, len(steps))
|
||||
|
||||
s, t, rules = steps[0]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node1")
|
||||
self.assertEqual(t, "node2")
|
||||
for r in rules:
|
||||
step = steps[0]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node1")
|
||||
self.assertEqual(step.target, "node2")
|
||||
for r in steps[0].rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
s, t, rules = steps[1]
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node2")
|
||||
self.assertEqual(t, "node4")
|
||||
for r in rules:
|
||||
step = steps[1]
|
||||
self.assertIsInstance(step.source, Type)
|
||||
self.assertIsInstance(step.target, Type)
|
||||
self.assertEqual(step.source, "node2")
|
||||
self.assertEqual(step.target, "node4")
|
||||
for r in step.rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
def test_303_infoflows_out(self):
|
||||
@ -283,11 +283,11 @@ class InfoFlowAnalysisTest(mixins.ValidateRule, unittest.TestCase):
|
||||
self.a.set_exclude(None)
|
||||
self.a.set_min_weight(1)
|
||||
|
||||
for s, t, rules in self.a.infoflows("node6"):
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(s, "node6")
|
||||
for r in rules:
|
||||
for flow in self.a.infoflows("node6"):
|
||||
self.assertIsInstance(flow.source, Type)
|
||||
self.assertIsInstance(flow.target, Type)
|
||||
self.assertEqual(flow.source, "node6")
|
||||
for r in flow.rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
def test_304_infoflows_in(self):
|
||||
@ -295,11 +295,11 @@ class InfoFlowAnalysisTest(mixins.ValidateRule, unittest.TestCase):
|
||||
self.a.set_exclude(None)
|
||||
self.a.set_min_weight(1)
|
||||
|
||||
for s, t, rules in self.a.infoflows("node8", out=False):
|
||||
self.assertIsInstance(s, Type)
|
||||
self.assertIsInstance(t, Type)
|
||||
self.assertEqual(t, "node8")
|
||||
for r in rules:
|
||||
for flow in self.a.infoflows("node8", out=False):
|
||||
self.assertIsInstance(flow.source, Type)
|
||||
self.assertIsInstance(flow.target, Type)
|
||||
self.assertEqual(flow.target, "node8")
|
||||
for r in flow.rules:
|
||||
self.assertEqual("allow", r.ruletype)
|
||||
|
||||
def test_900_set_exclude_invalid_type(self):
|
||||
|
Loading…
Reference in New Issue
Block a user