Codebase list python-faraday / 3e07e228-43f0-4a7b-89e9-9db103c60f3f/main faraday / server / utils / filters.py
3e07e228-43f0-4a7b-89e9-9db103c60f3f/main

Tree @3e07e228-43f0-4a7b-89e9-9db103c60f3f/main (Download .tar.gz)

filters.py @3e07e228-43f0-4a7b-89e9-9db103c60f3f/mainraw · history · blame

"""
Faraday Penetration Test IDE
Copyright (C) 2020  Infobyte LLC (http://www.infobytesec.com/)
See the file 'doc/LICENSE' for the license information

"""
import re
import typing
import numbers
import datetime
from distutils.util import strtobool

from dateutil.parser import parse
from sqlalchemy import inspect
from collections.abc import Iterable
from dateutil.parser._parser import ParserError
from marshmallow import Schema, fields, ValidationError, types, validate
from marshmallow_sqlalchemy.convert import ModelConverter

from faraday.server.models import VulnerabilityWeb, Host, Service
from faraday.server.utils.search import OPERATORS
from faraday.server.fields import JSONType

WHITE_LIST = [
    'tags__name',
    'service__name',
    'type',
    'policy_violations__name',
    'host__os',
    'references__name',
    'evidence__filename',
    'service__port',
    'hostnames',
    'creator'
]


COUNT_FIELDS = [
    'host__vulnerability_critical_generic_count',
    'host__vulnerability_high_generic_count',
    'host__vulnerability_medium_generic_count',
    'host__vulnerability_low_generic_count',
    'host__vulnerability_info_generic_count',
]

VULNERABILITY_FIELDS = [str(algo).split('.')[1] for algo in inspect(VulnerabilityWeb).attrs] + WHITE_LIST + COUNT_FIELDS


VALID_OPERATORS = set(OPERATORS.keys()) - set(['desc', 'asc'])


class FlaskRestlessFilterSchema(Schema):
    name = fields.String(validate=validate.OneOf(VULNERABILITY_FIELDS), required=True)
    val = fields.Raw(required=True)
    op = fields.String(validate=validate.OneOf(list(OPERATORS.keys())), required=True)
    valid_relationship = {
        'host': Host,
        'service': Service
    }

    def load(
        self,
        data: typing.Union[
            typing.Mapping[str, typing.Any],
            typing.Iterable[typing.Mapping[str, typing.Any]],
        ],
        *,
        many: bool = None,
        partial: typing.Union[bool, types.StrSequenceOrSet] = None,
        unknown: str = None
    ):
        data = super().load(data, many=many, partial=partial, unknown=unknown)
        if not isinstance(data, list):
            res = self._validate_filter_types(data)
        else:
            res = []
            for filter_ in data:
                res += self._validate_filter_types(filter_)
        return res

    def _validate_filter_types(self, filter_):
        if isinstance(filter_['val'], str) and '\x00' in filter_['val']:
            raise ValidationError('Value can\'t containt null chars')
        converter = ModelConverter()
        column_name = filter_['name']
        if '__' in column_name:
            # relation attribute search, example service__port:80
            model_name, column_name = column_name.split('__')
            model = self.valid_relationship.get(model_name, None)
            if not model:
                raise ValidationError('Invalid Relationship')
            column = getattr(model, column_name)
        else:
            column = getattr(VulnerabilityWeb, column_name)

        if not getattr(column, 'type', None) and filter_['op'].lower():
            if filter_['op'].lower() in ['eq', '==']:
                if filter_['name'] in ['creator', 'hostnames']:
                    # make sure that creator and hostname are compared against a string
                    if not isinstance(filter_['val'], str):
                        raise ValidationError('Relationship attribute to compare to must be a string')
                    return [filter_]
            else:
                raise ValidationError('Field does not support in operator')

        if filter_['op'].lower() in ['in', 'not_in']:
            # in and not_in must be used with Iterable
            if not isinstance(filter_['val'], Iterable):
                filter_['val'] = [filter_['val']]

        field = converter.column2field(column)
        if filter_['op'].lower() in ['ilike', 'like']:
            # like muse be used with string
            if isinstance(filter_['val'], numbers.Number) or isinstance(field, fields.Number):
                raise ValidationError('Can\'t perfom ilike/like against numbers')
            if isinstance(column.type, JSONType):
                raise ValidationError('Can\'t perfom ilike/like against JSON Type column')
            if isinstance(field, fields.Boolean):
                raise ValidationError('Can\'t perfom ilike/like against boolean type column')

        # somes field are date/datetime.
        # we use dateutil parse to validate the string value which contains a date or datetime
        valid_date = False
        try:
            valid_date = isinstance(parse(filter_['val']), datetime.datetime)
        except (ParserError, TypeError):
            valid_date = False

        if valid_date and isinstance(field, fields.DateTime):
            if re.match(r'^\d{4}-\d{1,2}-\d{1,2}$', filter_['val']):
                # If que have a valid date (not datetime)
                # then we must search by range to avoid matching with datetime
                start = parse(filter_['val'])
                end = (start + datetime.timedelta(hours=23, minutes=59, seconds=59)).strftime('%Y-%m-%dT%H:%M:%S.%f%z')
                start = start.strftime('%Y-%m-%dT%H:%M:%S.%f%z')
                # here we transform the original filter and we add a range
                # we could try to change search.py generated query, however changing the query will use
                # postgresql syntax only (type cast)
                return [
                        {'name': filter_['name'], 'op': '>=', 'val': start},
                        {'name': filter_['name'], 'op': '<=', 'val': end},
                ]

        if filter_['op'].lower() in ['<', '>', 'ge', 'geq', 'lt']:
            # we check that operators can be only used against date or numbers
            if not valid_date and not isinstance(filter_['val'], numbers.Number):
                raise ValidationError('Operators <,> can be used only with numbers or dates')

            if not isinstance(field, (fields.Date, fields.DateTime, fields.Number)):
                raise ValidationError('Using comparison operator against a field that does not supports it')

        # if the field is boolean, the value must be valid otherwise postgresql will raise an error
        if isinstance(field, fields.Boolean) and not isinstance(filter_['val'], bool):
            try:
                strtobool(filter_['val'])
            except (AttributeError, ValueError):
                raise ValidationError('Can\'t compare Boolean field against a non boolean value. Please use True or False')

        if isinstance(field, (fields.Date, fields.DateTime)) and valid_date:
            filter_['val'] = parse(filter_['val']).isoformat()  # bugfix: when user sends string like: 1/1/2020
            return [filter_]

        # we try to deserialize the value, any error means that the value was not valid for the field typ3
        # previous checks were added since postgresql is very strict with operators.
        try:
            field.deserialize(filter_['val'])
        except TypeError:
            raise ValidationError('Invalid value type')

        return [filter_]


