Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / nuclei / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2020  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import subprocess
import re
import sys
import json
from dateutil.parser import parse
from urllib.parse import urlparse
from packaging import version
from faraday_plugins.plugins.plugin import PluginMultiLineJsonFormat

__author__ = "Nicolas Rebagliati"
__copyright__ = "Copyright (c) 2021, Infobyte LLC"
__credits__ = ["Nicolas Rebagliati"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Nicolas Rebagliati"
__email__ = "[email protected]"
__status__ = "Development"


class NucleiPlugin(PluginMultiLineJsonFormat):
    """ Handle the Nuclei tool. Detects the output of the tool
    and adds the information to Faraday.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.id = "nuclei"
        self.name = "Nuclei"
        self.plugin_version = "1.0.3"
        self.version = "2.5.5"
        self.json_keys = {"matched-at", "template-id", "host"}
        self._command_regex = re.compile(r'^(sudo nuclei|nuclei|\.\/nuclei|^.*?nuclei)\s+.*?')
        self.xml_arg_re = re.compile(r"^.*(-o\s*[^\s]+).*$")
        self._use_temp_file = True
        self._temp_file_extension = "json"

    def parseOutputString(self, output, debug=False):
        for vuln_json in filter(lambda x: x != '', output.split("\n")):
            vuln_dict = json.loads(vuln_json)
            host = vuln_dict.get('host')
            url_data = urlparse(host)
            if url_data.hostname is None:
                host = 'http://' + host
                url_data = urlparse(host)
            ip = vuln_dict.get("ip")
            if not ip:
                ip = self.resolve_hostname(url_data.hostname)
            host_id = self.createAndAddHost(
                name=ip,
                hostnames=[url_data.hostname])
            port = url_data.port
            if not port:
                if url_data.scheme == 'https':
                    port = 443
                else:
                    port = 80
            service_id = self.createAndAddServiceToHost(
                host_id,
                name=url_data.scheme,
                ports=port,
                protocol="tcp",
                status='open',
                version='',
                description='web server')
            matched = vuln_dict.get('matched-at', '')
            if matched:
                matched_data = urlparse(matched)
            else:
                print('Version not supported, use nuclei 2.5.3 or higher')
                sys.exit(1)
            reference = vuln_dict["info"].get('reference', [])
            if not reference:
                reference = []
            else:
                if isinstance(reference, str):
                    if re.match('^- ', reference):
                        reference = list(filter(None, [re.sub('^- ', '', elem) for elem in reference.split('\n')]))
                    else:
                        reference = [reference]
            references = vuln_dict["info"].get('references', [])
            if references:
                if isinstance(references, str):
                    if re.match('^- ', references):
                        references = list(filter(None, [re.sub('^- ', '', elem) for elem in references.split('\n')]))
                    else:
                        references = [references]
            else:
                references = []

            cve = vuln_dict['info'].get('classification', {}).get('cve-id', [])
            if cve:
                cve = [x.upper() for x in cve]

            # TODO CVSSv2, CVSSv3, CWE and CAPEC
            #cvssv2 = vuln_dict['info'].get('classification', {}).get('cvss-score')
            #cvssv3 = vuln_dict['info'].get('classification', {}).get('cvss-metrics')
            cwe = vuln_dict['info'].get('classification', {}).get('cwe-id', [])
            cwe = [x.upper() for x in cwe]
            #capec = vuln_dict['info'].get('metadata', {}).get('capec', [])
            #if isinstance(capec, str):
            #    capec = capec.upper().split(',')

            refs = sorted(list(set(reference + references)))
            refs = list(filter(None, refs))

            tags = vuln_dict['info'].get('tags', [])
            if isinstance(tags, str):
                tags = tags.split(',')

            impact = {}
            impacted = vuln_dict['info'].get('metadata', {}).get('impact')
            if isinstance(impacted, str):
                for x in impacted.split(','):
                    impact[x] = True

            resolution = vuln_dict['info'].get('metadata', {}).get('resolution', '')
            easeofresolution = vuln_dict['info'].get('metadata', {}).get('easeofresolution', None)

            request = vuln_dict.get('request', '')
            if request:
                method = request.split(" ")[0]
            else:
                method = ""

            data = [f"Matched: {vuln_dict.get('matched-at')}",
                    f"Tags: {vuln_dict['info'].get('tags', '')}",
                    f"Template ID: {vuln_dict.get('template-id', '')}"]

            name = vuln_dict["info"].get("name")
            run_date = vuln_dict.get('timestamp')
            if run_date:
                run_date = parse(run_date)
            self.createAndAddVulnWebToService(
                host_id,
                service_id,
                name=name,
                desc=vuln_dict["info"].get("description", name),
                ref=refs,
                severity=vuln_dict["info"].get('severity'),
                tags=tags,
                impact=impact,
                resolution=resolution,
                easeofresolution=easeofresolution,
                cve=cve,
                # TODO CVSSv2, CVSSv3, CWE and CAPEC
                #cvssv2=cvssv2,
                #cvssv3=cvssv3,
                cwe=cwe,
                #capec=capec,
                website=host,
                request=request,
                response=vuln_dict.get('response', ''),
                method=method,
                query=matched_data.query,
                params=matched_data.params,
                path=matched_data.path,
                data="\n".join(data),
                external_id=f"NUCLEI-{vuln_dict.get('template-id', '')}",
                run_date=run_date
            )

    def processCommandString(self, username, current_path, command_string):
        super().processCommandString(username, current_path, command_string)
        arg_match = self.xml_arg_re.match(command_string)
        if arg_match is None:
            return re.sub(r"(^.*?nuclei)",
                          r"\1 --json -irr -o %s" % self._output_file_path,
                          command_string)
        else:
            return re.sub(arg_match.group(1),
                          r"--json -irr -o %s" % self._output_file_path,
                          command_string)

    def canParseCommandString(self, current_input):
        can_parse = super().canParseCommandString(current_input)
        if can_parse:
            try:
                proc = subprocess.Popen([self.command, '-version'], stderr=subprocess.PIPE)
                output = proc.stderr.read()
                match = re.search(r"Current Version: ([0-9.]+)", output.decode('UTF-8'))
                if match:
                    nuclei_version = match.groups()[0]
                    return version.parse(nuclei_version) >= version.parse("2.5.3")
                else:
                    return False
            except Exception as e:
                return False


def createPlugin(*args, **kwargs):
    return NucleiPlugin(*args, **kwargs)