Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / netsparker / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import re
import sys
import xml.etree.ElementTree as ET
from urllib.parse import urlsplit

from bs4 import BeautifulSoup

from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.plugins_utils import CVE_regex

__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"


class NetsparkerXmlParser:
    """
    The objective of this class is to parse an xml report generated by the netsparker tool.

    After construction, ``items`` holds one Item per <vulnerability> element
    found directly under the document root (empty if the XML is malformed).

    TODO: Test netsparker output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param xml_output The XML document itself (its content, not a path)
    @param plugin The owning plugin; its ``logger`` is used to report parse errors
    """

    def __init__(self, xml_output, plugin):
        # NOTE: despite its name this attribute stores the XML content.
        self.filepath = xml_output
        self.plugin = plugin

        tree = self.parse_xml(xml_output)
        # Compare against None explicitly: an Element without children is
        # falsy, and testing an Element's truth value is deprecated.
        self.items = list(self.get_items(tree)) if tree is not None else []

    def parse_xml(self, xml_output):
        """
        Parse an xml document held in memory.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree The root Element of the document. None if it could
        not be parsed (the error is logged through the plugin logger).
        """
        try:
            return ET.fromstring(xml_output)
        except SyntaxError as err:
            # ET.ParseError derives from SyntaxError, so malformed XML lands here.
            self.plugin.logger.error(f"SyntaxError: {err}. {xml_output}")
            return None

    def get_items(self, tree):
        """
        @return items A generator of Item instances, one per <vulnerability>
        child of the root element
        """
        for node in tree.findall("vulnerability"):
            yield Item(node)


