$69 GRAYBYTE WORDPRESS FILE MANAGER $41

SERVER : premium201.web-hosting.com #1 SMP Wed Mar 26 12:08:09 UTC 2025
SERVER IP : 104.21.43.35 | ADMIN IP 216.73.216.180
OPTIONS : CRL = ON | WGT = ON | SDO = OFF | PKEX = OFF
DEACTIVATED : mail

/opt/imunify360/venv/lib/python3.11/site-packages/pyasn1/codec/

HOME
Current File : /opt/imunify360/venv/lib/python3.11/site-packages/pyasn1/codec//streaming.py
#
# This file is part of pyasn1 software.
#
# Copyright (c) 2005-2019, Ilya Etingof <[email protected]>
# License: https://pyasn1.readthedocs.io/en/latest/license.html
#
import io
import os

from pyasn1 import error
from pyasn1.type import univ

class CachingStreamWrapper(io.IOBase):
    """Wrapper around non-seekable streams.

    Bytes read from the wrapped stream are appended to an in-memory
    cache, which is what makes going back (`seek`, `peek`) possible.

    Note that the implementation is tied to the decoder,
    not checking for dangerous arguments for the sake
    of performance.

    The read bytes are kept in the internal cache until
    `markedPosition` is assigned, which may trim the cache.

    Non-blocking raw streams are supported: when the raw stream
    returns `None` ("no data available right now"), `read` and
    `peek` hand back whatever could be served from the cache, or
    `None` if nothing could.
    """
    def __init__(self, raw):
        # The wrapped (non-seekable) stream.
        self._raw = raw
        # Everything read from `raw` so far; its position is our position.
        self._cache = io.BytesIO()
        self._markedPosition = 0

    def peek(self, n):
        """Return up to `n` bytes without advancing the position.

        Returns `None` if the raw stream is non-blocking and no data
        is available at all.
        """
        result = self.read(n)
        if result is None:
            # Nothing was read, so there is nothing to rewind.
            return None

        self._cache.seek(-len(result), os.SEEK_CUR)
        return result

    def seekable(self):
        """Report the wrapper as seekable (within the cached data)."""
        return True

    def seek(self, n=-1, whence=os.SEEK_SET):
        """Reposition within the cache and return the new position."""
        # Note that this not safe for seeking forward.
        return self._cache.seek(n, whence)

    def read(self, n=-1):
        """Read up to `n` bytes (`-1` or `None` = everything available).

        Data is served from the cache first; anything still missing is
        read from the raw stream and appended to the cache.

        Returns `None` if no byte could be served from the cache and the
        raw (non-blocking) stream reports that no data is available yet.
        """
        if n is None:
            # io convention: read(None) is the same as read(-1)
            n = -1

        read_from_cache = self._cache.read(n)
        if n != -1:
            n -= len(read_from_cache)
            if not n:  # 0 bytes left to read
                return read_from_cache

        read_from_raw = self._raw.read(n)
        if read_from_raw is None:
            # Non-blocking raw stream has nothing for us right now:
            # return the (possibly short) cached part, or None if empty.
            return read_from_cache or None

        # The cache position is at its end here, so this appends.
        self._cache.write(read_from_raw)

        return read_from_cache + read_from_raw

    @property
    def markedPosition(self):
        """Position where the currently processed element starts.

        This is used for back-tracking in SingleItemDecoder.__call__
        and (indefLen)ValueDecoder and should not be used for other purposes.
        The client is not supposed to ever seek before this position.
        """
        return self._markedPosition

    @markedPosition.setter
    def markedPosition(self, value):
        # By setting the value, we ensure we won't seek back before it.
        # `value` should be the same as the current position
        # We don't check for this for performance reasons.
        self._markedPosition = value

        # Whenever we set _markedPosition, we know for sure
        # that we will not return back, and thus it is
        # safe to drop the cached data before the current position.
        # Only bother once the cache has outgrown the default buffer size.
        if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
            # Keep just the not-yet-consumed tail; positions restart at 0.
            self._cache = io.BytesIO(self._cache.read())
            self._markedPosition = 0

    def tell(self):
        """Return the current position within the cache."""
        return self._cache.tell()


def asSeekableStream(substrate):
    """Return a seekable byte-stream view of `substrate`.

    Parameters
    ----------
    substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
        Object to expose as a stream. An :py:class:`io.BytesIO` is returned
        unchanged; `bytes` and `OctetString` contents are wrapped in a new
        :py:class:`io.BytesIO`; any other object is returned as-is when its
        `seekable()` is true and wrapped in `CachingStreamWrapper` otherwise.

    Returns
    -------
    : :py:class:`io.IOBase`

    Raises
    ------
    : :py:class:`~pyasn1.error.PyAsn1Error`
        If the supplied substrate cannot be converted to a seekable stream
        (it has no usable `seekable` attribute).
    """
    # Already the preferred in-memory stream type: nothing to do.
    if isinstance(substrate, io.BytesIO):
        return substrate

    # Raw octets: expose them through a fresh in-memory stream.
    if isinstance(substrate, bytes):
        return io.BytesIO(substrate)

    if isinstance(substrate, univ.OctetString):
        return io.BytesIO(substrate.asOctets())

    # Anything else must at least quack like a stream.
    try:
        return (substrate if substrate.seekable()  # Will fail for most invalid types
                else CachingStreamWrapper(substrate))

    except AttributeError:
        message = ("Cannot convert " + substrate.__class__.__name__ +
                   " to a seekable bit stream.")
        raise error.UnsupportedSubstrateError(message)


