Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / prowler / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85b — raw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2020  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import json
import re
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from dateutil.parser import parse

from faraday_plugins.plugins.plugin import PluginMultiLineJsonFormat

# Module metadata (authorship, licensing and release status).
__author__ = "Nicolas Rebagliati"
__copyright__ = "Copyright (c) 2020, Infobyte LLC"
__credits__ = ["Nicolas Rebagliati"]
__license__ = ""
__version__ = "0.0.1"
__maintainer__ = "Nicolas Rebagliati"
# NOTE(review): "[email protected]" looks like an address redacted by an
# anti-spam filter when this file was scraped, not a real e-mail address;
# confirm the intended value against the upstream repository.
__email__ = "[email protected]"
__status__ = "Development"
# Matches the leading "[checkN...]" tag at the start of a control title so it
# can be stripped before the title is used as a vulnerability name.
# ``\d+`` (not ``\d``) so multi-digit check numbers such as "[check11]" or
# "[check116]" are matched as well as single-digit ones.
CHECK_NUMBER_REGEX = re.compile(r"^(\[check\d+\])")

@dataclass
class Issue:
    """One Prowler finding, holding the fields read from a single JSON record.

    Field order is part of the generated ``__init__`` signature and must not
    be changed. The dataclass performs no validation; annotations describe
    the values the parser in this module stores.
    """
    region: str
    profile: str
    severity: str       # stored lower-cased by the parser
    scored: str
    account: str
    message: str
    control: str        # control title
    status: str
    level: str
    control_id: str
    # Parsed "Timestamp" value; None when the record carries no timestamp.
    timestamp: Optional[datetime]
    compliance: str
    service: str
    # The parser wraps the record's "CAF Epic" value in a list, and the plugin
    # forwards it as the vulnerability's ``policyviolations`` argument.
    caf_epic: List[str]
    risk: str
    doc_link: str
    remediation: str
    resource_id: str


class ProwlerJsonParser:
    """Parse Prowler's line-delimited JSON output into ``Issue`` objects.

    Each non-blank line of the output is decoded as one JSON object. Only
    records whose ``"Status"`` is ``"FAIL"`` are kept; they are collected, in
    input order, in ``self.issues``.
    """

    def __init__(self, json_output):
        """Parse *json_output*, a string holding one JSON object per line."""
        self.issues = []
        self.parse_issues(json_output.splitlines())

    def parse_issues(self, records):
        """Append an ``Issue`` to ``self.issues`` for every failed record.

        :param records: iterable of strings, each holding one JSON object.
            Blank or whitespace-only lines are skipped.
        :raises json.JSONDecodeError: if a non-blank line is not valid JSON.
        """
        for record in records:
            # Tolerate blank lines instead of crashing in json.loads.
            if not record.strip():
                continue
            json_data = json.loads(record)

            # Only failed checks become issues, so filter before doing any
            # further field parsing.
            status = json_data.get("Status", "")
            if status != "FAIL":
                continue

            # A missing or empty timestamp is stored as None.
            raw_timestamp = json_data.get("Timestamp", None)
            timestamp = parse(raw_timestamp) if raw_timestamp else None

            self.issues.append(Issue(
                region=json_data.get("Region", "AWS_REGION"),
                profile=json_data.get("Profile", ""),
                severity=json_data.get("Severity", "info").lower(),
                # Read from the record's own "Scored" key (was mistakenly
                # read from "Status").
                scored=json_data.get("Scored", ""),
                account=json_data.get("Account Number", ""),
                message=json_data.get("Message", ""),
                # Drop the leading "[checkN]" tag from the control title.
                control=CHECK_NUMBER_REGEX.sub("", json_data.get("Control", "")).strip(),
                status=status,
                level=json_data.get("Level", ""),
                control_id=json_data.get("Control ID", ""),
                timestamp=timestamp,
                compliance=json_data.get("Compliance", ""),
                service=json_data.get("Service", ""),
                # Wrapped in a list because it is later passed as the
                # vulnerability's ``policyviolations`` argument.
                caf_epic=[json_data.get("CAF Epic", "")],
                risk=json_data.get("Risk", ""),
                doc_link=json_data.get("Doc link", ""),
                remediation=json_data.get("Remediation", ""),
                resource_id=json_data.get("Resource ID", ""),
            ))


class ProwlerPlugin(PluginMultiLineJsonFormat):
    """Faraday plugin for the AWS Prowler tool.

    Reads Prowler's output and loads each reported issue into Faraday as a
    vulnerability attached to a host.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        # Plugin identity and version information.
        self.id = "prowler"
        self.name = "Prowler"
        self.plugin_version = "0.1"
        self.version = "0.0.1"
        # NOTE(review): presumably consumed by the base class to recognise
        # this report format — confirm against PluginMultiLineJsonFormat.
        self.json_keys = {"Profile", "Account Number", "Region"}

    def parseOutputString(self, output, debug=False):
        """Create one host and one vulnerability per issue parsed from *output*."""
        findings = ProwlerJsonParser(output).issues
        for finding in findings:
            # The host is named after the service/account/region triple.
            service = finding.service
            account = finding.account
            region = finding.region
            host_id = self.createAndAddHost(
                name=f"{service}-{account}-{region}",
                description=f"AWS Service: {service} - Account: {account} - Region: {region}",
            )

            description = f"{finding.risk}\nCompliance: {finding.compliance}\nMessage: {finding.message}"
            external_id = f"{self.name.upper()}-{finding.control_id}"
            self.createAndAddVulnToHost(
                host_id=host_id,
                name=finding.control,
                desc=description,
                data=f"Resource ID: {finding.resource_id}",
                severity=self.normalize_severity(finding.severity),
                resolution=finding.remediation,
                run_date=finding.timestamp,
                external_id=external_id,
                ref=[finding.doc_link],
                policyviolations=finding.caf_epic,
            )


def createPlugin(*args, **kwargs):
    """Build and return a ``ProwlerPlugin``, forwarding all arguments to it."""
    plugin = ProwlerPlugin(*args, **kwargs)
    return plugin