Skip to content

[codex] Add Rerun format support#218

Draft
Guilherme Penedo (guipenedo) wants to merge 65 commits into
mainfrom
codex/rerun-format-support
Draft

[codex] Add Rerun format support#218
Guilherme Penedo (guipenedo) wants to merge 65 commits into
mainfrom
codex/rerun-format-support

Conversation

@guipenedo

Copy link
Copy Markdown
Collaborator

Summary

Adds an initial production Rerun RRD reader for Refiner.

  • Adds optional macrodata-refiner[rerun] dependencies using rerun-sdk[datafusion].
  • Adds read_rerun(...) with atomic RRD file sharding, lossless recording mode, and configurable robotics mode.
  • Adds focused synthetic RRD tests covering recording reads and conversion through to_robot_rows(...).

This is intentionally an early draft PR so cloud jobs can install this exact git ref while the writer and optimization work continue.

Validation

  • uv run --with '.[rerun]' pytest tests/readers/test_rerun_reader.py
  • uv run ty check src/refiner/pipeline/sources/readers/rerun.py src/refiner/pipeline/pipeline.py tests/readers/test_rerun_reader.py
  • commit hooks: ruff, ruff format, ty

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for reading Rerun RRD files by adding the RerunReader and exposing the read_rerun pipeline function. The review feedback highlights several critical issues in the implementation, including an AttributeError due to a non-existent drop_null() method on pyarrow.Table, potential TypeError exceptions when handling null values in list arrays and encoded images, a PEP-8 naming convention violation for the _local_rrd class, and a potential frame count mismatch in robotics mode if camera frames are missing.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +282 to +286
def _collect_table(df: Any) -> pa.Table:
table = df.to_arrow_table()
if _RERUN_SEGMENT_ID in table.column_names and table.num_rows > 0:
table = table.drop_null()
return table

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The pyarrow.Table class does not have a drop_null() method. Calling table.drop_null() will raise an AttributeError at runtime when _RERUN_SEGMENT_ID is present in the table's columns. Instead, use pyarrow.compute.is_valid to filter out rows where the segment ID is null.

Suggested change
def _collect_table(df: Any) -> pa.Table:
table = df.to_arrow_table()
if _RERUN_SEGMENT_ID in table.column_names and table.num_rows > 0:
table = table.drop_null()
return table
def _collect_table(df: Any) -> pa.Table:
table = df.to_arrow_table()
if _RERUN_SEGMENT_ID in table.column_names and table.num_rows > 0:
table = table.filter(pa.compute.is_valid(table.column(_RERUN_SEGMENT_ID)))
return table

Comment on lines +370 to +377
def _singleton_list_array(array: pa.Array) -> np.ndarray:
out = np.full(len(array), np.nan, dtype=np.float64)
for index, scalar in enumerate(array):
value = scalar.as_py()
if value:
out[index] = float(value[0])
return out

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If the list contains a null/None element (e.g., [None]), if value: will evaluate to True because the list is non-empty. However, accessing value[0] will return None, and attempting to cast it to float will raise a TypeError: float() argument must be a string or a real number, not 'NoneType'. To prevent this, ensure that value[0] is not None before casting.

Suggested change
def _singleton_list_array(array: pa.Array) -> np.ndarray:
out = np.full(len(array), np.nan, dtype=np.float64)
for index, scalar in enumerate(array):
value = scalar.as_py()
if value:
out[index] = float(value[0])
return out
def _singleton_list_array(array: pa.Array) -> np.ndarray:
out = np.full(len(array), np.nan, dtype=np.float64)
for index, scalar in enumerate(array):
value = scalar.as_py()
if value and value[0] is not None:
out[index] = float(value[0])
return out

Comment on lines +379 to +391
def _iter_encoded_images(values: pa.Array) -> Iterator[np.ndarray]:
from io import BytesIO

from PIL import Image

for scalar in values:
value = scalar.as_py()
if not value:
continue
data = cast(bytes | bytearray | list[int], value[0])
with Image.open(BytesIO(bytes(data))) as image:
yield np.asarray(image.convert("RGB"), dtype=np.uint8)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to _singleton_list_array, if the list contains a null/None element (e.g., [None]), if not value: will evaluate to False. Then value[0] will be None, and passing it to bytes(data) will raise a TypeError: cannot convert 'NoneType' object to bytes. Ensure that value[0] is not None before attempting to decode the image.

