Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / impact / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""

import xml.etree.ElementTree as ET

from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.plugins_utils import CVE_regex

__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"


class ImpactXmlParser:
    """
    Parse an XML report generated by the impact tool.

    After construction, ``items`` is an iterable with one ``Item`` per
    ``entity`` element of class ``host`` found directly under the root
    element. When the input cannot be parsed, ``items`` is an empty list.

    TODO: Test impact output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param xml_output The XML document contents (not a file path)
    """

    def __init__(self, xml_output):
        tree = self.parse_xml(xml_output)
        # Compare against None explicitly: truth-testing an Element is
        # deprecated and is False for a valid root that has no children.
        if tree is not None:
            self.items = self.get_items(tree)
        else:
            self.items = []

    def parse_xml(self, xml_output):
        """
        Parse the XML document held in xml_output.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree The root element of the document. None if the
        document is not well-formed.
        """
        try:
            return ET.fromstring(xml_output)
        except SyntaxError:
            # ET.ParseError derives from SyntaxError, so malformed XML lands here.
            return None

    def get_items(self, tree):
        """
        Lazily build one Item per host entity.

        @param tree The root element returned by parse_xml
        @return items A generator of Item instances (it can be consumed once)
        """
        for node in tree.findall("entity/[@class='host']"):
            yield Item(node, tree)


