add lance writer#120
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces support for writing Lance datasets by adding a new write_lance sink. The implementation follows a distributed write model where individual workers stream rows into uncommitted Lance fragments, followed by a single-worker reducer stage that commits the finalized fragments to the dataset. The changes include documentation updates, new dependencies in pyproject.toml, and the core logic in src/refiner/pipeline/sinks/lance.py. Feedback focuses on improving the robustness of background thread error handling and optimizing the efficiency of the reducer stage by narrowing the scope of file discovery to job-specific prefixes.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c9c69a55f6
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
ba8deea to
73e7b51
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ba8deea3b4
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| for fragment in fragment_json | ||
| ] | ||
| ) | ||
| read_version = existing.version |
There was a problem hiding this comment.
Preserve base dataset version for append commits
In append mode this reducer reads existing.version right before LanceDataset.commit, but Lance distributed writes expect read_version to be the version captured when fragments were produced. If another writer updates or overwrites the dataset between fragment generation and reducer commit, this code can commit stale fragments against a newer table state instead of triggering the intended conflict path, which can silently mix data from incompatible write windows. Persist the writer-stage base version with shard metadata and use that fixed version at commit time.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 73e7b5142b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| lance = _import_lance() | ||
| fragment_json: list[str] = [] | ||
| schema: pa.Schema | None = None | ||
| for rel_path in sorted(metadata_paths): |
There was a problem hiding this comment.
Deduplicate metadata paths before committing fragments
LanceCommitReducerSink._run_commit iterates metadata_paths directly, but _managed_paths can surface duplicate entries from DataFolder.find on some backends (the repo already accounts for this in file-cleanup tests). If the same metadata file appears twice, this loop re-reads it and appends the same fragment JSON twice, which can duplicate rows in the committed Lance dataset (or trigger commit-time fragment conflicts). Deduplicating metadata_paths before reading prevents duplicate fragment commits.
Useful? React with 👍 / 👎.
884259b to
9bb3e7d
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9bb3e7d634
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
627e321 to
e210d4c
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e210d4c0e4
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| queue = self._queue() | ||
| for batch in batches: | ||
| self._raise_if_failed() | ||
| queue.put(batch) |
There was a problem hiding this comment.
Prevent shard writer deadlock on failed fragment thread
_StreamingShardWriter.put_batches performs a blocking queue.put(batch) on a bounded queue, but only checks task_future before and after the call. If the Lance fragment thread fails after the pre-check and the queue is full (common with large shards that produce many batches), the producer blocks forever in put and never reaches the post-check, so the worker hangs instead of failing fast. This is triggered by write-time Lance errors (for example transient object-store failures) during active streaming.
Useful? React with 👍 / 👎.
6314462 to
7b971c2
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7b971c2458
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| if not self.closed: | ||
| self.closed = True | ||
| self._raise_if_failed() | ||
| self._queue().put(_QUEUE_CLOSED) |
There was a problem hiding this comment.
Prevent finish from hanging on failed writer thread
Handle the sentinel enqueue in finish() without an unbounded blocking put, because this can deadlock the worker: if the Lance writer thread fails after _raise_if_failed() but before consuming more items, and the queue is already full, self._queue().put(_QUEUE_CLOSED) blocks forever. In that state shard finalization (on_shard_complete) and worker shutdown (close) can hang instead of surfacing the write failure.
Useful? React with 👍 / 👎.
7b971c2 to
3c84489
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3c84489189
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| fragments = lance.fragment.write_fragments( | ||
| reader, | ||
| self.dataset_uri, | ||
| schema=self.schema, | ||
| mode=self.mode, |
There was a problem hiding this comment.
Propagate DataFolder credentials into Lance fragment writes
write_lance_dataset accepts DataFolderLike, but _StreamingShardWriter._run passes only self.dataset_uri into lance.fragment.write_fragments and never forwards storage options from the resolved DataFolder. This breaks runs where access is configured via (path, fs) or non-env filesystem options (custom endpoints, temporary creds, etc.): other sinks can write through the configured fsspec filesystem, while Lance writes fall back to Lance defaults/environment and fail to read/write the target URI.
Useful? React with 👍 / 👎.
Purpose
Add a Lance dataset writer to Refiner pipelines.
Key design decisions
write_lance(output, mode="create")with optionalpylancedependency.RecordBatchReader._refiner_lance_fragments/<job_id>/and uses a reducer stage to commit only finalized worker fragments.Test evidence
uv run pytest tests/pipeline/test_sinks.py -q-> 13 passeduv run ruff check --force-exclude .uv run ruff format --force-exclude --check .uv run ty checkuv run pytestwas run; it hit a transient Hugging Face 429 in an unrelated LeRobot hub test. The failed test was rerun after the reset window and passed.