Skip to content

Pyventus


Pyventus


Tests Docs Coverage Status Package version Supported Python versions Code style: black


Documentation: https://mdapena.github.io/pyventus

Source Code: https://github.com/mdapena/pyventus


  Pyventus is a powerful Python package for event-driven programming. It offers a comprehensive suite of tools to easily define, emit, and orchestrate events. With Pyventus, you can build scalable, extensible, and loosely-coupled event-driven applications.

Key Features

Pyventus offers several key features, such as:

  • Sync and Async Support ─ Whether your code is synchronous or asynchronous, Pyventus allows you to define event handlers as either sync or async callbacks and emit events from both scopes.
  • Customization ─ Whether you choose official emitters or custom ones, Pyventus allows you to customize the behavior and capabilities of the event emitters to perfectly align with your unique requirements.
  • An Intuitive API ─ Pyventus provides a user-friendly API for defining events, emitters, and handlers. Its design simplifies the process of working with events, enabling you to organize your code around discrete events and their associated actions.
  • Runtime Flexibility ─ Pyventus' runtime flexibility allows you to switch seamlessly between different built-in or custom event emitter implementations on the fly, providing a dynamic and adaptable environment for event-driven programming.
  • Reliable Event Handling ─ Pyventus allows you to define handlers to customize how events are processed upon completion. Attach success and failure logic to take targeted actions based on the outcome of each event execution.
  • Scalability and Maintainability ─ By adopting an event-driven approach with Pyventus, you can create scalable and maintainable code thanks to the loose coupling between its components that enables extensibility and modularity.
  • Comprehensive Documentation ─ Pyventus provides a comprehensive documentation suite that includes API references, usage examples, and tutorials to effectively leverage all the features and capabilities of the package.

Quick Start

  Pyventus is published as a Python package and can be installed using pip, ideally in a virtual environment for proper dependency isolation. To get started, open up a terminal and install Pyventus with the following command:

pip install pyventus

  Pyventus by default relies on the Python standard library and requires Python 3.10 or higher with no additional dependencies. However, this package also includes alternative integrations to access additional features such as Redis Queue, Celery, and FastAPI. For more information on this matter, please refer to the Optional Dependencies section.

A Simple Example

  Experience the power of Pyventus through a simple Hello, World! example that illustrates the core concepts and basic usage of the package. By following this example, you’ll learn how to subscribe to events and emit them within your application.

Hello, World! Example
from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter


@EventLinker.on("GreetEvent")
def handle_greet_event():
    print("Hello, World!")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit("GreetEvent")
You can also work with async functions and contexts...
Hello, World! Example (Async version)
from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter


@EventLinker.on("GreetEvent")
async def handle_greet_event():
    print("Hello, World!")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit("GreetEvent")

  As we can see from the Hello, World! example, Pyventus follows a simple and intuitive workflow for defining and emitting events. Let's recap the essential steps involved:

  1. Importing Necessary Modules: We first imported the required modules from Pyventus, which encompassed the EventLinker class, the EventEmitter interface, and the AsyncIOEventEmitter implementation.
  2. Linking Events to Callbacks: Next, we used the @EventLinker.on() decorator to define and link the string event GreetEvent to the function handle_greet_event(), which will print 'Hello, World!' to the console whenever the GreetEvent is emitted.
  3. Instantiating an Event Emitter: After that, and in order to trigger our event, we needed to create an instance of the event emitter class. While AsyncIOEventEmitter was utilized, any built-in or custom implementation could be employed.
  4. Triggering the Event: Finally, by using the emit() method of the event emitter instance, we were able to trigger the GreetEvent, which resulted in the execution of the handle_greet_event() callback.

  Having gained a clear understanding of the workflow showcased in the Hello, World! example, you are now well-equipped to explore more intricate event-driven scenarios and fully harness the capabilities of Pyventus in your own projects. For a deep dive into the package's functionalities, you can refer to the Pyventus Tutorials or API.

A Practical Example

  To showcase Pyventus' event-driven capabilities in a real-world scenario, we will explore a practical example of implementing a voltage sensor using an event-driven architecture (crucial for such scenarios). The purpose of this example is to create an efficient voltage sensor that can seamlessly handle real-time data and respond appropriately to specific voltage conditions.

Example ─ Monitoring Voltage Levels Across Devices (Context)

Macro photography of black circuit board

  A common aspect found in many systems is the need to monitor and respond to changes in sensor data. Whether it's pressure sensors, temperature sensors, or other types, capturing and reacting to sensor data is crucial for effective system operation. In our practical example, we will focus on a specific scenario: building a sensor system that monitors voltage levels across devices. The goal of our voltage sensor is to detect potential issues, such as low or high voltage conditions, and respond appropriately in real-time.

  To accomplish our goal, we will define a VoltageSensor class to read voltage levels and emit events based on predefined thresholds. We will create event handlers to respond to these events, performing actions such as activating eco-mode for low voltage or implementing high-voltage protection. Additionally, a shared event handler will provide general notifications for out-of-range voltage situations. The code example below illustrates the implementation of this system.

