Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / retina / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import re
import xml.etree.ElementTree as ET

from faraday_plugins.plugins.plugin import PluginXMLFormat

__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"


class RetinaXmlParser:
    """
    The objective of this class is to parse an xml file generated by the retina tool.

    TODO: Handle errors.
    TODO: Test retina output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param retina_xml_filepath A proper xml generated by retina
    """

    def __init__(self, xml_output):
        tree = self.parse_xml(xml_output)
        if tree:
            self.items = self.get_items(tree)
        else:
            self.items = []

    def parse_xml(self, xml_output):
        """
        Open and parse an xml file.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree An xml tree instance. None if error.
        """
        try:
            tree = ET.fromstring(xml_output)
        except SyntaxError as err:
            print(f"SyntaxError: {err}. {xml_output}")
            return None

        return tree

    def get_items(self, tree):
        """
        @return items A list of Host instances
        """
        for node in tree.findall("hosts/host"):
            yield Item(node)


class Item:
    """
    An abstract representation of a Item


    @param item_node A item_node taken from an retina xml tree
    """

    def __init__(self, item_node):
        self.node = item_node
        self.ip = self.get_text_from_subnode("ip")
        self.hostname = "" if self.get_text_from_subnode(
            "dnsName") == "unknown" else self.get_text_from_subnode("dnsName")
        self.netbiosname = self.get_text_from_subnode("netBIOSName")
        self.netbiosdomain = self.get_text_from_subnode("netBIOSDomain")
        self.os = self.get_text_from_subnode("os")
        self.mac = self.get_text_from_subnode("mac")

        self.vulns = self.getResults(item_node)
        self.ports = {}
        for v in self.vulns:
            if v.port not in self.ports:
                self.ports[v.port] = []
            self.ports[v.port].append(v)

    def getResults(self, tree):
        """
        :param tree:
        """
        for self.issues in tree.findall("audit"):
            yield Results(self.issues)

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class Results:

    def __init__(self, issue_node):
        self.node = issue_node
        self.name = self.get_text_from_subnode('name')

        self.description = self.get_text_from_subnode('description')
        self.solution = self.get_text_from_subnode('fixInformation')
        self.severity = self.get_text_from_subnode('risk')
        self.cve = "" if self.get_text_from_subnode(
            'cve') == 'N/A' else self.get_text_from_subnode('cve')
        self.cce = self.get_text_from_subnode('cce')
        self.date = self.get_text_from_subnode('date')
        self.pciLevel = self.get_text_from_subnode('pciLevel')
        self.pciReason = self.get_text_from_subnode('pciReason')
        self.pciPassFail = self.get_text_from_subnode('pciPassFail')
        self.cvss2Score = self.get_text_from_subnode('cvssScore')
        self.exploit = self.get_text_from_subnode('exploit')
        self.context = self.get_text_from_subnode('context')
        val = self.context.split(":")
        self.port = ""
        self.protocol = ""
        if len(val) == 2:
            if val[0] in ['TCP', 'UDP']:
                self.protocol = val[0]
                self.port = val[1]

        self.desc = self.get_text_from_subnode('description')
        self.solution = self.solution if self.solution else ""
        self.desc += "\nExploit: " + self.exploit if self.exploit else ""
        self.desc += "\nContext: " + self.context if self.context else ""

        self.ref = []
        self.cvss2 = {}
        if self.cvss2Score != "N/A":
            self.cvss2["vector_string"] = self.cvss2Score.split(' ')[1].replace('[', '').replace(']', '')

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class RetinaPlugin(PluginXMLFormat):
    """
    Example plugin to parse retina output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "scanJob"
        self.id = "Retina"
        self.name = "Retina XML Output Plugin"
        self.plugin_version = "0.0.1"
        self.version = "Retina Network 5.19.2.2718"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):

        parser = RetinaXmlParser(output)
        for item in parser.items:
            hostname = item.hostname if item.hostname else None
            h_id = self.createAndAddHost(item.ip, item.os, hostnames=[hostname])

            if not item.netbiosname == 'N/A':
                self.createAndAddNoteToHost(
                    h_id, "netBIOSName", item.netbiosname)

            if not item.netbiosdomain == 'N/A':
                self.createAndAddNoteToHost(
                    h_id, "netBIOSDomain", item.netbiosdomain)

            for k, vulns in item.ports.items():
                if k:
                    for v in vulns:
                        cve = v.cve.split(",") if v.cve else []
                        web = False
                        s_id = self.createAndAddServiceToHost(h_id, 'unknown', v.protocol.lower(), ports=[str(v.port)],
                                                              status="open")
                        if v.port in ['80', '443'] or re.search("ssl|http", v.name.lower()):
                            web = True
                        else:
                            web = False
                        if web:
                            v_id = self.createAndAddVulnWebToService(h_id, s_id, v.name, ref=v.ref,
                                                                     website=hostname, severity=v.severity,
                                                                     resolution=v.solution, desc=v.desc, cve=cve, cvss2=v.cvss2)
                        else:
                            v_id = self.createAndAddVulnToService(h_id, s_id, v.name, ref=v.ref,
                                                                  severity=v.severity, resolution=v.solution,
                                                                  desc=v.desc, cve=cve, cvss2=v.cvss2)
                else:
                    for v in vulns:
                        cve = v.cve.split(",") if v.cve else []
                        v_id = self.createAndAddVulnToHost(h_id, v.name, ref=v.ref, severity=v.severity,
                                                           resolution=v.solution, desc=v.desc, cve=cve, cvss2=v.cvss2)
        del parser


def createPlugin(*args, **kwargs):
    return RetinaPlugin(*args, **kwargs)