Codebase list python-lsassy / 27912ca0-a246-4944-87d0-9c645cbd0d43/main tests / tests.py
27912ca0-a246-4944-87d0-9c645cbd0d43/main

Tree @27912ca0-a246-4944-87d0-9c645cbd0d43/main (Download .tar.gz)

tests.py @27912ca0-a246-4944-87d0-9c645cbd0d43/mainraw · history · blame

#!/usr/bin/env python3
# Author:
#  Romain Bentz (pixis - @hackanddo)
# Website:
#  https://beta.hackndo.com


import unittest

from lsassy.utils.defines import *
from lsassy.modules.dumper import Dumper
from lsassy.modules.impacketconnection import ImpacketConnection
from lsassy.modules.impacketfile import ImpacketFile
from lsassy.modules.logger import Logger
from lsassy.modules.writer import Writer
from lsassy.core import Lsassy
from tests.tests_config import *


class test_impacketconnection(unittest.TestCase):
    def setUp(self):
        self.log = Logger(Logger.Options(verbosity=0, quiet=True))
        self.conn = None

    def tearDown(self):
        if isinstance(self.conn, ImpacketConnection):
            self.conn.clean()

    def test_login_dns_error(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options("pixis.hackndo", domain, da_login, da_password))
        self.conn.set_logger(self.log)
        ret = self.conn.login()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DNS_ERROR[1], ret.error_msg)

    @unittest.skipUnless(kerberos, "Skipping Kerberos (Set kerberos=True to incude Kerberos tests)")
    def test_login_kerberos_success(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, da_password, '', '', kerberos, domain_controller))
        self.conn.set_logger(self.log)
        ret = self.conn.login()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SUCCESS[1], ret.error_msg)

    def test_login_connection_error(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options("255.255.255.255", domain, da_login, da_password))
        self.conn.set_logger(self.log)
        ret = self.conn.login()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_CONNECTION_ERROR[1], ret.error_msg)

    def test_login_login_error(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, "wrong_password"))
        self.conn.set_logger(self.log)
        ret = self.conn.login()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_LOGIN_FAILURE[1], ret.error_msg)

    def test_login_login_success(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, da_password))
        self.conn.set_logger(self.log)
        ret = self.conn.login()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SUCCESS[1], ret.error_msg)

    def test_is_admin(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, da_password))
        self.conn.set_logger(self.log)
        self.conn.login()
        ret = self.conn.isadmin()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SUCCESS[1], ret.error_msg)

    @unittest.skipUnless(usr_login and usr_password, "No low privileged user credential provided")
    def test_is_admin_error(self):
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, usr_login, usr_password))
        self.conn.set_logger(self.log)
        self.conn.login()
        ret = self.conn.isadmin()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_ACCESS_DENIED[1], ret.error_msg)


class test_impacketfile(unittest.TestCase):
    def setUp(self):
        self.log = Logger(Logger.Options(verbosity=0, quiet=True))
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, da_password))
        self.conn.set_logger(self.log)
        self.conn.login()
        self.ifile = ImpacketFile(self.conn, self.log)

    def tearDown(self):
        self.ifile.clean()
        self.conn.clean()

    def test_path_error(self):
        ret = self.ifile.open("RANDOM")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_PATH_FILE[1], ret.error_msg)

    def test_share_error(self):
        ret = self.ifile.open("RANDOM/path/file")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SHARE[1], ret.error_msg)

    def test_file_error(self):
        ret = self.ifile.open("C$/path/file")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_FILE[1], ret.error_msg)

    def test_file_success(self):
        ret = self.ifile.open("C$/Windows/System32/calc.exe")
        ret.clean()
        self.assertIsInstance(ret, ImpacketFile)


