$77 GRAYBYTE WORDPRESS FILE MANAGER $53

SERVER : premium201.web-hosting.com #1 SMP Wed Mar 26 12:08:09 UTC 2025
SERVER IP : 172.67.217.254 | ADMIN IP 216.73.216.180
OPTIONS : CRL = ON | WGT = ON | SDO = OFF | PKEX = OFF
DEACTIVATED : mail

/opt/imunify360/venv/lib/python3.11/site-packages/cryptography/

HOME
Current File : /opt/imunify360/venv/lib/python3.11/site-packages/cryptography//fernet.py
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import annotations

import base64
import binascii
import os
import time
import typing

from cryptography import utils
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hmac import HMAC


class InvalidToken(Exception):
    pass


_MAX_CLOCK_SKEW = 60


class Fernet:
    def __init__(
        self,
        key: bytes | str,
        backend: typing.Any = None,
    ) -> None:
        try:
            key = base64.urlsafe_b64decode(key)
        except binascii.Error as exc:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            ) from exc
        if len(key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        self._signing_key = key[:16]
        self._encryption_key = key[16:]

    @classmethod
    def generate_key(cls) -> bytes:
        return base64.urlsafe_b64encode(os.urandom(32))

    def encrypt(self, data: bytes) -> bytes:
        return self.encrypt_at_time(data, int(time.time()))

    def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
        iv = os.urandom(16)
        return self._encrypt_from_parts(data, current_time, iv)

    def _encrypt_from_parts(
        self, data: bytes, current_time: int, iv: bytes
    ) -> bytes:
        utils._check_bytes("data", data)

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key),
            modes.CBC(iv),
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        basic_parts = (
            b"\x80"
            + current_time.to_bytes(length=8, byteorder="big")
            + iv
            + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(basic_parts)
        hmac = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + hmac)

    def decrypt(self, token: bytes | str, ttl: int | None = None) -> bytes:
        timestamp, data = Fernet._get_unverified_token_data(token)
        if ttl is None:
            time_info = None
        else:
            time_info = (ttl, int(time.time()))
        return self._decrypt_data(data, timestamp, time_info)

    def decrypt_at_time(
        self, token: bytes | str, ttl: int, current_time: int
    ) -> bytes:
        if ttl is None:
            raise ValueError(
                "decrypt_at_time() can only be used with a non-None ttl"
            )
        timestamp, data = Fernet._get_unverified_token_data(token)
        return self._decrypt_data(data, timestamp, (ttl, current_time))

    def extract_timestamp(self, token: bytes | str) -> int:
        timestamp, data = Fernet._get_unverified_token_data(token)
        # Verify the token was not tampered with.
        self._verify_signature(data)
        return timestamp

    @staticmethod
    def _get_unverified_token_data(token: bytes | str) -> tuple[int, bytes]:
        if not isinstance(token, (str, bytes)):
            raise TypeError("token must be bytes or str")

        try:
            data = base64.urlsafe_b64decode(token)
        except (TypeError, binascii.Error):
            raise InvalidToken

        if not data or data[0] != 0x80:
            raise InvalidToken

        if len(data) < 9:
            raise InvalidToken

        timestamp = int.from_bytes(data[1:9], byteorder="big")
        return timestamp, data

    def _verify_signature(self, data: bytes) -> None:
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(data[:-32])
        try:
            h.verify(data[-32:])
        except InvalidSignature:
            raise InvalidToken

    def _decrypt_data(
        self,
        data: bytes,
        timestamp: int,
        time_info: tuple[int, int] | None,
    ) -> bytes:
        if time_info is not None:
            ttl, current_time = time_info
            if timestamp + ttl < current_time:
                raise InvalidToken

            if current_time + _MAX_CLOCK_SKEW < timestamp:
                raise InvalidToken

        self._verify_signature(data)

        iv = data[9:25]
        ciphertext = data[25:-32]
        decryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv)
        ).decryptor()
        plaintext_padded = decryptor.update(ciphertext)
        try:
            plaintext_padded += decryptor.finalize()
        except ValueError:
            raise InvalidToken
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

        unpadded = unpadder.update(plaintext_padded)
        try:
            unpadded += unpadder.finalize()
        except ValueError:
            raise InvalidToken
        return unpadded


class MultiFernet:
    def __init__(self, fernets: typing.Iterable[Fernet]):
        fernets = list(fernets)
        if not fernets:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = fernets

    def encrypt(self, msg: bytes) -> bytes:
        return self.encrypt_at_time(msg, int(time.time()))

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        return self._fernets[0].encrypt_at_time(msg, current_time)

    def rotate(self, msg: bytes | str) -> bytes:
        timestamp, data = Fernet._get_unverified_token_data(msg)
        for f in self._fernets:
            try:
                p = f._decrypt_data(data, timestamp, None)
                break
            except InvalidToken:
                pass
        else:
            raise InvalidToken

        iv = os.urandom(16)
        return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)

    def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
        for f in self._fernets:
            try:
                return f.decrypt(msg, ttl)
            except InvalidToken:
                pass
        raise InvalidToken

    def decrypt_at_time(
        self, msg: bytes | str, ttl: int, current_time: int
    ) -> bytes:
        for f in self._fernets:
            try:
                return f.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                pass
        raise InvalidToken


Current_dir [ NOT WRITEABLE ] Document_root [ WRITEABLE ]


[ Back ]
NAME
SIZE
LAST TOUCH
USER
CAN-I?
FUNCTIONS
..
--
3 Mar 2026 8.59 AM
root / root
0755
__pycache__
--
3 Mar 2026 8.59 AM
root / root
0755
hazmat
--
3 Mar 2026 8.59 AM
root / root
0755
x509
--
3 Mar 2026 8.59 AM
root / root
0755
__about__.py
0.435 KB
13 Feb 2026 12.40 PM
root / root
0644
__init__.py
0.355 KB
13 Feb 2026 12.40 PM
root / root
0644
exceptions.py
1.062 KB
13 Feb 2026 12.40 PM
root / root
0644
fernet.py
6.539 KB
13 Feb 2026 12.40 PM
root / root
0644
py.typed
0 KB
13 Feb 2026 12.40 PM
root / root
0644
utils.py
3.833 KB
13 Feb 2026 12.40 PM
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2025 CONTACT ME
Static GIF