Celery Event Emitter

🏗️ Work in Progress

This section is currently being rebuilt.

  The CeleryEventEmitter lets you build event-driven applications that hand event processing off to Celery workers, so high volumes of work can be handled asynchronously and scaled out across workers.

What is it?

  The CeleryEventEmitter is a concrete implementation of the EventEmitter that uses the Celery distributed task queue for event handling. Event emissions are enqueued as Celery tasks and processed asynchronously by workers, which makes the CeleryEventEmitter particularly useful for resource-intensive tasks.

How it works

  The CeleryEventEmitter uses the Celery package to handle the emission and processing of events. Here's a breakdown of how it functions:

  1. Event emission: When an event is triggered, an event emission object is created and submitted as a task to the Celery queue.
  2. Task queue: The Celery broker stores the task in its queue, where it can be retrieved by workers.
  3. Worker processing: Idle Celery workers pull tasks from the queue and execute the event emissions asynchronously in parallel.
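
The three steps above can be sketched with the standard library alone. This is a simplified illustration, not how Pyventus or Celery are implemented: an in-process `queue.Queue` stands in for the broker, threads stand in for Celery workers, and the names `EventEmission`, `worker`, and `process` are hypothetical.

```python
import asyncio
import inspect
import queue
import threading
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class EventEmission:
    """Hypothetical stand-in for the object created when an event is emitted."""

    event: str
    callbacks: list[Callable[..., Any]]
    args: tuple = ()

    def run(self) -> None:
        # Execute every handler; coroutine functions get their own event loop.
        for callback in self.callbacks:
            if inspect.iscoroutinefunction(callback):
                asyncio.run(callback(*self.args))
            else:
                callback(*self.args)


def worker(tasks: "queue.Queue[Optional[EventEmission]]") -> None:
    # Step 3: pull emissions from the queue until a None sentinel arrives.
    while True:
        emission = tasks.get()
        if emission is None:
            break
        emission.run()


def process(emissions: list[EventEmission], worker_count: int = 2) -> None:
    tasks: "queue.Queue[Optional[EventEmission]]" = queue.Queue()  # Step 2: the "broker"
    workers = [threading.Thread(target=worker, args=(tasks,)) for _ in range(worker_count)]
    for thread in workers:
        thread.start()
    for emission in emissions:  # Step 1: submit each emission as a task
        tasks.put(emission)
    for _ in workers:  # one sentinel per worker so each of them stops
        tasks.put(None)
    for thread in workers:
        thread.join()


results: list[str] = []
results_lock = threading.Lock()


def sync_handler(name: str) -> None:
    with results_lock:
        results.append(f"sync:{name}")


async def async_handler(name: str) -> None:
    await asyncio.sleep(0)
    with results_lock:
        results.append(f"async:{name}")


process(
    [
        EventEmission("StringEvent", [sync_handler, async_handler], ("a",)),
        EventEmission("StringEvent", [sync_handler, async_handler], ("b",)),
    ]
)
print(sorted(results))
```

With a real broker the queue lives outside the emitting process, so the emitter returns as soon as the task is submitted and workers can run on other machines.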

Usage

To start using the CeleryEventEmitter, follow these steps:

  1. Install Celery: Before proceeding, make sure you have installed the Celery optional dependency.
  2. Define event handlers: Start by defining the event handlers. Note that these functions cannot reside in the main module; they need to live in a separate module so that the Celery worker can import them. For this example, create a file called event_handlers.py and add the handlers to be processed.
    event_handlers.py
    import asyncio
    import time
    
    from pyventus.linkers import EventLinker
    
    
    @EventLinker.on("StringEvent")
    async def slow_async_event_callback():
        print("Starting the async slow process...")
        await asyncio.sleep(5)
        print("Finishing the async slow process!")
    
    
    @EventLinker.on("StringEvent")
    def slow_sync_event_callback():
        print("Starting the sync slow process...")
        time.sleep(5)
        print("Finishing the sync slow process!")
    
  3. Celery worker: Once the event handlers are defined, the next step is to configure the Celery workers that will process event emissions. Create a file called worker.py and include the following worker configuration. These workers listen to a message broker such as RabbitMQ or Redis and process incoming tasks. For more advanced configurations, refer to the official Celery documentation.
    Serialization Security

      The Celery app must accept the application/x-python-serialize content type (Celery's pickle serializer) so that the event emission object can be serialized and deserialized when tasks are processed. Because unpickling untrusted data can execute arbitrary code, the CeleryEventEmitter queue can authenticate and validate serialized payloads using a hash and a secret key. A custom serializer can also be implemented if the default does not meet the specific needs of your project.

    worker.py
    from celery import Celery
    from pyventus.emitters.celery import CeleryEventEmitter
    
    # To ensure Python recognizes the existence of the event handlers, we need to import them.
    from event_handlers import slow_sync_event_callback, slow_async_event_callback
    
    # Using Redis as the broker for this example. For Redis support, run: pip install "celery[redis]"
    app: Celery = Celery("worker", broker="redis://default:redispw@localhost:6379")
    
    # Optional configuration, see the Celery app user guide.
    app.conf.update(result_expires=3600)
    
    # Set the accepted content type to "application/x-python-serialize" in the Celery app.
    app.conf.accept_content = ["application/json", "application/x-python-serialize"]
    
    # Create the celery event emitter queue.
    celery_event_emitter_queue = CeleryEventEmitter.Queue(celery=app, secret="secret-key")
    
    if __name__ == "__main__":
        app.start()
    
  4. Launching Celery Workers: With the previous configuration and setup complete, we can now launch the Celery worker processes. There are a few differences depending on your operating system:
    • For Linux/macOS:
      celery -A worker worker -l INFO
      
    • For Windows:
      celery -A worker worker -l INFO --pool=solo
      
    • Specifying a Queue:
      celery -A worker worker -l INFO -Q [queue-name]
      
  5. Emitting events: To emit events, we will create a main.py file where we instantiate the CeleryEventEmitter and trigger our first event.
    main.py
    from pyventus import EventEmitter
    from pyventus.emitters.celery import CeleryEventEmitter
    
    from worker import celery_event_emitter_queue
    
    if __name__ == "__main__":
        event_emitter: EventEmitter = CeleryEventEmitter(queue=celery_event_emitter_queue)
        event_emitter.emit("StringEvent")
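
The Serialization Security note in step 3 says that payloads can be authenticated with a hash and a secret key. The general technique is HMAC signing, sketched below with the standard library. This is an illustration of the idea only and is not taken from the Pyventus source; the helper names `dumps_signed` and `loads_signed` are hypothetical.

```python
import hashlib
import hmac
import pickle
from typing import Any

# Length in bytes of a SHA-256 digest, used to split signature from payload.
DIGEST_SIZE = hashlib.sha256().digest_size


def dumps_signed(obj: Any, secret: str) -> bytes:
    """Pickle obj and prepend an HMAC-SHA256 signature of the pickled bytes."""
    payload = pickle.dumps(obj)
    signature = hmac.new(secret.encode(), payload, hashlib.sha256).digest()
    return signature + payload


def loads_signed(data: bytes, secret: str) -> Any:
    """Verify the signature first; unpickle only if it matches."""
    signature, payload = data[:DIGEST_SIZE], data[DIGEST_SIZE:]
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).digest()
    # compare_digest performs a constant-time comparison.
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Payload signature mismatch; refusing to unpickle.")
    return pickle.loads(payload)


blob = dumps_signed({"event": "StringEvent"}, "secret-key")
print(loads_signed(blob, "secret-key"))
```

The key point is the order of operations: the signature is checked before `pickle.loads` runs, so a payload that was tampered with or signed with a different secret is rejected without being deserialized.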
    

Recap

  We've explored how the CeleryEventEmitter provides an asynchronous and scalable solution for processing events. Here are the key points:

  • Events are emitted and serialized into tasks submitted to the Celery queue.
  • Celery workers then asynchronously process the queued event emissions independently and in parallel.
  • Because more workers can be added as load grows, this distributed approach lets event handling scale with the workload.

  In summary, the CeleryEventEmitter leverages Celery's distributed task queue architecture to process event emissions asynchronously and in parallel, which allows event-driven applications to handle increasing workloads by adding workers.