
crv.io

Experimental API

IO contracts and run manifests may change during alignment work. Since: 2025-09-25.

crv.io — Canonical IO layer for CRV datasets.

Responsibilities

  • Provide a Polars/Arrow-first IO layer that materializes canonical tables defined in crv.core.tables.
  • Guarantee append-only semantics, atomic tmp→ready file renames, per-table manifests, tick-bucket partitioning, and lightweight schema validation against crv.core descriptors.
  • Keep crv.core as the single source of truth for enums, schemas, table descriptors, IDs, and versioning.
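The atomic tmp→ready rename guarantee can be illustrated with the standard library alone. This is a minimal sketch, not crv.io's writer: the helper name atomic_write_bytes is hypothetical, and it writes raw bytes where the real layer writes Parquet. It also shows one way to honor append-only semantics by refusing to overwrite an existing final file.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(final_path: Path, payload: bytes) -> Path:
    """Hypothetical helper: write payload via tmp file -> fsync -> os.replace."""
    final_path.parent.mkdir(parents=True, exist_ok=True)
    # Create the tmp file in the destination directory so os.replace stays on
    # one filesystem, where a rename is atomic.
    fd, tmp_name = tempfile.mkstemp(dir=str(final_path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
            fh.flush()
            os.fsync(fh.fileno())  # bytes reach disk before the rename
        # Append-only: never overwrite an existing final file.
        if final_path.exists():
            raise FileExistsError(final_path)
        os.replace(tmp_name, final_path)
    except BaseException:
        # Remove the tmp file on failure; readers only ever see final names.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return final_path
```

The exists-then-replace check is not race-free across concurrent writers. A production version might also fsync the parent directory on POSIX so that the rename itself is durable.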

Public API

  • IoSettings — Configuration for IO behavior (defaults sourced from crv.core.constants).
  • Dataset — Facade bound to a run that supports append/scan/read/manifest operations.

Source of truth and dependencies

  • crv.core.grammar.TableName is the canonical table name enum; docstrings and type hints refer to it.
  • crv.core.tables provides TableDescriptor instances (fields: columns, dtypes, required, nullable, partitioning=["bucket"], version=SCHEMA_V).
  • crv.core.ids provides typed identifiers (e.g., RunId) used in annotations and docs.
  • crv.core.versioning provides SCHEMA_V embedded in Parquet key-value metadata.
  • crv.core.errors and crv.core.schema define core normalization/validation semantics; crv.io raises Io* errors for IO-layer failures.
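The descriptor fields listed above (columns, dtypes, required, nullable) suggest what "lightweight schema validation" checks. The following is a minimal sketch under stated assumptions: MiniDescriptor and IoSchemaError are hypothetical stand-ins for TableDescriptor and crv.io's Io* errors, dtypes are compared by name, and undeclared columns are rejected (the real rules may be looser).

```python
from dataclasses import dataclass


class IoSchemaError(ValueError):
    """Hypothetical IO-layer error; crv.io's real Io* classes may differ."""


@dataclass(frozen=True)
class MiniDescriptor:
    """Hypothetical, simplified stand-in for a crv.core.tables TableDescriptor."""

    columns: dict[str, str]                 # column name -> dtype name
    required: frozenset[str] = frozenset()  # columns that must be present
    nullable: frozenset[str] = frozenset()  # columns allowed to contain nulls


def validate_frame(
    schema: dict[str, str], null_counts: dict[str, int], desc: MiniDescriptor
) -> None:
    """Raise IoSchemaError on the first descriptor violation found."""
    missing = set(desc.required) - set(schema)
    if missing:
        raise IoSchemaError(f"missing required columns: {sorted(missing)}")
    for name, dtype in schema.items():
        expected = desc.columns.get(name)
        if expected is None:
            # Assumption: columns the descriptor does not declare are rejected.
            raise IoSchemaError(f"undeclared column: {name}")
        if dtype != expected:
            raise IoSchemaError(f"{name}: expected {expected}, got {dtype}")
        if null_counts.get(name, 0) > 0 and name not in desc.nullable:
            raise IoSchemaError(f"{name}: nulls in non-nullable column")
```

With Polars, the two inputs could be derived with something like `{n: str(t) for n, t in df.schema.items()}` and `df.null_count().row(0, named=True)`.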

Import DAG discipline

  • Depends only on stdlib, polars/pyarrow (and optionally fsspec later), and crv.core.*.
  • MUST NOT import higher layers: world, mind, lab, viz, or app.
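One way to enforce this rule is a test that parses crv.io source files and flags imports of higher layers. The sketch assumes those layers live at crv.world, crv.mind, crv.lab, crv.viz, and crv.app; forbidden_imports is a hypothetical helper, and relative imports are ignored.

```python
import ast

# Higher layers crv.io must not import (assumed to live under the crv package).
FORBIDDEN = ("world", "mind", "lab", "viz", "app")


def forbidden_imports(source: str) -> list[str]:
    """Return crv.<layer> modules imported by `source` that violate the DAG."""
    bad = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # "from crv import world" names the layer in the alias, not the module.
            if node.module == "crv":
                names = [f"crv.{alias.name}" for alias in node.names]
            else:
                names = [node.module]
        else:
            continue  # not an import, or a relative import
        for name in names:
            parts = name.split(".")
            if len(parts) >= 2 and parts[0] == "crv" and parts[1] in FORBIDDEN:
                bad.append(name)
    return bad
```

In a test suite, one could run forbidden_imports over every .py file in the crv.io package and assert that each result is empty.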

Examples

import polars as pl
from crv.io import IoSettings, Dataset
from crv.core.grammar import TableName

settings = IoSettings(root_dir="out", tick_bucket_size=100)  # doctest: +SKIP
ds = Dataset(settings, run_id="20250101-000000")  # doctest: +SKIP
# With tick_bucket_size=100, ticks 0-2 fall in bucket 0 and tick 101 in bucket 1.
df = pl.DataFrame({  # doctest: +SKIP
    "tick": [0, 1, 2, 101],
    "observer_agent_id": ["A0", "A1", "A2", "A0"],
    "edge_kind": ["self_to_object"] * 4,
    "edge_weight": [0.0, 0.1, 0.2, 0.3],
})
ds.append(TableName.IDENTITY_EDGES, df)  # doctest: +SKIP
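With tick_bucket_size=100, the four rows above land in two partitions. The sketch below applies the partitioning rule stated under Notes; bucket_of and bucket_dir are hypothetical helpers, and the six-digit padding width is inferred from the bucket=000123 example rather than documented.

```python
def bucket_of(tick: int, tick_bucket_size: int) -> int:
    # Rule from the Notes: bucket = tick // tick_bucket_size.
    return tick // tick_bucket_size


def bucket_dir(bucket: int, width: int = 6) -> str:
    # Zero-padded directory name; width=6 is an assumption based on "bucket=000123".
    return f"bucket={bucket:0{width}d}"


ticks = [0, 1, 2, 101]  # the tick column from the example
layout: dict[str, list[int]] = {}
for t in ticks:
    layout.setdefault(bucket_dir(bucket_of(t, 100)), []).append(t)
print(layout)  # {'bucket=000000': [0, 1, 2], 'bucket=000001': [101]}
```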

Notes

  • IO write path: tmp parquet → fsync → os.replace(tmp, final) on the same filesystem for atomicity.
  • Partitioning: bucket = tick // IoSettings.tick_bucket_size; bucket directories are zero-padded (e.g., bucket=000123).
  • Manifests: each table has a JSON manifest that records its partitions and their parts; scan() uses it to prune partitions.
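Manifest-based pruning for a tick range can be sketched as follows. The manifest layout here (a "partitions" object mapping bucket directory names to their "parts") is a hypothetical shape, not crv.io's documented format, and prune_parts is likewise illustrative.

```python
import json


def prune_parts(manifest_json: str, tick_min: int, tick_max: int,
                tick_bucket_size: int) -> list[str]:
    """Return part paths whose bucket can hold ticks in [tick_min, tick_max]."""
    manifest = json.loads(manifest_json)
    lo = tick_min // tick_bucket_size  # first bucket that can match
    hi = tick_max // tick_bucket_size  # last bucket that can match
    selected = []
    for part_dir, info in sorted(manifest["partitions"].items()):
        bucket = int(part_dir.split("=", 1)[1])  # "bucket=000001" -> 1
        if lo <= bucket <= hi:
            selected.extend(f"{part_dir}/{p}" for p in info["parts"])
    return selected
```

Pruning is conservative: a selected bucket may also hold ticks outside the requested range, so a row-level tick filter is still needed after reading.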
