Skip to content

ObservableValue class

Bases: Generic[_OutT], Observable[_OutT]

An observable subclass that encapsulates a value and offers a mechanism for streaming its updates reactively.

Notes:

  • The ObservableValue class is a value-centric observable that focuses solely on a single value and its changes over time. It notifies subscribers of the next value when valid, of an error when the value is deemed invalid by a validator, and of completion when the value is cleared and reset.

  • Validators are responsible for validating incoming values. When a value is deemed invalid, the validator must raise an exception so that an error notification can be triggered. However, despite the value being invalid, it is stored alongside the raised exception, ensuring that both remain accessible until a new change is made.

  • Update and retrieval operations are managed through a queue, ensuring that their order is preserved and no inconsistent states or notifications are generated during execution.

  • Changes to the value are delivered to subscribers in a lazy manner, allowing them to receive incremental notifications as they occur.

Source code in pyventus/reactive/observables/observable_value.py
class ObservableValue(Generic[_OutT], Observable[_OutT]):
    """
    An observable subclass that encapsulates a value and offers a mechanism for streaming its updates reactively.

    **Notes:**

    -   The `ObservableValue` class is a value-centric observable that focuses solely on a single value and its
        changes over time. It notifies subscribers of the next value when valid, of an error when the value is
        deemed invalid by a validator, and of completion when the value is cleared and reset.

    -   Validators are responsible for validating incoming values. When a value is deemed invalid, the validator
        must raise an exception so that an error notification can be triggered. However, despite the value being
        invalid, it is stored alongside the raised exception, ensuring that both remain accessible until a new
        change is made.

    -   Update and retrieval operations are managed through a queue, ensuring that their order is preserved and
        no inconsistent states or notifications are generated during execution.

    -   Changes to the value are delivered to subscribers in a lazy manner, allowing them to receive incremental
        notifications as they occur.
    """

    # Attributes for the ObservableValue
    __slots__ = ("__seed", "__value", "__exception", "__validators", "__processing_service")

    def __init__(
        self,
        seed: _OutT,
        validators: list[ObservableValueValidatorType[_OutT]] | None = None,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize an instance of `ObservableValue`.

        :param seed: The initial value for the observable. This value is used during initialization and reset
            operations to restore the observable to its original state. No validation is applied to this value.
        :param validators: A list of validators that check incoming values. When a value is deemed invalid,
            the validator must raise an exception to trigger an error notification. Validators can be either
            synchronous or asynchronous callables.
        :param debug: Specifies the debug mode for the logger. If `None`, the mode is determined based
            on the execution environment.
        """
        # Initialize the base Observable class with the specified debug mode.
        super().__init__(debug=debug)

        # Check if validators are provided and ensure they are of type list.
        if validators is not None and not isinstance(validators, list):
            raise PyventusException("The 'validators' argument must be a list.")

        # Set the initial seed value and initialize the current value and exception.
        self.__seed: _OutT = seed
        self.__value: _OutT = self.__seed
        self.__exception: Exception | None = None

        # Wrap validators in a unified interface if they are provided.
        self.__validators: list[CallableWrapper[[_OutT], None]] | None = (
            [CallableWrapper(validator, force_async=False) for validator in validators] if validators else None
        )

        # Create an AsyncIO processing service to manage value updates and notifications.
        # The enforce_submission_order option is enabled to ensure that changes are processed sequentially.
        self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService(enforce_submission_order=True)

    @override
    def __repr__(self) -> str:
        return formatted_repr(
            instance=self,
            info=(
                attributes_repr(
                    seed=self.__seed,
                    value=self.__value,
                    exception=self.__exception,
                    validators=self.__validators,
                    processing_service=self.__processing_service,
                )
                + f", {super().__repr__()}"
            ),
        )

    async def _get_error(self, callback: CallableWrapper[[Exception | None], None]) -> None:
        """
        Retrieve the current error, if any, through the specified callback.

        :param callback: The callback that receives the current error, if any.
        :return: None.
        """
        with self._thread_lock:
            exception: Exception | None = self.__exception
        await callback.execute(exception)

    async def _get_value(self, callback: CallableWrapper[[_OutT], None]) -> None:
        """
        Retrieve the current value through the specified callback.

        :param callback: The callback that receives the current value.
        :return: None.
        """
        with self._thread_lock:
            value: _OutT = self.__value
        await callback.execute(value)

    @final  # Prevent overriding in subclasses to maintain the integrity of the _OutT type.
    async def _set_value(self, value: _OutT, subscribers: tuple[Subscriber[_OutT], ...]) -> None:  # type: ignore[misc]
        """
        Update the current value to the specified one.

        The provided value is validated against the defined validators. If it is deemed valid, it is stored,
        and the next notification is triggered. If the value is considered invalid by any of the validators,
        it is stored alongside the raised exception, and an error notification is issued.

        :param value: The value to set as the current value.
        :param subscribers: The collection of subscribers to be notified of the value update.
        :return: None.
        """
        try:
            # Validate the value using defined validators, if any.
            if self.__validators:
                for validator in self.__validators:
                    await validator.execute(value)

            # Acquire lock to ensure thread safety.
            with self._thread_lock:
                # Log value changes if debug mode is enabled.
                if self._logger.debug_enabled:
                    self._logger.debug(
                        action="Updating Value:",
                        msg=f"{self.__value!r} %(levelcolor)s➝ %(defaultcolor)s {value!r}.",
                    )
                    self._logger.debug(
                        action="Updating Error:",
                        msg=f"{self.__exception!r} %(levelcolor)s➝ %(defaultcolor)s None.",
                    )

                # Update the current value and reset the exception.
                self.__value = value
                self.__exception = None

            # Notify subscribers of the new valid value.
            await self._emit_next(value, subscribers)
        except Exception as exception:
            # Acquire lock to ensure thread safety.
            with self._thread_lock:
                # Log value changes if debug mode is enabled.
                if self._logger.debug_enabled:
                    self._logger.debug(
                        action="Updating Value:",
                        msg=f"{self.__value!r} %(levelcolor)s➝ %(defaultcolor)s {value!r}.",
                    )
                    self._logger.debug(
                        action="Updating Error:",
                        msg=f"{self.__exception!r} %(levelcolor)s➝ %(defaultcolor)s {exception!r}.",
                    )

                # Store the current value and the raised exception.
                self.__value = value
                self.__exception = exception

            # Notify subscribers of the error encountered.
            await self._emit_error(exception, subscribers)

    async def _clear_value(self, subscribers: tuple[Subscriber[_OutT], ...]) -> None:
        """
        Clear the current value and reset it to its initial state.

        This operation will trigger the completion notification and remove the specified subscribers.

        :param subscribers: The collection of subscribers to be notified of the value reset.
        :return: None.
        """
        # Acquire lock to ensure thread safety.
        with self._thread_lock:
            # Log value changes if debug mode is enabled.
            if self._logger.debug_enabled:
                self._logger.debug(
                    action="Clearing Value:",
                    msg=f"{self.__value!r} %(levelcolor)s➝ %(defaultcolor)s {self.__seed!r}.",
                )
                self._logger.debug(
                    action="Clearing Error:",
                    msg=f"{self.__exception!r} %(levelcolor)s➝ %(defaultcolor)s None.",
                )

            # Reset the value and exception to their initial state.
            self.__value = self.__seed
            self.__exception = None

        # Notify subscribers that the value has been cleared and reset.
        await self._emit_complete(subscribers)

    async def _prime_subscribers(self, subscribers: set[Subscriber[_OutT]]) -> None:
        """
        Prime the specified subscribers with the current value or error.

        :param subscribers: The set of subscribers to be primed.
        :return: None.
        """
        # Acquire a lock to ensure thread safety.
        with self._thread_lock:
            value: _OutT = self.__value
            exception: Exception | None = self.__exception

        # Log the number of subscribers to prime if debug mode is enabled.
        if self._logger.debug_enabled:
            self._logger.debug(action="Priming Subscribers:", msg=f"{len(subscribers)} total.")

        # Notify the subscribers accordingly.
        if exception is None:
            await self._emit_next(value, subscribers)
        else:
            await self._emit_error(exception, subscribers)

    async def wait_for_tasks(self) -> None:
        """
        Wait for all background tasks in the current asyncio loop associated with the `ObservableValue` to complete.

        :return: None.
        """
        await self.__processing_service.wait_for_tasks()

    def get_error(
        self, callback: Callable[[Exception | None], None | Awaitable[None]], force_async: bool = False
    ) -> None:
        """
        Retrieve the current error, if any.

        The current error will be received through the parameters of the provided callback, and its execution will be
        enqueued with the other operations to ensure that the received error corresponds to the correct state at the
        time the current method was invoked, preventing any inconsistencies.

        :param callback: The callback to be invoked with the current error, if any.
        :param force_async: Determines whether to force the callback to run asynchronously. If `True`, the synchronous
            callback will be converted to run asynchronously in a thread pool using the `asyncio.to_thread` function.
            If `False`, the callback will run synchronously or asynchronously as defined.
        :return: None.
        :raises PyventusException: If the provided callback is a generator.
        """
        # Wrap the callback to ensure a consistent interface for invocation.
        valid_callback: CallableWrapper[[Exception | None], None] = CallableWrapper(callback, force_async=force_async)

        # Verify that the provided callback is not a generator.
        if valid_callback.is_generator:
            raise PyventusException("The 'callback' argument cannot be a generator.")

        # Schedule the processing of the error retrieval with the provided callback, ensuring that the received error
        # corresponds to the state at the time the `get_error` method was invoked, preventing any inconsistencies.
        self.__processing_service.submit(self._get_error, valid_callback)

    def get_value(self, callback: Callable[[_OutT], None | Awaitable[None]], force_async: bool = False) -> None:
        """
        Retrieve the current value.

        The current value will be received through the parameters of the provided callback, and its execution will be
        enqueued with the other operations to ensure that the received value corresponds to the correct state at the
        time the current method was invoked, preventing any inconsistencies.

        :param callback: The callback to be invoked with the current value.
        :param force_async: Determines whether to force the callback to run asynchronously. If `True`, the synchronous
            callback will be converted to run asynchronously in a thread pool using the `asyncio.to_thread` function.
            If `False`, the callback will run synchronously or asynchronously as defined.
        :return: None.
        :raises PyventusException: If the provided callback is a generator.
        """
        # Wrap the callback to ensure a consistent interface for invocation.
        valid_callback: CallableWrapper[[_OutT], None] = CallableWrapper(callback, force_async=force_async)

        # Verify that the provided callback is not a generator.
        if valid_callback.is_generator:
            raise PyventusException("The 'callback' argument cannot be a generator.")

        # Schedule the processing of the value retrieval with the provided callback, ensuring that the received value
        # corresponds to the state at the time the `get_value` method was invoked, preventing any inconsistencies.
        self.__processing_service.submit(self._get_value, valid_callback)

    @final  # Prevent overriding in subclasses to maintain the integrity of the _OutT type.
    def set_value(self, value: _OutT) -> None:  # type: ignore[misc]
        """
        Update the current value to the specified one.

        The provided value is validated against the defined validators. If it is deemed valid, it is stored,
        and the next notification is triggered. If the value is considered invalid by any of the validators,
        it is stored alongside the raised exception, and an error notification is issued.

        :param value: The value to set as the current value.
        :return: None.
        """
        with self._thread_lock:
            subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
        self.__processing_service.submit(self._set_value, value, subscribers)

    def clear_value(self) -> None:
        """
        Clear the current value and reset it to its initial state.

        This operation will trigger the completion notification and the removal of all current subscribers.

        :return: None.
        """
        with self._thread_lock:
            subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
            self._subscribers.clear()
        self.__processing_service.submit(self._clear_value, subscribers)

    @overload
    def prime_subscribers(self, subscriber: Subscriber[_OutT], /) -> Subscriber[_OutT]: ...

    @overload
    def prime_subscribers(self, *subscribers: Subscriber[_OutT]) -> tuple[Subscriber[_OutT], ...]: ...

    def prime_subscribers(self, *subscribers: Subscriber[_OutT]) -> Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]:
        """
        Prime the specified subscribers with the current value or error.

        :param subscribers: One or more subscribers to be primed.
        :return: The same subscribers as input.
        :raises PyventusException: If no subscribers are given.
        """
        if not subscribers:
            raise PyventusException("At least one subscriber must be provided.")

        if len(subscribers) == 1:
            self.__processing_service.submit(
                self._prime_subscribers,
                {self.get_valid_subscriber(subscribers[0])},
            )
            return subscribers[0]
        else:
            self.__processing_service.submit(
                self._prime_subscribers,
                {self.get_valid_subscriber(subscriber) for subscriber in subscribers},
            )
            return subscribers

Functions

get_valid_subscriber staticmethod

get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]

Validate and return the specified subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber to validate.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
Subscriber[_OutT]

The validated subscriber.

RAISES DESCRIPTION
PyventusException

If the subscriber is not an instance of Subscriber.

Source code in pyventus/reactive/observables/observable.py
@staticmethod
def get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]:
    """
    Validate and return the specified subscriber.

    :param subscriber: The subscriber to validate.
    :return: The validated subscriber.
    :raises PyventusException: If the subscriber is not an instance of `Subscriber`.
    """
    # Validate that the subscriber is an instance of Subscriber.
    if not isinstance(subscriber, Subscriber):
        raise PyventusException("The 'subscriber' argument must be an instance of Subscriber.")
    return subscriber

get_subscribers

get_subscribers() -> set[Subscriber[_OutT]]

Retrieve all registered subscribers.

RETURNS DESCRIPTION
set[Subscriber[_OutT]]

A set of all registered subscribers.

Source code in pyventus/reactive/observables/observable.py
def get_subscribers(self) -> set[Subscriber[_OutT]]:
    """
    Retrieve all registered subscribers.

    :return: A set of all registered subscribers.
    """
    with self.__thread_lock:
        return self.__subscribers.copy()

get_subscriber_count

get_subscriber_count() -> int

Retrieve the number of registered subscribers.

RETURNS DESCRIPTION
int

The total count of subscribers in the observable.

Source code in pyventus/reactive/observables/observable.py
def get_subscriber_count(self) -> int:
    """
    Retrieve the number of registered subscribers.

    :return: The total count of subscribers in the observable.
    """
    with self.__thread_lock:
        return len(self.__subscribers)

contains_subscriber

contains_subscriber(subscriber: Subscriber[_OutT]) -> bool

Determine if the specified subscriber is present in the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be checked.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber is found; False otherwise.

Source code in pyventus/reactive/observables/observable.py
def contains_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Determine if the specified subscriber is present in the observable.

    :param subscriber: The subscriber to be checked.
    :return: `True` if the subscriber is found; `False` otherwise.
    """
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)
    with self.__thread_lock:
        return valid_subscriber in self.__subscribers

