Codebase list python-lsassy / c26b393 lsassy / impacketfile.py
c26b393

Tree @c26b393 (Download .tar.gz)

impacketfile.py @c26b393raw · history · blame

import logging
import time

from impacket.smb3structs import *


class ImpacketFile:
    """
    Remote file representation

    This class uses impacket method to create a file object with usual read methods so that it looks like a local
    file from another library point of view. Methods are
    - open
    - read
    - close
    - seek
    - tell
    """
    def __init__(self, session):
        self._session = session
        self._share_name = None
        self._fpath = None
        self._currentOffset = 0
        self._total_read = 0
        self._tid = None
        self._fid = None
        self._fileInfo = None
        self._endOfFile = None

        self._opened = False

        self._buffer_min_size = 1024 * 8
        self._buffer_data = {
            "offset": 0,
            "size": 0,
            "buffer": ""
        }

    def get_connection(self):
        """
        Method to access a private attribute
        :return: session instance
        """
        return self._session

    def _open_share(self):
        try:
            self._tid = self._session.smb_session.connectTree(self._share_name)
        except Exception as e:
            logging.warning("ConnectTree error with '{}'".format(self._share_name), exc_info=True)
            return None
        return self

    @staticmethod
    def create_file(session, share, path, file, content):
        path = path.replace("\\", "/")
        try:
            share, fpath = share, path + "/" + file
        except Exception as e:
            logging.warning("Parsing error with '{}'".format(path), exc_info=True)
            return None
        try:
            tid = session.smb_session.connectTree(share)
        except Exception as e:
            logging.warning("ConnectTree error with '{}'".format(share), exc_info=True)
            return None

        fid = None

        try:
            fid = session.smb_session._SMBConnection.create(tid, fpath, FILE_WRITE_DATA, FILE_SHARE_WRITE, FILE_NON_DIRECTORY_FILE, FILE_OVERWRITE_IF, 0)
            finished = False
            MAX_FILE_WRITE = session.smb_session._SMBConnection._Connection['MaxWriteSize']
            rnd = 0
            while not finished:
                data = content[rnd*MAX_FILE_WRITE:(rnd+1)*MAX_FILE_WRITE]
                if len(data) == 0:
                    break
                session.smb_session._SMBConnection.write(tid, fid, data, rnd*MAX_FILE_WRITE, len(data))
                rnd += 1
        finally:
            if fid is not None:
                logging.debug("File {}{} created!".format(share, fpath))
                session.smb_session._SMBConnection.close(tid, fid)
                session.smb_session._SMBConnection.disconnectTree(tid)
                return True
        if tid is not None:
            session.smb_session._SMBConnection.disconnectTree(tid)
        return None

    @staticmethod
    def delete(session, file_path, timeout=5):
        t = time.time()
        while True:
            try:
                session.smb_session.deleteFile("C$", file_path)
                logging.debug("File {}{} deleted".format("C$", file_path))
                return True
            except BrokenPipeError:
                if time.time() - t > timeout:
                    logging.warning("File wasn't removed `{}{}`, connection lost".format("C$", file_path),
                                    exc_info=True)
                    return None
                logging.debug("Trying to reconnect ...")
                if session.login():
                    logging.success("Reconnected after unexpected disconnection for proper cleanup")
            except Exception as e:
                if "STATUS_OBJECT_NAME_NOT_FOUND" in str(e) or "STATUS_NO_SUCH_FILE" in str(e):
                    return True
                if time.time() - t > timeout:
                    logging.warning("File wasn't removed `{}{}`".format("C$", file_path), exc_info=True)
                    return None
                logging.debug("Unable to delete file `{}{}`. Retrying...".format("C$", file_path))
                time.sleep(0.5)

    def open(self, share, path, file, timeout=3):
        """
        Open remote file
        :param share: Share location of the remote file
        :param path: Path of the remote file on provided share
        :param file: Remote filename
        :param timeout: Timeout if file access hangs
        :return: instance of this class
        """
        path = path.replace("\\", "/")
        try:
            self._share_name, self._fpath = share, path + "/" + file
        except Exception as e:
            logging.warning("Parsing error with '{}'".format(path), exc_info=True)
            return None

        if self._open_share() is None:
            return None

        t = time.time()
        while True:
            try:
                self._fid = self._session.smb_session.openFile(self._tid, self._fpath)
                logging.info("{} handle acquired".format(self._fpath))
                break
            except Exception as e:
                if time.time() - t > timeout:
                    logging.warning("Unable to open remote file {}".format( self._fpath), exc_info=True)
                    return None
                logging.debug("Unable to open remote file {}. Retrying...".format(self._fpath))
                time.sleep(0.5)

        self._fileInfo = self._session.smb_session.queryInfo(self._tid, self._fid)
        self._endOfFile = self._fileInfo.fields["EndOfFile"]
        self._opened = True
        return self

    def read(self, size):
        """
        Read an amount of bytes on the remote file

        This method uses some caching mechanisms to increase reading speed
        :param size: Number of bytes to read
        :return: Buffer containing file's content
        """
        if size == 0:
            return b''

        if (self._buffer_data["offset"] <= self._currentOffset <= self._buffer_data["offset"] + self._buffer_data[
            "size"]
                and self._buffer_data["offset"] + self._buffer_data["size"] > self._currentOffset + size):
            value = self._buffer_data["buffer"][
                    self._currentOffset - self._buffer_data["offset"]:self._currentOffset - self._buffer_data[
                        "offset"] + size]
        else:
            self._buffer_data["offset"] = self._currentOffset

            """
            If data size is too small, read self._buffer_min_size bytes and cache them
            """
            if size < self._buffer_min_size:
                value = self._session.smb_session.readFile(self._tid, self._fid, self._currentOffset, self._buffer_min_size)
                self._buffer_data["size"] = self._buffer_min_size
                self._total_read += self._buffer_min_size

            else:
                value = self._session.smb_session.readFile(self._tid, self._fid, self._currentOffset, size + self._buffer_min_size)
                while len(value) < size+self._buffer_min_size and self._currentOffset + len(value) < self._endOfFile:
                    value += self._session.smb_session.readFile(self._tid, self._fid, self._currentOffset + len(value), size + self._buffer_min_size - len(value))
                self._buffer_data["size"] = size + self._buffer_min_size
                self._total_read += size

            self._buffer_data["buffer"] = value

        self._currentOffset += size

        return value[:size]

    def close(self):
        """
        Close handle to remote file
        """
        if self._opened:
            self._session.smb_session.closeFile(self._tid, self._fid)
            self._session.smb_session.disconnectTree(self._tid)
            self._opened = False

    def seek(self, offset, whence=0):
        """
        Seek a certain byte on the remote file
        :param offset: Offset on the remote file
        :param whence: 0 if absolute offset, 1 if relative offset, 2 if relative to the end of file
        """
        if whence == 0:
            self._currentOffset = offset
        elif whence == 1:
            self._currentOffset += offset
        elif whence == 2:
            self._currentOffset = self._endOfFile - offset
        else:
            raise Exception('Seek function whence value must be between 0-2')

    def tell(self):
        """
        Get current offset
        :return: Current offset
        """
        return self._currentOffset

    def size(self):
        """
        Get remote file size
        :return: Remote file size
        """
        return self._endOfFile

    def get_path(self):
        """
        Get remote file path
        :return: Remote file path (share, path)
        """
        return self._share_name, self._fpath

    def get_file_path(self):
        """
        Get relative file path
        :return:  Relative file path
        """
        return self._fpath

    def get_session(self):
        """
        Get current session
        :return: Current session
        """
        return self._session