RQEventEmitter class

Bases: EventEmitter

An event emitter subclass that uses the Redis Queue (RQ) system to execute event emissions.

Notes:

  • This class enqueues event emissions on a Redis Queue instance, and Redis Queue workers then execute them. Because the workers can run in separate processes or on separate machines, event handling becomes scalable and distributed.

Read more in the Pyventus docs for RQ Event Emitter.
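
The enqueue-then-execute flow described in the note can be sketched without Redis. This is a minimal illustration, not the Pyventus or RQ implementation: InMemoryQueue, QueueBackedEmitter, and run_worker are hypothetical stand-ins, and only the shape of the enqueue call (a callable plus keyword options) follows the source shown on this page.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class InMemoryQueue:
    """Hypothetical stand-in for an RQ queue; it only records enqueued jobs."""

    def __init__(self) -> None:
        self.jobs: List[Tuple[Callable[[], Any], Dict[str, Any]]] = []

    def enqueue(self, job: Callable[[], Any], **options: Any) -> None:
        # A real queue would serialize the job and push it to Redis.
        self.jobs.append((job, options))


class QueueBackedEmitter:
    """Simplified emitter: it hands work to the queue instead of running it."""

    def __init__(self, queue: InMemoryQueue, options: Optional[Dict[str, Any]] = None) -> None:
        self._queue = queue
        self._options = options if options is not None else {}

    def process(self, emission: Callable[[], Any]) -> None:
        # Same shape as `_process` in the source: enqueue only, never execute here.
        self._queue.enqueue(emission, **self._options)


def run_worker(queue: InMemoryQueue) -> List[Any]:
    """Hypothetical worker loop: drain the queue and execute each job in order."""
    results = []
    while queue.jobs:
        job, _options = queue.jobs.pop(0)
        results.append(job())
    return results


queue = InMemoryQueue()
emitter = QueueBackedEmitter(queue, options={"job_timeout": 30})
emitter.process(lambda: "handled")

pending = len(queue.jobs)     # the job is waiting; nothing has run yet
forwarded = queue.jobs[0][1]  # options passed through to enqueue
results = run_worker(queue)   # the "worker" executes the job
```

The point of the split is that the emitting process returns as soon as the job is queued, and the handlers run wherever a worker picks the job up.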

Source code in pyventus/emitters/rq/rq_event_emitter.py
class RQEventEmitter(EventEmitter):
    """
    An event emitter subclass that utilizes the Redis Queue system to handle the
    execution of event emissions.

    **Notes:**

    -   This class uses a Redis Queue instance to enqueue event emissions, which are
        subsequently executed by Redis Queue workers. This approach provides a scalable
        and distributed method for handling the execution of event emissions.

    ---
    Read more in the
    [Pyventus docs for RQ Event Emitter](https://mdapena.github.io/pyventus/tutorials/emitters/rq/).
    """

    def __init__(
        self,
        queue: Queue,
        options: Dict[str, Any] | None = None,
        event_linker: Type[EventLinker] = EventLinker,
        debug: bool | None = None,
    ) -> None:
        """
        Initialize an instance of `RQEventEmitter`.
        :param queue: The Redis queue for enqueuing event handlers.
        :param options: Additional options for the RQ package enqueueing method.
            Defaults to an empty dictionary.
        :param event_linker: Specifies the type of event linker used to manage and access
            events along with their corresponding event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        """
        # Call the parent class' __init__ method
        super().__init__(event_linker=event_linker, debug=debug)

        # Validate the queue argument
        if queue is None:
            raise PyventusException("The 'queue' argument cannot be None.")
        if not isinstance(queue, Queue):
            raise PyventusException("The 'queue' argument must be an instance of the Queue class.")

        # Store the Redis queue and RQ options
        self._queue: Queue = queue
        self._options: Dict[str, Any] = options if options is not None else {}

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        # Add the event emission to the Redis Queue
        self._queue.enqueue(event_emission, **self._options)

Functions

emit

emit(event: EmittableEventType, *args: Any, **kwargs: Any) -> None

Emits an event and triggers its associated event handlers.

Notes:

  • When emitting dataclass objects or Exception objects, they are automatically passed to the event handler as the first positional argument, even if you pass additional *args or **kwargs.
  • If there are event handlers subscribed to the global event ..., also known as Ellipsis, they will also be triggered each time an event or exception is emitted.

Parameters:

  • event (EmittableEventType): The event to be emitted. It can be str, a dataclass object, or an Exception object.
  • args (Any, default: ()): Positional arguments to be passed to the event handlers.
  • kwargs (Any, default: {}): Keyword arguments to be passed to the event handlers.

Returns:

  • None
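
The way emit shapes the positional arguments can be sketched as a standalone function. This is an illustration under assumptions: resolve_emission and OrderCreated are hypothetical names, ValueError stands in for PyventusException, and naming an object event after its class is an assumption about what the event linker returns. The argument rule itself (string events pass args unchanged, object events are prepended) follows the source listed for emit.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class OrderCreated:  # hypothetical dataclass event
    order_id: int


def resolve_emission(event: object, *args: Any) -> Tuple[str, Tuple[Any, ...]]:
    """Return (event_name, positional_args) as the handlers would receive them."""
    if event is None:
        raise ValueError("The 'event' argument cannot be None.")
    if isinstance(event, type):
        # Emit an instance, e.g. OrderCreated(7), not the class itself.
        raise ValueError("The 'event' argument cannot be a type.")
    if isinstance(event, str):
        # String events: handlers receive exactly the arguments that were passed.
        return event, args
    # Object events: the object becomes the first positional argument.
    # Assumption: the event name is the class name.
    return type(event).__name__, (event, *args)


by_name = resolve_emission("OrderPlaced", 1, 2)
by_object = resolve_emission(OrderCreated(7), "extra")

error = KeyError("missing")
by_exception = resolve_emission(error)
```

Here by_name keeps its two arguments as given, while by_object and by_exception each carry the emitted object in front of any extra arguments.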

Source code in pyventus/emitters/event_emitter.py
def emit(self, /, event: EmittableEventType, *args: Any, **kwargs: Any) -> None:
    """
    Emits an event and triggers its associated event handlers.

    **Notes:**

    -   When emitting `dataclass` objects or `Exception` objects, they are automatically passed
        to the event handler as the first positional argument, even if you pass additional `*args`
        or `**kwargs`.
    -   If there are event handlers subscribed to the global event `...`, also known as `Ellipsis`,
        they will also be triggered each time an event or exception is emitted.

    :param event: The event to be emitted. It can be `str`, a `dataclass`
        object, or an `Exception` object.
    :param args: Positional arguments to be passed to the event handlers.
    :param kwargs: Keyword arguments to be passed to the event handlers.
    :return: None
    """
    # Raise an exception if the event is None
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    # Raise an exception if the event is a type
    if isinstance(event, type):
        raise PyventusException("The 'event' argument cannot be a type.")

    # Determine the event name
    event_name: str = self._event_linker.get_event_name(event=event if isinstance(event, str) else type(event))

    # Retrieve the event handlers associated with the event
    event_handlers: List[EventHandler] = self._event_linker.get_event_handlers_by_events(event_name, Ellipsis)

    # Sort the event handlers by timestamp
    event_handlers.sort(key=lambda handler: handler.timestamp)

    # Initialize the list of event handlers to be executed
    pending_event_handlers: List[EventHandler] = []

    # Iterate through each event handler
    for event_handler in event_handlers:
        # Check if the event handler is a one-time subscription
        if event_handler.once:
            # If the event handler is a one-time subscription, we attempt to remove it.
            if self._event_linker.remove_event_handler(event_handler=event_handler):  # pragma: no cover (Race-Cond)
                # If the removal is successful, it indicates that the handler has not
                # been processed before, so we add it to the pending list.
                pending_event_handlers.append(event_handler)
        else:
            pending_event_handlers.append(event_handler)

    # Check if the pending list of event handlers is not empty
    if len(pending_event_handlers) > 0:
        # Create a new EventEmission instance
        event_emission: EventEmitter.EventEmission = EventEmitter.EventEmission(
            event=event_name,
            event_handlers=pending_event_handlers,
            event_args=args if isinstance(event, str) else (event, *args),
            event_kwargs=kwargs,
            debug=self._logger.debug_enabled,
        )

        # Log the event emission when debug is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(
                action="Emitting Event:",
                msg=f"{event_emission.event}{StdOutColors.PURPLE} ID:{StdOutColors.DEFAULT} {event_emission.id}",
            )

        # Delegate the event emission processing to subclasses
        self._process(event_emission)

    # Log a debug message if there are no event handlers subscribed to the event
    elif self._logger.debug_enabled:  # pragma: no cover
        self._logger.debug(
            action="Emitting Event:",
            msg=f"{event_name}{StdOutColors.PURPLE} Message:{StdOutColors.DEFAULT} No event handlers subscribed",
        )
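
The one-time subscription handling in the listing above relies on removal acting as a claim: a handler marked once is executed only by the emission that succeeds in removing it. The following is a minimal sketch of that selection logic, with hypothetical Handler and Registry stand-ins in place of the Pyventus event handler and event linker classes.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Handler:  # hypothetical stand-in for an event handler
    name: str
    once: bool
    timestamp: int


class Registry:
    """Hypothetical stand-in for the event linker's handler storage."""

    def __init__(self, handlers: Iterable[Handler]) -> None:
        self._handlers = list(handlers)

    def handlers(self) -> List[Handler]:
        # Return a snapshot so removal during iteration is safe.
        return list(self._handlers)

    def remove(self, handler: Handler) -> bool:
        """Return True only if the handler was still registered."""
        try:
            self._handlers.remove(handler)
        except ValueError:
            return False
        return True


def select_pending(registry: Registry) -> List[Handler]:
    pending = []
    # Oldest subscriptions first, as in the source listing.
    for handler in sorted(registry.handlers(), key=lambda h: h.timestamp):
        if handler.once:
            # A successful removal means no other emission has claimed it.
            if registry.remove(handler):
                pending.append(handler)
        else:
            pending.append(handler)
    return pending


registry = Registry([Handler("audit", False, 2), Handler("welcome", True, 1)])
first = [h.name for h in select_pending(registry)]
second = [h.name for h in select_pending(registry)]
```

In this sketch the first selection includes both handlers in timestamp order, and the second includes only the persistent one, because the one-time handler was removed when it was first claimed.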

__init__

__init__(queue: Queue, options: Dict[str, Any] | None = None, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None

Initialize an instance of RQEventEmitter.

Parameters:

  • queue (Queue): The Redis queue for enqueuing event handlers. It must be a Queue instance; passing None or any other type raises PyventusException.
  • options (Dict[str, Any] | None, default: None): Additional options for the RQ package enqueueing method. If None, an empty dictionary is used.
  • event_linker (Type[EventLinker], default: EventLinker): Specifies the type of event linker used to manage and access events along with their corresponding event handlers.
  • debug (bool | None, default: None): Specifies the debug mode for the logger. If None, it is determined based on the execution environment.
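
A possible way to construct the emitter is sketched below. Several details are assumptions rather than facts from this page: the Redis URL, the queue name, the helper name build_emitter, and the option values are illustrative. The import path mirrors the source file location shown here, and job_timeout and result_ttl are keyword options accepted by RQ's enqueue method. The imports sit inside the function only so that the snippet loads in environments without Redis, RQ, or Pyventus installed.

```python
from typing import Any, Dict

# Keyword options forwarded to the queue's enqueue call on every emission.
# The values are illustrative, not recommendations.
RQ_OPTIONS: Dict[str, Any] = {"job_timeout": 30, "result_ttl": 0}


def build_emitter(redis_url: str = "redis://localhost:6379/0"):
    """Create an RQEventEmitter bound to a queue named "default" (assumed setup)."""
    # Imported lazily so this module loads even where the packages are missing.
    from redis import Redis
    from rq import Queue
    from pyventus.emitters.rq.rq_event_emitter import RQEventEmitter

    queue = Queue(name="default", connection=Redis.from_url(redis_url))
    return RQEventEmitter(queue=queue, options=RQ_OPTIONS, debug=False)
```

With a Redis server and at least one RQ worker running, build_emitter().emit("OrderPlaced", order_id=7) would enqueue the emission for any handlers subscribed to "OrderPlaced". The event name and keyword argument here are hypothetical.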

Source code in pyventus/emitters/rq/rq_event_emitter.py
def __init__(
    self,
    queue: Queue,
    options: Dict[str, Any] | None = None,
    event_linker: Type[EventLinker] = EventLinker,
    debug: bool | None = None,
) -> None:
    """
    Initialize an instance of `RQEventEmitter`.
    :param queue: The Redis queue for enqueuing event handlers.
    :param options: Additional options for the RQ package enqueueing method.
        Defaults to an empty dictionary.
    :param event_linker: Specifies the type of event linker used to manage and access
        events along with their corresponding event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    """
    # Call the parent class' __init__ method
    super().__init__(event_linker=event_linker, debug=debug)

    # Validate the queue argument
    if queue is None:
        raise PyventusException("The 'queue' argument cannot be None.")
    if not isinstance(queue, Queue):
        raise PyventusException("The 'queue' argument must be an instance of the Queue class.")

    # Store the Redis queue and RQ options
    self._queue: Queue = queue
    self._options: Dict[str, Any] = options if options is not None else {}