"""
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import xml.etree.ElementTree as ET
from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.plugins_utils import CVE_regex
# Module metadata: authorship, licensing and release status of this plugin file.
__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"
class ImpactXmlParser:
    """
    Parse an XML report generated by the impact tool.

    After construction ``items`` is an iterable of Item objects, one per
    ``entity`` child of the root whose ``class`` attribute is ``host``. When
    the input cannot be parsed ``items`` is an empty list.

    TODO: Handle errors.
    TODO: Test impact output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param xml_output The xml document generated by impact, as text
    """

    def __init__(self, xml_output):
        tree = self.parse_xml(xml_output)
        # Compare with None explicitly: an Element without children is falsy
        # and truth-testing an Element is deprecated.
        if tree is not None:
            self.items = self.get_items(tree)
        else:
            self.items = []

    def parse_xml(self, xml_output):
        """
        Parse an xml document held in memory.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @param xml_output The xml document as text
        @return The root Element of the document. None if it is not well formed.
        """
        try:
            # ET.ParseError derives from SyntaxError, so malformed xml lands here.
            return ET.fromstring(xml_output)
        except SyntaxError:
            return None

    def get_items(self, tree):
        """
        Lazily build one Item per host entity found directly under ``tree``.

        @param tree The root Element of the parsed document
        @return A generator of Item instances (it can be consumed only once)
        """
        for node in tree.findall("entity/[@class='host']"):
            yield Item(node, tree)
class Item:
    """
    Representation of one host entity taken from an impact xml tree.

    Attributes set by the constructor:
      node      - the host node this item was built from
      arch, host, ip, os - text of the matching host properties (None if absent)
      ports     - list of dicts with the keys 'port', 'protocol' and 'status'
      services  - list of dicts with the keys 'name', 'protocol' and 'port'
      agent     - True when an agent entity whose name carries this host's ip
                  was found under ``parent``
      agentip, ipfrom, agentype, agentport, agentsubtype, agentcon
                - details of that agent; empty strings when ``agent`` is False
      results   - generator of Results built from the host's vulnerabilities

    @param item_node A host node taken from an impact xml tree
    @param parent The tree root that is searched for agent entities. When it
                  is None no agent lookup is performed.
    """

    def __init__(self, item_node, parent=None):
        self.node = item_node
        self.arch = self.get_text_from_subnode("property/[@key='arch']")
        self.host = self.get_text_from_subnode(
            "property/[@key='display_name']")
        self.ip = self.get_text_from_subnode("property/[@key='ip']")
        self.os = self.get_text_from_subnode(
            "property/[@key='os']/property/[@key='entity name']")

        self.ports = []
        self.services = []
        self.process_ports(item_node)
        self.process_services(item_node)

        # Give the agent attributes defined values even when no agent matches.
        self.agent = False
        self.agentip = ""
        self.ipfrom = ""
        self.agentype = ""
        self.agentport = ""
        self.agentsubtype = ""
        self.agentcon = ""
        if parent is not None:
            self._process_agent(parent)

        self.results = self.getResults(item_node)

    @staticmethod
    def _node_text(node, subnode_xpath_expr):
        """Return the text of the first match of the expression under node, or None."""
        sub_node = node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None

    def _process_agent(self, parent):
        """Fill the agent attributes from the first agent entity matching this host's ip."""
        connection = "property/[@key='Connection Properties']"
        for agent_node in parent.findall("entity/[@class='agent']"):
            # The ip is the second "/"-separated field of the agent name;
            # names that are missing or have no such field are skipped.
            name_parts = (agent_node.get('name') or "").split("/")
            if len(name_parts) < 2 or name_parts[1] != self.ip:
                continue
            agentip = name_parts[1]
            self.agentip = agentip
            # Lookups run against the agent node directly so that self.node
            # keeps pointing at the host node.
            self.ipfrom = self._node_text(
                agent_node, connection + "/property/[@key='ip']") or agentip
            self.agentype = agent_node.get("type") or ""
            self.agentport = self._node_text(
                agent_node, connection + "//property/[@key='port']") or ""
            self.agentsubtype = self._node_text(
                agent_node, connection + "//property/[@key='subtype']") or ""
            self.agentcon = self._node_text(
                agent_node, connection + "//property/[@key='type']") or ""
            self.agent = True
            break

    def process_ports(self, item_node):
        """
        Append every tcp and udp port listed under the host to ``self.ports``.

        A port whose text is "listen" is reported with the status "open"; any
        other text is used as the status unchanged.

        @param item_node The host node
        """
        for protocol in ("tcp", "udp"):
            xpath = f"property/[@key='{protocol}_ports']/property/[@type='port']"
            for p in item_node.findall(xpath):
                self.ports.append({'port': p.get('key'),
                                   'protocol': protocol,
                                   'status': "open" if p.text == "listen" else p.text})

    def process_services(self, item_node):
        """
        Append every service listed under the host to ``self.services``.

        The first child property of a service carries a "<port>-<protocol>"
        key. Services without such a child, or whose key has no "-", are
        skipped instead of aborting the whole host.

        @param item_node The host node
        """
        for service in item_node.findall("property/[@key='services']/property"):
            endpoints = service.findall('property')
            if not endpoints:
                continue
            port, separator, protocol = (endpoints[0].get('key') or "").partition('-')
            if not separator:
                continue
            self.services.append({
                "name": service.get("key"),
                "protocol": protocol,
                "port": port
            })

    def getResults(self, tree):
        """
        Lazily build a Results object for each vulnerability container.

        :param tree: the host node
        :return: a generator of Results instances
        """
        for issue in tree.findall("property/[@key='Vulnerabilities']/property/[@type='container']"):
            yield Results(issue)
        # 2017R1 compatibility
        for issue in tree.findall("property/[@key='exposures']/property/[@type='container']"):
            yield Results(issue)

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and then retrieves its text.

        @return The text of the subnode, or None when the subnode does not exist
        """
        return self._node_text(self.node, subnode_xpath_expr)
class Results:
    """
    One vulnerability entry of a host in an impact xml tree.

    Attributes set by the constructor:
      node         - the node the description lookups are made against
      cve          - one-element list with the CVE id when the container key
                     matches CVE_regex, otherwise an empty list
      ref          - one-element list with the container key when it is not a
                     CVE (always empty for the 2017R1 layout)
      name, desc   - title and description of the issue (None if absent)
      severity     - severity text, "" when not reported
      port         - port text, "Unknown" when not reported
      service_name - service text, "n/a" when not reported
      protocol     - always "tcp?"

    @param issue_node A vulnerability container node of a host
    """

    def __init__(self, issue_node):
        self.node = issue_node
        key = issue_node.get("key", "")
        match = CVE_regex.match(key)
        if match:
            self.ref = []
            self.cve = [match.group()]
        else:
            # Do not record a reference when the container has no key at all.
            self.ref = [key] if key else []
            self.cve = []

        self.severity = ""
        self.port = "Unknown"
        self.service_name = "n/a"
        self.protocol = "tcp?"

        vuln = issue_node.find("property/property")
        # The 2013R3 layout is recognised by a grandchild property that has
        # children of its own. The check is spelled out because the truth
        # value of an Element (False when it has no children) is deprecated.
        if vuln is None or len(vuln) == 0:
            # 2017R1 compatibility
            self.ref = []
            self.name = self.get_text_from_subnode("property/[@key='title']")
            self.desc = self.get_text_from_subnode("property/[@key='description']")
            # Keep the defaults set above when a property is missing or empty.
            self.severity = self.get_text_from_subnode(
                "property/[@key='severity']") or self.severity
            self.service_name = self.get_text_from_subnode(
                "property/[@key='service']") or self.service_name
        else:
            # 2013R3 xml version
            self.name = vuln.get("key")
            self.node = vuln
            self.desc = self.get_text_from_subnode("property/[@key='description']")
            self.port = self.get_text_from_subnode(
                "property/[@key='port']") or self.port

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the current node and then retrieves its text.

        @return The text of the subnode, or None when the subnode does not exist
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None
class ImpactPlugin(PluginXMLFormat):
    """
    Plugin that parses impact xml output into hosts, services and
    vulnerabilities.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = "entities"
        self.id = "CoreImpact"
        self.name = "Core Impact XML Output Plugin"
        self.plugin_version = "0.0.2"
        self.version = "Core Impact 2013R1/2017R2"
        self.framework_version = "1.0.0"
        self.options = None

    def parseOutputString(self, output):
        """
        Create a host, its services and its vulnerabilities for every host
        entity found in the report.

        @param output The content of the impact xml report
        """
        parser = ImpactXmlParser(output)
        for item in parser.items:
            # Leave out missing parts rather than rendering them as "None".
            # NOTE(review): "unknown" is assumed to be an acceptable
            # placeholder for createAndAddHost - confirm against the base class.
            os_string = " ".join(
                part for part in (item.os, item.arch) if part) or "unknown"
            h_id = self.createAndAddHost(item.ip, os=os_string, hostnames=[item.host])

            # Service ids are looked up per host; sharing the tables between
            # hosts would attach a vulnerability to another host's service.
            mapped_services = {}
            mapped_ports = {}
            for service in item.services:
                s_id = self.createAndAddServiceToHost(
                    h_id,
                    service['name'],
                    service['protocol'],
                    ports=[service['port']],
                    status='open')
                mapped_services[service['name']] = s_id
                mapped_ports[service['port']] = s_id

            if item.agent:
                # "or ''" keeps the concatenation from failing on a None field.
                desc = "Agent Type: " + (item.agentype or "")
                desc += "\nConn from:" + (item.ipfrom or "")
                desc += "\nPort:" + (item.agentport or "")
                desc += "\nProtocol:" + (item.agentsubtype or "")
                desc += "\nConn:" + (item.agentcon or "")
                self.createAndAddVulnToHost(
                    h_id,
                    "Core Impact Agent",
                    desc=desc,
                    severity="HIGH")

            for v in item.results:
                s_id = None
                if not (v.service_name == "n/a" and v.port == "Unknown"):
                    s_id = mapped_services.get(v.service_name) or mapped_ports.get(v.port)
                if s_id is None:
                    # Either the issue names no service or the named service
                    # is not known for this host: report it on the host.
                    self.createAndAddVulnToHost(
                        h_id,
                        v.name,
                        desc=v.desc,
                        severity=v.severity,
                        ref=v.ref,
                        cve=v.cve)
                else:
                    self.createAndAddVulnToService(
                        h_id,
                        s_id,
                        v.name,
                        desc=v.desc,
                        severity=v.severity,
                        ref=v.ref,
                        cve=v.cve)

            # Plain port entries are registered as services named after the port.
            for p in item.ports:
                self.createAndAddServiceToHost(
                    h_id,
                    p['port'],
                    p['protocol'],
                    ports=[p['port']],
                    status=p['status'])
def createPlugin(*args, **kwargs):
    """Build an ImpactPlugin, forwarding every argument to its constructor."""
    plugin = ImpactPlugin(*args, **kwargs)
    return plugin