class FlaskRestlessOperator(Schema):
    _or = fields.List(fields.Nested("self"), attribute='or', data_key='or')
    _and = fields.List(fields.Nested("self"), attribute='and', data_key='and')

    def load(
        self,
        data: typing.Union[
            typing.Mapping[str, typing.Any],
            typing.Iterable[typing.Mapping[str, typing.Any]],
        ],
        *,
        many: bool = None,
        partial: typing.Union[bool, types.StrSequenceOrSet] = None,
        unknown: str = None
    ):
        if not isinstance(data, list):
            data = [data]

        res = []
        # the next iteration is required for allow polymorphism in the list of filters
        for search_filter in data:
            # we try to validate against filter schema since the list could contain
            # operatores mixed with filters in the list
            try:
                res += FlaskRestlessFilterSchema(many=False).load(search_filter)
            except ValidationError:
                res.append(self._do_load(
                    search_filter, many=False, partial=partial, unknown=unknown, postprocess=True
                ))

        return res


class FlaskRestlessGroupFieldSchema(Schema):
    field = fields.String(validate=validate.OneOf(VULNERABILITY_FIELDS), required=True)


class FlaskRestlessOrderFieldSchema(Schema):
    field = fields.String(validate=validate.OneOf(VULNERABILITY_FIELDS), required=True)
    direction = fields.String(validate=validate.OneOf(["asc", "desc"]), required=False)


class FilterSchema(Schema):
    filters = fields.List(fields.Nested("FlaskRestlessSchema"))
    order_by = fields.List(fields.Nested(FlaskRestlessOrderFieldSchema))
    group_by = fields.List(fields.Nested(FlaskRestlessGroupFieldSchema))
    limit = fields.Integer()
    offset = fields.Integer()


class FlaskRestlessSchema(Schema):
    valid_schemas = [
        FilterSchema,
        FlaskRestlessFilterSchema,
        FlaskRestlessOperator,
    ]

    def load(
        self,
        data: typing.Union[
            typing.Mapping[str, typing.Any],
            typing.Iterable[typing.Mapping[str, typing.Any]],
        ],
        *,
        many: bool = None,
        partial: typing.Union[bool, types.StrSequenceOrSet] = None,
        unknown: str = None
    ):
        many = False
        if isinstance(data, list):
            many = True
        for schema in self.valid_schemas:
            try:
                return schema(many=many).load(data)
            except ValidationError:
                continue
        raise ValidationError('No valid schema found. data {}'.format(data))