crv.io.dataset
Experimental API
Dataset facade for crv.io.
Provides a convenient object to operate on a specific run (run_id) with append/scan/read/manifest/rebuild_manifest helpers. The IO layer is Polars/Arrow‑first and treats crv.core as the single source of truth for canonical table names, descriptors (columns/dtypes/required/nullable/partitioning), and schema versioning.
Source of truth
- Table names: crv.core.grammar.TableName
- Descriptors: crv.core.tables (partitioning=["bucket"], version=SCHEMA_V)
- IDs: crv.core.ids (RunId)
- Schema models/combination rules: crv.core.schema
- Versioning metadata: crv.core.versioning.SCHEMA_V
Import DAG discipline
- Depends only on stdlib, polars/pyarrow, and crv.core.* (via read/write/manifest modules).
- Must not import higher layers (world, mind, lab, viz, app).
crv.io.dataset.Dataset
Facade bound to a specific IoSettings and run_id.
Notes
- run_id should be produced by crv.core.ids.make_run_id (type: RunId). It is stored as a string on disk; the NewType is used for clarity and static checks.
- All schema/dtype decisions are delegated to crv.core.tables; this class does not redefine schema contracts.
- Writes are append‑only and atomic (tmp → ready rename) per part; manifests are updated per append and can be rebuilt from the filesystem if needed.
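The tmp → ready rename mentioned in the notes can be illustrated with a minimal, stdlib-only sketch. This is not the crv.io implementation: `atomic_write_bytes` is a hypothetical helper, and the `.tmp` suffix is an assumption made for the example.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(final_path: Path, payload: bytes) -> Path:
    """Write payload to a temp file next to final_path, then rename it into place.

    Hypothetical helper for illustration; not part of crv.io.
    """
    final_path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file is created in the target directory so the rename
    # never crosses a filesystem boundary.
    fd, tmp_name = tempfile.mkstemp(dir=final_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace swaps the file in as a single step, so readers see
        # either no part or a complete part, never a partial one.
        os.replace(tmp_name, final_path)
    except BaseException:
        # Remove the partial temp file before re-raising.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return final_path
```

Because the rename is the last step, a crash mid-write leaves at most a stray temp file, which a manifest rebuild can ignore.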
Source code in src/crv/io/dataset.py
crv.io.dataset.Dataset.append
append(
table: crv.core.grammar.TableName | str,
df: polars.DataFrame,
*,
validate_schema: bool = True,
validate_rows: bool = False
) -> dict[str, Any]
Append a Polars DataFrame to a canonical table with atomic semantics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | crv.core.grammar.TableName \| str | Canonical table name (enum or lower_snake string). | required |
| df | polars.DataFrame | Frame to append. Must include tick; bucket is computed as tick // IoSettings.tick_bucket_size. | required |
| validate_schema | bool | Validate frame against crv.core.tables descriptor. | True |
| validate_rows | bool | Reserved for future row‑level validation (unused). | False |
Returns:
| Type | Description |
|---|---|
| dict[str, typing.Any] | Summary including: table (str); run_id (str); parts (list[dict]), per‑part {"bucket_id","path","rows","bytes","tick_min","tick_max"}; rows (int), total rows written; buckets (list[int]), buckets touched. |
Raises:
| Type | Description |
|---|---|
| crv.io.errors.IoSchemaError | Schema validation failed vs core descriptor. |
| crv.io.errors.IoWriteError | Parquet write/rename failed. |
| crv.io.errors.IoManifestError | Manifest write failed. |
Notes
- Parquet parts embed crv.core.versioning.SCHEMA_V and table/run_id metadata.
- Single‑writer semantics initially; no inter‑process locking.
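The bucketing rule for `df` (bucket = tick // IoSettings.tick_bucket_size) and the shape of the per‑part summary can be sketched in plain Python. The helpers `assign_buckets` and `summarize_parts` are hypothetical and cover only some of the summary keys; the real append operates on a Polars DataFrame and also reports `path` and `bytes`.

```python
from collections import defaultdict


def assign_buckets(ticks: list[int], tick_bucket_size: int) -> dict[int, list[int]]:
    """Group tick values by bucket id, where bucket = tick // tick_bucket_size."""
    if tick_bucket_size <= 0:
        raise ValueError("tick_bucket_size must be positive")
    buckets: dict[int, list[int]] = defaultdict(list)
    for tick in ticks:
        buckets[tick // tick_bucket_size].append(tick)
    return dict(buckets)


def summarize_parts(buckets: dict[int, list[int]]) -> list[dict[str, int]]:
    """Build one summary entry per bucket, ordered by bucket id."""
    return [
        {
            "bucket_id": bucket_id,
            "rows": len(ticks),
            "tick_min": min(ticks),
            "tick_max": max(ticks),
        }
        for bucket_id, ticks in sorted(buckets.items())
    ]


# Illustrative values only: five rows with a bucket size of 100 ticks.
buckets = assign_buckets([0, 5, 99, 100, 250], tick_bucket_size=100)
parts = summarize_parts(buckets)
```

With a bucket size of 100, ticks 0 to 99 land in bucket 0, tick 100 in bucket 1, and tick 250 in bucket 2, so a single append may touch several buckets and produce one part per bucket.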
crv.io.dataset.Dataset.scan
scan(
table: crv.core.grammar.TableName | str,
where: dict[str, typing.Any] | None = None,
) -> pl.LazyFrame
Return a Polars LazyFrame pruned by manifest when available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | crv.core.grammar.TableName \| str | Canonical table name. | required |
| where | dict \| None | Optional tick range filter: {"tick_min": int \| None, "tick_max": int \| None}. | None |
Returns:
| Type | Description |
|---|---|
| polars.LazyFrame | A lazy scan over selected parts. If a manifest is present, partition (bucket) pruning is applied. Row‑level tick filters are applied explicitly to ensure correctness even after pruning. |
crv.io.dataset.Dataset.read
read(
table: crv.core.grammar.TableName | str,
where: dict[str, typing.Any] | None = None,
limit: int | None = None,
) -> pl.DataFrame
Collect a DataFrame from scan(), optionally applying a pre‑collect row cap.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | crv.core.grammar.TableName \| str | Canonical table name. | required |
| where | dict \| None | Optional {"tick_min","tick_max"} filter. | None |
| limit | int \| None | Optional row cap applied before collect(). | None |
Returns:
| Type | Description |
|---|---|
| polars.DataFrame | Materialized frame (possibly empty). |
crv.io.dataset.Dataset.manifest
Load the per‑table manifest if present.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | crv.core.grammar.TableName \| str | Canonical table name. | required |
Returns:
| Type | Description |
|---|---|
| crv.io.manifest.TableManifest \| None | The manifest model if found; otherwise None. |
crv.io.dataset.Dataset.rebuild_manifest
Rebuild the manifest by scanning parquet files under the table directory, recomputing per‑part stats (rows, bytes, tick_min, tick_max), and writing the manifest atomically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | crv.core.grammar.TableName \| str | Canonical table name. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| TableManifest | crv.io.manifest.TableManifest | Newly built manifest. |
Notes
This is a slower, best‑effort recovery/validation path (lazy Polars scans).
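As a rough illustration of the rebuild path, the sketch below walks a table directory and recomputes per‑part statistics. It is not the crv.io implementation: `rebuild_stats` is a hypothetical helper, the `*.parquet` glob is an assumption about the on-disk layout, and the caller supplies `read_ticks` (a function returning the tick values of one part) so the example does not depend on a particular Parquet reader.

```python
from pathlib import Path
from typing import Callable, Iterable


def rebuild_stats(
    table_dir: Path,
    read_ticks: Callable[[Path], Iterable[int]],
    pattern: str = "*.parquet",
) -> list[dict]:
    """Recompute rows, bytes, tick_min, and tick_max for every part under table_dir."""
    parts = []
    for path in sorted(table_dir.rglob(pattern)):
        ticks = list(read_ticks(path))
        parts.append(
            {
                # Paths are stored relative to the table directory.
                "path": path.relative_to(table_dir).as_posix(),
                "rows": len(ticks),
                "bytes": path.stat().st_size,
                # Empty parts have no tick range.
                "tick_min": min(ticks) if ticks else None,
                "tick_max": max(ticks) if ticks else None,
            }
        )
    return parts
```

Every part has to be opened to recover its row count and tick range, which is why a rebuild is slower than the incremental manifest update performed on each append.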