Codebase list python-faraday / 62d1b14 tests / test_websocket_BroadcastServerProtocol.py
62d1b14

Tree @62d1b14 (Download .tar.gz)

test_websocket_BroadcastServerProtocol.py @62d1b14raw · history · blame

import pytest
from faraday.server.models import Agent, Executor
from faraday.server.websocket_factories import WorkspaceServerFactory, \
    update_executors, BroadcastServerProtocol

from tests.factories import AgentFactory, ExecutorFactory


class TransportMock:
    def write(self, data: bytearray):
        pass


@pytest.fixture
def proto():
    factory = WorkspaceServerFactory('ws://127.0.0.1')
    proto = factory.buildProtocol(('127.0.0.1', 0))
    proto.maskServerFrames = False
    proto.logFrames = False
    proto.send_queue = []
    proto.state = BroadcastServerProtocol.STATE_CLOSING
    proto.transport = TransportMock()

    return proto


class TestWebsocketBroadcastServerProtocol:

    def _join_agent(self, test_client, session):
        agent = AgentFactory.create(token='pepito')
        session.add(agent)
        session.commit()

        headers = {"Authorization": f"Agent {agent.token}"}
        token = test_client.post('/v3/agent_websocket_token', headers=headers).json['token']
        return token

    def test_join_agent_message_with_invalid_token_fails(self, session, proto, test_client):
        message = '{"action": "JOIN_AGENT", "token": "pepito" }'
        assert not proto.onMessage(message, False)

    def test_join_agent_message_without_token_fails(self, session, proto, test_client):
        message = '{"action": "JOIN_AGENT"}'
        assert not proto.onMessage(message, False)

    def test_join_agent_message_with_valid_token(self, session, proto, workspace, test_client):
        token = self._join_agent(test_client, session)
        message = f'{{"action": "JOIN_AGENT", "workspace": "{workspace.name}", "token": "{token}", "executors": [] }}'
        assert proto.onMessage(message, False)

    def test_leave_agent_happy_path(self, session, proto, workspace, test_client):
        token = self._join_agent(test_client, session)
        message = f'{{"action": "JOIN_AGENT", "workspace": "{workspace.name}", "token": "{token}", "executors": [] }}'
        assert proto.onMessage(message, False)

        message = '{"action": "LEAVE_AGENT" }'
        assert proto.onMessage(message, False)

    def test_agent_status(self, session, proto, workspace, test_client):
        token = self._join_agent(test_client, session)
        agent = Agent.query.one()
        assert not agent.is_online
        message = f'{{"action": "JOIN_AGENT", "workspace": "{workspace.name}", "token": "{token}", "executors": [] }}'
        assert proto.onMessage(message, False)
        assert agent.is_online

        message = '{"action": "LEAVE_AGENT"}'
        assert proto.onMessage(message, False)
        assert not agent.is_online


class TestCheckExecutors:

    def test_new_executors_not_in_database(self, session):
        agent = AgentFactory.create()
        executors = [
            {'executor_name': 'nmap_executor', 'args': {'param1': True}}
        ]

        assert update_executors(agent, executors)

    def test_executors_with_missing_args(self, session):
        agent = AgentFactory.create()
        executors = [
            {'executor_name': 'nmap_executor'}
        ]

        assert update_executors(agent, executors)

    def test_executors_with_missing_executor_name(self, session):
        agent = AgentFactory.create()
        executors = [
            {'invalid_key': 'nmap_executor'}
        ]

        assert update_executors(agent, executors)

    def test_executors_with_some_invalid_executors(self, session):
        agent = AgentFactory.create()
        executors = [
            {'invalid_key': 'nmap_executor'},
            {'executor_name': 'executor 1', 'args': {'param1': True}}
        ]

        assert update_executors(agent, executors)
        count_executors = Executor.query.filter_by(agent=agent).count()
        assert count_executors == 1

    def test_executor_already_in_database_but_new_parameters_incoming(self, session):
        agent = AgentFactory.create()
        old_params = {'old_param': True}
        executor = ExecutorFactory.create(agent=agent, parameters_metadata=old_params)
        session.add(executor)
        session.commit()
        new_params = old_params
        new_params.update({'param1': True})
        executors = [
            {'executor_name': executor.name, 'args': new_params}
        ]

        assert update_executors(agent, executors)
        from_db_executor = Executor.query.filter_by(id=executor.id, agent=agent).first()
        assert from_db_executor.parameters_metadata == new_params

    def test_new_executor_and_delete_the_old_one(self, session):
        agent = AgentFactory.create()
        params = {'old_param': True}
        executor = ExecutorFactory.create(
            name='old_executor',
            agent=agent,
            parameters_metadata=params
        )
        session.add(executor)
        session.commit()
        executors = [
            {'executor_name': 'new executor', 'args': {'param1': True}}
        ]

        assert update_executors(agent, executors)
        count_executors = Executor.query.filter_by(agent=agent).count()
        assert count_executors == 1
        current_executor = Executor.query.filter_by(agent=agent).first()
        assert current_executor.name == 'new executor'
        assert current_executor.parameters_metadata == {'param1': True}

    def test_remove_all_executors(self, session):
        agent = AgentFactory.create()
        params = {'old_param': True}
        executor = ExecutorFactory.create(
            name='old_executor',
            agent=agent,
            parameters_metadata=params
        )
        session.add(executor)
        session.commit()
        executors = [
        ]

        assert update_executors(agent, executors)
        count_executors = Executor.query.filter_by(agent=agent).count()
        assert count_executors == 0

    def test_remove_one_of_two_executors(self, session):
        agent = AgentFactory.create()
        executor = ExecutorFactory.create(
            name='executor 1',
            agent=agent,
            parameters_metadata={'param1': True}
        )
        session.add(executor)
        another_executor = ExecutorFactory.create(
            name='executor 2',
            agent=agent,
            parameters_metadata={'param2': True}
        )
        session.add(executor)
        session.add(another_executor)
        session.commit()
        executors = [
            {'executor_name': 'executor 2', 'args': {'param2': True}}
        ]

        assert update_executors(agent, executors)
        count_executors = Executor.query.filter_by(agent=agent).count()
        assert count_executors == 1
        from_db_executor = Executor.query.filter_by(id=another_executor.id, agent=agent).first()
        assert from_db_executor.name == 'executor 2'