Codebase list msldap / 75409fe msldap / authentication / ntlm / wsnet.py
75409fe

Tree @75409fe (Download .tar.gz)

wsnet.py @75409feraw · history · blame

from msldap import logger
from msldap.authentication.ntlm.native import NTLMAUTHHandler, NTLMHandlerSettings
from pyodidewsnet.clientauth import WSNETAuth
import enum

class ISC_REQ(enum.IntFlag):
	DELEGATE = 1
	MUTUAL_AUTH = 2
	REPLAY_DETECT = 4
	SEQUENCE_DETECT = 8
	CONFIDENTIALITY = 16
	USE_SESSION_KEY = 32
	PROMPT_FOR_CREDS = 64
	USE_SUPPLIED_CREDS = 128
	ALLOCATE_MEMORY = 256
	USE_DCE_STYLE = 512
	DATAGRAM = 1024
	CONNECTION = 2048
	CALL_LEVEL = 4096
	FRAGMENT_SUPPLIED = 8192
	EXTENDED_ERROR = 16384
	STREAM = 32768
	INTEGRITY = 65536
	IDENTIFY = 131072
	NULL_SESSION = 262144
	MANUAL_CRED_VALIDATION = 524288
	RESERVED1 = 1048576
	FRAGMENT_TO_FIT = 2097152
	HTTP = 0x10000000

class MSLDAPWSNetNTLMAuth:
	def __init__(self, settings):
		self.settings = settings
		self.mode = None #'CLIENT'
		self.sspi = WSNETAuth()
		self.operator = None
		self.client = None
		self.target = None
		self.seq_number = 0
		
		self.session_key = None
		self.ntlm_ctx = NTLMAUTHHandler(NTLMHandlerSettings(None, 'MANUAL'))

	def setup(self):
		return
		
	@property
	def ntlmChallenge(self):
		return self.ntlm_ctx.ntlmChallenge
		
	def get_sealkey(self, mode = 'Client'):
		return self.ntlm_ctx.get_sealkey(mode = mode)
			
	def get_signkey(self, mode = 'Client'):
		return self.ntlm_ctx.get_signkey(mode = mode)

	def signing_needed(self):
		return self.ntlm_ctx.signing_needed()
	
	def encryption_needed(self):
		return self.ntlm_ctx.encryption_needed()
		
	async def encrypt(self, data, message_no):
		return await self.ntlm_ctx.encrypt(data, message_no)
		
	async def decrypt(self, data, sequence_no, direction='init', auth_data=None):
		return await self.ntlm_ctx.decrypt(data, sequence_no, direction=direction, auth_data=auth_data)

	async def sign(self, data, message_no, direction=None, reset_cipher = False):
		return await self.ntlm_ctx.sign(data, message_no, direction=None, reset_cipher = reset_cipher)
		
		
	def SEAL(self, signingKey, sealingKey, messageToSign, messageToEncrypt, seqNum, cipher_encrypt):
		return self.ntlm_ctx.SEAL(signingKey, sealingKey, messageToSign, messageToEncrypt, seqNum, cipher_encrypt)
		
	def SIGN(self, signingKey, message, seqNum, cipher_encrypt):
		return self.ntlm_ctx.SIGN(signingKey, message, seqNum, cipher_encrypt)
	
	def get_session_key(self):
		return self.session_key

	def get_seq_number(self):
		return self.seq_number
		
	def is_extended_security(self):
		return self.ntlm_ctx.is_extended_security()
	
	async def authenticate(self, authData = b'', flags = None, seq_number = 0, cb_data = None):
		try:
			if flags is None:
				flags = ISC_REQ.CONNECTION
			
			if authData is None:
				status, ctxattr, data, err = await self.sspi.authenticate('NTLM', '', '', 3, flags.value, authdata = b'')
				if err is not None:
					raise err
				self.ntlm_ctx.load_negotiate(data)
				return data, True, None
			else:
				self.ntlm_ctx.load_challenge(authData)
				status, ctxattr, data, err = await self.sspi.authenticate('NTLM', '', '', 3, flags.value, authdata = authData)
				if err is not None:
					raise err
				if err is None:
					self.ntlm_ctx.load_authenticate(data)
					self.session_key, err = await self.sspi.get_sessionkey()
					if err is not None:
						raise err
					self.ntlm_ctx.load_sessionkey(self.get_session_key())
				
				await self.sspi.disconnect()
				return data, False, None
		except Exception as e:
			return None, None, e