"""
Faraday Penetration Test IDE
Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import xml.etree.ElementTree as ET
from dateutil.parser import parse
from faraday_plugins.plugins.plugin import PluginXMLFormat
__author__ = "Blas"
__copyright__ = "Copyright (c) 2019, Infobyte LLC"
__credits__ = ["Blas", "Nicolas Rebagliati"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Blas"
__email__ = "[email protected]"
__status__ = "Development"
from faraday_plugins.plugins.repo.nessus.DTO import ReportHost, Report, ReportItem
class NessusParser:
"""
The objective of this class is to parse an xml file generated by the nessus tool.
TODO: Handle errors.
TODO: Test nessus output version. Handle what happens if the parser doesn't support it.
TODO: Test cases.
@param nessus_filepath A proper simple report generated by nessus
"""
def __init__(self, output):
self.tree = ET.fromstring(output)
self.report = []
if self.tree:
self.report = self.__get_report()
def __get_report(self) -> Report:
report = self.tree.find('Report')
return Report(report) if report else None
class NessusPlugin(PluginXMLFormat):
"""
Example plugin to parse nessus output.
"""
def __init__(self, *arg, **kwargs):
super().__init__(*arg, **kwargs)
self.extension = ".nessus"
self.identifier_tag = "NessusClientData_v2"
self.id = "Nessus"
self.name = "Nessus XML Output Plugin"
self.plugin_version = "0.0.1"
self.version = "5.2.4"
self.framework_version = "1.0.1"
self.options = None
@staticmethod
def parse_compliance_data(data: dict):
compliance_data = {}
for key, value in data.items():
if 'compliance-' in key:
compliance_name = key.split("}")[-1]
compliance_data[compliance_name] = value
return compliance_data
def map_properties(self, host: ReportHost):
if self.hostname_resolution:
name = host.host_properties.host_ip if host.host_properties.host_ip else host.name
else:
name = host.name
hostnames = [host.host_properties.host_fqdn]
if host.host_properties.host_rdns and host.host_properties.host_rdns not in hostnames:
hostnames.append(host.host_properties.host_rdns)
return {
"name": name,
"hostnames": hostnames,
"mac": host.host_properties.mac_address,
"os": host.host_properties.operating_system
}
@staticmethod
def map_item(host_id, run_date, plugin_name, item: ReportItem) -> dict:
data = item.plugin_output
data += f'{item.exploit_available}'
return {
"host_id": host_id,
"name": plugin_name,
"severity": item.risk_factor,
"data": data,
"external_id": item.plugin_id_attr,
"run_date": run_date,
"desc": item.description,
"resolution": item.solution,
"ref": [],
}
def map_policy_general(self, kwargs, item: ReportItem):
kwargs.update({"policyviolations": []})
if item.plugin_family_attr == 'Policy Compliance':
data = item.get_data()
bis_benchmark_data = kwargs["desc"].split('\n')
compliance_data = self.parse_compliance_data(data)
compliance_info = compliance_data.get('compliance-info', '')
if compliance_info and not kwargs["desc"]:
kwargs["desc"] = compliance_info
compliance_reference = compliance_data.get(
'compliance-reference', '').replace('|', ':').split(',')
compliance_result = compliance_data.get('compliance-result', '')
for reference in compliance_reference:
kwargs["ref"].append(reference)
compliance_check_name = compliance_data.get('compliance-check-name', '')
compliance_solution = compliance_data.get('compliance-solution', '')
if compliance_solution and not kwargs["resolution"]:
kwargs["resolution"] = compliance_solution
policy_item = f'{compliance_check_name} - {compliance_result}'
for policy_check_data in bis_benchmark_data:
if 'ref.' in policy_check_data:
kwargs["ref"].append(policy_check_data)
if 'compliance-see-also' in compliance_data:
kwargs["ref"].append(compliance_data.get('compliance-see-also'))
# We used this info from tenable: https://community.tenable.com/s/article/Compliance-checks-in-SecurityCenter
kwargs["policyviolations"].append(policy_item)
kwargs["name"] = f'{kwargs["name"]}: {policy_item}'
return kwargs
def parseOutputString(self, output):
"""
This method will discard the output the shell sends, it will read it from
the xml where it expects it to be present.
NOTE: if 'debug' is true then it is being run from a test case and the
output being sent is valid.
"""
try:
parser = NessusParser(output)
except Exception as e:
self.logger.error(str(e))
return None
report_hosts = parser.report.report_hosts
if report_hosts:
for host in report_hosts:
run_date = host.host_properties.host_end
if run_date:
run_date = parse(run_date)
website = host.host_properties.host_fqdn
host_id = self.createAndAddHost(**self.map_properties(host))
for item in host.report_items:
vulnerability_name = item.plugin_name
if not vulnerability_name:
continue
item_name = item.svc_name_attr
_main_data = self.map_item(
host_id, run_date, vulnerability_name, item)
_main_data = self.map_add_ref(_main_data, item)
if item_name == 'general':
_main_data = self.map_policy_general(_main_data, item)
self.createAndAddVulnToHost(**_main_data)
else:
_main_data["service_id"] = self.createAndAddServiceToHost(
host_id, name=item_name, protocol=item.protocol_attr,
ports=item.port_attr)
if item_name == 'www' or item_name == 'http':
_main_data.update({"website": website})
self.createAndAddVulnWebToService(**_main_data)
else:
self.createAndAddVulnToService(**_main_data)
@staticmethod
def map_add_ref(kwargs, item: ReportItem):
kwargs["cvss2"] = {}
kwargs["cvss3"] = {}
if item.see_also:
kwargs["ref"].append(item.see_also)
if item.cpe:
kwargs["ref"].append(item.cpe)
if item.cve:
kwargs["cve"] = item.cve
if item.cwe:
kwargs["cwe"] = item.cwe
if item.cvss3_vector:
kwargs["cvss3"]["vector_string"] = item.cvss3_vector
if item.cvss_vector:
kwargs["cvss2"]["vector_string"] = item.cvss_vector
return kwargs
def createPlugin(*args, **kwargs):
return NessusPlugin(*args, **kwargs)