
ExecutorEventEmitter class

Bases: EventEmitter

An event emitter subclass that utilizes the concurrent.futures Executor base class to handle the execution of event emissions. It can work with either ThreadPoolExecutor for thread-based execution or ProcessPoolExecutor for process-based execution.

Notes:

  • When using this event emitter, it is important to properly manage the underlying Executor. After you have finished emitting events, call the shutdown() method to signal the executor that it should free its resources when the currently pending futures are done executing. You can avoid calling this method explicitly by using the with statement, which shuts down the Executor automatically (waiting as if Executor.shutdown() were called with wait set to True).

Read more in the Pyventus docs for Executor Event Emitter.
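
The lifecycle described in the note can be sketched with the standard library alone. MiniEmitter below is a hypothetical stand-in, not Pyventus code: it mirrors only the __enter__/__exit__/shutdown() pattern, and its emit() method, handler, and names are assumptions made for illustration.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor


class MiniEmitter:
    """Hypothetical stand-in that mirrors the context-manager pattern only."""

    def __init__(self, executor: Executor) -> None:
        self._executor = executor

    def __enter__(self) -> "MiniEmitter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Equivalent to calling shutdown(wait=True) by hand.
        self.shutdown(wait=True)

    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)

    def emit(self, handler, *args) -> None:
        # Fire-and-forget: the returned future is intentionally discarded.
        self._executor.submit(handler, *args)


results: list[str] = []


def slow_handler(name: str) -> None:
    time.sleep(0.05)
    results.append(name)


with MiniEmitter(ThreadPoolExecutor(max_workers=2)) as emitter:
    emitter.emit(slow_handler, "first")
    emitter.emit(slow_handler, "second")

# Leaving the with block waited for both handlers to finish.
print(sorted(results))
```

Without the with statement, the same effect would need an explicit emitter.shutdown() call after the last emit().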

Source code in pyventus/emitters/executor/executor_event_emitter.py
class ExecutorEventEmitter(EventEmitter):
    """
    An event emitter subclass that utilizes the `concurrent.futures` Executor base class to
    handle the execution of event emissions. It can work with either `ThreadPoolExecutor`
    for thread-based execution or `ProcessPoolExecutor` for process-based execution.

    **Notes:**

    -   When using this event emitter, it is important to properly manage the underlying `Executor`.
        Once you have finished emitting events, call the `shutdown()` method to signal the executor to
        free any resources for pending futures. You can avoid the need to call this method explicitly
        by using the `with` statement, which will automatically shut down the `Executor` (waiting as
        if `Executor.shutdown()` were called with `wait` set to `True`).

    ---
    Read more in the
    [Pyventus docs for Executor Event Emitter](https://mdapena.github.io/pyventus/tutorials/emitters/executor/).
    """

    @staticmethod
    def _callback(event_emission: EventEmitter.EventEmission) -> None:
        """
        This method is used as the callback function for the executor
        to process the event emission.
        :param event_emission: The event emission to be executed.
        :return: None
        """
        asyncio.run(event_emission())

    def __init__(
        self,
        executor: Executor = ThreadPoolExecutor(),
        event_linker: Type[EventLinker] = EventLinker,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize an instance of `ExecutorEventEmitter`.
        :param executor: The executor object used to handle the execution of event
            emissions. Defaults to `ThreadPoolExecutor()`.
        :param event_linker: Specifies the type of event linker used to manage and access
            events along with their corresponding event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        """
        # Call the parent class' __init__ method
        super().__init__(event_linker=event_linker, debug=debug)

        # Validate the executor argument
        if executor is None:
            raise PyventusException("The 'executor' argument cannot be None.")
        if not isinstance(executor, Executor):
            raise PyventusException("The 'executor' argument must be an instance of the Executor class.")

        # Set the executor object reference
        self._executor: Executor = executor

    def __enter__(self) -> "ExecutorEventEmitter":
        """
        Returns the current instance of `ExecutorEventEmitter` for context management.
        :return: The current instance of `ExecutorEventEmitter`.
        """
        return self

    def __exit__(
        self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """
        Cleans up the executor resources when exiting the context.
        :param exc_type: The exception type, if any.
        :param exc_val: The exception value, if any.
        :param exc_tb: The traceback information, if any.
        :return: None
        """
        self.shutdown(wait=True)

    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        """
        Shuts down the executor and frees any resources it is using.
        :param wait: A boolean indicating whether to wait for the currently pending futures
            to complete before shutting down.
        :param cancel_futures: A boolean indicating whether to cancel any pending futures.
        :return: None
        """
        self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        # Submit the event emission to the executor
        self._executor.submit(ExecutorEventEmitter._callback, event_emission)
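
How _callback and _process cooperate can be sketched as follows. This is a minimal illustration using only the standard library: the emission coroutine and the callback function are hypothetical stand-ins for EventEmission and _callback, and the future is kept here only so the example can wait on it (the source above discards it).

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

seen: list[tuple[str, str]] = []


async def emission() -> None:
    # Hypothetical stand-in for an awaitable event emission.
    await asyncio.sleep(0)
    seen.append(("handled", threading.current_thread().name))


def callback(make_coroutine) -> None:
    # Each submitted call creates, runs, and closes its own event loop.
    asyncio.run(make_coroutine())


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    future = executor.submit(callback, emission)
    future.result()  # blocks until done; re-raises if the callback failed

print(seen)
```

Because asyncio.run() starts a fresh event loop for every call, each emission runs independently of any loop in the thread that called emit().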

Functions

emit

emit(event: EmittableEventType, *args: Any, **kwargs: Any) -> None

Emits an event and triggers its associated event handlers.

Notes:

  • When emitting dataclass objects or Exception objects, they are automatically passed to the event handler as the first positional argument, even if you pass additional *args or **kwargs.
  • If there are event handlers subscribed to the global event ..., also known as Ellipsis, they will also be triggered each time an event or exception is emitted.

PARAMETER DESCRIPTION
event

The event to be emitted. It can be a str, a dataclass object, or an Exception object.

TYPE: EmittableEventType

args

Positional arguments to be passed to the event handlers.

TYPE: Any DEFAULT: ()

kwargs

Keyword arguments to be passed to the event handlers.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None
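
The first note can be illustrated with a small sketch of the argument rule. build_handler_args is a hypothetical helper, not part of Pyventus, and OrderCreated is an invented event type; the helper mirrors the expression `args if isinstance(event, str) else (event, *args)` found in the emit source listing.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class OrderCreated:  # hypothetical event type for illustration
    order_id: int


def build_handler_args(event: Any, *args: Any) -> tuple[Any, ...]:
    # String events pass args through; event objects are prepended to them.
    return args if isinstance(event, str) else (event, *args)


string_args = build_handler_args("OrderCreated", 1, 2)
object_args = build_handler_args(OrderCreated(order_id=7), "extra")
error_args = build_handler_args(ValueError("boom"))

print(string_args)
print(object_args)
print(error_args)
```

A handler subscribed to a dataclass or exception event should therefore accept the event object as its first parameter, followed by any extra arguments.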

Source code in pyventus/emitters/event_emitter.py
def emit(self, /, event: EmittableEventType, *args: Any, **kwargs: Any) -> None:
    """
    Emits an event and triggers its associated event handlers.

    **Notes:**

    -   When emitting `dataclass` objects or `Exception` objects, they are automatically passed
        to the event handler as the first positional argument, even if you pass additional `*args`
        or `**kwargs`.
    -   If there are event handlers subscribed to the global event `...`, also known as `Ellipsis`,
        they will also be triggered each time an event or exception is emitted.

    :param event: The event to be emitted. It can be `str`, a `dataclass`
        object, or an `Exception` object.
    :param args: Positional arguments to be passed to the event handlers.
    :param kwargs: Keyword arguments to be passed to the event handlers.
    :return: None
    """
    # Raise an exception if the event is None
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    # Raise an exception if the event is a type
    if isinstance(event, type):
        raise PyventusException("The 'event' argument cannot be a type.")

    # Determine the event name
    event_name: str = self._event_linker.get_event_name(event=event if isinstance(event, str) else type(event))

    # Retrieve the event handlers associated with the event
    event_handlers: List[EventHandler] = self._event_linker.get_event_handlers_by_events(event_name, Ellipsis)

    # Sort the event handlers by timestamp
    event_handlers.sort(key=lambda handler: handler.timestamp)

    # Initialize the list of event handlers to be executed
    pending_event_handlers: List[EventHandler] = []

    # Iterate through each event handler
    for event_handler in event_handlers:
        # Check if the event handler is a one-time subscription
        if event_handler.once:
            # If the event handler is a one-time subscription, we attempt to remove it.
            if self._event_linker.remove_event_handler(event_handler=event_handler):  # pragma: no cover (Race-Cond)
                # If the removal is successful, it indicates that the handler has not
                # been processed before, so we add it to the pending list.
                pending_event_handlers.append(event_handler)
        else:
            pending_event_handlers.append(event_handler)

    # Check if the pending list of event handlers is not empty
    if len(pending_event_handlers) > 0:
        # Create a new EventEmission instance
        event_emission: EventEmitter.EventEmission = EventEmitter.EventEmission(
            event=event_name,
            event_handlers=pending_event_handlers,
            event_args=args if isinstance(event, str) else (event, *args),
            event_kwargs=kwargs,
            debug=self._logger.debug_enabled,
        )

        # Log the event emission when debug is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(
                action="Emitting Event:",
                msg=f"{event_emission.event}{StdOutColors.PURPLE} ID:{StdOutColors.DEFAULT} {event_emission.id}",
            )

        # Delegate the event emission processing to subclasses
        self._process(event_emission)

    # Log a debug message if there are no event handlers subscribed to the event
    elif self._logger.debug_enabled:  # pragma: no cover
        self._logger.debug(
            action="Emitting Event:",
            msg=f"{event_name}{StdOutColors.PURPLE} Message:{StdOutColors.DEFAULT} No event handlers subscribed",
        )
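
The one-time subscription branch above keeps a handler only when its removal succeeds, so two emissions that read the same handler list cannot both run it. The sketch below shows that idea in isolation. Handler, Registry, and select_pending are hypothetical names, and the lock-based registry is an assumption of this sketch rather than a description of how EventLinker is implemented.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Handler:  # hypothetical, minimal handler record
    name: str
    once: bool = False


class Registry:
    """Hypothetical registry whose remove() reports whether it removed anything."""

    def __init__(self, handlers: list[Handler]) -> None:
        self._handlers = list(handlers)
        self._lock = threading.Lock()

    def snapshot(self) -> list[Handler]:
        with self._lock:
            return list(self._handlers)

    def remove(self, handler: Handler) -> bool:
        with self._lock:
            if handler in self._handlers:
                self._handlers.remove(handler)
                return True
            return False


def select_pending(registry: Registry, snapshot: list[Handler]) -> list[Handler]:
    # Keep regular handlers; keep a one-time handler only if this call removed it.
    return [h for h in snapshot if not h.once or registry.remove(h)]


registry = Registry([Handler("always"), Handler("first_only", once=True)])
stale_snapshot = registry.snapshot()  # both "emissions" read the same list

first = [h.name for h in select_pending(registry, stale_snapshot)]
second = [h.name for h in select_pending(registry, stale_snapshot)]

print(first)
print(second)
```

Only the first selection claims the one-time handler; the second sees the failed removal and skips it.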

__init__

__init__(executor: Executor = ThreadPoolExecutor(), event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None

Initialize an instance of ExecutorEventEmitter.

PARAMETER DESCRIPTION
executor

The executor object used to handle the execution of event emissions. Defaults to ThreadPoolExecutor().

TYPE: Executor DEFAULT: ThreadPoolExecutor()

event_linker

Specifies the type of event linker used to manage and access events along with their corresponding event handlers. Defaults to EventLinker.

TYPE: Type[EventLinker] DEFAULT: EventLinker

debug

Specifies the debug mode for the logger. If None, it is determined based on the execution environment.

TYPE: bool | None DEFAULT: None
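
Python evaluates default argument values once, when the function is defined, so the ThreadPoolExecutor() default in the signature above is a single object shared by every instance created without an explicit executor. The sketch below uses a hypothetical MiniEmitter stand-in (not Pyventus code) to show the consequence: shutting down one such instance also shuts down the executor of the others.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


class MiniEmitter:
    """Hypothetical stand-in that copies only the constructor's default style."""

    def __init__(self, executor: Executor = ThreadPoolExecutor()) -> None:
        self._executor = executor

    def submit(self, fn):
        return self._executor.submit(fn)

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


a = MiniEmitter()
b = MiniEmitter()
shares_default = a._executor is b._executor

a.shutdown()
try:
    b.submit(lambda: None)
    b_still_usable = True
except RuntimeError:
    # concurrent.futures raises RuntimeError on submit() after shutdown.
    b_still_usable = False

# Passing a fresh executor per emitter avoids the coupling.
c = MiniEmitter(ThreadPoolExecutor(max_workers=1))
d = MiniEmitter(ThreadPoolExecutor(max_workers=1))
c.shutdown()
d_result = d.submit(lambda: "ok").result()
d.shutdown()

print(shares_default, b_still_usable, d_result)
```

When an emitter is used in a with block or shut down explicitly, passing a dedicated executor is the safer choice.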

Source code in pyventus/emitters/executor/executor_event_emitter.py
def __init__(
    self,
    executor: Executor = ThreadPoolExecutor(),
    event_linker: Type[EventLinker] = EventLinker,
    debug: bool | None = None,
) -> None:
    """
    Initialize an instance of `ExecutorEventEmitter`.
    :param executor: The executor object used to handle the execution of event
        emissions. Defaults to `ThreadPoolExecutor()`.
    :param event_linker: Specifies the type of event linker used to manage and access
        events along with their corresponding event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    """
    # Call the parent class' __init__ method
    super().__init__(event_linker=event_linker, debug=debug)

    # Validate the executor argument
    if executor is None:
        raise PyventusException("The 'executor' argument cannot be None.")
    if not isinstance(executor, Executor):
        raise PyventusException("The 'executor' argument must be an instance of the Executor class.")

    # Set the executor object reference
    self._executor: Executor = executor

__enter__

__enter__() -> ExecutorEventEmitter

Returns the current instance of ExecutorEventEmitter for context management.

RETURNS DESCRIPTION
ExecutorEventEmitter

The current instance of ExecutorEventEmitter.

Source code in pyventus/emitters/executor/executor_event_emitter.py
def __enter__(self) -> "ExecutorEventEmitter":
    """
    Returns the current instance of `ExecutorEventEmitter` for context management.
    :return: The current instance of `ExecutorEventEmitter`.
    """
    return self

__exit__

__exit__(exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Cleans up the executor resources when exiting the context.

PARAMETER DESCRIPTION
exc_type

The exception type, if any.

TYPE: Type[BaseException] | None

exc_val

The exception value, if any.

TYPE: BaseException | None

exc_tb

The traceback information, if any.

TYPE: TracebackType | None

RETURNS DESCRIPTION
None

None
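
Since __exit__ returns None, an exception raised inside the with block is not suppressed, yet the executor is still shut down on the way out. A minimal sketch, using a hypothetical MiniEmitter stand-in rather than the real class:

```python
from concurrent.futures import Executor, ThreadPoolExecutor


class MiniEmitter:
    """Hypothetical stand-in that mirrors the context-manager methods only."""

    def __init__(self, executor: Executor) -> None:
        self._executor = executor

    def __enter__(self) -> "MiniEmitter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returning None (falsy) lets any in-flight exception propagate.
        self._executor.shutdown(wait=True)


executor = ThreadPoolExecutor(max_workers=1)

propagated = False
try:
    with MiniEmitter(executor):
        raise ValueError("raised inside the with block")
except ValueError:
    propagated = True

try:
    executor.submit(print)
    shut_down = False
except RuntimeError:
    # The executor rejects new work, so shutdown did run.
    shut_down = True

print(propagated, shut_down)
```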

Source code in pyventus/emitters/executor/executor_event_emitter.py
def __exit__(
    self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
    """
    Cleans up the executor resources when exiting the context.
    :param exc_type: The exception type, if any.
    :param exc_val: The exception value, if any.
    :param exc_tb: The traceback information, if any.
    :return: None
    """
    self.shutdown(wait=True)

shutdown

shutdown(wait: bool = True, cancel_futures: bool = False) -> None

Shuts down the executor and frees any resources it is using.

PARAMETER DESCRIPTION
wait

A boolean indicating whether to wait for the currently pending futures to complete before shutting down.

TYPE: bool DEFAULT: True

cancel_futures

A boolean indicating whether to cancel pending futures that the executor has not yet started running.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
None

None
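
Because shutdown() forwards both arguments unchanged to Executor.shutdown(), the standard-library semantics apply. The sketch below uses ThreadPoolExecutor directly to show the effect of cancel_futures=True (available from Python 3.9): work that is already running finishes, while work still waiting in the queue is cancelled. The task functions and event names are made up for the example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task() -> str:
    started.set()             # tell the main thread this task is running
    release.wait(timeout=5)   # hold the only worker until released
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
started.wait(timeout=5)

# The single worker is busy, so these stay queued and never start.
queued = [executor.submit(lambda: "never runs") for _ in range(3)]

# wait=False returns immediately; cancel_futures=True cancels queued work.
executor.shutdown(wait=False, cancel_futures=True)
release.set()

result = running.result(timeout=5)
cancelled = [future.cancelled() for future in queued]

print(result, cancelled)
```

With the defaults (wait=True, cancel_futures=False), the call would instead block until every queued task had run.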

Source code in pyventus/emitters/executor/executor_event_emitter.py
def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
    """
    Shuts down the executor and frees any resources it is using.
    :param wait: A boolean indicating whether to wait for the currently pending futures
        to complete before shutting down.
    :param cancel_futures: A boolean indicating whether to cancel any pending futures.
    :return: None
    """
    self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)