Codebase list faraday-plugins / c19a85b faraday_plugins / plugins / repo / qualysguard / plugin.py
c19a85b

Tree @c19a85b (Download .tar.gz)

plugin.py @c19a85braw · history · blame

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""
Faraday Penetration Test IDE
Copyright (C) 2013  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information
"""
import re
import xml.etree.ElementTree as ET

from faraday_plugins.plugins.plugin import PluginXMLFormat

__author__ = 'Francisco Amato'
__copyright__ = 'Copyright (c) 2013, Infobyte LLC'
__credits__ = ['Francisco Amato']
__license__ = ''
__version__ = '1.0.0'
__maintainer__ = 'Francisco Amato'
__email__ = '[email protected]'
__status__ = 'Development'


def cleaner_results(string):
    try:
        result = string.replace('<P>', '').replace('<UL>', ''). \
            replace('<LI>', '').replace('<BR>', ''). \
            replace('<A HREF="', '').replace('</A>', ' '). \
            replace('" TARGET="_blank">', ' ').replace('&quot;', '"')
        return result
    except:
        return ''


class QualysguardXmlParser():
    """
    The objective of this class is to parse an xml file generated by
    the qualysguard tool.

    TODO: Handle errors.
    TODO: Test qualysguard output version. Handle what happens if the parser
    doesn't support it.
    TODO: Test cases.

    @param qualysguard_xml_filepath A proper xml generated by qualysguard
    """

    def __init__(self, xml_output):
        tree, type_report = self.parse_xml(xml_output)

        if not tree or type_report is None:
            self.items = []
            return

        if type_report == 'ASSET_DATA_REPORT':
            self.items = [data for data in self.get_items_asset_report(tree)]
        elif type_report == 'SCAN':
            self.items = [data for data in self.get_items_scan_report(tree)]

    def parse_xml(self, xml_output):
        """
        Open and parse an xml file.

        TODO: Write custom parser to just read the nodes that we need instead
        of reading the whole file.

        @return xml_tree An xml tree instance. None if error.
        """

        asset_data_report = '<!DOCTYPE ASSET_DATA_REPORT SYSTEM'
        scan_report = '<!DOCTYPE SCAN SYSTEM'

        try:
            tree = ET.fromstring(xml_output)

            if asset_data_report in xml_output:
                type_report = 'ASSET_DATA_REPORT'
            elif scan_report in xml_output:
                type_report = 'SCAN'
            else:
                type_report = None

        except SyntaxError as err:
            return None, None

        return tree, type_report

    def get_items_scan_report(self, tree):
        """
        @return items A list of Host instances
        """
        for node in tree.findall('IP'):
            yield ItemScanReport(node)

    def get_items_asset_report(self, tree):
        """
        @return items A list of Host instances
        """
        for node in tree.find('HOST_LIST').findall('HOST'):
            yield ItemAssetReport(node, tree)


class ItemAssetReport():
    """
    An abstract representation of a Item (HOST) for a Asset Report.
    @param item_node A item_node taken from an qualysguard xml tree
    """

    def __init__(self, item_node, tree):

        self.node = item_node
        self.ip = self.get_text_from_subnode('IP')
        self.hostname = self.get_text_from_subnode('DNS') or ''
        self.os = self.get_text_from_subnode('OPERATING_SYSTEM')
        self.vulns = self.getResults(tree)

    def getResults(self, tree):
        glossary = tree.find('GLOSSARY/VULN_DETAILS_LIST')
        for self.issue in self.node.find('VULN_INFO_LIST'):
            yield ResultsAssetReport(self.issue, glossary)

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None


class ResultsAssetReport():
    """
    A abstraction of Results for a Asset Report of Qualysguard.
    """

    def __init__(self, issue_node, glossary):

        # VULN_INFO ElementTree
        self.node = issue_node
        self.port = self.get_text_from_subnode(self.node, 'PORT')
        self.protocol = self.get_text_from_subnode(self.node, 'PROTOCOL')
        self.name = self.get_text_from_subnode(self.node, 'QID')
        self.external_id = self.name
        self.result = self.get_text_from_subnode(self.node, 'RESULT')

        self.severity_dict = {
            '1': 'info',
            '2': 'low',
            '3': 'med',
            '4': 'high',
            '5': 'critical'}

        # GLOSSARY TAG
        self.glossary = glossary
        self.severity = self.severity_dict.get(self.get_text_from_glossary('SEVERITY'), 'info')
        self.title = self.get_text_from_glossary('TITLE')
        self.cvss2 = {}
        self.pci = self.get_text_from_glossary('PCI_FLAG')
        self.solution = self.get_text_from_glossary('SOLUTION')
        self.impact = self.get_text_from_glossary('IMPACT')

        # Description
        self.desc = cleaner_results(self.get_text_from_glossary('THREAT'))
        if not self.desc:
            self.desc = ''
        if self.result:
            self.desc += '\n\nResult: ' + cleaner_results(self.result)
        if self.impact:
            self.desc += '\n\nImpact: ' + cleaner_results(self.impact)
        if self.result:
            self.desc += '\n\nSolution: ' + cleaner_results(self.solution)

        # References
        self.ref = []
        self.cve = []

        cve_id = self.get_text_from_glossary('CVE_ID_LIST/CVE_ID/ID')
        if cve_id:
            self.cve.append(cve_id)

        if self.pci:
            self.ref.append(f'PCI: {self.pci}')

    def get_text_from_glossary(self, tag):
        """
        Finds a subnode in the glossary and retrieves a value of this.
        Filter by QualysId.

        @return An attribute value
        """

        for vuln_detail in self.glossary:
            id_act = vuln_detail.get('id').strip('qid_')
            if id_act == self.name:
                text = vuln_detail.find(tag)
                if text is not None:
                    return text.text
                else:
                    return None

    def get_text_from_subnode(self, node, subnode_xpath_expr):
        """
        Finds a subnode in the node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None


