Skip to content

CeleryEventEmitter class

Bases: EventEmitter

An event emitter subclass that utilizes the Celery distributed system to handle the execution of event emissions.

Notes:

  • This class uses a Celery Queue instance to enqueue event emissions, which are subsequently executed by Celery workers. This approach provides a scalable and distributed method for handling the execution of event emissions.

Read more in the Pyventus docs for Celery Event Emitter.

Source code in pyventus/emitters/celery/celery_event_emitter.py
class CeleryEventEmitter(EventEmitter):
    """
    An event emitter subclass that utilizes the Celery distributed system
    to handle the execution of event emissions.

    **Notes:**

    -   This class uses a Celery Queue instance to enqueue event emissions, which are
        subsequently executed by Celery workers. This approach provides a scalable
        and distributed method for handling the execution of event emissions.

    ---
    Read more in the
    [Pyventus docs for Celery Event Emitter](https://mdapena.github.io/pyventus/tutorials/emitters/celery/).
    """

    class Queue:
        """A Celery event emitter queue used for enqueuing event emissions."""

        class Serializer:
            """An event emitter object serializer for Celery queues."""

            @staticmethod
            def dumps(obj: EventEmitter.EventEmission) -> Any:
                """
                Serializes the event emission object.
                :param obj: The event emission object to be serialized.
                :return: The serialized representation of the event emission object.
                """
                return dumps(obj)  # pragma: no cover

            @staticmethod
            def loads(serialized_obj: Any) -> EventEmitter.EventEmission:
                """
                Deserializes the task payload back to the event emission object.
                :param serialized_obj: The serialized object to be loaded.
                :return: The deserialized event emission object.
                """
                return cast(EventEmitter.EventEmission, loads(serialized_obj))  # pragma: no cover

        class _Payload(NamedTuple):
            """The Celery event emitter queue payload."""

            serialized_obj: bytes
            """Serialized event emission object."""

            obj_hash: bytes | None
            """Hash of the serialized event emission object."""

            @classmethod
            def from_json(cls, **kwargs: Any) -> "CeleryEventEmitter.Queue._Payload":
                """
                Creates a Payload instance from a JSON-compatible dictionary.
                :param kwargs: JSON-compatible dictionary representing the payload.
                :return: Payload instance.
                :raises ValueError: If the JSON data is missing fields or contains extra keys.
                """
                # Get the field names of the named tuple
                tuple_fields: tuple[str, ...] = CeleryEventEmitter.Queue._Payload._fields

                # Check if all expected fields are present
                if not set(tuple_fields).issubset(kwargs.keys()):
                    raise PyventusException("Missing fields in JSON data.")

                # Check for extra keys in the JSON data
                extra_keys = set(kwargs.keys()) - set(tuple_fields)
                if extra_keys:
                    raise PyventusException("Extra keys in JSON data: {}".format(extra_keys))

                # Create the named tuple using the JSON data
                return cls(**kwargs)

            def to_json(self) -> Dict[str, Any]:
                """
                Converts the payload to a JSON-compatible dictionary.
                :return: JSON-compatible dictionary representing the payload.
                """
                return self._asdict()

        def __init__(
            self,
            celery: Celery,
            name: str | None = None,
            secret: str | None = None,
            serializer: Type[Serializer] = Serializer,
        ) -> None:
            """
            Initialize an instance of `CeleryEventEmitter.Queue`.
            :param celery: The Celery object used to enqueue and process event emissions.
            :param name: The name of the queue where the event emission will be enqueued.
                Default is None (task_default_queue).
            :param secret: The secret key used for message authentication and integrity validation.
                This key is used for hashing the event emission object and verifying its integrity.
            :param serializer: The serializer class used for serializing and deserializing event
                emission objects.
            :raises PyventusException: If the Celery object is None, or the secret key is not None
                and empty, or if the content type 'application/x-python-serialize' is not accepted.
            """
            if celery is None:
                raise PyventusException("The 'celery' argument cannot be None.")
            if not isinstance(celery, Celery):
                raise PyventusException("The 'celery' argument must be an instance of the Celery class.")

            if secret is not None and len(secret) == 0:
                raise PyventusException("The 'secret' argument cannot be empty.")

            if "application/x-python-serialize" not in celery.conf.accept_content:
                raise PyventusException(
                    "Unsupported content type. "
                    "'application/x-python-serialize' is not in the list of accepted content types."
                )

            # Set the Celery queue properties
            self._celery: Celery = celery
            self._name: str = self._celery.conf.task_default_queue if name is None else name
            self._secret: bytes | None = secret.encode("utf-8") if secret else None
            self._digestmod: str | Callable[[], Any] | ModuleType = sha256  # The digest algorithm used for hashing
            self._serializer: Type[CeleryEventEmitter.Queue.Serializer] = serializer

            # Register the event processor method as a Celery task
            self._celery.task(self._executor, name=f"pyventus{self._executor.__name__}", queue=self._name)

        def enqueue(self, event_emission: EventEmitter.EventEmission) -> None:
            """
            Enqueues an event emission object for asynchronous processing in Celery.

            This method takes an `EventEmission` object and enqueues it for asynchronous
            execution by Celery workers. If a secret key is provided during initialization,
            the event emission object is first serialized, and its hash is calculated using
            the secret key. This hash is used to verify the integrity of the event emission
            object during execution.

            :param event_emission: The event emission object to be enqueued for asynchronous execution.
            :return: None
            """
            # Serialize the event emission object
            serialized_obj: Any = self._serializer.dumps(event_emission)

            # Calculate the hash of the serialized object if a secret key was provided
            obj_hash: Any | None = None
            if self._secret:  # pragma: no cover
                obj_hash = hmac.new(key=self._secret, msg=serialized_obj, digestmod=self._digestmod).digest()

            # Create a payload with the serialized event emission instance and its hash
            payload = CeleryEventEmitter.Queue._Payload(
                serialized_obj=serialized_obj,
                obj_hash=obj_hash,
            )

            # Send the event emission object to Celery for asynchronous execution
            self._celery.send_task(
                f"pyventus{self._executor.__name__}",
                kwargs=payload.to_json(),
                queue=self._name,
            )

        def _executor(self, **kwargs: Any) -> None:
            """
            Process the enqueued event emission object.

            This method serves as the Celery task responsible
            for processing the enqueued event emission.

            :param kwargs: The JSON-compatible dictionary representing the payload.
            :return: None
            """
            # Create a Payload instance from the JSON data
            payload = CeleryEventEmitter.Queue._Payload.from_json(**kwargs)

            # Check payload
            if self._secret:  # pragma: no cover
                if not payload.obj_hash:
                    raise PyventusException("Invalid payload structure.")

                # Verify the integrity of the payload
                obj_hash: bytes = hmac.new(
                    key=self._secret, msg=payload.serialized_obj, digestmod=self._digestmod
                ).digest()

                # Compare the calculated hash with the provided payload hash
                if not hmac.compare_digest(payload.obj_hash, obj_hash):
                    raise PyventusException("Payload integrity verification failed.")
            elif payload.obj_hash:  # pragma: no cover
                raise PyventusException("Unexpected payload structure.")

            # Deserialize the event emission object
            event_emission = self._serializer.loads(payload.serialized_obj)

            # Check if the deserialized event emission object is valid
            if event_emission is None:  # pragma: no cover
                raise PyventusException("Failed to deserialize the event emission object.")

            # Run the event emission
            asyncio.run(event_emission())

    def __init__(self, queue: Queue, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None:
        """
        Initialize an instance of `CeleryEventEmitter`.
        :param queue: The queue used for enqueuing event emissions in the Celery event emitter.
        :param event_linker: Specifies the type of event linker used to manage and access
            events along with their corresponding event handlers. Defaults to `EventLinker`.
        :param debug: Specifies the debug mode for the logger. If `None`, it is
            determined based on the execution environment.
        """
        # Call the parent class' __init__ method
        super().__init__(event_linker=event_linker, debug=debug)

        # Validate the queue argument
        if queue is None:
            raise PyventusException("The 'queue' argument cannot be None")
        if not isinstance(queue, CeleryEventEmitter.Queue):
            raise PyventusException("The 'queue' argument must be an instance of the CeleryEventEmitter.Queue class.")

        # Store the queue object
        self._queue: CeleryEventEmitter.Queue = queue

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        # Add the event emission object to the Celery queue for asynchronous execution
        self._queue.enqueue(event_emission=event_emission)

Classes

Queue

A Celery event emitter queue used for enqueuing event emissions.

Source code in pyventus/emitters/celery/celery_event_emitter.py
class Queue:
    """A Celery event emitter queue used for enqueuing event emissions."""

    class Serializer:
        """An event emitter object serializer for Celery queues."""

        @staticmethod
        def dumps(obj: EventEmitter.EventEmission) -> Any:
            """
            Serializes the event emission object.
            :param obj: The event emission object to be serialized.
            :return: The serialized representation of the event emission object.
            """
            return dumps(obj)  # pragma: no cover

        @staticmethod
        def loads(serialized_obj: Any) -> EventEmitter.EventEmission:
            """
            Deserializes the task payload back to the event emission object.
            :param serialized_obj: The serialized object to be loaded.
            :return: The deserialized event emission object.
            """
            return cast(EventEmitter.EventEmission, loads(serialized_obj))  # pragma: no cover

    class _Payload(NamedTuple):
        """The Celery event emitter queue payload."""

        serialized_obj: bytes
        """Serialized event emission object."""

        obj_hash: bytes | None
        """Hash of the serialized event emission object."""

        @classmethod
        def from_json(cls, **kwargs: Any) -> "CeleryEventEmitter.Queue._Payload":
            """
            Creates a Payload instance from a JSON-compatible dictionary.
            :param kwargs: JSON-compatible dictionary representing the payload.
            :return: Payload instance.
            :raises ValueError: If the JSON data is missing fields or contains extra keys.
            """
            # Get the field names of the named tuple
            tuple_fields: tuple[str, ...] = CeleryEventEmitter.Queue._Payload._fields

            # Check if all expected fields are present
            if not set(tuple_fields).issubset(kwargs.keys()):
                raise PyventusException("Missing fields in JSON data.")

            # Check for extra keys in the JSON data
            extra_keys = set(kwargs.keys()) - set(tuple_fields)
            if extra_keys:
                raise PyventusException("Extra keys in JSON data: {}".format(extra_keys))

            # Create the named tuple using the JSON data
            return cls(**kwargs)

        def to_json(self) -> Dict[str, Any]:
            """
            Converts the payload to a JSON-compatible dictionary.
            :return: JSON-compatible dictionary representing the payload.
            """
            return self._asdict()

    def __init__(
        self,
        celery: Celery,
        name: str | None = None,
        secret: str | None = None,
        serializer: Type[Serializer] = Serializer,
    ) -> None:
        """
        Initialize an instance of `CeleryEventEmitter.Queue`.
        :param celery: The Celery object used to enqueue and process event emissions.
        :param name: The name of the queue where the event emission will be enqueued.
            Default is None (task_default_queue).
        :param secret: The secret key used for message authentication and integrity validation.
            This key is used for hashing the event emission object and verifying its integrity.
        :param serializer: The serializer class used for serializing and deserializing event
            emission objects.
        :raises PyventusException: If the Celery object is None, or the secret key is not None
            and empty, or if the content type 'application/x-python-serialize' is not accepted.
        """
        if celery is None:
            raise PyventusException("The 'celery' argument cannot be None.")
        if not isinstance(celery, Celery):
            raise PyventusException("The 'celery' argument must be an instance of the Celery class.")

        if secret is not None and len(secret) == 0:
            raise PyventusException("The 'secret' argument cannot be empty.")

        if "application/x-python-serialize" not in celery.conf.accept_content:
            raise PyventusException(
                "Unsupported content type. "
                "'application/x-python-serialize' is not in the list of accepted content types."
            )

        # Set the Celery queue properties
        self._celery: Celery = celery
        self._name: str = self._celery.conf.task_default_queue if name is None else name
        self._secret: bytes | None = secret.encode("utf-8") if secret else None
        self._digestmod: str | Callable[[], Any] | ModuleType = sha256  # The digest algorithm used for hashing
        self._serializer: Type[CeleryEventEmitter.Queue.Serializer] = serializer

        # Register the event processor method as a Celery task
        self._celery.task(self._executor, name=f"pyventus{self._executor.__name__}", queue=self._name)

    def enqueue(self, event_emission: EventEmitter.EventEmission) -> None:
        """
        Enqueues an event emission object for asynchronous processing in Celery.

        This method takes an `EventEmission` object and enqueues it for asynchronous
        execution by Celery workers. If a secret key is provided during initialization,
        the event emission object is first serialized, and its hash is calculated using
        the secret key. This hash is used to verify the integrity of the event emission
        object during execution.

        :param event_emission: The event emission object to be enqueued for asynchronous execution.
        :return: None
        """
        # Serialize the event emission object
        serialized_obj: Any = self._serializer.dumps(event_emission)

        # Calculate the hash of the serialized object if a secret key was provided
        obj_hash: Any | None = None
        if self._secret:  # pragma: no cover
            obj_hash = hmac.new(key=self._secret, msg=serialized_obj, digestmod=self._digestmod).digest()

        # Create a payload with the serialized event emission instance and its hash
        payload = CeleryEventEmitter.Queue._Payload(
            serialized_obj=serialized_obj,
            obj_hash=obj_hash,
        )

        # Send the event emission object to Celery for asynchronous execution
        self._celery.send_task(
            f"pyventus{self._executor.__name__}",
            kwargs=payload.to_json(),
            queue=self._name,
        )

    def _executor(self, **kwargs: Any) -> None:
        """
        Process the enqueued event emission object.

        This method serves as the Celery task responsible
        for processing the enqueued event emission.

        :param kwargs: The JSON-compatible dictionary representing the payload.
        :return: None
        """
        # Create a Payload instance from the JSON data
        payload = CeleryEventEmitter.Queue._Payload.from_json(**kwargs)

        # Check payload
        if self._secret:  # pragma: no cover
            if not payload.obj_hash:
                raise PyventusException("Invalid payload structure.")

            # Verify the integrity of the payload
            obj_hash: bytes = hmac.new(
                key=self._secret, msg=payload.serialized_obj, digestmod=self._digestmod
            ).digest()

            # Compare the calculated hash with the provided payload hash
            if not hmac.compare_digest(payload.obj_hash, obj_hash):
                raise PyventusException("Payload integrity verification failed.")
        elif payload.obj_hash:  # pragma: no cover
            raise PyventusException("Unexpected payload structure.")

        # Deserialize the event emission object
        event_emission = self._serializer.loads(payload.serialized_obj)

        # Check if the deserialized event emission object is valid
        if event_emission is None:  # pragma: no cover
            raise PyventusException("Failed to deserialize the event emission object.")

        # Run the event emission
        asyncio.run(event_emission())

Classes

Serializer

An event emitter object serializer for Celery queues.

Source code in pyventus/emitters/celery/celery_event_emitter.py
class Serializer:
    """An event emitter object serializer for Celery queues."""

    @staticmethod
    def dumps(obj: EventEmitter.EventEmission) -> Any:
        """
        Serializes the event emission object.
        :param obj: The event emission object to be serialized.
        :return: The serialized representation of the event emission object.
        """
        return dumps(obj)  # pragma: no cover

    @staticmethod
    def loads(serialized_obj: Any) -> EventEmitter.EventEmission:
        """
        Deserializes the task payload back to the event emission object.
        :param serialized_obj: The serialized object to be loaded.
        :return: The deserialized event emission object.
        """
        return cast(EventEmitter.EventEmission, loads(serialized_obj))  # pragma: no cover
Functions
dumps staticmethod
dumps(obj: EventEmission) -> Any

Serializes the event emission object.

PARAMETER DESCRIPTION
obj

The event emission object to be serialized.

TYPE: EventEmission

RETURNS DESCRIPTION
Any

The serialized representation of the event emission object.

Source code in pyventus/emitters/celery/celery_event_emitter.py
@staticmethod
def dumps(obj: EventEmitter.EventEmission) -> Any:
    """
    Serializes the event emission object.
    :param obj: The event emission object to be serialized.
    :return: The serialized representation of the event emission object.
    """
    return dumps(obj)  # pragma: no cover
loads staticmethod
loads(serialized_obj: Any) -> EventEmission

Deserializes the task payload back to the event emission object.

PARAMETER DESCRIPTION
serialized_obj

The serialized object to be loaded.

TYPE: Any

RETURNS DESCRIPTION
EventEmission

The deserialized event emission object.

Source code in pyventus/emitters/celery/celery_event_emitter.py
@staticmethod
def loads(serialized_obj: Any) -> EventEmitter.EventEmission:
    """
    Deserializes the task payload back to the event emission object.
    :param serialized_obj: The serialized object to be loaded.
    :return: The deserialized event emission object.
    """
    return cast(EventEmitter.EventEmission, loads(serialized_obj))  # pragma: no cover

Functions

__init__
__init__(celery: Celery, name: str | None = None, secret: str | None = None, serializer: Type[Serializer] = Serializer) -> None

Initialize an instance of CeleryEventEmitter.Queue.

PARAMETER DESCRIPTION
celery

The Celery object used to enqueue and process event emissions.

TYPE: Celery

name

The name of the queue where the event emission will be enqueued. Default is None (task_default_queue).

TYPE: str | None DEFAULT: None

secret

The secret key used for message authentication and integrity validation. This key is used for hashing the event emission object and verifying its integrity.

TYPE: str | None DEFAULT: None

serializer

The serializer class used for serializing and deserializing event emission objects.

TYPE: Type[Serializer] DEFAULT: Serializer

RAISES DESCRIPTION
PyventusException

If the Celery object is None, or the secret key is not None and empty, or if the content type 'application/x-python-serialize' is not accepted.

Source code in pyventus/emitters/celery/celery_event_emitter.py
def __init__(
    self,
    celery: Celery,
    name: str | None = None,
    secret: str | None = None,
    serializer: Type[Serializer] = Serializer,
) -> None:
    """
    Initialize an instance of `CeleryEventEmitter.Queue`.
    :param celery: The Celery object used to enqueue and process event emissions.
    :param name: The name of the queue where the event emission will be enqueued.
        Default is None (task_default_queue).
    :param secret: The secret key used for message authentication and integrity validation.
        This key is used for hashing the event emission object and verifying its integrity.
    :param serializer: The serializer class used for serializing and deserializing event
        emission objects.
    :raises PyventusException: If the Celery object is None, or the secret key is not None
        and empty, or if the content type 'application/x-python-serialize' is not accepted.
    """
    if celery is None:
        raise PyventusException("The 'celery' argument cannot be None.")
    if not isinstance(celery, Celery):
        raise PyventusException("The 'celery' argument must be an instance of the Celery class.")

    if secret is not None and len(secret) == 0:
        raise PyventusException("The 'secret' argument cannot be empty.")

    if "application/x-python-serialize" not in celery.conf.accept_content:
        raise PyventusException(
            "Unsupported content type. "
            "'application/x-python-serialize' is not in the list of accepted content types."
        )

    # Set the Celery queue properties
    self._celery: Celery = celery
    self._name: str = self._celery.conf.task_default_queue if name is None else name
    self._secret: bytes | None = secret.encode("utf-8") if secret else None
    self._digestmod: str | Callable[[], Any] | ModuleType = sha256  # The digest algorithm used for hashing
    self._serializer: Type[CeleryEventEmitter.Queue.Serializer] = serializer

    # Register the event processor method as a Celery task
    self._celery.task(self._executor, name=f"pyventus{self._executor.__name__}", queue=self._name)
enqueue
enqueue(event_emission: EventEmission) -> None

Enqueues an event emission object for asynchronous processing in Celery.

This method takes an EventEmission object and enqueues it for asynchronous execution by Celery workers. If a secret key is provided during initialization, the event emission object is first serialized, and its hash is calculated using the secret key. This hash is used to verify the integrity of the event emission object during execution.

PARAMETER DESCRIPTION
event_emission

The event emission object to be enqueued for asynchronous execution.

TYPE: EventEmission

RETURNS DESCRIPTION
None

None

Source code in pyventus/emitters/celery/celery_event_emitter.py
def enqueue(self, event_emission: EventEmitter.EventEmission) -> None:
    """
    Enqueues an event emission object for asynchronous processing in Celery.

    This method takes an `EventEmission` object and enqueues it for asynchronous
    execution by Celery workers. If a secret key is provided during initialization,
    the event emission object is first serialized, and its hash is calculated using
    the secret key. This hash is used to verify the integrity of the event emission
    object during execution.

    :param event_emission: The event emission object to be enqueued for asynchronous execution.
    :return: None
    """
    # Serialize the event emission object
    serialized_obj: Any = self._serializer.dumps(event_emission)

    # Calculate the hash of the serialized object if a secret key was provided
    obj_hash: Any | None = None
    if self._secret:  # pragma: no cover
        obj_hash = hmac.new(key=self._secret, msg=serialized_obj, digestmod=self._digestmod).digest()

    # Create a payload with the serialized event emission instance and its hash
    payload = CeleryEventEmitter.Queue._Payload(
        serialized_obj=serialized_obj,
        obj_hash=obj_hash,
    )

    # Send the event emission object to Celery for asynchronous execution
    self._celery.send_task(
        f"pyventus{self._executor.__name__}",
        kwargs=payload.to_json(),
        queue=self._name,
    )

Functions

emit

emit(event: EmittableEventType, *args: Any, **kwargs: Any) -> None

Emits an event and triggers its associated event handlers.

Notes:

  • When emitting dataclass objects or Exception objects, they are automatically passed to the event handler as the first positional argument, even if you pass additional *args or **kwargs.
  • If there are event handlers subscribed to the global event ..., also known as Ellipsis, they will also be triggered each time an event or exception is emitted.
PARAMETER DESCRIPTION
event

The event to be emitted. It can be str, a dataclass object, or an Exception object.

TYPE: EmittableEventType

args

Positional arguments to be passed to the event handlers.

TYPE: Any DEFAULT: ()

kwargs

Keyword arguments to be passed to the event handlers.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None

Source code in pyventus/emitters/event_emitter.py
def emit(self, /, event: EmittableEventType, *args: Any, **kwargs: Any) -> None:
    """
    Emits an event and triggers its associated event handlers.

    **Notes:**

    -   When emitting `dataclass` objects or `Exception` objects, they are automatically passed
        to the event handler as the first positional argument, even if you pass additional `*args`
        or `**kwargs`.
    -   If there are event handlers subscribed to the global event `...`, also known as `Ellipsis`,
        they will also be triggered each time an event or exception is emitted.

    :param event: The event to be emitted. It can be `str`, a `dataclass`
        object, or an `Exception` object.
    :param args: Positional arguments to be passed to the event handlers.
    :param kwargs: Keyword arguments to be passed to the event handlers.
    :return: None
    """
    # Raise an exception if the event is None
    if event is None:
        raise PyventusException("The 'event' argument cannot be None.")

    # Raise an exception if the event is a type
    if isinstance(event, type):
        raise PyventusException("The 'event' argument cannot be a type.")

    # Determine the event name
    event_name: str = self._event_linker.get_event_name(event=event if isinstance(event, str) else type(event))

    # Retrieve the event handlers associated with the event
    event_handlers: List[EventHandler] = self._event_linker.get_event_handlers_by_events(event_name, Ellipsis)

    # Sort the event handlers by timestamp
    event_handlers.sort(key=lambda handler: handler.timestamp)

    # Initialize the list of event handlers to be executed
    pending_event_handlers: List[EventHandler] = []

    # Iterate through each event handler
    for event_handler in event_handlers:
        # Check if the event handler is a one-time subscription
        if event_handler.once:
            # If the event handler is a one-time subscription, we attempt to remove it.
            if self._event_linker.remove_event_handler(event_handler=event_handler):  # pragma: no cover (Race-Cond)
                # If the removal is successful, it indicates that the handler has not
                # been processed before, so we add it to the pending list.
                pending_event_handlers.append(event_handler)
        else:
            pending_event_handlers.append(event_handler)

    # Check if the pending list of event handlers is not empty
    if len(pending_event_handlers) > 0:
        # Create a new EventEmission instance
        event_emission: EventEmitter.EventEmission = EventEmitter.EventEmission(
            event=event_name,
            event_handlers=pending_event_handlers,
            event_args=args if isinstance(event, str) else (event, *args),
            event_kwargs=kwargs,
            debug=self._logger.debug_enabled,
        )

        # Log the event emission when debug is enabled
        if self._logger.debug_enabled:  # pragma: no cover
            self._logger.debug(
                action="Emitting Event:",
                msg=f"{event_emission.event}{StdOutColors.PURPLE} ID:{StdOutColors.DEFAULT} {event_emission.id}",
            )

        # Delegate the event emission processing to subclasses
        self._process(event_emission)

    # Log a debug message if there are no event handlers subscribed to the event
    elif self._logger.debug_enabled:  # pragma: no cover
        self._logger.debug(
            action="Emitting Event:",
            msg=f"{event_name}{StdOutColors.PURPLE} Message:{StdOutColors.DEFAULT} No event handlers subscribed",
        )

__init__

__init__(queue: Queue, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None

Initialize an instance of CeleryEventEmitter.

PARAMETER DESCRIPTION
queue

The queue used for enqueuing event emissions in the Celery event emitter.

TYPE: Queue

event_linker

Specifies the type of event linker used to manage and access events along with their corresponding event handlers. Defaults to EventLinker.

TYPE: Type[EventLinker] DEFAULT: EventLinker

debug

Specifies the debug mode for the logger. If None, it is determined based on the execution environment.

TYPE: bool | None DEFAULT: None

Source code in pyventus/emitters/celery/celery_event_emitter.py
def __init__(self, queue: Queue, event_linker: Type[EventLinker] = EventLinker, debug: bool | None = None) -> None:
    """
    Initialize an instance of `CeleryEventEmitter`.
    :param queue: The queue used for enqueuing event emissions in the Celery event emitter.
    :param event_linker: Specifies the type of event linker used to manage and access
        events along with their corresponding event handlers. Defaults to `EventLinker`.
    :param debug: Specifies the debug mode for the logger. If `None`, it is
        determined based on the execution environment.
    """
    # Call the parent class' __init__ method
    super().__init__(event_linker=event_linker, debug=debug)

    # Validate the queue argument
    if queue is None:
        raise PyventusException("The 'queue' argument cannot be None")
    if not isinstance(queue, CeleryEventEmitter.Queue):
        raise PyventusException("The 'queue' argument must be an instance of the CeleryEventEmitter.Queue class.")

    # Store the queue object
    self._queue: CeleryEventEmitter.Queue = queue