Skip to content

Event Emitters

  Now that your events and responses are all linked up, you may need a way to dispatch these events throughout your application and trigger their corresponding responses. For this purpose, Pyventus provides you with what is known as an Event Emitter.

  The Event Emitter in Pyventus is essentially a base class that allows you to orchestrate the emission of events within your application in a modular and decoupled manner.

Getting Started

  In order to start working with the Event Emitter, the first thing you will need to do is to create a new instance of it. To do so, you will need what is known as an event processor, which is an instance of the Processing Service interface. This processor is crucial for the Event Emitter, as it is responsible for processing the emission of events.

1
2
3
4
5
from pyventus.events import EventEmitter
from pyventus.core.processing.asyncio import AsyncIOProcessingService  # (1)!

event_emitter = EventEmitter(event_processor=AsyncIOProcessingService())
event_emitter.emit("MyEvent")
  1. You can import and use any of the supported processing services, or even create your own custom ones.
1
2
3
4
5
from pyventus.events import AsyncIOEventEmitter, EventEmitter


event_emitter: EventEmitter = AsyncIOEventEmitter()  # (1)!
event_emitter.emit("MyEvent")
  1. There is a factory method for each of the supported processing services. For reference, you can check the Event Emitter utilities.

Using Custom Event Linkers

  By default, the Event Emitter comes with the base Event Linker class configured as the primary access point for events and their responses. However, you have the flexibility to specify the Event Linker class that the Event Emitter will use.

from pyventus.events import EventEmitter, EventLinker
from pyventus.core.processing.asyncio import AsyncIOProcessingService


class CustomEventLinker(EventLinker): ...

CustomEventLinker.subscribe("MyEvent", event_callback=print)
EventLinker.subscribe("MyEvent", event_callback=print)  # (1)!

event_emitter = EventEmitter(
    event_linker=CustomEventLinker,
    event_processor=AsyncIOProcessingService(),
)
event_emitter.emit("MyEvent", "Hello, World!")
  1. This event subscriber will not be triggered, as it is subscribed to the Event Linker base class rather than the custom one that was created.
from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker


class CustomEventLinker(EventLinker): ...


CustomEventLinker.subscribe("MyEvent", event_callback=print)
EventLinker.subscribe("MyEvent", event_callback=print)  # (1)!


event_emitter: EventEmitter = AsyncIOEventEmitter(
    event_linker=CustomEventLinker,
)
event_emitter.emit("MyEvent", "Hello, World!")
  1. This event subscriber will not be triggered, as it is subscribed to the Event Linker base class rather than the custom one that was created.

Using Custom Event Processors

  If you need a different strategy for processing event emissions, you can easily do so by providing an instance of a subclass of the Processing Service interface to the Event Emitter.

import asyncio

from pyventus.core.processing import ProcessingService
from pyventus.events import EventEmitter, EventLinker


class CustomProcessingService(ProcessingService):
    def submit(self, callback, *args, **kwargs):  # (1)!
        coro = callback(*args, **kwargs)  # (2)!
        if asyncio.iscoroutine(coro):
            asyncio.run(coro)


event_emitter = EventEmitter(event_processor=CustomProcessingService())

EventLinker.subscribe("MyEvent", event_callback=print)
event_emitter.emit("MyEvent", "Hello, World!")
  1. Implement this method to define the processing strategy. See the example below.
  2. The given callback can be either a synchronous or an asynchronous Python function. For typing, you can refer to the base submit() method.

Runtime Flexibility

  Thanks to the separation of concerns between the Event Linker and the Event Emitter, you can easily change the Event Emitter at runtime without needing to reconfigure all connections or implement complex logic.

from concurrent.futures import ThreadPoolExecutor

from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker, ExecutorEventEmitter


def main(event_emitter: EventEmitter) -> None:
    event_emitter.emit("MyEvent", f"Using: {event_emitter}!")


if __name__ == "__main__":
    executor = ThreadPoolExecutor()
    EventLinker.subscribe("MyEvent", event_callback=print)

    main(event_emitter=AsyncIOEventEmitter())
    main(event_emitter=ExecutorEventEmitter(executor))

    executor.shutdown()

Debug Mode

  Pyventus' Event Emitter offers a useful debug mode feature to help you understand the flow of events and troubleshoot your event-driven application. You can enable debug mode in the Event Emitter using the following options:

Global Debug Mode

  By default, Pyventus leverages Python's global debug tracing feature to determine whether the code is running in debug mode or not. When this mode is enabled, all local debug flags are set to True unless they are already configured. To activate global debug mode, simply run your code in a debugger like pdb.

EventLinker Global Debug Mode

Instance Debug Mode

  Alternatively, if you want to enable or disable debug mode for a specific Event Emitter instance, you can use the debug flag. Setting the debug flag to True enables debug mode for that instance, while setting it to False disables debug mode.

1
2
3
4
from pyventus.events import AsyncIOEventEmitter, EventEmitter

event_emitter: EventEmitter = AsyncIOEventEmitter(debug=True)
event_emitter.emit("MyEvent", "Hello, World!")
1
2
3
4
from pyventus.events import AsyncIOEventEmitter, EventEmitter

event_emitter: EventEmitter = AsyncIOEventEmitter(debug=False)
event_emitter.emit("MyEvent", "Hello, World!")