subscribe

subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

Subscribe the specified callbacks to the current Observable.

This method can be utilized in three ways:

  • As a regular function: Automatically creates and subscribes an observer with the specified callbacks.

  • As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.

  • As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.

PARAMETER DESCRIPTION
next_callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_OutT] | None DEFAULT: None

error_callback

The callback to be executed when the observable encounters an error.

TYPE: ErrorCallbackType | None DEFAULT: None

complete_callback

The callback that will be executed when the observable has completed emitting values.

TYPE: CompleteCallbackType | None DEFAULT: None

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the observable and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

A Subscriber if callbacks are provided; otherwise, an ObservableSubCtx.

Source code in pyventus/reactive/observables/observable.py
def subscribe(
    self,
    next_callback: NextCallbackType[_OutT] | None = None,
    error_callback: ErrorCallbackType | None = None,
    complete_callback: CompleteCallbackType | None = None,
    *,
    force_async: bool = False,
    stateful_subctx: bool = False,
) -> Subscriber[_OutT] | "Observable.ObservableSubCtx[Self, _OutT]":
    """
    Subscribe the specified callbacks to the current `Observable`.

    This method can be utilized in three ways:

    -   **As a regular function:** Automatically creates and subscribes an observer
        with the specified callbacks.

    -   **As a decorator:** Creates and subscribes an observer, using the decorated
        callback as the next callback.

    -   **As a context manager:** Enables a step-by-step definition of the observer's
        callbacks prior to subscription, which occurs immediately after exiting the context.

    :param next_callback: The callback to be executed when the observable emits a new value.
    :param error_callback: The callback to be executed when the observable encounters an error.
    :param complete_callback: The callback that will be executed when the observable has completed emitting values.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state (`stateful`)
        or not (`stateless`) after exiting the subscription block. If `True`, the context retains its state,
        allowing access to stored objects, including the `observable` and the `subscriber` object. If `False`,
        the context is stateless, and the stored state is cleared upon exiting the subscription block to
        prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: A `Subscriber` if callbacks are provided; otherwise, an `ObservableSubCtx`.
    """
    if next_callback is None and error_callback is None and complete_callback is None:
        # If no callbacks are provided, create a subscription context for progressive definition.
        return Observable.ObservableSubCtx[Self, _OutT](
            observable=self,
            force_async=force_async,
            is_stateful=stateful_subctx,
        )
    else:
        # Create a subscriber with the provided callbacks.
        subscriber = Subscriber[_OutT](
            teardown_callback=self.remove_subscriber,
            next_callback=next_callback,
            error_callback=error_callback,
            complete_callback=complete_callback,
            force_async=force_async,
        )

        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Add the subscriber to the observable.
            self.__subscribers.add(subscriber)

        # Log the subscription if debug is enabled
        if self.__logger.debug_enabled:
            self.__logger.debug(action="Subscribed:", msg=f"{subscriber}")

        # Return the subscriber.
        return subscriber

