Skip to content

ObservableStream class

Bases: Generic[_OutT], Observable[_OutT]

An observable subclass that encapsulates a flow of data and offers a mechanism for streaming its entries reactively.

Notes:

  • The ObservableStream class is a data flow-centric observable that focuses exclusively on the stream of its entries over time.

  • Data streaming is managed through a queue, ensuring that the order of entries is preserved and that no inconsistent notifications are generated.

  • Data is streamed to subscribers in a lazy manner, allowing them to receive incremental notifications as they occur.

Source code in pyventus/reactive/observables/observable_stream.py
class ObservableStream(Generic[_OutT], Observable[_OutT]):
    """
    An observable subclass that encapsulates a flow of data and offers a mechanism for streaming its entries reactively.

    **Notes:**

    -   The `ObservableStream` class is a data flow-centric observable that focuses exclusively on the stream of its
        entries over time.

    -   Data streaming is managed through a queue, ensuring that the order of entries is preserved and that no
        inconsistent notifications are generated.

    -   Data is streamed to subscribers in a lazy manner, allowing them to receive incremental notifications
        as they occur.
    """

    # Attributes for the ObservableStream.
    __slots__ = ("__processing_service",)

    def __init__(self, debug: bool | None = None) -> None:
        """
        Initialize an instance of `ObservableStream`.

        :param debug: Specifies the debug mode for the logger. If `None`, the mode is determined based on the
            execution environment.
        """
        # Initialize the base Observable class with the specified debug mode.
        super().__init__(debug=debug)

        # Create an AsyncIO processing service to manage data entries and notifications.
        # The enforce_submission_order option is enabled to ensure that entries are processed sequentially.
        self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService(enforce_submission_order=True)

    @override
    def __repr__(self) -> str:
        return formatted_repr(
            instance=self,
            info=(
                attributes_repr(
                    processing_service=self.__processing_service,
                )
                + f", {super().__repr__()}"
            ),
        )

    async def wait_for_tasks(self) -> None:
        """
        Wait for all background tasks in the current asyncio loop associated with the `ObservableStream` to complete.

        :return: None.
        """
        await self.__processing_service.wait_for_tasks()

    @final  # Prevent overriding in subclasses to maintain the integrity of the `_OutT` type.
    def next(self, value: _OutT) -> None:  # type: ignore[misc]
        """
        Push a value entry to the stream and notify all subscribers.

        :param value: The value entry to be pushed to the stream.
        :return: None.
        """
        with self._thread_lock:
            subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
        self.__processing_service.submit(self._emit_next, value, subscribers)

    def error(self, exception: Exception) -> None:
        """
        Push an error entry to the stream and notify all subscribers.

        :param exception: The error entry to be pushed to the stream.
        :return: None.
        """
        with self._thread_lock:
            subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
        self.__processing_service.submit(self._emit_error, exception, subscribers)

    def complete(self) -> None:
        """
        Push a completion entry to the stream and notify all subscribers.

        Once the notification is sent, all notified subscribers will be removed.

        :return: None.
        """
        with self._thread_lock:
            subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
            self._subscribers.clear()
        self.__processing_service.submit(self._emit_complete, subscribers)

Functions

get_valid_subscriber staticmethod

get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]

Validate and return the specified subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber to validate.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
Subscriber[_OutT]

The validated subscriber.

RAISES DESCRIPTION
PyventusException

If the subscriber is not an instance of Subscriber.

Source code in pyventus/reactive/observables/observable.py
@staticmethod
def get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]:
    """
    Validate and return the specified subscriber.

    :param subscriber: The subscriber to validate.
    :return: The validated subscriber.
    :raises PyventusException: If the subscriber is not an instance of `Subscriber`.
    """
    # Validate that the subscriber is an instance of Subscriber.
    if not isinstance(subscriber, Subscriber):
        raise PyventusException("The 'subscriber' argument must be an instance of Subscriber.")
    return subscriber

get_subscribers

get_subscribers() -> set[Subscriber[_OutT]]

Retrieve all registered subscribers.

RETURNS DESCRIPTION
set[Subscriber[_OutT]]

A set of all registered subscribers.

