Skip to content

AsyncIOEventEmitter class

Bases: EventEmitter

An event emitter subclass that utilizes the AsyncIO framework to handle the execution of event emissions.

Notes:

  • When used in an asynchronous context where an event loop is already running, the event emission is scheduled and processed on that existing loop. If the event loop is closed before all callbacks complete, any remaining scheduled callbacks will be canceled.

  • When used in a synchronous context where no event loop is active, a new event loop is started and subsequently closed by the asyncio.run() method. Within this loop, the event emission is executed. The loop then waits for all scheduled tasks to finish before closing.


Read more in the Pyventus docs for AsyncIO Event Emitter.

Source code in pyventus/emitters/asyncio/asyncio_event_emitter.py
class AsyncIOEventEmitter(EventEmitter):
    """
    An event emitter subclass that utilizes the AsyncIO framework to handle
    the execution of event emissions.

    **Notes:**

    -   When used in an asynchronous context where an event loop is already running,
        the event emission is scheduled and processed on that existing loop. If the
        event loop is closed before all callbacks complete, any remaining scheduled
        callbacks will be canceled.

    -   When used in a synchronous context where no event loop is active, a new event
        loop is started and subsequently closed by the `asyncio.run()` method. Within
        this loop, the event emission is executed. The loop then waits for all
        scheduled tasks to finish before closing.

    ---
    Read more in the
    [Pyventus docs for AsyncIO Event Emitter](https://mdapena.github.io/pyventus/tutorials/emitters/asyncio/).
    """

    @property
    def __is_loop_running(self) -> bool:
        """
        Check if there is currently an active AsyncIO event loop.
        :return: `True` if an event loop is running, `False` otherwise.
        """
        try:
            asyncio.get_running_loop()
            return True
        except RuntimeError:
            return False

    def __init__(self, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None:
        """
        Initialize an instance of `AsyncIOEventEmitter`.
        :param event_linker: Specifies the type of event linker used to manage and access
            events along with their corresponding event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        """
        # Call the parent class' __init__ method
        super().__init__(event_linker=event_linker, debug=debug)

        # Initialize the set of background futures
        self._background_futures: Set[Future] = set()  # type: ignore[type-arg]

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        # Check if there is an active event loop
        is_loop_running: bool = self.__is_loop_running

        # Log the execution context, if debug mode is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(action=f"Context:", msg=f"{'Async' if is_loop_running else 'Sync'}")

        if is_loop_running:
            # Schedule the event emission in the running loop as a future
            future = asyncio.ensure_future(event_emission())

            # Remove the Future from the set of background futures after completion
            future.add_done_callback(self._background_futures.remove)

            # Add the Future to the set of background futures
            self._background_futures.add(future)
        else:
            # Run the event emission in a blocking manner
            asyncio.run(event_emission())

Functions

emit

emit(event: EmittableEventType, *args: Any, **kwargs: Any) -> None

Emits an event and triggers its associated event handlers.

Notes:

  • When emitting dataclass objects or Exception objects, they are automatically passed to the event handler as the first positional argument, even if you pass additional *args or **kwargs.
  • If there are event handlers subscribed to the global event ..., also known as Ellipsis, they will also be triggered each time an event or exception is emitted.
PARAMETER DESCRIPTION
event

The event to be emitted. It can be str, a dataclass object, or an Exception object.

TYPE: EmittableEventType

args

Positional arguments to be passed to the event handlers.

TYPE: Any DEFAULT: ()

kwargs

Keyword arguments to be passed to the event handlers.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None

Source code in pyventus/emitters/event_emitter.py
def emit(self, /, event: EmittableEventType, *args: Any, **kwargs: Any) -> None:
    """
    Emits an event and triggers its associated event handlers.

    **Notes:**

    -   When emitting `dataclass` objects or `Exception` objects, they are automatically passed
        to the event handler as the first positional argument, even if you pass additional `*args`
        or `**kwargs`.
    -   If there are event handlers subscribed to the global event `...`, also known as `Ellipsis`,
        they will also be triggered each time an event or exception is emitted.

    :param event: The event to be emitted. It can be `str`, a `dataclass`
        object, or an `Exception` object.
    :param args: Positional arguments to be passed to the event handlers.
    :param kwargs: Keyword arguments to be passed to the event handlers.
    :return: None
    """
    # Raise an exception if the event is None
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    # Raise an exception if the event is a type
    if isinstance(event, type):
        raise PyventusException("The 'event' argument cannot be a type.")

    # Determine the event name
    event_name: str = self._event_linker.get_event_name(event=event if isinstance(event, str) else type(event))

    # Retrieve the event handlers associated with the event
    event_handlers: List[EventHandler] = self._event_linker.get_event_handlers_by_events(event_name, Ellipsis)

    # Sort the event handlers by timestamp
    event_handlers.sort(key=lambda handler: handler.timestamp)

    # Initialize the list of event handlers to be executed
    pending_event_handlers: List[EventHandler] = []

    # Iterate through each event handler
    for event_handler in event_handlers:
        # Check if the event handler is a one-time subscription
        if event_handler.once:
            # If the event handler is a one-time subscription, we attempt to remove it.
            if self._event_linker.remove_event_handler(event_handler=event_handler):  # pragma: no cover (Race-Cond)
                # If the removal is successful, it indicates that the handler has not
                # been processed before, so we add it to the pending list.
                pending_event_handlers.append(event_handler)
        else:
            pending_event_handlers.append(event_handler)

    # Check if the pending list of event handlers is not empty
    if len(pending_event_handlers) > 0:
        # Create a new EventEmission instance
        event_emission: EventEmitter.EventEmission = EventEmitter.EventEmission(
            event=event_name,
            event_handlers=pending_event_handlers,
            event_args=args if isinstance(event, str) else (event, *args),
            event_kwargs=kwargs,
            debug=self._logger.debug_enabled,
        )

        # Log the event emission when debug is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(
                action="Emitting Event:",
                msg=f"{event_emission.event}{StdOutColors.PURPLE} ID:{StdOutColors.DEFAULT} {event_emission.id}",
            )

        # Delegate the event emission processing to subclasses
        self._process(event_emission)

    # Log a debug message if there are no event handlers subscribed to the event
    elif self._logger.debug_enabled:  # pragma: no cover
        self._logger.debug(
            action="Emitting Event:",
            msg=f"{event_name}{StdOutColors.PURPLE} Message:{StdOutColors.DEFAULT} No event handlers subscribed",
        )

__init__

__init__(event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None

Initialize an instance of AsyncIOEventEmitter.

PARAMETER DESCRIPTION
event_linker

Specifies the type of event linker used to manage and access events along with their corresponding event handlers. Defaults to EventLinker.

TYPE: Type[EventLinker] DEFAULT: EventLinker

debug

Specifies the debug mode for the logger. If None, it is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/emitters/asyncio/asyncio_event_emitter.py
def __init__(self, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None:
    """
    Initialize an instance of `AsyncIOEventEmitter`.
    :param event_linker: Specifies the type of event linker used to manage and access
        events along with their corresponding event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    """
    # Call the parent class' __init__ method
    super().__init__(event_linker=event_linker, debug=debug)

    # Initialize the set of background futures
    self._background_futures: Set[Future] = set()  # type: ignore[type-arg]