remove_subscriber

remove_subscriber(subscriber: Subscriber[_OutT]) -> bool

Remove the specified subscriber from the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be removed from the observable.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if the subscriber was not found in the observable.

Source code in pyventus/reactive/observables/observable.py
def remove_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Remove the specified subscriber from the observable.

    :param subscriber: The subscriber to be removed from the observable.
    :return: `True` if the subscriber was successfully removed; `False` if
        the subscriber was not found in the observable.
    """
    # Get the valid subscriber instance.
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety.
    with self.__thread_lock:
        # Check if the subscriber is registered; return False if not.
        if valid_subscriber not in self.__subscribers:
            return False

        # Remove the subscriber from the observable.
        self.__subscribers.remove(valid_subscriber)

    # Log the removal if the debug mode is enabled
    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

    return True

remove_all

remove_all() -> bool

Remove all subscribers from the observable.

RETURNS DESCRIPTION
bool

True if the observable was successfully cleared; False if the observable was already empty.

Source code in pyventus/reactive/observables/observable.py
def remove_all(self) -> bool:
    """
    Remove all subscribers from the observable.

    :return: `True` if the observable was successfully cleared; `False`
        if the observable was already empty.
    """
    # Acquire lock to ensure thread safety
    with self.__thread_lock:
        # Check if the observable is already empty
        if not self.__subscribers:
            return False

        # Clear the observable
        self.__subscribers.clear()

    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg="All subscribers.")

    return True

__init__

__init__(seed: _OutT, validators: list[ObservableValueValidatorType[_OutT]] | None = None, debug: bool | None = None) -> None

Initialize an instance of ObservableValue.

PARAMETER DESCRIPTION
seed

The initial value for the observable. This value is used during initialization and reset operations to restore the observable to its original state. No validation is applied to this value.

TYPE: _OutT

validators

A list of validators that check incoming values. When a value is deemed invalid, the validator must raise an exception to trigger an error notification. Validators can be either synchronous or asynchronous callables.

TYPE: list[ObservableValueValidatorType[_OutT]] | None DEFAULT: None

debug

Specifies the debug mode for the logger. If None, the mode is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/reactive/observables/observable_value.py
def __init__(
    self,
    seed: _OutT,
    validators: list[ObservableValueValidatorType[_OutT]] | None = None,
    debug: bool | None = None,
) -> None:
    """
    Initialize an instance of `ObservableValue`.

    :param seed: The initial value for the observable. This value is used during initialization and reset
        operations to restore the observable to its original state. No validation is applied to this value.
    :param validators: A list of validators that check incoming values. When a value is deemed invalid,
        the validator must raise an exception to trigger an error notification. Validators can be either
        synchronous or asynchronous callables.
    :param debug: Specifies the debug mode for the logger. If `None`, the mode is determined based
        on the execution environment.
    """
    # Initialize the base Observable class with the specified debug mode.
    super().__init__(debug=debug)

    # Check if validators are provided and ensure they are of type list.
    if validators is not None and not isinstance(validators, list):
        raise PyventusException("The 'validators' argument must be a list.")

    # Set the initial seed value and initialize the current value and exception.
    self.__seed: _OutT = seed
    self.__value: _OutT = self.__seed
    self.__exception: Exception | None = None

    # Wrap validators in a unified interface if they are provided.
    self.__validators: list[CallableWrapper[[_OutT], None]] | None = (
        [CallableWrapper(validator, force_async=False) for validator in validators] if validators else None
    )

    # Create an AsyncIO processing service to manage value updates and notifications.
    # The enforce_submission_order option is enabled to ensure that changes are processed sequentially.
    self.__processing_service: AsyncIOProcessingService = AsyncIOProcessingService(enforce_submission_order=True)

wait_for_tasks async

wait_for_tasks() -> None

Wait for all background tasks in the current asyncio loop associated with the ObservableValue to complete.

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_value.py
async def wait_for_tasks(self) -> None:
    """
    Wait for all background tasks in the current asyncio loop associated with the `ObservableValue` to complete.

    :return: None.
    """
    await self.__processing_service.wait_for_tasks()

get_error

get_error(callback: Callable[[Exception | None], None | Awaitable[None]], force_async: bool = False) -> None

Retrieve the current error, if any.

The current error will be received through the parameters of the provided callback, and its execution will be enqueued with the other operations to ensure that the received error corresponds to the correct state at the time the current method was invoked, preventing any inconsistencies.

PARAMETER DESCRIPTION
callback

The callback to be invoked with the current error, if any.

TYPE: Callable[[Exception | None], None | Awaitable[None]]

force_async

Determines whether to force the callback to run asynchronously. If True, the synchronous callback will be converted to run asynchronously in a thread pool using the asyncio.to_thread function. If False, the callback will run synchronously or asynchronously as defined.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
None

None.

RAISES DESCRIPTION
PyventusException

If the provided callback is a generator.

Source code in pyventus/reactive/observables/observable_value.py
def get_error(
    self, callback: Callable[[Exception | None], None | Awaitable[None]], force_async: bool = False
) -> None:
    """
    Retrieve the current error, if any.

    The current error will be received through the parameters of the provided callback, and its execution will be
    enqueued with the other operations to ensure that the received error corresponds to the correct state at the
    time the current method was invoked, preventing any inconsistencies.

    :param callback: The callback to be invoked with the current error, if any.
    :param force_async: Determines whether to force the callback to run asynchronously. If `True`, the synchronous
        callback will be converted to run asynchronously in a thread pool using the `asyncio.to_thread` function.
        If `False`, the callback will run synchronously or asynchronously as defined.
    :return: None.
    :raises PyventusException: If the provided callback is a generator.
    """
    # Wrap the callback to ensure a consistent interface for invocation.
    valid_callback: CallableWrapper[[Exception | None], None] = CallableWrapper(callback, force_async=force_async)

    # Verify that the provided callback is not a generator.
    if valid_callback.is_generator:
        raise PyventusException("The 'callback' argument cannot be a generator.")

    # Schedule the processing of the error retrieval with the provided callback, ensuring that the received error
    # corresponds to the state at the time the `get_error` method was invoked, preventing any inconsistencies.
    self.__processing_service.submit(self._get_error, valid_callback)

get_value

get_value(callback: Callable[[_OutT], None | Awaitable[None]], force_async: bool = False) -> None

Retrieve the current value.

The current value will be received through the parameters of the provided callback, and its execution will be enqueued with the other operations to ensure that the received value corresponds to the correct state at the time the current method was invoked, preventing any inconsistencies.

PARAMETER DESCRIPTION
callback

The callback to be invoked with the current value.

TYPE: Callable[[_OutT], None | Awaitable[None]]

force_async

Determines whether to force the callback to run asynchronously. If True, the synchronous callback will be converted to run asynchronously in a thread pool using the asyncio.to_thread function. If False, the callback will run synchronously or asynchronously as defined.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
None

None.

RAISES DESCRIPTION
PyventusException

If the provided callback is a generator.

Source code in pyventus/reactive/observables/observable_value.py
def get_value(self, callback: Callable[[_OutT], None | Awaitable[None]], force_async: bool = False) -> None:
    """
    Retrieve the current value.

    The current value will be received through the parameters of the provided callback, and its execution will be
    enqueued with the other operations to ensure that the received value corresponds to the correct state at the
    time the current method was invoked, preventing any inconsistencies.

    :param callback: The callback to be invoked with the current value.
    :param force_async: Determines whether to force the callback to run asynchronously. If `True`, the synchronous
        callback will be converted to run asynchronously in a thread pool using the `asyncio.to_thread` function.
        If `False`, the callback will run synchronously or asynchronously as defined.
    :return: None.
    :raises PyventusException: If the provided callback is a generator.
    """
    # Wrap the callback to ensure a consistent interface for invocation.
    valid_callback: CallableWrapper[[_OutT], None] = CallableWrapper(callback, force_async=force_async)

    # Verify that the provided callback is not a generator.
    if valid_callback.is_generator:
        raise PyventusException("The 'callback' argument cannot be a generator.")

    # Schedule the processing of the value retrieval with the provided callback, ensuring that the received value
    # corresponds to the state at the time the `get_value` method was invoked, preventing any inconsistencies.
    self.__processing_service.submit(self._get_value, valid_callback)

set_value

set_value(value: _OutT) -> None

Update the current value to the specified one.

The provided value is validated against the defined validators. If it is deemed valid, it is stored, and the next notification is triggered. If the value is considered invalid by any of the validators, it is stored alongside the raised exception, and an error notification is issued.

PARAMETER DESCRIPTION
value

The value to set as the current value.

TYPE: _OutT

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_value.py
@final  # Prevent overriding in subclasses to maintain the integrity of the _OutT type.
def set_value(self, value: _OutT) -> None:  # type: ignore[misc]
    """
    Update the current value to the specified one.

    The provided value is validated against the defined validators. If it is deemed valid, it is stored,
    and the next notification is triggered. If the value is considered invalid by any of the validators,
    it is stored alongside the raised exception, and an error notification is issued.

    :param value: The value to set as the current value.
    :return: None.
    """
    with self._thread_lock:
        subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
    self.__processing_service.submit(self._set_value, value, subscribers)

clear_value

clear_value() -> None

Clear the current value and reset it to its initial state.

This operation will trigger the completion notification and the removal of all current subscribers.

RETURNS DESCRIPTION
None

None.

Source code in pyventus/reactive/observables/observable_value.py
def clear_value(self) -> None:
    """
    Clear the current value and reset it to its initial state.

    This operation will trigger the completion notification and the removal of all current subscribers.

    :return: None.
    """
    with self._thread_lock:
        subscribers: tuple[Subscriber[_OutT], ...] = tuple(self._subscribers)
        self._subscribers.clear()
    self.__processing_service.submit(self._clear_value, subscribers)

prime_subscribers

prime_subscribers(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]
prime_subscribers(*subscribers: Subscriber[_OutT]) -> tuple[Subscriber[_OutT], ...]
prime_subscribers(*subscribers: Subscriber[_OutT]) -> Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]

Prime the specified subscribers with the current value or error.

PARAMETER DESCRIPTION
subscribers

One or more subscribers to be primed.

TYPE: Subscriber[_OutT] DEFAULT: ()

RETURNS DESCRIPTION
Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]

The same subscribers as input.

RAISES DESCRIPTION
PyventusException

If no subscribers are given.

Source code in pyventus/reactive/observables/observable_value.py
def prime_subscribers(self, *subscribers: Subscriber[_OutT]) -> Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]:
    """
    Prime the specified subscribers with the current value or error.

    :param subscribers: One or more subscribers to be primed.
    :return: The same subscribers as input.
    :raises PyventusException: If no subscribers are given.
    """
    if not subscribers:
        raise PyventusException("At least one subscriber must be provided.")

    if len(subscribers) == 1:
        self.__processing_service.submit(
            self._prime_subscribers,
            {self.get_valid_subscriber(subscribers[0])},
        )
        return subscribers[0]
    else:
        self.__processing_service.submit(
            self._prime_subscribers,
            {self.get_valid_subscriber(subscriber) for subscriber in subscribers},
        )
        return subscribers