"""
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import re
import xml.etree.ElementTree as ET
from faraday_plugins.plugins.plugin import PluginXMLFormat
# Module metadata: authorship, licensing and release status of this plugin file.
__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"
class RetinaXmlParser:
    """
    Parse an XML report generated by the retina tool.

    After construction, ``items`` is an iterable of Item objects, one per
    ``hosts/host`` element found under the document root. It is produced
    lazily by a generator, so it can only be iterated once. When the XML
    cannot be parsed, ``items`` is an empty list.

    TODO: Test retina output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param xml_output The XML document generated by retina, as text
    """

    def __init__(self, xml_output):
        tree = self.parse_xml(xml_output)
        # Compare with None explicitly: the truth value of an Element depends
        # on whether it has children (and testing it is deprecated), so it
        # says nothing about whether parsing succeeded.
        if tree is not None:
            self.items = self.get_items(tree)
        else:
            self.items = []

    def parse_xml(self, xml_output):
        """
        Parse an xml document held in memory.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @param xml_output The XML document as text
        @return xml_tree The root element of the document. None if error.
        """
        try:
            return ET.fromstring(xml_output)
        except SyntaxError as err:
            # ET.ParseError derives from SyntaxError, so malformed XML lands here.
            print(f"SyntaxError: {err}. {xml_output}")
            return None

    def get_items(self, tree):
        """
        Lazily build one Item per ``hosts/host`` element.

        @param tree The root element returned by parse_xml
        @return items A generator of Item instances
        """
        for node in tree.findall("hosts/host"):
            yield Item(node)
class Item:
    """
    An abstract representation of a host entry of a retina report.

    Attributes set by the constructor:
      - ip, netbiosname, netbiosdomain, os, mac: text of the matching
        sub-node, or None when the sub-node is absent.
      - hostname: text of ``dnsName``, replaced by "" when it is "unknown".
      - vulns: list of Results, one per ``audit`` child of the host node.
      - ports: dict mapping each Results.port value to the list of Results
        that carry it.

    @param item_node A host node taken from a retina xml tree
    """

    def __init__(self, item_node):
        self.node = item_node
        self.ip = self.get_text_from_subnode("ip")
        dns_name = self.get_text_from_subnode("dnsName")
        self.hostname = "" if dns_name == "unknown" else dns_name
        self.netbiosname = self.get_text_from_subnode("netBIOSName")
        self.netbiosdomain = self.get_text_from_subnode("netBIOSDomain")
        self.os = self.get_text_from_subnode("os")
        self.mac = self.get_text_from_subnode("mac")
        # Materialise the generator: it is consumed right below to build
        # ``ports``, and would otherwise be left exhausted on the instance.
        self.vulns = list(self.getResults(item_node))
        self.ports = {}
        for vuln in self.vulns:
            self.ports.setdefault(vuln.port, []).append(vuln)

    def getResults(self, tree):
        """
        Lazily build one Results object per ``audit`` child.

        :param tree: the host node whose direct ``audit`` children are read
        :return: a generator of Results instances
        """
        for audit_node in tree.findall("audit"):
            yield Results(audit_node)

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and then retrieves its text.

        @param subnode_xpath_expr Path expression passed to Element.find
        @return The text of the first matching subnode, or None if there is
                no match
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None
class Results:
    """
    A single finding read from an ``audit`` node of a retina report.

    Most attributes hold the raw text of the sub-node of the same purpose
    (None when the sub-node is absent). Derived attributes:
      - cve: "" when the report says 'N/A'.
      - solution: never None, "" when there is no fix information.
      - protocol / port: taken from ``context`` when it has the form
        ``TCP:<port>`` or ``UDP:<port>``; both "" otherwise.
      - desc: the description followed by the exploit and context lines
        when those are present; "" when nothing is available.
      - cvss2: ``{"vector_string": ...}`` when ``cvssScore`` carries a second
        whitespace-separated token (the vector), otherwise an empty dict.

    @param issue_node An audit node taken from a retina xml tree
    """

    def __init__(self, issue_node):
        self.node = issue_node
        self.name = self.get_text_from_subnode('name')
        self.description = self.get_text_from_subnode('description')
        self.solution = self.get_text_from_subnode('fixInformation')
        self.severity = self.get_text_from_subnode('risk')
        cve = self.get_text_from_subnode('cve')
        self.cve = "" if cve == 'N/A' else cve
        self.cce = self.get_text_from_subnode('cce')
        self.date = self.get_text_from_subnode('date')
        self.pciLevel = self.get_text_from_subnode('pciLevel')
        self.pciReason = self.get_text_from_subnode('pciReason')
        self.pciPassFail = self.get_text_from_subnode('pciPassFail')
        self.cvss2Score = self.get_text_from_subnode('cvssScore')
        self.exploit = self.get_text_from_subnode('exploit')
        self.context = self.get_text_from_subnode('context')

        self.port = ""
        self.protocol = ""
        # A missing or empty <context> yields None; treat it as "no port".
        val = self.context.split(":") if self.context else []
        if len(val) == 2 and val[0] in ('TCP', 'UDP'):
            self.protocol, self.port = val

        self.solution = self.solution if self.solution else ""

        # Start from "" so a missing description does not break the appends.
        self.desc = self.description or ""
        if self.exploit:
            self.desc += "\nExploit: " + self.exploit
        if self.context:
            self.desc += "\nContext: " + self.context

        self.ref = []
        self.cvss2 = {}
        # The vector is the second token of the score text, wrapped in
        # brackets. A missing score, "N/A", or a bare number has no second
        # token and leaves cvss2 empty.
        score_parts = self.cvss2Score.split() if self.cvss2Score else []
        if len(score_parts) > 1:
            self.cvss2["vector_string"] = score_parts[1].replace('[', '').replace(']', '')

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the audit node and then retrieves its text.

        @param subnode_xpath_expr Path expression passed to Element.find
        @return The text of the first matching subnode, or None if there is
                no match
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None
class RetinaPlugin(PluginXMLFormat):
    """
    Plugin that parses retina XML output.

    Each host of the report becomes a host entry; findings tied to a
    TCP/UDP port are attached to a service on that port, the remaining
    findings are attached to the host itself.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "scanJob"
        self.id = "Retina"
        self.name = "Retina XML Output Plugin"
        self.plugin_version = "0.0.1"
        self.version = "Retina Network 5.19.2.2718"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        Parse a retina XML report and register its hosts, services and
        vulnerabilities through the plugin's createAndAdd* methods.

        @param output The retina XML report as text
        """
        parser = RetinaXmlParser(output)
        for item in parser.items:
            hostname = item.hostname if item.hostname else None
            # Only pass a hostname when there is one, instead of [None].
            h_id = self.createAndAddHost(item.ip, item.os,
                                         hostnames=[hostname] if hostname else [])

            # Skip notes whose node is missing/empty or flagged as 'N/A'.
            if item.netbiosname and item.netbiosname != 'N/A':
                self.createAndAddNoteToHost(
                    h_id, "netBIOSName", item.netbiosname)
            if item.netbiosdomain and item.netbiosdomain != 'N/A':
                self.createAndAddNoteToHost(
                    h_id, "netBIOSDomain", item.netbiosdomain)

            for port, vulns in item.ports.items():
                for v in vulns:
                    cve = v.cve.split(",") if v.cve else []

                    if not port:
                        # No port information: the finding belongs to the host.
                        self.createAndAddVulnToHost(h_id, v.name, ref=v.ref, severity=v.severity,
                                                    resolution=v.solution, desc=v.desc, cve=cve,
                                                    cvss2=v.cvss2)
                        continue

                    # The service is created per finding because findings that
                    # share a port number may still differ in protocol.
                    s_id = self.createAndAddServiceToHost(h_id, 'unknown', v.protocol.lower(),
                                                          ports=[str(v.port)], status="open")
                    # A finding counts as web when it sits on a usual web port
                    # or its name mentions ssl/http; a missing name is treated
                    # as an empty one.
                    if v.port in ['80', '443'] or re.search("ssl|http", (v.name or "").lower()):
                        self.createAndAddVulnWebToService(h_id, s_id, v.name, ref=v.ref,
                                                          website=hostname, severity=v.severity,
                                                          resolution=v.solution, desc=v.desc,
                                                          cve=cve, cvss2=v.cvss2)
                    else:
                        self.createAndAddVulnToService(h_id, s_id, v.name, ref=v.ref,
                                                       severity=v.severity, resolution=v.solution,
                                                       desc=v.desc, cve=cve, cvss2=v.cvss2)
def createPlugin(*args, **kwargs):
    """
    Factory entry point: build a RetinaPlugin instance.

    All positional and keyword arguments are forwarded unchanged to the
    RetinaPlugin constructor.

    @return A new RetinaPlugin instance
    """
    plugin = RetinaPlugin(*args, **kwargs)
    return plugin