Pyventus




Documentation: https://mdapena.github.io/pyventus

Source Code: https://github.com/mdapena/pyventus


  Pyventus is a powerful Python library for event-driven and reactive programming, designed to simplify the development of asynchronous and event-based applications in Python.

Key Features

Pyventus offers several key features, such as:

  • Event-Driven & Reactive Programming ─ Whether you opt for an event-driven design or a reactive approach, Pyventus lets you select the paradigm that best fits your architecture.
  • High Performance ─ Pyventus is designed from the ground up with a focus on efficiency, taking into account optimizations for time complexity, memory usage, and Python-specific features.
  • Sync and Async Support ─ Whether your code is synchronous or asynchronous, Pyventus allows you to seamlessly work with both sync and async callables, as well as access its API from both contexts.
  • Reliable Asynchronous Processing ─ With Pyventus, you have full control over your asynchronous workflows, allowing you to customize how they are processed upon completion, whether they succeed or encounter errors.
  • Intuitive & User-Friendly API ─ Pyventus provides a user-friendly API that simplifies the process of working with event-driven and reactive paradigms, enabling you to organize your code around discrete actions and their responses.
  • Comprehensive Documentation ─ Pyventus offers a comprehensive documentation suite that includes API references, usage examples, and tutorials to help you effectively leverage all the features and capabilities of the library.

Quick Start

  Pyventus is published as a Python package and can be installed using pip, ideally in a virtual environment for proper dependency isolation. To get started, open up a terminal and install Pyventus with the following command:

pip install pyventus

  By default, Pyventus relies only on the Python standard library and requires Python 3.10 or higher. Its sole additional dependency is typing-extensions, which is primarily used to support advanced typing features in older versions of Python. The package also includes optional integrations that provide additional features, such as asynchronous processing with Redis Queue and Celery. For more information, please refer to the Optional Dependencies section.
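
  Before or after installing, it can help to confirm that the interpreter meets the stated requirement. The snippet below is a minimal sketch that uses only the standard library; the helper name check_environment is hypothetical and not part of Pyventus.

```python
import sys
from importlib.util import find_spec


def check_environment(min_version: tuple = (3, 10)) -> dict:
    """Report whether the interpreter and package requirements appear to be met."""
    return {
        # Pyventus requires Python 3.10 or higher.
        "python_ok": sys.version_info >= min_version,
        # find_spec() returns None when a top-level package cannot be found.
        "pyventus_installed": find_spec("pyventus") is not None,
    }


if __name__ == "__main__":
    print(check_environment())
```

  If python_ok is False, the pip install command above is expected to fail or to select an incompatible release, so upgrading the interpreter first is the safer path.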

Basic Usage

  Let’s begin with some introductory examples that will not only illustrate the core concepts and basic usage of the library but also provide a foundation for more complex scenarios.

A Simple Event-Driven Example

  Starting with the event-driven paradigm, let's explore the capabilities of Pyventus through a simple event-based Hello, World! example, where you will learn how to subscribe to events and emit them within your application.

Hello, World! Example
from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker


@EventLinker.on("GreetEvent")
def handle_greet_event():
    print("Hello, World!")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit("GreetEvent")

You can also work with async functions and contexts...

Hello, World! Example (Async version)
from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker


@EventLinker.on("GreetEvent")
async def handle_greet_event():
    print("Hello, World!")


event_emitter: EventEmitter = AsyncIOEventEmitter()
event_emitter.emit("GreetEvent")

  As we can see from the Hello, World! example, Pyventus follows a simple and intuitive workflow for defining and emitting events. Let’s recap the essential steps involved:

  1. Importing Necessary Components: We first imported the required components from the events module of Pyventus, which included the EventLinker, the EventEmitter, and the AsyncIOEventEmitter factory method.
  2. Linking Events to Callbacks: Next, we used the @EventLinker.on() decorator to define and link the string event GreetEvent to the function handle_greet_event(), which will print "Hello, World!" to the console whenever the GreetEvent is emitted.
  3. Instantiating an Event Emitter: After that, to trigger our event, we used the AsyncIOEventEmitter factory method to create an event emitter instance, which in this case is preconfigured with the AsyncIOProcessingService.
  4. Triggering the Event: Finally, by using the emit() method of the event emitter instance, we triggered the GreetEvent, resulting in the execution of the handle_greet_event() callback.
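
  To make the link-then-emit workflow above more concrete, here is a conceptual sketch written with the standard library only. It is not how Pyventus is implemented: the names on, emit, and _registry are hypothetical stand-ins, and the callbacks run synchronously instead of being delegated to a processing service.

```python
from collections import defaultdict
from typing import Any, Callable

# Hypothetical registry for illustration only; not a Pyventus class.
_registry: dict = defaultdict(list)


def on(event_name: str) -> Callable:
    """Return a decorator that links a callback to an event name (step 2)."""

    def decorator(callback: Callable[..., Any]) -> Callable[..., Any]:
        _registry[event_name].append(callback)
        return callback

    return decorator


def emit(event_name: str, *args: Any, **kwargs: Any) -> list:
    """Call every callback linked to the event and collect the results (step 4)."""
    return [callback(*args, **kwargs) for callback in _registry[event_name]]


@on("GreetEvent")
def handle_greet_event() -> str:
    return "Hello, World!"


if __name__ == "__main__":
    print(emit("GreetEvent"))
```

  The point of the sketch is the separation of concerns: subscribers are registered in one place, and the code that emits an event does not need to know who is listening.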

  Having gained a clear understanding of the workflow showcased in the Hello, World! example, you are now well-equipped to explore more intricate event-driven scenarios and fully harness the capabilities of Pyventus in your own projects. For a deep dive into the package's functionalities, you can refer to the API and Learn sections.

A Simple Reactive Example

  Now, let's take a look at the capabilities of Pyventus within the reactive paradigm through a simple example, where you will not only learn how to define observables and stream data over time, but also how to subscribe to them.

Simple Counter Example
from pyventus.reactive import as_observable_task, Completed


@as_observable_task
def simple_counter(stop: int):
    for count in range(1, stop + 1):
        yield count
    raise Completed


obs = simple_counter(stop=16)
obs.subscribe(
    next_callback=lambda val: print(f"Received: {val}"),
    complete_callback=lambda: print("Done!"),
)
obs()

You can also work with async functions and contexts...

Simple Counter Example (Async version)
from pyventus.reactive import as_observable_task, Completed


@as_observable_task
async def simple_counter(stop: int):
    for count in range(1, stop + 1):
        yield count
    raise Completed


obs = simple_counter(stop=16)
obs.subscribe(
    next_callback=lambda val: print(f"Received: {val}"),
    complete_callback=lambda: print("All done!"),
)
obs()

  As shown in the Simple Counter example, Pyventus follows a simple and intuitive workflow for defining observables and streaming data to subscribers. Let’s recap the essential steps involved:

  1. Importing Necessary Components: We first imported the required components from the reactive module of Pyventus, which included the @as_observable_task decorator and the Completed signal.
  2. Defining Observables: After that, we applied the @as_observable_task decorator to the simple_counter() function to define our observable task. Once executed, it yields a count from one up to the specified number and raises a Completed signal when done.
  3. Instantiating Observables: Then, we called the simple_counter() function to instantiate the observable task, so that we can subscribe to it and control its execution as needed.
  4. Subscribing to Observables: Next, we added a subscriber to the observable instance by calling its subscribe() method and passing the corresponding callbacks. In this case, we used two lambda functions: one to print the received values and another to indicate when the observable has completed emitting values.
  5. Executing Observables: Finally, to start the execution of the observable, we called its instance, which ran the simple_counter() function and streamed its results to the subscriber.
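
  The define, subscribe, and execute steps above can be pictured with a small standard-library sketch. It is a simplified illustration and not Pyventus code: the MiniObservable class is hypothetical, and it treats exhaustion of the generator as completion, whereas the Pyventus example signals completion by raising Completed.

```python
from typing import Any, Callable, Iterator, Optional


class MiniObservable:
    """Hypothetical, simplified stand-in for an observable task."""

    def __init__(self, source: Callable[[], Iterator[Any]]) -> None:
        self._source = source
        self._observers = []  # (next_callback, complete_callback) pairs

    def subscribe(
        self,
        next_callback: Callable[[Any], None],
        complete_callback: Optional[Callable[[], None]] = None,
    ) -> None:
        self._observers.append((next_callback, complete_callback))

    def __call__(self) -> None:
        # Stream every value to every observer, then signal completion.
        for value in self._source():
            for on_next, _ in self._observers:
                on_next(value)
        for _, on_complete in self._observers:
            if on_complete is not None:
                on_complete()


def simple_counter(stop: int) -> MiniObservable:
    return MiniObservable(lambda: iter(range(1, stop + 1)))


received = []
obs = simple_counter(stop=3)
obs.subscribe(received.append, lambda: received.append("done"))
obs()  # Nothing is produced until the observable is called
print(received)  # [1, 2, 3, 'done']
```

  Note that, as in the steps above, subscribing alone produces no values; the stream only starts when the observable instance is called.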

  With a clear understanding of the workflow showcased in the Simple Counter example, you are now well-equipped to explore more intricate reactive scenarios and fully harness the capabilities of Pyventus in your own projects. For a deep dive into the package's functionalities, you can refer to the API and Learn sections.

Practical Examples

  To truly see Pyventus in action, let’s explore some practical examples that will not only illustrate specific use cases of the library but also showcase its various features and demonstrate how to use them effectively.

Dynamic Voltage Monitoring: An Event-Driven Perspective

  A common aspect found in many systems is the need to monitor and respond to changes in sensor data. Whether it involves pressure, temperature, or voltage, capturing and reacting accordingly to sensor readings is crucial for any related process.

  In this practical example, we will focus on voltage sensors, where timely detection of low or high voltage conditions can prevent equipment damage and ensure system reliability. However, designing a sensor architecture that is both easy to extend and flexible can be challenging, especially if we want users to simply attach their logic without needing to understand or modify the underlying implementation. This complexity also increases if we aim for an architecture that enables a proper separation of concerns.

  One way to effectively address this challenge is by implementing an event-driven architecture, where each voltage sensor encapsulates its own logic for reading values and only exposes a series of events that users can utilize to attach their domain logic. To translate this into code, we will define a VoltageSensor class that reads voltage levels and emits events based on predefined thresholds using Pyventus. The code below illustrates the implementation of this use case.

Dynamic Voltage Monitoring (Event-Driven Implementation)
import asyncio
import random

from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker


class VoltageSensor:

    def __init__(self, name: str, low: float, high: float, event_emitter: EventEmitter) -> None:
        # Initialize the VoltageSensor object with the provided parameters
        self._name: str = name
        self._low: float = low
        self._high: float = high
        self._event_emitter: EventEmitter = event_emitter

    async def __call__(self) -> None:
        # Start voltage readings for the sensor
        print(f"Starting voltage readings for: {self._name}")
        print(f"Low: {self._low:.3g}v | High: {self._high:.3g}v\n-----------\n")

        while True:
            # Simulate sensor readings
            voltage: float = random.uniform(0, 5)
            print("\tSensor Reading:", "\033[32m", f"{voltage:.3g}v", "\033[0m")

            # Emit events based on voltage readings
            if voltage < self._low:
                self._event_emitter.emit("LowVoltageEvent", sensor=self._name, voltage=voltage)
            elif voltage > self._high:
                self._event_emitter.emit("HighVoltageEvent", sensor=self._name, voltage=voltage)

            await asyncio.sleep(1)


@EventLinker.on("LowVoltageEvent")
def handle_low_voltage_event(sensor: str, voltage: float):
    print(f"🪫 Starting low-voltage protection for '{sensor}'. ({voltage:.3g}v)\n")
    # Perform action for low voltage...


@EventLinker.on("HighVoltageEvent")
def handle_high_voltage_event(sensor: str, voltage: float):
    print(f"⚡ Starting high-voltage protection for '{sensor}'. ({voltage:.3g}v)\n")
    # Perform action for high voltage...


@EventLinker.on("LowVoltageEvent", "HighVoltageEvent")
async def handle_voltage_event(sensor: str, voltage: float):
    print(f"\033[31m\nSensor '{sensor}' out of range.\033[0m (Voltage: {voltage:.3g})")
    # Perform notification for out of range voltage...


async def main():
    # Initialize the sensor and run the sensor readings
    sensor = VoltageSensor(name="CarSensor", low=0.5, high=3.9, event_emitter=AsyncIOEventEmitter())
    await asyncio.gather(sensor(), )  # Add new sensors inside the 'gather' for multi-device monitoring


asyncio.run(main())
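
  Because the sensor loop above uses random readings and runs indefinitely, its threshold logic is hard to check directly. One option, shown here as a sketch and not as part of the original example, is to extract the decision into a pure function. The name classify_voltage is hypothetical; the comparisons mirror the strict less-than and greater-than checks used in VoltageSensor.

```python
from typing import Optional


def classify_voltage(voltage: float, low: float, high: float) -> Optional[str]:
    """Return the event name for a reading, or None when it is within range."""
    if voltage < low:
        return "LowVoltageEvent"
    if voltage > high:
        return "HighVoltageEvent"
    return None  # Readings equal to a threshold count as in range


if __name__ == "__main__":
    for reading in (0.2, 2.5, 4.4):
        print(reading, classify_voltage(reading, low=0.5, high=3.9))
```

  A sensor could then emit the returned event name whenever it is not None, which keeps the thresholds testable without running the asynchronous loop.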

Non-Blocking HTTP Fetcher: A Responsive Approach

  In today's interconnected world, retrieving information from the network is essential for many applications. Whether through WebSocket connections, RESTful APIs, or HTTP requests, these methods facilitate vital data exchange. However, blocking network retrievals can severely impact user experience, making it imperative for applications to remain responsive.

  In this practical example, we will explore the design and implementation of a non-blocking HTTP getter function, along with its integration into a console-style application that mimics the behavior of a simplified web browser. While there are various mechanisms for implementing non-blocking HTTP fetchers, such as Python threads or the asyncio library, we will leverage the reactive paradigm of Pyventus due to its readability, declarative style, and ease of implementation.

  To accomplish this, we will first define a basic blocking HTTP function called http_get(). This function will then be transformed into an observable using the @as_observable_task decorator from Pyventus, allowing us to attach subscribers for result notifications. Finally, we will utilize the ThreadPoolExecutor for concurrent execution of the observables, enabling us to handle multiple requests seamlessly while maintaining an interactive user experience.

Non-Blocking HTTP Fetcher (Reactive Implementation)
from concurrent.futures import ThreadPoolExecutor
from http.client import HTTPConnection, HTTPException, HTTPResponse, HTTPSConnection
from urllib.parse import ParseResult, urlparse

from pyventus.reactive import as_observable_task


@as_observable_task
def http_get(url: str) -> str:
    """Perform an HTTP GET request and return the response data."""
    parsed_url: ParseResult = urlparse(url)  # Parse the URL

    # Create a connection based on the URL scheme (HTTP or HTTPS)
    connection: HTTPConnection
    if parsed_url.scheme == "https":
        connection = HTTPSConnection(parsed_url.netloc)
    else:
        connection = HTTPConnection(parsed_url.netloc)

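    # Send the request (keeping any query string), retrieve the response, and close the connection
    target: str = parsed_url.path or "/"
    if parsed_url.query:
        target += f"?{parsed_url.query}"
    connection.request("GET", target)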
    response: HTTPResponse = connection.getresponse()
    data: str = response.read().decode()
    connection.close()

    # Raise an exception for HTTP errors; otherwise, return the response
    if response.status >= 400:
        raise HTTPException(response.status, data)
    return data


def main():
    print(
        "🌐  Welcome to the Reactive HTTP Fetcher!\n\n💡  Try searching for:\n"
        "    1. - https://httpbin.org/get\n"
        "    2. - https://httpbin.org/uuid\n"
        "    3. - https://httpbin.org/ip\n"
        "    4. - https://httpbin.org/404"
    )

    prompt = "\n🔗  Enter the URL (Type '\033[36mexit\033[0m' to quit): "
    metrics = {"success_count": 0, "error_count": 0}  # Initialize metrics
    executor = ThreadPoolExecutor()  # Create a thread pool for concurrent execution

    while True:
        # Get user input
        url = input(prompt)
        if url.lower() == "exit":
            break

        # Call the HTTP function, which returns an observable
        obs = http_get(url)

        # Subscribe to the observable using a subscription context
        with obs.subscribe() as subctx:

            @subctx.on_next
            def next(result: str) -> None:
                metrics["success_count"] += 1  # Increment success count
                print(f"\r{' ' * len(prompt)}\r✅  HTTPResponse: {result!r}\n", end=f"{prompt}")

            @subctx.on_error
            def error(error: Exception) -> None:
                metrics["error_count"] += 1  # Increment error count
                print(f"\r{' ' * len(prompt)}\r⚠️   HTTPException: {error!r}\n", end=f"{prompt}")

        obs(executor)  # Execute the observable with the thread pool
        print(f"🔍  Requested URL: \033[32m{url!r}\033[0m \n⏳  Fetching data... ")

    # Shutdown the executor
    executor.shutdown()

    # Print summary of requests
    print(
        f"\n🎯  Summary:\n"
        f"    - Total Requests: {metrics['success_count'] + metrics['error_count']}\n"
        f"    - Successful Requests: \033[32m{metrics['success_count']}\033[0m\n"
        f"    - Error Requests: \033[31m{metrics['error_count']}\033[0m"
    )


main()
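
  For comparison, the text above mentions Python threads as another way to avoid blocking. The following sketch shows roughly what the same success and error routing looks like with only concurrent.futures. It is an illustration under stated assumptions: fake_fetch is a hypothetical stand-in that performs no network access, and the URLs are placeholders.

```python
from concurrent.futures import Future, ThreadPoolExecutor

results: list = []
errors: list = []


def fake_fetch(url: str) -> str:
    """Hypothetical stand-in for a blocking request; no network access."""
    if url.endswith("/404"):
        raise RuntimeError("404 Not Found")
    return f"response from {url}"


def on_done(future: Future) -> None:
    """Route the outcome of a finished request to the matching list."""
    error = future.exception()
    if error is not None:
        errors.append(repr(error))
    else:
        results.append(future.result())


with ThreadPoolExecutor() as executor:
    for url in ("https://example.invalid/get", "https://example.invalid/404"):
        executor.submit(fake_fetch, url).add_done_callback(on_done)
# Leaving the with block waits for all submitted work to finish.

print(results)
print(errors)
```

  Here the success and error paths share a single callback that has to inspect the future, whereas the subscription context in the reactive version declares each path separately.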

Event-Driven Programming: Key Highlights of Pyventus

  Alongside the standard functionalities of event-driven programming, Pyventus also introduces some unique aspects that set it apart from other implementations. In this section, we will cover some of these key features and how to use them effectively.

  • Event Objects ─ Besides supporting string-based events, as we've seen in previous examples, Pyventus also supports Event Objects, which provide a structured way to define events and encapsulate relevant data payloads.

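    from dataclasses import dataclass

    from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker


    @dataclass  # Define a Python dataclass to represent an event and its payload.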
    class OrderCreatedEvent:
        order_id: int
        payload: dict
    
    
    @EventLinker.on(OrderCreatedEvent)  # Use the event class to attach subscribers.
    def handle_order_created_event(event: OrderCreatedEvent):
        # The event instance is automatically passed as the first argument.
        # In methods with self or cls, the event is passed after those arguments.
        print(f"Event Object: {event}")
    
    
    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit(
        event=OrderCreatedEvent(  # Emit an instance of the event!
            order_id=6452879,
            payload={},
        ),
    )
    
  • Global Events ─ In addition to Event Objects and string-based events, Pyventus also supports global events, which are particularly useful for implementing cross-cutting concerns such as logging, monitoring, and analytics. By subscribing event callbacks to the Ellipsis literal (...), you can capture all events that occur within that specific EventLinker context.

    @EventLinker.on(...)
    def logging(*args, **kwargs):
        print(f"Logging:\n- Args: {args}\n- Kwargs: {kwargs}")
    
    
    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit("AnyEvent", name="Pyventus")
    
  • Success and Error Handling ─ With Pyventus, you can customize how events are handled upon completion, whether they succeed or encounter errors. This customization is achieved through the configuration of the success and failure callbacks in the event workflow definition, which is done during the subscription process.

    from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker
    
    # Create a subscription context for the "DivisionEvent" event
    with EventLinker.on("DivisionEvent") as subctx:
    
        @subctx.on_event
        def divide(a: float, b: float) -> float:
            return a / b
    
        @subctx.on_success
        def handle_success(result: float) -> None:
            print(f"Division result: {result:.3g}")
    
        @subctx.on_failure
        def handle_failure(e: Exception) -> None:
            print(f"Oops, something went wrong: {e}")
    
    
    event_emitter: EventEmitter = AsyncIOEventEmitter()  # Create an event emitter
    event_emitter.emit("DivisionEvent", a=1, b=0)  # Example: Division by zero
    event_emitter.emit("DivisionEvent", a=1, b=2)  # Example: Valid division
    

    You can also set up your callbacks using the subscribe() method...

      Alternatively, for more straightforward definitions, such as lambda functions, or when you have existing functions defined elsewhere in your code, you can utilize the subscribe() method to set up these callbacks.

    from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker
    
    EventLinker.subscribe(
        "DivisionEvent",
        event_callback=lambda a, b: a / b,
        success_callback=lambda result: print(f"Division result: {result:.3g}"),
        failure_callback=lambda e: print(f"Oops, something went wrong: {e}"),
    )
    
    event_emitter: EventEmitter = AsyncIOEventEmitter()  # Create an event emitter
    event_emitter.emit("DivisionEvent", a=1, b=0)  # Example: Division by zero
    event_emitter.emit("DivisionEvent", a=1, b=2)  # Example: Valid division
    
  • Sync and Async Support ─ Pyventus is designed from the ground up to seamlessly support both synchronous and asynchronous programming models. Its unified sync/async API allows you to define event callbacks as either sync or async callables, as well as emit events from both contexts.

    @EventLinker.on("MyEvent")
    def sync_event_callback():
        pass  # Synchronous event handling
    
    
    @EventLinker.on("MyEvent")
    async def async_event_callback():
        pass  # Asynchronous event handling
    

    You can optimize the execution of your callbacks based on their workload...

      By default, event subscribers in Pyventus are executed concurrently during an event emission, running their sync and async callbacks as defined. However, if you have a sync callback that performs I/O or other blocking, non-CPU-bound work, you can enable the force_async parameter to offload it to a thread pool, keeping the application responsive. The offloading is handled by the asyncio.to_thread() function.

    import time

    from pyventus.events import EventLinker


    @EventLinker.on("BlockingIO", force_async=True)
    def blocking_io():
        print(f"start blocking_io at {time.strftime('%X')}")
        # Note that time.sleep() can be replaced with any blocking
        # IO-bound operation, such as file operations.
        time.sleep(1)
        print(f"blocking_io complete at {time.strftime('%X')}")
    
    def sync_function(event_emitter: EventEmitter):
        # Emitting events from sync functions
        event_emitter.emit("MyEvent")
    
    
    async def async_function(event_emitter: EventEmitter):
        # Emitting events from async functions
        event_emitter.emit("MyEvent")
    

    Considerations on the processing of event emissions...

      It's important to note that, while Pyventus provides a unified sync/async API, the processing of each event emission will depend on the concrete implementation of the ProcessingService used in the event emitter. For example, an event emitter configured with the AsyncIOProcessingService will leverage the AsyncIO framework to handle the execution of the event emission, whereas other implementations may structure their propagation differently.

  • Runtime Flexibility ─ At its core, Pyventus utilizes a modular event emitter design that, along with the EventLinker, allows you to change the event emitter at runtime without needing to reconfigure all subscriptions or apply complex logic.

    from concurrent.futures import ThreadPoolExecutor
    
    from pyventus.events import AsyncIOEventEmitter, EventEmitter, EventLinker, ExecutorEventEmitter
    
    
    @EventLinker.on("Event")
    def handle_event(msg: str):
        print(msg)
    
    
    def main(event_emitter: EventEmitter) -> None:
        event_emitter.emit("Event", msg=f"{event_emitter}")
    
    
    if __name__ == "__main__":
        executor = ThreadPoolExecutor()
        main(event_emitter=AsyncIOEventEmitter())
        main(event_emitter=ExecutorEventEmitter(executor))
        executor.shutdown()
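
  The force_async note in the Sync and Async Support item above mentions that offloading relies on asyncio.to_thread(). The sketch below shows that standard-library function on its own, without Pyventus, to illustrate why offloading keeps the event loop responsive. The blocking_io and ticker functions are illustrative only.

```python
import asyncio
import time


def blocking_io() -> str:
    """Simulate a blocking I/O operation."""
    time.sleep(0.2)
    return "blocking_io complete"


async def main() -> list:
    ticks = []

    async def ticker() -> None:
        # Keeps running while the blocking call sits in a worker thread.
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.05)

    # to_thread() runs the sync function in a thread and returns an awaitable.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), ticker())
    return [result, ticks]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

  If blocking_io() were called directly inside the coroutine, the ticker could not advance until the sleep finished; offloading it lets both make progress.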
    

Reactive Programming: Key Highlights of Pyventus

  In addition to the standard functionalities of reactive programming, Pyventus also provides some unique aspects that set it apart from other implementations. In this section, we will explore some of these key features and how to use them effectively.

  • Python Callables as Observable Tasks ─ Whether you are working with generators or regular functions, Pyventus allows you to easily convert any Python callable into an observable task. These tasks are specialized observables that encapsulate a unit of work and provide a mechanism for streaming their results to a series of subscribers.

    @as_observable_task
    def compute_square(n):
        return n * n
    
    obs = compute_square(2)
    obs.subscribe(print)
    obs()
    

    You can also work with async functions...

    @as_observable_task
    async def fetch_data():
        await asyncio.sleep(1)
        return {"data": "Sample Data"}
    
    obs = fetch_data()
    obs.subscribe(print)
    obs()
    
    @as_observable_task
    def simple_counter(stop: int):
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    
    obs = simple_counter(stop=16)
    obs.subscribe(print)
    obs()
    

    You can also work with async generators...

    @as_observable_task
    async def async_counter(stop: int):
        for count in range(1, stop + 1):
            await asyncio.sleep(0.25)
            yield count
        raise Completed
    
    obs = async_counter(stop=16)
    obs.subscribe(print)
    obs()
    
  • Multicast Support ─ Observables in Pyventus are designed from the ground up to efficiently support both unicast and multicast scenarios. Whether you work with a single subscriber or many, Pyventus supports both notification models and optimizes the processing of each.

    @as_observable_task
    def simple_counter(stop: int):
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    
    obs = simple_counter(stop=16)
    obs.subscribe(next_callback=lambda val: print(f"Subscriber 1 - Received: {val}"))
    obs.subscribe(next_callback=lambda val: print(f"Subscriber 2 - Received: {val}"))
    obs.subscribe(next_callback=lambda val: print(f"Subscriber 3 - Received: {val}"))
    obs()
    
  • Success and Error Handling ─ With Pyventus, you can customize how data streams are handled upon completion, whether they succeed or encounter errors. This customization is achieved through the configuration of the complete and error callbacks in the observer definition, which is done during the subscription process.

    @as_observable_task
    async def interactive_counter():
        stop: int = int(input("Please enter a number to count up to: "))  # Can raise ValueError
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    
    obs = interactive_counter()
    obs.subscribe(
        next_callback=lambda val: print(f"Received: {val}"),
        error_callback=lambda err: print(f"Error: {err}"),
        complete_callback=lambda: print("All done!"),
    )
    obs()
    
  • Declarative Subscription Model ─ Alongside standard subscription models, such as using lambda functions or predefined callbacks, Pyventus also provides a declarative subscription model that allows you to not only define the observer's callbacks inline and in a step-by-step manner but also to do so right before the subscription takes place.

    with obs.subscribe() as subctx:
    
        @subctx.on_next
        def next(value: int) -> None:
            print(f"Received: {value}")
    
        @subctx.on_error
        def error(error: Exception) -> None:
            print(f"Error: {error}")
    
        @subctx.on_complete
        def complete() -> None:
            print("All done!")
    

    You can also use the subscribe() method as a decorator...

      The subscribe() method, besides being used as a regular function and a context manager, can also be utilized as a decorator. When used this way, it creates and subscribes an observer, using the decorated function as its next callback.

    @obs.subscribe()
    def next(value: int) -> None:
        print(f"Received: {value}")
    
  • Simplified Execution for Observable Tasks ─ Explicitly calling each observable task to start its execution can be tedious and is easy to overlook, especially when working with multiple observables at the same time. By using an observable task within a with statement block, you enable what is known as its execution context: you work with the task as usual, and it is called automatically upon exiting the context block.

    @as_observable_task
    def simple_counter(stop: int):
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    with simple_counter(stop=16) as obs:
        obs.subscribe(lambda val: print(f"Subscriber 1 - Received: {val}"))
        obs.subscribe(lambda val: print(f"Subscriber 2 - Received: {val}"))
    
  • Thread Offloading for Observable Tasks ─ By default, the processing of each observable task is handled by the AsyncIO framework, either synchronously or asynchronously depending on the context. However, for multithreaded environments, Pyventus also provides support for running these observable tasks in separate threads.

    from concurrent.futures import ThreadPoolExecutor
    from pyventus.reactive import as_observable_task, Completed
    
    @as_observable_task
    def simple_counter(stop: int):
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    if __name__ == "__main__":
    
        with ThreadPoolExecutor() as executor:
    
            obs1 = simple_counter(16)
            obs1.subscribe(print)
            obs1(executor)
    
            obs2 = simple_counter(16)
            obs2.subscribe(print)
            obs2(executor)
    

    Thread offloading is also available for the execution context of observable tasks...

    from concurrent.futures import ThreadPoolExecutor
    from pyventus.reactive import as_observable_task, Completed
    
    @as_observable_task
    def simple_counter(stop: int):
        for count in range(1, stop + 1):
            yield count
        raise Completed
    
    if __name__ == "__main__":
    
        with ThreadPoolExecutor() as executor:
    
            with simple_counter(16).to_thread(executor) as obs1:
                obs1.subscribe(print)
    
            with simple_counter(16).to_thread(executor) as obs2:
                obs2.subscribe(print)
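
  The execution context described in the Simplified Execution item above can be pictured with Python's context manager protocol. The following is a conceptual sketch and not Pyventus's implementation: DeferredTask is a hypothetical class, and skipping execution when the block raises is a design choice of this sketch, not a documented Pyventus behavior.

```python
from typing import Any, Callable


class DeferredTask:
    """Hypothetical sketch: a task that runs itself when its with block exits."""

    def __init__(self, work: Callable[[], Any]) -> None:
        self._work = work
        self._subscribers = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        self._subscribers.append(callback)

    def __call__(self) -> None:
        # Run the unit of work and hand its result to every subscriber.
        result = self._work()
        for callback in self._subscribers:
            callback(result)

    def __enter__(self) -> "DeferredTask":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        if exc_type is None:
            self()  # Run only if the block finished without an exception
        return False  # Never suppress exceptions from the block


received = []
with DeferredTask(lambda: 2 * 21) as task:
    task.subscribe(received.append)
    assert received == []  # Nothing has run inside the block yet
print(received)  # [42]
```

  Deferring the call to __exit__ guarantees that all subscribers attached inside the block are in place before any value is produced.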
    

Additional Highlights

  Beyond the core functionalities of event-driven and reactive programming, Pyventus also includes some additional features that are worth noting. In this section, we will explore these aspects and how to use them effectively.

  • Debugging Utilities ─ Debugging plays a crucial role in the development of asynchronous and event-driven applications, as it allows you to understand what’s going on under the hood and provides valuable insights when troubleshooting errors. For this reason, Pyventus offers a clear string representation of each component, along with a debug mode flag that lets you view the package's logs for a better understanding of the processes at work.

  • Efficient Import Management ─ Pyventus encapsulates each paradigm into its own isolated package, so that you can not only have a clear boundary between event-driven and reactive programming features, but also apply Python import optimizations based on the required paradigm. For example, if you are only working with the events module of Pyventus and never import the reactive package, Python does not load it.
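
  The import behavior described in the Efficient Import Management item above follows from how Python loads packages: importing one subpackage does not load its siblings unless the parent package imports them. The sketch below demonstrates this with a throwaway package created in a temporary directory. The name demo_pkg and its layout are hypothetical and only mirror the events and reactive split; they are not Pyventus's actual files.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package named demo_pkg (hypothetical) with two subpackages.
root = Path(tempfile.mkdtemp())
for sub in ("events", "reactive"):
    pkg_dir = root / "demo_pkg" / sub
    pkg_dir.mkdir(parents=True)
    (pkg_dir / "__init__.py").write_text(f"NAME = {sub!r}\n")
# The top-level __init__.py is empty, so it imports neither subpackage.
(root / "demo_pkg" / "__init__.py").write_text("")

sys.path.insert(0, str(root))
importlib.invalidate_caches()
events = importlib.import_module("demo_pkg.events")

print("demo_pkg.events" in sys.modules)    # True
print("demo_pkg.reactive" in sys.modules)  # False
```

  The same check can be applied to an installed package by inspecting sys.modules after importing only the subpackage you need.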

License

  Pyventus is distributed as open-source software and is released under the MIT License. For a detailed view of the license, please refer to the LICENSE file located in the Pyventus repository.