class Item:
    """
    A host entity read from an impact xml tree.

    Attributes set by the constructor:
      - arch, host, ip, os: text of the matching host properties (None when
        the property is absent)
      - ports: list of {'port', 'protocol', 'status'} dicts
      - services: list of {'name', 'protocol', 'port'} dicts
      - agent: True when an agent entity whose name carries this host's ip
        was found; only then are agentip, ipfrom, agentype, agentport,
        agentsubtype and agentcon set
      - results: generator of Results, one per vulnerability container

    @param item_node A host entity node taken from an impact xml tree
    @param parent The root element of the tree, searched for agent entities.
    When None, no agent lookup is done.
    """

    def __init__(self, item_node, parent=None):
        self.node = item_node

        self.arch = self.get_text_from_subnode("property/[@key='arch']")

        self.host = self.get_text_from_subnode(
            "property/[@key='display_name']")

        self.ip = self.get_text_from_subnode("property/[@key='ip']")

        self.os = self.get_text_from_subnode(
            "property/[@key='os']/property/[@key='entity name']")

        self.ports = []
        self.services = []
        self.process_ports(item_node)
        self.process_services(item_node)

        self.agent = False
        # parent defaults to None, so only look for agents when a tree was given.
        if parent is not None:
            self.process_agent(parent)

        self.results = self.getResults(item_node)

    def process_agent(self, parent):
        """
        Look for the agent entity deployed on this host and record its details.

        The second "/"-separated segment of the agent's name attribute is
        compared with this host's ip; the first match wins.

        @param parent The root element holding the agent entities
        """
        connection = "property/[@key='Connection Properties']"
        for node in parent.findall("entity/[@class='agent']"):
            name_parts = (node.get('name') or "").split("/")
            if len(name_parts) < 2:
                # Name is missing or has no "/" separator: nothing to compare.
                continue

            agentip = name_parts[1]
            if self.ip != agentip:
                continue

            self.agentip = agentip

            # The agent node is passed explicitly so self.node keeps pointing
            # at the host entity.
            self.ipfrom = self.get_text_from_subnode(
                connection + "/property/[@key='ip']", node) or agentip

            self.agentype = node.get("type")

            self.agentport = self.get_text_from_subnode(
                connection + "//property/[@key='port']", node) or ""

            self.agentsubtype = self.get_text_from_subnode(
                connection + "//property/[@key='subtype']", node) or ""

            self.agentcon = self.get_text_from_subnode(
                connection + "//property/[@key='type']", node) or ""

            self.agent = True
            break

    def process_ports(self, item_node):
        """
        Append every tcp and udp port of the host to self.ports.

        A port whose text is "listen" is reported with status "open"; any
        other text is kept as the status.
        """
        for protocol in ("tcp", "udp"):
            xpath = f"property/[@key='{protocol}_ports']/property/[@type='port']"
            for p in item_node.findall(xpath):
                self.ports.append({'port': p.get('key'),
                                   'protocol': protocol,
                                   'status': "open" if p.text == "listen" else p.text})

    def process_services(self, item_node):
        """
        Append every service of the host to self.services.

        The key of the service's first child property is split on "-" into
        port and protocol. Services without such a child, or whose key does
        not split into exactly two parts, are skipped.
        """
        for service in item_node.findall("property/[@key='services']/property"):
            endpoint = service.find('property')
            if endpoint is None:
                continue

            parts = (endpoint.get('key') or "").split('-')
            if len(parts) != 2:
                continue

            port, protocol = parts
            self.services.append({
                "name": service.get("key"),
                "protocol": protocol,
                "port": port
            })

    def getResults(self, tree):
        """
        Lazily build a Results object for each vulnerability container.

        :param tree: the host entity node
        :return: a generator of Results
        """
        for issue in tree.findall("property/[@key='Vulnerabilities']/property/[@type='container']"):
            yield Results(issue)
        # 2017R1 compatibility
        for issue in tree.findall("property/[@key='exposures']/property/[@type='container']"):
            yield Results(issue)

    def get_text_from_subnode(self, subnode_xpath_expr, node=None):
        """
        Finds a subnode and retrieves its text.

        @param subnode_xpath_expr Path expression, relative to the searched node
        @param node The node to search in. Defaults to the host node (self.node)
        @return The text of the first matching subnode. None when there is no
        match or the subnode has no text.
        """
        if node is None:
            node = self.node

        sub_node = node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class Results:
    """
    A vulnerability read from a container node of a host entity.

    Two report layouts are handled:
      - 2013R3: the container holds a nested property that itself has child
        properties; name, description and port are read from it.
      - 2017R1: title, description, severity and service are direct child
        properties of the container.

    Fields missing from the report keep these defaults: severity "",
    port "Unknown", service_name "n/a", protocol "tcp?".

    @param issue_node A vulnerability container node taken from an impact xml tree
    """

    def __init__(self, issue_node):
        self.node = issue_node
        key = issue_node.get("key", "")
        match = CVE_regex.match(key)
        if match:
            self.ref = []
            self.cve = [match.group()]
        else:
            # Do not report an empty or missing key as a reference.
            self.ref = [key] if key else []
            self.cve = []
        self.severity = ""
        self.port = "Unknown"
        self.service_name = "n/a"
        self.protocol = "tcp?"
        vuln = issue_node.find("property/property")
        # Spelled out instead of "not vuln": Element truth-testing is
        # deprecated. A grandchild without children of its own is still
        # treated as the flat 2017R1 layout, exactly as before.
        if vuln is None or len(vuln) == 0:
            # 2017R1 compatibility
            self.ref = []
            self.name = self.get_text_from_subnode("property/[@key='title']")
            self.desc = self.get_text_from_subnode("property/[@key='description']")
            # Keep the defaults when the report omits a value.
            self.severity = self.get_text_from_subnode("property/[@key='severity']") or ""
            self.service_name = self.get_text_from_subnode("property/[@key='service']") or "n/a"
        else:
            # 2013R3 xml version
            self.name = vuln.get("key")
            self.node = vuln
            self.desc = self.get_text_from_subnode("property/[@key='description']")
            self.port = self.get_text_from_subnode("property/[@key='port']") or "Unknown"

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode of self.node and retrieves its text.

        @param subnode_xpath_expr Path expression, relative to self.node
        @return The text of the first matching subnode. None when there is no
        match or the subnode has no text.
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class ImpactPlugin(PluginXMLFormat):
    """
    Plugin that loads the hosts, services, agents and vulnerabilities found
    in a Core Impact XML report.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "entities"
        self.id = "CoreImpact"
        self.name = "Core Impact XML Output Plugin"
        self.plugin_version = "0.0.2"
        self.version = "Core Impact 2013R1/2017R2"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        Parse the report and create a host, its services and its
        vulnerabilities for every host entity found.

        @param output The XML report contents
        """
        parser = ImpactXmlParser(output)
        for item in parser.items:
            # Lookup tables are rebuilt per host so a vulnerability is never
            # attached to a service that belongs to a previously parsed host.
            mapped_services = {}
            mapped_ports = {}

            # Only join the values that are present; avoids an os of "None None".
            os_string = " ".join(part for part in (item.os, item.arch) if part)
            # NOTE(review): "unknown" presumably matches the base class
            # default for a host without os information - confirm.
            h_id = self.createAndAddHost(
                item.ip,
                os=os_string or "unknown",
                hostnames=[item.host])

            for service in item.services:
                s_id = self.createAndAddServiceToHost(
                    h_id,
                    service['name'],
                    service['protocol'],
                    ports=[service['port']],
                    status='open')
                mapped_services[service['name']] = s_id
                mapped_ports[service['port']] = s_id

            if item.agent:
                # The agent's type attribute may be absent from the report.
                desc = "Agent Type: " + (item.agentype or "")
                desc += "\nConn from:" + item.ipfrom
                desc += "\nPort:" + item.agentport
                desc += "\nProtocol:" + item.agentsubtype
                desc += "\nConn:" + item.agentcon

                self.createAndAddVulnToHost(
                    h_id,
                    "Core Impact Agent",
                    desc=desc,
                    severity="HIGH")

            for v in item.results:
                s_id = None
                if not (v.service_name == "n/a" and v.port == "Unknown"):
                    s_id = mapped_services.get(v.service_name) or mapped_ports.get(v.port)

                if s_id is None:
                    # No service information, or it does not match any service
                    # created for this host: report the issue on the host
                    # instead of passing a missing service id along.
                    self.createAndAddVulnToHost(
                        h_id,
                        v.name,
                        desc=v.desc,
                        severity=v.severity,
                        ref=v.ref,
                        cve=v.cve)
                else:
                    self.createAndAddVulnToService(
                        h_id,
                        s_id,
                        v.name,
                        desc=v.desc,
                        severity=v.severity,
                        ref=v.ref,
                        cve=v.cve)

            # Raw port entries are added as services named after the port number.
            for p in item.ports:
                self.createAndAddServiceToHost(
                    h_id,
                    p['port'],
                    p['protocol'],
                    ports=[p['port']],
                    status=p['status'])


def createPlugin(*args, **kwargs):
    """
    Build an ImpactPlugin, forwarding every positional and keyword argument
    to its constructor unchanged.

    @return The new ImpactPlugin instance
    """
    plugin = ImpactPlugin(*args, **kwargs)
    return plugin