Skip to content

crv.io.run_manifest

Experimental API

crv.io.run_manifest

Run-bundle manifest writer and indexers for canonical tables and artifacts.

Layout (file protocol baseline):

- `<root>/runs/<run_id>/bundle.manifest.json`
- `<root>/runs/<run_id>/tables/<table>/manifest.json`
- `<root>/runs/<run_id>/artifacts/lab/{tidy,probes,policy,audit}/*`

Notes:

- This module depends only on the stdlib and crv.core.* (for SCHEMA_V), plus local io helpers.
- It does NOT read parquet files for canonical table stats; it only reads per-table manifests.
- For artifacts, it collects file sizes on a best-effort basis and, if Polars is importable, row counts for parquet files.

Tables index schema (`bundle.manifest["tables"]`):

```
"<table_name>": {
    rows: int,
    bytes: int,
    total_rows: int,    # canonical alias of rows
    total_bytes: int,   # canonical alias of bytes
    tick_min: int | null,
    tick_max: int | null,
    buckets: list[int]
}
```

crv.io.run_manifest.bundle_manifest_path

bundle_manifest_path(
    settings: crv.io.config.IoSettings, run_id: crv.core.ids.RunId
) -> str

Path to the run-bundle manifest file.

Returns:

Name Type Description
str str

`<root>/runs/<run_id>/bundle.manifest.json`

Source code in src/crv/io/run_manifest.py
def bundle_manifest_path(settings: IoSettings, run_id: RunId) -> str:
    """
    Return the location of the run-bundle manifest JSON file.

    The file lives directly inside the run directory resolved by ``run_root``.

    Args:
        settings: IO settings used to resolve the run directory.
        run_id: Identifier of the run.

    Returns:
        str: "<root>/runs/<run_id>/bundle.manifest.json"
    """
    run_dir = run_root(settings, str(run_id))
    return os.path.join(run_dir, "bundle.manifest.json")

crv.io.run_manifest.collect_tables_index

collect_tables_index(
    settings: crv.io.config.IoSettings, run_id: crv.core.ids.RunId
) -> dict[str, Any]

Build an index of canonical tables for a run by reading per-table manifests.

Returns:

Type Description
dict[str, typing.Any]

dict[str, Any]: Mapping of table_name -> {rows, bytes, tick_min, tick_max, buckets}

Source code in src/crv/io/run_manifest.py
def collect_tables_index(settings: IoSettings, run_id: RunId) -> dict[str, Any]:
    """
    Build an index of canonical tables for a run by reading per-table manifests.

    Every subdirectory of the run's tables root is treated as a candidate table,
    visited in sorted name order. Subdirectories without a loadable manifest
    (``load_manifest`` returns ``None``) are skipped. A missing tables root yields
    an empty index.

    Args:
        settings: IO settings used to resolve the tables root.
        run_id: Identifier of the run whose tables are indexed.

    Returns:
        dict[str, Any]: Mapping of table_name -> {rows, bytes, tick_min, tick_max, buckets}
    """
    rid = str(run_id)
    tables_dir = tables_root(settings, rid)
    try:
        candidates = sorted(os.listdir(tables_dir))
    except FileNotFoundError:
        candidates = []

    index: dict[str, Any] = {}
    for table_name in candidates:
        # Only directories can hold a per-table manifest; stray files are ignored.
        if os.path.isdir(os.path.join(tables_dir, table_name)):
            manifest = load_manifest(settings, rid, table_name)
            if manifest is not None:
                index[table_name] = _aggregate_table_manifest(manifest)
    return index

crv.io.run_manifest.collect_artifacts_index

collect_artifacts_index(
    settings: crv.io.config.IoSettings, run_id: crv.core.ids.RunId
) -> dict[str, Any]

Index lab artifacts (tidy/probes/policy/audit) present under the run's artifacts directory.

Returns:

Type Description
dict[str, typing.Any]

dict[str, Any]: {"lab": {"tidy":[...], "probes":[...], "policy":[...], "audit":[...]}, "world":{}, "mind":{}}

