Add embedded itsdangerous
faraday is not ready for versions >=2.1.*
see https://github.com/infobyte/faraday/issues/446
Sophie Brun
2 years ago
0 | python-faraday (3.19.0-0kali2) kali-dev; urgency=medium | |
1 | ||
2 | * Add embedded itsdangerous | |
3 | ||
4 | -- Sophie Brun <[email protected]> Wed, 13 Apr 2022 17:26:50 +0200 | |
5 | ||
0 | 6 | python-faraday (3.19.0-0kali1) kali-dev; urgency=medium |
1 | 7 | |
2 | 8 | * New upstream version 3.19.0 |
1 | 1 | Date: Mon, 6 Dec 2021 17:30:46 +0100 |
2 | 2 | Subject: Add usr/lib/python3/dist-packages/faraday/vendor in import path |
3 | 3 | |
4 | Last-Update: 2021-12-07 | |
4 | Last-Update: 2022-04-13 | |
5 | 5 | Add usr/lib/python3/dist-packages/faraday/vendor in PYTHONPATH to use |
6 | the embedded version instead of the packages version of SQLalchemy. | |
6 | the embedded version instead of the packages version of SQLalchemy and | |
7 | itsdangerous. | |
7 | 8 | --- |
8 | faraday/manage.py | 2 ++ | |
9 | faraday/searcher/searcher.py | 2 ++ | |
10 | faraday/start_server.py | 2 ++ | |
11 | 3 files changed, 6 insertions(+) | |
9 | faraday/manage.py | 2 ++ | |
10 | faraday/searcher/searcher.py | 2 ++ | |
11 | faraday/server/api/modules/token.py | 3 +++ | |
12 | faraday/server/api/modules/websocket_auth.py | 3 +++ | |
13 | faraday/server/app.py | 3 +++ | |
14 | faraday/server/websocket_factories.py | 3 +++ | |
15 | faraday/start_server.py | 2 ++ | |
16 | tests/conftest.py | 3 +++ | |
17 | tests/test_api_login.py | 3 +++ | |
18 | 9 files changed, 24 insertions(+) | |
12 | 19 | |
13 | 20 | diff --git a/faraday/manage.py b/faraday/manage.py |
14 | 21 | index 47b3347..7f3a531 100755 |
36 | 43 | from faraday.searcher.api import Api |
37 | 44 | from faraday.searcher.validator import validate_rules |
38 | 45 | from faraday.server.models import Service, Host |
46 | diff --git a/faraday/server/api/modules/token.py b/faraday/server/api/modules/token.py | |
47 | index bb656cb..ccd0e2b 100644 | |
48 | --- a/faraday/server/api/modules/token.py | |
49 | +++ b/faraday/server/api/modules/token.py | |
50 | @@ -1,6 +1,9 @@ | |
51 | import datetime | |
52 | import logging | |
53 | ||
54 | +import sys | |
55 | +sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') | |
56 | + | |
57 | from itsdangerous import TimedJSONWebSignatureSerializer | |
58 | from flask import Blueprint, request | |
59 | from flask_security.utils import hash_data | |
60 | diff --git a/faraday/server/api/modules/websocket_auth.py b/faraday/server/api/modules/websocket_auth.py | |
61 | index 879ba12..f3d07a6 100644 | |
62 | --- a/faraday/server/api/modules/websocket_auth.py | |
63 | +++ b/faraday/server/api/modules/websocket_auth.py | |
64 | @@ -1,6 +1,9 @@ | |
65 | # Faraday Penetration Test IDE | |
66 | # Copyright (C) 2016 Infobyte LLC (http://www.infobytesec.com/) | |
67 | # See the file 'doc/LICENSE' for the license information | |
68 | +import sys | |
69 | +sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') | |
70 | + | |
71 | import logging | |
72 | import flask | |
73 | from flask import Blueprint | |
74 | diff --git a/faraday/server/app.py b/faraday/server/app.py | |
75 | index 11c8371..9823804 100644 | |
76 | --- a/faraday/server/app.py | |
77 | +++ b/faraday/server/app.py | |
78 | @@ -8,6 +8,9 @@ import datetime | |
79 | import bleach | |
80 | import pyotp | |
81 | import requests | |
82 | +import sys | |
83 | +sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') | |
84 | + | |
85 | from flask_limiter import Limiter | |
86 | from flask_limiter.util import get_remote_address | |
87 | from itsdangerous import TimedJSONWebSignatureSerializer, SignatureExpired, BadSignature | |
88 | diff --git a/faraday/server/websocket_factories.py b/faraday/server/websocket_factories.py | |
89 | index 93aaaa5..bc433bc 100644 | |
90 | --- a/faraday/server/websocket_factories.py | |
91 | +++ b/faraday/server/websocket_factories.py | |
92 | @@ -4,6 +4,9 @@ Copyright (C) 2013 Infobyte LLC (http://www.infobytesec.com/) | |
93 | See the file 'doc/LICENSE' for the license information | |
94 | ||
95 | """ | |
96 | +import sys | |
97 | +sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') | |
98 | + | |
99 | import json | |
100 | import logging | |
101 | import itsdangerous | |
39 | 102 | diff --git a/faraday/start_server.py b/faraday/start_server.py |
40 | 103 | index 4ab2c68..a29b855 100644 |
41 | 104 | --- a/faraday/start_server.py |
49 | 112 | import psycopg2 |
50 | 113 | from alembic.runtime.migration import MigrationContext |
51 | 114 | |
115 | diff --git a/tests/conftest.py b/tests/conftest.py | |
116 | index be5e98c..2e8daa2 100644 | |
117 | --- a/tests/conftest.py | |
118 | +++ b/tests/conftest.py | |
119 | @@ -17,6 +17,9 @@ from pathlib import Path | |
120 | from pytest_factoryboy import register | |
121 | from sqlalchemy import event | |
122 | ||
123 | +import sys | |
124 | +sys.path.insert(1, 'debian/vendor') | |
125 | + | |
126 | from faraday.server.app import create_app | |
127 | from faraday.server.models import db | |
128 | from tests import factories | |
129 | diff --git a/tests/test_api_login.py b/tests/test_api_login.py | |
130 | index c4173da..24a8d96 100644 | |
131 | --- a/tests/test_api_login.py | |
132 | +++ b/tests/test_api_login.py | |
133 | @@ -1,5 +1,8 @@ | |
134 | import pytest | |
135 | from flask_security.utils import hash_password | |
136 | +import sys | |
137 | + | |
138 | +sys.path.insert(1, '/usr/lib/python3/dist-packages/faraday/vendor') | |
139 | from itsdangerous import TimedJSONWebSignatureSerializer | |
140 | ||
141 | from faraday.server.models import User |
17 | 17 | # remove unwanted files: .gitignore and useless README |
18 | 18 | rm debian/faraday/usr/lib/python3*/dist-packages/faraday/migrations/versions/.gitignore |
19 | 19 | rm debian/faraday/usr/lib/python3*/dist-packages/faraday/migrations/README |
20 | # copy vendor to embed sqlalchemy | |
20 | # copy vendor to embed sqlalchemy and other Python modules | |
21 | 21 | cp -r debian/vendor debian/faraday/usr/lib/python3*/dist-packages/faraday/ |
22 | 22 | |
23 | 23 | override_dh_installchangelogs: |
0 | debian/vendor/itsdangerous/docs/_static/itsdangerous-logo.png | |
1 | debian/vendor/itsdangerous/docs/_static/itsdangerous-logo-sidebar.png |
0 | from ._json import json | |
1 | from .encoding import base64_decode as base64_decode | |
2 | from .encoding import base64_encode as base64_encode | |
3 | from .encoding import want_bytes as want_bytes | |
4 | from .exc import BadData as BadData | |
5 | from .exc import BadHeader as BadHeader | |
6 | from .exc import BadPayload as BadPayload | |
7 | from .exc import BadSignature as BadSignature | |
8 | from .exc import BadTimeSignature as BadTimeSignature | |
9 | from .exc import SignatureExpired as SignatureExpired | |
10 | from .jws import JSONWebSignatureSerializer | |
11 | from .jws import TimedJSONWebSignatureSerializer | |
12 | from .serializer import Serializer as Serializer | |
13 | from .signer import HMACAlgorithm as HMACAlgorithm | |
14 | from .signer import NoneAlgorithm as NoneAlgorithm | |
15 | from .signer import Signer as Signer | |
16 | from .timed import TimedSerializer as TimedSerializer | |
17 | from .timed import TimestampSigner as TimestampSigner | |
18 | from .url_safe import URLSafeSerializer as URLSafeSerializer | |
19 | from .url_safe import URLSafeTimedSerializer as URLSafeTimedSerializer | |
20 | ||
21 | __version__ = "2.0.1" |
0 | import json as _json | |
1 | import typing as _t | |
2 | from types import ModuleType | |
3 | ||
4 | ||
5 | class _CompactJSON: | |
6 | """Wrapper around json module that strips whitespace.""" | |
7 | ||
8 | @staticmethod | |
9 | def loads(payload: _t.Union[str, bytes]) -> _t.Any: | |
10 | return _json.loads(payload) | |
11 | ||
12 | @staticmethod | |
13 | def dumps(obj: _t.Any, **kwargs: _t.Any) -> str: | |
14 | kwargs.setdefault("ensure_ascii", False) | |
15 | kwargs.setdefault("separators", (",", ":")) | |
16 | return _json.dumps(obj, **kwargs) | |
17 | ||
18 | ||
19 | class DeprecatedJSON(ModuleType): | |
20 | def __getattribute__(self, item: str) -> _t.Any: | |
21 | import warnings | |
22 | ||
23 | warnings.warn( | |
24 | "Importing 'itsdangerous.json' is deprecated and will be" | |
25 | " removed in ItsDangerous 2.1. Use Python's 'json' module" | |
26 | " instead.", | |
27 | DeprecationWarning, | |
28 | stacklevel=2, | |
29 | ) | |
30 | return getattr(_json, item) | |
31 | ||
32 | ||
33 | json = DeprecatedJSON("json") |
0 | import base64 | |
1 | import string | |
2 | import struct | |
3 | import typing as _t | |
4 | ||
5 | from .exc import BadData | |
6 | ||
7 | _t_str_bytes = _t.Union[str, bytes] | |
8 | ||
9 | ||
10 | def want_bytes( | |
11 | s: _t_str_bytes, encoding: str = "utf-8", errors: str = "strict" | |
12 | ) -> bytes: | |
13 | if isinstance(s, str): | |
14 | s = s.encode(encoding, errors) | |
15 | ||
16 | return s | |
17 | ||
18 | ||
19 | def base64_encode(string: _t_str_bytes) -> bytes: | |
20 | """Base64 encode a string of bytes or text. The resulting bytes are | |
21 | safe to use in URLs. | |
22 | """ | |
23 | string = want_bytes(string) | |
24 | return base64.urlsafe_b64encode(string).rstrip(b"=") | |
25 | ||
26 | ||
27 | def base64_decode(string: _t_str_bytes) -> bytes: | |
28 | """Base64 decode a URL-safe string of bytes or text. The result is | |
29 | bytes. | |
30 | """ | |
31 | string = want_bytes(string, encoding="ascii", errors="ignore") | |
32 | string += b"=" * (-len(string) % 4) | |
33 | ||
34 | try: | |
35 | return base64.urlsafe_b64decode(string) | |
36 | except (TypeError, ValueError): | |
37 | raise BadData("Invalid base64-encoded data") | |
38 | ||
39 | ||
40 | # The alphabet used by base64.urlsafe_* | |
41 | _base64_alphabet = f"{string.ascii_letters}{string.digits}-_=".encode("ascii") | |
42 | ||
43 | _int64_struct = struct.Struct(">Q") | |
44 | _int_to_bytes = _int64_struct.pack | |
45 | _bytes_to_int = _t.cast("_t.Callable[[bytes], _t.Tuple[int]]", _int64_struct.unpack) | |
46 | ||
47 | ||
48 | def int_to_bytes(num: int) -> bytes: | |
49 | return _int_to_bytes(num).lstrip(b"\x00") | |
50 | ||
51 | ||
52 | def bytes_to_int(bytestr: bytes) -> int: | |
53 | return _bytes_to_int(bytestr.rjust(8, b"\x00"))[0] |
0 | import typing as _t | |
1 | from datetime import datetime | |
2 | ||
3 | _t_opt_any = _t.Optional[_t.Any] | |
4 | _t_opt_exc = _t.Optional[Exception] | |
5 | ||
6 | ||
7 | class BadData(Exception): | |
8 | """Raised if bad data of any sort was encountered. This is the base | |
9 | for all exceptions that ItsDangerous defines. | |
10 | ||
11 | .. versionadded:: 0.15 | |
12 | """ | |
13 | ||
14 | def __init__(self, message: str): | |
15 | super().__init__(message) | |
16 | self.message = message | |
17 | ||
18 | def __str__(self) -> str: | |
19 | return self.message | |
20 | ||
21 | ||
22 | class BadSignature(BadData): | |
23 | """Raised if a signature does not match.""" | |
24 | ||
25 | def __init__(self, message: str, payload: _t_opt_any = None): | |
26 | super().__init__(message) | |
27 | ||
28 | #: The payload that failed the signature test. In some | |
29 | #: situations you might still want to inspect this, even if | |
30 | #: you know it was tampered with. | |
31 | #: | |
32 | #: .. versionadded:: 0.14 | |
33 | self.payload: _t_opt_any = payload | |
34 | ||
35 | ||
36 | class BadTimeSignature(BadSignature): | |
37 | """Raised if a time-based signature is invalid. This is a subclass | |
38 | of :class:`BadSignature`. | |
39 | """ | |
40 | ||
41 | def __init__( | |
42 | self, | |
43 | message: str, | |
44 | payload: _t_opt_any = None, | |
45 | date_signed: _t.Optional[datetime] = None, | |
46 | ): | |
47 | super().__init__(message, payload) | |
48 | ||
49 | #: If the signature expired this exposes the date of when the | |
50 | #: signature was created. This can be helpful in order to | |
51 | #: tell the user how long a link has been gone stale. | |
52 | #: | |
53 | #: .. versionchanged:: 2.0 | |
54 | #: The datetime value is timezone-aware rather than naive. | |
55 | #: | |
56 | #: .. versionadded:: 0.14 | |
57 | self.date_signed = date_signed | |
58 | ||
59 | ||
60 | class SignatureExpired(BadTimeSignature): | |
61 | """Raised if a signature timestamp is older than ``max_age``. This | |
62 | is a subclass of :exc:`BadTimeSignature`. | |
63 | """ | |
64 | ||
65 | ||
66 | class BadHeader(BadSignature): | |
67 | """Raised if a signed header is invalid in some form. This only | |
68 | happens for serializers that have a header that goes with the | |
69 | signature. | |
70 | ||
71 | .. versionadded:: 0.24 | |
72 | """ | |
73 | ||
74 | def __init__( | |
75 | self, | |
76 | message: str, | |
77 | payload: _t_opt_any = None, | |
78 | header: _t_opt_any = None, | |
79 | original_error: _t_opt_exc = None, | |
80 | ): | |
81 | super().__init__(message, payload) | |
82 | ||
83 | #: If the header is actually available but just malformed it | |
84 | #: might be stored here. | |
85 | self.header: _t_opt_any = header | |
86 | ||
87 | #: If available, the error that indicates why the payload was | |
88 | #: not valid. This might be ``None``. | |
89 | self.original_error: _t_opt_exc = original_error | |
90 | ||
91 | ||
92 | class BadPayload(BadData): | |
93 | """Raised if a payload is invalid. This could happen if the payload | |
94 | is loaded despite an invalid signature, or if there is a mismatch | |
95 | between the serializer and deserializer. The original exception | |
96 | that occurred during loading is stored on as :attr:`original_error`. | |
97 | ||
98 | .. versionadded:: 0.15 | |
99 | """ | |
100 | ||
101 | def __init__(self, message: str, original_error: _t_opt_exc = None): | |
102 | super().__init__(message) | |
103 | ||
104 | #: If available, the error that indicates why the payload was | |
105 | #: not valid. This might be ``None``. | |
106 | self.original_error: _t_opt_exc = original_error |
0 | import hashlib | |
1 | import time | |
2 | import warnings | |
3 | from datetime import datetime | |
4 | from datetime import timezone | |
5 | from decimal import Decimal | |
6 | from numbers import Real | |
7 | ||
8 | from ._json import _CompactJSON | |
9 | from .encoding import base64_decode | |
10 | from .encoding import base64_encode | |
11 | from .encoding import want_bytes | |
12 | from .exc import BadData | |
13 | from .exc import BadHeader | |
14 | from .exc import BadPayload | |
15 | from .exc import BadSignature | |
16 | from .exc import SignatureExpired | |
17 | from .serializer import Serializer | |
18 | from .signer import HMACAlgorithm | |
19 | from .signer import NoneAlgorithm | |
20 | ||
21 | ||
22 | class JSONWebSignatureSerializer(Serializer): | |
23 | """This serializer implements JSON Web Signature (JWS) support. Only | |
24 | supports the JWS Compact Serialization. | |
25 | ||
26 | .. deprecated:: 2.0 | |
27 | Will be removed in ItsDangerous 2.1. Use a dedicated library | |
28 | such as authlib. | |
29 | """ | |
30 | ||
31 | jws_algorithms = { | |
32 | "HS256": HMACAlgorithm(hashlib.sha256), | |
33 | "HS384": HMACAlgorithm(hashlib.sha384), | |
34 | "HS512": HMACAlgorithm(hashlib.sha512), | |
35 | "none": NoneAlgorithm(), | |
36 | } | |
37 | ||
38 | #: The default algorithm to use for signature generation | |
39 | default_algorithm = "HS512" | |
40 | ||
41 | default_serializer = _CompactJSON | |
42 | ||
43 | def __init__( | |
44 | self, | |
45 | secret_key, | |
46 | salt=None, | |
47 | serializer=None, | |
48 | serializer_kwargs=None, | |
49 | signer=None, | |
50 | signer_kwargs=None, | |
51 | algorithm_name=None, | |
52 | ): | |
53 | warnings.warn( | |
54 | "JWS support is deprecated and will be removed in" | |
55 | " ItsDangerous 2.1. Use a dedicated JWS/JWT library such as" | |
56 | " authlib.", | |
57 | DeprecationWarning, | |
58 | stacklevel=2, | |
59 | ) | |
60 | super().__init__( | |
61 | secret_key, | |
62 | salt=salt, | |
63 | serializer=serializer, | |
64 | serializer_kwargs=serializer_kwargs, | |
65 | signer=signer, | |
66 | signer_kwargs=signer_kwargs, | |
67 | ) | |
68 | ||
69 | if algorithm_name is None: | |
70 | algorithm_name = self.default_algorithm | |
71 | ||
72 | self.algorithm_name = algorithm_name | |
73 | self.algorithm = self.make_algorithm(algorithm_name) | |
74 | ||
75 | def load_payload(self, payload, serializer=None, return_header=False): | |
76 | payload = want_bytes(payload) | |
77 | ||
78 | if b"." not in payload: | |
79 | raise BadPayload('No "." found in value') | |
80 | ||
81 | base64d_header, base64d_payload = payload.split(b".", 1) | |
82 | ||
83 | try: | |
84 | json_header = base64_decode(base64d_header) | |
85 | except Exception as e: | |
86 | raise BadHeader( | |
87 | "Could not base64 decode the header because of an exception", | |
88 | original_error=e, | |
89 | ) | |
90 | ||
91 | try: | |
92 | json_payload = base64_decode(base64d_payload) | |
93 | except Exception as e: | |
94 | raise BadPayload( | |
95 | "Could not base64 decode the payload because of an exception", | |
96 | original_error=e, | |
97 | ) | |
98 | ||
99 | try: | |
100 | header = super().load_payload(json_header, serializer=_CompactJSON) | |
101 | except BadData as e: | |
102 | raise BadHeader( | |
103 | "Could not unserialize header because it was malformed", | |
104 | original_error=e, | |
105 | ) | |
106 | ||
107 | if not isinstance(header, dict): | |
108 | raise BadHeader("Header payload is not a JSON object", header=header) | |
109 | ||
110 | payload = super().load_payload(json_payload, serializer=serializer) | |
111 | ||
112 | if return_header: | |
113 | return payload, header | |
114 | ||
115 | return payload | |
116 | ||
117 | def dump_payload(self, header, obj): | |
118 | base64d_header = base64_encode( | |
119 | self.serializer.dumps(header, **self.serializer_kwargs) | |
120 | ) | |
121 | base64d_payload = base64_encode( | |
122 | self.serializer.dumps(obj, **self.serializer_kwargs) | |
123 | ) | |
124 | return base64d_header + b"." + base64d_payload | |
125 | ||
126 | def make_algorithm(self, algorithm_name): | |
127 | try: | |
128 | return self.jws_algorithms[algorithm_name] | |
129 | except KeyError: | |
130 | raise NotImplementedError("Algorithm not supported") | |
131 | ||
132 | def make_signer(self, salt=None, algorithm=None): | |
133 | if salt is None: | |
134 | salt = self.salt | |
135 | ||
136 | key_derivation = "none" if salt is None else None | |
137 | ||
138 | if algorithm is None: | |
139 | algorithm = self.algorithm | |
140 | ||
141 | return self.signer( | |
142 | self.secret_keys, | |
143 | salt=salt, | |
144 | sep=".", | |
145 | key_derivation=key_derivation, | |
146 | algorithm=algorithm, | |
147 | ) | |
148 | ||
149 | def make_header(self, header_fields): | |
150 | header = header_fields.copy() if header_fields else {} | |
151 | header["alg"] = self.algorithm_name | |
152 | return header | |
153 | ||
154 | def dumps(self, obj, salt=None, header_fields=None): | |
155 | """Like :meth:`.Serializer.dumps` but creates a JSON Web | |
156 | Signature. It also allows for specifying additional fields to be | |
157 | included in the JWS header. | |
158 | """ | |
159 | header = self.make_header(header_fields) | |
160 | signer = self.make_signer(salt, self.algorithm) | |
161 | return signer.sign(self.dump_payload(header, obj)) | |
162 | ||
163 | def loads(self, s, salt=None, return_header=False): | |
164 | """Reverse of :meth:`dumps`. If requested via ``return_header`` | |
165 | it will return a tuple of payload and header. | |
166 | """ | |
167 | payload, header = self.load_payload( | |
168 | self.make_signer(salt, self.algorithm).unsign(want_bytes(s)), | |
169 | return_header=True, | |
170 | ) | |
171 | ||
172 | if header.get("alg") != self.algorithm_name: | |
173 | raise BadHeader("Algorithm mismatch", header=header, payload=payload) | |
174 | ||
175 | if return_header: | |
176 | return payload, header | |
177 | ||
178 | return payload | |
179 | ||
180 | def loads_unsafe(self, s, salt=None, return_header=False): | |
181 | kwargs = {"return_header": return_header} | |
182 | return self._loads_unsafe_impl(s, salt, kwargs, kwargs) | |
183 | ||
184 | ||
185 | class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer): | |
186 | """Works like the regular :class:`JSONWebSignatureSerializer` but | |
187 | also records the time of the signing and can be used to expire | |
188 | signatures. | |
189 | ||
190 | JWS currently does not specify this behavior but it mentions a | |
191 | possible extension like this in the spec. Expiry date is encoded | |
192 | into the header similar to what's specified in `draft-ietf-oauth | |
193 | -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json | |
194 | -web-token.html#expDef>`_. | |
195 | """ | |
196 | ||
197 | DEFAULT_EXPIRES_IN = 3600 | |
198 | ||
199 | def __init__(self, secret_key, expires_in=None, **kwargs): | |
200 | super().__init__(secret_key, **kwargs) | |
201 | ||
202 | if expires_in is None: | |
203 | expires_in = self.DEFAULT_EXPIRES_IN | |
204 | ||
205 | self.expires_in = expires_in | |
206 | ||
207 | def make_header(self, header_fields): | |
208 | header = super().make_header(header_fields) | |
209 | iat = self.now() | |
210 | exp = iat + self.expires_in | |
211 | header["iat"] = iat | |
212 | header["exp"] = exp | |
213 | return header | |
214 | ||
215 | def loads(self, s, salt=None, return_header=False): | |
216 | payload, header = super().loads(s, salt, return_header=True) | |
217 | ||
218 | if "exp" not in header: | |
219 | raise BadSignature("Missing expiry date", payload=payload) | |
220 | ||
221 | int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload) | |
222 | ||
223 | try: | |
224 | header["exp"] = int(header["exp"]) | |
225 | except ValueError: | |
226 | raise int_date_error | |
227 | ||
228 | if header["exp"] < 0: | |
229 | raise int_date_error | |
230 | ||
231 | if header["exp"] < self.now(): | |
232 | raise SignatureExpired( | |
233 | "Signature expired", | |
234 | payload=payload, | |
235 | date_signed=self.get_issue_date(header), | |
236 | ) | |
237 | ||
238 | if return_header: | |
239 | return payload, header | |
240 | ||
241 | return payload | |
242 | ||
243 | def get_issue_date(self, header): | |
244 | """If the header contains the ``iat`` field, return the date the | |
245 | signature was issued, as a timezone-aware | |
246 | :class:`datetime.datetime` in UTC. | |
247 | ||
248 | .. versionchanged:: 2.0 | |
249 | The timestamp is returned as a timezone-aware ``datetime`` | |
250 | in UTC rather than a naive ``datetime`` assumed to be UTC. | |
251 | """ | |
252 | rv = header.get("iat") | |
253 | ||
254 | if isinstance(rv, (Real, Decimal)): | |
255 | return datetime.fromtimestamp(int(rv), tz=timezone.utc) | |
256 | ||
257 | def now(self): | |
258 | return int(time.time()) |
0 | import json | |
1 | import typing as _t | |
2 | ||
3 | from .encoding import want_bytes | |
4 | from .exc import BadPayload | |
5 | from .exc import BadSignature | |
6 | from .signer import _make_keys_list | |
7 | from .signer import Signer | |
8 | ||
9 | _t_str_bytes = _t.Union[str, bytes] | |
10 | _t_opt_str_bytes = _t.Optional[_t_str_bytes] | |
11 | _t_kwargs = _t.Dict[str, _t.Any] | |
12 | _t_opt_kwargs = _t.Optional[_t_kwargs] | |
13 | _t_signer = _t.Type[Signer] | |
14 | _t_fallbacks = _t.List[_t.Union[_t_kwargs, _t.Tuple[_t_signer, _t_kwargs], _t_signer]] | |
15 | _t_load_unsafe = _t.Tuple[bool, _t.Any] | |
16 | _t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes] | |
17 | ||
18 | ||
19 | def is_text_serializer(serializer: _t.Any) -> bool: | |
20 | """Checks whether a serializer generates text or binary.""" | |
21 | return isinstance(serializer.dumps({}), str) | |
22 | ||
23 | ||
24 | class Serializer: | |
25 | """A serializer wraps a :class:`~itsdangerous.signer.Signer` to | |
26 | enable serializing and securely signing data other than bytes. It | |
27 | can unsign to verify that the data hasn't been changed. | |
28 | ||
29 | The serializer provides :meth:`dumps` and :meth:`loads`, similar to | |
30 | :mod:`json`, and by default uses :mod:`json` internally to serialize | |
31 | the data to bytes. | |
32 | ||
33 | The secret key should be a random string of ``bytes`` and should not | |
34 | be saved to code or version control. Different salts should be used | |
35 | to distinguish signing in different contexts. See :doc:`/concepts` | |
36 | for information about the security of the secret key and salt. | |
37 | ||
38 | :param secret_key: The secret key to sign and verify with. Can be a | |
39 | list of keys, oldest to newest, to support key rotation. | |
40 | :param salt: Extra key to combine with ``secret_key`` to distinguish | |
41 | signatures in different contexts. | |
42 | :param serializer: An object that provides ``dumps`` and ``loads`` | |
43 | methods for serializing data to a string. Defaults to | |
44 | :attr:`default_serializer`, which defaults to :mod:`json`. | |
45 | :param serializer_kwargs: Keyword arguments to pass when calling | |
46 | ``serializer.dumps``. | |
47 | :param signer: A ``Signer`` class to instantiate when signing data. | |
48 | Defaults to :attr:`default_signer`, which defaults to | |
49 | :class:`~itsdangerous.signer.Signer`. | |
50 | :param signer_kwargs: Keyword arguments to pass when instantiating | |
51 | the ``Signer`` class. | |
52 | :param fallback_signers: List of signer parameters to try when | |
53 | unsigning with the default signer fails. Each item can be a dict | |
54 | of ``signer_kwargs``, a ``Signer`` class, or a tuple of | |
55 | ``(signer, signer_kwargs)``. Defaults to | |
56 | :attr:`default_fallback_signers`. | |
57 | ||
58 | .. versionchanged:: 2.0 | |
59 | Added support for key rotation by passing a list to | |
60 | ``secret_key``. | |
61 | ||
62 | .. versionchanged:: 2.0 | |
63 | Removed the default SHA-512 fallback signer from | |
64 | ``default_fallback_signers``. | |
65 | ||
66 | .. versionchanged:: 1.1 | |
67 | Added support for ``fallback_signers`` and configured a default | |
68 | SHA-512 fallback. This fallback is for users who used the yanked | |
69 | 1.0.0 release which defaulted to SHA-512. | |
70 | ||
71 | .. versionchanged:: 0.14 | |
72 | The ``signer`` and ``signer_kwargs`` parameters were added to | |
73 | the constructor. | |
74 | """ | |
75 | ||
76 | #: The default serialization module to use to serialize data to a | |
77 | #: string internally. The default is :mod:`json`, but can be changed | |
78 | #: to any object that provides ``dumps`` and ``loads`` methods. | |
79 | default_serializer: _t.Any = json | |
80 | ||
81 | #: The default ``Signer`` class to instantiate when signing data. | |
82 | #: The default is :class:`itsdangerous.signer.Signer`. | |
83 | default_signer: _t_signer = Signer | |
84 | ||
85 | #: The default fallback signers to try when unsigning fails. | |
86 | default_fallback_signers: _t_fallbacks = [] | |
87 | ||
88 | def __init__( | |
89 | self, | |
90 | secret_key: _t_secret_key, | |
91 | salt: _t_opt_str_bytes = b"itsdangerous", | |
92 | serializer: _t.Any = None, | |
93 | serializer_kwargs: _t_opt_kwargs = None, | |
94 | signer: _t.Optional[_t_signer] = None, | |
95 | signer_kwargs: _t_opt_kwargs = None, | |
96 | fallback_signers: _t.Optional[_t_fallbacks] = None, | |
97 | ): | |
98 | #: The list of secret keys to try for verifying signatures, from | |
99 | #: oldest to newest. The newest (last) key is used for signing. | |
100 | #: | |
101 | #: This allows a key rotation system to keep a list of allowed | |
102 | #: keys and remove expired ones. | |
103 | self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key) | |
104 | ||
105 | if salt is not None: | |
106 | salt = want_bytes(salt) | |
107 | # if salt is None then the signer's default is used | |
108 | ||
109 | self.salt = salt | |
110 | ||
111 | if serializer is None: | |
112 | serializer = self.default_serializer | |
113 | ||
114 | self.serializer: _t.Any = serializer | |
115 | self.is_text_serializer: bool = is_text_serializer(serializer) | |
116 | ||
117 | if signer is None: | |
118 | signer = self.default_signer | |
119 | ||
120 | self.signer: _t_signer = signer | |
121 | self.signer_kwargs: _t_kwargs = signer_kwargs or {} | |
122 | ||
123 | if fallback_signers is None: | |
124 | fallback_signers = list(self.default_fallback_signers or ()) | |
125 | ||
126 | self.fallback_signers: _t_fallbacks = fallback_signers | |
127 | self.serializer_kwargs: _t_kwargs = serializer_kwargs or {} | |
128 | ||
129 | @property | |
130 | def secret_key(self) -> bytes: | |
131 | """The newest (last) entry in the :attr:`secret_keys` list. This | |
132 | is for compatibility from before key rotation support was added. | |
133 | """ | |
134 | return self.secret_keys[-1] | |
135 | ||
136 | def load_payload( | |
137 | self, payload: bytes, serializer: _t.Optional[_t.Any] = None | |
138 | ) -> _t.Any: | |
139 | """Loads the encoded object. This function raises | |
140 | :class:`.BadPayload` if the payload is not valid. The | |
141 | ``serializer`` parameter can be used to override the serializer | |
142 | stored on the class. The encoded ``payload`` should always be | |
143 | bytes. | |
144 | """ | |
145 | if serializer is None: | |
146 | serializer = self.serializer | |
147 | is_text = self.is_text_serializer | |
148 | else: | |
149 | is_text = is_text_serializer(serializer) | |
150 | ||
151 | try: | |
152 | if is_text: | |
153 | return serializer.loads(payload.decode("utf-8")) | |
154 | ||
155 | return serializer.loads(payload) | |
156 | except Exception as e: | |
157 | raise BadPayload( | |
158 | "Could not load the payload because an exception" | |
159 | " occurred on unserializing the data.", | |
160 | original_error=e, | |
161 | ) | |
162 | ||
163 | def dump_payload(self, obj: _t.Any) -> bytes: | |
164 | """Dumps the encoded object. The return value is always bytes. | |
165 | If the internal serializer returns text, the value will be | |
166 | encoded as UTF-8. | |
167 | """ | |
168 | return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs)) | |
169 | ||
170 | def make_signer(self, salt: _t_opt_str_bytes = None) -> Signer: | |
171 | """Creates a new instance of the signer to be used. The default | |
172 | implementation uses the :class:`.Signer` base class. | |
173 | """ | |
174 | if salt is None: | |
175 | salt = self.salt | |
176 | ||
177 | return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs) | |
178 | ||
179 | def iter_unsigners(self, salt: _t_opt_str_bytes = None) -> _t.Iterator[Signer]: | |
180 | """Iterates over all signers to be tried for unsigning. Starts | |
181 | with the configured signer, then constructs each signer | |
182 | specified in ``fallback_signers``. | |
183 | """ | |
184 | if salt is None: | |
185 | salt = self.salt | |
186 | ||
187 | yield self.make_signer(salt) | |
188 | ||
189 | for fallback in self.fallback_signers: | |
190 | if isinstance(fallback, dict): | |
191 | kwargs = fallback | |
192 | fallback = self.signer | |
193 | elif isinstance(fallback, tuple): | |
194 | fallback, kwargs = fallback | |
195 | else: | |
196 | kwargs = self.signer_kwargs | |
197 | ||
198 | for secret_key in self.secret_keys: | |
199 | yield fallback(secret_key, salt=salt, **kwargs) | |
200 | ||
201 | def dumps(self, obj: _t.Any, salt: _t_opt_str_bytes = None) -> _t_str_bytes: | |
202 | """Returns a signed string serialized with the internal | |
203 | serializer. The return value can be either a byte or unicode | |
204 | string depending on the format of the internal serializer. | |
205 | """ | |
206 | payload = want_bytes(self.dump_payload(obj)) | |
207 | rv = self.make_signer(salt).sign(payload) | |
208 | ||
209 | if self.is_text_serializer: | |
210 | return rv.decode("utf-8") | |
211 | ||
212 | return rv | |
213 | ||
214 | def dump(self, obj: _t.Any, f: _t.IO, salt: _t_opt_str_bytes = None) -> None: | |
215 | """Like :meth:`dumps` but dumps into a file. The file handle has | |
216 | to be compatible with what the internal serializer expects. | |
217 | """ | |
218 | f.write(self.dumps(obj, salt)) | |
219 | ||
220 | def loads( | |
221 | self, s: _t_str_bytes, salt: _t_opt_str_bytes = None, **kwargs: _t.Any | |
222 | ) -> _t.Any: | |
223 | """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the | |
224 | signature validation fails. | |
225 | """ | |
226 | s = want_bytes(s) | |
227 | last_exception = None | |
228 | ||
229 | for signer in self.iter_unsigners(salt): | |
230 | try: | |
231 | return self.load_payload(signer.unsign(s)) | |
232 | except BadSignature as err: | |
233 | last_exception = err | |
234 | ||
235 | raise _t.cast(BadSignature, last_exception) | |
236 | ||
237 | def load(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t.Any: | |
238 | """Like :meth:`loads` but loads from a file.""" | |
239 | return self.loads(f.read(), salt) | |
240 | ||
241 | def loads_unsafe( | |
242 | self, s: _t_str_bytes, salt: _t_opt_str_bytes = None | |
243 | ) -> _t_load_unsafe: | |
244 | """Like :meth:`loads` but without verifying the signature. This | |
245 | is potentially very dangerous to use depending on how your | |
246 | serializer works. The return value is ``(signature_valid, | |
247 | payload)`` instead of just the payload. The first item will be a | |
248 | boolean that indicates if the signature is valid. This function | |
249 | never fails. | |
250 | ||
251 | Use it for debugging only and if you know that your serializer | |
252 | module is not exploitable (for example, do not use it with a | |
253 | pickle serializer). | |
254 | ||
255 | .. versionadded:: 0.15 | |
256 | """ | |
257 | return self._loads_unsafe_impl(s, salt) | |
258 | ||
259 | def _loads_unsafe_impl( | |
260 | self, | |
261 | s: _t_str_bytes, | |
262 | salt: _t_opt_str_bytes, | |
263 | load_kwargs: _t_opt_kwargs = None, | |
264 | load_payload_kwargs: _t_opt_kwargs = None, | |
265 | ) -> _t_load_unsafe: | |
266 | """Low level helper function to implement :meth:`loads_unsafe` | |
267 | in serializer subclasses. | |
268 | """ | |
269 | if load_kwargs is None: | |
270 | load_kwargs = {} | |
271 | ||
272 | try: | |
273 | return True, self.loads(s, salt=salt, **load_kwargs) | |
274 | except BadSignature as e: | |
275 | if e.payload is None: | |
276 | return False, None | |
277 | ||
278 | if load_payload_kwargs is None: | |
279 | load_payload_kwargs = {} | |
280 | ||
281 | try: | |
282 | return ( | |
283 | False, | |
284 | self.load_payload(e.payload, **load_payload_kwargs), | |
285 | ) | |
286 | except BadPayload: | |
287 | return False, None | |
288 | ||
289 | def load_unsafe(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t_load_unsafe: | |
290 | """Like :meth:`loads_unsafe` but loads from a file. | |
291 | ||
292 | .. versionadded:: 0.15 | |
293 | """ | |
294 | return self.loads_unsafe(f.read(), salt=salt) |
0 | import hashlib | |
1 | import hmac | |
2 | import typing as _t | |
3 | ||
4 | from .encoding import _base64_alphabet | |
5 | from .encoding import base64_decode | |
6 | from .encoding import base64_encode | |
7 | from .encoding import want_bytes | |
8 | from .exc import BadSignature | |
9 | ||
10 | _t_str_bytes = _t.Union[str, bytes] | |
11 | _t_opt_str_bytes = _t.Optional[_t_str_bytes] | |
12 | _t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes] | |
13 | ||
14 | ||
15 | class SigningAlgorithm: | |
16 | """Subclasses must implement :meth:`get_signature` to provide | |
17 | signature generation functionality. | |
18 | """ | |
19 | ||
20 | def get_signature(self, key: bytes, value: bytes) -> bytes: | |
21 | """Returns the signature for the given key and value.""" | |
22 | raise NotImplementedError() | |
23 | ||
24 | def verify_signature(self, key: bytes, value: bytes, sig: bytes) -> bool: | |
25 | """Verifies the given signature matches the expected | |
26 | signature. | |
27 | """ | |
28 | return hmac.compare_digest(sig, self.get_signature(key, value)) | |
29 | ||
30 | ||
31 | class NoneAlgorithm(SigningAlgorithm): | |
32 | """Provides an algorithm that does not perform any signing and | |
33 | returns an empty signature. | |
34 | """ | |
35 | ||
36 | def get_signature(self, key: bytes, value: bytes) -> bytes: | |
37 | return b"" | |
38 | ||
39 | ||
40 | class HMACAlgorithm(SigningAlgorithm): | |
41 | """Provides signature generation using HMACs.""" | |
42 | ||
43 | #: The digest method to use with the MAC algorithm. This defaults to | |
44 | #: SHA1, but can be changed to any other function in the hashlib | |
45 | #: module. | |
46 | default_digest_method: _t.Any = staticmethod(hashlib.sha1) | |
47 | ||
48 | def __init__(self, digest_method: _t.Any = None): | |
49 | if digest_method is None: | |
50 | digest_method = self.default_digest_method | |
51 | ||
52 | self.digest_method: _t.Any = digest_method | |
53 | ||
54 | def get_signature(self, key: bytes, value: bytes) -> bytes: | |
55 | mac = hmac.new(key, msg=value, digestmod=self.digest_method) | |
56 | return mac.digest() | |
57 | ||
58 | ||
59 | def _make_keys_list(secret_key: _t_secret_key) -> _t.List[bytes]: | |
60 | if isinstance(secret_key, (str, bytes)): | |
61 | return [want_bytes(secret_key)] | |
62 | ||
63 | return [want_bytes(s) for s in secret_key] | |
64 | ||
65 | ||
66 | class Signer: | |
67 | """A signer securely signs bytes, then unsigns them to verify that | |
68 | the value hasn't been changed. | |
69 | ||
70 | The secret key should be a random string of ``bytes`` and should not | |
71 | be saved to code or version control. Different salts should be used | |
72 | to distinguish signing in different contexts. See :doc:`/concepts` | |
73 | for information about the security of the secret key and salt. | |
74 | ||
75 | :param secret_key: The secret key to sign and verify with. Can be a | |
76 | list of keys, oldest to newest, to support key rotation. | |
77 | :param salt: Extra key to combine with ``secret_key`` to distinguish | |
78 | signatures in different contexts. | |
79 | :param sep: Separator between the signature and value. | |
80 | :param key_derivation: How to derive the signing key from the secret | |
81 | key and salt. Possible values are ``concat``, ``django-concat``, | |
82 | or ``hmac``. Defaults to :attr:`default_key_derivation`, which | |
83 | defaults to ``django-concat``. | |
84 | :param digest_method: Hash function to use when generating the HMAC | |
85 | signature. Defaults to :attr:`default_digest_method`, which | |
86 | defaults to :func:`hashlib.sha1`. Note that the security of the | |
87 | hash alone doesn't apply when used intermediately in HMAC. | |
88 | :param algorithm: A :class:`SigningAlgorithm` instance to use | |
89 | instead of building a default :class:`HMACAlgorithm` with the | |
90 | ``digest_method``. | |
91 | ||
92 | .. versionchanged:: 2.0 | |
93 | Added support for key rotation by passing a list to | |
94 | ``secret_key``. | |
95 | ||
96 | .. versionchanged:: 0.18 | |
97 | ``algorithm`` was added as an argument to the class constructor. | |
98 | ||
99 | .. versionchanged:: 0.14 | |
100 | ``key_derivation`` and ``digest_method`` were added as arguments | |
101 | to the class constructor. | |
102 | """ | |
103 | ||
104 | #: The default digest method to use for the signer. The default is | |
105 | #: :func:`hashlib.sha1`, but can be changed to any :mod:`hashlib` or | |
106 | #: compatible object. Note that the security of the hash alone | |
107 | #: doesn't apply when used intermediately in HMAC. | |
108 | #: | |
109 | #: .. versionadded:: 0.14 | |
110 | default_digest_method: _t.Any = staticmethod(hashlib.sha1) | |
111 | ||
112 | #: The default scheme to use to derive the signing key from the | |
113 | #: secret key and salt. The default is ``django-concat``. Possible | |
114 | #: values are ``concat``, ``django-concat``, and ``hmac``. | |
115 | #: | |
116 | #: .. versionadded:: 0.14 | |
117 | default_key_derivation: str = "django-concat" | |
118 | ||
119 | def __init__( | |
120 | self, | |
121 | secret_key: _t_secret_key, | |
122 | salt: _t_opt_str_bytes = b"itsdangerous.Signer", | |
123 | sep: _t_str_bytes = b".", | |
124 | key_derivation: _t.Optional[str] = None, | |
125 | digest_method: _t.Optional[_t.Any] = None, | |
126 | algorithm: _t.Optional[SigningAlgorithm] = None, | |
127 | ): | |
128 | #: The list of secret keys to try for verifying signatures, from | |
129 | #: oldest to newest. The newest (last) key is used for signing. | |
130 | #: | |
131 | #: This allows a key rotation system to keep a list of allowed | |
132 | #: keys and remove expired ones. | |
133 | self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key) | |
134 | self.sep: bytes = want_bytes(sep) | |
135 | ||
136 | if self.sep in _base64_alphabet: | |
137 | raise ValueError( | |
138 | "The given separator cannot be used because it may be" | |
139 | " contained in the signature itself. ASCII letters," | |
140 | " digits, and '-_=' must not be used." | |
141 | ) | |
142 | ||
143 | if salt is not None: | |
144 | salt = want_bytes(salt) | |
145 | else: | |
146 | salt = b"itsdangerous.Signer" | |
147 | ||
148 | self.salt = salt | |
149 | ||
150 | if key_derivation is None: | |
151 | key_derivation = self.default_key_derivation | |
152 | ||
153 | self.key_derivation: str = key_derivation | |
154 | ||
155 | if digest_method is None: | |
156 | digest_method = self.default_digest_method | |
157 | ||
158 | self.digest_method: _t.Any = digest_method | |
159 | ||
160 | if algorithm is None: | |
161 | algorithm = HMACAlgorithm(self.digest_method) | |
162 | ||
163 | self.algorithm: SigningAlgorithm = algorithm | |
164 | ||
165 | @property | |
166 | def secret_key(self) -> bytes: | |
167 | """The newest (last) entry in the :attr:`secret_keys` list. This | |
168 | is for compatibility from before key rotation support was added. | |
169 | """ | |
170 | return self.secret_keys[-1] | |
171 | ||
172 | def derive_key(self, secret_key: _t_opt_str_bytes = None) -> bytes: | |
173 | """This method is called to derive the key. The default key | |
174 | derivation choices can be overridden here. Key derivation is not | |
175 | intended to be used as a security method to make a complex key | |
176 | out of a short password. Instead you should use large random | |
177 | secret keys. | |
178 | ||
179 | :param secret_key: A specific secret key to derive from. | |
180 | Defaults to the last item in :attr:`secret_keys`. | |
181 | ||
182 | .. versionchanged:: 2.0 | |
183 | Added the ``secret_key`` parameter. | |
184 | """ | |
185 | if secret_key is None: | |
186 | secret_key = self.secret_keys[-1] | |
187 | else: | |
188 | secret_key = want_bytes(secret_key) | |
189 | ||
190 | if self.key_derivation == "concat": | |
191 | return _t.cast(bytes, self.digest_method(self.salt + secret_key).digest()) | |
192 | elif self.key_derivation == "django-concat": | |
193 | return _t.cast( | |
194 | bytes, self.digest_method(self.salt + b"signer" + secret_key).digest() | |
195 | ) | |
196 | elif self.key_derivation == "hmac": | |
197 | mac = hmac.new(secret_key, digestmod=self.digest_method) | |
198 | mac.update(self.salt) | |
199 | return mac.digest() | |
200 | elif self.key_derivation == "none": | |
201 | return secret_key | |
202 | else: | |
203 | raise TypeError("Unknown key derivation method") | |
204 | ||
205 | def get_signature(self, value: _t_str_bytes) -> bytes: | |
206 | """Returns the signature for the given value.""" | |
207 | value = want_bytes(value) | |
208 | key = self.derive_key() | |
209 | sig = self.algorithm.get_signature(key, value) | |
210 | return base64_encode(sig) | |
211 | ||
212 | def sign(self, value: _t_str_bytes) -> bytes: | |
213 | """Signs the given string.""" | |
214 | value = want_bytes(value) | |
215 | return value + self.sep + self.get_signature(value) | |
216 | ||
217 | def verify_signature(self, value: _t_str_bytes, sig: _t_str_bytes) -> bool: | |
218 | """Verifies the signature for the given value.""" | |
219 | try: | |
220 | sig = base64_decode(sig) | |
221 | except Exception: | |
222 | return False | |
223 | ||
224 | value = want_bytes(value) | |
225 | ||
226 | for secret_key in reversed(self.secret_keys): | |
227 | key = self.derive_key(secret_key) | |
228 | ||
229 | if self.algorithm.verify_signature(key, value, sig): | |
230 | return True | |
231 | ||
232 | return False | |
233 | ||
234 | def unsign(self, signed_value: _t_str_bytes) -> bytes: | |
235 | """Unsigns the given string.""" | |
236 | signed_value = want_bytes(signed_value) | |
237 | ||
238 | if self.sep not in signed_value: | |
239 | raise BadSignature(f"No {self.sep!r} found in value") | |
240 | ||
241 | value, sig = signed_value.rsplit(self.sep, 1) | |
242 | ||
243 | if self.verify_signature(value, sig): | |
244 | return value | |
245 | ||
246 | raise BadSignature(f"Signature {sig!r} does not match", payload=value) | |
247 | ||
248 | def validate(self, signed_value: _t_str_bytes) -> bool: | |
249 | """Only validates the given signed value. Returns ``True`` if | |
250 | the signature exists and is valid. | |
251 | """ | |
252 | try: | |
253 | self.unsign(signed_value) | |
254 | return True | |
255 | except BadSignature: | |
256 | return False |
0 | import time | |
1 | import typing | |
2 | import typing as _t | |
3 | from datetime import datetime | |
4 | from datetime import timezone | |
5 | ||
6 | from .encoding import base64_decode | |
7 | from .encoding import base64_encode | |
8 | from .encoding import bytes_to_int | |
9 | from .encoding import int_to_bytes | |
10 | from .encoding import want_bytes | |
11 | from .exc import BadSignature | |
12 | from .exc import BadTimeSignature | |
13 | from .exc import SignatureExpired | |
14 | from .serializer import Serializer | |
15 | from .signer import Signer | |
16 | ||
17 | _t_str_bytes = _t.Union[str, bytes] | |
18 | _t_opt_str_bytes = _t.Optional[_t_str_bytes] | |
19 | _t_opt_int = _t.Optional[int] | |
20 | ||
21 | if _t.TYPE_CHECKING: | |
22 | import typing_extensions as _te | |
23 | ||
24 | ||
25 | class TimestampSigner(Signer): | |
26 | """Works like the regular :class:`.Signer` but also records the time | |
27 | of the signing and can be used to expire signatures. The | |
28 | :meth:`unsign` method can raise :exc:`.SignatureExpired` if the | |
29 | unsigning failed because the signature is expired. | |
30 | """ | |
31 | ||
32 | def get_timestamp(self) -> int: | |
33 | """Returns the current timestamp. The function must return an | |
34 | integer. | |
35 | """ | |
36 | return int(time.time()) | |
37 | ||
38 | def timestamp_to_datetime(self, ts: int) -> datetime: | |
39 | """Convert the timestamp from :meth:`get_timestamp` into an | |
40 | aware :class`datetime.datetime` in UTC. | |
41 | ||
42 | .. versionchanged:: 2.0 | |
43 | The timestamp is returned as a timezone-aware ``datetime`` | |
44 | in UTC rather than a naive ``datetime`` assumed to be UTC. | |
45 | """ | |
46 | return datetime.fromtimestamp(ts, tz=timezone.utc) | |
47 | ||
48 | def sign(self, value: _t_str_bytes) -> bytes: | |
49 | """Signs the given string and also attaches time information.""" | |
50 | value = want_bytes(value) | |
51 | timestamp = base64_encode(int_to_bytes(self.get_timestamp())) | |
52 | sep = want_bytes(self.sep) | |
53 | value = value + sep + timestamp | |
54 | return value + sep + self.get_signature(value) | |
55 | ||
56 | # Ignore overlapping signatures check, return_timestamp is the only | |
57 | # parameter that affects the return type. | |
58 | ||
59 | @typing.overload | |
60 | def unsign( # type: ignore | |
61 | self, | |
62 | signed_value: _t_str_bytes, | |
63 | max_age: _t_opt_int = None, | |
64 | return_timestamp: "_te.Literal[False]" = False, | |
65 | ) -> bytes: | |
66 | ... | |
67 | ||
68 | @typing.overload | |
69 | def unsign( | |
70 | self, | |
71 | signed_value: _t_str_bytes, | |
72 | max_age: _t_opt_int = None, | |
73 | return_timestamp: "_te.Literal[True]" = True, | |
74 | ) -> _t.Tuple[bytes, datetime]: | |
75 | ... | |
76 | ||
77 | def unsign( | |
78 | self, | |
79 | signed_value: _t_str_bytes, | |
80 | max_age: _t_opt_int = None, | |
81 | return_timestamp: bool = False, | |
82 | ) -> _t.Union[_t.Tuple[bytes, datetime], bytes]: | |
83 | """Works like the regular :meth:`.Signer.unsign` but can also | |
84 | validate the time. See the base docstring of the class for | |
85 | the general behavior. If ``return_timestamp`` is ``True`` the | |
86 | timestamp of the signature will be returned as an aware | |
87 | :class:`datetime.datetime` object in UTC. | |
88 | ||
89 | .. versionchanged:: 2.0 | |
90 | The timestamp is returned as a timezone-aware ``datetime`` | |
91 | in UTC rather than a naive ``datetime`` assumed to be UTC. | |
92 | """ | |
93 | try: | |
94 | result = super().unsign(signed_value) | |
95 | sig_error = None | |
96 | except BadSignature as e: | |
97 | sig_error = e | |
98 | result = e.payload or b"" | |
99 | ||
100 | sep = want_bytes(self.sep) | |
101 | ||
102 | # If there is no timestamp in the result there is something | |
103 | # seriously wrong. In case there was a signature error, we raise | |
104 | # that one directly, otherwise we have a weird situation in | |
105 | # which we shouldn't have come except someone uses a time-based | |
106 | # serializer on non-timestamp data, so catch that. | |
107 | if sep not in result: | |
108 | if sig_error: | |
109 | raise sig_error | |
110 | ||
111 | raise BadTimeSignature("timestamp missing", payload=result) | |
112 | ||
113 | value, ts_bytes = result.rsplit(sep, 1) | |
114 | ts_int: _t_opt_int = None | |
115 | ts_dt: _t.Optional[datetime] = None | |
116 | ||
117 | try: | |
118 | ts_int = bytes_to_int(base64_decode(ts_bytes)) | |
119 | except Exception: | |
120 | pass | |
121 | ||
122 | # Signature is *not* okay. Raise a proper error now that we have | |
123 | # split the value and the timestamp. | |
124 | if sig_error is not None: | |
125 | if ts_int is not None: | |
126 | ts_dt = self.timestamp_to_datetime(ts_int) | |
127 | ||
128 | raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt) | |
129 | ||
130 | # Signature was okay but the timestamp is actually not there or | |
131 | # malformed. Should not happen, but we handle it anyway. | |
132 | if ts_int is None: | |
133 | raise BadTimeSignature("Malformed timestamp", payload=value) | |
134 | ||
135 | # Check timestamp is not older than max_age | |
136 | if max_age is not None: | |
137 | age = self.get_timestamp() - ts_int | |
138 | ||
139 | if age > max_age: | |
140 | raise SignatureExpired( | |
141 | f"Signature age {age} > {max_age} seconds", | |
142 | payload=value, | |
143 | date_signed=self.timestamp_to_datetime(ts_int), | |
144 | ) | |
145 | ||
146 | if age < 0: | |
147 | raise SignatureExpired( | |
148 | f"Signature age {age} < 0 seconds", | |
149 | payload=value, | |
150 | date_signed=self.timestamp_to_datetime(ts_int), | |
151 | ) | |
152 | ||
153 | if return_timestamp: | |
154 | return value, self.timestamp_to_datetime(ts_int) | |
155 | ||
156 | return value | |
157 | ||
158 | def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool: | |
159 | """Only validates the given signed value. Returns ``True`` if | |
160 | the signature exists and is valid.""" | |
161 | try: | |
162 | self.unsign(signed_value, max_age=max_age) | |
163 | return True | |
164 | except BadSignature: | |
165 | return False | |
166 | ||
167 | ||
168 | class TimedSerializer(Serializer): | |
169 | """Uses :class:`TimestampSigner` instead of the default | |
170 | :class:`.Signer`. | |
171 | """ | |
172 | ||
173 | default_signer: _t.Type[TimestampSigner] = TimestampSigner | |
174 | ||
175 | def iter_unsigners( | |
176 | self, salt: _t_opt_str_bytes = None | |
177 | ) -> _t.Iterator[TimestampSigner]: | |
178 | return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt)) | |
179 | ||
180 | # TODO: Signature is incompatible because parameters were added | |
181 | # before salt. | |
182 | ||
183 | def loads( # type: ignore | |
184 | self, | |
185 | s: _t_str_bytes, | |
186 | max_age: _t_opt_int = None, | |
187 | return_timestamp: bool = False, | |
188 | salt: _t_opt_str_bytes = None, | |
189 | ) -> _t.Any: | |
190 | """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the | |
191 | signature validation fails. If a ``max_age`` is provided it will | |
192 | ensure the signature is not older than that time in seconds. In | |
193 | case the signature is outdated, :exc:`.SignatureExpired` is | |
194 | raised. All arguments are forwarded to the signer's | |
195 | :meth:`~TimestampSigner.unsign` method. | |
196 | """ | |
197 | s = want_bytes(s) | |
198 | last_exception = None | |
199 | ||
200 | for signer in self.iter_unsigners(salt): | |
201 | try: | |
202 | base64d, timestamp = signer.unsign( | |
203 | s, max_age=max_age, return_timestamp=True | |
204 | ) | |
205 | payload = self.load_payload(base64d) | |
206 | ||
207 | if return_timestamp: | |
208 | return payload, timestamp | |
209 | ||
210 | return payload | |
211 | except SignatureExpired: | |
212 | # The signature was unsigned successfully but was | |
213 | # expired. Do not try the next signer. | |
214 | raise | |
215 | except BadSignature as err: | |
216 | last_exception = err | |
217 | ||
218 | raise _t.cast(BadSignature, last_exception) | |
219 | ||
220 | def loads_unsafe( # type: ignore | |
221 | self, | |
222 | s: _t_str_bytes, | |
223 | max_age: _t_opt_int = None, | |
224 | salt: _t_opt_str_bytes = None, | |
225 | ) -> _t.Tuple[bool, _t.Any]: | |
226 | return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age}) |
0 | import typing as _t | |
1 | import zlib | |
2 | ||
3 | from ._json import _CompactJSON | |
4 | from .encoding import base64_decode | |
5 | from .encoding import base64_encode | |
6 | from .exc import BadPayload | |
7 | from .serializer import Serializer | |
8 | from .timed import TimedSerializer | |
9 | ||
10 | ||
11 | class URLSafeSerializerMixin(Serializer): | |
12 | """Mixed in with a regular serializer it will attempt to zlib | |
13 | compress the string to make it shorter if necessary. It will also | |
14 | base64 encode the string so that it can safely be placed in a URL. | |
15 | """ | |
16 | ||
17 | default_serializer = _CompactJSON | |
18 | ||
19 | def load_payload( | |
20 | self, | |
21 | payload: bytes, | |
22 | *args: _t.Any, | |
23 | serializer: _t.Optional[_t.Any] = None, | |
24 | **kwargs: _t.Any, | |
25 | ) -> _t.Any: | |
26 | decompress = False | |
27 | ||
28 | if payload.startswith(b"."): | |
29 | payload = payload[1:] | |
30 | decompress = True | |
31 | ||
32 | try: | |
33 | json = base64_decode(payload) | |
34 | except Exception as e: | |
35 | raise BadPayload( | |
36 | "Could not base64 decode the payload because of an exception", | |
37 | original_error=e, | |
38 | ) | |
39 | ||
40 | if decompress: | |
41 | try: | |
42 | json = zlib.decompress(json) | |
43 | except Exception as e: | |
44 | raise BadPayload( | |
45 | "Could not zlib decompress the payload before decoding the payload", | |
46 | original_error=e, | |
47 | ) | |
48 | ||
49 | return super().load_payload(json, *args, **kwargs) | |
50 | ||
51 | def dump_payload(self, obj: _t.Any) -> bytes: | |
52 | json = super().dump_payload(obj) | |
53 | is_compressed = False | |
54 | compressed = zlib.compress(json) | |
55 | ||
56 | if len(compressed) < (len(json) - 1): | |
57 | json = compressed | |
58 | is_compressed = True | |
59 | ||
60 | base64d = base64_encode(json) | |
61 | ||
62 | if is_compressed: | |
63 | base64d = b"." + base64d | |
64 | ||
65 | return base64d | |
66 | ||
67 | ||
68 | class URLSafeSerializer(URLSafeSerializerMixin, Serializer): | |
69 | """Works like :class:`.Serializer` but dumps and loads into a URL | |
70 | safe string consisting of the upper and lowercase character of the | |
71 | alphabet as well as ``'_'``, ``'-'`` and ``'.'``. | |
72 | """ | |
73 | ||
74 | ||
75 | class URLSafeTimedSerializer(URLSafeSerializerMixin, TimedSerializer): | |
76 | """Works like :class:`.TimedSerializer` but dumps and loads into a | |
77 | URL safe string consisting of the upper and lowercase character of | |
78 | the alphabet as well as ``'_'``, ``'-'`` and ``'.'``. | |
79 | """ |