Codebase list msldap / 07894604-882d-429b-8d8a-26cf2c96cb5c/main msldap / authentication / ntlm / messages / authenticate.py
07894604-882d-429b-8d8a-26cf2c96cb5c/main

Tree @07894604-882d-429b-8d8a-26cf2c96cb5c/main (Download .tar.gz)

authenticate.py @07894604-882d-429b-8d8a-26cf2c96cb5c/mainraw · history · blame

import io

from msldap.authentication.ntlm.structures.fields import Fields
from msldap.authentication.ntlm.structures.negotiate_flags import NegotiateFlags
from msldap.authentication.ntlm.structures.version import Version
from msldap.authentication.ntlm.structures.challenge_response import *

# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/033d32cc-88f9-4483-9bf2-b273055038ce
class NTLMAuthenticate:
	def __init__(self, _use_NTLMv2 = True):
		self.Signature = b'NTLMSSP\x00'
		self.MessageType = 3
		self.LmChallengeResponseFields = None
		self.NtChallengeResponseFields = None
		self.DomainNameFields = None
		self.UserNameFields = None
		self.WorkstationFields = None
		self.EncryptedRandomSessionKeyFields = None
		self.NegotiateFlags = None
		self.Version = None
		self.MIC = None
		self.Payload = None

		# high level
		self.LMChallenge = None
		self.NTChallenge = None
		self.DomainName = None
		self.UserName = None
		self.Workstation = None
		self.EncryptedRandomSession = None

		# this is a global variable that needs to be indicated
		self._use_NTLMv2 = _use_NTLMv2
		
	@staticmethod
	def construct(flags, domainname= None, workstationname= None, username= None, encrypted_session= None, lm_response= None, nt_response= None, version = None, mic = b'\x00'*16):
		auth = NTLMAuthenticate()
		auth.Payload = b''
		
		payload_pos = 8+4+8+8+8+8+8+8+4
		if flags & NegotiateFlags.NEGOTIATE_VERSION:
			if not version:
				raise Exception('NEGOTIATE_VERSION set but no Version supplied!')
				
			auth.Version = version
			
			payload_pos += 8
			
		if mic is not None:
			auth.MIC = mic
			payload_pos += 16
			
		if lm_response:
			data =  lm_response.to_bytes()
			auth.Payload += data
			auth.LmChallengeResponseFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.LMChallenge = lm_response
		else:
			auth.LmChallengeResponseFields  = Fields(0,0)
			
		if nt_response:
			data =  nt_response.to_bytes()
			auth.Payload += data
			auth.NtChallengeResponseFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.NTChallenge = nt_response
		else:
			auth.NtChallengeResponseFields  = Fields(0,0)
		
		
		if domainname:
			data =  domainname.encode('utf-16le')
			auth.Payload += data
			auth.DomainNameFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.DomainName = domainname
		else:
			auth.DomainNameFields  = Fields(0,0)
		
		if username:
			data =  username.encode('utf-16le')
			auth.Payload += data
			auth.UserNameFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.UserName = username
		else:
			auth.UserNameFields  = Fields(0,0)
		
		if workstationname:
			data =  workstationname.encode('utf-16le')
			auth.Payload += data
			auth.WorkstationFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.Workstation = workstationname
		else:
			auth.WorkstationFields  = Fields(0,0)
			
		if encrypted_session:
			data =  encrypted_session
			auth.Payload += data
			auth.EncryptedRandomSessionKeyFields  = Fields(len(data), payload_pos)
			payload_pos += len(data)
			auth.EncryptedRandomSession = encrypted_session
		else:
			auth.EncryptedRandomSessionKeyFields  = Fields(0,0)
				
		auth.NegotiateFlags = flags		
		return auth
		
	def to_bytes(self):
		t = b''
		t += self.Signature
		t += self.MessageType.to_bytes(4, byteorder = 'little', signed = False)
		
		t += self.LmChallengeResponseFields.to_bytes()
		t += self.NtChallengeResponseFields.to_bytes()
		t += self.DomainNameFields.to_bytes()
		t += self.UserNameFields.to_bytes()
		t += self.WorkstationFields.to_bytes()
		t += self.EncryptedRandomSessionKeyFields.to_bytes()
		t += self.NegotiateFlags.to_bytes(4, byteorder = 'little', signed = False)
		if self.Version:
			t += self.Version.to_bytes()
		if self.MIC is not None:	
			t += self.MIC
		t += self.Payload
		return t
		

	@staticmethod
	def from_bytes(bbuff,_use_NTLMv2 = True):
		return NTLMAuthenticate.from_buffer(io.BytesIO(bbuff), _use_NTLMv2 = _use_NTLMv2)

	@staticmethod
	def from_buffer(buff, _use_NTLMv2 = True):
		auth = NTLMAuthenticate(_use_NTLMv2)
		auth.Signature    = buff.read(8)
		auth.MessageType  = int.from_bytes(buff.read(4), byteorder = 'little', signed = False)
		auth.LmChallengeResponseFields = Fields.from_buffer(buff)
		auth.NtChallengeResponseFields = Fields.from_buffer(buff)
		auth.DomainNameFields = Fields.from_buffer(buff)
		auth.UserNameFields = Fields.from_buffer(buff)
		auth.WorkstationFields = Fields.from_buffer(buff)
		auth.EncryptedRandomSessionKeyFields = Fields.from_buffer(buff)
		auth.NegotiateFlags = NegotiateFlags(int.from_bytes(buff.read(4), byteorder = 'little', signed = False))
		if auth.NegotiateFlags & NegotiateFlags.NEGOTIATE_VERSION: 
			auth.Version = Version.from_buffer(buff)

		# TODO: I'm not sure about this condition!!! Need to test this!
		if auth.NegotiateFlags & NegotiateFlags.NEGOTIATE_ALWAYS_SIGN:
			auth.MIC = buff.read(16)

		currPos = buff.tell()
		auth.Payload = buff.read()

		if auth._use_NTLMv2 and auth.NtChallengeResponseFields.length > 24:
			buff.seek(auth.LmChallengeResponseFields.offset, io.SEEK_SET)
			auth.LMChallenge = LMv2Response.from_buffer(buff)
			

			buff.seek(auth.NtChallengeResponseFields.offset, io.SEEK_SET)
			auth.NTChallenge = NTLMv2Response.from_buffer(buff)

		else:
			buff.seek(auth.LmChallengeResponseFields.offset, io.SEEK_SET)
			auth.LMChallenge = LMResponse.from_buffer(buff)
				
			buff.seek(auth.NtChallengeResponseFields.offset, io.SEEK_SET)
			auth.NTChallenge = NTLMv1Response.from_buffer(buff)

		buff.seek(auth.DomainNameFields.offset,io.SEEK_SET)
		auth.DomainName = buff.read(auth.DomainNameFields.length).decode('utf-16le')
		
		buff.seek(auth.UserNameFields.offset,io.SEEK_SET)
		auth.UserName = buff.read(auth.UserNameFields.length).decode('utf-16le')

		buff.seek(auth.WorkstationFields.offset,io.SEEK_SET)
		auth.Workstation = buff.read(auth.WorkstationFields.length).decode('utf-16le')

		buff.seek(auth.EncryptedRandomSessionKeyFields.offset,io.SEEK_SET)
		auth.EncryptedRandomSession = buff.read(auth.EncryptedRandomSessionKeyFields.length)
		
		buff.seek(currPos, io.SEEK_SET)

		return auth

	def __repr__(self):
		t  = '== NTLMAuthenticate ==\r\n'
		t += 'Signature     : %s\r\n' % repr(self.Signature)
		t += 'MessageType   : %s\r\n' % repr(self.MessageType)
		t += 'NegotiateFlags: %s\r\n' % repr(self.NegotiateFlags)
		t += 'Version       : %s\r\n' % repr(self.Version)
		t += 'MIC           : %s\r\n' % repr(self.MIC.hex() if self.MIC else 'None')
		t += 'LMChallenge   : %s\r\n' % repr(self.LMChallenge)
		t += 'NTChallenge   : %s\r\n' % repr(self.NTChallenge)
		t += 'DomainName    : %s\r\n' % repr(self.DomainName)
		t += 'UserName      : %s\r\n' % repr(self.UserName)
		t += 'Workstation   : %s\r\n' % repr(self.Workstation)
		t += 'EncryptedRandomSession: %s\r\n' % repr(self.EncryptedRandomSession.hex())
		return t