def _iter_encoded_images(values: pa.Array) -> Iterator[np.ndarray]:
    from io import BytesIO

    from PIL import Image

    for scalar in values:
        value = scalar.as_py()
        if not value or value[0] is None:
            continue
        data = cast(bytes | bytearray | list[int], value[0])
        with Image.open(BytesIO(bytes(data))) as image:
            yield np.asarray(image.convert(

Comment on lines +289 to +290
class _local_rrd:
def __init__(self, source: DataFile) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to PEP-8 naming conventions, class names should normally use the CapWords (CamelCase) convention. Consider renaming _local_rrd to _LocalRRD to improve code style and maintainability. Note that you will also need to update its usage on line 126.

Suggested change
class _local_rrd:
def __init__(self, source: DataFile) -> None:
class _LocalRRD:
def __init__(self, source: DataFile) -> None:
References
  1. PEP-8: Class names should normally use the CapWords convention. (link)

Comment on lines +254 to +260
for name, column in _camera_columns(schema, table, self.camera_prefix).items():
values = table.column(column).combine_chunks()
row[name] = VideoFrameSequence(
lambda values=values: _iter_encoded_images(values),
fps=self.fps or 30.0,
frame_count=len(values),
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In _robotics_row, frame_count is set to len(values) (the total number of rows in the timeline table). However, _iter_encoded_images skips null/empty frames using continue. If there are any missing/null frames in the camera column, the iterator will yield fewer frames than frame_count. This mismatch can cause downstream validation errors (e.g., in read_lerobot or other writers) and lead to out-of-sync alignment between video frames and other timeline data (like actions/states). Consider forward-filling missing frames or handling nulls explicitly to ensure the yielded frame count matches len(values).

Comment thread benchmark/rerun/refresh_aws_secrets.py Fixed
Comment thread benchmark/rerun/refresh_aws_secrets.py Fixed
@guipenedo

Copy link
Copy Markdown
Collaborator Author

Summary of the Rerun optimization / benchmark campaign:

This branch went through two related tracks: Rerun support hardening and performance work. The final commit on the branch is the stable cleanup recipe (26b0a2b5), but the path to get there included a lot of benchmark-driven churn. Here is the full breakdown of what worked and what did not.

  1. Support / correctness hardening
  • Added the Rerun reader and writer, then hardened them for real recordings, explicit robotics selections, and edge cases around metadata, output columns, and schema drift.
  • The useful support changes that stayed were the ones that removed accidental extra work without narrowing behavior: avoiding unused static reads, skipping redundant schema work, reusing schema component maps, filling scalar matrices in place, and hardening output column collision handling.
  • One encoded-image offset cache was tried and then reverted because it did not justify the complexity.
  1. Benchmarking / measurement infrastructure
  • Added the cloud benchmark harness, comparison helper, AWS secret refresh flow, and AWS profile handling so the measurements were repeatable and did not leak credentials.
  • These changes made the signal trustworthy, but they were measurement-only work. They did not themselves change runtime.
  1. Raw RRD copy / staging path
  • The main working area was the raw-copy path and staging pipeline.
  • Things that helped and stayed: skipping unnecessary table work for raw-copy benchmarks, direct-copying raw source chunks, avoiding double metadata scans, hardlinking staged copies on local filesystems, parallelizing staged source opens, parallelizing metadata scans, and tuning the writer loop / buffering / local-to-remote upload path.
  • The strongest concrete wins here were the direct-copy and local upload changes: local synthetic runs consistently showed the direct-copy path orders of magnitude faster than the fallback in the small benchmark, and the DataFile.copy path using put_file improved the cloud rrd-copy results. Buffering tweaks helped too, but the gains were smaller and sometimes noisy.
  • Things that did not stick: native remote RRD staging, metadata-fusion experiments, a single-store lookup fast path, and some reader fanout changes. Those were benchmarked, found noisy or regressive, and reverted.
  • The cloud rrd-copy result moved from the earlier baseline around 83.61s down into the low 80s on the better raw-copy / upload path, but the intermediate tuning was noisy enough that several of the more aggressive experiments were rolled back.
  1. Cleanup matcher / reducer path
  • The cleanup path became the other real win.
  • Helpful changes that stayed: caching FinalizedShardWorker.worker_token, tightening the default-root cleanup parsing, precomputing cleanup keys, and using fixed-position / precomputed-string matching instead of regex in the hot path.
  • Local matcher benchmarks showed a clear win: the regex path was around 0.87s on the synthetic workload, while the precomputed-key path got down to about 0.23s.
  • The sink-level cleanup path also improved materially once the listing prefix and worker token were cached, but a dedicated sink benchmark harness was later removed from the final recipe because it was just measurement scaffolding, not product code.
  • An explicit cleanup_key field was tried and then removed again. It did not prove better than deriving the key from the cached worker_token, and it complicated typing and downstream code for no clear gain.
  1. What was reverted or did not win
  • Reverted: native remote RRD staging, metadata fusion, fast-path single-store lookup, reader fanout oversubscription, and a few cleanup micro-optimizations that did not beat the best existing result.
  • Not kept: the sink-level benchmark harness and the cleanup_key field experiment.
  • Kept: the smaller but reliable improvements that reduced repeated work without narrowing support.

Current state

  • The branch is clean and pushed.
  • The final commit keeps the best stable cleanup recipe and the working Rerun support hardening.
  • The broader lesson from the benchmark history is that the real wins came from removing repeated work in the hot path, not from widening concurrency or adding more special cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants