Codebase list msldap / 75409fe msldap / authentication / kerberos / wsnet.py
75409fe

Tree @75409fe (Download .tar.gz)

wsnet.py @75409feraw · history · blame

## 
##
## Interface to allow remote kerberos authentication via Multiplexor
## 
##
##
##
##
## TODO: RPC auth type is not implemented or tested!!!!

import enum

from msldap.authentication.spnego.asn1_structs import KRB5Token
from msldap.authentication.kerberos.gssapi import get_gssapi, GSSWrapToken, KRB5_MECH_INDEP_TOKEN
from minikerberos.protocol.asn1_structs import AP_REQ, AP_REP, TGS_REP
from minikerberos.protocol.encryption import Enctype, Key, _enctype_table
from pyodidewsnet.clientauth import WSNETAuth


# mutual auth not supported
# encryption is always on

class ISC_REQ(enum.IntFlag):
	DELEGATE = 1
	MUTUAL_AUTH = 2
	REPLAY_DETECT = 4
	SEQUENCE_DETECT = 8
	CONFIDENTIALITY = 16
	USE_SESSION_KEY = 32
	PROMPT_FOR_CREDS = 64
	USE_SUPPLIED_CREDS = 128
	ALLOCATE_MEMORY = 256
	USE_DCE_STYLE = 512
	DATAGRAM = 1024
	CONNECTION = 2048
	CALL_LEVEL = 4096
	FRAGMENT_SUPPLIED = 8192
	EXTENDED_ERROR = 16384
	STREAM = 32768
	INTEGRITY = 65536
	IDENTIFY = 131072
	NULL_SESSION = 262144
	MANUAL_CRED_VALIDATION = 524288
	RESERVED1 = 1048576
	FRAGMENT_TO_FIT = 2097152
	HTTP = 0x10000000

class MSLDAPWSNetKerberosAuth:
	def __init__(self, settings):
		self.iterations = 0
		self.settings = settings
		self.mode = 'CLIENT'
		self.sspi = WSNETAuth()
		self.client = None
		self.target = None
		self.gssapi = None
		self.etype = None
		self.session_key = None
		self.seq_number = 0
		self.flags = ISC_REQ.CONNECTION
		
		self.setup()
		
	def setup(self):
		if self.settings.encrypt is True:
			self.flags = \
				ISC_REQ.CONFIDENTIALITY |\
				ISC_REQ.INTEGRITY |\
				ISC_REQ.REPLAY_DETECT |\
				ISC_REQ.SEQUENCE_DETECT

	def get_seq_number(self):
		return self.seq_number
		
	async def encrypt(self, data, message_no):
		return self.gssapi.GSS_Wrap(data, message_no)
		
	async def decrypt(self, data, message_no, direction='init', auth_data=None):
		return self.gssapi.GSS_Unwrap(data, message_no, direction=direction, auth_data=auth_data)
	
	def signing_needed(self):
		"""
		Checks if integrity protection was negotiated
		"""
		return ISC_REQ.INTEGRITY in self.flags

	def encryption_needed(self):
		"""
		Checks if confidentiality flag was negotiated
		"""
		return ISC_REQ.CONFIDENTIALITY in self.flags

	def get_session_key(self):
		return self.session_key
	
	async def authenticate(self, authData = None, flags = None, seq_number = 0, cb_data=None):
		try:
			status, ctxattr, apreq, err = await self.sspi.authenticate('KERBEROS', '', self.settings.target.to_target_string(), 3, self.flags.value, authdata = b'')
			if err is not None:
				raise err
			
			self.flags = ISC_REQ(ctxattr)

			self.session_key, err = await self.sspi.get_sessionkey()
			if err is not None:
				return None, None, err

			unwrap = KRB5_MECH_INDEP_TOKEN.from_bytes(apreq)
			aprep = AP_REQ.load(unwrap.data[2:]).native
			subkey = Key(aprep['ticket']['enc-part']['etype'], self.session_key)
			self.gssapi = get_gssapi(subkey)
				
			if aprep['ticket']['enc-part']['etype'] != 23:
				if ISC_REQ.CONFIDENTIALITY in self.flags:
					raw_seq_data, err = await self.sspi.get_sequenceno()
					if err is not None:
						return None, None, err
					self.seq_number = GSSWrapToken.from_bytes(raw_seq_data[16:]).SND_SEQ
				
			return unwrap.data[2:], False, None
		except Exception as e:
			return None, None, e