EventLinker class

A base class that orchestrates the linkage of events and their inherent logic.

Notes:

  • This class provides a centralized mechanism for managing the connections between events and their corresponding logic.

  • The EventLinker can be subclassed to create specific namespaces or contexts for managing events within different scopes. Subclassing also allows users to configure the settings of the EventLinker to suit their specific use cases.

  • The EventLinker is designed with thread safety in mind. All methods synchronize access to prevent race conditions when managing mutable properties across multiple threads.
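The subclass-as-namespace idea in the notes above can be sketched without Pyventus. This is a simplified illustration, not the library's implementation: `MiniLinker`, `event_name`, `subscriber_count`, `OrdersLinker`, `BillingLinker`, and `OrderPlaced` are hypothetical names, a plain dict of lists stands in for the real registry, and `ValueError` stands in for `PyventusException`. It shows only how `__init_subclass__` can give each subclass its own registry, lock, and settings.

```python
from dataclasses import dataclass, is_dataclass
from threading import Lock
from typing import Callable, Dict, List, Optional


class MiniLinker:
    """Hypothetical, simplified stand-in for a class-level event linker."""

    _registry: Dict[str, List[Callable[..., object]]] = {}
    _lock = Lock()
    _max_subscribers: Optional[int] = None

    def __init_subclass__(cls, max_subscribers: Optional[int] = None, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        if max_subscribers is not None and max_subscribers < 1:
            raise ValueError("max_subscribers must be greater than or equal to 1")
        # Every subclass gets a fresh registry and lock, so it acts as a separate namespace.
        cls._registry = {}
        cls._lock = Lock()
        cls._max_subscribers = max_subscribers

    @classmethod
    def event_name(cls, event: object) -> str:
        """Resolve a non-empty string, dataclass type, or exception type to a registry key."""
        if isinstance(event, str) and event:
            return event
        if isinstance(event, type) and (is_dataclass(event) or issubclass(event, Exception)):
            return event.__name__
        raise ValueError("Unsupported event")

    @classmethod
    def subscribe(cls, *events: object, callback: Callable[..., object]) -> Callable[..., object]:
        """Register the callback under each unique event name, enforcing the per-event limit."""
        names = {cls.event_name(event) for event in events}
        with cls._lock:  # Check the limit and insert under the same lock
            if cls._max_subscribers is not None:
                for name in names:
                    if len(cls._registry.get(name, [])) >= cls._max_subscribers:
                        raise ValueError(f"The event '{name}' has reached its subscriber limit.")
            for name in names:
                cls._registry.setdefault(name, []).append(callback)
        return callback

    @classmethod
    def subscriber_count(cls, event: object) -> int:
        """Return how many callbacks are registered for the event in this namespace."""
        name = cls.event_name(event)
        with cls._lock:
            return len(cls._registry.get(name, []))


class OrdersLinker(MiniLinker, max_subscribers=1):
    """Namespace limited to one subscriber per event."""


class BillingLinker(MiniLinker):
    """Independent namespace with no subscriber limit."""


@dataclass
class OrderPlaced:
    order_id: int


# The dataclass type and the string "OrderPlaced" resolve to the same event name.
OrdersLinker.subscribe(OrderPlaced, callback=print)
BillingLinker.subscribe("OrderPlaced", callback=print)
BillingLinker.subscribe("OrderPlaced", callback=repr)
```

With Pyventus itself, the equivalent configuration would be passed as class keyword arguments, following the `__init_subclass__` signature in the source listing, for example `class OrdersLinker(EventLinker, max_subscribers=1): ...`.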

Source code in pyventus/events/linkers/event_linker.py
class EventLinker:
    """
    A base class that orchestrates the linkage of events and their inherent logic.

    **Notes:**

    -   This class provides a centralized mechanism for managing the connections
        between events and their corresponding logic.

    -   The `EventLinker` can be subclassed to create specific namespaces or contexts
        for managing events within different scopes. Subclassing also allows users to
        configure the settings of the `EventLinker` to suit their specific use cases.

    -   The `EventLinker` is designed with *thread safety* in mind. All methods
        synchronize access to prevent race conditions when managing mutable
        properties across multiple threads.
    """

    @final
    class EventLinkerSubCtx(Generic[_SubCtxE], SubscriptionContext[_SubCtxE, EventSubscriber]):
        """
        A context manager for event linker subscriptions.

        **Notes:**

        -   This class establishes a context block for a step-by-step definition of event responses
            prior to the actual subscription, which occurs immediately upon exiting the context block.

        -   This class can be used as either a decorator or a context manager. When used as a
            decorator, it automatically subscribes the decorated callback to the specified events.
            When used as a context manager with the `with` statement, it allows multiple callbacks
            to be associated with the specified events within the context block.

        -   This subscription context can be `stateful`, retaining references to the `event linker`
            and `subscriber`, or `stateless`, which clears the context upon exiting the subscription
            block.

        -   This class is not intended to be subclassed or manually instantiated.
        """

        # Attributes for the EventLinkerSubCtx
        __slots__ = (
            "__events",
            "__event_callback",
            "__success_callback",
            "__failure_callback",
            "__force_async",
            "__once",
        )

        def __init__(
            self,
            events: tuple[SubscribableEventType, ...],
            event_linker: _SubCtxE,
            force_async: bool,
            once: bool,
            is_stateful: bool,
        ) -> None:
            """
            Initialize an instance of `EventLinkerSubCtx`.

            :param events: The events to subscribe/link to.
            :param event_linker: The type of event linker used for the actual subscription.
            :param force_async: Determines whether to force all callbacks to run asynchronously.
            :param once: Specifies if the event subscriber will be a one-time subscription.
            :param is_stateful: A flag indicating whether the context preserves its state (stateful) or
                not (stateless) after exiting the subscription context. If `True`, the context retains its
                state, allowing access to stored objects, including the `event linker` and the `subscriber`
                object. If `False`, the context is stateless, and the stored state is cleared upon exiting
                the subscription context to prevent memory leaks.
            """
            # Initialize the base SubscriptionContext class
            super().__init__(source=event_linker, is_stateful=is_stateful)

            # Initialize variables
            self.__events: tuple[SubscribableEventType, ...] = events
            self.__event_callback: EventCallbackType | None = None
            self.__success_callback: SuccessCallbackType | None = None
            self.__failure_callback: FailureCallbackType | None = None
            self.__force_async: bool = force_async
            self.__once: bool = once

        @override
        def _exit(self) -> EventSubscriber:
            # Ensure that the source is not None
            if self._source is None:  # pragma: no cover
                raise PyventusException("The subscription context is closed.")

            # Check if the event callback has been set
            if self.__event_callback is None:
                raise PyventusException("The event callback has not been set.")

            # Subscribe the defined callbacks to the
            # specified events and store the returned subscriber.
            subscriber: EventSubscriber = self._source.subscribe(
                *self.__events,
                event_callback=self.__event_callback,
                success_callback=self.__success_callback,
                failure_callback=self.__failure_callback,
                force_async=self.__force_async,
                once=self.__once,
            )

            # Remove context-specific attributes to clean up
            # and prevent memory leaks.
            del self.__events
            del self.__event_callback, self.__success_callback, self.__failure_callback
            del self.__force_async, self.__once

            # Return the subscriber
            return subscriber

        def on_event(self, callback: EventCallbackType) -> EventCallbackType:
            """
            Set the main callback for the event.

            :param callback: The callback to be executed when the event occurs.
            :return: The decorated callback.
            """
            self.__event_callback = callback
            return callback

        def on_success(self, callback: SuccessCallbackType) -> SuccessCallbackType:
            """
            Set the success callback for the event.

            :param callback: The callback to be executed when the event response completes successfully.
            :return: The decorated callback.
            """
            self.__success_callback = callback
            return callback

        def on_failure(self, callback: FailureCallbackType) -> FailureCallbackType:
            """
            Set the failure callback for the event.

            :param callback: The callback to be executed when the event response fails.
            :return: The decorated callback.
            """
            self.__failure_callback = callback
            return callback

        def __call__(
            self, callback: EventCallbackType
        ) -> tuple[EventCallbackType, "EventLinker.EventLinkerSubCtx[_SubCtxE]"] | EventCallbackType:
            """
            Subscribe the decorated callback to the specified events.

            :param callback: The callback to be executed when the event occurs.
            :return: A tuple containing the decorated callback and its subscription context
                if the context is stateful; otherwise, returns the decorated callback alone.
            """
            # Store the provided callback as the event callback
            self.__event_callback = callback

            # Set success and failure callbacks to None
            self.__success_callback = None
            self.__failure_callback = None

            # Determine if the subscription context is stateful
            is_stateful: bool = self._is_stateful

            # Call the exit method to finalize the
            # subscription process and clean up any necessary context.
            self.__exit__(None, None, None)

            # Return a tuple containing the decorated callback
            # and the current subscription context if the context
            # is stateful; otherwise, return just the callback.
            return (callback, self) if is_stateful else callback

    __registry: MultiBidict[str, EventSubscriber] = MultiBidict[str, EventSubscriber]()
    """
    A registry that serves as a container for storing events and their associated subscribers. It 
    utilizes an optimized data structure that enables quick lookups, updates, and even deletions 
    of events and their subscribers.
    """

    __max_subscribers: int | None = None
    """The maximum number of subscribers allowed per event, or `None` if there is no limit."""

    __default_success_callback: SuccessCallbackType | None = None
    """ 
    Represents the default success callback that will be assigned to subscribers in the 
    absence of a specific success callback. This callback will be executed upon successful 
    completion of the event response in each subscriber.
    """

    __default_failure_callback: FailureCallbackType | None = None
    """
    Represents the default failure callback that will be assigned to subscribers in the
    absence of a specific failure callback. This callback will be executed when the event 
    response fails in each subscriber.
    """

    __thread_lock: Lock = Lock()
    """
    A `threading.Lock` object used for thread synchronization when accessing and modifying 
    mutable attributes to ensure thread safety. It prevents multiple threads from accessing 
    and modifying mutable properties simultaneously.
    """

    __logger: Logger = Logger(source="EventLinker(ClassReference)", debug=bool(gettrace() is not None))
    """
    The logger used to debug and log information within the `EventLinker` class. The debug mode
    of the logger depends on the execution environment and the value returned by the `gettrace()`
    function. The debug mode can also be influenced by subclassing and overridden in subclasses.
    """

    def __init_subclass__(
        cls,
        max_subscribers: int | None = None,
        default_success_callback: SuccessCallbackType | None = None,
        default_failure_callback: FailureCallbackType | None = None,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize a subclass of `EventLinker`.

        By default, this method sets up the main registry and the thread lock object, but
        it can also be used to configure specific settings of the `EventLinker` subclass.

        :param max_subscribers: The maximum number of subscribers allowed per event, or `None` if there is no limit.
        :param default_success_callback: The default callback to assign as the success callback in the subscribers
            when no specific success callback is provided.
        :param default_failure_callback: The default callback to assign as the failure callback in the subscribers
            when no specific failure callback is provided.
        :param debug: Specifies the debug mode for the subclass logger. If `None`, it is determined based
            on the execution environment.
        :raises PyventusException: If `max_subscribers` is less than 1 or if the provided
            callbacks are invalid.
        :return: None.
        """
        # Initialize the main registry
        cls.__registry = MultiBidict[str, EventSubscriber]()

        # Create a lock object for thread synchronization
        cls.__thread_lock = Lock()

        # Validate the max_subscribers argument
        if max_subscribers is not None and max_subscribers < 1:
            raise PyventusException("The 'max_subscribers' argument must be greater than or equal to 1.")

        # Set the maximum number of subscribers per event
        cls.__max_subscribers = max_subscribers

        # Validate the default success callback, if any
        if default_success_callback is not None:
            validate_callable(default_success_callback)

        # Set the default success callback
        cls.__default_success_callback = default_success_callback

        # Validate the default failure callback, if any
        if default_failure_callback is not None:
            validate_callable(default_failure_callback)

        # Set the default failure callback
        cls.__default_failure_callback = default_failure_callback

        # Validate the debug argument
        if debug is not None and not isinstance(debug, bool):
            raise PyventusException("The 'debug' argument must be a boolean value.")

        # Set up the logger
        cls.__logger = Logger(source=cls, debug=debug if debug is not None else bool(gettrace() is not None))

    @classmethod
    def _get_logger(cls) -> Logger:
        """
        Retrieve the class-level logger instance.

        :return: The class-level logger instance used to debug and log
            information within the `EventLinker` class.
        """
        return cls.__logger

    @classmethod
    def _get_valid_and_unique_event_names(cls, events: tuple[SubscribableEventType, ...]) -> set[str]:
        """
        Validate and extract unique event names from the specified tuple of event objects.

        :param events: A tuple of event objects to validate and extract unique names from.
        :return: A set of unique and valid event names derived from the provided tuple of event objects.
        :raises PyventusException: If the 'events' argument is None, empty, or contains invalid events.
        """
        if not events:
            raise PyventusException("The 'events' argument cannot be None or empty.")
        return {cls.get_valid_event_name(event) for event in events}

    @classmethod
    def _get_valid_and_unique_subscribers(cls, subscribers: tuple[EventSubscriber, ...]) -> set[EventSubscriber]:
        """
        Validate and extract unique subscribers from the specified tuple of subscribers.

        :param subscribers: A tuple of event subscribers to validate and extract unique entries from.
        :return: A set of unique and valid subscribers derived from the provided tuple of subscribers.
        :raises PyventusException: If the 'subscribers' argument is None, empty, or contains invalid subscribers.
        """
        if not subscribers:
            raise PyventusException("The 'subscribers' argument cannot be None or empty.")
        return {cls.get_valid_subscriber(subscriber) for subscriber in subscribers}

    @classmethod
    def get_valid_event_name(cls, event: SubscribableEventType) -> str:
        """
        Determine the name of the event and perform validation.

        :param event: The event to obtain the name for.
        :return: A string that represents the event name.
        :raises PyventusException: If the `event` argument is invalid
            or if the event is not supported.
        """
        # Validate the event argument
        if event is None:
            raise PyventusException("The 'event' argument cannot be None.")

        if event is Ellipsis:
            # If the event is Ellipsis, return its type name
            return type(event).__name__
        elif isinstance(event, str):
            if not event:
                raise PyventusException("String events cannot be empty.")
            # If the event is a non-empty string, return it as the event name
            return event
        elif isinstance(event, type):
            if not is_dataclass(event) and not issubclass(event, Exception):
                raise PyventusException("Type events must be either a dataclass or an exception.")
            # If the event is either a dataclass type or an exception type, return its type name
            return event.__name__
        else:
            # If the event is not supported, raise an exception
            raise PyventusException("Unsupported event")

    @classmethod
    def get_valid_subscriber(cls, subscriber: EventSubscriber) -> EventSubscriber:
        """
        Validate and return the specified subscriber.

        :param subscriber: The subscriber to validate.
        :return: The validated subscriber.
        :raises PyventusException: If the subscriber is not an instance of `EventSubscriber`.
        """
        # Validate that the subscriber is an instance of EventSubscriber
        if not isinstance(subscriber, EventSubscriber):
            raise PyventusException("The 'subscriber' argument must be an instance of EventSubscriber.")
        return subscriber

    @classmethod
    def get_max_subscribers(cls) -> int | None:
        """
        Retrieve the maximum number of subscribers allowed per event.

        :return: The maximum number of subscribers or `None` if there is no limit.
        """
        return cls.__max_subscribers

    @classmethod
    def get_default_success_callback(cls) -> SuccessCallbackType | None:
        """
        Retrieve the default success callback to assign to subscribers when no specific callback is provided.

        :return: The default success callback or `None` if not set.
        """
        return cls.__default_success_callback

    @classmethod
    def get_default_failure_callback(cls) -> FailureCallbackType | None:
        """
        Retrieve the default failure callback to assign to subscribers when no specific callback is provided.

        :return: The default failure callback or `None` if not set.
        """
        return cls.__default_failure_callback

    @classmethod
    def is_empty(cls) -> bool:
        """
        Determine whether the main registry is empty.

        :return: `True` if the main registry is empty, `False` otherwise.
        """
        with cls.__thread_lock:
            return cls.__registry.is_empty

    @classmethod
    def get_registry(cls) -> dict[str, set[EventSubscriber]]:
        """
        Retrieve a shallow copy of the main registry.

        :return: A shallow copy of the main registry, where each
            event is mapped to a set of its linked subscribers.
        """
        with cls.__thread_lock:
            return cls.__registry.to_dict()

    @classmethod
    def get_events(cls) -> set[str]:
        """
        Retrieve all registered events.

        :return: A set of all registered event names.
        """
        with cls.__thread_lock:
            return cls.__registry.keys

    @classmethod
    def get_subscribers(cls) -> set[EventSubscriber]:
        """
        Retrieve all registered subscribers.

        :return: A set of all registered subscribers.
        """
        with cls.__thread_lock:
            return cls.__registry.values

    @classmethod
    def get_event_count(cls) -> int:
        """
        Retrieve the number of registered events.

        :return: The total count of events in the registry.
        """
        with cls.__thread_lock:
            return cls.__registry.key_count

    @classmethod
    def get_subscriber_count(cls) -> int:
        """
        Retrieve the number of registered subscribers.

        :return: The total count of subscribers in the registry.
        """
        with cls.__thread_lock:
            return cls.__registry.value_count

    @classmethod
    def get_events_from_subscribers(cls, *subscribers: EventSubscriber) -> set[str]:
        """
        Retrieve a set of events associated with the specified subscribers.

        :param subscribers: One or more subscribers for which to retrieve associated events.
        :return: A set of events linked to the specified subscribers. Unregistered subscribers are ignored.
        """
        # Validate and retrieve all unique subscribers to avoid duplicate processing
        unique_subscribers: set[EventSubscriber] = cls._get_valid_and_unique_subscribers(subscribers)

        # Return the set of event names associated with the unique subscribers
        with cls.__thread_lock:
            return cls.__registry.get_keys_from_values(unique_subscribers)

    @classmethod
    def get_subscribers_from_events(
        cls, *events: SubscribableEventType, pop_onetime_subscribers: bool = False
    ) -> set[EventSubscriber]:
        """
        Retrieve a set of subscribers associated with the specified events.

        :param events: One or more events for which to retrieve associated subscribers.
        :param pop_onetime_subscribers: If `True`, removes one-time subscribers (those
            with the property `once` set to True) from the registry.
        :return: A set of subscribers linked to the specified events. Unregistered events are ignored.
        """
        # Validate and retrieve all unique event names to avoid duplicate processing
        unique_events: set[str] = cls._get_valid_and_unique_event_names(events)

        # Acquire lock to ensure thread safety
        with cls.__thread_lock:
            # Retrieve subscribers associated with the unique events
            subscribers: set[EventSubscriber] = cls.__registry.get_values_from_keys(unique_events)

            # Return the subscribers as-is if pop_onetime_subscribers is False
            if not pop_onetime_subscribers:
                return subscribers

            # Remove one-time subscribers from the registry
            for subscriber in subscribers:
                if subscriber.once:
                    cls.__registry.remove_value(subscriber)

        # Return the set of subscribers
        return subscribers

    @classmethod
    def get_event_count_from_subscriber(cls, subscriber: EventSubscriber) -> int:
        """
        Retrieve the number of events associated with the given subscriber.

        :param subscriber: The subscriber for which to count the associated events.
        :return: The count of events associated with the specified subscriber,
            or 0 if the subscriber is not found.
        """
        valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
        with cls.__thread_lock:
            return cls.__registry.get_key_count_from_value(valid_subscriber)

    @classmethod
    def get_subscriber_count_from_event(cls, event: SubscribableEventType) -> int:
        """
        Retrieve the number of subscribers associated with a given event.

        :param event: The event for which to count the associated subscribers.
        :return: The count of subscribers associated with the specified event,
            or 0 if the event is not found.
        """
        valid_event: str = cls.get_valid_event_name(event)
        with cls.__thread_lock:
            return cls.__registry.get_value_count_from_key(valid_event)

    @classmethod
    def contains_event(cls, event: SubscribableEventType) -> bool:
        """
        Determine if the specified event is present in the registry.

        :param event: The event to be checked.
        :return: `True` if the event is found; `False` otherwise.
        """
        valid_event: str = cls.get_valid_event_name(event)
        with cls.__thread_lock:
            return cls.__registry.contains_key(valid_event)

    @classmethod
    def contains_subscriber(cls, subscriber: EventSubscriber) -> bool:
        """
        Determine if the specified subscriber is present in the registry.

        :param subscriber: The subscriber to be checked.
        :return: `True` if the subscriber is found; `False` otherwise.
        """
        valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
        with cls.__thread_lock:
            return cls.__registry.contains_value(valid_subscriber)

    @classmethod
    def are_linked(cls, event: SubscribableEventType, subscriber: EventSubscriber) -> bool:
        """
        Determine whether the given event is linked with the specified subscriber.

        :param event: The event for which the association is being checked.
        :param subscriber: The subscriber for which the association is being checked.
        :return: `True` if the subscriber is linked to the event; `False` otherwise.
        """
        valid_event: str = cls.get_valid_event_name(event)
        valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
        with cls.__thread_lock:
            return cls.__registry.are_associated(valid_event, valid_subscriber)

    @classmethod
    def once(
        cls, *events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False
    ) -> EventLinkerSubCtx[type[Self]]:
        """
        Subscribe callbacks to the specified events for a single invocation.

        This method can be used as either a decorator or a context manager. When used as a
        decorator, it automatically subscribes the decorated callback to the provided events.
        When used as a context manager with the `with` statement, it allows multiple callbacks
        to be associated with the provided events within the context block.

        :param events: The events to subscribe to.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
            If `True`, synchronous callbacks will be converted to run asynchronously in a
            thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
            will run synchronously or asynchronously as defined.
        :param stateful_subctx: A flag indicating whether the subscription context preserves its state
            (`stateful`) or not (`stateless`) after exiting the subscription block. If `True`, the context retains
            its state, allowing access to stored objects, including the `event linker` and the `subscriber` object.
            If `False`, the context is stateless, and the stored state is cleared upon exiting the subscription
            block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
        :return: An `EventLinkerSubCtx` instance.
        """
        return EventLinker.EventLinkerSubCtx[type[Self]](
            events=events, event_linker=cls, force_async=force_async, once=True, is_stateful=stateful_subctx
        )

    @classmethod
    def on(
        cls, *events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False
    ) -> EventLinkerSubCtx[type[Self]]:
        """
        Subscribe callbacks to the specified events.

        This method can be used as either a decorator or a context manager. When used as a
        decorator, it automatically subscribes the decorated callback to the provided events.
        When used as a context manager with the `with` statement, it allows multiple callbacks
        to be associated with the provided events within the context block.

        :param events: The events to subscribe to.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
            If `True`, synchronous callbacks will be converted to run asynchronously in a
            thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
            will run synchronously or asynchronously as defined.
        :param stateful_subctx: A flag indicating whether the subscription context preserves its state
            (`stateful`) or not (`stateless`) after exiting the subscription block. If `True`, the context retains
            its state, allowing access to stored objects, including the `event linker` and the `subscriber` object.
            If `False`, the context is stateless, and the stored state is cleared upon exiting the subscription
            block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
        :return: An `EventLinkerSubCtx` instance.
        """
        return EventLinker.EventLinkerSubCtx[type[Self]](
            events=events, event_linker=cls, force_async=force_async, once=False, is_stateful=stateful_subctx
        )

    @classmethod
    def subscribe(
        cls,
        *events: SubscribableEventType,
        event_callback: EventCallbackType,
        success_callback: SuccessCallbackType | None = None,
        failure_callback: FailureCallbackType | None = None,
        force_async: bool = False,
        once: bool = False,
    ) -> EventSubscriber:
        """
        Subscribe the specified callbacks to the given events.

        :param events: The events to subscribe to.
        :param event_callback: The callback to be executed when the event occurs.
        :param success_callback: The callback to be executed when the event response completes successfully.
        :param failure_callback: The callback to be executed when the event response fails.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
            If `True`, synchronous callbacks will be converted to run asynchronously in a
            thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
            will run synchronously or asynchronously as defined.
        :param once: Specifies if the subscriber will be a one-time subscriber.
        :return: The subscriber associated with the given events.
        """
        # Validate and retrieve all unique event names
        unique_events: set[str] = cls._get_valid_and_unique_event_names(events)

        # Acquire the lock to ensure exclusive access to the main registry
        with cls.__thread_lock:
            # Check if the maximum number of subscribers is set
            if cls.__max_subscribers is not None:
                # For each event name, check if the maximum number
                # of subscribers for the event has been exceeded
                for event in unique_events:
                    if cls.__registry.get_value_count_from_key(event) >= cls.__max_subscribers:
                        raise PyventusException(
                            f"The event '{event}' has exceeded the maximum number of subscribers allowed."
                        )

            # Create a new event subscriber
            subscriber: EventSubscriber = EventSubscriber(
                teardown_callback=cls.remove_subscriber,
                event_callback=event_callback,
                success_callback=success_callback if success_callback else cls.__default_success_callback,
                failure_callback=failure_callback if failure_callback else cls.__default_failure_callback,
                force_async=force_async,
                once=once,
            )

            # Register the subscriber for each unique event
            for event in unique_events:
                cls.__registry.insert(event, subscriber)

        # Log the subscription if debug is enabled
        if cls.__logger.debug_enabled:
            cls.__logger.debug(
                action="Subscribed:", msg=f"{subscriber} {StdOutColors.PURPLE_TEXT('Events:')} {unique_events}"
            )

        # Return the new event subscriber
        return subscriber

    @classmethod
    def remove(cls, event: SubscribableEventType, subscriber: EventSubscriber) -> bool:
        """
        Remove the specified subscriber from the given event.

        :param event: The event from which the subscriber will be removed.
        :param subscriber: The subscriber to be removed from the event.
        :return: `True` if the subscriber was successfully removed; `False` if
            no removal occurred due to the event or subscriber not being registered,
            or if they are not linked.
        """
        # Validate the given event and subscriber
        valid_event: str = cls.get_valid_event_name(event)
        valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)

        # Acquire lock to ensure thread safety
        with cls.__thread_lock:
            # Check if the event and subscriber are registered and linked
            if not cls.__registry.are_associated(valid_event, valid_subscriber):
                return False

            # Remove the subscriber from the event
            cls.__registry.remove(valid_event, valid_subscriber)

        # Log the removal if the debug mode is enabled
        if cls.__logger.debug_enabled:
            cls.__logger.debug(
                action="Removed:", msg=f"{valid_subscriber} {StdOutColors.PURPLE_TEXT('Event:')} '{valid_event}'"
            )

        return True

    @classmethod
    def remove_event(cls, event: SubscribableEventType) -> bool:
        """
        Remove the specified event from the registry.

        :param event: The event to be removed from the registry.
        :return: `True` if the event was successfully removed; `False`
            if the event was not found in the registry.
        """
        # Get the valid event name
        valid_event: str = cls.get_valid_event_name(event)

        # Acquire lock to ensure thread safety
        with cls.__thread_lock:
            # Check if the event is registered; return False if not
            if not cls.__registry.contains_key(valid_event):
                return False

            # Remove the event from the registry
            cls.__registry.remove_key(valid_event)

        # Log the removal if the debug mode is enabled
        if cls.__logger.debug_enabled:
            cls.__logger.debug(action="Removed:", msg=f"Event: '{valid_event}'")

        return True

    @classmethod
    def remove_subscriber(cls, subscriber: EventSubscriber) -> bool:
        """
        Remove the specified subscriber from the registry.

        :param subscriber: The subscriber to be removed from the registry.
        :return: `True` if the subscriber was successfully removed; `False` if
            the subscriber was not found in the registry.
        """
        # Get the valid subscriber instance
        valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)

        # Acquire lock to ensure thread safety
        with cls.__thread_lock:
            # Check if the subscriber is registered; return False if not
            if not cls.__registry.contains_value(valid_subscriber):
                return False

            # Remove the subscriber from the registry
            cls.__registry.remove_value(valid_subscriber)

        # Log the removal if the debug mode is enabled
        if cls.__logger.debug_enabled:
            cls.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

        return True

    @classmethod
    def remove_all(cls) -> bool:
        """
        Remove all events and subscribers from the registry.

        :return: `True` if the registry was successfully cleared; `False`
            if the registry was already empty.
        """
        # Acquire lock to ensure thread safety
        with cls.__thread_lock:
            # Check if the registry is already empty
            if cls.__registry.is_empty:
                return False

            # Clear the registry
            cls.__registry.clear()

        if cls.__logger.debug_enabled:
            cls.__logger.debug(action="Removed:", msg="All events and subscribers.")

        return True
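
The return-value contract of `subscribe`, `remove`, and `remove_all` in the listing above can be sketched without the library. This is a minimal stand-alone illustration, not the pyventus implementation: `MiniLinker`, its plain `dict` registry, and the `RuntimeError` are hypothetical stand-ins for `EventLinker`, `MultiBidict`, and `PyventusException`.

```python
from __future__ import annotations

from threading import Lock


class MiniLinker:
    """Hypothetical stand-in for an event linker; not the pyventus class."""

    _lock = Lock()
    _registry: dict[str, set[object]] = {}
    _max_subscribers: int | None = 2

    @classmethod
    def subscribe(cls, event: str, subscriber: object) -> object:
        with cls._lock:
            current = cls._registry.get(event, set())
            # Same comparison as in the listing: the subscription is rejected
            # once the count already equals (or exceeds) the configured maximum.
            if cls._max_subscribers is not None and len(current) >= cls._max_subscribers:
                raise RuntimeError(f"The event '{event}' has reached its subscriber limit.")
            cls._registry.setdefault(event, set()).add(subscriber)
        return subscriber

    @classmethod
    def remove(cls, event: str, subscriber: object) -> bool:
        with cls._lock:
            if subscriber not in cls._registry.get(event, set()):
                return False  # Event or subscriber unknown, or the two are not linked.
            cls._registry[event].discard(subscriber)
            if not cls._registry[event]:
                del cls._registry[event]  # Assumption: events without subscribers are dropped.
        return True

    @classmethod
    def remove_all(cls) -> bool:
        with cls._lock:
            if not cls._registry:
                return False  # The registry was already empty.
            cls._registry.clear()
        return True


first, second = object(), object()
MiniLinker.subscribe("OrderPlaced", first)
MiniLinker.subscribe("OrderPlaced", second)

try:
    MiniLinker.subscribe("OrderPlaced", object())  # Third subscriber, limit is 2.
    limit_hit = False
except RuntimeError:
    limit_hit = True

removed = MiniLinker.remove("OrderPlaced", first)        # Linked: removed.
removed_again = MiniLinker.remove("OrderPlaced", first)  # No longer linked.
cleared = MiniLinker.remove_all()
cleared_again = MiniLinker.remove_all()
```

Each removal method reports whether it changed anything, so callers can tell a no-op apart from an actual removal without inspecting the registry.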

Classes

EventLinkerSubCtx

Bases: Generic[_SubCtxE], SubscriptionContext[_SubCtxE, EventSubscriber]

A context manager for event linker subscriptions.

Notes:

  • This class establishes a context block for a step-by-step definition of event responses prior to the actual subscription, which occurs immediately upon exiting the context block.

  • This class can be used as either a decorator or a context manager. When used as a decorator, it automatically subscribes the decorated callback to the specified events. When used as a context manager with the with statement, it allows multiple callbacks to be associated with the specified events within the context block.

  • This subscription context can be stateful, retaining references to the event linker and subscriber, or stateless, which clears the context upon exiting the subscription block.

  • This class is not intended to be subclassed or manually instantiated.
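
The two usage styles described in these notes can be illustrated with a small stand-alone sketch. It does not import pyventus: `MiniSubCtx` and the `subscriptions` list are hypothetical stand-ins for the subscription context and the linker's registry, and `RuntimeError` replaces `PyventusException`.

```python
from __future__ import annotations

from typing import Any, Callable


class MiniSubCtx:
    """Hypothetical stand-in for a subscription context; not the pyventus class."""

    def __init__(self, sink: list[dict[str, Any]]) -> None:
        self._sink = sink  # Plays the role of the event linker's registry.
        self._event_cb: Callable[..., Any] | None = None
        self._success_cb: Callable[..., Any] | None = None
        self._failure_cb: Callable[..., Any] | None = None

    def on_event(self, callback):
        self._event_cb = callback
        return callback

    def on_success(self, callback):
        self._success_cb = callback
        return callback

    def on_failure(self, callback):
        self._failure_cb = callback
        return callback

    def __enter__(self) -> MiniSubCtx:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # The subscription happens here, once the block has been left.
        if self._event_cb is None:
            raise RuntimeError("The event callback has not been set.")
        self._sink.append(
            {"event": self._event_cb, "success": self._success_cb, "failure": self._failure_cb}
        )

    def __call__(self, callback):
        # Decorator style: only the event callback is set, then the context exits.
        self._event_cb = callback
        self._success_cb = None
        self._failure_cb = None
        self.__exit__(None, None, None)
        return callback


subscriptions: list[dict[str, Any]] = []

# Context-manager style: several callbacks are defined step by step.
with MiniSubCtx(subscriptions) as ctx:

    @ctx.on_event
    def handle() -> str:
        return "done"

    @ctx.on_success
    def succeeded(result: str) -> None:
        print(result)


# Decorator style: a single callback is subscribed immediately.
@MiniSubCtx(subscriptions)
def quick() -> None:
    pass
```

In the sketch, nothing is appended to `subscriptions` until the `with` block exits, which mirrors the note that the actual subscription occurs upon exiting the context block.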

Source code in pyventus/events/linkers/event_linker.py
@final
class EventLinkerSubCtx(Generic[_SubCtxE], SubscriptionContext[_SubCtxE, EventSubscriber]):
    """
    A context manager for event linker subscriptions.

    **Notes:**

    -   This class establishes a context block for a step-by-step definition of event responses
        prior to the actual subscription, which occurs immediately upon exiting the context block.

    -   This class can be used as either a decorator or a context manager. When used as a
        decorator, it automatically subscribes the decorated callback to the specified events.
        When used as a context manager with the `with` statement, it allows multiple callbacks
        to be associated with the specified events within the context block.

    -   This subscription context can be `stateful`, retaining references to the `event linker`
        and `subscriber`, or `stateless`, which clears the context upon exiting the subscription
        block.

    -   This class is not intended to be subclassed or manually instantiated.
    """

    # Attributes for the EventLinkerSubCtx
    __slots__ = (
        "__events",
        "__event_callback",
        "__success_callback",
        "__failure_callback",
        "__force_async",
        "__once",
    )

    def __init__(
        self,
        events: tuple[SubscribableEventType, ...],
        event_linker: _SubCtxE,
        force_async: bool,
        once: bool,
        is_stateful: bool,
    ) -> None:
        """
        Initialize an instance of `EventLinkerSubCtx`.

        :param events: The events to subscribe/link to.
        :param event_linker: The type of event linker used for the actual subscription.
        :param force_async: Determines whether to force all callbacks to run asynchronously.
        :param once: Specifies if the event subscriber will be a one-time subscription.
        :param is_stateful: A flag indicating whether the context preserves its state (stateful) or
            not (stateless) after exiting the subscription context. If `True`, the context retains its
            state, allowing access to stored objects, including the `event linker` and the `subscriber`
            object. If `False`, the context is stateless, and the stored state is cleared upon exiting
            the subscription context to prevent memory leaks.
        """
        # Initialize the base SubscriptionContext class
        super().__init__(source=event_linker, is_stateful=is_stateful)

        # Initialize variables
        self.__events: tuple[SubscribableEventType, ...] = events
        self.__event_callback: EventCallbackType | None = None
        self.__success_callback: SuccessCallbackType | None = None
        self.__failure_callback: FailureCallbackType | None = None
        self.__force_async: bool = force_async
        self.__once: bool = once

    @override
    def _exit(self) -> EventSubscriber:
        # Ensure that the source is not None
        if self._source is None:  # pragma: no cover
            raise PyventusException("The subscription context is closed.")

        # Check if the event callback has been set
        if self.__event_callback is None:
            raise PyventusException("The event callback has not been set.")

        # Subscribe the defined callbacks to the
        # specified events and store the returned subscriber.
        subscriber: EventSubscriber = self._source.subscribe(
            *self.__events,
            event_callback=self.__event_callback,
            success_callback=self.__success_callback,
            failure_callback=self.__failure_callback,
            force_async=self.__force_async,
            once=self.__once,
        )

        # Remove context-specific attributes to clean up
        # and prevent memory leaks.
        del self.__events
        del self.__event_callback, self.__success_callback, self.__failure_callback
        del self.__force_async, self.__once

        # Return the subscriber
        return subscriber

    def on_event(self, callback: EventCallbackType) -> EventCallbackType:
        """
        Set the main callback for the event.

        :param callback: The callback to be executed when the event occurs.
        :return: The decorated callback.
        """
        self.__event_callback = callback
        return callback

    def on_success(self, callback: SuccessCallbackType) -> SuccessCallbackType:
        """
        Set the success callback for the event.

        :param callback: The callback to be executed when the event response completes successfully.
        :return: The decorated callback.
        """
        self.__success_callback = callback
        return callback

    def on_failure(self, callback: FailureCallbackType) -> FailureCallbackType:
        """
        Set the failure callback for the event.

        :param callback: The callback to be executed when the event response fails.
        :return: The decorated callback.
        """
        self.__failure_callback = callback
        return callback

    def __call__(
        self, callback: EventCallbackType
    ) -> tuple[EventCallbackType, "EventLinker.EventLinkerSubCtx[_SubCtxE]"] | EventCallbackType:
        """
        Subscribe the decorated callback to the specified events.

        :param callback: The callback to be executed when the event occurs.
        :return: A tuple containing the decorated callback and its subscription context
            if the context is stateful; otherwise, returns the decorated callback alone.
        """
        # Store the provided callback as the event callback
        self.__event_callback = callback

        # Set success and failure callbacks to None
        self.__success_callback = None
        self.__failure_callback = None

        # Determine if the subscription context is stateful
        is_stateful: bool = self._is_stateful

        # Call the exit method to finalize the
        # subscription process and clean up any necessary context.
        self.__exit__(None, None, None)

        # Return a tuple containing the decorated callback
        # and the current subscription context if the context
        # is stateful; otherwise, return just the callback.
        return (callback, self) if is_stateful else callback

Functions

unpack
unpack() -> tuple[_SourceType | None, _SubscriberType | None]

Unpack and retrieve the source object and its associated subscriber.

This method returns a tuple containing the source object and its subscriber, while also handling the cleanup of associated resources to prevent memory leaks. After retrieving the objects, it deletes internal references to the source and subscriber to ensure they are no longer retained.

RETURNS DESCRIPTION
tuple[_SourceType | None, _SubscriberType | None]

A tuple of the form (source, subscriber). Both may be None if the subscription context is stateless or if its state has already been unpacked.

RAISES DESCRIPTION
PyventusException

If this method is called before or during the subscription context, indicating that the resources are not yet available for unpacking.
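
The behavior described above (values available once after a stateful context exits, `None` afterwards or when stateless, and an error before exit) can be sketched as follows. This is a simplified model under stated assumptions: `MiniStatefulCtx`, its `_closed` flag, and the string subscriber are hypothetical, and `RuntimeError` stands in for `PyventusException`.

```python
from __future__ import annotations


class MiniStatefulCtx:
    """Hypothetical model of the unpack contract; not the pyventus class."""

    def __init__(self, source: str, is_stateful: bool) -> None:
        self._source: str | None = source
        self._subscriber: str | None = None
        self._is_stateful = is_stateful
        self._closed = False  # Assumption: tracks whether the block has exited.

    def __enter__(self) -> MiniStatefulCtx:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        subscriber = f"subscriber-of-{self._source}"
        if self._is_stateful:
            self._subscriber = subscriber  # Retain state for a later unpack().
        else:
            self._source = None  # Stateless: drop references right away.
        self._closed = True

    def unpack(self) -> tuple[str | None, str | None]:
        if not self._closed:
            raise RuntimeError("The subscription context has not been exited yet.")
        results = (self._source, self._subscriber)
        # Release the references so the context no longer keeps them alive.
        self._source = None
        self._subscriber = None
        return results


with MiniStatefulCtx("linker", is_stateful=True) as stateful_ctx:
    pass
first_unpack = stateful_ctx.unpack()
second_unpack = stateful_ctx.unpack()

with MiniStatefulCtx("linker", is_stateful=False) as stateless_ctx:
    pass
stateless_unpack = stateless_ctx.unpack()

open_ctx = MiniStatefulCtx("linker", is_stateful=True)
try:
    open_ctx.unpack()  # Called before the context has exited.
    raised_early = False
except RuntimeError:
    raised_early = True
```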

Source code in pyventus/core/subscriptions/subscription_context.py
def unpack(self) -> tuple[_SourceType | None, _SubscriberType | None]:
    """
    Unpack and retrieve the source object and its associated subscriber.

    This method returns a tuple containing the source object and its subscriber,
    while also handling the cleanup of associated resources to prevent memory leaks.
    After retrieving the objects, it deletes internal references to the source and
    subscriber to ensure they are no longer retained.

    :return: A tuple of the form (source, subscriber). Both may be `None` if the
        subscription context is stateless or if its state has already been unpacked.
    :raises PyventusException: If this method is called before or during the subscription
        context, indicating that the resources are not yet available for unpacking.
    """
    # Create a tuple with the source object and its subscriber
    results: tuple[_SourceType | None, _SubscriberType | None] = (self._source, self._subscriber)

    # Perform cleanup by deleting unnecessary references
    if results[0]:
        del self.__source
    if results[1]:
        del self.__subscriber

    return results

__enter__
__enter__() -> Self

Enter the subscription context block.

This method facilitates the progressive definition of an object that will later be subscribed to the specified source.

RETURNS DESCRIPTION
Self

The subscription context manager.

Source code in pyventus/core/subscriptions/subscription_context.py
def __enter__(self: Self) -> Self:
    """
    Enter the subscription context block.

    This method facilitates the progressive definition of an
    object that will later be subscribed to the specified source.

    :return: The subscription context manager.
    """
    return self

__exit__
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Exit the subscription context block.

This method subscribes the defined object to the specified source, and performs any necessary cleanup.

PARAMETER DESCRIPTION
exc_type

The type of the raised exception, if any.

TYPE: type[BaseException] | None

exc_val

The raised exception object, if any.

TYPE: BaseException | None

exc_tb

The traceback information, if any.

TYPE: TracebackType | None

RETURNS DESCRIPTION
None

None.

Source code in pyventus/core/subscriptions/subscription_context.py
def __exit__(
    self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
    """
    Exit the subscription context block.

    This method subscribes the defined object to the
    specified source, and performs any necessary cleanup.

    :param exc_type: The type of the raised exception, if any.
    :param exc_val: The raised exception object, if any.
    :param exc_tb: The traceback information, if any.
    :return: None.
    """
    # Finalize the subscription and retrieve the subscriber
    subscriber: _SubscriberType = self._exit()

    # Ensure the subscriber is valid
    if subscriber is None:  # pragma: no cover
        raise PyventusException("The 'subscriber' argument cannot be None.")

    # Check if a subscriber has already been set to avoid an override
    if self.__subscriber:  # pragma: no cover
        raise PyventusException("A 'subscriber' has already been set.")

    if self.__is_stateful:
        # Retain the subscriber if the context is stateful
        self.__subscriber = subscriber
    else:
        # Remove context-specific references if stateless
        del self.__source, self.__subscriber

    # Remove the stateful context flag
    del self.__is_stateful

__init__
__init__(events: tuple[SubscribableEventType, ...], event_linker: _SubCtxE, force_async: bool, once: bool, is_stateful: bool) -> None

Initialize an instance of EventLinkerSubCtx.

PARAMETER DESCRIPTION
events

The events to subscribe/link to.

TYPE: tuple[SubscribableEventType, ...]

event_linker

The type of event linker used for the actual subscription.

TYPE: _SubCtxE

force_async

Determines whether to force all callbacks to run asynchronously.

TYPE: bool

once

Specifies if the event subscriber will be a one-time subscription.

TYPE: bool

is_stateful

A flag indicating whether the context preserves its state (stateful) or not (stateless) after exiting the subscription context. If True, the context retains its state, allowing access to stored objects, including the event linker and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription context to prevent memory leaks.

TYPE: bool
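
The `subscribe` docstring states that, when `force_async` is `True`, synchronous callbacks are run in a thread pool through `asyncio.to_thread`. The sketch below shows that mechanism in isolation; the `run` helper and `blocking_callback` are hypothetical names, and the way pyventus schedules callbacks internally is not shown here.

```python
import asyncio
import threading
from typing import Callable


def blocking_callback() -> str:
    # Report which thread actually executed the callback.
    return threading.current_thread().name


async def run(callback: Callable[[], str], force_async: bool) -> str:
    if force_async:
        # Offload the synchronous callback to a worker thread.
        return await asyncio.to_thread(callback)
    # Otherwise the callback runs inline, in the calling thread.
    return callback()


calling_thread = threading.current_thread().name
inline_thread = asyncio.run(run(blocking_callback, force_async=False))
worker_thread = asyncio.run(run(blocking_callback, force_async=True))
```

Offloading keeps a slow synchronous callback from blocking the event loop, at the cost of running it on a different thread than the caller.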

Source code in pyventus/events/linkers/event_linker.py
def __init__(
    self,
    events: tuple[SubscribableEventType, ...],
    event_linker: _SubCtxE,
    force_async: bool,
    once: bool,
    is_stateful: bool,
) -> None:
    """
    Initialize an instance of `EventLinkerSubCtx`.

    :param events: The events to subscribe/link to.
    :param event_linker: The type of event linker used for the actual subscription.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
    :param once: Specifies if the event subscriber will be a one-time subscription.
    :param is_stateful: A flag indicating whether the context preserves its state (stateful) or
        not (stateless) after exiting the subscription context. If `True`, the context retains its
        state, allowing access to stored objects, including the `event linker` and the `subscriber`
        object. If `False`, the context is stateless, and the stored state is cleared upon exiting
        the subscription context to prevent memory leaks.
    """
    # Initialize the base SubscriptionContext class
    super().__init__(source=event_linker, is_stateful=is_stateful)

    # Initialize variables
    self.__events: tuple[SubscribableEventType, ...] = events
    self.__event_callback: EventCallbackType | None = None
    self.__success_callback: SuccessCallbackType | None = None
    self.__failure_callback: FailureCallbackType | None = None
    self.__force_async: bool = force_async
    self.__once: bool = once

on_event
on_event(callback: EventCallbackType) -> EventCallbackType

Set the main callback for the event.

PARAMETER DESCRIPTION
callback

The callback to be executed when the event occurs.

TYPE: EventCallbackType

RETURNS DESCRIPTION
EventCallbackType

The decorated callback.

Source code in pyventus/events/linkers/event_linker.py
def on_event(self, callback: EventCallbackType) -> EventCallbackType:
    """
    Set the main callback for the event.

    :param callback: The callback to be executed when the event occurs.
    :return: The decorated callback.
    """
    self.__event_callback = callback
    return callback

on_success
on_success(callback: SuccessCallbackType) -> SuccessCallbackType

Set the success callback for the event.

PARAMETER DESCRIPTION
callback

The callback to be executed when the event response completes successfully.

TYPE: SuccessCallbackType

RETURNS DESCRIPTION
SuccessCallbackType

The decorated callback.

Source code in pyventus/events/linkers/event_linker.py
def on_success(self, callback: SuccessCallbackType) -> SuccessCallbackType:
    """
    Set the success callback for the event.

    :param callback: The callback to be executed when the event response completes successfully.
    :return: The decorated callback.
    """
    self.__success_callback = callback
    return callback

on_failure
on_failure(callback: FailureCallbackType) -> FailureCallbackType

Set the failure callback for the event.

PARAMETER DESCRIPTION
callback

The callback to be executed when the event response fails.

TYPE: FailureCallbackType

RETURNS DESCRIPTION
FailureCallbackType

The decorated callback.

Source code in pyventus/events/linkers/event_linker.py
def on_failure(self, callback: FailureCallbackType) -> FailureCallbackType:
    """
    Set the failure callback for the event.

    :param callback: The callback to be executed when the event response fails.
    :return: The decorated callback.
    """
    self.__failure_callback = callback
    return callback

__call__
__call__(callback: EventCallbackType) -> tuple[EventCallbackType, EventLinkerSubCtx[_SubCtxE]] | EventCallbackType

Subscribe the decorated callback to the specified events.

PARAMETER DESCRIPTION
callback

The callback to be executed when the event occurs.

TYPE: EventCallbackType

RETURNS DESCRIPTION
tuple[EventCallbackType, EventLinkerSubCtx[_SubCtxE]] | EventCallbackType

A tuple containing the decorated callback and its subscription context if the context is stateful; otherwise, returns the decorated callback alone.
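
One practical consequence of this return type is that, with a stateful context, the decorated name is bound to a tuple rather than to the function. The following stand-alone sketch illustrates this; `MiniDecoratorCtx` is a hypothetical stand-in that only reproduces the return expression, not the subscription itself.

```python
class MiniDecoratorCtx:
    """Hypothetical decorator reproducing only the return shape of __call__."""

    def __init__(self, is_stateful: bool) -> None:
        self._is_stateful = is_stateful

    def __call__(self, callback):
        # Stateful: return the callback together with the context.
        # Stateless: return the callback alone.
        return (callback, self) if self._is_stateful else callback


@MiniDecoratorCtx(is_stateful=False)
def stateless_handler() -> str:
    return "ok"


@MiniDecoratorCtx(is_stateful=True)
def stateful_handler() -> str:
    return "ok"


# In the stateful case the name refers to a (callback, context) tuple,
# so it has to be unpacked before the function can be called.
callback, ctx = stateful_handler
```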

Source code in pyventus/events/linkers/event_linker.py
def __call__(
    self, callback: EventCallbackType
) -> tuple[EventCallbackType, "EventLinker.EventLinkerSubCtx[_SubCtxE]"] | EventCallbackType:
    """
    Subscribe the decorated callback to the specified events.

    :param callback: The callback to be executed when the event occurs.
    :return: A tuple containing the decorated callback and its subscription context
        if the context is stateful; otherwise, returns the decorated callback alone.
    """
    # Store the provided callback as the event callback
    self.__event_callback = callback

    # Set success and failure callbacks to None
    self.__success_callback = None
    self.__failure_callback = None

    # Determine if the subscription context is stateful
    is_stateful: bool = self._is_stateful

    # Call the exit method to finalize the
    # subscription process and clean up any necessary context.
    self.__exit__(None, None, None)

    # Return a tuple containing the decorated callback
    # and the current subscription context if the context
    # is stateful; otherwise, return just the callback.
    return (callback, self) if is_stateful else callback

Functions

__init_subclass__

__init_subclass__(max_subscribers: int | None = None, default_success_callback: SuccessCallbackType | None = None, default_failure_callback: FailureCallbackType | None = None, debug: bool | None = None) -> None

Initialize a subclass of EventLinker.

By default, this method sets up the main registry and the thread lock object, but it can also be used to configure specific settings of the EventLinker subclass.

PARAMETER DESCRIPTION
max_subscribers

The maximum number of subscribers allowed per event, or None if there is no limit.

TYPE: int | None DEFAULT: None

default_success_callback

The default callback to assign as the success callback in the subscribers when no specific success callback is provided.

TYPE: SuccessCallbackType | None DEFAULT: None

default_failure_callback

The default callback to assign as the failure callback in the subscribers when no specific failure callback is provided.

TYPE: FailureCallbackType | None DEFAULT: None

debug

Specifies the debug mode for the subclass logger. If None, debug mode is enabled only when gettrace() reports an active trace function (for example, when running under a debugger).

TYPE: bool | None DEFAULT: None

RETURNS DESCRIPTION
None

None.

RAISES DESCRIPTION
PyventusException

If max_subscribers is less than 1 or if the provided callbacks are invalid.
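
Because these settings are parameters of `__init_subclass__`, they are passed as keyword arguments in the class statement of the subclass. The sketch below demonstrates the mechanism with a hypothetical `MiniBase` class; it reuses the parameter names `max_subscribers` and `debug`, assumes that `gettrace` refers to `sys.gettrace`, and raises `ValueError` where the library raises `PyventusException`.

```python
from __future__ import annotations

import sys


class MiniBase:
    """Hypothetical base class configured through class keyword arguments."""

    def __init_subclass__(
        cls, max_subscribers: int | None = None, debug: bool | None = None, **kwargs
    ) -> None:
        super().__init_subclass__(**kwargs)
        if max_subscribers is not None and max_subscribers < 1:
            raise ValueError("The 'max_subscribers' argument must be greater than or equal to 1.")
        cls._max_subscribers = max_subscribers
        # Every subclass gets its own registry, so namespaces stay isolated.
        cls._registry = {}
        # Without an explicit value, debug follows the presence of a trace function.
        cls._debug = debug if debug is not None else sys.gettrace() is not None


class OrdersLinker(MiniBase, max_subscribers=10, debug=False):
    pass


class UsersLinker(MiniBase):
    pass


try:

    class BrokenLinker(MiniBase, max_subscribers=0):
        pass

    rejected = False
except ValueError:
    rejected = True
```

Invalid settings fail at class-definition time, so a misconfigured subclass is rejected before any subscription is attempted.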

Source code in pyventus/events/linkers/event_linker.py
def __init_subclass__(
    cls,
    max_subscribers: int | None = None,
    default_success_callback: SuccessCallbackType | None = None,
    default_failure_callback: FailureCallbackType | None = None,
    debug: bool | None = None,
) -> None:
    """
    Initialize a subclass of `EventLinker`.

    By default, this method sets up the main registry and the thread lock object, but
    it can also be used to configure specific settings of the `EventLinker` subclass.

    :param max_subscribers: The maximum number of subscribers allowed per event, or `None` if there is no limit.
    :param default_success_callback: The default callback to assign as the success callback in the subscribers
        when no specific success callback is provided.
    :param default_failure_callback: The default callback to assign as the failure callback in the subscribers
        when no specific failure callback is provided.
    :param debug: Specifies the debug mode for the subclass logger. If `None`, debug mode is
        enabled only when `gettrace()` reports an active trace function (e.g., a debugger).
    :raises PyventusException: If `max_subscribers` is less than 1 or if the provided
        callbacks are invalid.
    :return: None.
    """
    # Initialize the main registry
    cls.__registry = MultiBidict[str, EventSubscriber]()

    # Create a lock object for thread synchronization
    cls.__thread_lock = Lock()

    # Validate the max_subscribers argument
    if max_subscribers is not None and max_subscribers < 1:
        raise PyventusException("The 'max_subscribers' argument must be greater than or equal to 1.")

    # Set the maximum number of subscribers per event
    cls.__max_subscribers = max_subscribers

    # Validate the default success callback, if any
    if default_success_callback is not None:
        validate_callable(default_success_callback)

    # Set the default success callback
    cls.__default_success_callback = default_success_callback

    # Validate the default failure callback, if any
    if default_failure_callback is not None:
        validate_callable(default_failure_callback)

    # Set the default failure callback
    cls.__default_failure_callback = default_failure_callback

    # Validate the debug argument
    if debug is not None and not isinstance(debug, bool):
        raise PyventusException("The 'debug' argument must be a boolean value.")

    # Set up the logger
    cls.__logger = Logger(source=cls, debug=debug if debug is not None else bool(gettrace() is not None))

get_valid_event_name classmethod

get_valid_event_name(event: SubscribableEventType) -> str

Determine the name of the event and perform validation.

PARAMETER DESCRIPTION
event

The event to obtain the name for.

TYPE: SubscribableEventType

RETURNS DESCRIPTION
str

A string that represents the event name.

RAISES DESCRIPTION
PyventusException

If the event argument is invalid or if the event is not supported.
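
The naming rules implemented by this method can be tried out with a stand-alone re-creation. The function `event_name` and the `OrderPlaced` dataclass below are hypothetical; the branching follows the source listing for this method, with `ValueError` standing in for `PyventusException`.

```python
from dataclasses import dataclass, is_dataclass


def event_name(event: object) -> str:
    """Hypothetical re-creation of the naming rules, for illustration only."""
    if event is None:
        raise ValueError("The 'event' argument cannot be None.")
    if event is Ellipsis:
        return type(event).__name__  # The type of `...` is named "ellipsis".
    if isinstance(event, str):
        if not event:
            raise ValueError("String events cannot be empty.")
        return event
    if isinstance(event, type):
        if not is_dataclass(event) and not issubclass(event, Exception):
            raise ValueError("Type events must be either a dataclass or an exception.")
        return event.__name__
    raise ValueError("Unsupported event")


@dataclass
class OrderPlaced:
    order_id: int


names = [event_name(...), event_name("OrderPlaced"), event_name(OrderPlaced), event_name(KeyError)]

rejected = []
for candidate in (None, "", int, OrderPlaced(order_id=1)):
    try:
        event_name(candidate)
    except ValueError:
        rejected.append(candidate)
```

Under these rules the string `"OrderPlaced"` and the dataclass type `OrderPlaced` resolve to the same event name, while a dataclass instance is rejected because only types are accepted.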

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_valid_event_name(cls, event: SubscribableEventType) -> str:
    """
    Determine the name of the event and perform validation.

    :param event: The event to obtain the name for.
    :return: A string that represents the event name.
    :raises PyventusException: If the `event` argument is invalid
        or if the event is not supported.
    """
    # Validate the event argument
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    if event is Ellipsis:
        # If the event is Ellipsis, return its type name
        return type(event).__name__
    elif isinstance(event, str):
        if not event:
            raise PyventusException("String events cannot be empty.")
        # If the event is a non-empty string, return it as the event name
        return event
    elif isinstance(event, type):
        if not is_dataclass(event) and not issubclass(event, Exception):
            raise PyventusException("Type events must be either a dataclass or an exception.")
        # If the event is either a dataclass type or an exception type, return its type name
        return event.__name__
    else:
        # If the event is not supported, raise an exception
        raise PyventusException("Unsupported event")

get_valid_subscriber classmethod

get_valid_subscriber(subscriber: EventSubscriber) -> EventSubscriber

Validate and return the specified subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber to validate.

TYPE: EventSubscriber

RETURNS DESCRIPTION
EventSubscriber

The validated subscriber.

RAISES DESCRIPTION
PyventusException

If the subscriber is not an instance of EventSubscriber.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_valid_subscriber(cls, subscriber: EventSubscriber) -> EventSubscriber:
    """
    Validate and return the specified subscriber.

    :param subscriber: The subscriber to validate.
    :return: The validated subscriber.
    :raises PyventusException: If the subscriber is not an instance of `EventSubscriber`.
    """
    # Validate that the subscriber is an instance of EventSubscriber
    if not isinstance(subscriber, EventSubscriber):
        raise PyventusException("The 'subscriber' argument must be an instance of EventSubscriber.")
    return subscriber

get_max_subscribers classmethod

get_max_subscribers() -> int | None

Retrieve the maximum number of subscribers allowed per event.

RETURNS DESCRIPTION
int | None

The maximum number of subscribers or None if there is no limit.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_max_subscribers(cls) -> int | None:
    """
    Retrieve the maximum number of subscribers allowed per event.

    :return: The maximum number of subscribers or `None` if there is no limit.
    """
    return cls.__max_subscribers

get_default_success_callback classmethod

get_default_success_callback() -> SuccessCallbackType | None

Retrieve the default success callback to assign to subscribers when no specific callback is provided.

RETURNS DESCRIPTION
SuccessCallbackType | None

The default success callback or None if not set.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_default_success_callback(cls) -> SuccessCallbackType | None:
    """
    Retrieve the default success callback to assign to subscribers when no specific callback is provided.

    :return: The default success callback or `None` if not set.
    """
    return cls.__default_success_callback

get_default_failure_callback classmethod

get_default_failure_callback() -> FailureCallbackType | None

Retrieve the default failure callback to assign to subscribers when no specific callback is provided.

RETURNS DESCRIPTION
FailureCallbackType | None

The default failure callback or None if not set.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_default_failure_callback(cls) -> FailureCallbackType | None:
    """
    Retrieve the default failure callback to assign to subscribers when no specific callback is provided.

    :return: The default failure callback or `None` if not set.
    """
    return cls.__default_failure_callback

is_empty classmethod

is_empty() -> bool

Determine whether the main registry is empty.

RETURNS DESCRIPTION
bool

True if the main registry is empty, False otherwise.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def is_empty(cls) -> bool:
    """
    Determine whether the main registry is empty.

    :return: `True` if the main registry is empty, `False` otherwise.
    """
    with cls.__thread_lock:
        return cls.__registry.is_empty

get_registry classmethod

get_registry() -> dict[str, set[EventSubscriber]]

Retrieve a shallow copy of the main registry.

RETURNS DESCRIPTION
dict[str, set[EventSubscriber]]

A shallow copy of the main registry, where each event is mapped to a set of its linked subscribers.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_registry(cls) -> dict[str, set[EventSubscriber]]:
    """
    Retrieve a shallow copy of the main registry.

    :return: A shallow copy of the main registry, where each
        event is mapped to a set of its linked subscribers.
    """
    with cls.__thread_lock:
        return cls.__registry.to_dict()

get_events classmethod

get_events() -> set[str]

Retrieve all registered events.

RETURNS DESCRIPTION
set[str]

A set of all registered event names.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_events(cls) -> set[str]:
    """
    Retrieve all registered events.

    :return: A set of all registered event names.
    """
    with cls.__thread_lock:
        return cls.__registry.keys

get_subscribers classmethod

get_subscribers() -> set[EventSubscriber]

Retrieve all registered subscribers.

RETURNS DESCRIPTION
set[EventSubscriber]

A set of all registered subscribers.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_subscribers(cls) -> set[EventSubscriber]:
    """
    Retrieve all registered subscribers.

    :return: A set of all registered subscribers.
    """
    with cls.__thread_lock:
        return cls.__registry.values

get_event_count classmethod

get_event_count() -> int

Retrieve the number of registered events.

RETURNS DESCRIPTION
int

The total count of events in the registry.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_event_count(cls) -> int:
    """
    Retrieve the number of registered events.

    :return: The total count of events in the registry.
    """
    with cls.__thread_lock:
        return cls.__registry.key_count

get_subscriber_count classmethod

get_subscriber_count() -> int

Retrieve the number of registered subscribers.

RETURNS DESCRIPTION
int

The total count of subscribers in the registry.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_subscriber_count(cls) -> int:
    """
    Retrieve the number of registered subscribers.

    :return: The total count of subscribers in the registry.
    """
    with cls.__thread_lock:
        return cls.__registry.value_count

get_events_from_subscribers classmethod

get_events_from_subscribers(*subscribers: EventSubscriber) -> set[str]

Retrieve a set of events associated with the specified subscribers.

PARAMETER DESCRIPTION
subscribers

One or more subscribers for which to retrieve associated events.

TYPE: EventSubscriber DEFAULT: ()

RETURNS DESCRIPTION
set[str]

A set of events linked to the specified subscribers. Unregistered subscribers are ignored.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_events_from_subscribers(cls, *subscribers: EventSubscriber) -> set[str]:
    """
    Retrieve a set of events associated with the specified subscribers.

    :param subscribers: One or more subscribers for which to retrieve associated events.
    :return: A set of events linked to the specified subscribers. Unregistered subscribers are ignored.
    """
    # Validate and retrieve all unique subscribers to avoid duplicate processing
    unique_subscribers: set[EventSubscriber] = cls._get_valid_and_unique_subscribers(subscribers)

    # Return the set of event names associated with the unique subscribers
    with cls.__thread_lock:
        return cls.__registry.get_keys_from_values(unique_subscribers)
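
The reverse lookup above (from subscribers back to their events) is cheap when the registry keeps an inverse index alongside the forward one. The following is a minimal sketch of that idea, not the pyventus registry itself: `ToyRegistry` and its private attributes are hypothetical, and only the method name `get_keys_from_values` is borrowed from the source shown above.

```python
from collections import defaultdict


class ToyRegistry:
    """Hypothetical stand-in for the linker's registry (not pyventus code)."""

    def __init__(self) -> None:
        # Forward index: event name -> subscribers linked to it.
        self._by_event: dict[str, set[object]] = defaultdict(set)
        # Inverse index: subscriber -> event names it is linked to.
        self._by_subscriber: dict[object, set[str]] = defaultdict(set)

    def insert(self, event: str, subscriber: object) -> None:
        # Keep both indexes in sync on every insertion.
        self._by_event[event].add(subscriber)
        self._by_subscriber[subscriber].add(event)

    def get_keys_from_values(self, subscribers: set[object]) -> set[str]:
        # Union the events of every known subscriber; unknown ones are skipped.
        events: set[str] = set()
        for subscriber in subscribers:
            events |= self._by_subscriber.get(subscriber, set())
        return events


registry = ToyRegistry()
first, second, stranger = object(), object(), object()
registry.insert("UserCreated", first)
registry.insert("UserDeleted", first)
registry.insert("UserDeleted", second)

# `stranger` was never registered, so it contributes nothing to the result.
linked_events = registry.get_keys_from_values({first, stranger})
```

Passing only unregistered subscribers yields an empty set, which mirrors the documented "unregistered subscribers are ignored" behavior.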

get_subscribers_from_events classmethod

get_subscribers_from_events(*events: SubscribableEventType, pop_onetime_subscribers: bool = False) -> set[EventSubscriber]

Retrieve a set of subscribers associated with the specified events.

PARAMETER DESCRIPTION
events

One or more events for which to retrieve associated subscribers.

TYPE: SubscribableEventType DEFAULT: ()

pop_onetime_subscribers

If True, removes one-time subscribers (those with the property once set to True) from the registry.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
set[EventSubscriber]

A set of subscribers linked to the specified events. Unregistered events are ignored.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_subscribers_from_events(
    cls, *events: SubscribableEventType, pop_onetime_subscribers: bool = False
) -> set[EventSubscriber]:
    """
    Retrieve a set of subscribers associated with the specified events.

    :param events: One or more events for which to retrieve associated subscribers.
    :param pop_onetime_subscribers: If `True`, removes one-time subscribers (those
        with the property `once` set to `True`) from the registry.
    :return: A set of subscribers linked to the specified events. Unregistered events are ignored.
    """
    # Validate and retrieve all unique event names to avoid duplicate processing
    unique_events: set[str] = cls._get_valid_and_unique_event_names(events)

    # Acquire lock to ensure thread safety
    with cls.__thread_lock:
        # Retrieve subscribers associated with the unique events
        subscribers: set[EventSubscriber] = cls.__registry.get_values_from_keys(unique_events)

        # Just return subscribers if pop_onetime_subscribers is False
        if not pop_onetime_subscribers:
            return subscribers

        # Remove one-time subscribers from the registry
        for subscriber in subscribers:
            if subscriber.once:
                cls.__registry.remove_value(subscriber)

    # Return the set of subscribers
    return subscribers
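
Because the source calls `remove_value` on each one-time subscriber, such a subscriber appears to be dropped from the whole registry, not only from the events that were queried. The sketch below illustrates that reading with a simplified model; `ToySubscriber`, `ToyLinker`, `link`, and `_remove_everywhere` are hypothetical names, and deleting events that end up with no subscribers is an assumption.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySubscriber:
    """Hypothetical subscriber; only the `once` flag matters here."""

    name: str
    once: bool = False


class ToyLinker:
    """Hypothetical linker with a plain dict as its registry."""

    _lock = threading.Lock()
    _registry: dict[str, set[ToySubscriber]] = {}

    @classmethod
    def link(cls, event: str, subscriber: ToySubscriber) -> None:
        with cls._lock:
            cls._registry.setdefault(event, set()).add(subscriber)

    @classmethod
    def _remove_everywhere(cls, subscriber: ToySubscriber) -> None:
        # Caller must hold the lock. Empty events are dropped (assumption).
        for event in list(cls._registry):
            cls._registry[event].discard(subscriber)
            if not cls._registry[event]:
                del cls._registry[event]

    @classmethod
    def get_subscribers_from_events(
        cls, *events: str, pop_onetime_subscribers: bool = False
    ) -> set[ToySubscriber]:
        with cls._lock:
            # Collect into a fresh set so the registry is never aliased.
            found: set[ToySubscriber] = set()
            for event in set(events):
                found |= cls._registry.get(event, set())
            if pop_onetime_subscribers:
                for subscriber in found:
                    if subscriber.once:
                        cls._remove_everywhere(subscriber)
        return found


ToyLinker.link("OrderPaid", ToySubscriber("audit"))
ToyLinker.link("OrderPaid", ToySubscriber("welcome-email", once=True))

# The first lookup still returns the one-time subscriber, then forgets it.
first_lookup = ToyLinker.get_subscribers_from_events("OrderPaid", pop_onetime_subscribers=True)
second_lookup = ToyLinker.get_subscribers_from_events("OrderPaid", pop_onetime_subscribers=True)
```

Doing the lookup and the removal under one lock acquisition means two concurrent callers cannot both receive the same one-time subscriber.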

get_event_count_from_subscriber classmethod

get_event_count_from_subscriber(subscriber: EventSubscriber) -> int

Retrieve the number of events associated with the given subscriber.

PARAMETER DESCRIPTION
subscriber

The subscriber for which to count the associated events.

TYPE: EventSubscriber

RETURNS DESCRIPTION
int

The count of events associated with the specified subscriber, or 0 if the subscriber is not found.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_event_count_from_subscriber(cls, subscriber: EventSubscriber) -> int:
    """
    Retrieve the number of events associated with the given subscriber.

    :param subscriber: The subscriber for which to count the associated events.
    :return: The count of events associated with the specified subscriber,
        or 0 if the subscriber is not found.
    """
    valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
    with cls.__thread_lock:
        return cls.__registry.get_key_count_from_value(valid_subscriber)

get_subscriber_count_from_event classmethod

get_subscriber_count_from_event(event: SubscribableEventType) -> int

Retrieve the number of subscribers associated with a given event.

PARAMETER DESCRIPTION
event

The event for which to count the associated subscribers.

TYPE: SubscribableEventType

RETURNS DESCRIPTION
int

The count of subscribers associated with the specified event, or 0 if the event is not found.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def get_subscriber_count_from_event(cls, event: SubscribableEventType) -> int:
    """
    Retrieve the number of subscribers associated with a given event.

    :param event: The event for which to count the associated subscribers.
    :return: The count of subscribers associated with the specified event,
        or 0 if the event is not found.
    """
    valid_event: str = cls.get_valid_event_name(event)
    with cls.__thread_lock:
        return cls.__registry.get_value_count_from_key(valid_event)

contains_event classmethod

contains_event(event: SubscribableEventType) -> bool

Determine if the specified event is present in the registry.

PARAMETER DESCRIPTION
event

The event to be checked.

TYPE: SubscribableEventType

RETURNS DESCRIPTION
bool

True if the event is found; False otherwise.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def contains_event(cls, event: SubscribableEventType) -> bool:
    """
    Determine if the specified event is present in the registry.

    :param event: The event to be checked.
    :return: `True` if the event is found; `False` otherwise.
    """
    valid_event: str = cls.get_valid_event_name(event)
    with cls.__thread_lock:
        return cls.__registry.contains_key(valid_event)

contains_subscriber classmethod

contains_subscriber(subscriber: EventSubscriber) -> bool

Determine if the specified subscriber is present in the registry.

PARAMETER DESCRIPTION
subscriber

The subscriber to be checked.

TYPE: EventSubscriber

RETURNS DESCRIPTION
bool

True if the subscriber is found; False otherwise.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def contains_subscriber(cls, subscriber: EventSubscriber) -> bool:
    """
    Determine if the specified subscriber is present in the registry.

    :param subscriber: The subscriber to be checked.
    :return: `True` if the subscriber is found; `False` otherwise.
    """
    valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
    with cls.__thread_lock:
        return cls.__registry.contains_value(valid_subscriber)

are_linked classmethod

are_linked(event: SubscribableEventType, subscriber: EventSubscriber) -> bool

Determine whether the given event is linked with the specified subscriber.

PARAMETER DESCRIPTION
event

The event for which the association is being checked.

TYPE: SubscribableEventType

subscriber

The subscriber for which the association is being checked.

TYPE: EventSubscriber

RETURNS DESCRIPTION
bool

True if the subscriber is linked to the event; False otherwise.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def are_linked(cls, event: SubscribableEventType, subscriber: EventSubscriber) -> bool:
    """
    Determine whether the given event is linked with the specified subscriber.

    :param event: The event for which the association is being checked.
    :param subscriber: The subscriber for which the association is being checked.
    :return: `True` if the subscriber is linked to the event; `False` otherwise.
    """
    valid_event: str = cls.get_valid_event_name(event)
    valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)
    with cls.__thread_lock:
        return cls.__registry.are_associated(valid_event, valid_subscriber)

once classmethod

once(*events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False) -> EventLinkerSubCtx[type[Self]]

Subscribe callbacks to the specified events for a single invocation.

This method can be used as either a decorator or a context manager. When used as a decorator, it automatically subscribes the decorated callback to the provided events. When used as a context manager with the with statement, it allows multiple callbacks to be associated with the provided events within the context block.

PARAMETER DESCRIPTION
events

The events to subscribe to.

TYPE: SubscribableEventType DEFAULT: ()

force_async

Determines whether to force all callbacks to run asynchronously. If True, synchronous callbacks will be converted to run asynchronously in a thread pool, using the asyncio.to_thread function. If False, callbacks will run synchronously or asynchronously as defined.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the event linker and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EventLinkerSubCtx[type[Self]]

An EventLinkerSubCtx instance.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def once(
    cls, *events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False
) -> EventLinkerSubCtx[type[Self]]:
    """
    Subscribe callbacks to the specified events for a single invocation.

    This method can be used as either a decorator or a context manager. When used as a
    decorator, it automatically subscribes the decorated callback to the provided events.
    When used as a context manager with the `with` statement, it allows multiple callbacks
    to be associated with the provided events within the context block.

    :param events: The events to subscribe to.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
        If `True`, synchronous callbacks will be converted to run asynchronously in a
        thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
        will run synchronously or asynchronously as defined.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state
        (`stateful`) or not (`stateless`) after exiting the subscription block. If `True`, the context retains
        its state, allowing access to stored objects, including the `event linker` and the `subscriber` object.
        If `False`, the context is stateless, and the stored state is cleared upon exiting the subscription
        block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: An `EventLinkerSubCtx` instance.
    """
    return EventLinker.EventLinkerSubCtx[type[Self]](
        events=events, event_linker=cls, force_async=force_async, once=True, is_stateful=stateful_subctx
    )
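
The `force_async` description can be made concrete with a small dispatcher. This is a sketch under assumptions rather than the library's internals: `run_callback` and `current_thread_name` are hypothetical helpers, and the only real API relied on is `asyncio.to_thread` from the standard library.

```python
import asyncio
import inspect
import threading
from collections.abc import Callable
from typing import Any


async def run_callback(callback: Callable[..., Any], *args: Any, force_async: bool = False) -> Any:
    """Hypothetical helper that runs a callback as the `force_async` text describes."""
    if inspect.iscoroutinefunction(callback):
        # Coroutine functions are awaited as defined.
        return await callback(*args)
    if force_async:
        # Synchronous callbacks are moved to a worker thread.
        return await asyncio.to_thread(callback, *args)
    # Otherwise the synchronous callback runs inline on the caller's thread.
    return callback(*args)


def current_thread_name(_event: str) -> str:
    # Report which thread actually executed the callback.
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    inline = await run_callback(current_thread_name, "event")
    offloaded = await run_callback(current_thread_name, "event", force_async=True)
    return inline, offloaded


caller_thread = threading.current_thread().name
inline_thread, offloaded_thread = asyncio.run(main())
```

With `force_async=True` the synchronous callback reports a worker thread name, while the default path reports the thread that drives the event loop. This is why forcing asynchronous execution keeps a slow synchronous callback from blocking the loop.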

on classmethod

on(*events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False) -> EventLinkerSubCtx[type[Self]]

Subscribe callbacks to the specified events.

This method can be used as either a decorator or a context manager. When used as a decorator, it automatically subscribes the decorated callback to the provided events. When used as a context manager with the with statement, it allows multiple callbacks to be associated with the provided events within the context block.

PARAMETER DESCRIPTION
events

The events to subscribe to.

TYPE: SubscribableEventType DEFAULT: ()

force_async

Determines whether to force all callbacks to run asynchronously. If True, synchronous callbacks will be converted to run asynchronously in a thread pool, using the asyncio.to_thread function. If False, callbacks will run synchronously or asynchronously as defined.

TYPE: bool DEFAULT: False

stateful_subctx

A flag indicating whether the subscription context preserves its state (stateful) or not (stateless) after exiting the subscription block. If True, the context retains its state, allowing access to stored objects, including the event linker and the subscriber object. If False, the context is stateless, and the stored state is cleared upon exiting the subscription block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EventLinkerSubCtx[type[Self]]

An EventLinkerSubCtx instance.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def on(
    cls, *events: SubscribableEventType, force_async: bool = False, stateful_subctx: bool = False
) -> EventLinkerSubCtx[type[Self]]:
    """
    Subscribe callbacks to the specified events.

    This method can be used as either a decorator or a context manager. When used as a
    decorator, it automatically subscribes the decorated callback to the provided events.
    When used as a context manager with the `with` statement, it allows multiple callbacks
    to be associated with the provided events within the context block.

    :param events: The events to subscribe to.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
        If `True`, synchronous callbacks will be converted to run asynchronously in a
        thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
        will run synchronously or asynchronously as defined.
    :param stateful_subctx: A flag indicating whether the subscription context preserves its state
        (`stateful`) or not (`stateless`) after exiting the subscription block. If `True`, the context retains
        its state, allowing access to stored objects, including the `event linker` and the `subscriber` object.
        If `False`, the context is stateless, and the stored state is cleared upon exiting the subscription
        block to prevent memory leaks. The term 'subctx' refers to 'Subscription Context'.
    :return: An `EventLinkerSubCtx` instance.
    """
    return EventLinker.EventLinkerSubCtx[type[Self]](
        events=events, event_linker=cls, force_async=force_async, once=False, is_stateful=stateful_subctx
    )
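
One object can serve as both a decorator and a context manager by implementing `__call__` together with `__enter__` and `__exit__`. The sketch below shows that pattern and the effect of a stateful flag. It is not pyventus's `EventLinkerSubCtx`: `ToySubCtx`, its `on_event` method, the module-level `registry` dict, and this standalone `on` function are all hypothetical.

```python
from collections.abc import Callable
from typing import Any, Optional

Registry = dict[str, list[tuple[Callable[..., Any], bool]]]


class ToySubCtx:
    """Hypothetical subscription context usable as a decorator or in `with`."""

    def __init__(self, registry: Registry, events: tuple[str, ...], once: bool, is_stateful: bool) -> None:
        self._registry: Optional[Registry] = registry
        self._events = events
        self._once = once
        self._is_stateful = is_stateful
        self.callback: Optional[Callable[..., Any]] = None

    def on_event(self, callback: Callable[..., Any]) -> Callable[..., Any]:
        # Remember the callback; the subscription happens when the block exits.
        self.callback = callback
        return callback

    def __enter__(self) -> "ToySubCtx":
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        if exc_type is None and self.callback is not None and self._registry is not None:
            for event in self._events:
                self._registry.setdefault(event, []).append((self.callback, self._once))
        if not self._is_stateful:
            # A stateless context drops its references once the block ends.
            self._registry = None
            self.callback = None

    def __call__(self, callback: Callable[..., Any]) -> Callable[..., Any]:
        # Decorator form: a one-callback `with` block.
        with self:
            self.on_event(callback)
        return callback


registry: Registry = {}


def on(*events: str, stateful_subctx: bool = False) -> ToySubCtx:
    return ToySubCtx(registry, events, once=False, is_stateful=stateful_subctx)


@on("Greeting")
def say_hello() -> str:
    return "hello"


with on("Greeting", "Farewell", stateful_subctx=True) as ctx:

    @ctx.on_event
    def log_event() -> str:
        return "logged"
```

After the `with` block, `ctx.callback` is still available because the context was created as stateful. A context created without `stateful_subctx=True` would have cleared it on exit.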

subscribe classmethod

subscribe(*events: SubscribableEventType, event_callback: EventCallbackType, success_callback: SuccessCallbackType | None = None, failure_callback: FailureCallbackType | None = None, force_async: bool = False, once: bool = False) -> EventSubscriber

Subscribe the specified callbacks to the given events.

PARAMETER DESCRIPTION
events

The events to subscribe to.

TYPE: SubscribableEventType DEFAULT: ()

event_callback

The callback to be executed when the event occurs.

TYPE: EventCallbackType

success_callback

The callback to be executed when the event response completes successfully.

TYPE: SuccessCallbackType | None DEFAULT: None

failure_callback

The callback to be executed when the event response fails.

TYPE: FailureCallbackType | None DEFAULT: None

force_async

Determines whether to force all callbacks to run asynchronously. If True, synchronous callbacks will be converted to run asynchronously in a thread pool, using the asyncio.to_thread function. If False, callbacks will run synchronously or asynchronously as defined.

TYPE: bool DEFAULT: False

once

Specifies if the subscriber will be a one-time subscriber.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EventSubscriber

The subscriber associated with the given events.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def subscribe(
    cls,
    *events: SubscribableEventType,
    event_callback: EventCallbackType,
    success_callback: SuccessCallbackType | None = None,
    failure_callback: FailureCallbackType | None = None,
    force_async: bool = False,
    once: bool = False,
) -> EventSubscriber:
    """
    Subscribe the specified callbacks to the given events.

    :param events: The events to subscribe to.
    :param event_callback: The callback to be executed when the event occurs.
    :param success_callback: The callback to be executed when the event response completes successfully.
    :param failure_callback: The callback to be executed when the event response fails.
    :param force_async: Determines whether to force all callbacks to run asynchronously.
        If `True`, synchronous callbacks will be converted to run asynchronously in a
        thread pool, using the `asyncio.to_thread` function. If `False`, callbacks
        will run synchronously or asynchronously as defined.
    :param once: Specifies if the subscriber will be a one-time subscriber.
    :return: The subscriber associated with the given events.
    """
    # Validate and retrieve all unique event names
    unique_events: set[str] = cls._get_valid_and_unique_event_names(events)

    # Acquire the lock to ensure exclusive access to the main registry
    with cls.__thread_lock:
        # Check if the maximum number of subscribers is set
        if cls.__max_subscribers is not None:
            # For each event name, check if the maximum number
            # of subscribers for the event has been exceeded
            for event in unique_events:
                if cls.__registry.get_value_count_from_key(event) >= cls.__max_subscribers:
                    raise PyventusException(
                        f"The event '{event}' has exceeded the maximum number of subscribers allowed."
                    )

        # Create a new event subscriber
        subscriber: EventSubscriber = EventSubscriber(
            teardown_callback=cls.remove_subscriber,
            event_callback=event_callback,
            success_callback=success_callback if success_callback else cls.__default_success_callback,
            failure_callback=failure_callback if failure_callback else cls.__default_failure_callback,
            force_async=force_async,
            once=once,
        )

        # Register the subscriber for each unique event
        for event in unique_events:
            cls.__registry.insert(event, subscriber)

    # Log the subscription if debug is enabled
    if cls.__logger.debug_enabled:
        cls.__logger.debug(
            action="Subscribed:", msg=f"{subscriber} {StdOutColors.PURPLE_TEXT('Events:')} {unique_events}"
        )

    # Return the new event subscriber
    return subscriber
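
Two details of the source above are easy to miss. The limit check uses `>=`, so a subscription is rejected as soon as an event already holds the maximum number of subscribers. All events are also checked before anything is inserted, so a rejected call leaves the registry untouched. The following simplified model illustrates both points along with the fallback to a default callback. `ToyLinker`, `LimitReached`, and `default_failure` are hypothetical names, and subscribers are modeled as plain dicts.

```python
import threading
from collections.abc import Callable
from typing import Any, Optional


class LimitReached(Exception):
    """Hypothetical stand-in for PyventusException."""


def default_failure(exc: Exception) -> str:
    return f"default handler saw {exc!r}"


class ToyLinker:
    _lock = threading.Lock()
    _registry: dict[str, list[dict[str, Any]]] = {}
    _max_subscribers: Optional[int] = 2
    _default_failure_callback: Optional[Callable[[Exception], Any]] = default_failure

    @classmethod
    def subscribe(
        cls,
        *events: str,
        event_callback: Callable[..., Any],
        failure_callback: Optional[Callable[[Exception], Any]] = None,
    ) -> dict[str, Any]:
        unique_events = set(events)
        with cls._lock:
            # Validate every event first so a failure changes nothing.
            if cls._max_subscribers is not None:
                for event in unique_events:
                    if len(cls._registry.get(event, [])) >= cls._max_subscribers:
                        raise LimitReached(f"'{event}' already has {cls._max_subscribers} subscribers.")
            subscriber = {
                "event_callback": event_callback,
                # Fall back to the class-wide default when none is given.
                "failure_callback": failure_callback if failure_callback else cls._default_failure_callback,
            }
            for event in unique_events:
                cls._registry.setdefault(event, []).append(subscriber)
        return subscriber


first = ToyLinker.subscribe("Ping", event_callback=print)
second = ToyLinker.subscribe("Ping", event_callback=print, failure_callback=repr)

try:
    # "Ping" is full, so the whole call fails and "Pong" is never created.
    ToyLinker.subscribe("Ping", "Pong", event_callback=print)
except LimitReached as error:
    rejected = str(error)
```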

remove classmethod

remove(event: SubscribableEventType, subscriber: EventSubscriber) -> bool

Remove the specified subscriber from the given event.

PARAMETER DESCRIPTION
event

The event from which the subscriber will be removed.

TYPE: SubscribableEventType

subscriber

The subscriber to be removed from the event.

TYPE: EventSubscriber

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if no removal occurred due to the event or subscriber not being registered, or if they are not linked.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def remove(cls, event: SubscribableEventType, subscriber: EventSubscriber) -> bool:
    """
    Remove the specified subscriber from the given event.

    :param event: The event from which the subscriber will be removed.
    :param subscriber: The subscriber to be removed from the event.
    :return: `True` if the subscriber was successfully removed; `False` if
        no removal occurred due to the event or subscriber not being registered,
        or if they are not linked.
    """
    # Validate the given event and subscriber
    valid_event: str = cls.get_valid_event_name(event)
    valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety
    with cls.__thread_lock:
        # Check if the event and subscriber are registered and linked
        if not cls.__registry.are_associated(valid_event, valid_subscriber):
            return False

        # Remove the subscriber from the event
        cls.__registry.remove(valid_event, valid_subscriber)

    # Log the removal if the debug mode is enabled
    if cls.__logger.debug_enabled:
        cls.__logger.debug(
            action="Removed:", msg=f"{valid_subscriber} {StdOutColors.PURPLE_TEXT('Event:')} '{valid_event}'"
        )

    return True

remove_event classmethod

remove_event(event: SubscribableEventType) -> bool

Remove the specified event from the registry.

PARAMETER DESCRIPTION
event

The event to be removed from the registry.

TYPE: SubscribableEventType

RETURNS DESCRIPTION
bool

True if the event was successfully removed; False if the event was not found in the registry.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def remove_event(cls, event: SubscribableEventType) -> bool:
    """
    Remove the specified event from the registry.

    :param event: The event to be removed from the registry.
    :return: `True` if the event was successfully removed; `False`
        if the event was not found in the registry.
    """
    # Get the valid event name
    valid_event: str = cls.get_valid_event_name(event)

    # Acquire lock to ensure thread safety
    with cls.__thread_lock:
        # Check if the event is registered; return False if not
        if not cls.__registry.contains_key(valid_event):
            return False

        # Remove the event from the registry
        cls.__registry.remove_key(valid_event)

    # Log the removal if the debug mode is enabled
    if cls.__logger.debug_enabled:
        cls.__logger.debug(action="Removed:", msg=f"Event: '{valid_event}'")

    return True

remove_subscriber classmethod

remove_subscriber(subscriber: EventSubscriber) -> bool

Remove the specified subscriber from the registry.

PARAMETER DESCRIPTION
subscriber

The subscriber to be removed from the registry.

TYPE: EventSubscriber

RETURNS DESCRIPTION
bool

True if the subscriber was successfully removed; False if the subscriber was not found in the registry.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def remove_subscriber(cls, subscriber: EventSubscriber) -> bool:
    """
    Remove the specified subscriber from the registry.

    :param subscriber: The subscriber to be removed from the registry.
    :return: `True` if the subscriber was successfully removed; `False` if
        the subscriber was not found in the registry.
    """
    # Get the valid subscriber instance
    valid_subscriber: EventSubscriber = cls.get_valid_subscriber(subscriber)

    # Acquire lock to ensure thread safety
    with cls.__thread_lock:
        # Check if the subscriber is registered; return False if not
        if not cls.__registry.contains_value(valid_subscriber):
            return False

        # Remove the subscriber from the registry
        cls.__registry.remove_value(valid_subscriber)

    # Log the removal if the debug mode is enabled
    if cls.__logger.debug_enabled:
        cls.__logger.debug(action="Removed:", msg=f"{valid_subscriber}")

    return True

remove_all classmethod

remove_all() -> bool

Remove all events and subscribers from the registry.

RETURNS DESCRIPTION
bool

True if the registry was successfully cleared; False if the registry was already empty.

Source code in pyventus/events/linkers/event_linker.py
@classmethod
def remove_all(cls) -> bool:
    """
    Remove all events and subscribers from the registry.

    :return: `True` if the registry was successfully cleared; `False`
        if the registry was already empty.
    """
    # Acquire lock to ensure thread safety
    with cls.__thread_lock:
        # Check if the registry is already empty
        if cls.__registry.is_empty:
            return False

        # Clear the registry
        cls.__registry.clear()

    if cls.__logger.debug_enabled:
        cls.__logger.debug(action="Removed:", msg="All events and subscribers.")

    return True
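
The four removal methods share one convention: they return `True` only when something was actually removed. The sketch below models that convention with a plain dictionary. It is an illustration, not the pyventus registry: `ToyRegistry` is hypothetical, subscribers are represented by strings, and dropping an event once its last subscriber is gone is an assumption about the underlying data structure.

```python
class ToyRegistry:
    """Hypothetical registry mapping event names to subscriber names."""

    def __init__(self) -> None:
        self._links: dict[str, set[str]] = {}

    def insert(self, event: str, subscriber: str) -> None:
        self._links.setdefault(event, set()).add(subscriber)

    def remove(self, event: str, subscriber: str) -> bool:
        # Only an existing link can be removed.
        if subscriber not in self._links.get(event, set()):
            return False
        self._links[event].discard(subscriber)
        if not self._links[event]:
            del self._links[event]  # assumption: empty events are dropped
        return True

    def remove_event(self, event: str) -> bool:
        return self._links.pop(event, None) is not None

    def remove_subscriber(self, subscriber: str) -> bool:
        events = [event for event, subs in self._links.items() if subscriber in subs]
        for event in events:
            self.remove(event, subscriber)
        return bool(events)

    def remove_all(self) -> bool:
        if not self._links:
            return False
        self._links.clear()
        return True


registry = ToyRegistry()
registry.insert("A", "s1")
registry.insert("A", "s2")
registry.insert("B", "s1")

results = [
    registry.remove("A", "s2"),        # link exists
    registry.remove("A", "s2"),        # link already gone
    registry.remove_event("C"),        # unknown event
    registry.remove_subscriber("s1"),  # removed from both "A" and "B"
    registry.remove_all(),             # nothing left to clear
]
```

Here `results` evaluates to `[True, False, False, True, False]`. The boolean return lets callers tell a real removal from a no-op without querying the registry first.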