def test():
	test_reconstrut()
	test_construct()
	test_2()
	
def test_2():
	data = bytes.fromhex('4e 54 4c 4d 53 53 50 00 03 00 00 00 18 00 18 006c 00 00 00 54 00 54 00 84 00 00 00 0c 00 0c 0048 00 00 00 08 00 08 00 54 00 00 00 10 00 10 005c 00 00 00 10 00 10 00 d8 00 00 00 35 82 88 e205 01 28 0a 00 00 00 0f 44 00 6f 00 6d 00 61 0069 00 6e 00 55 00 73 00 65 00 72 00 43 00 4f 004d 00 50 00 55 00 54 00 45 00 52 00 86 c3 50 97ac 9c ec 10 25 54 76 4a 57 cc cc 19 aa aa aa aaaa aa aa aa 68 cd 0a b8 51 e5 1c 96 aa bc 92 7beb ef 6a 1c 01 01 00 00 00 00 00 00 00 00 00 0000 00 00 00 aa aa aa aa aa aa aa aa 00 00 00 0002 00 0c 00 44 00 6f 00 6d 00 61 00 69 00 6e 0001 00 0c 00 53 00 65 00 72 00 76 00 65 00 72 0000 00 00 00 00 00 00 00 c5 da d2 54 4f c9 79 9094 ce 1c e9 0b c9 d0 3e')
	challenge = NTLMAuthenticate.from_bytes(data)
	print(repr(challenge))
	
def test_reconstrut(data = None):
	print('=== reconstruct===')
	if not data:
		auth_test_data = bytes.fromhex('4e544c4d5353500003000000180018007c000000180118019400000008000800580000000c000c0060000000100010006c00000010001000ac010000158288e20a00d73a0000000f0d98eb57e9c52820709c99b98ca321a15400450053005400760069006300740069006d00570049004e0031003000580036003400000000000000000000000000000000000000000000000000fade3940b9381c53c91ddcdd0d44000b0101000000000000aec600bfc5fdd4011bfa20699d7628730000000002000800540045005300540001001200570049004e003200300031003900410044000400120074006500730074002e0063006f007200700003002600570049004e003200300031003900410044002e0074006500730074002e0063006f007200700007000800aec600bfc5fdd40106000400020000000800300030000000000000000000000000200000527d27f234de743760966384d36f61ae2aa4fc2a380699f8caa600011b486d890a0010000000000000000000000000000000000009001e0063006900660073002f00310030002e00310030002e00310030002e003200000000000000000000000000fd67edfb41c09465a91fd733deb0b55b')
	else:
		auth_test_data = data
	challenge = NTLMAuthenticate.from_bytes(auth_test_data)
	print(repr(challenge))
	auth_test_data_verify = challenge.to_bytes()
	print('====== reconstructed ====')
	print(hexdump(auth_test_data_verify))
	print('====== original ====')
	print(hexdump(auth_test_data))
	assert auth_test_data == auth_test_data_verify
	
	
def test_construct():
	pass