Codebase list python-faraday / 78dcaf8 faraday / server / api / modules / comments.py
78dcaf8

Tree @78dcaf8 (Download .tar.gz)

comments.py @78dcaf8raw · history · blame

# Faraday Penetration Test IDE
# Copyright (C) 2017  Infobyte LLC (http://www.infobytesec.com/)
# See the file 'doc/LICENSE' for the license information
from flask import abort, Blueprint
from marshmallow import fields, ValidationError
from marshmallow.validate import OneOf


from faraday.server.models import db, Host, Service, VulnerabilityGeneric
from faraday.server.api.base import (
    AutoSchema,
    ReadWriteWorkspacedView,
    InvalidUsage,
    CreateWorkspacedMixin,
    GenericWorkspacedView,
    BulkDeleteWorkspacedMixin
)
from faraday.server.models import Comment
comment_api = Blueprint('comment_api', __name__)


class CommentSchema(AutoSchema):
    _id = fields.Integer(dump_only=True, attribute='id')
    object_id = fields.Integer(attribute='object_id', required=True)
    object_type = fields.String(attribute='object_type',
                                validate=OneOf(['host', 'service', 'comment', 'vulnerability']),
                                required=True)
    text = fields.String(attribute='text', required=True)

    class Meta:
        model = Comment
        fields = (
            'id', 'text', 'object_type', 'object_id'
        )


class CommentCreateMixing(CreateWorkspacedMixin):

    def _perform_create(self, data, workspace_name):
        model = {
            'host': Host,
            'service': Service,
            'vulnerability': VulnerabilityGeneric,
            'comment': Comment
        }
        obj = db.session.query(model[data['object_type']]).get(
            data['object_id'])
        workspace = self._get_workspace(workspace_name)
        if not obj:
            raise InvalidUsage('Can\'t comment inexistent object')
        if obj.workspace != workspace:
            raise InvalidUsage('Can\'t comment object of another workspace')
        return super()._perform_create(data, workspace_name)


class CommentView(CommentCreateMixing, ReadWriteWorkspacedView, BulkDeleteWorkspacedMixin):
    route_base = 'comment'
    model_class = Comment
    schema_class = CommentSchema
    order_field = 'create_date'


class UniqueCommentView(GenericWorkspacedView,
                        CommentCreateMixing):
    """
        This view is used by the plugin engine to avoid duplicate comments
        when the same plugin and data was ran multiple times.
    """
    route_base = 'comment_unique'
    model_class = Comment
    schema_class = CommentSchema

    def _perform_create(self, data, workspace_name):
        comment = db.session.query(Comment).filter_by(
            text=data['text'],
            object_type=data['object_type'],
            object_id=data['object_id'],
            workspace=self._get_workspace(workspace_name)
        ).first()

        if comment is not None:
            abort(409, ValidationError(
                {
                    'message': 'Comment already exists',
                    'object': self.schema_class().dump(comment),
                }
            ))
        res = super()._perform_create(data, workspace_name)
        return res


CommentView.register(comment_api)
UniqueCommentView.register(comment_api)