Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / sslyze / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

import re
from faraday_plugins.plugins.plugin import PluginXMLFormat

try:
    from lxml import etree as ET
except ImportError:
    import xml.etree.ElementTree as ET

WEAK_CIPHER_LIST = [
    "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
    "TLS_RSA_WITH_AES_128_CBC_SHA",
    "TLS_RSA_WITH_AES_128_CBC_SHA256",
    "TLS_RSA_WITH_AES_128_GCM_SHA256",
    "TLS_RSA_WITH_AES_256_CBC_SHA",
    "TLS_RSA_WITH_AES_256_CBC_SHA256",
    "TLS_RSA_WITH_AES_256_GCM_SHA384",
    "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
    "TLS_RSA_WITH_CAMELLIA_256_CBC_SHA",
    "TLS_RSA_WITH_CAMELLIA_128_CBC_SHA"
]


class SslyzeXmlParser:

    def __init__(self, xml_output):
        self.parser = self.parse_xml(xml_output)
        self.target = self.get_target(self.parser)
        self.certificate = self.get_hostname_validation(self.parser)
        self.cipher_suite = self.get_weak_cipher_suite(self.parser)
        self.heart_bleed = self.get_heartbleed(self.parser)
        self.open_ssl_ccs = self.get_openssl_ccs(self.parser)

    def parse_xml(self, xml_output):
        try:
            tree = ET.fromstring(xml_output)
            return tree
        except IndexError:
            print("Syntax error")
            return None

    def get_target(self, tree):
        return tree.xpath('//target')

    def get_hostname_validation(self, tree):
        return tree.xpath('//hostnameValidation')

    def get_protocol_name(self, tree):
        protocol_supported = []
        protocols = []
        protocols.append(tree.xpath('//sslv2'))
        protocols.append(tree.xpath('//sslv3'))
        protocols.append(tree.xpath('//tlsv1'))
        protocols.append(tree.xpath('//tlsv1_1'))
        protocols.append(tree.xpath('//tlsv1_2'))
        protocols.append(tree.xpath('//tlsv1_3'))

        for protocol in protocols:
            if protocol[0].attrib['isProtocolSupported'] == "True":
                protocol_supported.append(protocol[0])

        return protocol_supported

    def get_weak_cipher_suite(self, tree):
        protocols = self.get_protocol_name(tree)
        weak_cipher = {}

        for protocol in protocols:
            weak_cipher[protocol.tag] = []
            for ciphers in protocol:
                if ciphers.tag == 'preferredCipherSuite' or ciphers.tag == 'acceptedCipherSuites':
                    for cipher in ciphers:
                        if cipher.attrib['name'] in WEAK_CIPHER_LIST:
                            if not cipher.attrib['name'] in weak_cipher[protocol.tag]:
                                weak_cipher[protocol.tag].append(cipher.attrib['name'])

        return weak_cipher

    def get_heartbleed(self, tree):
        return tree.xpath('//heartbleed')

    def get_openssl_ccs(self, tree):
        return tree.xpath('//openssl_ccs')


class SslyzePlugin(PluginXMLFormat):

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "document"
        self.identifier_tag_attributes = {'SSLyzeVersion'}
        self.id = "Sslyze_XML"
        self.name = "Sslyze Plugin"
        self.plugin_version = "0.0.1"
        self.version = "2.0.6"
        self.framework_version = "1.0.0"
        self.options = None
        self._current_output = None
        self._use_temp_file = True
        self._temp_file_extension = "xml"

    def report_belongs_to(self, **kwargs):
        if super().report_belongs_to(**kwargs):
            report_path = kwargs.get("report_path", "")
            with open(report_path) as f:
                output = f.read()
            return re.search("SSLyzeVersion", output) is not None
        return False

    def parseOutputString(self, output):
        parser = SslyzeXmlParser(output)
        host = parser.target[0].attrib['host']
        ip = parser.target[0].attrib['ip']
        port = parser.target[0].attrib['port']
        protocol = parser.target[0].attrib['tlsWrappedProtocol']
        cipher = parser.cipher_suite

        # Creating host
        host_id = self.createAndAddHost(ip)
        # Creating service CHANGE NAME
        service_id = self.createAndAddServiceToHost(
            host_id,
            name=protocol,
            protocol=protocol,
            ports=[port],
        )

        # Checking if certificate matches
        certificate = parser.certificate[0].attrib['certificateMatchesServerHostname']
        server_hostname = parser.certificate[0].attrib['serverHostname']
        if certificate.lower() == 'false':
            self.createAndAddVulnToService(
                host_id,
                service_id,
                name="Certificate mismatch",
                desc=f"Certificate does not match server hostname {server_hostname}",
                severity="info")
        # Ciphers
        cipher = parser.cipher_suite

        for key in cipher:
            for value in cipher[key]:
                self.createAndAddVulnToService(
                    host_id,
                    service_id,
                    name=value,
                    desc=f"In protocol [{key}], weak cipher suite: {value}",
                    severity="low")

        # Heartbleed
        heartbleed = parser.heart_bleed

        if heartbleed[0][0].attrib['isVulnerable'].lower() == 'true':
            self.createAndAddVulnToService(
                host_id,
                service_id,
                name="OpenSSL Heartbleed",
                desc="OpenSSL Heartbleed is vulnerable",
                severity="critical")

        # OpenSsl CCS Injection
        openssl_ccs = parser.open_ssl_ccs

        if openssl_ccs[0][0].attrib['isVulnerable'].lower() == 'true':
            self.createAndAddVulnToService(
                host_id,
                service_id,
                name="OpenSSL CCS Injection",
                desc="OpenSSL CCS Injection is vulnerable",
                severity="medium")


def createPlugin(*args, **kwargs):
    return SslyzePlugin(*args, **kwargs)