""""
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import re
import sys
import xml.etree.ElementTree as ET
from urllib.parse import urlsplit

from bs4 import BeautifulSoup

from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.plugins_utils import CVE_regex
# Module metadata: authorship, licensing and release status of this plugin.
__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"
class NetsparkerXmlParser:
    """
    Parse the XML report generated by the netsparker tool.

    The report is parsed eagerly on construction: every ``vulnerability``
    element found directly under the root is wrapped in an ``Item`` and
    stored in ``self.items``. If the XML cannot be parsed, the error is
    logged through ``plugin.logger`` and ``self.items`` is an empty list.

    TODO: Handle errors.
    TODO: Test netsparker output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param xml_output The XML report content (a string, not a file path)
    @param plugin An object exposing a ``logger`` used to report parse errors
    """

    def __init__(self, xml_output, plugin):
        # The attribute is named "filepath" for backward compatibility, but it
        # holds the XML content that is handed to ET.fromstring().
        self.filepath = xml_output
        self.plugin = plugin
        tree = self.parse_xml(xml_output)
        # Compare with None explicitly: an Element without children is falsy
        # and truth-testing an Element is deprecated.
        self.items = list(self.get_items(tree)) if tree is not None else []

    def parse_xml(self, xml_output):
        """
        Parse an XML document held in a string.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree The root element of the document. None if error.
        """
        try:
            return ET.fromstring(xml_output)
        except SyntaxError as err:
            # ET.ParseError is a subclass of SyntaxError, so malformed XML lands here.
            self.plugin.logger.error(f"SyntaxError: {err}. {xml_output}")
            return None

    def get_items(self, tree):
        """
        Lazily wrap every ``vulnerability`` child of ``tree`` in an ``Item``.

        @return items A generator of Item instances
        """
        for node in tree.findall("vulnerability"):
            yield Item(node)
class Item:
    """
    A single vulnerability taken from a netsparker xml report.

    Every value is read once, at construction time, from the given
    ``vulnerability`` element and exposed as a plain attribute (``url``,
    ``hostname``, ``port``, ``severity``, ``resolution``, ``cve``, ``ref``,
    ``data``, ...). Text attributes are None when the element is absent.

    @param item_node A ``vulnerability`` element taken from a netsparker xml tree
    @raise ValueError If the ``url`` element is missing or has no scheme/host
    """

    def __init__(self, item_node):
        self.node = item_node
        self.url = self.get_text_from_subnode("url")
        self.protocol, self.hostname, self.port = self._split_url(self.url)

        self.name = self.get_text_from_subnode("type")
        self.name_title = self.get_text_from_subnode("title")
        self.desc = self.get_text_from_subnode("description")
        self.severity = self.re_map_severity(self.get_text_from_subnode("severity"))
        self.certainty = self.get_text_from_subnode("certainty")
        self.method = self.get_text_from_subnode("vulnerableparametertype")
        self.param = self.get_text_from_subnode("vulnerableparameter")
        self.paramval = self.get_text_from_subnode("vulnerableparametervalue")
        self.reference = self.get_text_from_subnode("externalReferences")

        # Concatenate whichever of the two parts exist. The previous one-liner
        # discarded "actionsToTake" when "remedy" was missing and raised
        # TypeError when only "remedy" was present.
        actions = self.get_text_from_subnode("actionsToTake")
        remedy = self.get_text_from_subnode("remedy")
        self.resolution = (actions or "") + (remedy or "")

        self.request = self.get_text_from_subnode("rawrequest")
        self.response = self.get_text_from_subnode("rawresponse")

        self.kvulns = []
        self.cve = []
        for known in item_node.findall("knownvulnerabilities/knownvulnerability"):
            # Missing <title>/<severity> elements are treated as empty text.
            title = self._find_text(known, "title") or ""
            known_severity = self._find_text(known, "severity") or ""
            cve_match = CVE_regex.search(title)
            if cve_match:
                self.cve.append(cve_match.group())
            self.kvulns.append(known_severity + "-" + title)

        self.extra = []
        for info in item_node.findall("extrainformation/info"):
            name_tag = info.find("name")
            if name_tag is None:
                continue
            value_tag = info.find("value")
            value_text = value_tag.text if value_tag is not None else None
            self.extra.append(f"{name_tag.text}:{value_text}")

        classification = item_node.find("classification")
        self.owasp = self._find_text(classification, "OWASP")
        self.wasc = self._find_text(classification, "WASC")
        self.cwe = self._find_text(classification, "CWE")
        self.capec = self._find_text(classification, "CAPEC")
        self.pci = self._find_text(classification, "PCI")
        self.pci2 = self._find_text(classification, "PCI2")

        cvss = item_node.find("classification/CVSS")
        self.cvss_full_vector = self._find_text(cvss, "vector")
        # An empty score is normalised to None.
        self.cvss_score = self._find_text(cvss, "score[1]/value") or None

        self.cvss3 = {}
        self.ref = []
        if self.cwe:
            # From here on self.cwe is a one-element list instead of a string.
            self.cwe = ["CWE-" + self.cwe]
        if self.owasp:
            self.ref.append("OWASP-" + self.owasp)
        if self.reference:
            self.ref.extend(sorted(set(re.findall(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+', self.reference))))
        if self.cvss_full_vector:
            self.cvss3["vector_string"] = self.cvss_full_vector

        # Free-text summary: one "\n<label>: <value>" section per non-empty field.
        data_fields = (
            ("KnowVulns", "\n".join(self.kvulns)),
            ("WASC", self.wasc),
            ("Certainty", self.certainty),
            ("PCI", self.pci),
            ("PCI2", self.pci2),
            ("CAPEC", self.capec),
            ("PARAM", self.param),
            ("PARAM VAL", repr(self.paramval) if self.paramval else ""),
            ("Extra", "\n".join(self.extra)),
        )
        self.data = "".join(f"\n{label}: {value}" for label, value in data_fields if value)

    @staticmethod
    def _split_url(url):
        """
        Split a vulnerability url into (protocol, hostname, port).

        The hostname is returned lower-cased, as produced by ``urlsplit``.
        An explicit port is returned as a string; when the url has no usable
        port the default is the integer 443 for https and 80 otherwise.

        @raise ValueError If url is empty or has no scheme or host
        """
        parts = urlsplit((url or "").strip())
        if not parts.scheme or not parts.hostname:
            raise ValueError(f"Cannot determine protocol and host from vulnerability url: {url!r}")
        try:
            explicit_port = parts.port
        except ValueError:
            # Non-numeric or out-of-range port: fall back to the scheme default.
            explicit_port = None
        if explicit_port is not None:
            port = str(explicit_port)
        elif parts.scheme == "https":
            port = 443
        else:
            port = 80
        return parts.scheme, parts.hostname, port

    @staticmethod
    def _find_text(node, xpath_expr):
        """Return the text of the first match of xpath_expr under node, or None."""
        if node is None:
            return None
        sub_node = node.find(xpath_expr)
        return None if sub_node is None else sub_node.text

    @staticmethod
    def re_map_severity(severity):
        """Map netsparker's "Important" to "high"; return other values unchanged."""
        if severity == "Important":
            return "high"
        return severity

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the item node and then retrieves its text.

        @return The subnode text. None if the subnode does not exist.
        """
        return self._find_text(self.node, subnode_xpath_expr)
class NetsparkerPlugin(PluginXMLFormat):
    """
    Plugin that reports the findings of a netsparker XML output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "netsparker"
        self.id = "Netsparker"
        self.name = "Netsparker XML Output Plugin"
        self.plugin_version = "0.0.1"
        self.version = "Netsparker 3.1.1.0"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        Parse the report and register a host, a service and a web
        vulnerability for every item it contains.

        @param output The netsparker XML report content
        """
        # Each hostname is resolved only once per report.
        resolved_ips = {}
        for vuln in NetsparkerXmlParser(output, self).items:
            hostname = vuln.hostname
            if hostname not in resolved_ips:
                resolved_ips[hostname] = self.resolve_hostname(hostname)
            ip = resolved_ips[hostname]

            port_text = str(vuln.port)
            h_id = self.createAndAddHost(ip, hostnames=[hostname])
            s_id = self.createAndAddServiceToHost(
                h_id,
                port_text,
                protocol=str(vuln.protocol),
                ports=[port_text],
                status="open",
            )

            # The report stores these fields as HTML; keep only their text.
            resolution = "" if vuln.resolution is None else BeautifulSoup(vuln.resolution, "lxml").text
            desc = "" if vuln.desc is None else BeautifulSoup(vuln.desc, "lxml").text

            # Prefer the title; fall back to the vulnerability type.
            name = vuln.name_title if vuln.name_title is not None else vuln.name

            self.createAndAddVulnWebToService(
                h_id,
                s_id,
                name,
                ref=vuln.ref,
                website=hostname,
                severity=vuln.severity,
                desc=desc,
                path=vuln.url,
                method=vuln.method,
                request=vuln.request,
                response=vuln.response,
                resolution=resolution,
                pname=vuln.param,
                data=vuln.data,
                cve=vuln.cve,
                cwe=vuln.cwe,
                cvss3=vuln.cvss3,
            )
def createPlugin(*args, **kwargs):
    """Build a NetsparkerPlugin, forwarding every argument to its constructor."""
    plugin = NetsparkerPlugin(*args, **kwargs)
    return plugin