Observable class

Bases: ABC, Generic[_OutT]

A base class that defines a lazy push-style notification mechanism for streaming data to subscribers.

Notes:

  • The Observable class serves as a foundation for implementing various observable types with different dispatch logic and strategies, encapsulating the essential protocols and workflows for streaming data to subscribers in a reactive manner.

  • This class is parameterized by the type of value that will be streamed through the observable. This type parameter is covariant, so an observable of a subtype can be used wherever an observable of the specified type is expected.

  • The subscribe() method can be utilized as a regular function, a decorator, or a context manager. When used as a regular function, it automatically creates and subscribes an observer with the specified callbacks. As a decorator, it creates and subscribes an observer, using the decorated callback as the next callback. Finally, when employed as a context manager with the with statement, it enables a step-by-step definition of the observer's callbacks prior to its subscription, which occurs immediately after exiting the context.

  • This class has been designed with thread safety in mind. All of its methods synchronize access to mutable attributes to prevent race conditions when managing observables in a multi-threaded environment.
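
The three ways of calling subscribe() described in the notes can be sketched with a small stand-in. This is a minimal illustration, not the Pyventus implementation: MiniObservable, MiniSubCtx, and emit() are hypothetical names, the sketch is synchronous, and it handles only the next callback.

```python
from threading import Lock
from typing import Any, Callable, Optional


class MiniSubCtx:
    """Hypothetical stand-in: collects a callback, then subscribes it on exit."""

    def __init__(self, observable: "MiniObservable") -> None:
        self._observable = observable
        self._next: Optional[Callable[[Any], None]] = None

    def on_next(self, callback: Callable[[Any], None]) -> Callable[[Any], None]:
        # Store the callback and return it unchanged so it works as a decorator.
        self._next = callback
        return callback

    def __enter__(self) -> "MiniSubCtx":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Subscribe only when the block finished without an exception.
        if exc_type is None:
            if self._next is None:
                raise ValueError("At least one callback must be defined.")
            self._observable.subscribe(self._next)

    def __call__(self, callback: Callable[[Any], None]) -> Callable[[Any], None]:
        # Decorator form: register the callback and subscribe immediately.
        self._next = callback
        self.__exit__(None, None, None)
        return callback


class MiniObservable:
    """Hypothetical, heavily simplified observable (synchronous, next-only)."""

    def __init__(self) -> None:
        self._callbacks: list = []
        self._lock = Lock()

    def subscribe(self, next_callback: Optional[Callable[[Any], None]] = None):
        if next_callback is None:
            # No callback given: hand back a context for decorator or `with` usage.
            return MiniSubCtx(self)
        with self._lock:
            self._callbacks.append(next_callback)
        return next_callback

    def emit(self, value: Any) -> None:
        # Copy the callbacks under the lock, then notify outside of it.
        with self._lock:
            snapshot = list(self._callbacks)
        for callback in snapshot:
            callback(value)


received = []
observable = MiniObservable()

# 1. Regular function call.
observable.subscribe(lambda value: received.append(("function", value)))


# 2. Decorator: the decorated function becomes the next callback.
@observable.subscribe()
def on_value(value):
    received.append(("decorator", value))


# 3. Context manager: the subscription happens when the block exits.
with observable.subscribe() as ctx:

    @ctx.on_next
    def handle(value):
        received.append(("context", value))


observable.emit(1)
```

In all three cases the callback ends up registered on the same observable; the forms differ only in when and how the callbacks are collected.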

Source code in pyventus/reactive/observables/observable.py
class Observable(ABC, Generic[_OutT]):
    """
    A base class that defines a lazy push-style notification mechanism for streaming data to subscribers.

    **Notes:**

    -   The `Observable` class serves as a foundation for implementing various observable types with different
        dispatch logic and strategies, encapsulating the essential protocols and workflows for streaming data
        to subscribers in a reactive manner.

    -   This class is parameterized by the type of value that will be streamed through the observable. This
        type parameter is covariant, so an observable of a subtype can be used wherever an observable of the
        specified type is expected.

    -   The `subscribe()` method can be utilized as a regular function, a decorator, or a context manager.
        When used as a regular function, it automatically creates and subscribes an observer with the specified
        callbacks. As a decorator, it creates and subscribes an observer, using the decorated callback as the
        next callback. Finally, when employed as a context manager with the `with` statement, it enables a
        step-by-step definition of the observer's callbacks prior to its subscription, which occurs
        immediately after exiting the context.

    -   This class has been designed with *thread safety* in mind. All of its methods synchronize access to
        mutable attributes to prevent race conditions when managing observables in a multi-threaded environment.
    """

    @final
    class ObservableSubCtx(Generic[_SubCtxO, _SubCtxT], SubscriptionContext[_SubCtxO, Subscriber[_SubCtxT]]):
        """
        A context manager for Observable subscriptions.

        **Notes:**

        -   This class establishes a context block for a step-by-step definition of the observer's
            callbacks prior to the actual subscription, which occurs immediately upon exiting the
            context block.

        -   This class can be used as either a decorator or a context manager. When used as a
            decorator, it creates and subscribes an observer, using the decorated callback as
            the next callback. When employed as a context manager with the `with` statement,
            it enables a step-by-step definition of the observer's callbacks prior to its
            subscription, which occurs immediately after exiting the context.

        -   This subscription context can be `stateful`, retaining references to the `observable`
            and `subscriber`, or `stateless`, which clears the context upon exiting the
            subscription block.

        -   This class is not intended to be subclassed or manually instantiated.
        """

        # Attributes for the ObservableSubCtx
        __slots__ = ("__next_callback", "__error_callback", "__complete_callback", "__force_async")

        def __init__(self, observable: _SubCtxO, force_async: bool, is_stateful: bool) -> None:
            """
            Initialize an instance of `ObservableSubCtx`.

            :param observable: The observable to which the observer will be subscribed.
            :param force_async: Determines whether to force all callbacks to run asynchronously.
            :param is_stateful: A flag indicating whether the context preserves its state (`stateful`) or
                not (`stateless`) after exiting the subscription context. If `True`, the context retains its
                state, allowing access to stored objects, including the `observable` and the `subscriber`
                object. If `False`, the context is stateless, and the stored state is cleared upon
                exiting the subscription context to prevent memory leaks.
            """
            # Initialize the base SubscriptionContext class
            super().__init__(source=observable, is_stateful=is_stateful)

            # Initialize variables
            self.__next_callback: NextCallbackType[_SubCtxT] | None = None
            self.__error_callback: ErrorCallbackType | None = None
            self.__complete_callback: CompleteCallbackType | None = None
            self.__force_async: bool = force_async

        @override
        def _exit(self) -> Subscriber[_SubCtxT]:
            # Ensure that the source is not None.
            if self._source is None:  # pragma: no cover
                raise PyventusException("The subscription context is closed.")

            # Ensure that at least one callback is defined before performing the subscription.
            if self.__next_callback is None and self.__error_callback is None and self.__complete_callback is None:
                raise PyventusException("At least one callback must be defined before performing the subscription.")

            # Subscribe the defined callbacks to the specified
            # observable and store the returned subscriber.
            subscriber: Subscriber[_SubCtxT] = self._source.subscribe(
                next_callback=self.__next_callback,
                error_callback=self.__error_callback,
                complete_callback=self.__complete_callback,
                force_async=self.__force_async,
            )

            # Remove context-specific attributes.
            del self.__next_callback, self.__error_callback, self.__complete_callback, self.__force_async

            # Return the subscriber.
            return subscriber

        def on_next(self, callback: NextCallbackType[_SubCtxT]) -> NextCallbackType[_SubCtxT]:
            """
            Set the observer's next callback.

            :param callback: The callback to be executed when the observable emits a new value.
            :return: The decorated callback.
            """
            self.__next_callback = callback
            return callback

        def on_error(self, callback: ErrorCallbackType) -> ErrorCallbackType:
            """
            Set the observer's error callback.

            :param callback: The callback to be executed when the observable encounters an error.
            :return: The decorated callback.
            """
            self.__error_callback = callback
            return callback

        def on_complete(self, callback: CompleteCallbackType) -> CompleteCallbackType:
            """
            Set the observer's complete callback.

            :param callback: The callback that will be executed when the observable has completed emitting values.
            :return: The decorated callback.
            """
            self.__complete_callback = callback
            return callback

        def __call__(
            self, callback: NextCallbackType[_SubCtxT]
        ) -> (
            tuple[NextCallbackType[_SubCtxT], "Observable.ObservableSubCtx[_SubCtxO, _SubCtxT]"]
            | NextCallbackType[_SubCtxT]
        ):
            """
            Subscribe the decorated callback as the observer's `next` function for the specified observable.

            :param callback: The callback to be executed when the observable emits a new value.
            :return: A tuple containing the decorated callback and its subscription context
                if the context is `stateful`; otherwise, returns the decorated callback alone.
            """
            # Store the provided callback in the subscription context.
            self.__next_callback = callback

            # Set error and complete callbacks to None.
            self.__error_callback = None
            self.__complete_callback = None

            # Determine if the subscription context is stateful.
            is_stateful: bool = self._is_stateful

            # Call the exit method to finalize the
            # subscription process and clean up any necessary context.
            self.__exit__(None, None, None)

            # Return a tuple containing the decorated callback
            # and the current subscription context if the context
            # is stateful; otherwise, return just the callback.
            return (callback, self) if is_stateful else callback

    @final
    class Completed(Exception):
        """Exception raised to indicate that an observable sequence has completed."""

    @staticmethod
    def get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]:
        """
        Validate and return the specified subscriber.

        :param subscriber: The subscriber to validate.
        :return: The validated subscriber.
        :raises PyventusException: If the subscriber is not an instance of `Subscriber`.
        """
        # Validate that the subscriber is an instance of Subscriber.
        if not isinstance(subscriber, Subscriber):
            raise PyventusException("The 'subscriber' argument must be an instance of Subscriber.")
        return subscriber

    # Attributes for the Observable
    __slots__ = ("__subscribers", "__thread_lock", "__logger")

    def __init__(self, debug: bool | None = None) -> None:
        """
        Initialize an instance of `Observable`.

        :param debug: Specifies the debug mode for the logger. If `None`,
            the mode is determined based on the execution environment.
        """
        # Initialize the set of subscribers.
        self.__subscribers: set[Subscriber[_OutT]] = set()

        # Create a lock object for thread synchronization.
        self.__thread_lock: Lock = Lock()

        # Validate the debug argument.
        if debug is not None and not isinstance(debug, bool):
            raise PyventusException("The 'debug' argument must be a boolean value.")

        # Set up the logger with the appropriate debug mode.
        self.__logger: Logger = Logger(source=self, debug=debug if debug is not None else bool(gettrace() is not None))

    def __repr__(self) -> str:
        """
        Retrieve a string representation of the instance.

        :return: A string representation of the instance.
        """
        return attributes_repr(
            subscribers=self.__subscribers,
            thread_lock=self.__thread_lock,
            debug=self.__logger.debug_enabled,
        )

    @property
    def _logger(self) -> Logger:
        """
        Retrieve the logger instance.

        :return: The logger instance used for logging messages.
        """
        return self.__logger

    @property
    def _thread_lock(self) -> Lock:
        """
        Retrieve the thread lock instance.

        :return: The thread lock instance used to ensure thread-safe operations.
        """
        return self.__thread_lock

    def _log_subscriber_exception(self, subscriber: Subscriber[_OutT], exception: Exception) -> None:
        """
        Log an unhandled exception that occurred during the execution of a subscriber's callback.

        :param subscriber: The subscriber instance that encountered the exception.
        :param exception: The exception instance to be logged.
        :return: None.
        """
        self.__logger.error(action="Exception:", msg=f"{exception!r} at {summarized_repr(subscriber)}.")

    @final  # Prevent overriding in subclasses to maintain the integrity of the _OutT type.
    async def _emit_next(self, value: _OutT) -> None:  # type: ignore[misc]
        """
        Emit the next value to all subscribers.

        This method notifies all subscribers of the next value in the stream.

        :param value: The value to be emitted to all subscribers.
        :return: None.
        """
        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Get all subscribers and filter those with a next callback to optimize execution.
            subscribers: list[Subscriber[_OutT]] = [
                subscriber for subscriber in self.__subscribers if subscriber.has_next_callback
            ]

        # Exit if there are no subscribers.
        if not subscribers:
            # Log a debug message if debug mode is enabled.
            if self.__logger.debug_enabled:
                self.__logger.debug(
                    action="Emitting Next Value:",
                    msg=f"No subscribers to notify of the next value: {value!r}.",
                )
            return

        # Log the emission of the next value if debug mode is enabled.
        if self.__logger.debug_enabled:
            self.__logger.debug(
                action="Emitting Next Value:",
                msg=f"Notifying {len(subscribers)} subscribers of the next value: {value!r}.",
            )

        # If there is only one subscriber, handle it directly.
        if len(subscribers) == 1:
            try:
                # Notify the subscriber and await its response.
                subscriber: Subscriber[_OutT] = subscribers.pop()
                await subscriber.next(value)
            except Exception as e:
                # Log any exceptions that occur during notification.
                self._log_subscriber_exception(subscriber, e)
        else:
            # Notify all subscribers concurrently.
            results = await gather(*[subscriber.next(value) for subscriber in subscribers], return_exceptions=True)

            # Log any exceptions that occur during notification.
            for subscriber, result in zip(subscribers, results, strict=True):
                if isinstance(result, Exception):
                    self._log_subscriber_exception(subscriber, result)

    @final
    async def _emit_error(self, exception: Exception) -> None:
        """
        Emit the error that occurred to all subscribers.

        This method notifies all subscribers of the error that occurred.

        :param exception: The exception to be emitted to all subscribers.
        :return: None.
        """
        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Get all subscribers and filter those with an error callback to optimize execution.
            subscribers: list[Subscriber[_OutT]] = [
                subscriber for subscriber in self.__subscribers if subscriber.has_error_callback
            ]

        # Exit if there are no subscribers.
        if not subscribers:
            # Log an error message.
            self.__logger.error(
                action="Emitting Error:",
                msg=f"No subscribers to notify of the error: {exception!r}.",
            )
            return

        # Log the error emission if debug mode is enabled.
        if self.__logger.debug_enabled:
            self.__logger.debug(
                action="Emitting Error:",
                msg=f"Notifying {len(subscribers)} subscribers of the error: {exception!r}.",
            )

        # If there is only one subscriber, handle it directly.
        if len(subscribers) == 1:
            try:
                # Notify the subscriber and await its response.
                subscriber: Subscriber[_OutT] = subscribers.pop()
                await subscriber.error(exception)
            except Exception as e:
                # Log any exceptions that occur during notification.
                self._log_subscriber_exception(subscriber, e)
        else:
            # Notify all subscribers concurrently.
            results = await gather(*[subscriber.error(exception) for subscriber in subscribers], return_exceptions=True)

            # Log any exceptions that occur during notification.
            for subscriber, result in zip(subscribers, results, strict=True):
                if isinstance(result, Exception):
                    self._log_subscriber_exception(subscriber, result)

    @final
    async def _emit_complete(self) -> None:
        """
        Emit the completion signal to all subscribers.

        This method notifies all subscribers that the stream has completed.

        :return: None.
        """
        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Get all subscribers and filter those with a complete callback to optimize execution.
            subscribers: list[Subscriber[_OutT]] = [
                subscriber for subscriber in self.__subscribers if subscriber.has_complete_callback
            ]

            # Unsubscribe all observers since the stream has completed.
            self.__subscribers.clear()

        # Exit if there are no subscribers.
        if not subscribers:
            # Log a debug message if debug mode is enabled.
            if self.__logger.debug_enabled:
                self.__logger.debug(
                    action="Emitting Completion:",
                    msg="No subscribers to notify of completion.",
                )
            return

        # Log the emission if debug is enabled.
        if self.__logger.debug_enabled:
            self.__logger.debug(
                action="Emitting Completion:",
                msg=f"Notifying {len(subscribers)} subscribers of completion.",
            )

        # If there is only one subscriber, handle it directly.
        if len(subscribers) == 1:
            try:
                # Notify the subscriber and await its response.
                subscriber: Subscriber[_OutT] = subscribers.pop()
                await subscriber.complete()
            except Exception as e:
                # Log any exceptions that occur during notification.
                self._log_subscriber_exception(subscriber, e)
        else:
            # Notify all subscribers concurrently.
            results = await gather(*[subscriber.complete() for subscriber in subscribers], return_exceptions=True)

            # Log any exceptions that occur during notification.
            for subscriber, result in zip(subscribers, results, strict=True):
                if isinstance(result, Exception):
                    self._log_subscriber_exception(subscriber, result)

    def get_subscribers(self) -> set[Subscriber[_OutT]]:
        """
        Retrieve all registered subscribers.

        :return: A set of all registered subscribers.
        """
        with self.__thread_lock:
            return self.__subscribers.copy()

    def get_subscriber_count(self) -> int:
        """
        Retrieve the number of registered subscribers.

        :return: The total count of subscribers in the observable.
        """
        with self.__thread_lock:
            return len(self.__subscribers)

    def contains_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
        """
        Determine if the specified subscriber is present in the observable.

        :param subscriber: The subscriber to be checked.
        :return: `True` if the subscriber is found; `False` otherwise.
        """
        valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)
        with self.__thread_lock:
            return valid_subscriber in self.__subscribers

    @overload
    def subscribe(
        self, *, force_async: bool = False, stateful_subctx: bool = False
    ) -> "Observable.ObservableSubCtx[Self, _OutT]": ...

    @overload
    def subscribe(
        self,
        next_callback: NextCallbackType[_OutT] | None = None,
        error_callback: ErrorCallbackType | None = None,
        complete_callback: CompleteCallbackType | None = None,
        *,
        force_async: bool = False,
    ) -> Subscriber[_OutT]: ...

    def subscribe(
        self,
        next_callback: NextCallbackType[_OutT] | None = None,
        error_callback: ErrorCallbackType | None = None,
        complete_callback: CompleteCallbackType | None = None,
        *,
        force_async: bool = False,
        stateful_subctx: bool = False,
    ) -> Subscriber[_OutT] | "Observable.ObservableSubCtx[Self, _OutT]":
        """
        Subscribe the specified callbacks to the current `Observable`.

        This method can be utilized in three ways:

        -   **As a regular function:** Automatically creates and subscribes an observer
            with the specified callbacks.

        -   **As a decorator:** Creates and subscribes an observer, using the decorated
            callback as the next callback.

        -   **As a context manager:** Enables a step-by-step definition of the observer's
            callbacks prior to subscription, which occurs immediately after exiting the context.

        :param next_callback: The callback to be executed when the observable emits a new value.
        :param error_callback: The callback to be executed when the observable encounters an error.
        :param complete_callback: The callback that will be executed when the observable has completed emitting values.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
        :param stateful_subctx: A flag indicating whether the subscription context preserves its state (`stateful`)
            or not (`stateless`) after exiting the subscription block. If `True`, the context retains its state,
            allowing access to stored objects, including the `observable` and the `subscriber` object. If `False`,
            the context is stateless, and the stored state is cleared upon exiting the subscription block to
            prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
        :return: A `Subscriber` if callbacks are provided; otherwise, an `ObservableSubCtx`.
        """
        if next_callback is None and error_callback is None and complete_callback is None:
            # If no callbacks are provided, create a subscription context for progressive definition.
            return Observable.ObservableSubCtx[Self, _OutT](
                observable=self,
                force_async=force_async,
                is_stateful=stateful_subctx,
            )
        else:
            # Create a subscriber with the provided callbacks.
            subscriber = Subscriber[_OutT](
                teardown_callback=self.remove_subscriber,
                next_callback=next_callback,
                error_callback=error_callback,
                complete_callback=complete_callback,
                force_async=force_async,
            )

            # Acquire lock to ensure thread safety.
            with self.__thread_lock:
                # Add the subscriber to the observable.
                self.__subscribers.add(subscriber)

            # Log the subscription if debug is enabled
            if self.__logger.debug_enabled:
                self.__logger.debug(action="Subscribed:", msg=f"{subscriber}")

            # Return the subscriber.
            return subscriber

    def remove_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
        """
        Remove the specified subscriber from the observable.

        :param subscriber: The subscriber to be removed from the observable.
        :return: `True` if the subscriber was successfully removed; `False` if
            the subscriber was not found in the observable.
        """
        # Get the valid subscriber instance.
        valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)

        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Check if the subscriber is registered; return False if not.
            if valid_subscriber not in self.__subscribers:
                return False

            # Remove the subscriber from the observable.
            self.__subscribers.remove(valid_subscriber)

        # Log the removal if the debug mode is enabled
        if self.__logger.debug_enabled:
            self.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

        return True

    def remove_all(self) -> bool:
        """
        Remove all subscribers from the observable.

        :return: `True` if the observable was successfully cleared; `False`
            if the observable was already empty.
        """
        # Acquire lock to ensure thread safety
        with self.__thread_lock:
            # Check if the observable is already empty
            if not self.__subscribers:
                return False

            # Clear the observable
            self.__subscribers.clear()

        if self.__logger.debug_enabled:
            self.__logger.debug(action="Removed:", msg="All subscribers.")

        return True
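
The emission methods in the listing above share one pattern: copy the relevant subscribers while holding the lock, release the lock, notify the copies concurrently, and log any exception a callback raises instead of propagating it. The sketch below isolates that pattern using only the standard library. Emitter, add(), and the errors list are hypothetical names introduced for illustration, and exceptions are collected in a list rather than sent to a logger.

```python
import asyncio
from threading import Lock
from typing import Awaitable, Callable, List, Tuple

Callback = Callable[[int], Awaitable[None]]


class Emitter:
    """Hypothetical emitter showing snapshot-then-notify with asyncio.gather."""

    def __init__(self) -> None:
        self._callbacks: List[Callback] = []
        self._lock = Lock()
        self.errors: List[Tuple[str, Exception]] = []

    def add(self, callback: Callback) -> None:
        with self._lock:
            self._callbacks.append(callback)

    async def emit(self, value: int) -> None:
        # Take a snapshot under the lock so callbacks never run while it is held.
        with self._lock:
            snapshot = list(self._callbacks)

        # Run all callbacks concurrently; exceptions come back as results.
        results = await asyncio.gather(
            *(callback(value) for callback in snapshot),
            return_exceptions=True,
        )

        # Pair each result with its callback and record the failures.
        for callback, result in zip(snapshot, results):
            if isinstance(result, Exception):
                self.errors.append((callback.__name__, result))


seen: List[int] = []


async def good(value: int) -> None:
    seen.append(value)


async def bad(value: int) -> None:
    raise ValueError("boom")


emitter = Emitter()
emitter.add(bad)
emitter.add(good)
asyncio.run(emitter.emit(7))
```

Because return_exceptions=True turns failures into ordinary results, one failing callback does not prevent the others from being notified.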

Classes

ObservableSubCtx

Bases: Generic[_SubCtxO, _SubCtxT], SubscriptionContext[_SubCtxO, Subscriber[_SubCtxT]]

A context manager for Observable subscriptions.

Notes:

  • This class establishes a context block for a step-by-step definition of the observer's callbacks prior to the actual subscription, which occurs immediately upon exiting the context block.

  • This class can be used as either a decorator or a context manager. When used as a decorator, it creates and subscribes an observer, using the decorated callback as the next callback. When employed as a context manager with the with statement, it enables a step-by-step definition of the observer's callbacks prior to its subscription, which occurs immediately after exiting the context.

  • This subscription context can be stateful, retaining references to the observable and subscriber, or stateless, which clears the context upon exiting the subscription block.

  • This class is not intended to be subclassed or manually instantiated.
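
The difference between a stateful and a stateless context, and its effect on what the decorator form returns, can be sketched as follows. This is a simplified stand-in under stated assumptions: SketchSource and SketchSubCtx are hypothetical classes, the public source and subscriber attributes are invented for the illustration, and the way the real SubscriptionContext base class stores or exposes its state is not shown in this section.

```python
class SketchSource:
    """Hypothetical source that only records the callbacks subscribed to it."""

    def __init__(self) -> None:
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)
        return callback  # stands in for a Subscriber object


class SketchSubCtx:
    """Hypothetical context that keeps or clears its state on exit."""

    def __init__(self, source, is_stateful: bool) -> None:
        self.source = source
        self.subscriber = None
        self._is_stateful = is_stateful
        self._next = None

    def on_next(self, callback):
        self._next = callback
        return callback

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if exc_type is not None:
            return  # do not subscribe if the block raised
        if self._next is None:
            raise ValueError("At least one callback must be defined.")
        subscriber = self.source.subscribe(self._next)
        self._next = None
        if self._is_stateful:
            # Stateful: keep references to the source and the subscriber.
            self.subscriber = subscriber
        else:
            # Stateless: drop the reference so nothing is retained.
            self.source = None

    def __call__(self, callback):
        self._next = callback
        is_stateful = self._is_stateful  # read before exit may clear state
        self.__exit__(None, None, None)
        return (callback, self) if is_stateful else callback


source = SketchSource()


# Stateless decorator: the name stays bound to the plain function.
@SketchSubCtx(source, is_stateful=False)
def plain(value):
    return value


# Stateful decorator: the name is bound to a (callback, context) tuple.
@SketchSubCtx(source, is_stateful=True)
def paired(value):
    return value


callback, stateful_ctx = paired

# Stateless context manager: references are cleared once the block exits.
with SketchSubCtx(source, is_stateful=False) as stateless_ctx:
    stateless_ctx.on_next(print)
```

With a stateful context, the name bound by the decorator is a tuple, so it has to be unpacked before the callback can be called directly.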

Source code in pyventus/reactive/observables/observable.py
@final
class ObservableSubCtx(Generic[_SubCtxO, _SubCtxT], SubscriptionContext[_SubCtxO, Subscriber[_SubCtxT]]):
    """
    A context manager for Observable subscriptions.

    **Notes:**

    -   This class establishes a context block for a step-by-step definition of the observer's
        callbacks prior to the actual subscription, which occurs immediately upon exiting the
        context block.

    -   This class can be used as either a decorator or a context manager. When used as a
        decorator, it creates and subscribes an observer, using the decorated callback as
        the next callback. When employed as a context manager with the `with` statement,
        it enables a step-by-step definition of the observer's callbacks prior to its
        subscription, which occurs immediately after exiting the context.

    -   This subscription context can be `stateful`, retaining references to the `observable`
        and `subscriber`, or `stateless`, which clears the context upon exiting the
        subscription block.

    -   This class is not intended to be subclassed or manually instantiated.
    """

    # Attributes for the ObservableSubCtx
    __slots__ = ("__next_callback", "__error_callback", "__complete_callback", "__force_async")

    def __init__(self, observable: _SubCtxO, force_async: bool, is_stateful: bool) -> None:
        """
        Initialize an instance of `ObservableSubCtx`.

        :param observable: The observable to which the observer will be subscribed.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
        :param is_stateful: A flag indicating whether the context preserves its state (`stateful`) or
            not (`stateless`) after exiting the subscription context. If `True`, the context retains its
            state, allowing access to stored objects, including the `observable` and the `subscriber`
            object. If `False`, the context is stateless, and the stored state is cleared upon
            exiting the subscription context to prevent memory leaks.
        """
        # Initialize the base SubscriptionContext class
        super().__init__(source=observable, is_stateful=is_stateful)

        # Initialize variables
        self.__next_callback: NextCallbackType[_SubCtxT] | None = None
        self.__error_callback: ErrorCallbackType | None = None
        self.__complete_callback: CompleteCallbackType | None = None
        self.__force_async: bool = force_async

    @override
    def _exit(self) -> Subscriber[_SubCtxT]:
        # Ensure that the source is not None.
        if self._source is None:  # pragma: no cover
            raise PyventusException("The subscription context is closed.")

        # Ensure that at least one callback is defined before performing the subscription.
        if self.__next_callback is None and self.__error_callback is None and self.__complete_callback is None:
            raise PyventusException("At least one callback must be defined before performing the subscription.")

        # Subscribe the defined callbacks to the specified
        # observable and store the returned subscriber.
        subscriber: Subscriber[_SubCtxT] = self._source.subscribe(
            next_callback=self.__next_callback,
            error_callback=self.__error_callback,
            complete_callback=self.__complete_callback,
            force_async=self.__force_async,
        )

        # Remove context-specific attributes.
        del self.__next_callback, self.__error_callback, self.__complete_callback, self.__force_async

        # Return the subscriber.
        return subscriber

    def on_next(self, callback: NextCallbackType[_SubCtxT]) -> NextCallbackType[_SubCtxT]:
        """
        Set the observer's next callback.

        :param callback: The callback to be executed when the observable emits a new value.
        :return: The decorated callback.
        """
        self.__next_callback = callback
        return callback

    def on_error(self, callback: ErrorCallbackType) -> ErrorCallbackType:
        """
        Set the observer's error callback.

        :param callback: The callback to be executed when the observable encounters an error.
        :return: The decorated callback.
        """
        self.__error_callback = callback
        return callback

    def on_complete(self, callback: CompleteCallbackType) -> CompleteCallbackType:
        """
        Set the observer's complete callback.

        :param callback: The callback that will be executed when the observable has completed emitting values.
        :return: The decorated callback.
        """
        self.__complete_callback = callback
        return callback

    def __call__(
        self, callback: NextCallbackType[_SubCtxT]
    ) -> (
        tuple[NextCallbackType[_SubCtxT], "Observable.ObservableSubCtx[_SubCtxO, _SubCtxT]"]
        | NextCallbackType[_SubCtxT]
    ):
        """
        Subscribe the decorated callback as the observer's `next` function for the specified observable.

        :param callback: The callback to be executed when the observable emits a new value.
        :return: A tuple containing the decorated callback and its subscription context
            if the context is `stateful`; otherwise, returns the decorated callback alone.
        """
        # Store the provided callback in the subscription context.
        self.__next_callback = callback

        # Set error and complete callbacks to None.
        self.__error_callback = None
        self.__complete_callback = None

        # Determine if the subscription context is stateful.
        is_stateful: bool = self._is_stateful

        # Call the exit method to finalize the
        # subscription process and clean up any necessary context.
        self.__exit__(None, None, None)

        # Return a tuple containing the decorated callback
        # and the current subscription context if the context
        # is stateful; otherwise, return just the callback.
        return (callback, self) if is_stateful else callback

Functions

unpack
unpack() -> tuple[_SourceType | None, _SubscriberType | None]

Unpack and retrieve the source object and its associated subscriber.

This method returns a tuple containing the source object and its subscriber, while also handling the cleanup of associated resources to prevent memory leaks. After retrieving the objects, it deletes internal references to the source and subscriber to ensure they are no longer retained.

RETURNS DESCRIPTION
tuple[_SourceType | None, _SubscriberType | None]

A tuple of the form (source, subscriber). Both may be None if the subscription context is stateless or if its state has already been unpacked.

RAISES DESCRIPTION
PyventusException

If this method is called before or during the subscription context, indicating that the resources are not yet available for unpacking.

Source code in pyventus/core/subscriptions/subscription_context.py
def unpack(self) -> tuple[_SourceType | None, _SubscriberType | None]:
    """
    Unpack and retrieve the source object and its associated subscriber.

    This method returns a tuple containing the source object and its subscriber,
    while also handling the cleanup of associated resources to prevent memory leaks.
    After retrieving the objects, it deletes internal references to the source and
    subscriber to ensure they are no longer retained.

    :return: A tuple of the form (source, subscriber). Both may be `None` if the
        subscription context has either unpacked the state previously or is stateless.
    :raises PyventusException: If this method is called before or during the subscription
        context, indicating that the resources are not yet available for unpacking.
    """
    # Create a tuple with the source object and its subscriber
    results: tuple[_SourceType | None, _SubscriberType | None] = (self._source, self._subscriber)

    # Perform cleanup by deleting unnecessary references
    if results[0]:
        del self.__source
    if results[1]:
        del self.__subscriber

    return results
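The one-shot contract described above can be illustrated with a small stand-in. `MiniContext` below is a hypothetical class, not the Pyventus `SubscriptionContext`: it only mimics the documented behaviour in which the first call hands back the stored pair and releases the internal references, so later calls yield `(None, None)`. The guard that raises when unpacking before or during the subscription block is omitted for brevity.

```python
from __future__ import annotations


class MiniContext:
    """Hypothetical stand-in that mimics the documented unpack() contract."""

    def __init__(self, source: object, subscriber: object) -> None:
        self._source: object | None = source
        self._subscriber: object | None = subscriber

    def unpack(self) -> tuple[object | None, object | None]:
        # Capture the current references before releasing them.
        results = (self._source, self._subscriber)
        # Drop the internal references so the context no longer keeps them alive.
        self._source = None
        self._subscriber = None
        return results


ctx = MiniContext(source="observable", subscriber="subscriber")
first = ctx.unpack()   # The stored pair.
second = ctx.unpack()  # Nothing left to hand out.
```

In this sketch the caller becomes the only owner of the returned objects after the first call, which is the point of the cleanup step.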

__enter__
__enter__() -> Self

Enter the subscription context block.

This method returns the context itself so that the object to be subscribed can be defined step by step inside the block; the subscription to the specified source is performed when the block exits.

RETURNS DESCRIPTION
Self

The subscription context manager.

Source code in pyventus/core/subscriptions/subscription_context.py
def __enter__(self: Self) -> Self:
    """
    Enter the subscription context block.

    This method facilitates the progressive definition of an
    object that will later be subscribed to the specified source.

    :return: The subscription context manager.
    """
    return self

__exit__
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Exit the subscription context block.

This method subscribes the defined object to the specified source and performs any necessary cleanup.

PARAMETER DESCRIPTION
exc_type

The type of the raised exception, if any.

TYPE: type[BaseException] | None

exc_val

The raised exception object, if any.

TYPE: BaseException | None

exc_tb

The traceback information, if any.

TYPE: TracebackType | None

RETURNS DESCRIPTION
None

None.

Source code in pyventus/core/subscriptions/subscription_context.py
def __exit__(
    self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
    """
    Exit the subscription context block.

    This method subscribes the defined object to the
    specified source, and performs any necessary cleanup.

    :param exc_type: The type of the raised exception, if any.
    :param exc_val: The raised exception object, if any.
    :param exc_tb: The traceback information, if any.
    :return: None.
    """
    # Finalize the subscription and retrieve the subscriber
    subscriber: _SubscriberType = self._exit()

    # Ensure the subscriber is valid
    if subscriber is None:  # pragma: no cover
        raise PyventusException("The 'subscriber' argument cannot be None.")

    # Check if a subscriber has already been set to avoid an override
    if self.__subscriber:  # pragma: no cover
        raise PyventusException("A 'subscriber' has already been set.")

    if self.__is_stateful:
        # Retain the subscriber if the context is stateful
        self.__subscriber = subscriber
    else:
        # Remove context-specific references if stateless
        del self.__source, self.__subscriber

    # Remove the stateful context flag
    del self.__is_stateful
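The division of work between `__exit__` and the `_exit()` hook can be sketched with a simplified template-method example. Everything below (`MiniSubscriptionContext`, `NameContext`, the list used as a "source") is hypothetical and only approximates the flow in the listing above: the base class calls the hook, then either retains the result (stateful) or releases its reference to the source (stateless).

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class MiniSubscriptionContext(ABC):
    """Hypothetical base context: subclasses decide how to subscribe in _exit()."""

    def __init__(self, source: list[str], is_stateful: bool) -> None:
        self.source: list[str] | None = source
        self.subscriber: str | None = None
        self.is_stateful = is_stateful

    @abstractmethod
    def _exit(self) -> str:
        """Perform the subscription and return the resulting subscriber."""

    def __enter__(self) -> MiniSubscriptionContext:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Delegate the actual subscription to the subclass hook.
        subscriber = self._exit()
        if self.is_stateful:
            # Stateful: keep the subscriber so it can be inspected later.
            self.subscriber = subscriber
        else:
            # Stateless: release the stored reference to the source.
            self.source = None


class NameContext(MiniSubscriptionContext):
    """Hypothetical subclass that 'subscribes' a name to a list."""

    def __init__(self, source: list[str], is_stateful: bool) -> None:
        super().__init__(source, is_stateful)
        self.name: str | None = None

    def _exit(self) -> str:
        if self.source is None or self.name is None:
            raise RuntimeError("Nothing to subscribe.")
        self.source.append(self.name)
        return self.name


registry: list[str] = []

with NameContext(registry, is_stateful=True) as stateful_ctx:
    stateful_ctx.name = "alpha"

with NameContext(registry, is_stateful=False) as stateless_ctx:
    stateless_ctx.name = "beta"
```

As in the listing above, the sketch's `__exit__` does not inspect its exception arguments, so the hook runs regardless of how the block ended.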

__init__
__init__(observable: _SubCtxO, force_async: bool, is_stateful: bool) -> None

Initialize an instance of ObservableSubCtx.

PARAMETER DESCRIPTION
observable

The observable to which the observer will be subscribed.

TYPE: _SubCtxO

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool

is_stateful

A flag indicating whether the context preserves its state (stateful) or not (stateless) after exiting the subscription context. If True, the context retains its state, allowing access to stored objects, including the observable and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription context to prevent memory leaks.

TYPE: bool

Source code in pyventus/reactive/observables/observable.py
def __init__(self, observable: _SubCtxO, force_async: bool, is_stateful: bool) -> None:
    """
    Initialize an instance of `ObservableSubCtx`.

    :param observable: The observable to which the observer will be subscribed.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param is_stateful: A flag indicating whether the context preserves its state (`stateful`) or
        not (`stateless`) after exiting the subscription context. If `True`, the context retains its
        state, allowing access to stored objects, including the `observable` and the `subscriber`
        object. If `False`, the context is stateless, and the stored state is cleared upon
        exiting the subscription context to prevent memory leaks.
    """
    # Initialize the base SubscriptionContext class
    super().__init__(source=observable, is_stateful=is_stateful)

    # Initialize variables
    self.__next_callback: NextCallbackType[_SubCtxT] | None = None
    self.__error_callback: ErrorCallbackType | None = None
    self.__complete_callback: CompleteCallbackType | None = None
    self.__force_async: bool = force_async

on_next
on_next(callback: NextCallbackType[_SubCtxT]) -> NextCallbackType[_SubCtxT]

Set the observer's next callback.

PARAMETER DESCRIPTION
callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_SubCtxT]

RETURNS DESCRIPTION
NextCallbackType[_SubCtxT]

The decorated callback.

Source code in pyventus/reactive/observables/observable.py
def on_next(self, callback: NextCallbackType[_SubCtxT]) -> NextCallbackType[_SubCtxT]:
    """
    Set the observer's next callback.

    :param callback: The callback to be executed when the observable emits a new value.
    :return: The decorated callback.
    """
    self.__next_callback = callback
    return callback

on_error
on_error(callback: ErrorCallbackType) -> ErrorCallbackType

Set the observer's error callback.

PARAMETER DESCRIPTION
callback

The callback to be executed when the observable encounters an error.

TYPE: ErrorCallbackType

RETURNS DESCRIPTION
ErrorCallbackType

The decorated callback.

Source code in pyventus/reactive/observables/observable.py
def on_error(self, callback: ErrorCallbackType) -> ErrorCallbackType:
    """
    Set the observer's error callback.

    :param callback: The callback to be executed when the observable encounters an error.
    :return: The decorated callback.
    """
    self.__error_callback = callback
    return callback

on_complete
on_complete(callback: CompleteCallbackType) -> CompleteCallbackType

Set the observer's complete callback.

PARAMETER DESCRIPTION
callback

The callback that will be executed when the observable has completed emitting values.

TYPE: CompleteCallbackType

RETURNS DESCRIPTION
CompleteCallbackType

The decorated callback.

Source code in pyventus/reactive/observables/observable.py
def on_complete(self, callback: CompleteCallbackType) -> CompleteCallbackType:
    """
    Set the observer's complete callback.

    :param callback: The callback that will be executed when the observable has completed emitting values.
    :return: The decorated callback.
    """
    self.__complete_callback = callback
    return callback

__call__
__call__(callback: NextCallbackType[_SubCtxT]) -> tuple[NextCallbackType[_SubCtxT], ObservableSubCtx[_SubCtxO, _SubCtxT]] | NextCallbackType[_SubCtxT]

Subscribe the decorated callback as the observer's next function for the specified observable.

PARAMETER DESCRIPTION
callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_SubCtxT]

RETURNS DESCRIPTION
tuple[NextCallbackType[_SubCtxT], ObservableSubCtx[_SubCtxO, _SubCtxT]] | NextCallbackType[_SubCtxT]

A tuple containing the decorated callback and its subscription context if the context is stateful; otherwise, the decorated callback alone.

Source code in pyventus/reactive/observables/observable.py
def __call__(
    self, callback: NextCallbackType[_SubCtxT]
) -> (
    tuple[NextCallbackType[_SubCtxT], "Observable.ObservableSubCtx[_SubCtxO, _SubCtxT]"]
    | NextCallbackType[_SubCtxT]
):
    """
    Subscribe the decorated callback as the observer's `next` function for the specified observable.

    :param callback: The callback to be executed when the observable emits a new value.
    :return: A tuple containing the decorated callback and its subscription context
        if the context is `stateful`; otherwise, returns the decorated callback alone.
    """
    # Store the provided callback in the subscription context.
    self.__next_callback = callback

    # Set error and complete callbacks to None.
    self.__error_callback = None
    self.__complete_callback = None

    # Determine if the subscription context is stateful.
    is_stateful: bool = self._is_stateful

    # Call the exit method to finalize the
    # subscription process and clean up any necessary context.
    self.__exit__(None, None, None)

    # Return a tuple containing the decorated callback
    # and the current subscription context if the context
    # is stateful; otherwise, return just the callback.
    return (callback, self) if is_stateful else callback
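The two return shapes of the decorator form are easy to overlook, so here is a minimal sketch of them. `MiniDecoratorCtx` and `registry` are hypothetical names and the class is not the Pyventus `ObservableSubCtx`; it only reproduces the rule that a stateful context returns `(callback, context)` while a stateless one returns the callback alone.

```python
from __future__ import annotations

from typing import Callable

Handler = Callable[[int], None]


class MiniDecoratorCtx:
    """Hypothetical decorator object with a stateful/stateless return shape."""

    def __init__(self, registry: list[Handler], is_stateful: bool) -> None:
        self.registry = registry
        self.is_stateful = is_stateful

    def __call__(self, callback: Handler) -> Handler | tuple[Handler, MiniDecoratorCtx]:
        # Register the callback immediately, as a decorator would.
        self.registry.append(callback)
        # Stateful contexts also hand back themselves for later inspection.
        return (callback, self) if self.is_stateful else callback


registry: list[Handler] = []


@MiniDecoratorCtx(registry, is_stateful=False)
def stateless_handler(value: int) -> None:
    print("stateless", value)


@MiniDecoratorCtx(registry, is_stateful=True)
def stateful_handler(value: int) -> None:
    print("stateful", value)
```

Note that in the stateful case the decorated name is bound to a tuple rather than to the function, so the callable has to be taken from its first element.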

Completed

Bases: Exception

Exception raised to indicate that an observable sequence has completed.
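How `Completed` is raised and handled is not shown in this section. One plausible pattern, offered purely as an assumption, is that a producer raises the exception to signal the end of the sequence and the dispatching code translates it into a call to the complete callback. The sketch below uses a local stand-in class and hypothetical helpers (`drain`, `producer`); none of these are Pyventus APIs.

```python
from typing import Callable, Iterator


class Completed(Exception):
    """Local stand-in for a completion signal (not imported from Pyventus)."""


def drain(
    producer: Callable[[], int],
    on_next: Callable[[int], None],
    on_complete: Callable[[], None],
) -> None:
    """Hypothetical dispatcher: forward values until the producer signals completion."""
    try:
        while True:
            on_next(producer())
    except Completed:
        # The completion signal ends the loop and triggers the complete callback.
        on_complete()


values: Iterator[int] = iter([1, 2, 3])


def producer() -> int:
    try:
        return next(values)
    except StopIteration:
        # Translate exhaustion of the iterator into the completion signal.
        raise Completed from None


received: list = []
events: list = []
drain(producer, received.append, lambda: events.append("complete"))
```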

Source code in pyventus/reactive/observables/observable.py
@final
class Completed(Exception):
    """Exception raised to indicate that an observable sequence has completed."""

Functions

get_valid_subscriber staticmethod

get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]

Validate and return the specified subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber to validate.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
Subscriber[_OutT]

The validated subscriber.

RAISES DESCRIPTION
PyventusException

If the subscriber is not an instance of Subscriber.

Source code in pyventus/reactive/observables/observable.py
@staticmethod
def get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]:
    """
    Validate and return the specified subscriber.

    :param subscriber: The subscriber to validate.
    :return: The validated subscriber.
    :raises PyventusException: If the subscriber is not an instance of `Subscriber`.
    """
    # Validate that the subscriber is an instance of Subscriber.
    if not isinstance(subscriber, Subscriber):
        raise PyventusException("The 'subscriber' argument must be an instance of Subscriber.")
    return subscriber

__init__

__init__(debug: bool | None = None) -> None

Initialize an instance of Observable.

PARAMETER DESCRIPTION
debug

Specifies the debug mode for the logger. If None, the mode is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/reactive/observables/observable.py
def __init__(self, debug: bool | None = None) -> None:
    """
    Initialize an instance of `Observable`.

    :param debug: Specifies the debug mode for the logger. If `None`,
        the mode is determined based on the execution environment.
    """
    # Initialize the set of subscribers.
    self.__subscribers: set[Subscriber[_OutT]] = set()

    # Create a lock object for thread synchronization.
    self.__thread_lock: Lock = Lock()

    # Validate the debug argument.
    if debug is not None and not isinstance(debug, bool):
        raise PyventusException("The 'debug' argument must be a boolean value.")

    # Set up the logger with the appropriate debug mode.
    self.__logger: Logger = Logger(source=self, debug=debug if debug is not None else bool(gettrace() is not None))
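The fallback used when `debug` is `None` relies on `sys.gettrace()`, which returns the trace function installed through `sys.settrace()` (as debuggers typically do) or `None` when there is none. A minimal sketch of that resolution logic follows; `resolve_debug` is a hypothetical helper, and `TypeError` is substituted for `PyventusException` so the example stays self-contained.

```python
from __future__ import annotations

from sys import gettrace


def resolve_debug(debug: bool | None = None) -> bool:
    """Hypothetical helper mirroring the debug-mode fallback shown above."""
    # Reject anything that is neither None nor a real boolean.
    if debug is not None and not isinstance(debug, bool):
        raise TypeError("The 'debug' argument must be a boolean value.")
    # An explicit value wins; otherwise report whether a trace function is installed.
    return debug if debug is not None else gettrace() is not None


explicit_on = resolve_debug(True)
explicit_off = resolve_debug(False)
detected = resolve_debug()  # Depends on how the interpreter was launched.
```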

get_subscribers

get_subscribers() -> set[Subscriber[_OutT]]

Retrieve all registered subscribers.

RETURNS DESCRIPTION
set[Subscriber[_OutT]]

A set of all registered subscribers.

Source code in pyventus/reactive/observables/observable.py
def get_subscribers(self) -> set[Subscriber[_OutT]]:
    """
    Retrieve all registered subscribers.

    :return: A set of all registered subscribers.
    """
    with self.__thread_lock:
        return self.__subscribers.copy()
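Returning a copy made while holding the lock means callers receive a snapshot that they can iterate or modify without affecting the observable's internal set. The sketch below shows the same idea with a hypothetical `Registry` class; it is an illustration of the pattern, not Pyventus code.

```python
from __future__ import annotations

from threading import Lock


class Registry:
    """Hypothetical container that hands out snapshots of its contents."""

    def __init__(self) -> None:
        self._items: set[str] = set()
        self._lock = Lock()

    def add(self, item: str) -> None:
        with self._lock:
            self._items.add(item)

    def snapshot(self) -> set[str]:
        # Copy under the lock so the caller never sees a half-updated set.
        with self._lock:
            return self._items.copy()


registry = Registry()
registry.add("a")

snap = registry.snapshot()
snap.add("b")  # Only the snapshot changes; the registry is unaffected.
```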

get_subscriber_count

get_subscriber_count() -> int

Retrieve the number of registered subscribers.

RETURNS DESCRIPTION
int

The total count of subscribers in the observable.

Source code in pyventus/reactive/observables/observable.py
def get_subscriber_count(self) -> int:
    """
    Retrieve the number of registered subscribers.

    :return: The total count of subscribers in the observable.
    """
    with self.__thread_lock:
        return len(self.__subscribers)

contains_subscriber

contains_subscriber(subscriber: Subscriber[_OutT]) -> bool

Determine if the specified subscriber is present in the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be checked.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber is found; False otherwise.

Source code in pyventus/reactive/observables/observable.py
def contains_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Determine if the specified subscriber is present in the observable.

    :param subscriber: The subscriber to be checked.
    :return: `True` if the subscriber is found; `False` otherwise.
    """
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)
    with self.__thread_lock:
        return valid_subscriber in self.__subscribers

subscribe

subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

Subscribe the specified callbacks to the current Observable.

This method can be utilized in three ways:

  • As a regular function: Automatically creates and subscribes an observer with the specified callbacks.

  • As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.

  • As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which takes place when the context exits.

PARAMETER DESCRIPTION
next_callback

The callback to be executed when the observable emits a new value.

TYPE: NextCallbackType[_OutT] | None DEFAULT: None

error_callback

The callback to be executed when the observable encounters an error.

TYPE: ErrorCallbackType | None DEFAULT: None

complete_callback

The callback that will be executed when the observable has completed emitting values.

TYPE: CompleteCallbackType | None DEFAULT: None

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the observable and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]

A Subscriber if at least one callback is provided; otherwise, an ObservableSubCtx.

Source code in pyventus/reactive/observables/observable.py
def subscribe(
    self,
    next_callback: NextCallbackType[_OutT] | None = None,
    error_callback: ErrorCallbackType | None = None,
    complete_callback: CompleteCallbackType | None = None,
    *,
    force_async: bool = False,
    stateful_subctx: bool = False,
) -> Subscriber[_OutT] | "Observable.ObservableSubCtx[Self, _OutT]":
    """
    Subscribe the specified callbacks to the current `Observable`.

    This method can be utilized in three ways:

    -   **As a regular function:** Automatically creates and subscribes an observer
        with the specified callbacks.

    -   **As a decorator:** Creates and subscribes an observer, using the decorated
        callback as the next callback.

    -   **As a context manager:** Enables a step-by-step definition of the observer's
        callbacks prior to subscription, which occurs immediately after exiting the context.

    :param next_callback: The callback to be executed when the observable emits a new value.
    :param error_callback: The callback to be executed when the observable encounters an error.
    :param complete_callback: The callback that will be executed when the observable has completed emitting values.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state (`stateful`)
        or not (`stateless`) after exiting the subscription block. If `True`, the context retains its state,
        allowing access to stored objects, including the `observable` and the `subscriber` object. If `False`,
        the context is stateless, and the stored state is cleared upon exiting the subscription block to
        prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: A `Subscriber` if callbacks are provided; otherwise, an `ObservableSubCtx`.
    """
    if next_callback is None and error_callback is None and complete_callback is None:
        # If no callbacks are provided, create a subscription context for progressive definition.
        return Observable.ObservableSubCtx[Self, _OutT](
            observable=self,
            force_async=force_async,
            is_stateful=stateful_subctx,
        )
    else:
        # Create a subscriber with the provided callbacks.
        subscriber = Subscriber[_OutT](
            teardown_callback=self.remove_subscriber,
            next_callback=next_callback,
            error_callback=error_callback,
            complete_callback=complete_callback,
            force_async=force_async,
        )

        # Acquire lock to ensure thread safety.
        with self.__thread_lock:
            # Add the subscriber to the observable.
            self.__subscribers.add(subscriber)

        # Log the subscription if debug is enabled
        if self.__logger.debug_enabled:
            self.__logger.debug(action="Subscribed:", msg=f"{subscriber}")

        # Return the subscriber.
        return subscriber
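The three usage modes can be compared side by side with a deliberately small stand-in. `MiniObservable`, `MiniSubCtx`, and `emit` are hypothetical and support only a next callback; they approximate the dispatch rule in the listing above (no callbacks yields a context object, otherwise the subscription happens at once) without reproducing subscribers, locking, or error and completion handling.

```python
from __future__ import annotations

from typing import Callable

NextCallback = Callable[[int], None]


class MiniSubCtx:
    """Hypothetical context that collects a callback and subscribes it on exit."""

    def __init__(self, observable: MiniObservable) -> None:
        self._observable = observable
        self._next: NextCallback | None = None

    def on_next(self, callback: NextCallback) -> NextCallback:
        # Store the callback; the subscription itself happens in __exit__.
        self._next = callback
        return callback

    def __enter__(self) -> MiniSubCtx:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._next is None:
            raise RuntimeError("At least one callback must be defined.")
        self._observable.subscribe(self._next)

    def __call__(self, callback: NextCallback) -> NextCallback:
        # Decorator form: store the callback and finalize immediately.
        self._next = callback
        self.__exit__(None, None, None)
        return callback


class MiniObservable:
    """Hypothetical observable that keeps a plain list of next callbacks."""

    def __init__(self) -> None:
        self.callbacks: list[NextCallback] = []

    def subscribe(self, next_callback: NextCallback | None = None) -> NextCallback | MiniSubCtx:
        if next_callback is None:
            # No callback given: return a context for decorator or with-statement use.
            return MiniSubCtx(self)
        self.callbacks.append(next_callback)
        return next_callback

    def emit(self, value: int) -> None:
        for callback in list(self.callbacks):
            callback(value)


obs = MiniObservable()
log: list[str] = []

# 1. Regular function call.
obs.subscribe(lambda v: log.append(f"function:{v}"))


# 2. Decorator.
@obs.subscribe()
def decorated(v: int) -> None:
    log.append(f"decorator:{v}")


# 3. Context manager: the subscription is performed when the block exits.
with obs.subscribe() as ctx:

    @ctx.on_next
    def from_context(v: int) -> None:
        log.append(f"context:{v}")


obs.emit(1)
```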

remove_subscriber

remove_subscriber(subscriber: Subscriber[_OutT]) -> bool

Remove the specified subscriber from the observable.

PARAMETER DESCRIPTION
subscriber

The subscriber to be removed from the observable.

TYPE: Subscriber[_OutT]

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if the subscriber was not found in the observable.

Source code in pyventus/reactive/observables/observable.py
def remove_subscriber(self, subscriber: Subscriber[_OutT]) -> bool:
    """
    Remove the specified subscriber from the observable.

    :param subscriber: The subscriber to be removed from the observable.
    :return: `True` if the subscriber was successfully removed; `False` if
        the subscriber was not found in the observable.
    """
    # Get the valid subscriber instance.
    valid_subscriber: Subscriber[_OutT] = self.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety.
    with self.__thread_lock:
        # Check if the subscriber is registered; return False if not.
        if valid_subscriber not in self.__subscribers:
            return False

        # Remove the subscriber from the observable.
        self.__subscribers.remove(valid_subscriber)

    # Log the removal if the debug mode is enabled
    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

    return True
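In the `subscribe` listing, `remove_subscriber` is passed to each new subscriber as its `teardown_callback`, which suggests that a subscriber can detach itself from the observable. The sketch below illustrates that wiring under stated assumptions: `MiniSubscriber`, its `unsubscribe()` method, and `MiniObservable` are hypothetical stand-ins, not the Pyventus classes.

```python
from __future__ import annotations

from threading import Lock
from typing import Callable


class MiniSubscriber:
    """Hypothetical subscriber that knows how to detach itself."""

    def __init__(self, teardown_callback: Callable[[MiniSubscriber], bool]) -> None:
        self._teardown = teardown_callback

    def unsubscribe(self) -> bool:
        # Ask the owner to remove this subscriber; report whether it was registered.
        return self._teardown(self)


class MiniObservable:
    """Hypothetical observable exposing a thread-safe remove operation."""

    def __init__(self) -> None:
        self._subscribers: set[MiniSubscriber] = set()
        self._lock = Lock()

    def subscribe(self) -> MiniSubscriber:
        # Hand the bound remove method to the subscriber as its teardown hook.
        subscriber = MiniSubscriber(teardown_callback=self.remove_subscriber)
        with self._lock:
            self._subscribers.add(subscriber)
        return subscriber

    def remove_subscriber(self, subscriber: MiniSubscriber) -> bool:
        with self._lock:
            if subscriber not in self._subscribers:
                return False
            self._subscribers.remove(subscriber)
        return True


obs = MiniObservable()
sub = obs.subscribe()
first_removal = sub.unsubscribe()   # The subscriber was registered.
second_removal = sub.unsubscribe()  # Already removed, so nothing to do.
```

Returning `False` instead of raising on a missing subscriber keeps repeated teardown calls harmless, which matches the return contract documented above.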

remove_all

remove_all() -> bool

Remove all subscribers from the observable.

RETURNS DESCRIPTION
bool

True if the observable was successfully cleared; False if the observable was already empty.

Source code in pyventus/reactive/observables/observable.py
def remove_all(self) -> bool:
    """
    Remove all subscribers from the observable.

    :return: `True` if the observable was successfully cleared; `False`
        if the observable was already empty.
    """
    # Acquire lock to ensure thread safety
    with self.__thread_lock:
        # Check if the observable is already empty
        if not self.__subscribers:
            return False

        # Clear the observable
        self.__subscribers.clear()

    if self.__logger.debug_enabled:
        self.__logger.debug(action="Removed:", msg="All subscribers.")

    return True