class ItemScanReport():
    """
    An abstract representation of a Item for a 'SCAN' report of Qualysguard.

    @param item_node A item_node taken from an qualysguard xml tree
    """

    def __init__(self, item_node):
        self.node = item_node
        self.ip = item_node.get('value')
        self.os = self.get_text_from_subnode('OS')
        self.hostname = self.get_hostname(item_node)
        self.vulns = self.getResults(item_node)

    def getResults(self, tree):
        """
        :param tree:
        """
        for self.issues in tree.findall('VULNS/CAT'):
            for v in self.issues.findall('VULN'):
                yield ResultsScanReport(v, self.issues)
        for self.issues in tree.findall('INFOS/CAT'):
            for v in self.issues.findall('INFO'):
                yield ResultsScanReport(v, self.issues)
        for self.issues in tree.findall('SERVICES/CAT'):
            for v in self.issues.findall('SERVICE'):
                yield ResultsScanReport(v, self.issues)
        for self.issues in tree.findall('PRACTICES/CAT'):
            for v in self.issues.findall('PRACTICE'):
                yield ResultsScanReport(v, self.issues)

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text

        return None

    def get_hostname(self, node):
        hostname = node.get('name')

        if hostname == 'No registered hostname':
            return ""

        return hostname


class ResultsScanReport():
    """
    An abstraction of Result for Qualysguard 'SCAN' Report.
    """

    def __init__(self, issue_node, parent):
        self.node = issue_node
        self.port = parent.get('port')
        self.protocol = parent.get('protocol')
        self.name = self.node.get('number')
        self.external_id = self.node.get('number')
        self.title = self.get_text_from_subnode('TITLE')
        self.cvss2 = {}

        self.diagnosis = self.get_text_from_subnode('DIAGNOSIS')
        self.solution = self.get_text_from_subnode('SOLUTION')
        self.result = self.get_text_from_subnode('RESULT')
        self.consequence = self.get_text_from_subnode('CONSEQUENCE')

        self.severity_dict = {
            '1': 'info',
            '2': 'low',
            '3': 'med',
            '4': 'high',
            '5': 'critical'}

        self.severity = self.severity_dict.get(self.node.get('severity'), 'info')

        self.desc = cleaner_results(self.diagnosis)
        if self.result:
            self.desc += '\nResult: ' + cleaner_results(self.result)
        else:
            self.desc += ''

        if self.consequence:
            self.desc += '\nConsequence: ' + cleaner_results(self.consequence)
        else:
            self.desc += ''

        self.ref = []
        self.cve = []
        for r in issue_node.findall('CVE_ID_LIST/CVE_ID'):
            self.node = r
            self.cve.append(self.get_text_from_subnode('ID'))
        for r in issue_node.findall('BUGTRAQ_ID_LIST/BUGTRAQ_ID'):
            self.node = r
            self.ref.append('bid-' + self.get_text_from_subnode('ID'))

    def get_text_from_subnode(self, subnode_xpath_expr):
        """
        Finds a subnode in the host node and the retrieves a value from it.

        @return An attribute value
        """
        sub_node = self.node.find(subnode_xpath_expr)
        if sub_node is not None:
            return sub_node.text
        return None


class QualysguardPlugin(PluginXMLFormat):
    """
    Example plugin to parse qualysguard output.
    """

    def __init__(self, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        self.identifier_tag = ["ASSET_DATA_REPORT", "SCAN"]
        self.id = 'Qualysguard'
        self.name = 'Qualysguard XML Output Plugin'
        self.plugin_version = '0.0.2'
        self.version = 'Qualysguard 8.17.1.0.2'
        self.framework_version = '1.0.0'
        self.options = None
        self.open_options = {"mode": "r", "encoding": "utf-8"}

    def parseOutputString(self, output):

        parser = QualysguardXmlParser(output)

        for item in parser.items:
            h_id = self.createAndAddHost(
                item.ip,
                item.os,
                hostnames=[item.hostname])

            for v in item.vulns:
                if v.port is None:
                    self.createAndAddVulnToHost(
                        h_id,
                        v.title if v.title else v.name,
                        ref=v.ref,
                        severity=v.severity,
                        resolution=v.solution if v.solution else '',
                        desc=v.desc,
                        external_id=v.external_id,
                        cve=v.cve,
                        cvss2=v.cvss2
                    )

                else:
                    web = False

                    try:
                        port = v.port
                        name = v.name
                    except (UnicodeDecodeError, AttributeError):
                        port = v.port
                        name = v.name

                    s_id = self.createAndAddServiceToHost(
                        h_id,
                        v.port,
                        v.protocol,
                        ports=[port],
                        status='open')
                    if port in ['80', '443'] or re.search('ssl|http', name):
                        web = True
                    else:
                        web = False

                    if web:
                        self.createAndAddVulnWebToService(
                            h_id,
                            s_id,
                            v.title if v.title else v.name,
                            ref=v.ref,
                            website=item.ip,
                            severity=v.severity,
                            desc=v.desc,
                            resolution=v.solution if v.solution else '',
                            external_id=v.external_id,
                            cve=v.cve,
                            cvss2=v.cvss2
                        )

                    else:
                        self.createAndAddVulnToService(
                            h_id,
                            s_id,
                            v.title if v.title else v.name,
                            ref=v.ref,
                            severity=v.severity,
                            desc=v.desc,
                            resolution=v.solution if v.solution else '',
                            external_id=v.external_id,
                            cve=v.cve,
                            cvss2=v.cvss2
                        )

        del parser


def createPlugin(*args, **kwargs):
    return QualysguardPlugin(*args, **kwargs)