Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / openvas / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
import re
from collections import defaultdict
from copy import copy
"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""

from faraday_plugins.plugins.plugin import PluginXMLFormat
from faraday_plugins.plugins.plugins_utils import filter_services

import xml.etree.ElementTree as ET

__author__ = "Francisco Amato"
__copyright__ = "Copyright (c) 2013, Infobyte LLC"
__credits__ = ["Francisco Amato"]
__license__ = ""
__version__ = "1.0.0"
__maintainer__ = "Francisco Amato"
__email__ = "[email protected]"
__status__ = "Development"


class OpenvasXmlParser:
    """
    The objective of this class is to parse an xml file generated by the openvas tool.

    TODO: Handle errors.
    TODO: Test openvas output version. Handle what happens if the parser doesn't support it.
    TODO: Test cases.

    @param openvas_xml_filepath A proper xml generated by openvas
    """

    def __init__(self, xml_output, logger):
        self.target = None
        self.port = "80"
        self.host = None
        self.logger = logger
        tree = self.parse_xml(xml_output)
        if tree:
            self.hosts = self.get_hosts(tree)
            self.items = list(self.get_items(tree, self.hosts))
        else:
            self.items = []

    def parse_xml(self, xml_output):
        """
        Open and parse an xml file.

        TODO: Write custom parser to just read the nodes that we need instead of
        reading the whole file.

        @return xml_tree An xml tree instance. None if error.
        """
        try:
            tree = ET.fromstring(xml_output)
        except SyntaxError as err:
            self.logger.error("SyntaxError: %s. %s", err, xml_output)
            return None
        return tree

    def get_items(self, tree, hosts):
        """
        @return items A list of Host instances
        """
        try:
            report = tree.find('report')
            if report:
                results = report.findall('results')
                if results:
                    nodes = report.findall('results')[0]
                else:
                    nodes = tree.findall('result')
            else:
                nodes = tree.findall('result')

            for node in nodes:
                try:
                    yield Item(node, hosts)
                except Exception as e:
                    self.logger.error(f"Error generating Iteem from {node.attrib} [{e}]")

        except Exception as e:
            self.logger.error(f"Tag not found: {e}")

    def get_hosts(self, tree):
        # Hosts are located in: /report/report/host
        # hosts_dict will contain has keys its details and its hostnames
        hosts = tree.findall('report/host')
        hosts_dict = {}
        for host in hosts:
            ip = self.do_clean(host.find('ip').text)
            details = self.get_data_from_detail(host.findall('detail'))
            hosts_dict[ip] = details
        return hosts_dict

    def get_data_from_detail(self, details):
        data = {}
        details_data = defaultdict(list)
        hostnames = []
        for item in details:
            name = self.do_clean(item.find('name').text)
            value = self.do_clean(item.find('value').text)
            if "EXIT" in name:
                continue
            if name == 'hostname':
                hostnames.append(value)
            else:
                details_data[name].append(value)
        data['details'] = details_data
        data['hostnames'] = hostnames
        return data

    def do_clean(self, value):
        myreturn = ""
        if value is not None:
            myreturn = re.sub(r"\s+", " ", value)
        return myreturn.strip()


def get_attrib_from_subnode(xml_node, subnode_xpath_expr, attrib_name):
    """
    Finds a subnode in the item node and the retrieves a value from it

    @return An attribute value
    """

    node = xml_node.find(subnode_xpath_expr)

    if node is not None:
        return node.get(attrib_name)

    return None


