ObservableTask
class¶
Bases: Generic[_OutT], Observable[_OutT]
An observable subclass that encapsulates a unit of work and offers a mechanism for streaming its results reactively.
Notes:
- The ObservableTask class facilitates deferred execution of tasks, allowing subscribers to receive results incrementally as they become available.
- This class supports the encapsulation of tasks that can be either standard functions or methods, as well as generator functions.
- Results are streamed to subscribers in a lazy manner, meaning they are produced on demand rather than all at once.
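The deferred, incremental delivery described in these notes can be illustrated with a small stand-in. This is a minimal sketch using only the standard library; `MiniObservableTask`, its `subscribe` signature, and the `countdown` generator are hypothetical names chosen for illustration and do not reproduce the Pyventus implementation.

```python
import inspect
from typing import Any, Callable, Iterator, List, Tuple


class MiniObservableTask:
    """Hypothetical stand-in: wraps a callback and streams its results."""

    def __init__(self, callback: Callable[..., Any], args: Tuple[Any, ...] = ()) -> None:
        self._callback = callback
        self._args = args
        self._next_callbacks: List[Callable[[Any], None]] = []

    def subscribe(self, next_callback: Callable[[Any], None]) -> None:
        # Subscribing only registers the callback; nothing is executed yet.
        self._next_callbacks.append(next_callback)

    def __call__(self) -> None:
        # Execution is deferred until the task is explicitly called.
        result = self._callback(*self._args)
        # A generator function produces many values; a regular function produces one.
        values = result if inspect.isgenerator(result) else [result]
        for value in values:  # each value is pulled and forwarded one at a time
            for callback in self._next_callbacks:
                callback(value)


def countdown(start: int) -> Iterator[int]:
    while start > 0:
        yield start
        start -= 1


received: List[int] = []
task = MiniObservableTask(countdown, args=(3,))
task.subscribe(received.append)
before_run = list(received)  # snapshot taken before the task is executed
task()
```

In this sketch `before_run` stays empty because subscribing does not trigger the callback; values reach `received` only once `task()` is invoked, and they arrive one by one as the generator yields them.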
Source code in pyventus/reactive/observables/observable_task.py
Functions¶
get_valid_subscriber
staticmethod
¶
get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]
Validate and return the specified subscriber.
PARAMETER | DESCRIPTION |
---|---|
subscriber | The subscriber to validate. |
RETURNS | DESCRIPTION |
---|---|
Subscriber[_OutT] | The validated subscriber. |
RAISES | DESCRIPTION |
---|---|
PyventusException | If the subscriber is not an instance of |
Source code in pyventus/reactive/observables/observable.py
get_subscribers
¶
get_subscribers() -> set[Subscriber[_OutT]]
Retrieve all registered subscribers.
RETURNS | DESCRIPTION |
---|---|
set[Subscriber[_OutT]] | A set of all registered subscribers. |
get_subscriber_count
¶
Retrieve the number of registered subscribers.
RETURNS | DESCRIPTION |
---|---|
int | The total count of subscribers in the observable. |
contains_subscriber
¶
contains_subscriber(subscriber: Subscriber[_OutT]) -> bool
Determine if the specified subscriber is present in the observable.
PARAMETER | DESCRIPTION |
---|---|
subscriber | The subscriber to be checked. |
RETURNS | DESCRIPTION |
---|---|
bool | |
Source code in pyventus/reactive/observables/observable.py
subscribe
¶
subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]
Subscribe the specified callbacks to the current Observable.
This method can be utilized in three ways:
- As a regular function: Automatically creates and subscribes an observer with the specified callbacks.
- As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.
- As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.
PARAMETER | DESCRIPTION |
---|---|
next_callback | The callback to be executed when the observable emits a new value. |
error_callback | The callback to be executed when the observable encounters an error. |
complete_callback | The callback that will be executed when the observable has completed emitting values. |
force_async | Determines whether to force all callbacks to run asynchronously. |
stateful_subctx | A flag indicating whether the subscription context preserves its state ( |
RETURNS | DESCRIPTION |
---|---|
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT] | A |
Source code in pyventus/reactive/observables/observable.py
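The three usage styles of `subscribe` (regular function, decorator, context manager) can be sketched with a simplified stand-in. Everything here is an assumption made for illustration: `MiniObservable`, `MiniSubCtx`, `on_next`, and `emit` are hypothetical names, only the next callback is modelled, and the real `force_async` and `stateful_subctx` options are left out.

```python
from typing import Any, Callable, List, Optional

NextCallback = Callable[[Any], None]


class MiniObservable:
    """Hypothetical observable that only tracks next callbacks."""

    def __init__(self) -> None:
        self.callbacks: List[NextCallback] = []

    def subscribe(self, next_callback: Optional[NextCallback] = None):
        if next_callback is not None:
            # Regular function usage: subscribe immediately.
            self.callbacks.append(next_callback)
            return next_callback
        # No callback given: return a context usable as a decorator or in `with`.
        return MiniSubCtx(self)

    def emit(self, value: Any) -> None:
        for callback in list(self.callbacks):
            callback(value)


class MiniSubCtx:
    """Hypothetical subscription context."""

    def __init__(self, source: MiniObservable) -> None:
        self._source = source
        self._next: Optional[NextCallback] = None

    def on_next(self, callback: NextCallback) -> NextCallback:
        # Step-by-step definition: store the callback, do not subscribe yet.
        self._next = callback
        return callback

    def __call__(self, callback: NextCallback) -> NextCallback:
        # Decorator usage: the decorated function becomes the next callback.
        self._source.callbacks.append(callback)
        return callback

    def __enter__(self) -> "MiniSubCtx":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Context-manager usage: subscribe only after the block has ended.
        if self._next is not None:
            self._source.callbacks.append(self._next)


log: List[tuple] = []
obs = MiniObservable()

obs.subscribe(lambda value: log.append(("function", value)))


@obs.subscribe()
def decorated(value: Any) -> None:
    log.append(("decorator", value))


with obs.subscribe() as ctx:

    @ctx.on_next
    def in_context(value: Any) -> None:
        log.append(("context", value))

    count_inside = len(obs.callbacks)  # the context callback is not registered yet

obs.emit(1)
```

The point of the sketch is the timing: the function and decorator forms register their callback at once, while the context-manager form defers registration until the `with` block exits.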
remove_subscriber
¶
remove_subscriber(subscriber: Subscriber[_OutT]) -> bool
Remove the specified subscriber from the observable.
PARAMETER | DESCRIPTION |
---|---|
subscriber | The subscriber to be removed from the observable. |
RETURNS | DESCRIPTION |
---|---|
bool | |
Source code in pyventus/reactive/observables/observable.py
remove_all
¶
Remove all subscribers from the observable.
RETURNS | DESCRIPTION |
---|---|
bool | |
Source code in pyventus/reactive/observables/observable.py
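The subscriber-management methods documented so far (`get_subscribers`, `get_subscriber_count`, `contains_subscriber`, `remove_subscriber`, `remove_all`) behave like operations on a set. The following is a minimal sketch of that idea, not the library's code: `MiniRegistry` and its `add` method are hypothetical, and the meaning given to the boolean return values is an assumption, since this page does not describe them.

```python
from typing import Set


class MiniRegistry:
    """Hypothetical set-backed subscriber registry."""

    def __init__(self) -> None:
        self._subscribers: Set[object] = set()

    def add(self, subscriber: object) -> None:
        self._subscribers.add(subscriber)

    def get_subscribers(self) -> Set[object]:
        # Return a copy so callers cannot mutate the internal set.
        return self._subscribers.copy()

    def get_subscriber_count(self) -> int:
        return len(self._subscribers)

    def contains_subscriber(self, subscriber: object) -> bool:
        return subscriber in self._subscribers

    def remove_subscriber(self, subscriber: object) -> bool:
        # Assumption: True means the subscriber was present and has been removed.
        if subscriber not in self._subscribers:
            return False
        self._subscribers.remove(subscriber)
        return True

    def remove_all(self) -> bool:
        # Assumption: True means at least one subscriber was removed.
        had_any = bool(self._subscribers)
        self._subscribers.clear()
        return had_any


registry = MiniRegistry()
first, second = object(), object()
registry.add(first)
registry.add(second)

count_before = registry.get_subscriber_count()
removed_first = registry.remove_subscriber(first)
removed_again = registry.remove_subscriber(first)  # already gone
cleared = registry.remove_all()
```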
__init__
¶
__init__(callback: ObservableTaskCallbackType[_OutT], args: tuple[Any, ...] | None = None, kwargs: dict[str, Any] | None = None, debug: bool | None = None) -> None
Initialize an instance of ObservableTask.
PARAMETER | DESCRIPTION |
---|---|
callback | The callback to be encapsulated and made observable. |
args | Positional arguments to be passed to the callback. |
kwargs | Keyword arguments to be passed to the callback. |
debug | Specifies the debug mode for the logger. If |
Source code in pyventus/reactive/observables/observable_task.py
wait_for_tasks
async
¶
Wait for all background tasks associated with the ObservableTask to complete.
It ensures that any ongoing tasks are finished before proceeding.
RETURNS | DESCRIPTION |
---|---|
None | None. |
Source code in pyventus/reactive/observables/observable_task.py
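Waiting for background work before proceeding is a common asyncio pattern, and a rough sketch of it follows. It assumes the background work is tracked as `asyncio.Task` objects in a set; `MiniTaskOwner` and `submit` are hypothetical names, and the actual bookkeeping inside `ObservableTask` may differ.

```python
import asyncio
from typing import Any, Coroutine, List, Set


class MiniTaskOwner:
    """Hypothetical holder of background asyncio tasks."""

    def __init__(self) -> None:
        self._background_tasks: Set[asyncio.Task] = set()

    def submit(self, coro: Coroutine[Any, Any, None]) -> None:
        task = asyncio.get_running_loop().create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference automatically once the task has finished.
        task.add_done_callback(self._background_tasks.discard)

    async def wait_for_tasks(self) -> None:
        # Snapshot the set on each pass: done-callbacks shrink it while we await.
        while self._background_tasks:
            await asyncio.gather(*list(self._background_tasks))


async def main() -> List[str]:
    results: List[str] = []

    async def work(name: str) -> None:
        await asyncio.sleep(0.01)
        results.append(name)

    owner = MiniTaskOwner()
    owner.submit(work("a"))
    owner.submit(work("b"))
    await owner.wait_for_tasks()  # returns only after both tasks are done
    return results


finished = asyncio.run(main())
```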
to_thread
¶
to_thread(executor: ThreadPoolExecutor | None = None, shutdown: bool = False) -> Generator[Self, None, None]
Configure the execution context block for processing the ObservableTask using a thread-based executor.
This method allows the ObservableTask to be executed in a separate thread, utilizing the specified executor. Upon exiting the context, the observable task is executed within the provided executor.
PARAMETER | DESCRIPTION |
---|---|
executor | An optional |
shutdown | A flag indicating whether to shut down the specified executor upon exiting the context. If the executor is |
RETURNS | DESCRIPTION |
---|---|
Generator[Self, None, None] | The current ObservableTask instance. |
Source code in pyventus/reactive/observables/observable_task.py
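The `Generator[Self, None, None]` return type suggests a generator-based context manager that yields the task and submits it once the block ends. The sketch below shows that shape under stated assumptions: `MiniTask` and its `future` attribute are hypothetical, and the handling of a missing executor (creating a temporary one and shutting it down) is a guess made for this example, because the corresponding description is truncated on this page.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Callable, Iterator, List, Optional


class MiniTask:
    """Hypothetical task that runs its callback in a thread on context exit."""

    def __init__(self, callback: Callable[[], None]) -> None:
        self._callback = callback
        self.future: Optional[Future] = None

    @contextmanager
    def to_thread(
        self, executor: Optional[ThreadPoolExecutor] = None, shutdown: bool = False
    ) -> Iterator["MiniTask"]:
        yield self  # the body of the `with` block runs here
        # Reached only when the block exits without raising.
        # Assumption: fall back to a temporary single-worker pool.
        pool = executor if executor is not None else ThreadPoolExecutor(max_workers=1)
        self.future = pool.submit(self._callback)
        if shutdown or executor is None:
            pool.shutdown(wait=True)


events: List[str] = []


def job() -> None:
    events.append(threading.current_thread().name)


task = MiniTask(job)
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    with task.to_thread(pool):
        ran_inside = bool(events)  # the job has not been submitted yet
    task.future.result()  # block until the worker thread has finished
```

Here the job is recorded under a thread name starting with `worker`, which shows that it ran in the supplied executor rather than in the calling thread.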
__enter__
¶
Enter the execution context of the observable task.
This method facilitates interaction with the observable task object and ensures that the task is executed upon exiting the context block.
RETURNS | DESCRIPTION |
---|---|
Self | The observable task instance. |
Source code in pyventus/reactive/observables/observable_task.py
__exit__
¶
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None
Exit the execution context of the observable task.
This method triggers the execution of the observable task upon exiting the context block.
PARAMETER | DESCRIPTION |
---|---|
exc_type | The type of the raised exception, if any. |
exc_val | The raised exception object, if any. |
exc_tb | The traceback information, if any. |
RETURNS | DESCRIPTION |
---|---|
None | None. |
Source code in pyventus/reactive/observables/observable_task.py
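The `__enter__`/`__exit__` pair lets subscribers be attached inside a `with` block before the task runs. A minimal sketch of that protocol is shown below; `MiniTask` and its `subscribe` method are hypothetical, and how the real class reacts to an exception raised inside the block is not modelled here.

```python
from typing import Any, Callable, List


class MiniTask:
    """Hypothetical task that executes itself when its context block exits."""

    def __init__(self, callback: Callable[[], Any]) -> None:
        self._callback = callback
        self._listeners: List[Callable[[Any], None]] = []

    def subscribe(self, listener: Callable[[Any], None]) -> None:
        self._listeners.append(listener)

    def __call__(self) -> None:
        value = self._callback()
        for listener in self._listeners:
            listener(value)

    def __enter__(self) -> "MiniTask":
        # Hand the task to the block so subscribers can be attached.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Trigger execution on exit; returning None never suppresses exceptions.
        self()


seen: List[str] = []
with MiniTask(lambda: "done") as task:
    task.subscribe(seen.append)
    seen_inside = list(seen)  # nothing has been emitted inside the block
```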
__call__
¶
Execute the current ObservableTask.
Notes:
- When a thread-based executor is provided, the execution of the ObservableTask is submitted to that thread.
- If no executor is provided, the execution is submitted to the AsyncIO processing service of the current ObservableTask instance.
- The execution behavior within the AsyncIO processing service depends on whether an AsyncIO event loop is running. For more information, refer to the AsyncIOProcessingService.
PARAMETER | DESCRIPTION |
---|---|
executor | An optional thread-based executor instance for processing the |
RETURNS | DESCRIPTION |
---|---|
None | None. |
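The routing rules in the notes for `__call__` can be sketched as a small dispatch function. This is an illustration under assumptions rather than the library's logic: `dispatch` and `background_tasks` are hypothetical names, and the choice between `asyncio.run` (no loop running) and a background task (loop running) is this example's reading of "depends on whether an AsyncIO event loop is running"; the AsyncIOProcessingService reference is the authoritative description.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, List, Optional

# Strong references keep scheduled tasks alive until they complete.
background_tasks: set = set()


def dispatch(work: Callable[[], None], executor: Optional[Executor] = None) -> str:
    """Run `work` and return a label naming the route that was taken."""
    if executor is not None:
        # An executor was provided: hand the work to its worker threads.
        executor.submit(work)
        return "thread"

    async def runner() -> None:
        work()

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: start one and block until the work is done.
        asyncio.run(runner())
        return "asyncio.run"

    # A loop is running: schedule the work without blocking the caller.
    task = loop.create_task(runner())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "background task"


calls: List[str] = []

sync_route = dispatch(lambda: calls.append("sync"))


async def main() -> str:
    route = dispatch(lambda: calls.append("async"))
    await asyncio.gather(*background_tasks)  # let the scheduled task finish
    return route


async_route = asyncio.run(main())

with ThreadPoolExecutor(max_workers=1) as pool:
    thread_route = dispatch(lambda: calls.append("thread"), pool)
```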