RedisProcessingService class

Bases: ProcessingService

A processing service that utilizes the Redis Queue framework to handle the execution of calls.

Notes:

  • This service uses the RQ Python package to enqueue the provided callbacks in a Redis-backed task queue monitored by one or more workers. Once enqueued, a callback can be picked up and processed by any available worker, which allows calls to be handled asynchronously in a scalable, distributed manner.

  • Synchronous callbacks are executed in a blocking manner inside the worker, while asynchronous callbacks are processed within a new asyncio event loop using the asyncio.run() function.
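
The distinction between synchronous and asynchronous callbacks described above can be sketched in a few lines. This is only an illustration of the behavior stated in the notes, not the actual worker code: `run_callback`, `add`, and `add_async` are hypothetical names introduced for this example.

```python
import asyncio
import inspect
from typing import Any, Callable


def run_callback(callback: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical helper that mirrors the behavior described in the notes."""
    if inspect.iscoroutinefunction(callback):
        # Asynchronous callback: asyncio.run() creates a new event loop,
        # runs the coroutine to completion, and then closes the loop.
        return asyncio.run(callback(*args, **kwargs))
    # Synchronous callback: a plain call that blocks until it returns.
    return callback(*args, **kwargs)


def add(a: int, b: int) -> int:
    # Hypothetical synchronous callback.
    return a + b


async def add_async(a: int, b: int) -> int:
    # Hypothetical asynchronous callback.
    await asyncio.sleep(0)
    return a + b


sync_result = run_callback(add, 1, 2)
async_result = run_callback(add_async, 1, 2)
```

Because `asyncio.run()` cannot be called from a thread that already has a running event loop, a helper like this is only suitable for contexts such as a worker process, where no loop is active.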

Source code in pyventus/core/processing/redis/redis_processing_service.py
class RedisProcessingService(ProcessingService):
    """
    A processing service that utilizes the `Redis Queue` framework to handle the execution of calls.

    **Notes:**

    -   This service uses the `RQ` Python package to enqueue the provided callbacks in a Redis-backed
        task queue monitored by one or more workers. Once enqueued, a callback can be picked up and
        processed by any available worker, which allows calls to be handled asynchronously in a
        scalable, distributed manner.

    -   Synchronous callbacks are executed in a blocking manner inside the worker, while asynchronous
        callbacks are processed within a new asyncio event loop using the `asyncio.run()` function.
    """

    # Attributes for the RedisProcessingService
    __slots__ = ("__queue", "__options")

    def __init__(self, queue: Queue, options: dict[str, Any] | None = None) -> None:
        """
        Initialize an instance of `RedisProcessingService`.

        :param queue: The Redis queue used to enqueue and process callbacks.
        :param options: Additional options for the RQ package enqueueing method.
            Defaults to None, which is treated as an empty dictionary.
        :return: None.
        :raises PyventusException: If the 'queue' argument is None or not an instance
            of the `Queue` class.
        """
        # Validate the queue instance.
        if queue is None or not isinstance(queue, Queue):
            raise PyventusException("The 'queue' argument must be an instance of the Queue class.")

        # Store the Redis queue and RQ options
        self.__queue: Queue = queue
        self.__options: dict[str, Any] = options if options else {}

    def __repr__(self) -> str:
        """
        Retrieve a string representation of the instance.

        :return: A string representation of the instance.
        """
        return formatted_repr(
            instance=self,
            info=attributes_repr(
                queue=self.__queue,
                options=self.__options,
            ),
        )

    @override
    def submit(self, callback: ProcessingServiceCallbackType, *args: Any, **kwargs: Any) -> None:
        # Send the callback and its arguments to Redis for asynchronous execution.
        self.__queue.enqueue(callback, *args, **kwargs, **self.__options)

Functions

__init__

__init__(queue: Queue, options: dict[str, Any] | None = None) -> None

Initialize an instance of RedisProcessingService.

Parameters:

  • queue (Queue): The Redis queue used to enqueue and process callbacks.

  • options (dict[str, Any] | None, default: None): Additional options for the RQ package enqueueing method. If None, an empty dictionary is used.

Returns:

  • None.

Raises:

  • PyventusException: If the 'queue' argument is None or not an instance of the Queue class.

Source code in pyventus/core/processing/redis/redis_processing_service.py
def __init__(self, queue: Queue, options: dict[str, Any] | None = None) -> None:
    """
    Initialize an instance of `RedisProcessingService`.

    :param queue: The Redis queue used to enqueue and process callbacks.
    :param options: Additional options for the RQ package enqueueing method.
        Defaults to None, which is treated as an empty dictionary.
    :return: None.
    :raises PyventusException: If the 'queue' argument is None or not an instance
        of the `Queue` class.
    """
    # Validate the queue instance.
    if queue is None or not isinstance(queue, Queue):
        raise PyventusException("The 'queue' argument must be an instance of the Queue class.")

    # Store the Redis queue and RQ options
    self.__queue: Queue = queue
    self.__options: dict[str, Any] = options if options else {}

submit

submit(callback: ProcessingServiceCallbackType, *args: Any, **kwargs: Any) -> None

Send the callback and its arguments to the Redis queue for asynchronous execution by a worker.

Source code in pyventus/core/processing/redis/redis_processing_service.py
@override
def submit(self, callback: ProcessingServiceCallbackType, *args: Any, **kwargs: Any) -> None:
    # Send the callback and its arguments to Redis for asynchronous execution.
    self.__queue.enqueue(callback, *args, **kwargs, **self.__options)
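
To make the call shape in `submit` concrete, here is a minimal sketch that forwards arguments the same way, using a stand-in queue that only records what it receives. `RecordingQueue`, the module-level `submit` function, and `greet` are hypothetical names introduced for this example; no Redis server or RQ installation is involved, and `job_timeout` is used only as an example of an enqueue option.

```python
from typing import Any, Callable


class RecordingQueue:
    """Hypothetical stand-in for a queue: it only records enqueue calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]] = []

    def enqueue(self, f: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.calls.append((f, args, kwargs))


def submit(
    queue: RecordingQueue,
    options: dict[str, Any],
    callback: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> None:
    # Same forwarding pattern as the method above: the callback's keyword
    # arguments and the stored options are unpacked into a single call.
    queue.enqueue(callback, *args, **kwargs, **options)


def greet(greeting: str, name: str) -> str:
    # Hypothetical callback.
    return f"{greeting}, {name}!"


queue = RecordingQueue()
submit(queue, {"job_timeout": 30}, greet, "Hello", name="Ada")
recorded_callback, recorded_args, recorded_kwargs = queue.calls[0]

# A key present in both the callback kwargs and the options is passed twice,
# so Python raises TypeError before enqueue() is ever entered.
try:
    submit(queue, {"name": "Grace"}, greet, "Hello", name="Ada")
except TypeError:
    collided = True
else:
    collided = False
```

Since both mappings are unpacked into one call, the callback's keyword arguments and the option names share a single namespace. Choosing option keys that cannot clash with callback parameters avoids the `TypeError` shown at the end of the sketch.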