crv.io.read

Experimental API

Read utilities for canonical CRV tables.

Overview
  • scan(): Returns a Polars LazyFrame with optional manifest-based pruning.
  • read(): Collects a DataFrame from scan(), with an optional pre-collect row cap.

Source of truth
  • Canonical table names come from crv.core.grammar.TableName.
  • Table descriptors (columns/dtypes/required/nullable/partitioning) come from crv.core.tables.
  • Schema models and combination rules live in crv.core.schema; this module performs no schema normalization.
  • Version metadata (SCHEMA_V) comes from crv.core.versioning.

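Pruning semantics
  • If a table manifest exists, partition (bucket) pruning keeps only the parts whose [tick_min, tick_max] range overlaps the requested range.
  • Regardless of pruning, when a filter is provided, explicit row-level tick filters (inclusive on both bounds) are applied, so results stay correct even if a selected file covers slightly more than the requested range.
  • If no manifest is present, the scan() source listed on this page raises IoManifestError instead of walking the filesystem; rebuild the manifest with Dataset.rebuild_manifest() first.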
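The overlap check behind partition pruning can be sketched in plain Python. This is a minimal illustration, not the crv.io implementation: the `PartMeta` record, the part file names, and the helpers `overlaps` and `prune_parts` are all hypothetical, and the real manifest layout may differ.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PartMeta:
    """Hypothetical manifest entry: one part file and the tick range it covers."""

    path: str
    tick_min: int
    tick_max: int


def overlaps(part: PartMeta, tick_min: int | None, tick_max: int | None) -> bool:
    """Return True if the part's tick range intersects the query range.

    A bound of None leaves that side of the query range open.
    """
    if tick_min is not None and part.tick_max < tick_min:
        return False  # part ends before the query starts
    if tick_max is not None and part.tick_min > tick_max:
        return False  # part starts after the query ends
    return True


def prune_parts(parts: list[PartMeta], where: dict | None) -> list[PartMeta]:
    """Keep only the parts that may contain rows matching the tick filter."""
    if not where:
        return list(parts)
    lo, hi = where.get("tick_min"), where.get("tick_max")
    return [p for p in parts if overlaps(p, lo, hi)]


# Assumed example data: three parts covering 100 ticks each.
parts = [
    PartMeta("part-0.parquet", 0, 99),
    PartMeta("part-1.parquet", 100, 199),
    PartMeta("part-2.parquet", 200, 299),
]
selected = prune_parts(parts, {"tick_min": 150, "tick_max": 210})
print([p.path for p in selected])
```

Both selected parts also hold ticks outside 150..210 (for example 100..149 in the second part), which is why a row-level filter on "tick" is still needed after pruning.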

Import DAG discipline
  • Depends on stdlib, polars, and crv.io helpers; does not import higher layers (world/mind/lab/viz/app).

Notes
  • File protocol baseline; remote backends (e.g., fsspec) can be layered later.

crv.io.read.scan

scan(
    settings: crv.io.config.IoSettings,
    run_id: crv.core.ids.RunId,
    table: crv.core.grammar.TableName | str,
    where: dict[str, typing.Any] | None = None,
) -> pl.LazyFrame

Create a LazyFrame scanning selected parts, with optional manifest-based pruning.

Parameters:
  • settings (crv.io.config.IoSettings, required): IO configuration used to resolve paths/layout.
  • run_id (crv.core.ids.RunId, required): Run identifier (see crv.core.ids.RunId; stored as string on disk).
  • table (crv.core.grammar.TableName | str, required): Canonical table name (enum or lower_snake string).
  • where (dict[str, typing.Any] | None, default None): Optional tick filter with keys "tick_min" (int | None) and "tick_max" (int | None). Both bounds are inclusive; a missing or None bound leaves that side open.

Returns:
  • polars.LazyFrame: Lazy scan over discovered parts. When a manifest exists, partitions (buckets) are pruned using tick range overlap; explicit row-level filters on "tick" are applied to guarantee correctness even after pruning. If no parts are selected, an empty LazyFrame with no columns is returned.

Notes
  • When no manifest is present, the implementation listed here raises IoManifestError; call Dataset.rebuild_manifest() to regenerate the manifest before scanning.
  • Callers should rely on crv.core.tables descriptors for schema/dtype expectations.
Source code in src/crv/io/read.py
def scan(
    settings: IoSettings,
    run_id: RunId,
    table: TableName | str,
    where: dict[str, Any] | None = None,
) -> pl.LazyFrame:
    """
    Create a LazyFrame scanning selected parts, with optional manifest-based pruning.

    Args:
        settings (IoSettings): IO configuration used to resolve paths/layout.
        run_id (RunId): Run identifier (see crv.core.ids.RunId; stored as string on disk).
        table (TableName | str): Canonical table name (enum or lower_snake string).
        where (dict[str, Any] | None): Optional tick filter with keys:
            - "tick_min": int | None
            - "tick_max": int | None

    Returns:
        pl.LazyFrame: Lazy scan over discovered parts. When a manifest exists, partitions
        (buckets) are pruned using tick range overlap; explicit row-level filters on "tick"
        are applied to guarantee correctness even after pruning.

    Raises:
        IoManifestError: If no manifest exists for the table; rebuild it with
            Dataset.rebuild_manifest().

    Notes:
        - Callers should rely on crv.core.tables descriptors for schema/dtype expectations.
    """
    tname = _normalize_table_name(table)
    manifest = load_manifest(settings, run_id, tname)
    if manifest is None:
        raise IoManifestError(
            f"manifest missing for table {tname!r} in run {run_id!r}; call Dataset.rebuild_manifest()"
        )
    paths = _paths_from_manifest(settings, run_id, tname, manifest, where)

    if not paths:
        # Return an empty scan via a dummy empty dataframe to keep types simple.
        # Consumers can handle empty results.
        return pl.LazyFrame()

    lf = pl.scan_parquet(paths)

    # Apply row-level tick filter if provided
    if where:
        tick_min = where.get("tick_min")
        tick_max = where.get("tick_max")
        if tick_min is not None:
            lf = lf.filter(pl.col("tick") >= int(tick_min))
        if tick_max is not None:
            lf = lf.filter(pl.col("tick") <= int(tick_max))

    return lf
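
The listing above calls _normalize_table_name, whose body is not shown here. As a rough illustration of how an "enum or lower_snake string" argument could be reduced to one canonical string, here is a sketch under stated assumptions: `DemoTable` and its members are hypothetical stand-ins for crv.core.grammar.TableName, and `normalize_table_name` is not the library's helper.

```python
from __future__ import annotations

from enum import Enum


class DemoTable(str, Enum):
    """Hypothetical stand-in for crv.core.grammar.TableName."""

    DEMO_EVENTS = "demo_events"
    DEMO_AGENTS = "demo_agents"


def normalize_table_name(table: DemoTable | str) -> str:
    """Return the canonical lower_snake name for an enum member or a string.

    Strings are validated against the enum values so that typos fail early.
    """
    if isinstance(table, DemoTable):
        return table.value
    try:
        # Enum lookup by value raises ValueError for unknown names.
        return DemoTable(table).value
    except ValueError:
        valid = ", ".join(member.value for member in DemoTable)
        raise ValueError(f"unknown table {table!r}; expected one of: {valid}") from None


print(normalize_table_name(DemoTable.DEMO_EVENTS))
print(normalize_table_name("demo_agents"))
```

Accepting both forms lets callers pass the enum for static checking or a plain string read from configuration, while everything downstream sees a single spelling.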

crv.io.read.read

read(
    settings: crv.io.config.IoSettings,
    run_id: crv.core.ids.RunId,
    table: crv.core.grammar.TableName | str,
    where: dict[str, typing.Any] | None = None,
    limit: int | None = None,
) -> pl.DataFrame

Collect a DataFrame from scan(), optionally applying a pre-collect row cap.

Parameters:
  • settings (crv.io.config.IoSettings, required): IO configuration used to resolve paths/layout.
  • run_id (crv.core.ids.RunId, required): Run identifier (see crv.core.ids.RunId; stored as string on disk).
  • table (crv.core.grammar.TableName | str, required): Canonical table name (enum or lower_snake string).
  • where (dict[str, typing.Any] | None, default None): Optional {"tick_min","tick_max"} filter.
  • limit (int | None, default None): Optional row cap applied before collect().

Returns:
  • polars.DataFrame: Materialized frame (possibly empty).

Notes
  • Equivalent to scan(...).limit(limit).collect() when limit is provided.
  • Schema/dtype contracts are defined in crv.core.tables; this function does not perform validation or casting (that occurs on write).
Source code in src/crv/io/read.py
def read(
    settings: IoSettings,
    run_id: RunId,
    table: TableName | str,
    where: dict[str, Any] | None = None,
    limit: int | None = None,
) -> pl.DataFrame:
    """
    Collect a DataFrame from scan(), optionally applying a pre-collect row cap.

    Args:
        settings (IoSettings): IO configuration used to resolve paths/layout.
        run_id (RunId): Run identifier (see crv.core.ids.RunId; stored as string on disk).
        table (TableName | str): Canonical table name (enum or lower_snake string).
        where (dict[str, Any] | None): Optional {"tick_min","tick_max"} filter.
        limit (int | None): Optional row cap applied before collect().

    Returns:
        pl.DataFrame: Materialized frame (possibly empty).

    Notes:
        - Equivalent to scan(...).limit(limit).collect() when limit is provided.
        - Schema/dtype contracts are defined in crv.core.tables; this function does not
          perform validation or casting (that occurs on write).
    """
    lf = scan(settings, run_id, table, where=where)
    if limit is not None:
        lf = lf.limit(int(limit))
    return lf.collect()