# Copyright 2015, Tresys Technology, LLC # # This file is part of SETools. # # SETools is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation, either version 2.1 of # the License, or (at your option) any later version. # # SETools is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with SETools. If not, see # . # import logging import copy from PyQt5.QtCore import pyqtSignal, Qt, QStringListModel, QThread from PyQt5.QtGui import QPalette, QTextCursor from PyQt5.QtWidgets import QCompleter, QHeaderView, QMessageBox, QProgressDialog, QScrollArea, \ QTreeWidgetItem from setools import InfoFlowAnalysis from setools.exception import UnmappedClass, UnmappedPermission from ..logtosignal import LogHandlerToSignal from ..widget import SEToolsWidget from .excludetypes import ExcludeTypes from .permmapedit import PermissionMapEditor class InfoFlowAnalysisTab(SEToolsWidget, QScrollArea): """An information flow analysis tab.""" @property def perm_map(self): return self.query.perm_map @perm_map.setter def perm_map(self, pmap): # copy permission map to keep enabled/disabled # settings private to this map. perm_map = copy.deepcopy(pmap) # transfer enabled/disabled settings from # current permission map, to the new map for classname in self.query.perm_map.classes(): for mapping in self.query.perm_map.perms(classname): try: perm_map.mapping(classname, mapping.perm).enabled = mapping.enabled except (UnmappedClass, UnmappedPermission): pass # apply updated permission map self.query.perm_map = perm_map def __init__(self, parent, policy, perm_map): super(InfoFlowAnalysisTab, self).__init__(parent) self.log = logging.getLogger(__name__) self.policy = policy self.query = InfoFlowAnalysis(policy, perm_map) self.query.source = None self.query.target = None self.setupUi() def __del__(self): self.thread.quit() self.thread.wait(5000) logging.getLogger("setools.infoflow").removeHandler(self.handler) def setupUi(self): self.log.debug("Initializing UI.") self.load_ui("infoflow.ui") # set up error message for missing perm map self.error_msg = QMessageBox(self) self.error_msg.setStandardButtons(QMessageBox.Ok) # set up perm map editor self.permmap_editor = PermissionMapEditor(self, False) # set up source/target autocompletion type_completion_list = [str(t) for t in self.policy.types()] type_completer_model = QStringListModel(self) type_completer_model.setStringList(sorted(type_completion_list)) self.type_completion = QCompleter() self.type_completion.setModel(type_completer_model) self.source.setCompleter(self.type_completion) self.target.setCompleter(self.type_completion) # setup indications of errors on source/target/default self.orig_palette = self.source.palette() self.error_palette = self.source.palette() self.error_palette.setColor(QPalette.Base, Qt.red) self.clear_source_error() self.clear_target_error() # set up processing thread self.thread = ResultsUpdater(self.query) self.thread.raw_line.connect(self.raw_results.appendPlainText) self.thread.finished.connect(self.update_complete) self.thread.flows.connect(self.reset_browser) # set up browser thread self.browser_thread = BrowserUpdater(self.query) self.browser_thread.flows.connect(self.add_browser_children) # create a "busy, please wait" dialog self.busy = QProgressDialog(self) self.busy.setModal(True) self.busy.setRange(0, 0) self.busy.setMinimumDuration(0) self.busy.setCancelButton(None) self.busy.reset() # update busy dialog from infoflow INFO logs self.handler = LogHandlerToSignal() self.handler.message.connect(self.busy.setLabelText) logging.getLogger("setools.infoflow").addHandler(self.handler) # Ensure settings are consistent with the initial .ui state self.max_path_length.setEnabled(self.all_paths.isChecked()) self.source.setEnabled(not self.flows_in.isChecked()) self.target.setEnabled(not self.flows_out.isChecked()) self.criteria_frame.setHidden(not self.criteria_expander.isChecked()) self.notes.setHidden(not self.notes_expander.isChecked()) self.browser_tab.setEnabled(self.flows_in.isChecked() or self.flows_out.isChecked()) # connect signals self.buttonBox.clicked.connect(self.run) self.source.textEdited.connect(self.clear_source_error) self.source.editingFinished.connect(self.set_source) self.target.textEdited.connect(self.clear_target_error) self.target.editingFinished.connect(self.set_target) self.all_paths.toggled.connect(self.all_paths_toggled) self.flows_in.toggled.connect(self.flows_in_toggled) self.flows_out.toggled.connect(self.flows_out_toggled) self.min_perm_weight.valueChanged.connect(self.set_min_weight) self.exclude_types.clicked.connect(self.choose_excluded_types) self.edit_permmap.clicked.connect(self.open_permmap_editor) self.browser.currentItemChanged.connect(self.browser_item_selected) # # Analysis mode # def all_paths_toggled(self, value): self.clear_source_error() self.clear_target_error() self.max_path_length.setEnabled(value) def flows_in_toggled(self, value): self.clear_source_error() self.clear_target_error() self.source.setEnabled(not value) self.limit_paths.setEnabled(not value) self.browser_tab.setEnabled(value) def flows_out_toggled(self, value): self.clear_source_error() self.clear_target_error() self.target.setEnabled(not value) self.limit_paths.setEnabled(not value) self.browser_tab.setEnabled(value) # # Source criteria # def set_source_error(self, error_text): self.log.error("Source type error: {0}".format(error_text)) self.source.setToolTip("Error: {0}".format(error_text)) self.source.setPalette(self.error_palette) def clear_source_error(self): self.source.setToolTip("The source type of the analysis.") self.source.setPalette(self.orig_palette) def set_source(self): try: # look up the type here, so invalid types can be caught immediately text = self.source.text() if text: self.query.source = self.policy.lookup_type(text) else: self.query.source = None except Exception as ex: self.set_source_error(ex) # # Target criteria # def set_target_error(self, error_text): self.log.error("Target type error: {0}".format(error_text)) self.target.setToolTip("Error: {0}".format(error_text)) self.target.setPalette(self.error_palette) def clear_target_error(self): self.target.setToolTip("The target type of the analysis.") self.target.setPalette(self.orig_palette) def set_target(self): try: # look up the type here, so invalid types can be caught immediately text = self.target.text() if text: self.query.target = self.policy.lookup_type(text) else: self.query.target = None except Exception as ex: self.set_target_error(ex) # # Options # def set_min_weight(self, value): self.query.min_weight = value def choose_excluded_types(self): chooser = ExcludeTypes(self, self.policy) chooser.show() def open_permmap_editor(self): self.permmap_editor.show(self.perm_map) def apply_permmap(self, pmap): # used only by permission map editor self.query.perm_map = pmap # # Infoflow browser # def _new_browser_item(self, type_, parent, rules=None, children=None): # build main item item = QTreeWidgetItem(parent if parent else self.browser) item.setText(0, str(type_)) item.type_ = type_ item.children = children if children else [] item.rules = rules if rules else [] item.child_populated = children is not None # add child items for child_type, child_rules in item.children: child_item = self._new_browser_item(child_type, item, rules=child_rules) item.addChild(child_item) item.setExpanded(children is not None) self.log.debug("Built item for {0} with {1} children and {2} rules".format( type_, len(item.children), len(item.rules))) return item def reset_browser(self, root_type, out, children): self.log.debug("Resetting browser.") # clear results self.browser.clear() self.browser_details.clear() # save browser details independent # from main analysis UI settings self.browser_root_type = root_type self.browser_mode = out root = self._new_browser_item(self.browser_root_type, self.browser, children=children) self.browser.insertTopLevelItem(0, root) def browser_item_selected(self, current, previous): if not current: # browser is being reset return self.log.debug("{0} selected in browser.".format(current.type_)) self.browser_details.clear() try: parent_type = current.parent().type_ except AttributeError: # should only hit his on the root item pass else: self.browser_details.appendPlainText("Information flows {0} {1} {2}\n".format( current.parent().type_, "->" if self.browser_mode else "<-", current.type_)) for rule in current.rules: self.browser_details.appendPlainText(rule) self.browser_details.moveCursor(QTextCursor.Start) if not current.child_populated: self.busy.setLabelText("Gathering additional browser details for {0}...".format( current.type_)) self.busy.show() self.browser_thread.out = self.browser_mode self.browser_thread.type_ = current.type_ self.browser_thread.start() def add_browser_children(self, children): item = self.browser.currentItem() item.children = children self.log.debug("Adding children for {0}".format(item.type_)) for child_type, child_rules in item.children: child_item = self._new_browser_item(child_type, item, rules=child_rules) item.addChild(child_item) item.child_populated = True self.busy.reset() # # Results runner # def run(self, button): # right now there is only one button. fail = False if self.source.isEnabled() and not self.query.source: self.set_source_error("A source type is required") fail = True if self.target.isEnabled() and not self.query.target: self.set_target_error("A target type is required.") fail = True if not self.perm_map: self.log.critical("A permission map is required to begin the analysis.") self.error_msg.critical(self, "No permission map available.", "Please load a permission map to begin the analysis.") fail = True if fail: return for mode in [self.all_paths, self.all_shortest_paths, self.flows_in, self.flows_out]: if mode.isChecked(): break self.query.mode = mode.objectName() self.query.max_path_len = self.max_path_length.value() self.query.limit = self.limit_paths.value() # start processing self.busy.setLabelText("Processing query...") self.busy.show() self.raw_results.clear() self.thread.start() def update_complete(self): if not self.busy.wasCanceled(): self.busy.setLabelText("Moving the raw result to top; GUI may be unresponsive") self.busy.repaint() self.raw_results.moveCursor(QTextCursor.Start) if self.flows_in.isChecked() or self.flows_out.isChecked(): # move to browser tab for flows in/out self.results_frame.setCurrentIndex(1) else: self.results_frame.setCurrentIndex(0) self.busy.reset() class ResultsUpdater(QThread): """ Thread for processing queries and updating result widgets. Parameters: query The query object model The model for the results Qt signals: raw_line A string to be appended to the raw results. flows (str, bool, list) Initial information for populating the flows browser. """ raw_line = pyqtSignal(str) flows = pyqtSignal(str, bool, list) def __init__(self, query): super(ResultsUpdater, self).__init__() self.query = query self.log = logging.getLogger(__name__) def __del__(self): self.wait() def run(self): """Run the query and update results.""" assert self.query.limit, "Code doesn't currently handle unlimited (limit=0) paths." self.out = self.query.mode == "flows_out" if self.query.mode == "all_paths": self.transitive(self.query.all_paths(self.query.source, self.query.target, self.query.max_path_len)) elif self.query.mode == "all_shortest_paths": self.transitive(self.query.all_shortest_paths(self.query.source, self.query.target)) elif self.query.mode == "flows_out": self.direct(self.query.infoflows(self.query.source, out=self.out)) else: # flows_in self.direct(self.query.infoflows(self.query.target, out=self.out)) def transitive(self, paths): pathnum = 0 for pathnum, path in enumerate(paths, start=1): self.raw_line.emit("Flow {0}:".format(pathnum)) for stepnum, step in enumerate(path, start=1): self.raw_line.emit(" Step {0}: {1} -> {2}".format(stepnum, step.source, step.target)) for rule in sorted(step.rules): self.raw_line.emit(" {0}".format(rule)) self.raw_line.emit("") if QThread.currentThread().isInterruptionRequested() or (pathnum >= self.query.limit): break else: QThread.yieldCurrentThread() self.raw_line.emit("") self.raw_line.emit("{0} information flow path(s) found.\n".format(pathnum)) self.log.info("{0} information flow path(s) found.".format(pathnum)) def direct(self, flows): flownum = 0 child_types = [] for flownum, flow in enumerate(flows, start=1): self.raw_line.emit("Flow {0}: {1} -> {2}".format(flownum, flow.source, flow.target)) for rule in sorted(flow.rules): self.raw_line.emit(" {0}".format(rule)) self.raw_line.emit("") # Generate results for flow browser if self.out: child_types.append((flow.target, sorted(str(r) for r in flow.rules))) else: child_types.append((flow.source, sorted(str(r) for r in flow.rules))) if QThread.currentThread().isInterruptionRequested(): break else: QThread.yieldCurrentThread() self.raw_line.emit("{0} information flow(s) found.\n".format(flownum)) self.log.info("{0} information flow(s) found.".format(flownum)) # Update browser: root_type = self.query.source if self.out else self.query.target self.flows.emit(str(root_type), self.out, sorted(child_types)) class BrowserUpdater(QThread): """ Thread for processing additional analysis for the browser. Parameters: query The query object model The model for the results Qt signals: flows A list of child types to render in the infoflows browser. """ flows = pyqtSignal(list) def __init__(self, query): super(BrowserUpdater, self).__init__() self.query = query self.type_ = None self.out = None self.log = logging.getLogger(__name__) def __del__(self): self.wait() def run(self): flownum = 0 child_types = [] for flownum, flow in enumerate(self.query.infoflows(self.type_, out=self.out), start=1): # Generate results for flow browser if self.out: child_types.append((flow.target, sorted(str(r) for r in flow.rules))) else: child_types.append((flow.source, sorted(str(r) for r in flow.rules))) if QThread.currentThread().isInterruptionRequested(): break else: QThread.yieldCurrentThread() self.log.debug("{0} additional information flow(s) found.".format(flownum)) # Update browser: self.flows.emit(sorted(child_types))