ObservableValue class¶
Bases: Generic[_OutT], Observable[_OutT]
An observable subclass that encapsulates a value and offers a mechanism for streaming its updates reactively.
Notes:
-
The
ObservableValueclass is a value-centric observable that focuses solely on a single value and its changes over time. It notifies subscribers of the next value when valid, of an error when the value is deemed invalid by a validator, and of completion when the value is cleared and reset. -
Validators are responsible for validating incoming values. When a value is deemed invalid, the validator must raise an exception so that an error notification can be triggered. However, despite the value being invalid, it is stored alongside the raised exception, ensuring that both remain accessible until a new change is made.
-
Update and retrieval operations are managed through a queue, ensuring that their order is preserved and no inconsistent states or notifications are generated during execution.
-
Changes to the value are delivered to subscribers in a lazy manner, allowing them to receive incremental notifications as they occur.
Source code in pyventus/reactive/observables/observable_value.py
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 | |
Functions¶
get_valid_subscriber
staticmethod
¶
get_valid_subscriber(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]
Validate and return the specified subscriber.
| PARAMETER | DESCRIPTION |
|---|---|
subscriber
|
The subscriber to validate.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Subscriber[_OutT]
|
The validated subscriber. |
| RAISES | DESCRIPTION |
|---|---|
PyventusException
|
If the subscriber is not an instance of |
Source code in pyventus/reactive/observables/observable.py
get_subscribers
¶
get_subscribers() -> set[Subscriber[_OutT]]
Retrieve all registered subscribers.
| RETURNS | DESCRIPTION |
|---|---|
set[Subscriber[_OutT]]
|
A set of all registered subscribers. |
get_subscriber_count
¶
Retrieve the number of registered subscribers.
| RETURNS | DESCRIPTION |
|---|---|
int
|
The total count of subscribers in the observable. |
contains_subscriber
¶
contains_subscriber(subscriber: Subscriber[_OutT]) -> bool
Determine if the specified subscriber is present in the observable.
| PARAMETER | DESCRIPTION |
|---|---|
subscriber
|
The subscriber to be checked.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
|
Source code in pyventus/reactive/observables/observable.py
subscribe
¶
subscribe(*, force_async: bool = False, stateful_subctx: bool = False) -> ObservableSubCtx[Self, _OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False) -> Subscriber[_OutT]
subscribe(next_callback: NextCallbackType[_OutT] | None = None, error_callback: ErrorCallbackType | None = None, complete_callback: CompleteCallbackType | None = None, *, force_async: bool = False, stateful_subctx: bool = False) -> Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]
Subscribe the specified callbacks to the current Observable.
This method can be utilized in three ways:
-
As a regular function: Automatically creates and subscribes an observer with the specified callbacks.
-
As a decorator: Creates and subscribes an observer, using the decorated callback as the next callback.
-
As a context manager: Enables a step-by-step definition of the observer's callbacks prior to subscription, which occurs immediately after exiting the context.
| PARAMETER | DESCRIPTION |
|---|---|
next_callback
|
The callback to be executed when the observable emits a new value.
TYPE:
|
error_callback
|
The callback to be executed when the observable encounters an error.
TYPE:
|
complete_callback
|
The callback that will be executed when the observable has completed emitting values.
TYPE:
|
force_async
|
Determines whether to force all callbacks to run asynchronously.
TYPE:
|
stateful_subctx
|
A flag indicating whether the subscription context preserves its state (
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Subscriber[_OutT] | ObservableSubCtx[Self, _OutT]
|
A |
Source code in pyventus/reactive/observables/observable.py
remove_subscriber
¶
remove_subscriber(subscriber: Subscriber[_OutT]) -> bool
Remove the specified subscriber from the observable.
| PARAMETER | DESCRIPTION |
|---|---|
subscriber
|
The subscriber to be removed from the observable.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
|
Source code in pyventus/reactive/observables/observable.py
remove_all
¶
Remove all subscribers from the observable.
| RETURNS | DESCRIPTION |
|---|---|
bool
|
|
Source code in pyventus/reactive/observables/observable.py
__init__
¶
__init__(seed: _OutT, validators: list[ObservableValueValidatorType[_OutT]] | None = None, debug: bool | None = None) -> None
Initialize an instance of ObservableValue.
| PARAMETER | DESCRIPTION |
|---|---|
seed
|
The initial value for the observable. This value is used during initialization and reset operations to restore the observable to its original state. No validation is applied to this value.
TYPE:
|
validators
|
A list of validators that check incoming values. When a value is deemed invalid, the validator must raise an exception to trigger an error notification. Validators can be either synchronous or asynchronous callables.
TYPE:
|
debug
|
Specifies the debug mode for the logger. If
TYPE:
|
Source code in pyventus/reactive/observables/observable_value.py
wait_for_tasks
async
¶
Wait for all background tasks in the current asyncio loop associated with the ObservableValue to complete.
| RETURNS | DESCRIPTION |
|---|---|
None
|
None. |
Source code in pyventus/reactive/observables/observable_value.py
get_error
¶
get_error(callback: Callable[[Exception | None], None | Awaitable[None]], force_async: bool = False) -> None
Retrieve the current error, if any.
The current error will be received through the parameters of the provided callback, and its execution will be enqueued with the other operations to ensure that the received error corresponds to the correct state at the time the current method was invoked, preventing any inconsistencies.
| PARAMETER | DESCRIPTION |
|---|---|
callback
|
The callback to be invoked with the current error, if any.
TYPE:
|
force_async
|
Determines whether to force the callback to run asynchronously. If
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
None. |
| RAISES | DESCRIPTION |
|---|---|
PyventusException
|
If the provided callback is a generator. |
Source code in pyventus/reactive/observables/observable_value.py
get_value
¶
Retrieve the current value.
The current value will be received through the parameters of the provided callback, and its execution will be enqueued with the other operations to ensure that the received value corresponds to the correct state at the time the current method was invoked, preventing any inconsistencies.
| PARAMETER | DESCRIPTION |
|---|---|
callback
|
The callback to be invoked with the current value.
TYPE:
|
force_async
|
Determines whether to force the callback to run asynchronously. If
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
None. |
| RAISES | DESCRIPTION |
|---|---|
PyventusException
|
If the provided callback is a generator. |
Source code in pyventus/reactive/observables/observable_value.py
set_value
¶
Update the current value to the specified one.
The provided value is validated against the defined validators. If it is deemed valid, it is stored, and the next notification is triggered. If the value is considered invalid by any of the validators, it is stored alongside the raised exception, and an error notification is issued.
| PARAMETER | DESCRIPTION |
|---|---|
value
|
The value to set as the current value.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
None
|
None. |
Source code in pyventus/reactive/observables/observable_value.py
clear_value
¶
Clear the current value and reset it to its initial state.
This operation will trigger the completion notification and the removal of all current subscribers.
| RETURNS | DESCRIPTION |
|---|---|
None
|
None. |
Source code in pyventus/reactive/observables/observable_value.py
prime_subscribers
¶
prime_subscribers(subscriber: Subscriber[_OutT]) -> Subscriber[_OutT]
prime_subscribers(*subscribers: Subscriber[_OutT]) -> tuple[Subscriber[_OutT], ...]
prime_subscribers(*subscribers: Subscriber[_OutT]) -> Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]
Prime the specified subscribers with the current value or error.
| PARAMETER | DESCRIPTION |
|---|---|
subscribers
|
One or more subscribers to be primed.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Subscriber[_OutT] | tuple[Subscriber[_OutT], ...]
|
The same subscribers as input. |
| RAISES | DESCRIPTION |
|---|---|
PyventusException
|
If no subscribers are given. |