class Item:
    """
    An abstract representation of a Item
    @param item_node A item_node taken from an openvas xml tree
    """

    def __init__(self, item_node, hosts):
        self.node = item_node
        self.host = self.get_text_from_subnode('host')
        self.threat = self.get_text_from_subnode('threat')
        self.subnet = self.get_text_from_subnode('subnet')
        if self.subnet == '':
            self.subnet = self.host
        self.port = None
        self.severity = self.severity_mapper()
        self.severity_nr = self.get_text_from_subnode("severity")
        self.service = "Unknown"
        self.protocol = ""
        details = self.node.findall("detection/result/details/detail")
        self.cpe = details[0].findtext("value") if details else None
        port_string = self.get_text_from_subnode('port')
        info = port_string.split("/")
        if len(info) < 2:
            info = details[1].findtext("value").split("/") if details else ["", ""]
        self.protocol = "".join(filter(lambda x: x.isalpha() or x in ("-", "_"), info[1]))
        self.port = "".join(filter(lambda x: x.isdigit(), info[0])) or None
        if not self.port:
            self.service = info[0]
        else:
            if hosts:
                host_details = hosts[self.host].get('details')
                self.service = self.get_service(port_string, self.port, host_details)
            else:
                self.service = "Not Service"
        self.nvt = self.node.findall('nvt')[0]
        self.node = self.nvt
        self.id = self.node.get('oid')
        self.cvss_base = self.get_text_from_subnode("cvss_base")
        self.name = self.get_text_from_subnode('name')
        self.cve = self.get_text_from_subnode('cve') if self.get_text_from_subnode('cve') != "NOCVE" else ""
        self.bid = self.get_text_from_subnode('bid') if self.get_text_from_subnode('bid') != "NOBID" else ""
        self.xref = self.get_text_from_subnode('xref') if self.get_text_from_subnode('xref') != "NOXREF" else ""
        self.cwe = []
        if "URL:https://cwe.mitre.org/data/definitions/" in self.xref:
            self.cwe.append("CWE-"+self.xref.split("URL:https://cwe.mitre.org/data/definitions/")[1]
                            .replace("html", ""))
        self.description = ''
        self.resolution = ''
        self.cvss_vector = ''
        self.tags = self.get_text_from_subnode('tags')
        self.data = self.get_text_from_subnode('description')
        self.data += f'\n\nid {item_node.attrib.get("id")}'
        if self.tags:
            tags_data = self.get_data_from_tags(self.tags)
            self.description = tags_data['description']
            self.resolution = tags_data['solution']
            self.cvss_vector = tags_data['cvss_base_vector']
            if tags_data['impact']:
                self.data += f'\n\nImpact: {tags_data["impact"]}'

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None and sub_node.text is not None:
            return sub_node.text.strip()
        return ''

    def severity_mapper(self):
        severity = self.get_text_from_subnode('threat')
        if severity == 'Alarm':
            severity = 'Critical'
        return severity

    def get_service(self, port_string, port, details_from_host):
        # details_from_host:
        # name: name of detail
        # value: list with the values associated with the name
        details_from_host_copy = copy(details_from_host)
        services = details_from_host_copy.pop("Services", None)
        if services:
            service_detail = self.get_service_from_details("Services", services, port)
            if service_detail:
                return service_detail
        for name, value in details_from_host_copy.items():
            service_detail = self.get_service_from_details(name, value, port)
            if service_detail:
                return service_detail
        # if the service is not in details_from_host, we will search it in
        # the file port_mapper.txt
        services_mapper = filter_services()
        for service in services_mapper:
            if service[0] == port_string:
                return service[1]
        return "Unknown"

    def do_clean(self, value):
        myreturn = ""
        if value is not None:
            myreturn = re.sub(r"\s+", " ", value)

        return myreturn.strip()

    def get_service_from_details(self, name, value_list, port):
        # detail:
        # name: name of detail
        # value_list: list with the values associated with the name
        res = None
        priority = 0
        if name == 'Services':
            for value in value_list:
                value_splited = value.split(',')
                if value_splited[0] == port:
                    res = value_splited[2]
                    break
        else:
            for value in value_list:
                if '/' in value:
                    auxiliar_value = value.split('/')[0]
                    if auxiliar_value == port:
                        res = name
                        priority = 2

                elif value.isdigit() and priority == 0:
                    if value == port:
                        res = name
                        priority = 1

                elif '::' in value and priority == 0:
                    aux_value = value.split('::')[0]
                    if aux_value == port:
                        res = name
        return res

    def get_data_from_tags(self, tags_text):
        clean_text = self.do_clean(tags_text)
        tags = clean_text.split('|')
        summary = ''
        insight = ''
        data = {
            'solution': '',
            'cvss_base_vector': '',
            'description': '',
            'impact': ''
        }
        for tag in tags:
            splited_tag = tag.split('=', 1)
            if splited_tag[0] in data.keys():
                data[splited_tag[0]] = splited_tag[1]
            elif splited_tag[0] == 'summary':
                summary = splited_tag[1]
            elif splited_tag[0] == 'insight':
                insight = splited_tag[1]

        data['description'] = ' '.join([summary, insight]).strip()

        return data


