Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / zap / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import re
import xml.etree.ElementTree as ET
from urllib.parse import urlparse

from faraday_plugins.plugins.plugin import PluginXMLFormat

__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"


class ZapXmlParser:
    """
    The objective of this class is to parse an xml
    file generated by the zap tool.

    TODO: Handle errors.
    TODO: Test zap output version. Handle what happens
          if the parser doesn't support it.

    TODO: Test cases.

    @param zap_xml_filepath A proper xml generated by zap
    """

    def __init__(self, xml_output, resolve_hostname):
        self.resolve_hostname = resolve_hostname
        tree = self.parse_xml(xml_output)

        if tree is not None:
            self.sites = [data for data in self.get_items(tree)]
        else:
            self.sites = []

    @staticmethod
    def parse_xml(xml_output):
        """
        Open and parse an xml file.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree An xml tree instance. None if error.
        """
        try:
            parser = ET.XMLParser(target=ET.TreeBuilder())
            parser.feed(xml_output)
            tree = parser.close()

        except SyntaxError as err:
            print(f"SyntaxError: {err}. {xml_output}")
            return None

        return tree

    def get_items(self, tree):
        """
        @return items A list of Host instances
        """
        for node in tree.findall('site'):
            yield Site(node, self.resolve_hostname)


def get_attrib_from_subnode(xml_node, subnode_xpath_expr, attrib_name):
    """
    Finds a subnode in the item node and the retrieves a value from it

    @return An attribute value
    """

    node = xml_node.find(subnode_xpath_expr)

    if node is not None:
        return node.get(attrib_name)

    return None


def strip_tags(data):
    """
    Remove html tags from a string
    @return Stripped string
    """
    clean = re.compile('<.*?>')
    return re.sub(clean, '', data)


class Site:

    def __init__(self, item_node, resolve_hostname):
        self.node = item_node

        self.host = self.node.get('host')
        self.ip = resolve_hostname(self.host)
        self.port = self.node.get('port')
        self.ssl = self.node.get('ssl')

        self.items = []
        for alert in self.node.findall('alerts/alertitem'):
            self.items.append(Item(alert))

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None


class Item:
    """
    An abstract representation of a Item


    @param item_node A item_node taken from an zap xml tree
    """

    def __init__(self, item_node):

        self.node = item_node
        self.id = self.get_text_from_subnode('pluginid')
        self.name = self.get_text_from_subnode('alert')
        self.severity = self.get_text_from_subnode('riskcode')
        self.desc = self.get_text_from_subnode('desc')
        if self.get_text_from_subnode('solution'):
            self.resolution = self.get_text_from_subnode('solution')
        else:
            self.resolution = ''
        self.cwe = []
        self.ref = []
        if self.get_text_from_subnode('reference'):
            links = self.get_text_from_subnode('reference')
            for link in links.split("</p>"):
                link = link.strip().replace("\n", "")
                if link != "":
                    self.ref.append(strip_tags(link))

        if self.get_text_from_subnode('cweid'):
            self.cwe.append("CWE-" + self.get_text_from_subnode('cweid'))

        if self.get_text_from_subnode('wascid'):
            self.ref.append("WASC:" + self.get_text_from_subnode('wascid'))

        self.items = []

        if item_node.find('instances'):
            arr = item_node.find('instances')
        else:
            arr = [item_node]

        for elem in arr:
            uri = elem.find('uri').text
            method = elem.findtext('method', "")
            item = self.parse_uri(uri, method)

            param = elem.findtext("param", "")
            attack = elem.findtext("attack", "")
            if attack and param:
                item["data"] = f"URL:\n {uri}\n Payload:\n {param} = {attack}"
            else:
                item["data"] = f"URL:\n {uri}\n Parameter:\n {param}"

            evidence = elem.findtext("evidence", "")
            if evidence:
                item["data"] = f"URL:\n {uri}\n Parameter:\n {param}\n Evidence:\n {evidence}"
            else:
                item["data"] = f"URL:\n {uri}\n"

            item["pname"] = elem.findtext("param", "")

            self.items.append(item)

    def parse_uri(self, uri, method) -> dict:

        parsed_url = urlparse(uri)
        protocol = parsed_url.scheme
        host = parsed_url.netloc
        port = parsed_url.port
        params = self.extract_params_from_uri(uri)

        return {
            'uri': uri,
            'params': ', '.join(params),
            'host': host,
            'website': f"{protocol}://{host}",
            'protocol': protocol,
            'port': port,
            'method': method,
            'path': parsed_url.path,
            'query': parsed_url.query,
            'data': ""
        }

    @staticmethod
    def extract_params_from_uri(uri):
        params = re.findall(r"(\w+)=", uri)
        return params if params else ''

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class ZapPlugin(PluginXMLFormat):
    """
    Example plugin to parse zap output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "OWASPZAPReport"
        self.id = "Zap"
        self.name = "Zap XML Output Plugin"
        self.plugin_version = "0.0.4"
        self.version = "2.10.0"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        This method will discard the output the shell sends, it will read it
        from the xml where it expects it to be present.
        """

        parser = ZapXmlParser(output, self.resolve_hostname)

        for site in parser.sites:

            host = []
            if site.host != site.ip:
                host = [site.host]

            if site.ssl == "true":
                service = "https"
            else:
                service = "http"

            h_id = self.createAndAddHost(site.ip, hostnames=host)

            s_id = self.createAndAddServiceToHost(h_id, service, "tcp", ports=[site.port], status='open')

            for item in site.items:
                for instance in item.items:
                    self.createAndAddVulnWebToService(
                        h_id,
                        s_id,
                        item.name,
                        strip_tags(item.desc),
                        website=instance['website'],
                        query=instance['query'],
                        severity=item.severity,
                        path=instance['path'],
                        params=instance['params'],
                        method=instance['method'],
                        ref=item.ref,
                        resolution=strip_tags(item.resolution),
                        data=instance["data"],
                        pname=instance["pname"],
                        external_id="ZAP-" + str(item.id),
                        cwe=item.cwe
                    )

        del parser


def createPlugin(*args, **kwargs):
    return ZapPlugin(*args, **kwargs)