"""
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
from re import findall
from urllib.parse import urlsplit
from lxml import etree
from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.repo.acunetix.DTO import Acunetix, Scan
__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"
class AcunetixXmlParser:
"""
The objective of this class is to parse an xml file generated by
the acunetix tool.
TODO: Handle errors.
TODO: Test acunetix output version. Handle what happens if
the parser doesn't support it.
TODO: Test cases.
@param acunetix_xml_filepath A proper xml generated by acunetix
"""
def __init__(self, xml_output):
tree = self.parse_xml(xml_output)
self.acunetix = Acunetix(tree)
@staticmethod
def parse_xml(xml_output):
"""
Open and parse an xml file.
TODO: Write custom parser to just read the nodes that we need instead
of reading the whole file.
@return xml_tree An xml tree instance. None if error.
"""
try:
parser = etree.XMLParser(recover=True)
tree = etree.fromstring(xml_output, parser=parser)
except SyntaxError as err:
print(f"SyntaxError: {err}. {xml_output}")
return None
return tree
class AcunetixPlugin(PluginXMLFormat):
"""
Example plugin to parse acunetix output.
"""
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
self.identifier_tag = "ScanGroup"
self.id = "Acunetix"
self.name = "Acunetix XML Output Plugin"
self.plugin_version = "0.0.1"
self.version = "9"
self.framework_version = "1.0.0"
self.options = None
self._current_output = None
self.target = None
def parseOutputString(self, output):
"""
This method will discard the output the shell sends, it will read it
from the xml where it expects it to be present.
NOTE: if 'debug' is true then it is being run from a test case and the
output being sent is valid.
"""
parser = AcunetixXmlParser(output)
for site in parser.acunetix.scan:
url_data = self.get_domain(site)
if not url_data:
continue
if url_data.hostname:
self.old_structure(url_data, site)
else:
self.new_structure(site)
def new_structure(self, site):
for item in site.reportitems.reportitem:
if not item.technicaldetails.request:
self.logger.warning("No request data")
continue
request_host = findall('Host: (.*)', item.technicaldetails.request)
if request_host:
host = request_host[0]
url = f'http://{host}'
url_data = urlsplit(url)
site_ip = self.resolve_hostname(host)
h_id = self.createAndAddHost(site_ip, site.os, hostnames=[host])
s_id = self.createAndAddServiceToHost(
h_id,
"http",
"tcp",
ports=['443'],
version=site.banner,
status='open')
self.create_vul(item, h_id, s_id, url_data)
else:
self.logger.warning("No host in request")
def old_structure(self, url_data, site: Scan):
site_ip = self.resolve_hostname(url_data.hostname)
if url_data.hostname:
hostnames = [url_data.hostname]
else:
hostnames = None
port = url_data.port or (443 if url_data.scheme == "https" else 80)
h_id = self.createAndAddHost(site_ip, site.os, hostnames=hostnames)
s_id = self.createAndAddServiceToHost(
h_id,
"http",
"tcp",
ports=[port],
version=site.banner,
status='open')
for item in site.reportitems.reportitem:
self.create_vul(item, h_id, s_id, url_data)
def create_vul(self, item, h_id, s_id, url_data):
description = item.description
cvss3 = {}
if item.cvss3.node is not None:
cvss3['vector_string'] = item.cvss3.descriptor
cvss2 = {}
if item.cvss.node is not None:
cvss2['vector_string'] = item.cvss.descriptor
if item.affects:
description += f'\nPath: {item.affects}'
if item.parameter:
description += f'\nParameter: {item.parameter}'
try:
cve = [item.cvelist.cve.text if item.cvelist.cve else ""]
except Exception:
cve = []
try:
cwe = [item.cwelist.cwe.text if item.cwelist.cwe else ""]
except:
cwe = []
self.createAndAddVulnWebToService(
h_id,
s_id,
item.name,
description,
website=url_data.hostname,
severity=item.severity,
resolution=item.recommendation,
path=item.affects,
params=item.parameter,
request=item.technicaldetails.request,
response=item.technicaldetails.response,
ref=[i.url for i in item.references.reference],
cve=cve,
cwe=cwe,
cvss2=cvss2,
cvss3=cvss3)
@staticmethod
def get_domain(scan: Scan):
url = scan.start_url
if not url.startswith('http'):
url = f'http://{url}'
url_data = urlsplit(url)
if not url_data.scheme:
url_data = urlsplit(scan.crawler.start_url_attr)
return url_data
def createPlugin(*args, **kwargs):
return AcunetixPlugin(*args, **kwargs)