ObservableTask class

Bases: Generic[_OutT], Observable[_OutT]

An observable subclass that encapsulates a unit of work and offers a mechanism for streaming its results reactively.

Notes:

  • The ObservableTask class facilitates deferred execution of tasks, allowing subscribers to receive results incrementally as they become available.

  • The encapsulated task can be a standard function, a method, or a generator function.

  • Results are streamed to subscribers in a lazy manner, meaning they are produced on demand rather than all at once.
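
The notes above can be illustrated with a small standard-library sketch. It does not use Pyventus: `MiniTask`, `Completed`, `countdown`, and `square` are hypothetical names defined only for this example. The branch structure loosely follows the `__execute` method shown in the source listing, where only the non-generator branch signals completion on its own, so the generator here raises the completion signal itself.

```python
import inspect
from collections.abc import Callable, Iterator
from typing import Any


class Completed(Exception):
    """Hypothetical completion signal, defined only for this sketch."""


class MiniTask:
    """Hypothetical stand-in for an observable task (not the Pyventus class)."""

    def __init__(self, callback: Callable[..., Any], args: tuple[Any, ...] = ()) -> None:
        self._callback = callback
        self._args = args
        self.events: list[Any] = []

    def __call__(self) -> None:
        # Nothing runs before this call: execution is deferred.
        try:
            if inspect.isgeneratorfunction(self._callback):
                # Generator callbacks are consumed one value at a time.
                for value in self._callback(*self._args):
                    self.events.append(("next", value))
            else:
                # Regular callbacks produce a single value, then complete.
                self.events.append(("next", self._callback(*self._args)))
                raise Completed
        except Completed:
            self.events.append(("complete",))
        except Exception as error:
            self.events.append(("error", repr(error)))


def countdown(start: int) -> Iterator[int]:
    while start > 0:
        yield start
        start -= 1
    raise Completed  # The generator signals completion itself.


def square(x: int) -> int:
    return x * x


stream_task = MiniTask(countdown, args=(3,))
single_task = MiniTask(square, args=(4,))
assert stream_task.events == []  # Deferred: nothing has been emitted yet.
stream_task()
single_task()
```

In this sketch the events are recorded on the task instead of being pushed to subscribers, which keeps the example short while still showing the one-value versus many-values distinction.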

Source code in pyventus/reactive/observables/observable_task.py
class ObservableTask(Generic[_OutT], Observable[_OutT]):
    """
    An observable subclass that encapsulates a unit of work and offers a mechanism for streaming its results reactively.

    **Notes:**

    -   The `ObservableTask` class facilitates deferred execution of tasks, allowing subscribers to receive results
        incrementally as they become available.

    -   The encapsulated task can be a standard function, a method, or a generator function.

    -   Results are streamed to subscribers in a lazy manner, meaning they are produced on demand rather than all
        at once.
    """

    # Attributes for the ObservableTask
    __slots__ = ("__callback", "__args", "__kwargs", "__processing_service")

    def __init__(
        self,
        callback: ObservableTaskCallbackType[_OutT],
        args: tuple[Any, ...] | None = None,
        kwargs: dict[str, Any] | None = None,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize an instance of `ObservableTask`.

        :param callback: The callback to be encapsulated and made observable.
        :param args: Positional arguments to be passed to the callback.
        :param kwargs: Keyword arguments to be passed to the callback.
        :param debug: Specifies the debug mode for the logger. If `None`,
            the mode is determined based on the execution environment.
        """
        # Initialize the base Observable class with the given debug value.
        super().__init__(debug=debug)

        # Validate the args argument.
        if args and not isinstance(args, tuple):
            raise PyventusException("The 'args' argument must be a tuple.")

        # Validate the kwargs argument.
        if kwargs and not isinstance(kwargs, dict):
            raise PyventusException("The 'kwargs' argument must be a dictionary.")

        # Wrap and set the callback along with its arguments.
        self.__callback = CallableWrapper[..., _OutT](callback, force_async=False)
        self.__args: tuple[Any, ...] = args if args else ()
        self.__kwargs: dict[str, Any] = kwargs if kwargs else {}

        # Set up an AsyncIO processing service for handling the callback execution.
        self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService()

    @override
    def __repr__(self) -> str:
        return formatted_repr(
            instance=self,
            info=(
                attributes_repr(
                    callback=self.__callback,
                    args=self.__args,
                    kwargs=self.__kwargs,
                    processing_service=self.__processing_service,
                )
                + f", {super().__repr__()}"
            ),
        )

    async def __execute(self) -> None:
        """
        Execute the main callback and emit results to subscribers.

        This method invokes the callback of the ObservableTask with
        the provided arguments and emits results to subscribers in
        a lazy-push manner.

        :return: None.
        """
        try:
            if self.__callback.is_generator:
                # Stream values from the generator callback and emit each value to subscribers.
                async for g_value in self.__callback.stream(*self.__args, **self.__kwargs):
                    await self._emit_next(value=g_value)
            else:
                # Execute the regular callback and emit the result to subscribers.
                r_value: _OutT = await self.__callback.execute(*self.__args, **self.__kwargs)
                await self._emit_next(value=r_value)

                # Indicate that the callback execution is complete.
                raise Completed from None
        except Observable.Completed:
            # Notify subscribers that the observable
            # has completed emitting values.
            await self._emit_complete()
        except Exception as exception:
            # Notify subscribers of any errors
            # encountered during execution.
            await self._emit_error(exception)

    async def wait_for_tasks(self) -> None:
        """
        Wait for all background tasks associated with the `ObservableTask` to complete.

        It ensures that any ongoing tasks are finished before proceeding.

        :return: None.
        """
        # Await the completion of all background tasks.
        await self.__processing_service.wait_for_tasks()

    @contextmanager
    def to_thread(
        self, executor: ThreadPoolExecutor | None = None, shutdown: bool = False
    ) -> Generator[Self, None, None]:
        """
        Configure the execution context block for processing the `ObservableTask` using a thread-based executor.

        This method allows the `ObservableTask` to be executed in a separate thread, utilizing the specified
        executor. Upon exiting the context, the observable task is executed within the provided executor.

        :param executor: An optional `ThreadPoolExecutor` instance for executing the `ObservableTask`.
            If `None`, a new `ThreadPoolExecutor` with default settings will be created and automatically
            shut down after execution.
        :param shutdown: A flag indicating whether to shut down the specified executor upon exiting the
            context. If the executor is `None`, the new executor will always be shut down when the
            context is exited.
        :return: The current ObservableTask instance.
        """
        # Yield the current ObservableTask
        # instance for use within the context.
        yield self

        if executor:
            # Execute the observable task using the provided executor.
            self(executor=executor)

            # Shut down the provided executor if the shutdown flag is set to True.
            if shutdown:
                executor.shutdown()
        else:
            # Create a new ThreadPoolExecutor, submit the observable task
            # to it, and shut the executor down after execution.
            new_executor = ThreadPoolExecutor()
            self(executor=new_executor)
            new_executor.shutdown()

    def __enter__(self: Self) -> Self:
        """
        Enter the execution context of the observable task.

        This method returns the observable task itself so that it can be
        configured inside the `with` block; the task is executed when
        the block exits.

        :return: The observable task instance.
        """
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """
        Exit the execution context of the observable task.

        This method triggers the execution of the observable task
        upon exiting the context block.

        :param exc_type: The type of the raised exception, if any.
        :param exc_val: The raised exception object, if any.
        :param exc_tb: The traceback information, if any.
        :return: None.
        """
        # Execute the observable task.
        self(executor=None)

    def __call__(self, executor: ThreadPoolExecutor | None = None) -> None:
        """
        Execute the current `ObservableTask`.

        **Notes:**

        -   When a thread-based executor is provided, the execution of
            the `ObservableTask` is submitted to that executor.

        -   If no executor is provided, the execution is submitted to
            the AsyncIO processing service of the current `ObservableTask`
            instance.

        -   The execution behavior within the AsyncIO processing service
            depends on whether an AsyncIO event loop is running. For more
            information, refer to the `AsyncIOProcessingService`.

        :param executor: An optional thread-based executor instance for
            processing the `ObservableTask`'s execution.
        :return: None.
        """
        if executor is None:
            # Submit the ObservableTask's execution to the AsyncIO processing service.
            self.__processing_service.submit(self.__execute)
        else:
            # Ensure the provided executor is a ThreadPoolExecutor instance.
            if not isinstance(executor, ThreadPoolExecutor):
                raise PyventusException("The 'executor' argument must be an instance of ThreadPoolExecutor.")

            # Submit the ObservableTask's execution to the specified thread-based executor.
            executor.submit(self.__processing_service.submit, self.__execute)

Functions

get_valid_subscriber staticmethod

get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]

Validate and return the specified subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber to validate.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
Subscriber[_OutT]

The validated subscriber.

RAISES DESCRIPTION
PyventusException

If the subscriber is not an instance of Subscriber.

Source code in pyventus/reactive/observables/observable.py
@staticmethod
def get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]:
    """
    Validate and return the specified subscriber.

    :param subscriber: The subscriber to validate.
    :return: The validated subscriber.
    :raises PyventusException: If the subscriber is not an instance of `Subscriber`.
    """
    # Validate that the subscriber is an instance of Subscriber.
    if not isinstance(subscriber, Subscriber):
        raise PyventusException("The 'subscriber' argument must be an instance of Subscriber.")
    return subscriber

get_subscribers

get_subscribers() -> set[Subscriber[_OutT]]

Retrieve all registered subscribers.

RETURNS DESCRIPTION
set[Subscriber[_OutT]]

A set of all registered subscribers.

Source code in pyventus/reactive/observables/observable.py
def get_subscribers(self) -> set[Subscriber[_OutT]]:
    """
    Retrieve all registered subscribers.

    :return: A set of all registered subscribers.
    """
    with self.__thread_lock:
        return self.__subscribers.copy()
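
The source above returns a copy of the subscriber set while holding the lock. The following sketch shows why that pattern is useful, using a hypothetical `Registry` class that is not part of Pyventus: callers receive a snapshot they can iterate or modify without affecting the internal set.

```python
import threading


class Registry:
    """Hypothetical registry; only the locking pattern mirrors the source above."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: set[str] = set()

    def add(self, item: str) -> None:
        with self._lock:
            self._items.add(item)

    def snapshot(self) -> set[str]:
        # Copy while holding the lock so callers never receive a set
        # that another thread is mutating.
        with self._lock:
            return self._items.copy()


registry = Registry()
registry.add("a")

snapshot = registry.snapshot()
snapshot.add("b")  # Mutating the copy leaves the registry untouched.
remaining = registry.snapshot()
```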

get_subscriber_count

get_subscriber_count() -> int

Retrieve the number of registered subscribers.

RETURNS DESCRIPTION
int

The total count of subscribers in the observable.

Source code in pyventus/reactive/observables/observable.py
def get_subscriber_count(self) -> int:
    """
    Retrieve the number of registered subscribers.

    :return: The total count of subscribers in the observable.
    """
    with self.__thread_lock:
        return len(self.__subscribers)

contains_subscriber

contains_subscriber(subscriber: Subscriber[_OutT]) -> bool

Determine if the specified subscriber is present in the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be checked.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber is found; False otherwise.

Source code in pyventus/reactive/observables/observable.py
def contains_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Determine if the specified subscriber is present in the observable.

    :param subscriber: The subscriber to be checked.
    :return: `True` if the subscriber is found; `False` otherwise.
    """
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)
    with self.__thread_lock:
        return valid_subscriber in self.__subscribers

subscribe

subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

Subscribe the specified callbacks to the current Observable.

This method can be utilized in three ways:

  • As a regular function: Automatically creates and subscribes an observer with the specified callbacks.

  • As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.

  • As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.

PARAMETER DESCRIPTION
next_callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_OutT] | None DEFAULT: None

error_callback

The callback to be executed when the observable encounters an error.

TYPE: ErrorCallbackType | None DEFAULT: None

complete_callback

The callback that will be executed when the observable has completed emitting values.

TYPE: CompleteCallbackType | None DEFAULT: None

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the observable and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

A Subscriber if callbacks are provided; otherwise, an ObservableSubCtx.

Source code in pyventus/reactive/observables/observable.py
def subscribe(
    self,
    next_callback: NextCallbackType[_OutT] | None = None,
    error_callback: ErrorCallbackType | None = None,
    complete_callback: CompleteCallbackType | None = None,
    *,
    force_async: bool = False,
    stateful_subctx: bool = False,
) -> Subscriber[_OutT] | "Observable.ObservableSubCtx[Self, _OutT]":
    """
    Subscribe the specified callbacks to the current `Observable`.

    This method can be utilized in three ways:

    -   **As a regular function:** Automatically creates and subscribes an observer
        with the specified callbacks.

    -   **As a decorator:** Creates and subscribes an observer, using the decorated
        callback as the next callback.

    -   **As a context manager:** Enables a step-by-step definition of the observer's
        callbacks prior to subscription, which occurs immediately after exiting the context.

    :param next_callback: The callback to be executed when the observable emits a new value.
    :param error_callback: The callback to be executed when the observable encounters an error.
    :param complete_callback: The callback that will be executed when the observable has completed emitting values.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state (`stateful`)
        or not (`stateless`) after exiting the subscription block. If `True`, the context retains its state,
        allowing access to stored objects, including the `observable` and the `subscriber` object. If `False`,
        the context is stateless, and the stored state is cleared upon exiting the subscription block to
        prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: A `Subscriber` if callbacks are provided; otherwise, an `ObservableSubCtx`.
    """
    if next_callback is None and error_callback is None and complete_callback is None:
        # If no callbacks are provided, create a subscription context for progressive definition.
        return Observable.ObservableSubCtx[Self, _OutT](
            observable=self,
            force_async=force_async,
            is_stateful=stateful_subctx,
        )
    else:
        # Create a subscriber with the provided callbacks.
        subscriber = Subscriber[_OutT](
            teardown_callback=self.remove_subscriber,
            next_callback=next_callback,
            error_callback=error_callback,
            complete_callback=complete_callback,
            force_async=force_async,
        )

        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Add the subscriber to the observable.
            self.__subscribers.add(subscriber)

        # Log the subscription if debug is enabled
        if self.__logger.debug_enabled:
            self.__logger.debug(action="Subscribed:", msg=f"{subscriber}")

        # Return the subscriber.
        return subscriber
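
The three usage styles described for `subscribe` come down to dispatching on whether any callback was passed. The sketch below is a simplified, standard-library illustration of that dispatch. `MiniObservable`, `SubCtx`, and their methods are hypothetical stand-ins, and the decorator form shown here is an assumption about how a returned context object could be made callable, not a statement about the library's exact behavior.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any

Handler = Callable[[Any], None]


class MiniObservable:
    """Hypothetical observable; only the dispatch on arguments mirrors `subscribe`."""

    def __init__(self) -> None:
        self.handlers: list[Handler] = []

    def subscribe(self, next_callback: Handler | None = None) -> Handler | SubCtx:
        if next_callback is None:
            # No callback given: return a context for step-by-step definition.
            return SubCtx(self)
        self.handlers.append(next_callback)
        return next_callback


class SubCtx:
    """Hypothetical subscription context usable as a decorator or in a `with` block."""

    def __init__(self, observable: MiniObservable) -> None:
        self._observable = observable
        self._next: Handler | None = None

    def __call__(self, callback: Handler) -> Handler:
        # Decorator mode: subscribe the decorated callback right away.
        self._observable.subscribe(callback)
        return callback

    def on_next(self, callback: Handler) -> Handler:
        # Context mode: remember the callback until the block exits.
        self._next = callback
        return callback

    def __enter__(self) -> SubCtx:
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Subscription happens only when the block exits.
        if self._next is not None:
            self._observable.subscribe(self._next)


obs = MiniObservable()
seen: list[str] = []

# 1. Regular function call.
obs.subscribe(lambda value: seen.append(f"fn:{value}"))


# 2. Decorator: the decorated function becomes the next callback.
@obs.subscribe()
def from_decorator(value: Any) -> None:
    seen.append(f"dec:{value}")


# 3. Context manager: callbacks are declared inside the block.
with obs.subscribe() as ctx:

    @ctx.on_next
    def from_context(value: Any) -> None:
        seen.append(f"ctx:{value}")


for handler in obs.handlers:
    handler(1)
```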

remove_subscriber

remove_subscriber(subscriber: Subscriber[_OutT]) -> bool

Remove the specified subscriber from the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be removed from the observable.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if the subscriber was not found in the observable.

Source code in pyventus/reactive/observables/observable.py
def remove_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Remove the specified subscriber from the observable.

    :param subscriber: The subscriber to be removed from the observable.
    :return: `True` if the subscriber was successfully removed; `False` if
        the subscriber was not found in the observable.
    """
    # Get the valid subscriber instance.
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety.
    with self.__thread_lock:
        # Check if the subscriber is registered; return False if not.
        if valid_subscriber not in self.__subscribers:
            return False

        # Remove the subscriber from the observable.
        self.__subscribers.remove(valid_subscriber)

    # Log the removal if the debug mode is enabled
    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

    return True

remove_all

remove_all() -> bool

Remove all subscribers from the observable.

RETURNS DESCRIPTION
bool

True if the observable was successfully cleared; False if the observable was already empty.

Source code in pyventus/reactive/observables/observable.py
def remove_all(self) -> bool:
    """
    Remove all subscribers from the observable.

    :return: `True` if the observable was successfully cleared; `False`
        if the observable was already empty.
    """
    # Acquire lock to ensure thread safety
    with self.__thread_lock:
        # Check if the observable is already empty
        if not self.__subscribers:
            return False

        # Clear the observable
        self.__subscribers.clear()

    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg="All subscribers.")

    return True

__init__

__init__(callback: ObservableTaskCallbackType[_OutT], args: tuple[Any, ...] | None = None, kwargs: dict[str, Any] | None = None, debug: bool | None = None) -> None

Initialize an instance of ObservableTask.

PARAMETER DESCRIPTION
callback

The callback to be encapsulated and made observable.

TYPE: ObservableTaskCallbackType[_OutT]

args

Positional arguments to be passed to the callback.

TYPE: tuple[Any, ...] | None DEFAULT: None

kwargs

Keyword arguments to be passed to the callback.

TYPE: dict[str, Any] | None DEFAULT: None

debug

Specifies the debug mode for the logger. If None, the mode is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/reactive/observables/observable_task.py
def __init__(
    self,
    callback: ObservableTaskCallbackType[_OutT],
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    debug: bool | None = None,
) -> None:
    """
    Initialize an instance of `ObservableTask`.

    :param callback: The callback to be encapsulated and made observable.
    :param args: Positional arguments to be passed to the callback.
    :param kwargs: Keyword arguments to be passed to the callback.
    :param debug: Specifies the debug mode for the logger. If `None`,
        the mode is determined based on the execution environment.
    """
    # Initialize the base Observable class with the given debug value.
    super().__init__(debug=debug)

    # Validate the args argument.
    if args and not isinstance(args, tuple):
        raise PyventusException("The 'args' argument must be a tuple.")

    # Validate the kwargs argument.
    if kwargs and not isinstance(kwargs, dict):
        raise PyventusException("The 'kwargs' argument must be a dictionary.")

    # Wrap and set the callback along with its arguments.
    self.__callback = CallableWrapper[..., _OutT](callback, force_async=False)
    self.__args: tuple[Any, ...] = args if args else ()
    self.__kwargs: dict[str, Any] = kwargs if kwargs else {}

    # Set up an AsyncIO processing service for handling the callback execution.
    self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService()

wait_for_tasks async

wait_for_tasks() -> None

Wait for all background tasks associated with the ObservableTask to complete.

It ensures that any ongoing tasks are finished before proceeding.

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_task.py
async def wait_for_tasks(self) -> None:
    """
    Wait for all background tasks associated with the `ObservableTask` to complete.

    It ensures that any ongoing tasks are finished before proceeding.

    :return: None.
    """
    # Await the completion of all background tasks.
    await self.__processing_service.wait_for_tasks()
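
How the processing service tracks its background tasks is not shown on this page. As a rough sketch of the general idea, and assuming plain `asyncio` tasks, a helper can keep references to scheduled tasks and await them on request. `BackgroundTasks` and `work` are hypothetical names used only for illustration.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any


class BackgroundTasks:
    """Hypothetical helper that tracks asyncio tasks until they finish."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def submit(self, coro_func: Callable[[], Coroutine[Any, Any, None]]) -> None:
        task = asyncio.get_running_loop().create_task(coro_func())
        self._tasks.add(task)
        # Drop the reference once the task is done.
        task.add_done_callback(self._tasks.discard)

    async def wait_for_tasks(self) -> None:
        # Loop in case running tasks schedule further tasks.
        while self._tasks:
            await asyncio.gather(*self._tasks)


results: list[str] = []


async def work() -> None:
    await asyncio.sleep(0.01)
    results.append("finished")


async def main() -> None:
    service = BackgroundTasks()
    service.submit(work)
    assert results == []  # Scheduled, but not run yet.
    await service.wait_for_tasks()


asyncio.run(main())
```

Without the final `await`, `main` could return before `work` has appended its result, which is the situation `wait_for_tasks` is meant to avoid.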

to_thread

to_thread(executor: ThreadPoolExecutor | None = None, shutdown: bool = False) -> Generator[Self, None, None]

Configure the execution context block for processing the ObservableTask using a thread-based executor.

This method allows the ObservableTask to be executed in a separate thread, utilizing the specified executor. Upon exiting the context, the observable task is executed within the provided executor.

PARAMETER DESCRIPTION
executor

An optional ThreadPoolExecutor instance for executing the ObservableTask. If None, a new ThreadPoolExecutor with default settings will be created and automatically shut down after execution.

TYPE: ThreadPoolExecutor | None DEFAULT: None

shutdown

A flag indicating whether to shut down the specified executor upon exiting the context. If the executor is None, the new executor will always be shut down when the context is exited.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Generator[Self, None, None]

The current ObservableTask instance.

Source code in pyventus/reactive/observables/observable_task.py
@contextmanager
def to_thread(
    self, executor: ThreadPoolExecutor | None = None, shutdown: bool = False
) -> Generator[Self, None, None]:
    """
    Configure the execution context block for processing the `ObservableTask` using a thread-based executor.

    This method allows the `ObservableTask` to be executed in a separate thread, utilizing the specified
    executor. Upon exiting the context, the observable task is executed within the provided executor.

    :param executor: An optional `ThreadPoolExecutor` instance for executing the `ObservableTask`.
        If `None`, a new `ThreadPoolExecutor` with default settings will be created and automatically
        shut down after execution.
    :param shutdown: A flag indicating whether to shut down the specified executor upon exiting the
        context. If the executor is `None`, the new executor will always be shut down when the
        context is exited.
    :return: The current ObservableTask instance.
    """
    # Yield the current ObservableTask
    # instance for use within the context.
    yield self

    if executor:
        # Execute the observable task using the provided executor.
        self(executor=executor)

        # Shut down the provided executor if the shutdown flag is set to True.
        if shutdown:
            executor.shutdown()
    else:
        # Create a new ThreadPoolExecutor, submit the observable task
        # to it, and shut the executor down after execution.
        new_executor = ThreadPoolExecutor()
        self(executor=new_executor)
        new_executor.shutdown()
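
The control flow of `to_thread` (yield first, submit to an executor after the block) can be reproduced with the standard library alone. In this sketch, `run_in_thread_on_exit` and `work` are hypothetical names, and the work is a plain function instead of an observable task.

```python
from __future__ import annotations

import threading
from collections.abc import Callable, Generator
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


@contextmanager
def run_in_thread_on_exit(
    work: Callable[[], None], executor: ThreadPoolExecutor | None = None, shutdown: bool = False
) -> Generator[None, None, None]:
    """Hypothetical helper: run `work` on an executor once the block exits."""
    yield  # The caller sets things up inside the `with` block.

    if executor is not None:
        executor.submit(work)
        if shutdown:
            executor.shutdown()  # Waits for submitted work by default.
    else:
        # No executor given: create one and always shut it down.
        with ThreadPoolExecutor() as own_executor:
            own_executor.submit(work)


log: list[str] = []
main_thread = threading.current_thread().name


def work() -> None:
    log.append(threading.current_thread().name)


with run_in_thread_on_exit(work):
    log.append("configured")
```

Because `ThreadPoolExecutor.shutdown()` waits for pending work by default, the worker's entry is already in `log` when the `with` statement finishes in the branch that creates its own executor.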

__enter__

__enter__() -> Self

Enter the execution context of the observable task.

This method returns the observable task itself so that it can be configured inside the with block; the task is executed when the block exits.

RETURNS DESCRIPTION
Self

The observable task instance.

Source code in pyventus/reactive/observables/observable_task.py
def __enter__(self: Self) -> Self:
    """
    Enter the execution context of the observable task.

    This method returns the observable task itself so that it can be
    configured inside the `with` block; the task is executed when
    the block exits.

    :return: The observable task instance.
    """
    return self

__exit__

__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Exit the execution context of the observable task.

This method triggers the execution of the observable task upon exiting the context block.

PARAMETER DESCRIPTION
exc_type

The type of the raised exception, if any.

TYPE: type[BaseException] | None

exc_val

The raised exception object, if any.

TYPE: BaseException | None

exc_tb

The traceback information, if any.

TYPE: TracebackType | None

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_task.py
def __exit__(
    self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
    """
    Exit the execution context of the observable task.

    This method triggers the execution of the observable task
    upon exiting the context block.

    :param exc_type: The type of the raised exception, if any.
    :param exc_val: The raised exception object, if any.
    :param exc_tb: The traceback information, if any.
    :return: None.
    """
    # Execute the observable task.
    self(executor=None)
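
The `__enter__` and `__exit__` pair implements a "run on exit" pattern: the object is handed to the block for setup and is called when the block ends. A minimal sketch with a hypothetical `RunOnExit` class follows. As in the source above, `__exit__` does not inspect its arguments, so the call happens whether or not the block raised.

```python
class RunOnExit:
    """Hypothetical class: calling the instance is deferred to the end of a `with` block."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def __call__(self) -> None:
        self.events.append("executed")

    def __enter__(self) -> "RunOnExit":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returning None means exceptions from the block are not suppressed.
        self()


with RunOnExit() as task:
    # Setup (for example, attaching callbacks) happens before execution.
    task.events.append("subscribed")
```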

__call__

__call__(executor: ThreadPoolExecutor | None = None) -> None

Execute the current ObservableTask.

Notes:

  • When a thread-based executor is provided, the execution of the ObservableTask is submitted to that executor.

  • If no executor is provided, the execution is submitted to the AsyncIO processing service of the current ObservableTask instance.

  • The execution behavior within the AsyncIO processing service depends on whether an AsyncIO event loop is running. For more information, refer to the AsyncIOProcessingService.

PARAMETER DESCRIPTION
executor

An optional thread-based executor instance for processing the ObservableTask's execution.

TYPE: ThreadPoolExecutor | None DEFAULT: None

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_task.py
def __call__(self, executor: ThreadPoolExecutor | None = None) -> None:
    """
    Execute the current `ObservableTask`.

    **Notes:**

    -   When a thread-based executor is provided, the execution of
        the `ObservableTask` is submitted to that executor.

    -   If no executor is provided, the execution is submitted to
        the AsyncIO processing service of the current `ObservableTask`
        instance.

    -   The execution behavior within the AsyncIO processing service
        depends on whether an AsyncIO event loop is running. For more
        information, refer to the `AsyncIOProcessingService`.

    :param executor: An optional thread-based executor instance for
        processing the `ObservableTask`'s execution.
    :return: None.
    """
    if executor is None:
        # Submit the ObservableTask's execution to the AsyncIO processing service.
        self.__processing_service.submit(self.__execute)
    else:
        # Ensure the provided executor is a ThreadPoolExecutor instance.
        if not isinstance(executor, ThreadPoolExecutor):
            raise PyventusException("The 'executor' argument must be an instance of ThreadPoolExecutor.")

        # Submit the ObservableTask's execution to the specified thread-based executor.
        executor.submit(self.__processing_service.submit, self.__execute)
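
The notes for `__call__` say that the behavior depends on whether an AsyncIO event loop is running, but the `AsyncIOProcessingService` implementation is not shown here. The sketch below is therefore an assumption about one common way to make that decision with the standard library. `submit`, `job`, and `_background` are hypothetical names.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any

# Keeps references to scheduled tasks so they are not garbage-collected early.
_background: set[asyncio.Task] = set()


def submit(coro_func: Callable[[], Coroutine[Any, Any, None]]) -> str:
    """Hypothetical dispatcher: schedule on the running loop, or start a new one."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: block until the coroutine finishes.
        asyncio.run(coro_func())
        return "ran-to-completion"

    # A loop is running: schedule the coroutine as a background task.
    task = loop.create_task(coro_func())
    _background.add(task)
    task.add_done_callback(_background.discard)
    return "scheduled"


calls: list[str] = []


async def job() -> None:
    calls.append("job")


mode_sync = submit(job)  # Called without a running loop.


async def main() -> str:
    mode = submit(job)  # Called from inside a running loop.
    await asyncio.gather(*_background)
    return mode


mode_async = asyncio.run(main())
```

Under this assumption, submitting from a worker thread of a `ThreadPoolExecutor` would take the first branch, because a fresh worker thread has no running event loop.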