diff --git a/debian/changelog b/debian/changelog index 0690123..743d87b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +python-faraday (3.19.0-0kali2) kali-dev; urgency=medium + + * Add embedded itsdangerous + + -- Sophie Brun Wed, 13 Apr 2022 17:26:50 +0200 + python-faraday (3.19.0-0kali1) kali-dev; urgency=medium * New upstream version 3.19.0 diff --git a/debian/patches/Add-vendor-in-path.patch b/debian/patches/Add-vendor-in-path.patch index 41e10b2..ea40c26 100644 --- a/debian/patches/Add-vendor-in-path.patch +++ b/debian/patches/Add-vendor-in-path.patch @@ -2,14 +2,21 @@ Date: Mon, 6 Dec 2021 17:30:46 +0100 Subject: Add usr/lib/python3/dist-packages/faraday/vendor in import path -Last-Update: 2021-12-07 +Last-Update: 2022-04-13 Add usr/lib/python3/dist-packages/faraday/vendor in PYTHONPATH to use -the embedded version instead of the packages version of SQLalchemy. +the embedded version instead of the packages version of SQLalchemy and +itsdangerous. --- - faraday/manage.py | 2 ++ - faraday/searcher/searcher.py | 2 ++ - faraday/start_server.py | 2 ++ - 3 files changed, 6 insertions(+) + faraday/manage.py | 2 ++ + faraday/searcher/searcher.py | 2 ++ + faraday/server/api/modules/token.py | 3 +++ + faraday/server/api/modules/websocket_auth.py | 3 +++ + faraday/server/app.py | 3 +++ + faraday/server/websocket_factories.py | 3 +++ + faraday/start_server.py | 2 ++ + tests/conftest.py | 3 +++ + tests/test_api_login.py | 3 +++ + 9 files changed, 24 insertions(+) diff --git a/faraday/manage.py b/faraday/manage.py index 47b3347..7f3a531 100755 @@ -37,6 +44,62 @@ from faraday.searcher.api import Api from faraday.searcher.validator import validate_rules from faraday.server.models import Service, Host +diff --git a/faraday/server/api/modules/token.py b/faraday/server/api/modules/token.py +index bb656cb..ccd0e2b 100644 +--- a/faraday/server/api/modules/token.py ++++ b/faraday/server/api/modules/token.py +@@ -1,6 +1,9 @@ + import datetime + import logging + ++import sys ++sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') ++ + from itsdangerous import TimedJSONWebSignatureSerializer + from flask import Blueprint, request + from flask_security.utils import hash_data +diff --git a/faraday/server/api/modules/websocket_auth.py b/faraday/server/api/modules/websocket_auth.py +index 879ba12..f3d07a6 100644 +--- a/faraday/server/api/modules/websocket_auth.py ++++ b/faraday/server/api/modules/websocket_auth.py +@@ -1,6 +1,9 @@ + # Faraday Penetration Test IDE + # Copyright (C) 2016 Infobyte LLC (http://www.infobytesec.com/) + # See the file 'doc/LICENSE' for the license information ++import sys ++sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') ++ + import logging + import flask + from flask import Blueprint +diff --git a/faraday/server/app.py b/faraday/server/app.py +index 11c8371..9823804 100644 +--- a/faraday/server/app.py ++++ b/faraday/server/app.py +@@ -8,6 +8,9 @@ import datetime + import bleach + import pyotp + import requests ++import sys ++sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') ++ + from flask_limiter import Limiter + from flask_limiter.util import get_remote_address + from itsdangerous import TimedJSONWebSignatureSerializer, SignatureExpired, BadSignature +diff --git a/faraday/server/websocket_factories.py b/faraday/server/websocket_factories.py +index 93aaaa5..bc433bc 100644 +--- a/faraday/server/websocket_factories.py ++++ b/faraday/server/websocket_factories.py +@@ -4,6 +4,9 @@ Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/) + See the file 'doc/LICENSE' for the license information + + """ ++import sys ++sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') ++ + import json + import logging + import itsdangerous diff --git a/faraday/start_server.py b/faraday/start_server.py index 4ab2c68..a29b855 100644 --- a/faraday/start_server.py @@ -50,3 +113,30 @@ import psycopg2 from alembic.runtime.migration import MigrationContext +diff --git a/tests/conftest.py b/tests/conftest.py +index be5e98c..2e8daa2 100644 +--- a/tests/conftest.py ++++ b/tests/conftest.py +@@ -17,6 +17,9 @@ from pathlib import Path + from pytest_factoryboy import register + from sqlalchemy import event + ++import sys ++sys.path.insert(1, 'debian/vendor') ++ + from faraday.server.app import create_app + from faraday.server.models import db + from tests import factories +diff --git a/tests/test_api_login.py b/tests/test_api_login.py +index c4173da..24a8d96 100644 +--- a/tests/test_api_login.py ++++ b/tests/test_api_login.py +@@ -1,5 +1,8 @@ + import pytest + from flask_security.utils import hash_password ++import sys ++ ++sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') + from itsdangerous import TimedJSONWebSignatureSerializer + + from faraday.server.models import User diff --git a/debian/rules b/debian/rules index d4b5951..6ea28cf 100755 --- a/debian/rules +++ b/debian/rules @@ -18,7 +18,7 @@ # remove unwanted files: .gitignore and useless README rm debian/faraday/usr/lib/python3*/dist-packages/faraday/migrations/versions/.gitignore rm debian/faraday/usr/lib/python3*/dist-packages/faraday/migrations/README - # copy vendor to embed sqlalchemy + # copy vendor to embed sqlalchemy and other Python modules cp -r debian/vendor debian/faraday/usr/lib/python3*/dist-packages/faraday/ override_dh_installchangelogs: diff --git a/debian/source/include-binaries b/debian/source/include-binaries new file mode 100644 index 0000000..7f3d8f8 --- /dev/null +++ b/debian/source/include-binaries @@ -0,0 +1,2 @@ +debian/vendor/itsdangerous/docs/_static/itsdangerous-logo.png +debian/vendor/itsdangerous/docs/_static/itsdangerous-logo-sidebar.png diff --git a/debian/vendor/itsdangerous/__init__.py b/debian/vendor/itsdangerous/__init__.py new file mode 100644 index 0000000..5010252 --- /dev/null +++ b/debian/vendor/itsdangerous/__init__.py @@ -0,0 +1,22 @@ +from ._json import json +from .encoding import base64_decode as base64_decode +from .encoding import base64_encode as base64_encode +from .encoding import want_bytes as want_bytes +from .exc import BadData as BadData +from .exc import BadHeader as BadHeader +from .exc import BadPayload as BadPayload +from .exc import BadSignature as BadSignature +from .exc import BadTimeSignature as BadTimeSignature +from .exc import SignatureExpired as SignatureExpired +from .jws import JSONWebSignatureSerializer +from .jws import TimedJSONWebSignatureSerializer +from .serializer import Serializer as Serializer +from .signer import HMACAlgorithm as HMACAlgorithm +from .signer import NoneAlgorithm as NoneAlgorithm +from .signer import Signer as Signer +from .timed import TimedSerializer as TimedSerializer +from .timed import TimestampSigner as TimestampSigner +from .url_safe import URLSafeSerializer as URLSafeSerializer +from .url_safe import URLSafeTimedSerializer as URLSafeTimedSerializer + +__version__ = "2.0.1" diff --git a/debian/vendor/itsdangerous/_json.py b/debian/vendor/itsdangerous/_json.py new file mode 100644 index 0000000..9368da2 --- /dev/null +++ b/debian/vendor/itsdangerous/_json.py @@ -0,0 +1,34 @@ +import json as _json +import typing as _t +from types import ModuleType + + +class _CompactJSON: + """Wrapper around json module that strips whitespace.""" + + @staticmethod + def loads(payload: _t.Union[str, bytes]) -> _t.Any: + return _json.loads(payload) + + @staticmethod + def dumps(obj: _t.Any, **kwargs: _t.Any) -> str: + kwargs.setdefault("ensure_ascii", False) + kwargs.setdefault("separators", (",", ":")) + return _json.dumps(obj, **kwargs) + + +class DeprecatedJSON(ModuleType): + def __getattribute__(self, item: str) -> _t.Any: + import warnings + + warnings.warn( + "Importing 'itsdangerous.json' is deprecated and will be" + " removed in ItsDangerous 2.1. Use Python's 'json' module" + " instead.", + DeprecationWarning, + stacklevel=2, + ) + return getattr(_json, item) + + +json = DeprecatedJSON("json") diff --git a/debian/vendor/itsdangerous/encoding.py b/debian/vendor/itsdangerous/encoding.py new file mode 100644 index 0000000..cacc1c5 --- /dev/null +++ b/debian/vendor/itsdangerous/encoding.py @@ -0,0 +1,54 @@ +import base64 +import string +import struct +import typing as _t + +from .exc import BadData + +_t_str_bytes = _t.Union[str, bytes] + + +def want_bytes( + s: _t_str_bytes, encoding: str = "utf-8", errors: str = "strict" +) -> bytes: + if isinstance(s, str): + s = s.encode(encoding, errors) + + return s + + +def base64_encode(string: _t_str_bytes) -> bytes: + """Base64 encode a string of bytes or text. The resulting bytes are + safe to use in URLs. + """ + string = want_bytes(string) + return base64.urlsafe_b64encode(string).rstrip(b"=") + + +def base64_decode(string: _t_str_bytes) -> bytes: + """Base64 decode a URL-safe string of bytes or text. The result is + bytes. + """ + string = want_bytes(string, encoding="ascii", errors="ignore") + string += b"=" * (-len(string) % 4) + + try: + return base64.urlsafe_b64decode(string) + except (TypeError, ValueError): + raise BadData("Invalid base64-encoded data") + + +# The alphabet used by base64.urlsafe_* +_base64_alphabet = f"{string.ascii_letters}{string.digits}-_=".encode("ascii") + +_int64_struct = struct.Struct(">Q") +_int_to_bytes = _int64_struct.pack +_bytes_to_int = _t.cast("_t.Callable[[bytes], _t.Tuple[int]]", _int64_struct.unpack) + + +def int_to_bytes(num: int) -> bytes: + return _int_to_bytes(num).lstrip(b"\x00") + + +def bytes_to_int(bytestr: bytes) -> int: + return _bytes_to_int(bytestr.rjust(8, b"\x00"))[0] diff --git a/debian/vendor/itsdangerous/exc.py b/debian/vendor/itsdangerous/exc.py new file mode 100644 index 0000000..c38a6af --- /dev/null +++ b/debian/vendor/itsdangerous/exc.py @@ -0,0 +1,107 @@ +import typing as _t +from datetime import datetime + +_t_opt_any = _t.Optional[_t.Any] +_t_opt_exc = _t.Optional[Exception] + + +class BadData(Exception): + """Raised if bad data of any sort was encountered. This is the base + for all exceptions that ItsDangerous defines. + + .. versionadded:: 0.15 + """ + + def __init__(self, message: str): + super().__init__(message) + self.message = message + + def __str__(self) -> str: + return self.message + + +class BadSignature(BadData): + """Raised if a signature does not match.""" + + def __init__(self, message: str, payload: _t_opt_any = None): + super().__init__(message) + + #: The payload that failed the signature test. In some + #: situations you might still want to inspect this, even if + #: you know it was tampered with. + #: + #: .. versionadded:: 0.14 + self.payload: _t_opt_any = payload + + +class BadTimeSignature(BadSignature): + """Raised if a time-based signature is invalid. This is a subclass + of :class:`BadSignature`. + """ + + def __init__( + self, + message: str, + payload: _t_opt_any = None, + date_signed: _t.Optional[datetime] = None, + ): + super().__init__(message, payload) + + #: If the signature expired this exposes the date of when the + #: signature was created. This can be helpful in order to + #: tell the user how long a link has been gone stale. + #: + #: .. versionchanged:: 2.0 + #: The datetime value is timezone-aware rather than naive. + #: + #: .. versionadded:: 0.14 + self.date_signed = date_signed + + +class SignatureExpired(BadTimeSignature): + """Raised if a signature timestamp is older than ``max_age``. This + is a subclass of :exc:`BadTimeSignature`. + """ + + +class BadHeader(BadSignature): + """Raised if a signed header is invalid in some form. This only + happens for serializers that have a header that goes with the + signature. + + .. versionadded:: 0.24 + """ + + def __init__( + self, + message: str, + payload: _t_opt_any = None, + header: _t_opt_any = None, + original_error: _t_opt_exc = None, + ): + super().__init__(message, payload) + + #: If the header is actually available but just malformed it + #: might be stored here. + self.header: _t_opt_any = header + + #: If available, the error that indicates why the payload was + #: not valid. This might be ``None``. + self.original_error: _t_opt_exc = original_error + + +class BadPayload(BadData): + """Raised if a payload is invalid. This could happen if the payload + is loaded despite an invalid signature, or if there is a mismatch + between the serializer and deserializer. The original exception + that occurred during loading is stored on as :attr:`original_error`. + + .. versionadded:: 0.15 + """ + + def __init__(self, message: str, original_error: _t_opt_exc = None): + super().__init__(message) + + #: If available, the error that indicates why the payload was + #: not valid. This might be ``None``. + self.original_error: _t_opt_exc = original_error diff --git a/debian/vendor/itsdangerous/jws.py b/debian/vendor/itsdangerous/jws.py new file mode 100644 index 0000000..2353a30 --- /dev/null +++ b/debian/vendor/itsdangerous/jws.py @@ -0,0 +1,259 @@ +import hashlib +import time +import warnings +from datetime import datetime +from datetime import timezone +from decimal import Decimal +from numbers import Real + +from ._json import _CompactJSON +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadData +from .exc import BadHeader +from .exc import BadPayload +from .exc import BadSignature +from .exc import SignatureExpired +from .serializer import Serializer +from .signer import HMACAlgorithm +from .signer import NoneAlgorithm + + +class JSONWebSignatureSerializer(Serializer): + """This serializer implements JSON Web Signature (JWS) support. Only + supports the JWS Compact Serialization. + + .. deprecated:: 2.0 + Will be removed in ItsDangerous 2.1. Use a dedicated library + such as authlib. + """ + + jws_algorithms = { + "HS256": HMACAlgorithm(hashlib.sha256), + "HS384": HMACAlgorithm(hashlib.sha384), + "HS512": HMACAlgorithm(hashlib.sha512), + "none": NoneAlgorithm(), + } + + #: The default algorithm to use for signature generation + default_algorithm = "HS512" + + default_serializer = _CompactJSON + + def __init__( + self, + secret_key, + salt=None, + serializer=None, + serializer_kwargs=None, + signer=None, + signer_kwargs=None, + algorithm_name=None, + ): + warnings.warn( + "JWS support is deprecated and will be removed in" + " ItsDangerous 2.1. Use a dedicated JWS/JWT library such as" + " authlib.", + DeprecationWarning, + stacklevel=2, + ) + super().__init__( + secret_key, + salt=salt, + serializer=serializer, + serializer_kwargs=serializer_kwargs, + signer=signer, + signer_kwargs=signer_kwargs, + ) + + if algorithm_name is None: + algorithm_name = self.default_algorithm + + self.algorithm_name = algorithm_name + self.algorithm = self.make_algorithm(algorithm_name) + + def load_payload(self, payload, serializer=None, return_header=False): + payload = want_bytes(payload) + + if b"." not in payload: + raise BadPayload('No "." found in value') + + base64d_header, base64d_payload = payload.split(b".", 1) + + try: + json_header = base64_decode(base64d_header) + except Exception as e: + raise BadHeader( + "Could not base64 decode the header because of an exception", + original_error=e, + ) + + try: + json_payload = base64_decode(base64d_payload) + except Exception as e: + raise BadPayload( + "Could not base64 decode the payload because of an exception", + original_error=e, + ) + + try: + header = super().load_payload(json_header, serializer=_CompactJSON) + except BadData as e: + raise BadHeader( + "Could not unserialize header because it was malformed", + original_error=e, + ) + + if not isinstance(header, dict): + raise BadHeader("Header payload is not a JSON object", header=header) + + payload = super().load_payload(json_payload, serializer=serializer) + + if return_header: + return payload, header + + return payload + + def dump_payload(self, header, obj): + base64d_header = base64_encode( + self.serializer.dumps(header, **self.serializer_kwargs) + ) + base64d_payload = base64_encode( + self.serializer.dumps(obj, **self.serializer_kwargs) + ) + return base64d_header + b"." + base64d_payload + + def make_algorithm(self, algorithm_name): + try: + return self.jws_algorithms[algorithm_name] + except KeyError: + raise NotImplementedError("Algorithm not supported") + + def make_signer(self, salt=None, algorithm=None): + if salt is None: + salt = self.salt + + key_derivation = "none" if salt is None else None + + if algorithm is None: + algorithm = self.algorithm + + return self.signer( + self.secret_keys, + salt=salt, + sep=".", + key_derivation=key_derivation, + algorithm=algorithm, + ) + + def make_header(self, header_fields): + header = header_fields.copy() if header_fields else {} + header["alg"] = self.algorithm_name + return header + + def dumps(self, obj, salt=None, header_fields=None): + """Like :meth:`.Serializer.dumps` but creates a JSON Web + Signature. It also allows for specifying additional fields to be + included in the JWS header. + """ + header = self.make_header(header_fields) + signer = self.make_signer(salt, self.algorithm) + return signer.sign(self.dump_payload(header, obj)) + + def loads(self, s, salt=None, return_header=False): + """Reverse of :meth:`dumps`. If requested via ``return_header`` + it will return a tuple of payload and header. + """ + payload, header = self.load_payload( + self.make_signer(salt, self.algorithm).unsign(want_bytes(s)), + return_header=True, + ) + + if header.get("alg") != self.algorithm_name: + raise BadHeader("Algorithm mismatch", header=header, payload=payload) + + if return_header: + return payload, header + + return payload + + def loads_unsafe(self, s, salt=None, return_header=False): + kwargs = {"return_header": return_header} + return self._loads_unsafe_impl(s, salt, kwargs, kwargs) + + +class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer): + """Works like the regular :class:`JSONWebSignatureSerializer` but + also records the time of the signing and can be used to expire + signatures. + + JWS currently does not specify this behavior but it mentions a + possible extension like this in the spec. Expiry date is encoded + into the header similar to what's specified in `draft-ietf-oauth + -json-web-token `_. + """ + + DEFAULT_EXPIRES_IN = 3600 + + def __init__(self, secret_key, expires_in=None, **kwargs): + super().__init__(secret_key, **kwargs) + + if expires_in is None: + expires_in = self.DEFAULT_EXPIRES_IN + + self.expires_in = expires_in + + def make_header(self, header_fields): + header = super().make_header(header_fields) + iat = self.now() + exp = iat + self.expires_in + header["iat"] = iat + header["exp"] = exp + return header + + def loads(self, s, salt=None, return_header=False): + payload, header = super().loads(s, salt, return_header=True) + + if "exp" not in header: + raise BadSignature("Missing expiry date", payload=payload) + + int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload) + + try: + header["exp"] = int(header["exp"]) + except ValueError: + raise int_date_error + + if header["exp"] < 0: + raise int_date_error + + if header["exp"] < self.now(): + raise SignatureExpired( + "Signature expired", + payload=payload, + date_signed=self.get_issue_date(header), + ) + + if return_header: + return payload, header + + return payload + + def get_issue_date(self, header): + """If the header contains the ``iat`` field, return the date the + signature was issued, as a timezone-aware + :class:`datetime.datetime` in UTC. + + .. versionchanged:: 2.0 + The timestamp is returned as a timezone-aware ``datetime`` + in UTC rather than a naive ``datetime`` assumed to be UTC. + """ + rv = header.get("iat") + + if isinstance(rv, (Real, Decimal)): + return datetime.fromtimestamp(int(rv), tz=timezone.utc) + + def now(self): + return int(time.time()) diff --git a/debian/vendor/itsdangerous/py.typed b/debian/vendor/itsdangerous/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/debian/vendor/itsdangerous/serializer.py b/debian/vendor/itsdangerous/serializer.py new file mode 100644 index 0000000..36a73fb --- /dev/null +++ b/debian/vendor/itsdangerous/serializer.py @@ -0,0 +1,295 @@ +import json +import typing as _t + +from .encoding import want_bytes +from .exc import BadPayload +from .exc import BadSignature +from .signer import _make_keys_list +from .signer import Signer + +_t_str_bytes = _t.Union[str, bytes] +_t_opt_str_bytes = _t.Optional[_t_str_bytes] +_t_kwargs = _t.Dict[str, _t.Any] +_t_opt_kwargs = _t.Optional[_t_kwargs] +_t_signer = _t.Type[Signer] +_t_fallbacks = _t.List[_t.Union[_t_kwargs, _t.Tuple[_t_signer, _t_kwargs], _t_signer]] +_t_load_unsafe = _t.Tuple[bool, _t.Any] +_t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes] + + +def is_text_serializer(serializer: _t.Any) -> bool: + """Checks whether a serializer generates text or binary.""" + return isinstance(serializer.dumps({}), str) + + +class Serializer: + """A serializer wraps a :class:`~itsdangerous.signer.Signer` to + enable serializing and securely signing data other than bytes. It + can unsign to verify that the data hasn't been changed. + + The serializer provides :meth:`dumps` and :meth:`loads`, similar to + :mod:`json`, and by default uses :mod:`json` internally to serialize + the data to bytes. + + The secret key should be a random string of ``bytes`` and should not + be saved to code or version control. Different salts should be used + to distinguish signing in different contexts. See :doc:`/concepts` + for information about the security of the secret key and salt. + + :param secret_key: The secret key to sign and verify with. Can be a + list of keys, oldest to newest, to support key rotation. + :param salt: Extra key to combine with ``secret_key`` to distinguish + signatures in different contexts. + :param serializer: An object that provides ``dumps`` and ``loads`` + methods for serializing data to a string. Defaults to + :attr:`default_serializer`, which defaults to :mod:`json`. + :param serializer_kwargs: Keyword arguments to pass when calling + ``serializer.dumps``. + :param signer: A ``Signer`` class to instantiate when signing data. + Defaults to :attr:`default_signer`, which defaults to + :class:`~itsdangerous.signer.Signer`. + :param signer_kwargs: Keyword arguments to pass when instantiating + the ``Signer`` class. + :param fallback_signers: List of signer parameters to try when + unsigning with the default signer fails. Each item can be a dict + of ``signer_kwargs``, a ``Signer`` class, or a tuple of + ``(signer, signer_kwargs)``. Defaults to + :attr:`default_fallback_signers`. + + .. versionchanged:: 2.0 + Added support for key rotation by passing a list to + ``secret_key``. + + .. versionchanged:: 2.0 + Removed the default SHA-512 fallback signer from + ``default_fallback_signers``. + + .. versionchanged:: 1.1 + Added support for ``fallback_signers`` and configured a default + SHA-512 fallback. This fallback is for users who used the yanked + 1.0.0 release which defaulted to SHA-512. + + .. versionchanged:: 0.14 + The ``signer`` and ``signer_kwargs`` parameters were added to + the constructor. + """ + + #: The default serialization module to use to serialize data to a + #: string internally. The default is :mod:`json`, but can be changed + #: to any object that provides ``dumps`` and ``loads`` methods. + default_serializer: _t.Any = json + + #: The default ``Signer`` class to instantiate when signing data. + #: The default is :class:`itsdangerous.signer.Signer`. + default_signer: _t_signer = Signer + + #: The default fallback signers to try when unsigning fails. + default_fallback_signers: _t_fallbacks = [] + + def __init__( + self, + secret_key: _t_secret_key, + salt: _t_opt_str_bytes = b"itsdangerous", + serializer: _t.Any = None, + serializer_kwargs: _t_opt_kwargs = None, + signer: _t.Optional[_t_signer] = None, + signer_kwargs: _t_opt_kwargs = None, + fallback_signers: _t.Optional[_t_fallbacks] = None, + ): + #: The list of secret keys to try for verifying signatures, from + #: oldest to newest. The newest (last) key is used for signing. + #: + #: This allows a key rotation system to keep a list of allowed + #: keys and remove expired ones. + self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key) + + if salt is not None: + salt = want_bytes(salt) + # if salt is None then the signer's default is used + + self.salt = salt + + if serializer is None: + serializer = self.default_serializer + + self.serializer: _t.Any = serializer + self.is_text_serializer: bool = is_text_serializer(serializer) + + if signer is None: + signer = self.default_signer + + self.signer: _t_signer = signer + self.signer_kwargs: _t_kwargs = signer_kwargs or {} + + if fallback_signers is None: + fallback_signers = list(self.default_fallback_signers or ()) + + self.fallback_signers: _t_fallbacks = fallback_signers + self.serializer_kwargs: _t_kwargs = serializer_kwargs or {} + + @property + def secret_key(self) -> bytes: + """The newest (last) entry in the :attr:`secret_keys` list. This + is for compatibility from before key rotation support was added. + """ + return self.secret_keys[-1] + + def load_payload( + self, payload: bytes, serializer: _t.Optional[_t.Any] = None + ) -> _t.Any: + """Loads the encoded object. This function raises + :class:`.BadPayload` if the payload is not valid. The + ``serializer`` parameter can be used to override the serializer + stored on the class. The encoded ``payload`` should always be + bytes. + """ + if serializer is None: + serializer = self.serializer + is_text = self.is_text_serializer + else: + is_text = is_text_serializer(serializer) + + try: + if is_text: + return serializer.loads(payload.decode("utf-8")) + + return serializer.loads(payload) + except Exception as e: + raise BadPayload( + "Could not load the payload because an exception" + " occurred on unserializing the data.", + original_error=e, + ) + + def dump_payload(self, obj: _t.Any) -> bytes: + """Dumps the encoded object. The return value is always bytes. + If the internal serializer returns text, the value will be + encoded as UTF-8. + """ + return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs)) + + def make_signer(self, salt: _t_opt_str_bytes = None) -> Signer: + """Creates a new instance of the signer to be used. The default + implementation uses the :class:`.Signer` base class. + """ + if salt is None: + salt = self.salt + + return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs) + + def iter_unsigners(self, salt: _t_opt_str_bytes = None) -> _t.Iterator[Signer]: + """Iterates over all signers to be tried for unsigning. Starts + with the configured signer, then constructs each signer + specified in ``fallback_signers``. + """ + if salt is None: + salt = self.salt + + yield self.make_signer(salt) + + for fallback in self.fallback_signers: + if isinstance(fallback, dict): + kwargs = fallback + fallback = self.signer + elif isinstance(fallback, tuple): + fallback, kwargs = fallback + else: + kwargs = self.signer_kwargs + + for secret_key in self.secret_keys: + yield fallback(secret_key, salt=salt, **kwargs) + + def dumps(self, obj: _t.Any, salt: _t_opt_str_bytes = None) -> _t_str_bytes: + """Returns a signed string serialized with the internal + serializer. The return value can be either a byte or unicode + string depending on the format of the internal serializer. + """ + payload = want_bytes(self.dump_payload(obj)) + rv = self.make_signer(salt).sign(payload) + + if self.is_text_serializer: + return rv.decode("utf-8") + + return rv + + def dump(self, obj: _t.Any, f: _t.IO, salt: _t_opt_str_bytes = None) -> None: + """Like :meth:`dumps` but dumps into a file. The file handle has + to be compatible with what the internal serializer expects. + """ + f.write(self.dumps(obj, salt)) + + def loads( + self, s: _t_str_bytes, salt: _t_opt_str_bytes = None, **kwargs: _t.Any + ) -> _t.Any: + """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the + signature validation fails. + """ + s = want_bytes(s) + last_exception = None + + for signer in self.iter_unsigners(salt): + try: + return self.load_payload(signer.unsign(s)) + except BadSignature as err: + last_exception = err + + raise _t.cast(BadSignature, last_exception) + + def load(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t.Any: + """Like :meth:`loads` but loads from a file.""" + return self.loads(f.read(), salt) + + def loads_unsafe( + self, s: _t_str_bytes, salt: _t_opt_str_bytes = None + ) -> _t_load_unsafe: + """Like :meth:`loads` but without verifying the signature. This + is potentially very dangerous to use depending on how your + serializer works. The return value is ``(signature_valid, + payload)`` instead of just the payload. The first item will be a + boolean that indicates if the signature is valid. This function + never fails. + + Use it for debugging only and if you know that your serializer + module is not exploitable (for example, do not use it with a + pickle serializer). + + .. versionadded:: 0.15 + """ + return self._loads_unsafe_impl(s, salt) + + def _loads_unsafe_impl( + self, + s: _t_str_bytes, + salt: _t_opt_str_bytes, + load_kwargs: _t_opt_kwargs = None, + load_payload_kwargs: _t_opt_kwargs = None, + ) -> _t_load_unsafe: + """Low level helper function to implement :meth:`loads_unsafe` + in serializer subclasses. + """ + if load_kwargs is None: + load_kwargs = {} + + try: + return True, self.loads(s, salt=salt, **load_kwargs) + except BadSignature as e: + if e.payload is None: + return False, None + + if load_payload_kwargs is None: + load_payload_kwargs = {} + + try: + return ( + False, + self.load_payload(e.payload, **load_payload_kwargs), + ) + except BadPayload: + return False, None + + def load_unsafe(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t_load_unsafe: + """Like :meth:`loads_unsafe` but loads from a file. + + .. versionadded:: 0.15 + """ + return self.loads_unsafe(f.read(), salt=salt) diff --git a/debian/vendor/itsdangerous/signer.py b/debian/vendor/itsdangerous/signer.py new file mode 100644 index 0000000..aa12005 --- /dev/null +++ b/debian/vendor/itsdangerous/signer.py @@ -0,0 +1,257 @@ +import hashlib +import hmac +import typing as _t + +from .encoding import _base64_alphabet +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import want_bytes +from .exc import BadSignature + +_t_str_bytes = _t.Union[str, bytes] +_t_opt_str_bytes = _t.Optional[_t_str_bytes] +_t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes] + + +class SigningAlgorithm: + """Subclasses must implement :meth:`get_signature` to provide + signature generation functionality. + """ + + def get_signature(self, key: bytes, value: bytes) -> bytes: + """Returns the signature for the given key and value.""" + raise NotImplementedError() + + def verify_signature(self, key: bytes, value: bytes, sig: bytes) -> bool: + """Verifies the given signature matches the expected + signature. + """ + return hmac.compare_digest(sig, self.get_signature(key, value)) + + +class NoneAlgorithm(SigningAlgorithm): + """Provides an algorithm that does not perform any signing and + returns an empty signature. + """ + + def get_signature(self, key: bytes, value: bytes) -> bytes: + return b"" + + +class HMACAlgorithm(SigningAlgorithm): + """Provides signature generation using HMACs.""" + + #: The digest method to use with the MAC algorithm. This defaults to + #: SHA1, but can be changed to any other function in the hashlib + #: module. + default_digest_method: _t.Any = staticmethod(hashlib.sha1) + + def __init__(self, digest_method: _t.Any = None): + if digest_method is None: + digest_method = self.default_digest_method + + self.digest_method: _t.Any = digest_method + + def get_signature(self, key: bytes, value: bytes) -> bytes: + mac = hmac.new(key, msg=value, digestmod=self.digest_method) + return mac.digest() + + +def _make_keys_list(secret_key: _t_secret_key) -> _t.List[bytes]: + if isinstance(secret_key, (str, bytes)): + return [want_bytes(secret_key)] + + return [want_bytes(s) for s in secret_key] + + +class Signer: + """A signer securely signs bytes, then unsigns them to verify that + the value hasn't been changed. + + The secret key should be a random string of ``bytes`` and should not + be saved to code or version control. Different salts should be used + to distinguish signing in different contexts. See :doc:`/concepts` + for information about the security of the secret key and salt. + + :param secret_key: The secret key to sign and verify with. Can be a + list of keys, oldest to newest, to support key rotation. + :param salt: Extra key to combine with ``secret_key`` to distinguish + signatures in different contexts. + :param sep: Separator between the signature and value. + :param key_derivation: How to derive the signing key from the secret + key and salt. Possible values are ``concat``, ``django-concat``, + or ``hmac``. Defaults to :attr:`default_key_derivation`, which + defaults to ``django-concat``. + :param digest_method: Hash function to use when generating the HMAC + signature. Defaults to :attr:`default_digest_method`, which + defaults to :func:`hashlib.sha1`. Note that the security of the + hash alone doesn't apply when used intermediately in HMAC. + :param algorithm: A :class:`SigningAlgorithm` instance to use + instead of building a default :class:`HMACAlgorithm` with the + ``digest_method``. + + .. versionchanged:: 2.0 + Added support for key rotation by passing a list to + ``secret_key``. + + .. versionchanged:: 0.18 + ``algorithm`` was added as an argument to the class constructor. + + .. versionchanged:: 0.14 + ``key_derivation`` and ``digest_method`` were added as arguments + to the class constructor. + """ + + #: The default digest method to use for the signer. The default is + #: :func:`hashlib.sha1`, but can be changed to any :mod:`hashlib` or + #: compatible object. Note that the security of the hash alone + #: doesn't apply when used intermediately in HMAC. + #: + #: .. versionadded:: 0.14 + default_digest_method: _t.Any = staticmethod(hashlib.sha1) + + #: The default scheme to use to derive the signing key from the + #: secret key and salt. The default is ``django-concat``. Possible + #: values are ``concat``, ``django-concat``, and ``hmac``. + #: + #: .. versionadded:: 0.14 + default_key_derivation: str = "django-concat" + + def __init__( + self, + secret_key: _t_secret_key, + salt: _t_opt_str_bytes = b"itsdangerous.Signer", + sep: _t_str_bytes = b".", + key_derivation: _t.Optional[str] = None, + digest_method: _t.Optional[_t.Any] = None, + algorithm: _t.Optional[SigningAlgorithm] = None, + ): + #: The list of secret keys to try for verifying signatures, from + #: oldest to newest. The newest (last) key is used for signing. + #: + #: This allows a key rotation system to keep a list of allowed + #: keys and remove expired ones. + self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key) + self.sep: bytes = want_bytes(sep) + + if self.sep in _base64_alphabet: + raise ValueError( + "The given separator cannot be used because it may be" + " contained in the signature itself. ASCII letters," + " digits, and '-_=' must not be used." + ) + + if salt is not None: + salt = want_bytes(salt) + else: + salt = b"itsdangerous.Signer" + + self.salt = salt + + if key_derivation is None: + key_derivation = self.default_key_derivation + + self.key_derivation: str = key_derivation + + if digest_method is None: + digest_method = self.default_digest_method + + self.digest_method: _t.Any = digest_method + + if algorithm is None: + algorithm = HMACAlgorithm(self.digest_method) + + self.algorithm: SigningAlgorithm = algorithm + + @property + def secret_key(self) -> bytes: + """The newest (last) entry in the :attr:`secret_keys` list. This + is for compatibility from before key rotation support was added. + """ + return self.secret_keys[-1] + + def derive_key(self, secret_key: _t_opt_str_bytes = None) -> bytes: + """This method is called to derive the key. The default key + derivation choices can be overridden here. Key derivation is not + intended to be used as a security method to make a complex key + out of a short password. Instead you should use large random + secret keys. + + :param secret_key: A specific secret key to derive from. + Defaults to the last item in :attr:`secret_keys`. + + .. versionchanged:: 2.0 + Added the ``secret_key`` parameter. + """ + if secret_key is None: + secret_key = self.secret_keys[-1] + else: + secret_key = want_bytes(secret_key) + + if self.key_derivation == "concat": + return _t.cast(bytes, self.digest_method(self.salt + secret_key).digest()) + elif self.key_derivation == "django-concat": + return _t.cast( + bytes, self.digest_method(self.salt + b"signer" + secret_key).digest() + ) + elif self.key_derivation == "hmac": + mac = hmac.new(secret_key, digestmod=self.digest_method) + mac.update(self.salt) + return mac.digest() + elif self.key_derivation == "none": + return secret_key + else: + raise TypeError("Unknown key derivation method") + + def get_signature(self, value: _t_str_bytes) -> bytes: + """Returns the signature for the given value.""" + value = want_bytes(value) + key = self.derive_key() + sig = self.algorithm.get_signature(key, value) + return base64_encode(sig) + + def sign(self, value: _t_str_bytes) -> bytes: + """Signs the given string.""" + value = want_bytes(value) + return value + self.sep + self.get_signature(value) + + def verify_signature(self, value: _t_str_bytes, sig: _t_str_bytes) -> bool: + """Verifies the signature for the given value.""" + try: + sig = base64_decode(sig) + except Exception: + return False + + value = want_bytes(value) + + for secret_key in reversed(self.secret_keys): + key = self.derive_key(secret_key) + + if self.algorithm.verify_signature(key, value, sig): + return True + + return False + + def unsign(self, signed_value: _t_str_bytes) -> bytes: + """Unsigns the given string.""" + signed_value = want_bytes(signed_value) + + if self.sep not in signed_value: + raise BadSignature(f"No {self.sep!r} found in value") + + value, sig = signed_value.rsplit(self.sep, 1) + + if self.verify_signature(value, sig): + return value + + raise BadSignature(f"Signature {sig!r} does not match", payload=value) + + def validate(self, signed_value: _t_str_bytes) -> bool: + """Only validates the given signed value. Returns ``True`` if + the signature exists and is valid. + """ + try: + self.unsign(signed_value) + return True + except BadSignature: + return False diff --git a/debian/vendor/itsdangerous/timed.py b/debian/vendor/itsdangerous/timed.py new file mode 100644 index 0000000..5ea957f --- /dev/null +++ b/debian/vendor/itsdangerous/timed.py @@ -0,0 +1,227 @@ +import time +import typing +import typing as _t +from datetime import datetime +from datetime import timezone + +from .encoding import base64_decode +from .encoding import base64_encode +from .encoding import bytes_to_int +from .encoding import int_to_bytes +from .encoding import want_bytes +from .exc import BadSignature +from .exc import BadTimeSignature +from .exc import SignatureExpired +from .serializer import Serializer +from .signer import Signer + +_t_str_bytes = _t.Union[str, bytes] +_t_opt_str_bytes = _t.Optional[_t_str_bytes] +_t_opt_int = _t.Optional[int] + +if _t.TYPE_CHECKING: + import typing_extensions as _te + + +class TimestampSigner(Signer): + """Works like the regular :class:`.Signer` but also records the time + of the signing and can be used to expire signatures. The + :meth:`unsign` method can raise :exc:`.SignatureExpired` if the + unsigning failed because the signature is expired. + """ + + def get_timestamp(self) -> int: + """Returns the current timestamp. The function must return an + integer. + """ + return int(time.time()) + + def timestamp_to_datetime(self, ts: int) -> datetime: + """Convert the timestamp from :meth:`get_timestamp` into an + aware :class`datetime.datetime` in UTC. + + .. versionchanged:: 2.0 + The timestamp is returned as a timezone-aware ``datetime`` + in UTC rather than a naive ``datetime`` assumed to be UTC. + """ + return datetime.fromtimestamp(ts, tz=timezone.utc) + + def sign(self, value: _t_str_bytes) -> bytes: + """Signs the given string and also attaches time information.""" + value = want_bytes(value) + timestamp = base64_encode(int_to_bytes(self.get_timestamp())) + sep = want_bytes(self.sep) + value = value + sep + timestamp + return value + sep + self.get_signature(value) + + # Ignore overlapping signatures check, return_timestamp is the only + # parameter that affects the return type. + + @typing.overload + def unsign( # type: ignore + self, + signed_value: _t_str_bytes, + max_age: _t_opt_int = None, + return_timestamp: "_te.Literal[False]" = False, + ) -> bytes: + ... + + @typing.overload + def unsign( + self, + signed_value: _t_str_bytes, + max_age: _t_opt_int = None, + return_timestamp: "_te.Literal[True]" = True, + ) -> _t.Tuple[bytes, datetime]: + ... + + def unsign( + self, + signed_value: _t_str_bytes, + max_age: _t_opt_int = None, + return_timestamp: bool = False, + ) -> _t.Union[_t.Tuple[bytes, datetime], bytes]: + """Works like the regular :meth:`.Signer.unsign` but can also + validate the time. See the base docstring of the class for + the general behavior. If ``return_timestamp`` is ``True`` the + timestamp of the signature will be returned as an aware + :class:`datetime.datetime` object in UTC. + + .. versionchanged:: 2.0 + The timestamp is returned as a timezone-aware ``datetime`` + in UTC rather than a naive ``datetime`` assumed to be UTC. + """ + try: + result = super().unsign(signed_value) + sig_error = None + except BadSignature as e: + sig_error = e + result = e.payload or b"" + + sep = want_bytes(self.sep) + + # If there is no timestamp in the result there is something + # seriously wrong. In case there was a signature error, we raise + # that one directly, otherwise we have a weird situation in + # which we shouldn't have come except someone uses a time-based + # serializer on non-timestamp data, so catch that. + if sep not in result: + if sig_error: + raise sig_error + + raise BadTimeSignature("timestamp missing", payload=result) + + value, ts_bytes = result.rsplit(sep, 1) + ts_int: _t_opt_int = None + ts_dt: _t.Optional[datetime] = None + + try: + ts_int = bytes_to_int(base64_decode(ts_bytes)) + except Exception: + pass + + # Signature is *not* okay. Raise a proper error now that we have + # split the value and the timestamp. + if sig_error is not None: + if ts_int is not None: + ts_dt = self.timestamp_to_datetime(ts_int) + + raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt) + + # Signature was okay but the timestamp is actually not there or + # malformed. Should not happen, but we handle it anyway. + if ts_int is None: + raise BadTimeSignature("Malformed timestamp", payload=value) + + # Check timestamp is not older than max_age + if max_age is not None: + age = self.get_timestamp() - ts_int + + if age > max_age: + raise SignatureExpired( + f"Signature age {age} > {max_age} seconds", + payload=value, + date_signed=self.timestamp_to_datetime(ts_int), + ) + + if age < 0: + raise SignatureExpired( + f"Signature age {age} < 0 seconds", + payload=value, + date_signed=self.timestamp_to_datetime(ts_int), + ) + + if return_timestamp: + return value, self.timestamp_to_datetime(ts_int) + + return value + + def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool: + """Only validates the given signed value. Returns ``True`` if + the signature exists and is valid.""" + try: + self.unsign(signed_value, max_age=max_age) + return True + except BadSignature: + return False + + +class TimedSerializer(Serializer): + """Uses :class:`TimestampSigner` instead of the default + :class:`.Signer`. + """ + + default_signer: _t.Type[TimestampSigner] = TimestampSigner + + def iter_unsigners( + self, salt: _t_opt_str_bytes = None + ) -> _t.Iterator[TimestampSigner]: + return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt)) + + # TODO: Signature is incompatible because parameters were added + # before salt. + + def loads( # type: ignore + self, + s: _t_str_bytes, + max_age: _t_opt_int = None, + return_timestamp: bool = False, + salt: _t_opt_str_bytes = None, + ) -> _t.Any: + """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the + signature validation fails. If a ``max_age`` is provided it will + ensure the signature is not older than that time in seconds. In + case the signature is outdated, :exc:`.SignatureExpired` is + raised. All arguments are forwarded to the signer's + :meth:`~TimestampSigner.unsign` method. + """ + s = want_bytes(s) + last_exception = None + + for signer in self.iter_unsigners(salt): + try: + base64d, timestamp = signer.unsign( + s, max_age=max_age, return_timestamp=True + ) + payload = self.load_payload(base64d) + + if return_timestamp: + return payload, timestamp + + return payload + except SignatureExpired: + # The signature was unsigned successfully but was + # expired. Do not try the next signer. + raise + except BadSignature as err: + last_exception = err + + raise _t.cast(BadSignature, last_exception) + + def loads_unsafe( # type: ignore + self, + s: _t_str_bytes, + max_age: _t_opt_int = None, + salt: _t_opt_str_bytes = None, + ) -> _t.Tuple[bool, _t.Any]: + return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age}) diff --git a/debian/vendor/itsdangerous/url_safe.py b/debian/vendor/itsdangerous/url_safe.py new file mode 100644 index 0000000..f76fa24 --- /dev/null +++ b/debian/vendor/itsdangerous/url_safe.py @@ -0,0 +1,80 @@ +import typing as _t +import zlib + +from ._json import _CompactJSON +from .encoding import base64_decode +from .encoding import base64_encode +from .exc import BadPayload +from .serializer import Serializer +from .timed import TimedSerializer + + +class URLSafeSerializerMixin(Serializer): + """Mixed in with a regular serializer it will attempt to zlib + compress the string to make it shorter if necessary. It will also + base64 encode the string so that it can safely be placed in a URL. + """ + + default_serializer = _CompactJSON + + def load_payload( + self, + payload: bytes, + *args: _t.Any, + serializer: _t.Optional[_t.Any] = None, + **kwargs: _t.Any, + ) -> _t.Any: + decompress = False + + if payload.startswith(b"."): + payload = payload[1:] + decompress = True + + try: + json = base64_decode(payload) + except Exception as e: + raise BadPayload( + "Could not base64 decode the payload because of an exception", + original_error=e, + ) + + if decompress: + try: + json = zlib.decompress(json) + except Exception as e: + raise BadPayload( + "Could not zlib decompress the payload before decoding the payload", + original_error=e, + ) + + return super().load_payload(json, *args, **kwargs) + + def dump_payload(self, obj: _t.Any) -> bytes: + json = super().dump_payload(obj) + is_compressed = False + compressed = zlib.compress(json) + + if len(compressed) < (len(json) - 1): + json = compressed + is_compressed = True + + base64d = base64_encode(json) + + if is_compressed: + base64d = b"." + base64d + + return base64d + + +class URLSafeSerializer(URLSafeSerializerMixin, Serializer): + """Works like :class:`.Serializer` but dumps and loads into a URL + safe string consisting of the upper and lowercase character of the + alphabet as well as ``'_'``, ``'-'`` and ``'.'``. + """ + + +class URLSafeTimedSerializer(URLSafeSerializerMixin, TimedSerializer): + """Works like :class:`.TimedSerializer` but dumps and loads into a + URL safe string consisting of the upper and lowercase character of + the alphabet as well as ``'_'``, ``'-'`` and ``'.'``. + """