class test_dumper(unittest.TestCase):
    def setUp(self):
        self.log = Logger(Logger.Options(verbosity=0, quiet=True))
        self.conn = ImpacketConnection(ImpacketConnection.Options(target, domain, da_login, da_password))
        self.conn.set_logger(self.log)
        self.conn.login()

    def tearDown(self):
        self.conn.clean()

    """
    DLL Method
    """
    def test_dll_dump_invalid_shell(self):
        ret = Dumper(self.conn).dll_dump(("wmi",), "unknown")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_METHOD_NOT_SUPPORTED[1], ret.error_msg)

    def test_dll_execute_error(self):
        ret = Dumper(self.conn).dll_dump((), "cmd")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DLL_NO_EXECUTE[1], ret.error_msg)

    def test_dll_execute_success(self):
        ret = Dumper(self.conn).dll_dump(("task",), "cmd")
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SUCCESS[1], ret.error_msg)

    """
    Procdump Method
    """
    def test_procdump_missing_parameter(self):
        ret = Dumper(self.conn).procdump_dump(("wmi",))
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_PROCDUMP_NOT_PROVIDED[1], ret.error_msg)

    def test_procdump_invalid_parameter(self):
        dump_option = Dumper.Options()
        dump_option.procdump_path = "/invalid/path"
        ret = Dumper(self.conn, dump_option).procdump_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_PROCDUMP_NOT_FOUND[1], ret.error_msg)

    @unittest.skipUnless(procdump_path, "Procdump path wasn't provided")
    def test_procdump_upload_error(self):
        dump_option = Dumper.Options()
        dump_option.procdump_path = procdump_path
        dump_option.share = "INVALID_SHARE"
        ret = Dumper(self.conn, dump_option).procdump_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_PROCDUMP_NOT_UPLOADED[1], ret.error_msg)

    @unittest.skipUnless(procdump_path, "Procdump path wasn't provided")
    def test_procdump_execute_error(self):
        dump_option = Dumper.Options()
        dump_option.procdump_path = procdump_path
        dump = Dumper(self.conn, dump_option)
        ret = dump.procdump_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_PROCDUMP_NO_EXECUTE[1], ret.error_msg)
        dump.clean()

    """
    Dumpert Method
    """
    def test_dumpert_missing_parameter(self):
        ret = Dumper(self.conn).dumpert_dump(("wmi",))
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DUMPERT_NOT_PROVIDED[1], ret.error_msg)

    def test_dumpert_invalid_parameter(self):
        dump_option = Dumper.Options()
        dump_option.dumpert_path = "/invalid/path"
        ret = Dumper(self.conn, dump_option).dumpert_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DUMPERT_NOT_FOUND[1], ret.error_msg)

    @unittest.skipUnless(dumpert_path, "Dumper path wasn't provided")
    def test_dumpert_upload_error(self):
        dump_option = Dumper.Options()
        dump_option.dumpert_path = dumpert_path
        dump_option.share = "INVALID_SHARE"
        ret = Dumper(self.conn, dump_option).dumpert_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DUMPERT_NOT_UPLOADED[1], ret.error_msg)

    @unittest.skipUnless(dumpert_path, "Dumper path wasn't provided")
    def test_dumpert_execute_error(self):
        dump_option = Dumper.Options()
        dump_option.dumpert_path = dumpert_path
        dumper = Dumper(self.conn, dump_option)
        ret = dumper.dumpert_dump(())
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_DUMPERT_NO_EXECUTE[1], ret.error_msg)
        dumper.clean()

    """
    Dump generic
    """
    def test_dump_method_unknown(self):
        dump_option = Dumper.Options()
        dump_option.method = 99
        ret = Dumper(self.conn, dump_option).dump()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_METHOD_NOT_SUPPORTED[1], ret.error_msg)

    def test_dump_success(self):
        dumper = Dumper(self.conn)
        ret = dumper.dump()
        dumper.clean()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_SUCCESS[1], ret.error_msg)




@unittest.skipUnless(procdump_path, "Procdump path wasn't provided")
@unittest.skipUnless(protected_target, "No IP address with protected LSASS was provided")
class test_dumper_protected(unittest.TestCase):
    def setUp(self):
        self.log = Logger(Logger.Options(verbosity=0, quiet=True))
        self.conn = ImpacketConnection(ImpacketConnection.Options(protected_target, domain, da_login, da_password))
        self.conn.set_logger(self.log)
        self.conn.login()

    def tearDown(self):
        self.conn.clean()

    def test_dump_protected(self):
        dump_option = Dumper.Options()
        dump_option.method = 2
        dump_option.procdump_path = procdump_path
        dumper = Dumper(self.conn, dump_option)
        ret = dumper.dump()
        self.assertIsInstance(ret, RetCode)
        self.assertEqual(ERROR_LSASS_PROTECTED[1], ret.error_msg)
        dumper.clean()

class test_lsassy(unittest.TestCase):
    def setUp(self):
        log_options = Logger.Options(verbosity=0, quiet=True)
        write_options = Writer.Options(format="none")
        self.lsassy = Lsassy(target, da_login, domain, da_password, log_options=log_options, write_options=write_options)

    def tearDown(self):
        self.lsassy.clean()

    def test_lsassy_success(self):
        ret = self.lsassy.run()
        self.assertEqual(0, ret)


if __name__ == '__main__':
    unittest.main()