
Async Patterns

Yoda's Python bindings are built on pyo3-async-runtimes, with a multi-threaded Tokio runtime that is created once at module import time. Every HtapEngine method except the synchronous shutdown() returns a Python awaitable, so any code running on an asyncio event loop, whether driven directly or through anyio, can consume the API.

anyio backend constraint

The test suite declares anyio_backend = "asyncio" because pyo3-async-runtimes requires the asyncio event loop. anyio.run() defaults to asyncio, so in practice both asyncio.run() and anyio.run() work. The trio backend is not supported.
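If you launch your program through anyio, you can make the asyncio requirement explicit instead of relying on the default. The sketch below passes backend="asyncio" to anyio.run(); run_on_asyncio() and main() are illustrative names, not part of Yoda's API, and main() stands in for a coroutine that would use the engine. In pytest, anyio's plugin reads the backend from an anyio_backend fixture, which is how the test suite pins it.

```python
import asyncio

import anyio


async def main() -> str:
    # asyncio.get_running_loop() raises RuntimeError when no asyncio loop is
    # running (for example under trio), so reaching the return proves we are
    # on the asyncio backend.
    asyncio.get_running_loop()
    return "running on asyncio"


def run_on_asyncio(func, *args):
    """Run an async entry point with anyio, pinned to the asyncio backend."""
    # backend="asyncio" is anyio's default; passing it explicitly makes the
    # bindings' requirement visible at the call site.
    return anyio.run(func, *args, backend="asyncio")


if __name__ == "__main__":
    print(run_on_asyncio(main))
```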


asyncio

Use asyncio.run() for a top-level entry point:

python
import asyncio
import yoda


async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        await engine.execute(
            "CREATE TABLE events (id INTEGER PRIMARY KEY, msg TEXT)"
        )
        await engine.register_table(
            yoda.TableSchema("events", [("id", "int64"), ("msg", "utf8")], ["id"])
        )
        await engine.execute("INSERT INTO events VALUES (1, 'hello')")
        result = await engine.sync_now()
        print(result)

        batches = await engine.query("SELECT COUNT(*) as n FROM events")
        print(batches[0].to_pydict())


asyncio.run(main())

Inside an existing event loop (e.g. a Jupyter notebook or a framework that manages the loop for you), just await directly:

python
engine = await yoda.HtapEngine.create()
batches = await engine.query("SELECT 1 AS x")
# When you are finished with the engine: engine.shutdown()  (synchronous, no await)

anyio

anyio.run() uses asyncio by default, which is compatible with the bindings:

python
import anyio
import yoda


async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        await engine.execute(
            "CREATE TABLE metrics (id INTEGER PRIMARY KEY, value REAL)"
        )
        await engine.register_table(
            yoda.TableSchema(
                "metrics",
                [("id", "int64"), ("value", "float64")],
                ["id"],
            )
        )
        await engine.execute("INSERT INTO metrics VALUES (1, 3.14)")
        await engine.sync_now()
        batches = await engine.query("SELECT AVG(value) AS avg FROM metrics")
        print(batches[0].to_pydict())


anyio.run(main)

anyio task groups

Use anyio.create_task_group() to fan out concurrent work:

python
import anyio
import yoda


async def write_batch(engine, start: int, n: int):
    stmts = [f"INSERT INTO t VALUES ({start + i})" for i in range(n)]
    await engine.execute_batch(stmts)


async def main():
    async with await yoda.open(oltp_path="fanout.db") as engine:
        await engine.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
        await engine.register_table(
            yoda.TableSchema("t", [("id", "int64")], ["id"])
        )

        # Write three batches concurrently from different coroutines
        async with anyio.create_task_group() as tg:
            tg.start_soon(write_batch, engine, 0, 100)
            tg.start_soon(write_batch, engine, 100, 100)
            tg.start_soon(write_batch, engine, 200, 100)

        result = await engine.sync_now()
        print(result)  # rows_inserted ≥ 300


anyio.run(main)

Concurrent queries

Multiple query() or query_olap() coroutines can run concurrently. On the OLAP side (DataFusion), each query gets its own execution context. On the OLTP side, the engine maintains a round-robin read pool (default: 4 connections) so concurrent point queries do not block each other.

asyncio.gather

python
import asyncio
import yoda


async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        # ... setup omitted ...

        # Fire three OLAP queries at the same time
        results = await asyncio.gather(
            engine.query("SELECT COUNT(*) FROM orders"),
            engine.query("SELECT SUM(amount) FROM orders"),
            engine.query("SELECT AVG(amount) FROM orders"),
        )
        count_batch, sum_batch, avg_batch = results
        print(count_batch[0].to_pydict())


asyncio.run(main())

anyio task group (equivalent)

python
import anyio
import yoda


async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        # ... setup omitted ...

        outputs = {}

        async def run_query(key: str, sql: str):
            outputs[key] = await engine.query(sql)

        async with anyio.create_task_group() as tg:
            tg.start_soon(run_query, "count", "SELECT COUNT(*) FROM orders")
            tg.start_soon(run_query, "total", "SELECT SUM(amount) FROM orders")
            tg.start_soon(run_query, "average", "SELECT AVG(amount) FROM orders")

        print(outputs["count"][0].to_pydict())


anyio.run(main)

Read-pool concurrency

Concurrent OLTP reads (simple SELECTs without aggregation) are spread across the connections of the round-robin pool. Increase read_pool_size in HtapConfig if you regularly have more concurrent readers than the default of 4.
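As an alternative to growing the pool, you can cap the number of in-flight reads at the pool size with a semaphore. This sketch uses a hypothetical FakeEngine whose query() only simulates I/O and records peak concurrency, and READ_POOL_SIZE mirrors the documented default of 4; with a real engine you would pass the HtapEngine instance instead.

```python
import asyncio

READ_POOL_SIZE = 4  # mirrors the documented default read_pool_size


class FakeEngine:
    """Hypothetical stand-in for HtapEngine that records peak concurrency."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def query(self, sql: str) -> list[str]:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulate I/O on a pooled connection
        self.in_flight -= 1
        return [sql]


async def bounded_query(engine, sem: asyncio.Semaphore, sql: str):
    # At most READ_POOL_SIZE coroutines get past this point at once,
    # so concurrent readers never outnumber the pool's connections.
    async with sem:
        return await engine.query(sql)


async def main(n_readers: int = 10):
    engine = FakeEngine()
    sem = asyncio.Semaphore(READ_POOL_SIZE)
    results = await asyncio.gather(
        *(bounded_query(engine, sem, f"SELECT {i}") for i in range(n_readers))
    )
    return engine.peak, results


if __name__ == "__main__":
    peak, results = asyncio.run(main())
    print(f"peak concurrency: {peak}, results: {len(results)}")
```

asyncio.gather() returns results in the order the coroutines were passed, so the semaphore changes scheduling but not the shape of the output.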


Long-running service with a background sync loop

For production services, start the engine once and run a background sync loop that calls sync_now() periodically. Use asyncio.create_task() (or start_soon() in an anyio task group) to keep the loop running alongside request handling.

python
import asyncio
import signal
import yoda


async def sync_loop(engine: yoda.HtapEngine, interval: float = 0.5):
    """Sync CDC events to OLAP every `interval` seconds."""
    while True:
        try:
            result = await engine.sync_now()
            if result.events_processed:
                print(f"[sync] {result}")
        except Exception as e:
            print(f"[sync] error: {e}")
        await asyncio.sleep(interval)


async def main():
    engine = await yoda.HtapEngine.create(
        yoda.HtapConfig(oltp_path="service.db")
    )

    # Start background sync
    sync_task = asyncio.create_task(sync_loop(engine))

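    # Graceful shutdown on SIGINT / SIGTERM (add_signal_handler is Unix-only)
    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def request_stop() -> None:
        # Guard against a second signal: set_result() on a finished
        # future raises InvalidStateError.
        if not stop.done():
            stop.set_result(None)

    loop.add_signal_handler(signal.SIGINT, request_stop)
    loop.add_signal_handler(signal.SIGTERM, request_stop)

    try:
        await stop  # wait for signal
    finally:
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass
        engine.shutdown()  # synchronous: no await
        print("Engine shut down cleanly.")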


asyncio.run(main())

Always call shutdown

Call engine.shutdown() (a synchronous method, so no await), or exit the async with block, before your process ends. Skipping shutdown is safe with in-memory DataFusion, but with file-backed storage ("arrow_ipc" / "parquet") or DuckDB, buffered data may never be flushed and files can be left incomplete.


Context manager

HtapEngine supports async with when obtained via yoda.open() or HtapEngine.create():

python
async with await yoda.open(oltp_path="app.db") as engine:
    await engine.execute("INSERT INTO t VALUES (1)")
    await engine.sync_now()
# engine.shutdown() called automatically here

An engine built with the synchronous constructor HtapEngine(config) also works with async with, because the Python wrapper class implements __aenter__ / __aexit__ regardless of how the engine was created (the constructor itself still blocks the event loop while it runs):

python
config = yoda.HtapConfig(oltp_path="app.db")
engine = yoda.HtapEngine(config)
async with engine:
    await engine.execute("INSERT INTO t VALUES (1)")

If you need more control (e.g. store the engine as an instance variable), use the explicit pattern:

python
class MyService:
    async def start(self):
        self.engine = await yoda.HtapEngine.create(yoda.HtapConfig(oltp_path="svc.db"))

    async def stop(self):
        self.engine.shutdown()

Web handler sketch

The following sketch shows a minimal async handler that returns query results as JSON. It does not depend on FastAPI or any other web framework; adapt it to whichever async framework you use.

python
import json
import yoda
import pyarrow as pa

# Module-level engine — created once at startup
_engine: yoda.HtapEngine | None = None


async def startup():
    global _engine
    _engine = await yoda.HtapEngine.create(
        yoda.HtapConfig(oltp_path="app.db")
    )


async def shutdown():
    if _engine:
        _engine.shutdown()


async def handle_analytics(region: str) -> str:
    """Return JSON: total sales per product in a region."""
    assert _engine is not None
    batches = await _engine.query(
        f"SELECT product, SUM(amount) AS total "
        f"FROM sales WHERE region = '{region}' GROUP BY product"
    )
    table = pa.Table.from_batches(batches)
    return json.dumps(table.to_pydict())

SQL injection

The example above inlines region directly into SQL for brevity. In real code, use parameterised queries via the params argument of execute(), or validate/escape the input.
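When the value comes from a small, known set, an allowlist check before building the SQL string is the simplest defence. In this sketch ALLOWED_REGIONS and analytics_sql() are hypothetical; substitute the values your data actually contains, and prefer the params argument of execute() where it applies.

```python
# Hypothetical allowlist; replace with the regions your data actually uses.
ALLOWED_REGIONS = frozenset({"amer", "apac", "emea"})


def analytics_sql(region: str) -> str:
    """Build the analytics query, rejecting any region not on the allowlist."""
    if region not in ALLOWED_REGIONS:
        raise ValueError(f"unknown region: {region!r}")
    # Safe to inline: region is now one of a few known constant strings.
    return (
        "SELECT product, SUM(amount) AS total "
        f"FROM sales WHERE region = '{region}' GROUP BY product"
    )


if __name__ == "__main__":
    print(analytics_sql("emea"))
    try:
        analytics_sql("x' OR '1'='1")
    except ValueError as e:
        print(e)
```

In handle_analytics the call would become await _engine.query(analytics_sql(region)), with the ValueError translated into a client-error response by your framework.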


Common pitfalls

Do not call engine methods without await (for example from synchronous code running inside the event loop). Apart from shutdown(), HtapEngine methods return awaitables; calling one without await does nothing, and the coroutine is immediately garbage-collected.

python
# Wrong — coroutine is created but never executed
engine.execute("INSERT ...")

# Correct
await engine.execute("INSERT ...")
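The sketch below shows the standard Python behaviour behind this pitfall, using a hypothetical fake_execute() coroutine function in place of an engine method: the call without await never runs the body, and CPython warns that the coroutine was never awaited. HtapEngine's awaitables come from pyo3-async-runtimes, so their internals may differ, but in every case only the awaited call gives you a result.

```python
import asyncio
import warnings

calls: list[str] = []


async def fake_execute(sql: str) -> None:
    """Hypothetical stand-in for an async engine method such as execute()."""
    calls.append(sql)


async def main() -> None:
    # Wrong: this only creates a coroutine object; its body never runs, and
    # CPython emits "RuntimeWarning: coroutine ... was never awaited" when
    # the discarded object is finalized.
    fake_execute("forgot await")
    # Correct: awaiting runs the body.
    await fake_execute("awaited")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    asyncio.run(main())

print(calls)  # ['awaited']
print([str(w.message) for w in caught])
```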

Do not share an HtapEngine across processes. The engine holds open file handles and a Tokio runtime. Fork-safety is not guaranteed. Create a new engine in each child process.

Do not skip engine.shutdown() (or exiting the async with block) in long-running processes. shutdown() is a synchronous call (no await); the background sync thread keeps running until it is invoked. In a short script that exits normally this is usually harmless with in-memory storage, although file-backed storage may be left unflushed; in a server, skipping it leaks resources.

The sync constructor blocks the event loop. HtapEngine(config) runs SQLite PRAGMA setup, enables WAL mode, installs triggers, and initialises the OLAP engine, all synchronously. From async code, prefer await yoda.open(...) or await HtapEngine.create(config) so other tasks are not blocked.
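The effect of blocking the event loop can be measured with a ticker task. In this sketch, time.sleep() inside a hypothetical blocking_init() stands in for slow synchronous setup: run directly in a coroutine it stops every other task, while asyncio.to_thread() keeps the loop responsive. This only illustrates general asyncio behaviour; for Yoda the remedy is awaiting yoda.open() or HtapEngine.create(), and this page does not say whether HtapEngine(config) is safe to construct on a worker thread.

```python
import asyncio
import time


async def ticker(ticks: list[float], stop: asyncio.Event) -> None:
    # Stand-in for "other tasks": records a timestamp roughly every 10 ms.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def blocking_init() -> None:
    """Hypothetical stand-in for slow synchronous setup work."""
    time.sleep(0.1)


async def blocking_setup() -> None:
    blocking_init()  # runs on the event-loop thread: nothing else can run


async def offloaded_setup() -> None:
    await asyncio.to_thread(blocking_init)  # loop stays free meanwhile


async def count_ticks_during(run_setup) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # let the ticker take its first step
    before = len(ticks)
    await run_setup()
    during = len(ticks) - before
    stop.set()
    await task
    return during


async def main() -> tuple[int, int]:
    return (
        await count_ticks_during(blocking_setup),
        await count_ticks_during(offloaded_setup),
    )


if __name__ == "__main__":
    blocked, free = asyncio.run(main())
    print(f"ticks while blocking: {blocked}, ticks while offloaded: {free}")
```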

sync_now() is not called automatically. CDC events accumulate in the OLTP log until you call sync_now(). Either call it explicitly after writes, or run a background loop as shown above.

Released under the Apache-2.0 License.