Source code in pyventus/reactive/observables/observable.py
def get_subscribers(self) -> set[Subscriber[_OutT]]:
    """
    Retrieve all registered subscribers.

    :return: A set of all registered subscribers.
    """
    with self.__thread_lock:
        return self.__subscribers.copy()

get_subscriber_count

get_subscriber_count() -> int

Retrieve the number of registered subscribers.

RETURNS DESCRIPTION
int

The total count of subscribers in the observable.

Source code in pyventus/reactive/observables/observable.py
def get_subscriber_count(self) -> int:
    """
    Retrieve the number of registered subscribers.

    :return: The total count of subscribers in the observable.
    """
    with self.__thread_lock:
        return len(self.__subscribers)

contains_subscriber

contains_subscriber(subscriber: Subscriber[_OutT]) -> bool

Determine if the specified subscriber is present in the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be checked.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber is found; False otherwise.

Source code in pyventus/reactive/observables/observable.py
def contains_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Determine if the specified subscriber is present in the observable.

    :param subscriber: The subscriber to be checked.
    :return: `True` if the subscriber is found; `False` otherwise.
    """
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)
    with self.__thread_lock:
        return valid_subscriber in self.__subscribers

subscribe

subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

Subscribe the specified callbacks to the current Observable.

This method can be utilized in three ways:

  • As a regular function: Automatically creates and subscribes an observer with the specified callbacks.

  • As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.

  • As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.

PARAMETER DESCRIPTION
next_callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_OutT] | None DEFAULT: None

error_callback

The callback to be executed when the observable encounters an error.

TYPE: ErrorCallbackType | None DEFAULT: None

complete_callback

The callback that will be executed when the observable has completed emitting values.

TYPE: CompleteCallbackType | None DEFAULT: None

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the observable and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

A Subscriber if callbacks are provided; otherwise, an ObservableSubCtx.

Source code in pyventus/reactive/observables/observable.py
def subscribe(
    self,
    next_callback: NextCallbackType[_OutT] | None = None,
    error_callback: ErrorCallbackType | None = None,
    complete_callback: CompleteCallbackType | None = None,
    *,
    force_async: bool = False,
    stateful_subctx: bool = False,
) -> Subscriber[_OutT] | "Observable.ObservableSubCtx[Self, _OutT]":
    """
    Subscribe the specified callbacks to the current `Observable`.

    This method can be utilized in three ways:

    -   **As a regular function:** Automatically creates and subscribes an observer
        with the specified callbacks.

    -   **As a decorator:** Creates and subscribes an observer, using the decorated
        callback as the next callback.

    -   **As a context manager:** Enables a step-by-step definition of the observer's
        callbacks prior to subscription, which occurs immediately after exiting the context.

    :param next_callback: The callback to be executed when the observable emits a new value.
    :param error_callback: The callback to be executed when the observable encounters an error.
    :param complete_callback: The callback that will be executed when the observable has completed emitting values.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state (`stateful`)
        or not (`stateless`) after exiting the subscription block. If `True`, the context retains its state,
        allowing access to stored objects, including the `observable` and the `subscriber` object. If `False`,
        the context is stateless, and the stored state is cleared upon exiting the subscription block to
        prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: A `Subscriber` if callbacks are provided; otherwise, an `ObservableSubCtx`.
    """
    if next_callback is None and error_callback is None and complete_callback is None:
        # If no callbacks are provided, create a subscription context for progressive definition.
        return Observable.ObservableSubCtx[Self, _OutT](
            observable=self,
            force_async=force_async,
            is_stateful=stateful_subctx,
        )
    else:
        # Create a subscriber with the provided callbacks.
        subscriber = Subscriber[_OutT](
            teardown_callback=self.remove_subscriber,
            next_callback=next_callback,
            error_callback=error_callback,
            complete_callback=complete_callback,
            force_async=force_async,
        )

        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Add the subscriber to the observable.
            self.__subscribers.add(subscriber)

        # Log the subscription if debug is enabled
        if self.__logger.debug_enabled:
            self.__logger.debug(action="Subscribed:", msg=f"{subscriber}")

        # Return the subscriber.
        return subscriber

remove_subscriber

remove_subscriber(subscriber: Subscriber[_OutT]) -> bool

Remove the specified subscriber from the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be removed from the observable.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if the subscriber was not found in the observable.

Source code in pyventus/reactive/observables/observable.py
def remove_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Remove the specified subscriber from the observable.

    :param subscriber: The subscriber to be removed from the observable.
    :return: `True` if the subscriber was successfully removed; `False` if
        the subscriber was not found in the observable.
    """
    # Get the valid subscriber instance.
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety.
    with self.__thread_lock:
        # Check if the subscriber is registered; return False if not.
        if valid_subscriber not in self.__subscribers:
            return False

        # Remove the subscriber from the observable.
        self.__subscribers.remove(valid_subscriber)

    # Log the removal if the debug mode is enabled
    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

    return True