class OpenvasPlugin(PluginXMLFormat):
    """
    Example plugin to parse openvas output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = ["report", "get_results_response"]
        self.id = "Openvas"
        self.name = "Openvas XML Output Plugin"
        self.plugin_version = "0.3"
        self.version = "9.0.3"
        self.framework_version = "1.0.0"
        self.options = None

    def report_belongs_to(self, **kwargs):
        if super().report_belongs_to(**kwargs):
            report_path = kwargs.get("report_path", "")
            with open(report_path) as f:
                output = f.read()
            return re.search("OpenVAS", output) is not None \
                   or re.search('<omp>', output) is not None \
                   or re.search('<owner>', output) is not None
        return False

    def parseOutputString(self, output):
        """
        This method will discard the output the shell sends, it will read it
        from the xml where it expects it to be present.
        """
        parser = OpenvasXmlParser(output, self.logger)
        ids = {}
        # The following threats values will not be taken as vulns
        self.ignored_severities = ['Log', 'Debug']
        for ip, values in parser.hosts.items():
            # values contains: ip details and ip hostnames
            os_report = values['details'].get('best_os_txt')
            h_id = self.createAndAddHost(
                ip,
                os_report[0] if os_report else '',
                hostnames=values['hostnames']
            )
            ids[ip] = h_id
        for item in parser.items:

            if item.name is not None:
                ref = []
                cve = []
                cvss2 = {}
                if item.cve:
                    cves = item.cve.split(',')
                    for i in cves:
                        cve.append(i.strip())
                if item.bid:
                    bids = item.bid.split(',')
                    for bid in bids:
                        ref.append(f"BID-{bid.strip()}")
                if item.xref:
                    ref.append(item.xref)
                if item.tags and item.cvss_vector:
                    cvss2["vector_string"] = item.cvss_vector
                if item.cpe:
                    ref.append(f"{item.cpe}")
                if item.severity_nr:
                    if float(item.severity_nr) >= 9.0:
                        item.threat = "Critical"
                        item.severity = "Critical"
                    ref.append(f"SEVERITY NUMBER: {item.severity_nr}")
                if item.threat:
                    ref.append(f"THREAT: {item.threat}")

                if item.subnet in ids:
                    h_id = ids[item.host]
                else:
                    h_id = self.createAndAddHost(
                        item.subnet,
                        hostnames=[item.host])
                    ids[item.subnet] = h_id

                if not item.port:
                    if item.severity not in self.ignored_severities:
                        v_id = self.createAndAddVulnToHost(
                            h_id,
                            item.name,
                            desc=item.description,
                            severity=item.severity,
                            resolution=item.resolution,
                            ref=ref,
                            external_id=f"OPENVAS-{item.id}",
                            data=item.data,
                            cve=cve,
                            cwe=item.cwe,
                            cvss2=cvss2
                        )
                else:
                    if item.service:
                        web = re.search(
                            r'^(www|http)',
                            item.service)
                    else:
                        web = item.port in ('80', '443', '8080')

                    if item.subnet + "_" + item.port in ids:
                        s_id = ids[item.subnet + "_" + item.port]
                    else:
                        s_id = self.createAndAddServiceToHost(
                            h_id,
                            item.service,
                            item.protocol,
                            ports=[str(item.port)]
                        )
                        ids[item.subnet + "_" + item.port] = s_id
                    if web:
                        if item.severity not in self.ignored_severities:
                            v_id = self.createAndAddVulnWebToService(
                                h_id,
                                s_id,
                                item.name,
                                desc=item.description,
                                website=item.host,
                                severity=item.severity,
                                ref=ref,
                                resolution=item.resolution,
                                external_id=f"OPENVAS-{item.id}",
                                data=item.data,
                                cve=cve,
                                cwe=item.cwe,
                                cvss2=cvss2
                            )
                    elif item.severity not in self.ignored_severities:
                        self.createAndAddVulnToService(
                            h_id,
                            s_id,
                            item.name,
                            desc=item.description,
                            severity=item.severity,
                            ref=ref,
                            resolution=item.resolution,
                            external_id=f"OPENVAS-{item.id}",
                            data=item.data,
                            cve=cve,
                            cwe=item.cwe,
                            cvss2=cvss2
                        )
        del parser

    @staticmethod
    def _isIPV4(ip):
        if len(ip.split(".")) == 4:
            return True
        else:
            return False


def createPlugin(*args, **kwargs):
    return OpenvasPlugin(*args, **kwargs)