Skip to content

FastAPIEventEmitter class

Bases: EventEmitter

An event emitter subclass that utilizes FastAPI's BackgroundTasks system to handle the execution of event emissions.

Notes:

  • It provides a convenient way to incorporate event-driven functionality into FastAPI apps.
  • This class offers a powerful mechanism for implementing asynchronous and decoupled operations in FastAPI, such as asynchronously sending emails in an event-driven manner.

Read more in the Pyventus docs for FastAPI Event Emitter.

Source code in pyventus/emitters/fastapi/fastapi_event_emitter.py
class FastAPIEventEmitter(EventEmitter):
    """
    An event emitter subclass that utilizes FastAPI's BackgroundTasks system
    to handle the execution of event emissions.

    **Notes:**

    -   It provides a convenient way to incorporate event-driven functionality
        into FastAPI apps.
    -   This class offers a powerful mechanism for implementing asynchronous
        and decoupled operations in FastAPI, such as asynchronously sending
        emails in an event-driven manner.

    ---
    Read more in the
    [Pyventus docs for FastAPI Event Emitter](https://mdapena.github.io/pyventus/tutorials/emitters/fastapi/).
    """

    @classmethod
    def options(
        cls, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None
    ) -> Callable[[BackgroundTasks], "FastAPIEventEmitter"]:
        """
        Returns a decorator that allows you to configure the `FastAPIEventEmitter` class
        when using FastAPI's `Depends` method.
        :param event_linker: Specifies the type of event linker used to manage and access
            events along with their corresponding event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        :return: A decorator that can be used with the `Depends` method.
        """

        def wrapper(background_tasks: BackgroundTasks) -> "FastAPIEventEmitter":
            """
            A decorator wrapper function that configures the `FastAPIEventEmitter` class with
            the provided options.
            :param background_tasks: The FastAPI `BackgroundTasks` object used to handle
                the execution of event emissions.
            :return: An instance of `FastAPIEventEmitter` configured with the specified options.
            """
            return cls(background_tasks=background_tasks, event_linker=event_linker, debug=debug)

        return wrapper

    def __init__(
        self,
        background_tasks: BackgroundTasks,
        event_linker: Type[EventLinker] = EventLinker,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize an instance of `FastAPIEventEmitter`.
        :param background_tasks: The FastAPI `BackgroundTasks` object used to handle
            the execution of event emissions.
        :param event_linker: Specifies the type of event linker to use for associating
            events with their respective event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        """
        # Call the parent class' __init__ method
        super().__init__(event_linker=event_linker, debug=debug)

        # Check if the provided background_tasks object is valid
        if background_tasks is None:
            raise PyventusException("The 'background_tasks' argument cannot be None.")
        if not isinstance(background_tasks, BackgroundTasks):
            raise PyventusException("The 'background_tasks' argument must be an instance of the BackgroundTasks class.")

        # Set the background tasks
        self._background_tasks: BackgroundTasks = background_tasks

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        # Submit the event emission to the background tasks
        self._background_tasks.add_task(event_emission)

Functions

emit

emit(event: EmittableEventType, *args: Any, **kwargs: Any) -> None

Emits an event and triggers its associated event handlers.

Notes:

  • When emitting dataclass objects or Exception objects, they are automatically passed to the event handler as the first positional argument, even if you pass additional *args or **kwargs.
  • If there are event handlers subscribed to the global event ..., also known as Ellipsis, they will also be triggered each time an event or exception is emitted.
PARAMETER DESCRIPTION
event

The event to be emitted. It can be str, a dataclass object, or an Exception object.

TYPE: EmittableEventType

args

Positional arguments to be passed to the event handlers.

TYPE: Any DEFAULT: ()

kwargs

Keyword arguments to be passed to the event handlers.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None

Source code in pyventus/emitters/event_emitter.py
def emit(self, /, event: EmittableEventType, *args: Any, **kwargs: Any) -> None:
    """
    Emits an event and triggers its associated event handlers.

    **Notes:**

    -   When emitting `dataclass` objects or `Exception` objects, they are automatically passed
        to the event handler as the first positional argument, even if you pass additional `*args`
        or `**kwargs`.
    -   If there are event handlers subscribed to the global event `...`, also known as `Ellipsis`,
        they will also be triggered each time an event or exception is emitted.

    :param event: The event to be emitted. It can be `str`, a `dataclass`
        object, or an `Exception` object.
    :param args: Positional arguments to be passed to the event handlers.
    :param kwargs: Keyword arguments to be passed to the event handlers.
    :return: None
    """
    # Raise an exception if the event is None
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    # Raise an exception if the event is a type
    if isinstance(event, type):
        raise PyventusException("The 'event' argument cannot be a type.")

    # Determine the event name
    event_name: str = self._event_linker.get_event_name(event=event if isinstance(event, str) else type(event))

    # Retrieve the event handlers associated with the event
    event_handlers: List[EventHandler] = self._event_linker.get_event_handlers_by_events(event_name, Ellipsis)

    # Sort the event handlers by timestamp
    event_handlers.sort(key=lambda handler: handler.timestamp)

    # Initialize the list of event handlers to be executed
    pending_event_handlers: List[EventHandler] = []

    # Iterate through each event handler
    for event_handler in event_handlers:
        # Check if the event handler is a one-time subscription
        if event_handler.once:
            # If the event handler is a one-time subscription, we attempt to remove it.
            if self._event_linker.remove_event_handler(event_handler=event_handler):  # pragma: no cover (Race-Cond)
                # If the removal is successful, it indicates that the handler has not
                # been processed before, so we add it to the pending list.
                pending_event_handlers.append(event_handler)
        else:
            pending_event_handlers.append(event_handler)

    # Check if the pending list of event handlers is not empty
    if len(pending_event_handlers) > 0:
        # Create a new EventEmission instance
        event_emission: EventEmitter.EventEmission = EventEmitter.EventEmission(
            event=event_name,
            event_handlers=pending_event_handlers,
            event_args=args if isinstance(event, str) else (event, *args),
            event_kwargs=kwargs,
            debug=self._logger.debug_enabled,
        )

        # Log the event emission when debug is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(
                action="Emitting Event:",
                msg=f"{event_emission.event}{StdOutColors.PURPLE} ID:{StdOutColors.DEFAULT} {event_emission.id}",
            )

        # Delegate the event emission processing to subclasses
        self._process(event_emission)

    # Log a debug message if there are no event handlers subscribed to the event
    elif self._logger.debug_enabled:  # pragma: no cover
        self._logger.debug(
            action="Emitting Event:",
            msg=f"{event_name}{StdOutColors.PURPLE} Message:{StdOutColors.DEFAULT} No event handlers subscribed",
        )

options classmethod

options(event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> Callable[[BackgroundTasks], FastAPIEventEmitter]

Returns a decorator that allows you to configure the FastAPIEventEmitter class when using FastAPI's Depends method.

PARAMETER DESCRIPTION
event_linker

Specifies the type of event linker used to manage and access events along with their corresponding event handlers. Defaults to EventLinker.

TYPE: Type[EventLinker] DEFAULT: EventLinker

debug

Specifies the debug mode for the logger. If None, it is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

RETURNS DESCRIPTION
Callable[[BackgroundTasks], FastAPIEventEmitter]

A decorator that can be used with the Depends method.

Source code in pyventus/emitters/fastapi/fastapi_event_emitter.py
@classmethod
def options(
    cls, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None
) -> Callable[[BackgroundTasks], "FastAPIEventEmitter"]:
    """
    Returns a decorator that allows you to configure the `FastAPIEventEmitter` class
    when using FastAPI's `Depends` method.
    :param event_linker: Specifies the type of event linker used to manage and access
        events along with their corresponding event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    :return: A decorator that can be used with the `Depends` method.
    """

    def wrapper(background_tasks: BackgroundTasks) -> "FastAPIEventEmitter":
        """
        A decorator wrapper function that configures the `FastAPIEventEmitter` class with
        the provided options.
        :param background_tasks: The FastAPI `BackgroundTasks` object used to handle
            the execution of event emissions.
        :return: An instance of `FastAPIEventEmitter` configured with the specified options.
        """
        return cls(background_tasks=background_tasks, event_linker=event_linker, debug=debug)

    return wrapper

__init__

__init__(background_tasks: BackgroundTasks, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None

Initialize an instance of FastAPIEventEmitter.

PARAMETER DESCRIPTION
background_tasks

The FastAPI BackgroundTasks object used to handle the execution of event emissions.

TYPE: BackgroundTasks

event_linker

Specifies the type of event linker to use for associating events with their respective event handlers. Defaults to EventLinker.

TYPE: Type[EventLinker] DEFAULT: EventLinker

debug

Specifies the debug mode for the logger. If None, it is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/emitters/fastapi/fastapi_event_emitter.py
def __init__(
    self,
    background_tasks: BackgroundTasks,
    event_linker: Type[EventLinker] = EventLinker,
    debug: bool | None = None,
) -> None:
    """
    Initialize an instance of `FastAPIEventEmitter`.
    :param background_tasks: The FastAPI `BackgroundTasks` object used to handle
        the execution of event emissions.
    :param event_linker: Specifies the type of event linker to use for associating
        events with their respective event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    """
    # Call the parent class' __init__ method
    super().__init__(event_linker=event_linker, debug=debug)

    # Check if the provided background_tasks object is valid
    if background_tasks is None:
        raise PyventusException("The 'background_tasks' argument cannot be None.")
    if not isinstance(background_tasks, BackgroundTasks):
        raise PyventusException("The 'background_tasks' argument must be an instance of the BackgroundTasks class.")

    # Set the background tasks
    self._background_tasks: BackgroundTasks = background_tasks