class Item:
    """
    A single <vulnerability> entry of a netsparker xml report.

    The constructor reads every field the plugin needs from the node and
    exposes it as an attribute (url, protocol, hostname, port, name, severity,
    resolution, request, response, cve, cwe, ref, cvss3, data, ...).
    ``node`` stays bound to the <vulnerability> element, so
    get_text_from_subnode() always resolves paths relative to it.

    @param item_node A item_node taken from an netsparker xml tree
    """

    def __init__(self, item_node):
        self.node = item_node
        self.url = self.get_text_from_subnode("url")
        # hostname is None (and protocol "") when the URL is missing or has
        # no host component; port is always an int.
        self.protocol, self.hostname, self.port = self._split_url(self.url)

        self.name = self.get_text_from_subnode("type")
        self.name_title = self.get_text_from_subnode("title")
        self.desc = self.get_text_from_subnode("description")
        self.severity = self.re_map_severity(self.get_text_from_subnode("severity"))
        self.certainty = self.get_text_from_subnode("certainty")
        self.method = self.get_text_from_subnode("vulnerableparametertype")
        self.param = self.get_text_from_subnode("vulnerableparameter")
        self.paramval = self.get_text_from_subnode("vulnerableparametervalue")
        self.reference = self.get_text_from_subnode("externalReferences")
        # Keep whichever of the two remediation texts is present; either one
        # may be missing from the report.
        actions = self.get_text_from_subnode("actionsToTake")
        remedy = self.get_text_from_subnode("remedy")
        self.resolution = "".join(part for part in (actions, remedy) if part)
        self.request = self.get_text_from_subnode("rawrequest")
        self.response = self.get_text_from_subnode("rawresponse")

        self.kvulns = []
        self.cve = []
        for known in item_node.findall("knownvulnerabilities/knownvulnerability"):
            # Missing tags are treated as empty text instead of crashing.
            title = self._node_text(known, "title") or ""
            severity = self._node_text(known, "severity") or ""
            cve_match = CVE_regex.search(title)
            if cve_match:
                self.cve.append(cve_match.group())
            self.kvulns.append(f"{severity}-{title}")

        self.extra = []
        for info in item_node.findall("extrainformation/info"):
            name_tag = info.find("name")
            if name_tag is None:
                continue
            value_tag = info.find("value")
            # An absent <value> renders like an empty one ("name:None").
            value = value_tag.text if value_tag is not None else None
            self.extra.append(f"{name_tag.text}:{value}")

        classification = item_node.find("classification")
        self.owasp = self._node_text(classification, "OWASP")
        self.wasc = self._node_text(classification, "WASC")
        self.cwe = self._node_text(classification, "CWE")
        self.capec = self._node_text(classification, "CAPEC")
        self.pci = self._node_text(classification, "PCI")
        self.pci2 = self._node_text(classification, "PCI2")

        cvss = item_node.find("classification/CVSS")
        self.cvss_full_vector = self._node_text(cvss, "vector")
        self.cvss_score = self._node_text(cvss, "score[1]/value") or None

        self.cvss3 = {}
        self.ref = []
        if self.cwe:
            self.cwe = ["CWE-" + self.cwe]
        if self.owasp:
            self.ref.append("OWASP-" + self.owasp)
        if self.reference:
            self.ref.extend(sorted(set(re.findall(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', self.reference))))
        if self.cvss_full_vector:
            self.cvss3["vector_string"] = self.cvss_full_vector

        # Free-form summary: one "\nLabel: value" chunk per non-empty field.
        # kvulns/extra entries are never empty strings, so an empty join
        # means an empty list.
        details = (
            ("KnowVulns", "\n".join(self.kvulns)),
            ("WASC", self.wasc),
            ("Certainty", self.certainty),
            ("PCI", self.pci),
            ("PCI2", self.pci2),
            ("CAPEC", self.capec),
            ("PARAM", self.param),
            ("PARAM VAL", repr(self.paramval) if self.paramval else None),
            ("Extra", "\n".join(self.extra)),
        )
        self.data = "".join(f"\n{label}: {value}" for label, value in details if value)

    @staticmethod
    def _split_url(url):
        """
        Split a URL into (protocol, hostname, port).

        The port is the explicit one when the URL has a valid port, otherwise
        443 for https and 80 for anything else.

        @return A (str, str or None, int) tuple
        """
        protocol, hostname, port = "", None, None
        try:
            parts = urlsplit((url or "").strip())
            protocol = parts.scheme
            hostname = parts.hostname
            port = parts.port
        except ValueError:
            # Malformed authority or non-numeric/out-of-range port: keep what
            # was parsed so far and fall back to the default port below.
            pass
        if port is None:
            port = 443 if protocol == "https" else 80
        return protocol, hostname, port

    @staticmethod
    def _node_text(node, subnode_xpath_expr):
        """
        @return The text of the first subnode of `node` matching the
        expression. None if `node` is None or nothing matches.
        """
        # "is None" on purpose: an Element without children is falsy.
        if node is None:
            return None
        sub_node = node.find(subnode_xpath_expr)
        return sub_node.text if sub_node is not None else None

    @staticmethod
    def re_map_severity(severity):
        """
        Translate the "Important" severity to "high"; any other value is
        returned unchanged.
        """
        if severity == "Important":
            return "high"
        return severity

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the item node and the retrieves its text.

        @return The subnode text. None if the subnode does not exist
        """
        return self._node_text(self.node, subnode_xpath_expr)


class NetsparkerPlugin(PluginXMLFormat):
    """
    Plugin that loads the vulnerabilities of a netsparker XML report.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "netsparker"
        self.id = "Netsparker"
        self.name = "Netsparker XML Output Plugin"
        self.plugin_version = "0.0.1"
        self.version = "Netsparker 3.1.1.0"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        Parse the report and register a host, a service and a web
        vulnerability for every item found in it.

        @param output The XML report content
        """
        # hostname -> resolved address, so each hostname is resolved only once
        resolved = {}
        for vuln in NetsparkerXmlParser(output, self).items:
            hostname = vuln.hostname
            if hostname not in resolved:
                resolved[hostname] = self.resolve_hostname(hostname)
            host_id = self.createAndAddHost(resolved[hostname], hostnames=[hostname])

            port = str(vuln.port)
            service_id = self.createAndAddServiceToHost(
                host_id,
                port,
                protocol=str(vuln.protocol),
                ports=[port],
                status="open",
            )

            # Description and resolution are run through BeautifulSoup and
            # only their text is kept.
            resolution = "" if vuln.resolution is None else BeautifulSoup(vuln.resolution, "lxml").text
            desc = "" if vuln.desc is None else BeautifulSoup(vuln.desc, "lxml").text
            # Prefer the title, falling back to the vulnerability type.
            name = vuln.name if vuln.name_title is None else vuln.name_title

            self.createAndAddVulnWebToService(
                host_id,
                service_id,
                name,
                ref=vuln.ref,
                website=hostname,
                severity=vuln.severity,
                desc=desc,
                path=vuln.url,
                method=vuln.method,
                request=vuln.request,
                response=vuln.response,
                resolution=resolution,
                pname=vuln.param,
                data=vuln.data,
                cve=vuln.cve,
                cwe=vuln.cwe,
                cvss3=vuln.cvss3,
            )


def createPlugin(*args, **kwargs):
    """Build a NetsparkerPlugin, forwarding every argument to its constructor."""
    plugin = NetsparkerPlugin(*args, **kwargs)
    return plugin