Voltage Sensor System with Pyventus (Practical Example)
import asyncio
import random

from pyventus import EventEmitter, EventLinker, AsyncIOEventEmitter


class VoltageSensor:

    def __init__(self, name: str, low: float, high: float, event_emitter: EventEmitter) -> None:
        # Initialize the VoltageSensor object with the provided parameters
        self._name: str = name
        self._low: float = low
        self._high: float = high
        self._event_emitter: EventEmitter = event_emitter

    async def __call__(self) -> None:
        # Start voltage readings for the sensor
        print(f"Starting voltage readings for: {self._name}")
        print(f"Low: {self._low:.3g}v | High: {self._high:.3g}v\n-----------\n")

        while True:
            # Simulate sensor readings
            voltage: float = random.uniform(0, 5)
            print("\tSensor Reading:", "\033[32m", f"{voltage:.3g}v", "\033[0m")

            # Emit events based on voltage readings
            if voltage < self._low:
                self._event_emitter.emit("LowVoltageEvent", sensor=self._name, voltage=voltage)
            elif voltage > self._high:
                self._event_emitter.emit("HighVoltageEvent", sensor=self._name, voltage=voltage)

            await asyncio.sleep(1)


@EventLinker.on("LowVoltageEvent")
def handle_low_voltage_event(sensor: str, voltage: float):
    print(f"🪫 Turning on eco-mode for '{sensor}'. ({voltage:.3g}v)\n")
    # Perform action for low voltage...


@EventLinker.on("HighVoltageEvent")
async def handle_high_voltage_event(sensor: str, voltage: float):
    print(f"⚡ Starting high-voltage protection for '{sensor}'. ({voltage:.3g}v)\n")
    # Perform action for high voltage...


@EventLinker.on("LowVoltageEvent", "HighVoltageEvent")
def handle_voltage_event(sensor: str, voltage: float):
    print(f"\033[31m\nSensor '{sensor}' out of range.\033[0m (Voltage: {voltage:.3g})")
    # Perform notification for out of range voltage...


async def main():
    # Initialize the sensor and run the sensor readings
    sensor = VoltageSensor(name="PressureSensor", low=0.5, high=3.9, event_emitter=AsyncIOEventEmitter())
    await asyncio.gather(sensor(), )  # Add new sensors inside the 'gather' for multi-device monitoring


asyncio.run(main())

  As we can see from this practical example, Pyventus enables us to easily build an event-driven system for voltage sensors that is flexible, efficient, and highly responsive. With its intuitive API and support for both synchronous and asynchronous operations, we were able to effectively monitor voltage levels, detect anomalies, and trigger appropriate actions in real-time.

Support for Synchronous and Asynchronous Code

  Pyventus is designed from the ground up to seamlessly support both synchronous and asynchronous programming models. Its unified sync/async API allows you to define event callbacks and emit events across sync and async contexts.

Subscribing Event Handlers with Sync and Async Callbacks

1
2
3
4
5
6
7
8
@EventLinker.on("MyEvent")
def sync_event_callback():
    pass  # Synchronous event handling


@EventLinker.on("MyEvent")
async def async_event_callback():
    pass  # Asynchronous event handling
You can optimize the execution of your callbacks based on their workload...

  By default, event handlers in Pyventus are executed concurrently during an event emission, running their sync and async callbacks as defined. However, if you have a sync callback that involves I/O or non-CPU bound operations, you can enable the force_async parameter to offload it to a thread pool, ensuring optimal performance and responsiveness. The force_async parameter utilizes the asyncio.to_thread() function to execute sync callbacks asynchronously.

1
2
3
4
5
6
7
@EventLinker.on("BlockingIO", force_async=True)
def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

Emitting Events from Sync and Async Contexts

1
2
3
4
5
6
7
8
# Emitting an event within a sync function
def sync_function(event_emitter: EventEmitter):
    event_emitter.emit("MyEvent")


# Emitting an event within an async function
async def async_function(event_emitter: EventEmitter):
    event_emitter.emit("MyEvent")
Event propagation within different contexts...

  While Pyventus provides a base EventEmitter class with a unified sync/async API, the specific propagation behavior when emitting events may vary depending on the concrete EventEmitter used. For example, the AsyncIOEventEmitter implementation leverages the AsyncIO event loop to schedule callbacks added from asynchronous contexts without blocking. But alternative emitters could structure propagation differently to suit their needs.

Runtime Flexibility and Customization

  At its core, Pyventus utilizes a modular event emitter design that allows you to switch seamlessly between different built-in or custom event emitter implementations on the fly. Whether you opt for official emitters or decide to create your custom ones, Pyventus allows you to tailor the behavior and capabilities of the event emitters to perfectly align with your unique requirements.