def isEndOfStream(substrate):
    """Check whether we have reached the end of a stream.

    This is a generator. The stream position is left where it was:
    a byte consumed while probing is un-read by seeking back.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to check

    Yields
    ------
    : :py:class:`bool` or `None`
        `True` if the stream is at its end, `False` otherwise. For a
        stream other than :py:class:`io.BytesIO` whose `read` returns
        `None`, a `None` is yielded first and `True` after it.
    """
    if isinstance(substrate, io.BytesIO):
        # In-memory stream: compare the current offset with the end offset.
        here = substrate.tell()
        substrate.seek(0, os.SEEK_END)
        at_end = here == substrate.tell()
        substrate.seek(here, os.SEEK_SET)
        yield at_end
        return

    # Generic stream: probe by reading a single byte.
    probe = substrate.read(1)
    if probe is None:
        yield

    if probe:
        # Put the probed byte back.
        substrate.seek(-1, os.SEEK_CUR)

    yield not probe


def peekIntoStream(substrate, size=-1):
    """Peek into stream.

    This is a generator. The stream position is not advanced: either
    the stream's own `peek` is used, or the data is read and the
    original position is restored afterwards.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    size: :py:class:`int`
        How many bytes to peek (-1 = all available)

    Yields
    ------
    : :py:class:`bytes` or `None`
        For streams with a `peek` method: `None` each time fewer than
        `size` bytes (or no data at all) could be peeked, then the peeked
        data once enough is available. The stream is peeked again every
        time the generator is resumed.
        For other streams: whatever `readFromStream` yields.
    """
    if hasattr(substrate, "peek"):
        while True:
            received = substrate.peek(size)

            # `None` means a non-blocking stream has no data right now.
            if received is not None and len(received) >= size:
                break

            # Not enough data yet: signal the caller and re-peek on resume
            # (previously the stale result was re-tested forever, and a
            # `None` result crashed on `len()`).
            yield

        yield received

    else:
        current_position = substrate.tell()
        try:
            for chunk in readFromStream(substrate, size):
                yield chunk

        finally:
            # Undo whatever the read consumed, even if the caller
            # abandons the generator or an error is raised.
            substrate.seek(current_position)


def readFromStream(substrate, size=-1, context=None):
    """Read from the stream.

    This is a generator that keeps trying until a full read succeeds.

    Parameters
    ----------
    substrate: :py:class:`IOBase`
        Stream to read from.

    Keyword parameters
    ------------------
    size: :py:class:`int`
        How many bytes to read (-1 = all available)

    context: :py:class:`dict`
        Opaque caller context will be attached to exception objects created
        by this function.

    Yields
    ------
    : :py:class:`bytes` or :py:class:`SubstrateUnderrunError`
        A :py:class:`~pyasn1.error.SubstrateUnderrunError` object each
        time `size` bytes are not readily available in the stream, then
        the data itself as the final item.

    Raises
    ------
    : :py:class:`~pyasn1.error.EndOfStreamError`
        Input stream is exhausted
    """
    while True:
        # this will block unless stream is non-blocking
        data = substrate.read(size)

        if data is None:  # non-blocking stream can do this
            yield error.SubstrateUnderrunError(context=context)
            continue

        if not data and size != 0:  # end-of-stream
            raise error.EndOfStreamError(context=context)

        if len(data) >= size:
            # Got everything that was asked for: hand it over and finish.
            yield data
            return

        # Short read: give the bytes back so the next attempt starts
        # from the same position, and behave like a non-blocking stream.
        substrate.seek(-len(data), os.SEEK_CUR)
        yield error.SubstrateUnderrunError(context=context)


Current_dir [ NOT WRITEABLE ] Document_root [ WRITEABLE ]


[ Back ]
NAME
SIZE
LAST TOUCH
USER
CAN-I?
FUNCTIONS
..
--
3 Mar 2026 8.59 AM
root / root
0755
__pycache__
--
3 Mar 2026 8.59 AM
root / root
0755
ber
--
3 Mar 2026 8.59 AM
root / root
0755
cer
--
3 Mar 2026 8.59 AM
root / root
0755
der
--
3 Mar 2026 8.59 AM
root / root
0755
native
--
3 Mar 2026 8.59 AM
root / root
0755
__init__.py
0.058 KB
13 Feb 2026 12.40 PM
root / root
0644
streaming.py
6.228 KB
13 Feb 2026 12.40 PM
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2025 CONTACT ME
Static GIF