Python API Guide

The yoda package exposes Yoda's HTAP engine to Python via PyO3 bindings with a fully async API compatible with both asyncio and anyio.

Installation

From PyPI

sh
pip install yoda
# or with uv
uv add yoda

From source

Build and install an editable (development) wheel using maturin:

sh
git clone https://github.com/ValerioL29/Yoda.git
cd Yoda
uv run maturin develop          # debug build
uv run maturin develop --release  # optimised build

Prerequisites

Building from source requires Rust (stable toolchain), a C compiler, and uv. The DataFusion backend is enabled by default. The DuckDB backend requires the duckdb-backend Cargo feature and a C++ compiler.

Quick-start example

python
import anyio
import pyarrow as pa
import yoda

async def main():
    # Open engine via the async factory (recommended)
    async with await yoda.open(oltp_path="app.db") as engine:
        # 1. Create the table in OLTP (SQLite)
        await engine.execute(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"
        )

        # 2. Register the table for CDC replication to OLAP
        schema = yoda.TableSchema(
            "users",
            [("id", "int64"), ("name", "utf8"), ("age", "int32")],
            ["id"],
        )
        await engine.register_table(schema)

        # 3. Write rows (routed to OLTP / SQLite)
        await engine.execute("INSERT INTO users VALUES (1, 'Alice', 30)")
        await engine.execute("INSERT INTO users VALUES (2, 'Bob', 25)")

        # 4. Sync CDC events to OLAP
        result = await engine.sync_now()
        print(result)  # SyncResult(events=2, inserted=2, updated=0, deleted=0, pruned=None)

        # 5. Run an analytical query (routed to OLAP / DataFusion)
        batches = await engine.query("SELECT AVG(age) as avg_age FROM users")

        # 6. Convert to pandas via PyArrow
        table = pa.Table.from_batches(batches)
        df = table.to_pandas()
        print(df)

anyio.run(main)

Engine shutdown

When you open the engine with async with await yoda.open(...) as engine, shutdown() is called automatically when the context manager exits. If you instantiate HtapEngine directly, call engine.shutdown() yourself when done.
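
For an engine that was constructed directly, a try/finally keeps the shutdown call on every exit path. This is a minimal sketch: run_then_shutdown and the work callback are hypothetical names, and engine is assumed to be any object with the synchronous shutdown() method described in this guide.

```python
from typing import Any, Awaitable, Callable


async def run_then_shutdown(
    engine: Any, work: Callable[[Any], Awaitable[Any]]
) -> Any:
    """Run work(engine) and call engine.shutdown() even if work raises."""
    try:
        return await work(engine)
    finally:
        # shutdown() is a plain (non-async) method, so it is not awaited.
        engine.shutdown()
```

With a real engine this might be called as await run_then_shutdown(yoda.HtapEngine(config), my_coroutine_function).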


yoda.open()

The recommended entry point for async code. Creates and returns an HtapEngine without blocking the event loop.

python
async def open(
    oltp_path: str = "yoda.db",
    *,
    sync_mode: str = "destructive",
    olap_backend: str = "datafusion",
    storage_mode: str = "inmemory",
    storage_path: str | None = None,
    prune_after_sync: bool = True,
    sync_batch_size: int = 1000,
    read_pool_size: int = 4,
    sidecar_source: str | None = None,
    sidecar_tables: list[TimestampTableConfig] | None = None,
    sidecar_poll_batch_size: int = 500,
    sidecar_delete_detection: str = "disabled",
    sidecar_enable_oltp: bool = False,
) -> HtapEngine

All parameters are the same as HtapConfig (documented below). Returns an HtapEngine that supports async with.

python
# Pattern: open, use, auto-close
async with await yoda.open(oltp_path="my.db") as engine:
    ...

HtapConfig

Configuration value object. Construct once and pass to HtapEngine().

python
class HtapConfig:
    def __init__(
        self,
        oltp_path: str = "yoda.db",
        prune_after_sync: bool = True,
        sync_batch_size: int = 1000,
        read_pool_size: int = 4,
        sync_mode: str = "destructive",
        olap_backend: str = "datafusion",
        storage_mode: str = "inmemory",
        storage_path: str | None = None,
        sidecar_source: str | None = None,
        sidecar_tables: list[TimestampTableConfig] | None = None,
        sidecar_poll_batch_size: int = 500,
        sidecar_delete_detection: str = "disabled",
        sidecar_enable_oltp: bool = False,
    ) -> None

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| oltp_path | str | "yoda.db" | Path to the SQLite database file used for OLTP writes. |
| prune_after_sync | bool | True | Delete CDC log entries after each successful sync cycle. |
| sync_batch_size | int | 1000 | Maximum CDC events processed per sync_now() call. |
| read_pool_size | int | 4 | Number of concurrent OLTP read connections (round-robin pool). |
| sync_mode | str | "destructive" | "destructive": mirror semantics (UPDATE overwrites, DELETE removes). "temporal": SCD Type 2 append-only history. |
| olap_backend | str | "datafusion" | "datafusion" (default, always available) or "duckdb" (requires duckdb-backend feature). |
| storage_mode | str | "inmemory" | DataFusion storage: "inmemory", "arrow_ipc", or "parquet". "arrow_ipc" and "parquet" require storage_path. |
| storage_path | str \| None | None | Directory for file-backed DataFusion storage (Arrow IPC or Parquet). Also used as the DuckDB file path when olap_backend="duckdb". |
| sidecar_source | str \| None | None | Connection string / path for the external DB to follow in sidecar mode. Setting this activates sidecar CDC. |
| sidecar_tables | list[TimestampTableConfig] \| None | None | Per-table sidecar configuration. Required when sidecar_source is set. |
| sidecar_poll_batch_size | int | 500 | Rows fetched from the external DB per poll cycle. |
| sidecar_delete_detection | str | "disabled" | "disabled" or "soft_delete:<column>", e.g. "soft_delete:deleted_at". |
| sidecar_enable_oltp | bool | False | Also create a local OLTP (SQLite) alongside the sidecar source. Rarely needed. |

Eager validation

HtapConfig.__init__ validates sync_mode, olap_backend, storage_mode, and sidecar_delete_detection immediately and raises ValueError if an unknown value is passed. No silent defaults.
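
When these option strings come from user input (environment variables, CLI flags), it can help to check them before building the config so the error message names the offending setting. The sketch below is a pure-Python mirror of the values listed in the parameter table, not Yoda's own validation code. check_options is a hypothetical helper, exact case-sensitive matching is an assumption, and whether Yoda enforces the storage_path requirement in __init__ or later is not stated in this guide.

```python
from typing import Optional

SYNC_MODES = {"destructive", "temporal"}
OLAP_BACKENDS = {"datafusion", "duckdb"}
STORAGE_MODES = {"inmemory", "arrow_ipc", "parquet"}
FILE_BACKED_MODES = {"arrow_ipc", "parquet"}


def check_options(
    sync_mode: str = "destructive",
    olap_backend: str = "datafusion",
    storage_mode: str = "inmemory",
    storage_path: Optional[str] = None,
    sidecar_delete_detection: str = "disabled",
) -> None:
    """Raise ValueError for values outside the sets listed in this guide."""
    if sync_mode not in SYNC_MODES:
        raise ValueError(f"unknown sync_mode: {sync_mode!r}")
    if olap_backend not in OLAP_BACKENDS:
        raise ValueError(f"unknown olap_backend: {olap_backend!r}")
    if storage_mode not in STORAGE_MODES:
        raise ValueError(f"unknown storage_mode: {storage_mode!r}")
    # ASSUMPTION: file-backed modes are rejected up front without a path.
    if storage_mode in FILE_BACKED_MODES and storage_path is None:
        raise ValueError(f"storage_mode={storage_mode!r} requires storage_path")
    if sidecar_delete_detection != "disabled":
        # Expected shape: "soft_delete:<column>", e.g. "soft_delete:deleted_at".
        prefix, sep, column = sidecar_delete_detection.partition(":")
        if prefix != "soft_delete" or not sep or not column:
            raise ValueError(
                f"unknown sidecar_delete_detection: {sidecar_delete_detection!r}"
            )
```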


HtapEngine

The main engine class. All I/O methods are async and must be awaited.

python
class HtapEngine:
    def __init__(self, config: HtapConfig | None = None) -> None
    @classmethod
    async def create(cls, config: HtapConfig | None = None) -> HtapEngine
    async def __aenter__(self) -> HtapEngine
    async def __aexit__(self, *exc: object) -> None

Constructor vs. factory

| Form | Blocks event loop? | Recommended? |
| --- | --- | --- |
| HtapEngine(config) | Yes (blocking init) | Only in sync startup code or tests |
| await HtapEngine.create(config) | No (uses anyio.to_thread.run_sync) | Yes, from async code |
| await yoda.open(...) | No | Yes, preferred one-liner |
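
The difference between the blocking constructor and the factory can be illustrated without Yoda at all. In this sketch SlowResource is a stand-in class (not part of the yoda package) whose constructor blocks, and asyncio.to_thread is used as the standard-library analogue of anyio.to_thread.run_sync. A background ticker keeps counting while the constructor runs in a worker thread, which shows that the event loop stays responsive.

```python
import asyncio
import time
from typing import Tuple


class SlowResource:
    """Stand-in for an object whose constructor blocks (not a Yoda class)."""

    def __init__(self) -> None:
        time.sleep(0.05)  # simulates blocking file / database setup
        self.ready = True


async def create_off_loop() -> SlowResource:
    # Run the blocking constructor in a worker thread.
    return await asyncio.to_thread(SlowResource)


async def demo() -> Tuple[SlowResource, int]:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    task = asyncio.create_task(ticker())
    resource = await create_off_loop()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # ticks > 0 means other coroutines ran while the constructor was blocking.
    return resource, ticks
```

Calling SlowResource() directly inside demo() would instead freeze the ticker for the full duration of the constructor.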

Write operations

execute

python
async def execute(self, sql: str, params: list | None = None) -> int

Execute a single DML or DDL statement. Returns the number of rows affected.

SQL is routed by SqlParserRouter: writes (INSERT / UPDATE / DELETE) and DDL go to OLTP (SQLite). Analytical queries with aggregates, GROUP BY, JOINs, or window functions go to OLAP.

params are typed: bool, int, float, None, and str are preserved. Wide Python ints outside the signed 64-bit range are serialised as strings.

python
rows = await engine.execute(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    [1, "Alice", 30],
)
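
The bindings perform the real parameter conversion. The pure-Python sketch below only mirrors the rule stated above, so you can see in advance which values would be passed through unchanged and which would become strings. preview_params is a hypothetical helper, and rejecting other types with TypeError is an assumption, since this guide does not say how unsupported types are handled.

```python
from typing import Any, List

I64_MIN = -(2**63)
I64_MAX = 2**63 - 1


def preview_params(params: List[Any]) -> List[Any]:
    """Return params the way this guide describes them being passed on."""
    out: List[Any] = []
    for value in params:
        if value is None or isinstance(value, (bool, float, str)):
            out.append(value)
        elif isinstance(value, int):
            # Ints that do not fit a signed 64-bit integer become strings.
            out.append(value if I64_MIN <= value <= I64_MAX else str(value))
        else:
            # ASSUMPTION: other types are not supported as parameters.
            raise TypeError(f"unsupported parameter type: {type(value).__name__}")
    return out
```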

Error handling

All engine methods raise RuntimeError on Rust-side failures (e.g. constraint violations, SQL parse errors). Wrap calls in try/except RuntimeError where needed.

execute_batch

python
async def execute_batch(self, statements: list[str]) -> None

Execute a list of SQL strings in a single OLTP transaction. Avoids per-statement fsync overhead and is substantially faster for bulk loads (~18 K ops/sec vs ~400 ops/sec for individual execute() calls with CDC triggers active).

python
await engine.execute_batch([
    "INSERT INTO users VALUES (1, 'Alice', 30)",
    "INSERT INTO users VALUES (2, 'Bob', 25)",
    "INSERT INTO users VALUES (3, 'Charlie', 35)",
])
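
For very large loads you may prefer several moderately sized batches over one huge transaction. The sketch below splits a statement list into chunks and sends each chunk through execute_batch. chunked, load_in_chunks, and the default chunk size are hypothetical, and engine is assumed to be any object with the execute_batch method shown above. Because each execute_batch call is its own transaction, a failure in a later chunk leaves earlier chunks committed.

```python
from typing import Any, Iterator, List, Sequence


def chunked(items: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of items, each holding at most size entries."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


async def load_in_chunks(
    engine: Any, statements: Sequence[str], chunk_size: int = 1000
) -> int:
    """Send statements through execute_batch, one transaction per chunk.

    Returns the number of batches that were executed.
    """
    batches = 0
    for chunk in chunked(statements, chunk_size):
        await engine.execute_batch(chunk)
        batches += 1
    return batches
```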

Query operations

query

python
async def query(self, sql: str) -> list[pyarrow.RecordBatch]

Execute a SQL query. The router automatically sends analytical queries (aggregates, GROUP BY, window functions, CTEs, JOINs, set operations, subqueries) to OLAP, and simple point queries (SELECT … WHERE pk =) to OLTP.

Returns a list[pyarrow.RecordBatch] through the PyArrow FFI bridge. The conversion is zero-copy on the Arrow side.

No bound parameters

query does not accept a params argument — only execute does. To filter analytical queries by user input, build the SQL with explicit literal escaping or, preferably, run the filter through execute (which routes to OLTP and supports bound parameters) before aggregating. Treat untrusted input that must reach query as unsafe and validate it at the boundary.

python
batches = await engine.query("SELECT AVG(age) as avg_age FROM users")
table = pa.Table.from_batches(batches)
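
If a value must be embedded in a query string, the explicit literal escaping mentioned above could look like the following sketch. It uses standard SQL quoting (single quotes doubled) and a conservative identifier allow-list. sql_literal, sql_identifier, and avg_age_sql are hypothetical helpers, not part of the yoda package, and the quoting rules should be checked against the dialect of the OLAP backend you use.

```python
import math
import re
from typing import Union

Scalar = Union[None, bool, int, float, str]
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def sql_literal(value: Scalar) -> str:
    """Render a Python scalar as a SQL literal using standard quoting."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # checked before int: bool subclasses int
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, float):
        if not math.isfinite(value):
            raise ValueError("NaN and infinity have no portable SQL literal")
        return repr(value)
    if isinstance(value, str):
        if "\x00" in value:
            raise ValueError("NUL characters are not allowed in literals")
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported literal type: {type(value).__name__}")


def sql_identifier(name: str) -> str:
    """Accept only plain identifiers; reject anything else outright."""
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def avg_age_sql(name: str) -> str:
    """Build the example aggregate with a user-supplied name filter."""
    return f"SELECT AVG(age) AS avg_age FROM users WHERE name = {sql_literal(name)}"
```

The resulting string can then be passed to engine.query(). Validation at the boundary (length limits, allowed characters) is still advisable for untrusted input.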

query_olap

python
async def query_olap(self, sql: str) -> list[pyarrow.RecordBatch]

Force a query directly to the OLAP backend, bypassing the router. Useful for analytical queries that the router might mis-classify, or when you know the data is only in OLAP.

Sync operations

sync_now

python
async def sync_now(self) -> SyncResult

Drain the pending CDC event log and apply all events to OLAP. In sidecar mode this polls the external database. Returns a SyncResult describing what happened.

sync_status

python
async def sync_status(self) -> SyncStatus

Return the current sync engine status without triggering a sync cycle.
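
Because sync_batch_size caps the number of events handled per sync_now() call, a large backlog may need several cycles. The sketch below combines the two methods to drain the backlog. sync_until_caught_up and max_cycles are hypothetical, and engine is assumed to be any object exposing sync_now() and sync_status() with the result fields documented in this guide.

```python
from typing import Any


async def sync_until_caught_up(engine: Any, max_cycles: int = 100) -> int:
    """Call sync_now() until sync_status().lag is 0.

    Returns the total number of events processed. Raises RuntimeError if
    the backlog is still non-empty after max_cycles sync cycles.
    """
    total = 0
    for _ in range(max_cycles):
        if (await engine.sync_status()).lag == 0:
            return total
        total += (await engine.sync_now()).events_processed
    # Re-check once more: the last cycle may have cleared the backlog.
    if (await engine.sync_status()).lag == 0:
        return total
    raise RuntimeError(f"still lagging after {max_cycles} sync cycles")
```

The cycle cap guards against looping forever when writers produce events faster than they are synced.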

initial_sync

python
async def initial_sync(self, table_name: str) -> int

Bulk-load all existing OLTP rows for a table into OLAP. Use this after calling register_table() on a table that already has data. Returns the number of rows loaded.

python
# Register a pre-populated table
await engine.register_table(schema)
rows = await engine.initial_sync("orders")
print(f"Loaded {rows} existing rows")

Schema operations

register_table

python
async def register_table(self, schema: TableSchema) -> None

Register a table for CDC-based replication. Installs triggers on the OLTP side and creates the corresponding table in OLAP. Must be called before any CDC events for the table are consumed.

Raises RuntimeError if the table is already registered.

add_column

python
async def add_column(self, table_name: str, column_name: str, data_type: str) -> None

Add a column to a registered table. Propagates the schema change to the OLAP table and rebuilds the CDC triggers. data_type uses the same type strings as TableSchema (see below).

python
await engine.execute("ALTER TABLE users ADD COLUMN email TEXT")
await engine.add_column("users", "email", "utf8")

drop_column

python
async def drop_column(self, table_name: str, column_name: str) -> None

Drop a column from a registered table. Tears down CDC triggers (SQLite rejects DROP COLUMN while triggers reference the column), alters the OLTP schema, and updates OLAP.

Raises RuntimeError if you attempt to drop a primary key column.

Lifecycle

shutdown

python
def shutdown(self) -> None

Release engine resources. Currently a no-op — resources are freed when the Python object is garbage-collected. Safe to call explicitly in cleanup code. Called automatically by __aexit__ when using async with.


TableSchema

Describes the schema of a table to be registered for HTAP replication.

python
class TableSchema:
    def __init__(
        self,
        name: str,
        columns: list[tuple[str, str]],
        pk: list[str],
    ) -> None

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Table name (must match the actual SQL table name). |
| columns | list[tuple[str, str]] | Ordered list of (column_name, type_string) pairs. |
| pk | list[str] | List of primary key column names. PK columns are non-nullable. |

Supported type strings

| String(s) | Arrow type |
| --- | --- |
| "int8" | Int8 |
| "int16" | Int16 |
| "int32", "int" | Int32 |
| "int64", "bigint" | Int64 |
| "uint8" | UInt8 |
| "uint16" | UInt16 |
| "uint32" | UInt32 |
| "uint64" | UInt64 |
| "float16", "half" | Float16 |
| "float32", "float" | Float32 |
| "float64", "double" | Float64 |
| "utf8", "string", "text" | Utf8 |
| "bool", "boolean" | Boolean |
| "binary", "bytes" | Binary |

Type strings are case-insensitive. An unknown type raises ValueError immediately at construction time.
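
To check type strings before constructing a TableSchema (for example when schemas are generated from configuration), the table above can be mirrored as a plain dictionary. This is an illustrative pure-Python lookup, not the resolver used inside the bindings. ARROW_TYPE_NAMES and arrow_type_name are hypothetical names, and the error message format is an assumption.

```python
ARROW_TYPE_NAMES = {
    "int8": "Int8",
    "int16": "Int16",
    "int32": "Int32", "int": "Int32",
    "int64": "Int64", "bigint": "Int64",
    "uint8": "UInt8",
    "uint16": "UInt16",
    "uint32": "UInt32",
    "uint64": "UInt64",
    "float16": "Float16", "half": "Float16",
    "float32": "Float32", "float": "Float32",
    "float64": "Float64", "double": "Float64",
    "utf8": "Utf8", "string": "Utf8", "text": "Utf8",
    "bool": "Boolean", "boolean": "Boolean",
    "binary": "Binary", "bytes": "Binary",
}


def arrow_type_name(type_string: str) -> str:
    """Resolve a type string (case-insensitive) to its Arrow type name."""
    try:
        return ARROW_TYPE_NAMES[type_string.lower()]
    except KeyError:
        raise ValueError(f"unknown type: {type_string}") from None
```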

Date / Timestamp / Decimal / List / Struct

These Arrow types are not available in TableSchema. Columns with those SQLite affinities fall back to the SQL path inside the sync engine and are not accelerated by the Arrow bulk-INSERT path. Use "int64" for Unix epoch timestamps.

python
schema = yoda.TableSchema(
    "orders",
    [
        ("order_id", "int64"),
        ("customer_id", "int64"),
        ("total", "float64"),
        ("status", "utf8"),
    ],
    ["order_id"],
)

SyncResult

Returned by engine.sync_now(). Immutable (frozen).

python
class SyncResult:
    events_processed: int   # CDC events consumed this cycle
    rows_inserted: int      # rows INSERTed into OLAP
    rows_updated: int       # rows UPDATEd in OLAP
    rows_deleted: int       # rows DELETEd from OLAP
    pruned_count: int | None  # CDC log entries pruned (None if pruning disabled)

python
result = await engine.sync_now()
print(f"Processed {result.events_processed} events: "
      f"+{result.rows_inserted} ~{result.rows_updated} -{result.rows_deleted}")

SyncStatus

Returned by engine.sync_status(). Immutable (frozen).

python
class SyncStatus:
    lag: int                    # number of unsynced CDC events
    last_synced_seq: int | None  # sequence number of last synced event (None before first sync)

A lag of 0 means OLAP is fully up-to-date. last_synced_seq is None before the first successful sync cycle.


TimestampTableConfig

Per-table configuration for sidecar mode. Only used when sidecar_source is set in HtapConfig.

python
class TimestampTableConfig:
    def __init__(
        self,
        table_name: str,
        primary_key: list[str],
        created_at_column: str = "created_at",
        updated_at_column: str = "updated_at",
        columns: list[str] = [],
    ) -> None

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| table_name | str | (required) | Table name in the external source database. |
| primary_key | list[str] | (required) | Primary key columns (supports composite PKs). |
| created_at_column | str | "created_at" | Column used to detect new inserts (compared to updated_at). |
| updated_at_column | str | "updated_at" | Watermark column: polls rows where updated_at > last_watermark. |
| columns | list[str] | [] | Columns to SELECT (empty list means SELECT *). |

See Sidecar Mode → INSERT vs UPDATE Heuristic for how rows from the source DB are classified as inserts vs updates.

python
config = yoda.HtapConfig(
    oltp_path="/tmp/local.db",
    sidecar_source="/data/production.db",
    sidecar_tables=[
        yoda.TimestampTableConfig(
            table_name="orders",
            primary_key=["order_id"],
            columns=["order_id", "customer_id", "total", "created_at", "updated_at"],
        ),
    ],
    sidecar_enable_oltp=False,
)

For more details on sidecar mode, see Sidecar Mode.
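
To make the watermark idea concrete, here is a self-contained sketch of timestamp-based polling against a plain SQLite table, using only the standard library. It is not Yoda's implementation. The query shape, the poll_changes helper, the integer epoch timestamps, and in particular the rule that equal created_at and updated_at values mean "insert" are all assumptions for illustration; the actual classification rule is described on the Sidecar Mode page.

```python
import sqlite3
from typing import List, Tuple

Change = Tuple[str, int, float]  # (kind, order_id, total)


def poll_changes(
    conn: sqlite3.Connection, last_watermark: int, batch_size: int = 500
) -> Tuple[List[Change], int]:
    """Fetch rows changed since last_watermark, oldest first."""
    rows = conn.execute(
        "SELECT order_id, total, created_at, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at LIMIT ?",
        (last_watermark, batch_size),
    ).fetchall()
    changes: List[Change] = []
    for order_id, total, created_at, updated_at in rows:
        # ASSUMPTION: equal timestamps mean the row was never updated.
        kind = "insert" if created_at == updated_at else "update"
        changes.append((kind, order_id, total))
    # Advance the watermark to the newest timestamp seen in this batch.
    new_watermark = rows[-1][3] if rows else last_watermark
    return changes, new_watermark


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, total REAL, "
    "created_at INTEGER, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [(1, 10.0, 100, 100), (2, 20.0, 100, 150), (3, 30.0, 200, 200)],
)
changes, watermark = poll_changes(conn, last_watermark=120)
print(changes)    # [('update', 2, 20.0), ('insert', 3, 30.0)]
print(watermark)  # 200
```

One limitation of this sketch: with a strict updated_at > watermark comparison, rows that share the boundary timestamp can be missed if a batch is cut in the middle of that timestamp.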


End-to-end example with Pandas

python
import anyio
import pyarrow as pa
import yoda


async def analytics_report():
    async with await yoda.open(oltp_path="sales.db") as engine:
        # Setup
        await engine.execute(
            "CREATE TABLE sales "
            "(id INTEGER PRIMARY KEY, product TEXT, amount REAL, region TEXT)"
        )
        await engine.register_table(
            yoda.TableSchema(
                "sales",
                [
                    ("id", "int64"),
                    ("product", "utf8"),
                    ("amount", "float64"),
                    ("region", "utf8"),
                ],
                ["id"],
            )
        )

        # Insert rows in one transaction
        await engine.execute_batch([
            "INSERT INTO sales VALUES (1, 'Widget', 29.99, 'EMEA')",
            "INSERT INTO sales VALUES (2, 'Gadget', 49.99, 'APAC')",
            "INSERT INTO sales VALUES (3, 'Widget', 29.99, 'APAC')",
            "INSERT INTO sales VALUES (4, 'Gadget', 49.99, 'EMEA')",
        ])

        # Sync to OLAP
        result = await engine.sync_now()
        print(result)

        # Analytical query -> list[pyarrow.RecordBatch]
        batches = await engine.query(
            "SELECT region, SUM(amount) AS total "
            "FROM sales GROUP BY region ORDER BY total DESC"
        )

        # Convert to pandas
        df = pa.Table.from_batches(batches).to_pandas()
        print(df)
        # region   total
        # EMEA     79.98
        # APAC     79.98  (the totals tie, so the two rows may appear in either order)


anyio.run(analytics_report)

Error handling

All engine methods raise RuntimeError for Rust-side errors. HtapConfig and TableSchema constructors raise ValueError for bad input (unknown type strings, invalid parameter values).

python
try:
    await engine.execute("INSERT INTO users VALUES (1, 'Alice', 30)")
except RuntimeError as e:
    print(f"Engine error: {e}")

try:
    schema = yoda.TableSchema("t", [("id", "nonexistent_type")], ["id"])
except ValueError as e:
    print(f"Config error: {e}")  # ValueError: unknown type: nonexistent_type

Released under the Apache-2.0 License.