Swapping Event Emitter Implementations at Runtime

  By leveraging the principle of dependency inversion and using the base EventEmitter as a dependency, you can change the concrete implementation on the fly. Let's demonstrate this using the AsyncIO Event Emitter and the Executor Event Emitter:

Event Emitter Runtime Flexibility Example
from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter, ExecutorEventEmitter


@EventLinker.on("GreetEvent")
def handle_greet_event(name: str = "World"):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    def main(event_emitter: EventEmitter) -> None:
        event_emitter.emit("GreetEvent", name=type(event_emitter).__name__)


    main(event_emitter=AsyncIOEventEmitter())
    with ExecutorEventEmitter() as executor_event_emitter:
        main(event_emitter=executor_event_emitter)

Defining Custom Event Emitters

  To illustrate Pyventus' customization capabilities, we will define and implement a custom event emitter class for the FastAPI framework. This class will efficiently handle the execution of event emissions through its background tasks workflow.

Custom Event Emitter Example
from fastapi import BackgroundTasks

from pyventus import EventEmitter, EventLinker


class FastAPIEventEmitter(EventEmitter):
    """A custom event emitter that uses the FastAPI background tasks."""

    def __init__(self, background_tasks: BackgroundTasks):
        super().__init__(event_linker=EventLinker, debug=False)
        self._background_tasks = background_tasks

    def _process(self, event_emission: EventEmitter.EventEmission) -> None:
        self._background_tasks.add_task(event_emission)  # Process the event emission as a background task
Official FastAPIEventEmitter Integration.

In case you're interested in integrating Pyventus with FastAPI, you can refer to the official Pyventus FastAPI Event Emitter implementation.

Event Objects and Global Events

  In addition to string events, Pyventus also supports Event Objects, which provide a structured way to define events and encapsulate relevant data payloads.

Event Object Example
@dataclass  # Define a Python dataclass representing the event and its payload.
class OrderCreatedEvent:
    order_id: int
    payload: dict[str, any]


@EventLinker.on(OrderCreatedEvent)  # Subscribe event handlers to the event.
def handle_order_created_event(event: OrderCreatedEvent):
    # Pyventus will automatically pass the Event Object 
    # as the first positional argument.
    print(f"Event Object: {event}")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit(
    event=OrderCreatedEvent(  # Emit an instance of the event!
        order_id=6452879,
        payload={},
    ),
)

  Furthermore, Pyventus provides support for Global Events, which are particularly useful for implementing cross-cutting concerns such as logging, monitoring, or analytics. By subscribing event handlers to ... or Ellipsis, you can capture all events that may occur within that EventLinker context.

Global Event Example
1
2
3
4
5
6
7
@EventLinker.on(...)
def handle_any_event(*args, **kwargs):
    print(f"Perform logging...\nArgs: {args}\tKwargs: {kwargs}")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit("GreetEvent", name="Pyventus")

Success and Error Handling

  With Pyventus, you can customize how events are handled upon completion, whether they succeed or encounter errors. This customization is achieved by using either the EventLinker's on() or once() decorator within a with statement block. Inside this block, you can define not only the event callbacks but also the overall workflow of the event. Now, let’s explore this simple yet powerful Pythonic syntax of Pyventus through an example.

Success and Error Handling Example
from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter

# Create an event linker for the "DivisionEvent"
with EventLinker.on("DivisionEvent") as linker:
    @linker.on_event
    def divide(a: float, b: float) -> float:
        return a / b

    @linker.on_success
    def handle_success(result: float) -> None:
        print(f"Division result: {result:.3g}")

    @linker.on_failure
    def handle_failure(e: Exception) -> None:
        print(f"Oops, something went wrong: {e}")

event_emitter: EventEmitter = AsyncIOEventEmitter()  # Create an event emitter
event_emitter.emit("DivisionEvent", a=1, b=0)  # Example: Division by zero
event_emitter.emit("DivisionEvent", a=1, b=2)  # Example: Valid division

  As we have seen from the example, Pyventus offers a reliable and Pythonic solution for customizing event handling. By utilizing the EventLinker and its decorators within a with statement block, we were able to define the DivisionEvent and specify the callbacks for division, success, and failure cases.

Continuous Evolution

  Pyventus continuously adapts to support developers across technological and programming domains. Its aim is to remain at the forefront of event-driven design. Future development may introduce new official event emitters, expanding compatibility with different technologies through seamless integration.

  Current default emitters provide reliable out-of-the-box capabilities for common use cases. They efficiently handle core event operations and lay the foundation for building event-driven applications.

Driving Innovation Through Collaboration

  Pyventus is an open source project that welcomes community involvement. If you wish to contribute additional event emitters, improvements, or bug fixes, please check the Contributing section for guidelines on collaborating. Together, we can further the possibilities of event-driven development.

License

  Pyventus is distributed as open source software and is released under the MIT License. You can view the full text of the license in the LICENSE file located in the Pyventus repository.