Source code in src/crv/io/run_manifest.py
def collect_artifacts_index(settings: IoSettings, run_id: RunId) -> dict[str, Any]:
    """
    Index lab artifacts (tidy/probes/policy/audit) present under the run's artifacts directory.

    Only the immediate regular files of each ``<run>/artifacts/lab/<sub>`` directory are
    indexed (no recursion), in sorted file-name order. Each entry records the file's
    run-relative ``path`` and its size in ``bytes``. ``.parquet`` files additionally get
    ``rows`` when Polars is importable and counting succeeds. A missing (or non-directory)
    subdirectory yields an empty list. The ``world`` and ``mind`` sections are always
    returned empty by this function.

    Args:
        settings: IO settings used to resolve the run directory.
        run_id: Identifier of the run whose artifacts are indexed.

    Returns:
        dict[str, Any]: {"lab": {"tidy":[...], "probes":[...], "policy":[...], "audit":[...]}, "world":{}, "mind":{}}
    """
    run_dir = run_root(settings, str(run_id))
    lab_root = os.path.join(run_dir, "artifacts", "lab")

    subdirs = ("tidy", "probes", "policy", "audit")
    idx: dict[str, Any] = {"lab": {k: [] for k in subdirs}, "world": {}, "mind": {}}

    # Optional row counting for parquet via Polars (lazy import)
    try:
        import polars as pl  # type: ignore
    except Exception:
        pl = None  # type: ignore[assignment]

    for sub in subdirs:
        sdir = os.path.join(lab_root, sub)
        try:
            files = os.listdir(sdir)
        except (FileNotFoundError, NotADirectoryError):
            # Absent subdirectory (or a stray file shadowing it): nothing to index.
            files = []
        for fn in sorted(files):
            ap = os.path.join(sdir, fn)
            if not os.path.isfile(ap):
                continue
            rel_path = _relpath_from_run(run_dir, ap)
            try:
                size = int(os.path.getsize(ap))
            except OSError:
                # The file may vanish (or become unreadable) between isfile() and
                # getsize(); record 0 bytes instead of aborting the whole index.
                size = 0
            entry: dict[str, Any] = {"path": rel_path, "bytes": size}
            if pl is not None and fn.endswith(".parquet"):
                try:
                    # Prefer a lightweight row count using a tiny scan
                    lf = pl.scan_parquet(ap)
                    n = lf.select([pl.len().alias("_n")]).collect()
                    entry["rows"] = int(n["_n"][0]) if n.height else 0
                except Exception:
                    # Best-effort only: unreadable parquet just omits "rows".
                    pass
            idx["lab"][sub].append(entry)

    return idx

crv.io.run_manifest.write_run_bundle_manifest

write_run_bundle_manifest(
    settings: crv.io.config.IoSettings,
    run_id: crv.core.ids.RunId,
    *,
    meta: dict[str, typing.Any] | None = None
) -> dict[str, Any]

Compose and persist the run-bundle manifest atomically.

Returns:

Type Description
dict[str, typing.Any]

dict[str, Any]: The manifest payload that was written.

Source code in src/crv/io/run_manifest.py
def write_run_bundle_manifest(
    settings: IoSettings,
    run_id: RunId,
    *,
    meta: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Compose and persist the run-bundle manifest atomically.

    The payload combines run identity (id, creation time, core schema version,
    git info), environment versions, IO settings, the canonical tables index,
    and the lab artifacts index. It is serialized as indented JSON, written to
    ``<manifest path>.tmp`` via ``open_write``, and then moved into place with
    ``rename_atomic``.

    Args:
        settings: IO settings; also recorded under the ``"io"`` section.
        run_id: Identifier of the run being described.
        meta: Optional free-form metadata stored under ``"meta"``; ``None`` (or
            an empty dict) is stored as ``{}``.

    Returns:
        dict[str, Any]: The manifest payload that was written.
    """
    run_dir = run_root(settings, str(run_id))
    makedirs(run_dir, exist_ok=True)

    # Canonical alias fields: total_rows / total_bytes mirror rows / bytes.
    tables: dict[str, Any] = {}
    for table_name, info in collect_tables_index(settings, run_id).items():
        tables[table_name] = {
            **info,
            "total_rows": info.get("rows", 0),
            "total_bytes": info.get("bytes", 0),
        }

    # Sections are built in the same order the manifest lists them so that
    # helper calls happen in a stable sequence.
    run_section = {
        "run_id": str(run_id),
        "created_at": _utc_now_iso(),
        "core_schema_v": {
            "major": SCHEMA_V.major,
            "minor": SCHEMA_V.minor,
            "date": SCHEMA_V.date,
        },
        "git": _git_info(),
    }
    env_section = {
        "python": sys.version,
        "polars": _safe_pkg_version("polars"),
        "pyarrow": _safe_pkg_version("pyarrow"),
    }
    io_section = {
        "root_dir": settings.root_dir,
        "partitioning": settings.partitioning,
        "tick_bucket_size": settings.tick_bucket_size,
        "row_group_size": settings.row_group_size,
        "compression": settings.compression,
    }
    payload: dict[str, Any] = {
        "schema_version": 1,
        "run": run_section,
        "env": env_section,
        "io": io_section,
        "tables": tables,
        "artifacts": collect_artifacts_index(settings, run_id),
        "meta": meta or {},
    }

    target = bundle_manifest_path(settings, run_id)
    staging = target + ".tmp"
    blob = json.dumps(payload, indent=2).encode("utf-8")
    makedirs(os.path.dirname(target), exist_ok=True)
    # Write to a sibling temp file first, then swap it in, so readers never
    # observe a partially written manifest.
    with open_write(staging) as handle:
        handle.write(blob)
    rename_atomic(staging, target)
    return payload