ObservableStream class
Bases: Generic[_OutT], Observable[_OutT]
An observable subclass that encapsulates a flow of data and offers a mechanism for streaming its entries reactively.
Notes:
- The ObservableStream class is a data flow-centric observable that focuses exclusively on the stream of its entries over time.
- Data streaming is managed through a queue, ensuring that the order of entries is preserved and that no inconsistent notifications are generated.
- Data is streamed to subscribers in a lazy manner, allowing them to receive incremental notifications as they occur.
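The queue-based ordering described in the notes above can be illustrated with a small, self-contained sketch. `MiniStream` below is a hypothetical stand-in written only for this illustration; it is not the Pyventus implementation and makes no claim about how the library manages its queue internally. It shows why enqueueing entries before delivery keeps every subscriber's view consistent, even when a callback pushes a new entry while an earlier one is still being delivered.

```python
from collections import deque


class MiniStream:
    """Hypothetical stand-in for illustration only (not Pyventus code)."""

    def __init__(self):
        self._subscribers = []
        self._queue = deque()
        self._draining = False

    def subscribe(self, callback):
        self._subscribers.append(callback)
        return callback

    def next(self, value):
        # Always enqueue first; only the outermost call drains the queue.
        self._queue.append(value)
        if self._draining:
            return
        self._draining = True
        try:
            while self._queue:
                entry = self._queue.popleft()
                # Deliver one entry to every subscriber before moving on.
                for callback in list(self._subscribers):
                    callback(entry)
        finally:
            self._draining = False


stream = MiniStream()
received_a = []
received_b = []


def subscriber_a(value):
    received_a.append(value)
    if value == 1:
        # Re-entrant push: happens while entry 1 is still being delivered.
        stream.next(2)


def subscriber_b(value):
    received_b.append(value)


stream.subscribe(subscriber_a)
stream.subscribe(subscriber_b)
stream.next(1)

print(received_a)  # [1, 2]
print(received_b)  # [1, 2]
```

Without the queue, `subscriber_b` in this sketch would see `2` before `1`, because the nested push would be delivered in the middle of the first notification round.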
Source code in pyventus/reactive/observables/observable_stream.py
Functions
get_valid_subscriber
staticmethod
get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]
Validate and return the specified subscriber.
| PARAMETER | DESCRIPTION |
|---|---|
| `subscriber` | The subscriber to validate. TYPE: `Subscriber[_OutT]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Subscriber[_OutT]` | The validated subscriber. |
| RAISES | DESCRIPTION |
|---|---|
| `PyventusException` | If the subscriber is not an instance of |
Source code in pyventus/reactive/observables/observable.py
get_subscribers
get_subscribers() -> set[Subscriber[_OutT]]
Retrieve all registered subscribers.
| RETURNS | DESCRIPTION |
|---|---|
| `set[Subscriber[_OutT]]` | A set of all registered subscribers. |
get_subscriber_count
Retrieve the number of registered subscribers.
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The total count of subscribers in the observable. |
contains_subscriber
contains_subscriber(subscriber: Subscriber[_OutT]) -> bool
Determine if the specified subscriber is present in the observable.
| PARAMETER | DESCRIPTION |
|---|---|
| `subscriber` | The subscriber to be checked. TYPE: `Subscriber[_OutT]` |
| RETURNS | DESCRIPTION |
|---|---|
| `bool` | |
Source code in pyventus/reactive/observables/observable.py
subscribe
subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]
Subscribe the specified callbacks to the current Observable.
This method can be utilized in three ways:
- As a regular function: Automatically creates and subscribes an observer with the specified callbacks.
- As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.
- As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.
| PARAMETER | DESCRIPTION |
|---|---|
| `next_callback` | The callback to be executed when the observable emits a new value. TYPE: `NextCallbackType[_OutT] \| None` |
| `error_callback` | The callback to be executed when the observable encounters an error. TYPE: `ErrorCallbackType \| None` |
| `complete_callback` | The callback that will be executed when the observable has completed emitting values. TYPE: `CompleteCallbackType \| None` |
| `force_async` | Determines whether to force all callbacks to run asynchronously. TYPE: `bool` |
| `stateful_subctx` | A flag indicating whether the subscription context preserves its state ( TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `Subscriber[_OutT] \| ObservableSubCtx[Self, _OutT]` | A |
Source code in pyventus/reactive/observables/observable.py
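The three usage styles of `subscribe` listed above can be sketched with a self-contained toy. Everything in the block below is hypothetical: `MiniStream`, `SubCtx`, and the `on_next`, `on_error`, and `on_complete` helper names are assumptions made for this illustration and are not taken from the reference above. The sketch only mirrors the described behavior: passing callbacks subscribes at once, calling `subscribe()` with no callbacks yields an object usable as a decorator, and the same object used in a `with` block subscribes when the block exits.

```python
class SubCtx:
    """Hypothetical subscription context (illustration only, not Pyventus code)."""

    def __init__(self, stream):
        self._stream = stream
        self._callbacks = {}

    def on_next(self, callback):
        self._callbacks["next"] = callback
        return callback

    def on_error(self, callback):
        self._callbacks["error"] = callback
        return callback

    def on_complete(self, callback):
        self._callbacks["complete"] = callback
        return callback

    def __call__(self, callback):
        # Decorator form: the decorated function becomes the next callback.
        self.on_next(callback)
        self._stream._register(self._callbacks)
        return callback

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Context manager form: subscribe only once the block has finished.
        if exc_type is None:
            self._stream._register(self._callbacks)
        return False


class MiniStream:
    """Hypothetical observable stand-in (illustration only)."""

    def __init__(self):
        self._subscribers = []

    def _register(self, callbacks):
        self._subscribers.append(dict(callbacks))

    def subscribe(self, next_callback=None, error_callback=None, complete_callback=None):
        if next_callback is None and error_callback is None and complete_callback is None:
            return SubCtx(self)
        subscriber = {"next": next_callback, "error": error_callback, "complete": complete_callback}
        self._subscribers.append(subscriber)
        return subscriber

    def next(self, value):
        for subscriber in list(self._subscribers):
            callback = subscriber.get("next")
            if callback is not None:
                callback(value)


stream = MiniStream()
log = []

# 1. Regular function call.
stream.subscribe(next_callback=lambda value: log.append(("function", value)))


# 2. Decorator.
@stream.subscribe()
def decorated(value):
    log.append(("decorator", value))


# 3. Context manager: callbacks are defined step by step inside the block.
with stream.subscribe() as subctx:

    @subctx.on_next
    def from_context(value):
        log.append(("context", value))

    count_inside_block = len(stream._subscribers)  # not yet subscribed here

stream.next(42)
print(log)
```

In this sketch, `count_inside_block` is 2 because the context manager form defers the subscription until the `with` block exits.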
remove_subscriber
remove_subscriber(subscriber: Subscriber[_OutT]) -> bool
Remove the specified subscriber from the observable.
| PARAMETER | DESCRIPTION |
|---|---|
| `subscriber` | The subscriber to be removed from the observable. TYPE: `Subscriber[_OutT]` |
| RETURNS | DESCRIPTION |
|---|---|
| `bool` | |
Source code in pyventus/reactive/observables/observable.py
remove_all
Remove all subscribers from the observable.
| RETURNS | DESCRIPTION |
|---|---|
| `bool` | |
Source code in pyventus/reactive/observables/observable.py
__init__
Initialize an instance of ObservableStream.
| PARAMETER | DESCRIPTION |
|---|---|
| `debug` | Specifies the debug mode for the logger. If |
Source code in pyventus/reactive/observables/observable_stream.py
wait_for_tasks
async
Wait for all background tasks in the current asyncio loop associated with the ObservableStream to complete.
| RETURNS | DESCRIPTION |
|---|---|
| `None` | None. |
Source code in pyventus/reactive/observables/observable_stream.py
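To make the idea behind `wait_for_tasks` more concrete, the sketch below shows one common way to track background asyncio tasks and await them all. `TaskTracker` and its `spawn` method are hypothetical names invented for this illustration; how ObservableStream actually tracks its tasks is not stated in this reference, so treat this as a generic asyncio pattern under that assumption rather than the library's code.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper for illustration only (not Pyventus code)."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro):
        # Schedule the coroutine on the running loop and keep a reference to it.
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task has finished.
        task.add_done_callback(self._tasks.discard)
        return task

    async def wait_for_tasks(self):
        # Loop because a finishing task may have spawned further tasks.
        while self._tasks:
            await asyncio.gather(*list(self._tasks))


async def main():
    tracker = TaskTracker()
    delivered = []

    async def deliver(value):
        await asyncio.sleep(0)  # simulate asynchronous work
        delivered.append(value)

    for value in (1, 2, 3):
        tracker.spawn(deliver(value))

    await tracker.wait_for_tasks()
    return delivered, len(tracker._tasks)


delivered, pending = asyncio.run(main())
print(sorted(delivered), pending)
```

Awaiting a method like this before the event loop shuts down is what prevents pending notifications from being dropped in asynchronous programs.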
next
Push a value entry to the stream and notify all subscribers.
| PARAMETER | DESCRIPTION |
|---|---|
| `value` | The value entry to be pushed to the stream. |
| RETURNS | DESCRIPTION |
|---|---|
| `None` | None. |
Source code in pyventus/reactive/observables/observable_stream.py
error
Push an error entry to the stream and notify all subscribers.
| PARAMETER | DESCRIPTION |
|---|---|
| `exception` | The error entry to be pushed to the stream. |
| RETURNS | DESCRIPTION |
|---|---|
| `None` | None. |
Source code in pyventus/reactive/observables/observable_stream.py
complete
Push a completion entry to the stream and notify all subscribers.
Once the notification is sent, all notified subscribers will be removed.
| RETURNS | DESCRIPTION |
|---|---|
| `None` | None. |
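The completion behavior described for `complete` (notify, then remove the notified subscribers) can be sketched as follows. `MiniStream` here is again a hypothetical stand-in written for this illustration, with subscribers modeled as simple callback pairs; it is not the Pyventus implementation.

```python
class MiniStream:
    """Hypothetical stand-in for illustration only (not Pyventus code)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, next_callback, complete_callback):
        subscriber = (next_callback, complete_callback)
        self._subscribers.append(subscriber)
        return subscriber

    def get_subscriber_count(self):
        return len(self._subscribers)

    def next(self, value):
        for next_callback, _ in list(self._subscribers):
            next_callback(value)

    def complete(self):
        # Snapshot the subscribers that will be notified.
        notified = list(self._subscribers)
        for _, complete_callback in notified:
            complete_callback()
        # Remove exactly the subscribers that received the completion.
        for subscriber in notified:
            self._subscribers.remove(subscriber)


stream = MiniStream()
events = []

stream.subscribe(events.append, lambda: events.append("done"))

stream.next("a")
stream.complete()
stream.next("b")  # nobody is subscribed anymore, so this reaches no one

print(events)                         # ['a', 'done']
print(stream.get_subscriber_count())  # 0
```

In this sketch, a subscriber that wants to keep receiving entries after a completion would have to subscribe again.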