Getting Started

Duron is a Python library that makes async workflows replayable. You can pause, resume, or rerun async functions without redoing completed steps. This guide will walk you through the core concepts and get you building your first durable workflow.

Installation

Duron requires Python 3.10 or higher.

Install via pip:

pip install duron

Or if you're using uv:

uv add duron

Core Concepts

Duron introduces two fundamental building blocks for creating replayable workflows:

1. Durable Functions (@duron.durable)

Durable functions are the orchestrators of your workflow. They define the control flow and coordinate multiple operations. Key characteristics:

  • Always take Context as the first parameter - This is your handle to run effects and create streams/signals
  • Deterministic - The same inputs always produce the same execution path
  • Replayable - When resumed, Duron replays logged results to restore state without re-executing completed steps
  • No side effects - All I/O must go through effects

@duron.durable
async def my_workflow(ctx: duron.Context, arg: str) -> str:
    # Orchestration logic here
    result = await ctx.run(some_effect, arg)
    return result

2. Effect Functions (@duron.effect)

Effects wrap any code that interacts with the outside world. This includes:

  • API calls
  • Database queries
  • File I/O
  • Random number generation
  • Any non-deterministic operation

Duron records each effect's return value so it runs once per unique input, even across restarts.

@duron.effect
async def fetch_data(url: str) -> dict:
    # This will only execute once per unique URL
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

Your First Workflow

Let's build a simple greeting workflow that demonstrates the core concepts:

import asyncio
import random
from pathlib import Path

import duron
from duron.contrib.storage import FileLogStorage


@duron.effect
async def work(name: str) -> str:
    print("⚡ Preparing to greet...")
    await asyncio.sleep(2)  # Simulate I/O
    print("⚡ Greeting...")
    return f"Hello, {name}!"


@duron.effect
async def generate_lucky_number() -> int:
    print("⚡ Generating lucky number...")
    await asyncio.sleep(1)  # Simulate I/O
    return random.randint(1, 100)


@duron.durable
async def greeting_flow(ctx: duron.Context, name: str) -> str:
    # Run both effects in parallel
    message, lucky_number = await asyncio.gather(
        ctx.run(work, name),
        ctx.run(generate_lucky_number)
    )
    return f"{message} Your lucky number is {lucky_number}."


async def main():
    # Create a file-based log storage
    storage = FileLogStorage(Path("log.jsonl"))

    # Invoke the workflow
    async with duron.invoke(greeting_flow, storage) as job:
        await job.start("Alice")
        result = await job.wait()

    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Save this as hello.py and run it:

python hello.py

You'll see output like:

⚡ Preparing to greet...
⚡ Generating lucky number...
⚡ Greeting...
Hello, Alice! Your lucky number is 42.

Understanding Replay

The magic of Duron is in its replay behavior. Run the same script again:

python hello.py

Notice: No "⚡" output! Duron replayed the results from log.jsonl without re-executing the effects. The workflow completes instantly, but produces the exact same result.

This is powerful for:

  • Crash recovery - If your process crashes mid-workflow, resume from the last checkpoint
  • Development - Test workflow logic without hitting external services repeatedly
  • Debugging - Reproduce exact execution paths
  • Cost savings - Don't re-run expensive API calls
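
For instance, you can trigger a replay programmatically by invoking the same workflow twice against the same storage; the second invocation returns the recorded result without re-running the effects. A minimal sketch, reusing greeting_flow and FileLogStorage from above:

import asyncio
from pathlib import Path

import duron
from duron.contrib.storage import FileLogStorage

async def run_once() -> str:
    storage = FileLogStorage(Path("log.jsonl"))
    async with duron.invoke(greeting_flow, storage) as job:
        await job.start("Alice")
        return await job.wait()

async def main():
    first = await run_once()   # effects execute and results are logged
    second = await run_once()  # replayed from log.jsonl, no "⚡" output
    assert first == second

asyncio.run(main())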

Forcing a Fresh Run

To start fresh, delete the log file:

rm log.jsonl
python hello.py

Now you'll see the effects execute again (and potentially get a different lucky number).
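
You can do the same reset from Python, for example in a test fixture; a small sketch using pathlib (assuming the log lives at log.jsonl as above):

from pathlib import Path

# Delete the replay log so the next invocation starts from scratch
Path("log.jsonl").unlink(missing_ok=True)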

Storage Backends

Duron is storage-agnostic. It ships with two built-in options:

File Storage

from pathlib import Path
from duron.contrib.storage import FileLogStorage

storage = FileLogStorage(Path("logs/workflow.jsonl"))

Stores logs as JSON Lines in a file. Great for:

  • Local development
  • Single-machine workflows
  • Debugging (logs are human-readable)

Memory Storage (Testing Only)

from duron.contrib.storage import MemoryLogStorage

storage = MemoryLogStorage()

Stores logs in memory. Use for:

  • Unit tests
  • Temporary workflows
  • Benchmarking

Note: Memory storage is lost when the process exits.
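
Because nothing touches disk, memory storage is convenient for exercising a workflow end to end in a test; a minimal sketch reusing greeting_flow from earlier:

import asyncio

import duron
from duron.contrib.storage import MemoryLogStorage

async def test_greeting_flow() -> None:
    storage = MemoryLogStorage()
    async with duron.invoke(greeting_flow, storage) as job:
        await job.start("Alice")
        result = await job.wait()
    assert result.startswith("Hello, Alice!")

asyncio.run(test_greeting_flow())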

Custom Storage

Implement the LogStorage protocol for your own backend:

from duron.log import LogStorage

class MyStorage(LogStorage):
    async def read_log(self, lease_id: str) -> list[Entry]:
        # Read from your storage (database, S3, etc.)
        ...

    async def append_log(self, lease_id: str, entries: list[Entry]) -> None:
        # Append to your storage
        ...

    # ... implement other methods
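
As a rough illustration (not the full protocol), a dict-backed variant of the two methods above might look like the sketch below; it assumes Entry is importable from duron.log alongside LogStorage, and omits the remaining methods:

from duron.log import Entry, LogStorage

class DictLogStorage(LogStorage):
    def __init__(self) -> None:
        # One append-only list of entries per lease
        self._logs: dict[str, list[Entry]] = {}

    async def read_log(self, lease_id: str) -> list[Entry]:
        # Return a copy so callers can't mutate the stored log
        return list(self._logs.get(lease_id, []))

    async def append_log(self, lease_id: str, entries: list[Entry]) -> None:
        self._logs.setdefault(lease_id, []).extend(entries)

    # ... remaining LogStorage methods omitted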

Advanced Features

Streams

Streams allow workflows to produce and consume values over time. Perfect for:

  • Multi-step agent interactions
  • Progress reporting
  • Event-driven workflows

import asyncio

import duron
from duron import Provided, Stream, StreamWriter

@duron.durable
async def producer(
    ctx: duron.Context,
    output: StreamWriter[str] = Provided
) -> None:
    for i in range(5):
        await output.send(f"Message {i}")
        await asyncio.sleep(1)

async def main():
    async with duron.invoke(producer, storage) as job:
        stream: Stream[str] = job.open_stream("output", "r")

        await job.start()

        async with stream as s:
            async for message in s:
                print(f"Received: {message}")

        await job.wait()

Signals

Signals enable external interruption of long-running operations:

import duron
from duron import Signal, SignalInterrupt, Provided

@duron.durable
async def interruptible_task(
    ctx: duron.Context,
    signal: Signal[None] = Provided
) -> str:
    try:
        async with signal:
            await ctx.run(long_running_effect)
            return "Completed"
    except SignalInterrupt:
        return "Interrupted by user"

async def main():
    async with duron.invoke(interruptible_task, storage) as job:
        signal_writer = job.open_stream("signal", "w")

        await job.start()

        # Later... send interrupt signal
        await signal_writer.send(None)

        result = await job.wait()
        print(result)  # "Interrupted by user"

Tracing

Enable tracing to understand workflow execution:

import duron
from duron.tracing import Tracer, setup_tracing

async def main():
    setup_tracing()  # Configure logging

    async with duron.invoke(
        greeting_flow,
        storage,
        tracer=Tracer("session-123")
    ) as job:
        await job.start("Alice")
        result = await job.wait()

Traces are logged to your storage backend for analysis. Upload the JSONL log to the Trace UI for visualization.