Codebase list pysmb / 5fe4f8c python3 / nmb / nmb_structs.py
5fe4f8c

Tree @5fe4f8c (Download .tar.gz)

nmb_structs.py @5fe4f8craw · history · blame

import struct

class NMBError(Exception): pass

class NotConnectedError(NMBError):
    """
    Raisd when the underlying NMB connection has been disconnected or not connected yet
    """
    pass

class NMBSessionMessage:

    HEADER_STRUCT_FORMAT = '>BBH'
    HEADER_STRUCT_SIZE = struct.calcsize(HEADER_STRUCT_FORMAT)

    def __init__(self):
        self.reset()

    def reset(self):
        self.type = 0
        self.flags = 0
        self.data = ''

    def decode(self, data, offset):
        data_len = len(data)

        if data_len < offset + self.HEADER_STRUCT_SIZE:
            # Not enough data for decoding
            return 0

        self.reset()
        self.type, self.flags, length = struct.unpack(self.HEADER_STRUCT_FORMAT, data[offset:offset+self.HEADER_STRUCT_SIZE])

        if self.flags & 0x01:
            length |= 0x010000

        if data_len < offset + self.HEADER_STRUCT_SIZE + length:
            return 0

        self.data = data[offset+self.HEADER_STRUCT_SIZE:offset+self.HEADER_STRUCT_SIZE+length]
        return self.HEADER_STRUCT_SIZE + length

class DirectTCPSessionMessage(NMBSessionMessage):

    HEADER_STRUCT_FORMAT = '>I'
    HEADER_STRUCT_SIZE = struct.calcsize(HEADER_STRUCT_FORMAT)

    def decode(self, data, offset):
        data_len = len(data)

        if data_len < offset + self.HEADER_STRUCT_SIZE:
            # Not enough data for decoding
            return 0

        self.reset()
        length = struct.unpack(self.HEADER_STRUCT_FORMAT, data[offset:offset+self.HEADER_STRUCT_SIZE])[0]

        if length >> 24 != 0:
            raise NMBError("Invalid protocol header for Direct TCP session message")

        if data_len < offset + self.HEADER_STRUCT_SIZE + length:
            return 0

        self.data = data[offset+self.HEADER_STRUCT_SIZE:offset+self.HEADER_STRUCT_SIZE+length]
        return self.HEADER_STRUCT_SIZE + length