Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / fortify / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

import base64
import io
import re
import html
from zipfile import ZipFile

import html2text
from lxml import objectify
from faraday_plugins.plugins.plugin import PluginByExtension


class FortifyPlugin(PluginByExtension):
    """
    Example plugin to parse nmap output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.id = "Fortify"
        self.name = "Fortify XML Output Plugin"
        self.plugin_version = "0.0.1"
        self.extension = ".fpr"
        self.open_options = {"mode": "rb"}

    def _process_fvdl_vulns(self, fp):

        for host in fp.hosts.keys():
            fp.hosts[host] = self.createAndAddHost(host)

        for vuln_key, vuln in fp.vulns.items():
            self.createAndAddVulnToHost(
                host_id=fp.hosts[vuln['host']],
                name=vuln['name'],
                desc=fp.format_description(vuln_key),
                ref=fp.descriptions[vuln['class']]['references'],
                severity=vuln['severity'],
                resolution="",
                data="",
                external_id=vuln_key.text
            )

    def _process_webinspect_vulns(self, fp):
        for vuln_data in fp.sast_vulns:
            host_id = self.createAndAddHost(
                vuln_data['host'] or vuln_data['website'])

            service_name = vuln_data['service'].get('name', 'unknown')
            protocol_name = 'line number'
            if vuln_data['service']['port'] == '443':
                service_name = 'https'
                protocol_name = 'tcp'
            if vuln_data['service']['port'] == '80':
                service_name = 'http'
                protocol_name = 'tcp'

            service_id = self.createAndAddServiceToHost(
                host_id,
                service_name,
                protocol=protocol_name,
                ports=[vuln_data['service']['port']])
            self.createAndAddVulnWebToService(
                host_id, service_id,
                vuln_data['name'],
                website=vuln_data['website'] or '',
                path=vuln_data['path'] or '',
                query=vuln_data['query'] or '',
                method=vuln_data['method'] or '',
                request=vuln_data['request'] or '',
                ref=vuln_data['references'],
                response=vuln_data['response'] or '',
                desc=vuln_data['description'],
                #resolution=vuln_data[''],
                severity=vuln_data['severity']
            )

    def parseOutputString(self, output):
        fp = FortifyParser(output)
        if fp.fvdl is not None:
            self._process_fvdl_vulns(fp)
        if fp.webinspect is not None:
            self._process_webinspect_vulns(fp)

        return True


class FortifyParser:
    """
    Parser for fortify on demand
    """

    def __init__(self, output):
        self.vulns = {}
        self.sast_vulns = []
        self.hosts = {}
        self.fvdl = None
        self.webinspect = None
        self.audit = None
        self.suppressed = []
        self.vuln_classes = []
        self.descriptions = {}

        self._uncompress_fpr(output)
        self._extract_vulns()
        self._prepare_description_templates()

        # regexes used in format_description
        self.remove_extra_chars = re.compile(r'&(\w*);')
        self.replacements_idx = re.compile(r'<Replace key="(.*?)"[\s\/].*?>')
        self.replacements_holders = re.compile(r'<Replace key=".*?"[\s\/].*?>')
        self.replacements_idx2 = re.compile(r'<Replace key="(.*?)"(\slink="(.*?)")?[\s\/].*?>')

    def _uncompress_fpr(self, output):
        with ZipFile(io.BytesIO(output)) as fprcontent:
            try:
                self.fvdl = objectify.fromstring(fprcontent.read('audit.fvdl'))
            except KeyError:
                pass
            try:
                self.webinspect = objectify.fromstring(fprcontent.read('webinspect.xml'))
            except KeyError:
                pass
            try:
                self.audit = objectify.fromstring(fprcontent.read('audit.xml'))
            except KeyError:
                pass

    def _process_fvdl(self):
        for vuln in self.fvdl.Vulnerabilities.iterchildren():

            vulnID = vuln.InstanceInfo.InstanceID

            if vulnID in self.suppressed:
                continue

            self.vulns[vulnID] = {}

            # the last children of Primary (Entry tags) always contains vuln filename ,path and line
            _last_entry = None
            for _last_entry in vuln.AnalysisInfo.Unified.Trace.Primary.iterchildren():
                pass

            path = _last_entry.Node.SourceLocation.get('path')

            self.vulns[vulnID]['host'] = path
            self.vulns[vulnID]['name'] = "{} {}".format(vuln.ClassInfo.Type,
                                                        getattr(vuln.ClassInfo, "Subtype", ""))
            self.vulns[vulnID]['class'] = vuln.ClassInfo.ClassID
            self.vulns[vulnID]['replacements'] = {}

            self.vulns[vulnID]['severity'] = self.calculate_severity(vuln)

            # placeholder for storing hosts ids when created in main plugin method
            if path not in self.hosts.keys():
                self.hosts[path] = None

            if vuln.ClassInfo.ClassID not in self.vuln_classes:
                self.vuln_classes.append(vuln.ClassInfo.ClassID)

            # fortify bug that when it has no replacements, shows blank in fortify dashboard
            if not hasattr(vuln.AnalysisInfo.Unified, "ReplacementDefinitions"):
                self.vulns[vulnID]['replacements'] = None
                continue

            try:
                getattr(vuln.AnalysisInfo.Unified, "ReplacementDefinitions")

                for repl in vuln.AnalysisInfo.Unified.ReplacementDefinitions.iterchildren(
                        tag="{xmlns://www.fortifysoftware.com/schema/fvdl}Def"):

                    repl_val = repl.get('key')
                    if repl.get('link'):
                        repl_val = repl.get('link')

                    self.vulns[vulnID]['replacements'][repl_val] = repl.get('value')
            except AttributeError:
                self.vulns[vulnID]['replacements'] = None

    def _process_webinspect(self):
        for session in self.webinspect.getchildren():
            hostname = session.Host.text
            port = session.Port.text
            service_data = {}
            if port:
                service_data['port'] = port

            path = session.Request.Path.text
            query = session.Request.FullQuery.text
            method = session.Request.Method.text
            request = ''
            if session.RawRequest.text:
                request = base64.b64decode(session.RawRequest.text)
            response = ''
            if session.RawResponse.text:
                response = base64.b64decode(session.RawResponse.text)
            status_code = session.Response.StatusCode.text

            for issues in session.Issues:
                for issue_data in issues.getchildren():
                    params = ''
                    check_type = issue_data.CheckTypeID
                    if check_type.text.lower() != 'vulnerability':
                        # TODO: when plugins accept tags, we should this as a tag.
                        pass
                    name = issue_data.Name.text
                    external_id = issue_data.VulnerabilityID.text
                    faraday_severities = {
                        0: 'info',
                        1: 'low',
                        2: 'med',
                        3: 'high',
                        4: 'critical'
                    }
                    severity = faraday_severities[issue_data.Severity]
                    references = []
                    try:
                        classifications = issue_data.Classifications.getchildren()
                    except AttributeError:
                        classifications = []

                    for classification in classifications:
                        references.append(classification.text)

                    # Build description
                    description = ''
                    for report_section in issue_data.findall('./ReportSection'):
                        description += f'{report_section.Name.text} \n'
                        description += f'{report_section.SectionText.text} \n'
                    description += f'{issue_data.get("id")} \n'

                    h = html2text.HTML2Text()
                    description = h.handle(description)

                    for repro_step in issue_data.findall('./ReproSteps'):
                        step = repro_step.ReproStep

                        if step is not None:
                            try:
                                params = step.PostParams.text
                            except AttributeError:
                                pass

                        if not hostname:
                            # This seems to be a mobile app
                            hostname = session.URL.text

                        if not port:
                            service_data['name'] = step.Url.text
                            service_data['port'] = step.sourceline

                        self.sast_vulns.append({
                            "host": hostname,
                            "severity": severity,
                            "service": service_data,
                            "name": name,
                            "description": description,
                            "external_id": external_id,
                            "references": references,
                            "method": method,
                            "query": query,
                            "response": response,
                            "request": request.decode('utf-8'),
                            "path": path,
                            "params": params,
                            "status_code": status_code,
                            "website": session.URL.text
                        })

    def _extract_vulns(self):
        # make list of false positives
        try:
            issue_list = self.audit.IssueList.iterchildren()
        except AttributeError:
            issue_list = []

        for issue in issue_list:
            if issue.get('suppressed', 'false').lower() == 'true':
                self.suppressed.append(issue.get('instanceId'))

        if self.fvdl is not None:
            self._process_fvdl()

        if self.webinspect is not None:
            self._process_webinspect()

    def calculate_severity(self, vuln):

        severity = None  # ["critical", "high", "medium", "low", "informational", "unclassified"]
        rulepath = objectify.ObjectPath("FVDL.EngineData.RuleInfo.Rule")
        impact = None
        probability = None
        accuracy = None

        # XML path /FVDL/EngineData/RuleInfo/Rule (many)/MetaInfo/Group (many) the attribute "name"
        # are keys for vuln properties

        for rule in rulepath(self.fvdl):
            if rule.get('id') == vuln.ClassInfo.ClassID:
                for group in rule.MetaInfo.iterchildren():
                    if group.get('name') == "Probability":
                        probability = group
                    if group.get('name') == "Impact":
                        impact = group
                    if group.get('name') == "Accuracy":
                        accuracy = group

        likelihood = (accuracy * vuln.InstanceInfo.Confidence * probability) / 25

        if impact and probability:

            if impact >= 2.5 and likelihood >= 2.5:
                severity = 'critical'
            elif impact >= 2.5 > likelihood:
                severity = 'high'
            elif impact < 2.5 <= likelihood:
                severity = 'medium'
            elif impact < 2.5 and likelihood < 2.5:
                severity = 'low'
        else:
            print("missing severity")

        # print("{}:{}:{}".format(vuln.InstanceInfo.InstanceID, vuln.InstanceInfo.InstanceSeverity, severity))
        return severity

    def concat_vuln_name(self, vuln):
        return "{} {} {}:{}".format(vuln.ClassInfo.Type, vuln.ClassInfo.Subtype,
                                    self.vulns[vuln.InstanceInfo.InstanceID]['filename'],
                                    self.vulns[vuln.InstanceInfo.InstanceID]['line'])

    def _prepare_description_templates(self):
        if self.fvdl is None:
            return
        for description in self.fvdl.Description:
            class_id = description.get("classID")
            self.descriptions[class_id] = {}
            if class_id not in self.vuln_classes:
                continue
            if hasattr(description, 'Tips'):
                tips = "\n".join(map(lambda x: x.text, description.Tips.getchildren()))
            else:
                tips = ""
            text = f"Summary:\n{description.Abstract}\n\nExplanation:\n{description.Explanation}\n\nRecommendations:\n{description.Recommendations}\n\nTips:{tips}"
            self.descriptions[description.get("classID")]['text'] = html.unescape(text)
            # group vuln references
            references = []
            if hasattr(description, "References"):
                references_elements = description.References.getchildren()
                for reference in references_elements:
                    for children in reference.getchildren():
                        name = children.tag.split("}")[1]
                        value = children.text
                        references.append(f"{name}: {value}")
            self.descriptions[description.get("classID")]['references'] = references

    def format_description(self, vulnID):

        text = self.descriptions[self.vulns[vulnID]['class']]['text']
        replacements = self.vulns[vulnID]['replacements']
        if not replacements:
            return text

        # special chars that must shown as-is, have the hmtlentity value duplicated
        text = self.remove_extra_chars.sub(r"&\1;", text)

        for placeholder in self.replacements_holders.findall(text, re.MULTILINE):

            torepl = '<Replace key="{}"/>'
            match = self.replacements_idx2.search(placeholder)

            replace_with = ""
            if match:
                idx = match.group(1)
                if match.group(3):
                    idx = match.group(3)
                    _filekey = f"{idx}.file"
                    _linekey = f"{idx}.line"
                    text = text.replace(placeholder, "").replace(
                        torepl.format(_filekey), replacements[_filekey]).replace(
                        torepl.format(_linekey), replacements[_linekey])
                    continue

                try:
                    replace_with = replacements[idx]
                except KeyError:
                    # Nothing to replace, use empty string
                    text = text.replace(placeholder, "")

            text = text.replace(placeholder, replace_with)

        text += f'{text}\n Instance ID: {vulnID} \n'
        h = html2text.HTML2Text()
        return text


def createPlugin(*args, **kwargs):
    return FortifyPlugin(*args, **kwargs)