Codebase list python-faraday / 62d1b14 faraday / server / commands / import_vulnerability_template.py
62d1b14

Tree @62d1b14 (Download .tar.gz)

import_vulnerability_template.py @62d1b14 raw · history · blame

import csv
import tempfile

import requests
from colorama import init
from colorama import Fore, Style
import logging

from sqlalchemy.exc import IntegrityError

from faraday.server.web import get_app
from faraday.server.models import (
    db,
    VulnerabilityTemplate,
)

# Base URL of the published template files; the importer appends
# "/cwe_<language>.csv" to it to build the download address.
CWE_URL = "https://raw.githubusercontent.com/infobyte/faraday_templates/master/vulnerability_templates"

# Language codes accepted by run(); anything else is rejected before downloading.
CWE_LANGS = ['en', 'es']

# Module-level logger (not referenced by the functions visible in this file).
logger = logging.getLogger(__name__)

# Initialise colorama at import time so the Fore/Style colour codes used in
# the console messages below are handled by colorama.
init()


def import_vulnerability_templates(language):
    """Download the vulnerability templates CSV for *language* and store its rows.

    The file ``cwe_<language>.csv`` is fetched from ``CWE_URL`` and every row
    is inserted as a ``VulnerabilityTemplate`` flagged as ``shipped``. A row
    rejected by the database with an ``IntegrityError`` is counted as already
    imported and skipped, without affecting the rows inserted before it.

    Download failures and the final summary are reported on stdout; the
    function always returns ``None``.
    """
    imported_rows = 0
    duplicated_rows = 0
    with get_app().app_context():
        try:
            # A timeout keeps the command from hanging forever on a stalled
            # connection.
            res = requests.get(f'{CWE_URL}/cwe_{language}.csv', timeout=30)
        except Exception as e:
            print(f'[{Fore.RED}-{Style.RESET_ALL}] An error has occurred downloading the file.\n{e}')
            return None

        if res.status_code != 200:
            print(f'[{Fore.RED}-{Style.RESET_ALL}] An error has occurred downloading the file.'
                  f' Response was {res.status_code}')
            return None

        # Pin the encoding so non-ASCII text survives on hosts whose locale is
        # not UTF-8, and disable newline translation as the csv module expects.
        # The context manager guarantees the temporary file is closed.
        with tempfile.TemporaryFile(mode="w+t", encoding="utf-8", newline="") as cwe_file:
            cwe_file.write(res.content.decode('utf8'))
            cwe_file.seek(0)

            for vulnerability_template in csv.DictReader(cwe_file):
                # A short row yields None for the missing column; treat it as
                # "no references" and drop blank entries.
                raw_references = vulnerability_template['references'] or ''
                references = [
                    ref
                    for ref in (part.strip() for part in raw_references.split(','))
                    if ref
                ]
                try:
                    # One SAVEPOINT per row: a duplicate only undoes its own
                    # insert instead of rolling back every row flushed so far.
                    with db.session.begin_nested():
                        db.session.add(
                            VulnerabilityTemplate(name=vulnerability_template['name'],
                                                  description=vulnerability_template['description'],
                                                  severity=vulnerability_template['exploitation'],
                                                  resolution=vulnerability_template['resolution'],
                                                  references=references,
                                                  shipped=True)
                        )
                        db.session.flush()
                except IntegrityError:
                    duplicated_rows += 1
                else:
                    imported_rows += 1
        db.session.commit()

        if imported_rows > 0:
            print(f'[{Fore.GREEN}+{Style.RESET_ALL}] {imported_rows} new vulnerability templates were imported')
        else:
            print(f'[{Fore.YELLOW}+{Style.RESET_ALL}] {duplicated_rows} vulnerability templates were already imported')


def available_languages():
    """Print a heading followed by one line per language code in ``CWE_LANGS``."""
    green, reset = Fore.GREEN, Style.RESET_ALL
    print(f'[{green}+{reset}] Available languages')
    for code in CWE_LANGS:
        print(f'[{green}*{reset}] {code}')


def run(language='en', list_languages=False):
    """Entry point of the command.

    With ``list_languages`` set, only the supported language codes are
    printed. Otherwise the templates for ``language`` are imported, provided
    the code is one of ``CWE_LANGS``; an unknown code prints an error instead.
    Always returns ``None``.
    """
    if list_languages:
        available_languages()
    elif language in CWE_LANGS:
        import_vulnerability_templates(language)
    else:
        print(f'[{Fore.RED}-{Style.RESET_ALL}] Language not available')
    return None