remove_all

remove_all() -> bool

Remove all subscribers from the observable.

RETURNS DESCRIPTION
bool

True if the observable was successfully cleared; False if the observable was already empty.

Source code in pyventus/reactive/observables/observable.py
def remove_all(self) -> bool:
    """
    Remove all subscribers from the observable.

    :return: `True` if the observable was successfully cleared; `False`
        if the observable was already empty.
    """
    # Acquire lock to ensure thread safety
    with self.__thread_lock:
        # Check if the observable is already empty
        if not self.__subscribers:
            return False

        # Clear the observable
        self.__subscribers.clear()

    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg="All subscribers.")

    return True

__init__

__init__(debug: bool | None = None) -> None

Initialize an instance of ObservableStream.

PARAMETER DESCRIPTION
debug

Specifies the debug mode for the logger. If None, the mode is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/reactive/observables/observable_stream.py
def __init__(self, debug: bool | None = None) -> None:
    """
    Initialize an instance of `ObservableStream`.

    :param debug: Specifies the debug mode for the logger. If `None`, the mode is determined based on the
        execution environment.
    """
    # Initialize the base Observable class with the specified debug mode.
    super().__init__(debug=debug)

    # Create an AsyncIO processing service to manage data entries and notifications.
    # The enforce_submission_order option is enabled to ensure that entries are processed sequentially.
    self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService(enforce_submission_order=True)

wait_for_tasks async

wait_for_tasks() -> None

Wait for all background tasks in the current asyncio loop associated with the ObservableStream to complete.

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_stream.py
async def wait_for_tasks(self) -> None:
    """
    Wait for all background tasks in the current asyncio loop associated with the `ObservableStream` to complete.

    :return: None.
    """
    await self.__processing_service.wait_for_tasks()

next

next(value: _OutT) -> None

Push a value entry to the stream and notify all subscribers.

PARAMETER DESCRIPTION
value

The value entry to be pushed to the stream.

TYPE: _OutT

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_stream.py
@final  # Prevent overriding in subclasses to maintain the integrity of the `_OutT` type.
def next(self, value: _OutT) -> None:  # type: ignore[misc]
    """
    Push a value entry to the stream and notify all subscribers.

    :param value: The value entry to be pushed to the stream.
    :return: None.
    """
    with self._thread_lock:
        subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
    self.__processing_service.submit(self._emit_next, value, subscribers)

error

error(exception: Exception) -> None

Push an error entry to the stream and notify all subscribers.

PARAMETER DESCRIPTION
exception

The error entry to be pushed to the stream.

TYPE: Exception

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_stream.py
def error(self, exception: Exception) -> None:
    """
    Push an error entry to the stream and notify all subscribers.

    :param exception: The error entry to be pushed to the stream.
    :return: None.
    """
    with self._thread_lock:
        subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
    self.__processing_service.submit(self._emit_error, exception, subscribers)

complete

complete() -> None

Push a completion entry to the stream and notify all subscribers.

Once the notification is sent, all notified subscribers will be removed.

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_stream.py
def complete(self) -> None:
    """
    Push a completion entry to the stream and notify all subscribers.

    Once the notification is sent, all notified subscribers will be removed.

    :return: None.
    """
    with self._thread_lock:
        subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
        self._subscribers.clear()
    self.__processing_service.submit(self._emit_complete, subscribers)