CRV IO
Canonical Polars/Arrow‑first IO for CRV datasets. Aligns with crv.core.tables and enforces append‑only semantics, atomic renames, per‑table manifests, tick‑bucket partitioning, and schema validation.
At a Glance
- Append‑only, atomic writes (*.tmp → fsync → atomic rename).
- Per‑table manifests with bucket/part metadata and tick ranges.
- Manifest‑guided pruning on reads (by tick range).
- Validation against core descriptors (strict by default).
- Import DAG: stdlib, Polars/Arrow, and crv.core.* only.
Scope & Guarantees
- Append‑only writer (single‑writer semantics initially; per‑bucket locks can be added later).
- Partitioning: bucket = tick // tick_bucket_size, directories zero‑padded (e.g., bucket=000123).
- Manifests record per‑part and per‑bucket stats for reader pruning.
- Strict schema mode:
  - No columns beyond (required ∪ nullable).
  - Safe scalar casts to (i64, f64, str). Struct / List[Struct] accepted (Phase 1: shallow checks).
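The partitioning rule above can be sketched in a few lines of plain Python. The helper names `bucket_for_tick` and `bucket_dir_name` are hypothetical (not part of crv.io), and the padding width of 6 is inferred from the bucket=000123 example rather than confirmed.

```python
def bucket_for_tick(tick: int, tick_bucket_size: int = 100) -> int:
    """Return the bucket index for a tick using floor division."""
    if tick_bucket_size <= 0:
        raise ValueError("tick_bucket_size must be positive")
    if tick < 0:
        raise ValueError("tick must be non-negative")
    return tick // tick_bucket_size


def bucket_dir_name(bucket: int, width: int = 6) -> str:
    """Format a zero-padded directory name such as 'bucket=000123'.

    The width of 6 is an assumption taken from the example in the text.
    """
    return f"bucket={bucket:0{width}d}"


# Tick 12345 with a bucket size of 100 falls into bucket 123.
print(bucket_dir_name(bucket_for_tick(12345, 100)))  # bucket=000123
```

With tick_bucket_size=100, ticks 0, 1, and 2 share bucket 0 while tick 101 lands in bucket 1, so a frame containing those ticks would span two bucket directories.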
Path Layout
root_dir is configurable via IoSettings (default: out).
Append Semantics
- Write to *.parquet.tmp, fsync file & directory, then os.replace(tmp → final) atomically.
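The write sequence above can be illustrated with the standard library alone. This is a minimal sketch, not the crv.io writer: `atomic_write_bytes` is a hypothetical helper, the payload is arbitrary bytes rather than a real Parquet file, and the second directory fsync after the rename is an addition beyond the steps listed (it is a common way to make the rename itself durable on POSIX).

```python
import os
import tempfile
from pathlib import Path


def _fsync_dir(directory: Path) -> None:
    """Flush directory metadata to disk (POSIX only; skipped elsewhere)."""
    if os.name != "posix":
        return
    fd = os.open(str(directory), os.O_RDONLY)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)


def atomic_write_bytes(final_path: Path, payload: bytes) -> None:
    """Write payload to '<final>.tmp', fsync, then atomically rename."""
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    with open(tmp_path, "wb") as fh:
        fh.write(payload)
        fh.flush()
        os.fsync(fh.fileno())          # file contents reach the disk
    _fsync_dir(final_path.parent)      # the tmp directory entry is durable
    os.replace(tmp_path, final_path)   # atomic swap into the final name
    _fsync_dir(final_path.parent)      # assumption: also persist the rename


# Demo in a throwaway directory; the part file name is illustrative only.
with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "part-000.parquet"
    atomic_write_bytes(target, b"demo")
    print(sorted(p.name for p in Path(d).iterdir()))
```

Because readers only ever see the final name, a crash before os.replace leaves at most a stray *.tmp file rather than a partially written part.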
Manifests
Each table’s manifest.json includes:
- Per‑part: rows, bytes, tick_min, tick_max, created_at.
- Per‑bucket: row_count, byte_size, tick range.
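To make the relationship between the two levels concrete, here is a hedged sketch of how per‑bucket stats could be derived from per‑part stats. The field names come from the list above; the function `summarize_bucket`, the exact JSON shape, and the created_at format are assumptions, not the actual manifest schema.

```python
from typing import Dict, List, TypedDict


class PartStats(TypedDict):
    rows: int
    bytes: int
    tick_min: int
    tick_max: int
    created_at: str  # timestamp format is an assumption


def summarize_bucket(parts: List[PartStats]) -> Dict[str, int]:
    """Aggregate per-part stats into per-bucket stats."""
    if not parts:
        raise ValueError("a bucket needs at least one part")
    return {
        "row_count": sum(p["rows"] for p in parts),
        "byte_size": sum(p["bytes"] for p in parts),
        "tick_min": min(p["tick_min"] for p in parts),
        "tick_max": max(p["tick_max"] for p in parts),
    }


parts: List[PartStats] = [
    {"rows": 3, "bytes": 900, "tick_min": 0, "tick_max": 2,
     "created_at": "2025-01-01T00:00:00Z"},
    {"rows": 5, "bytes": 1500, "tick_min": 40, "tick_max": 99,
     "created_at": "2025-01-01T00:01:00Z"},
]
print(summarize_bucket(parts))
# {'row_count': 8, 'byte_size': 2400, 'tick_min': 0, 'tick_max': 99}
```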
Readers use the manifest's tick ranges to prune scans when a where filter (tick_min/tick_max) is provided. If a manifest is missing, a rebuild path rescans the Parquet files to regenerate it.
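Manifest‑guided pruning amounts to an interval‑overlap test between the requested tick range and each bucket's recorded range. The sketch below assumes inclusive bounds, treats a missing bound as open‑ended, and uses a hypothetical `prune_buckets` function over a simplified manifest dict; the real reader may differ on all three points.

```python
from typing import Dict, List, Optional


def prune_buckets(
    buckets: Dict[str, Dict[str, int]],
    where: Optional[Dict[str, int]] = None,
) -> List[str]:
    """Return the bucket names whose tick range overlaps the filter."""
    if not where:
        return sorted(buckets)  # no filter: every bucket must be scanned
    lo = where.get("tick_min")
    hi = where.get("tick_max")
    kept = []
    for name, stats in buckets.items():
        if lo is not None and stats["tick_max"] < lo:
            continue  # bucket ends before the requested range starts
        if hi is not None and stats["tick_min"] > hi:
            continue  # bucket starts after the requested range ends
        kept.append(name)
    return sorted(kept)


buckets = {
    "bucket=000000": {"tick_min": 0, "tick_max": 2},
    "bucket=000001": {"tick_min": 101, "tick_max": 101},
    "bucket=000002": {"tick_min": 200, "tick_max": 299},
}
print(prune_buckets(buckets, {"tick_min": 0, "tick_max": 120}))
# ['bucket=000000', 'bucket=000001']
```

Only the surviving buckets need their Parquet files opened, which is where the scan savings come from.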
Validation Against Core
- Source of truth: crv.core.tables.
- Enforced in strict mode:
  - Required/nullable sets and dtypes.
  - Safe scalar casts (i64, f64, str).
  - Shallow acceptance of Struct and List[Struct].
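The column‑set part of strict mode can be sketched with plain sets. This is an illustration only: `check_strict_columns` is a hypothetical name, the nullable column shown is invented for the example, and dtype checks and casts are deliberately left out. The real descriptors live in crv.core.tables.

```python
from typing import Iterable, Set


def check_strict_columns(
    actual: Iterable[str], required: Set[str], nullable: Set[str]
) -> None:
    """Raise ValueError on missing required or unexpected extra columns."""
    actual_set = set(actual)
    missing = required - actual_set
    extra = actual_set - (required | nullable)
    if missing:
        raise ValueError(f"missing required columns: {sorted(missing)}")
    if extra:
        raise ValueError(f"unexpected columns in strict mode: {sorted(extra)}")


# Required columns as used for identity_edges in the Quickstart.
REQUIRED = {"tick", "observer_agent_id", "edge_kind", "edge_weight"}
# Hypothetical nullable column, for illustration only.
NULLABLE = {"hypothetical_optional_col"}

check_strict_columns(sorted(REQUIRED), REQUIRED, NULLABLE)  # passes silently

try:
    check_strict_columns(sorted(REQUIRED) + ["surprise"], REQUIRED, NULLABLE)
except ValueError as exc:
    print(exc)  # unexpected columns in strict mode: ['surprise']
```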
Public API
import polars as pl
from crv.io import IoSettings, Dataset
from crv.core.grammar import TableName
# Configure the IO layer
settings = IoSettings(root_dir="out", tick_bucket_size=100)
# Bind to a specific run
ds = Dataset(settings, run_id="20250101-000000")
# Append a DataFrame
summary = ds.append(TableName.IDENTITY_EDGES, pl.DataFrame({...}))
# Lazy scan with pruning by tick range
lf = ds.scan(TableName.IDENTITY_EDGES, where={"tick_min": 0, "tick_max": 120})
# Eager read (optional limit)
df = ds.read(TableName.IDENTITY_EDGES, where={"tick_min": 0, "tick_max": 120}, limit=None)
# Inspect or rebuild a manifest
m = ds.manifest(TableName.IDENTITY_EDGES)
m2 = ds.rebuild_manifest(TableName.IDENTITY_EDGES)
Quickstart
import polars as pl
from crv.io import IoSettings, Dataset
from crv.core.grammar import TableName
# Configure where to write (default: "out")
settings = IoSettings(root_dir="out", tick_bucket_size=100)
# Bind to a run
ds = Dataset(settings, run_id="20250101-000000")
# Prepare a minimal identity_edges frame (only required columns)
df = pl.DataFrame({
    "tick": [0, 1, 2, 101],
    "observer_agent_id": ["A0", "A1", "A2", "A0"],
    "edge_kind": ["self_to_object"] * 4,
    "edge_weight": [0.0, 0.1, 0.2, 0.3],
})
# Append atomically; returns a summary
summary = ds.append(TableName.IDENTITY_EDGES, df)
print(summary)
# Scan with pruning (by tick range)
lf = ds.scan(TableName.IDENTITY_EDGES, where={"tick_min": 0, "tick_max": 120})
print(lf.collect())
# Inspect manifest
m = ds.manifest(TableName.IDENTITY_EDGES)
print(m)
Configuration (IoSettings)
- root_dir: str = "out"
- partitioning: Literal["tick_buckets"] = "tick_buckets"
- tick_bucket_size: int = 100
- row_group_size: int = 128 * 1024
- compression: Literal["zstd","lz4","snappy"] = "zstd"
- strict_schema: bool = True
- write_manifest_every_n: int = 1 (reserved for batching)
- fs_protocol: str = "file", fs_options: dict = {} (future fsspec)
Config loading (env > TOML > defaults):
# crv.toml
[io]
root_dir = "out"
tick_bucket_size = 100
compression = "zstd"
strict_schema = true
write_manifest_every_n = 1
# pyproject.toml
[tool.crv.io]
root_dir = "out"
tick_bucket_size = 100
compression = "zstd"
strict_schema = true
# Environment variables
export CRV_IO_ROOT_DIR="out"
export CRV_IO_TICK_BUCKET_SIZE="100"
export CRV_IO_ROW_GROUP_SIZE="131072"
export CRV_IO_COMPRESSION="zstd" # zstd|lz4|snappy
export CRV_IO_FS_PROTOCOL="file"
export CRV_IO_STRICT_SCHEMA="1" # 1/0/true/false/yes/no/on/off
export CRV_IO_WRITE_MANIFEST_EVERY_N="1"
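The precedence rule (env > TOML > defaults) can be sketched as a layered merge. This is not the implementation of IoSettings.load(): `resolve_settings` and `parse_bool` are hypothetical helpers, the TOML table is passed in as an already‑parsed dict, and case‑insensitive boolean parsing is an assumption. partitioning and fs_options are omitted for brevity.

```python
import os
from typing import Any, Dict, Mapping, Optional

DEFAULTS: Dict[str, Any] = {
    "root_dir": "out",
    "tick_bucket_size": 100,
    "row_group_size": 128 * 1024,
    "compression": "zstd",
    "strict_schema": True,
    "write_manifest_every_n": 1,
    "fs_protocol": "file",
}

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def parse_bool(raw: str) -> bool:
    """Parse the boolean spellings listed for CRV_IO_STRICT_SCHEMA."""
    value = raw.strip().lower()
    if value in _TRUE:
        return True
    if value in _FALSE:
        return False
    raise ValueError(f"not a boolean: {raw!r}")


def resolve_settings(
    toml_io: Optional[Mapping[str, Any]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Merge defaults, then TOML values, then CRV_IO_* environment values."""
    env = os.environ if env is None else env
    resolved = dict(DEFAULTS)
    resolved.update({k: v for k, v in (toml_io or {}).items() if k in DEFAULTS})
    for key, default in DEFAULTS.items():
        raw = env.get(f"CRV_IO_{key.upper()}")
        if raw is None:
            continue
        if isinstance(default, bool):  # check bool first: bool is an int subclass
            resolved[key] = parse_bool(raw)
        elif isinstance(default, int):
            resolved[key] = int(raw)
        else:
            resolved[key] = raw
    return resolved


settings = resolve_settings(
    toml_io={"tick_bucket_size": 50, "compression": "lz4"},
    env={"CRV_IO_TICK_BUCKET_SIZE": "200", "CRV_IO_STRICT_SCHEMA": "off"},
)
print(settings["tick_bucket_size"], settings["compression"], settings["strict_schema"])
# 200 lz4 False
```

Here the environment overrides the TOML value for tick_bucket_size, the TOML value overrides the default for compression, and everything else falls back to its default.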
Run Bundle
from crv.io.config import IoSettings
from crv.io.run_manifest import write_run_bundle_manifest
from crv.core.ids import RunId
# Resolve settings with precedence (env > TOML > defaults)
settings = IoSettings.load()
run_id = RunId("20250101-000000")
payload = write_run_bundle_manifest(settings, run_id, meta={"note": "demo"})
# Writes: <root>/runs/<run_id>/bundle.manifest.json
print(payload["io"], payload["tables"].keys())
Testing
- Path/bucket math.
- Append atomicity & manifest updates.
- Scan pruning via manifest ranges.
- Manifest rebuild from FS.
- Strict schema enforcement.
- Import DAG isolation (no world/mind/lab/viz/app imports).
Future Work
- Per‑bucket write locks for multi‑writer scenarios.
- fsspec remote storage (s3fs/gcsfs).
- Row‑level validation and deep struct validation.
- Streaming APIs (tail manifests).
Contributing
- Align with plans/io/io_module_starter_plan.md and plans/io/run_bundle_and_lab_integration_plan.md.
- Respect the import DAG (no downstream imports).