From 829e6a52fdb48db3816651da46581ed0f7debf70 Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Fri, 23 Jan 2026 13:49:08 -0500 Subject: [PATCH 01/10] feat(scanner): add row id allowlist for plain scans --- rust/lance/src/dataset/scanner.rs | 98 +++++++++++++++++++++++++++++-- 1 file changed, 94 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a0812d6caf4..d47e9f92fcb 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashMap; use std::ops::Range; use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -77,6 +78,7 @@ use tracing::{info_span, instrument, Span}; use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; +use crate::dataset::rowids::load_row_id_sequence; use crate::dataset::utils::SchemaAdapter; use crate::index::vector::utils::{ default_distance_type_for, get_vector_dim, get_vector_type, validate_distance_type_for, @@ -497,6 +499,9 @@ pub struct Scanner { /// Filter. filter: LanceFilter, + /// Optional allowlist of row ids (in dataset row-id space) + row_id_allowlist: Option>, + /// Optional full text search query full_text_query: Option, @@ -770,6 +775,7 @@ impl Scanner { prefilter: false, materialization_style: MaterializationStyle::Heuristic, filter: LanceFilter::default(), + row_id_allowlist: None, full_text_query: None, batch_size: None, batch_readahead: get_num_compute_intensive_cpus(), @@ -812,6 +818,19 @@ impl Scanner { self } + /// Provide an allowlist of row ids (in dataset row-id space). + /// + /// For unstable row ids, the allowlist must come from the same snapshot. + /// For stable row ids, values remain valid across versions. + pub fn row_id_allowlist(&mut self, row_ids: I) -> &mut Self + where + I: IntoIterator, + { + let allow_list: RowAddrTreeMap = row_ids.into_iter().collect(); + self.row_id_allowlist = Some(Arc::new(RowAddrMask::from_allowed(allow_list))); + self + } + pub fn from_fragment(dataset: Arc, fragment: Fragment) -> Self { Self { fragments: Some(vec![fragment]), @@ -2043,7 +2062,20 @@ impl Scanner { .full_expr .as_ref() .and_then(TakeOperation::try_from_expr); - if let Some((take_op, remainder)) = take_op { + if let Some(allowlist) = &self.row_id_allowlist { + // Apply the allowlist as a hard constraint and intersect any explicit take. + let mut mask = allowlist.as_ref().clone(); + if let Some((take_op, remainder)) = take_op { + let take_mask = self.take_op_as_row_id_mask(take_op).await?; + mask = mask & take_mask; + filter_plan.expr_filter_plan = remainder + .map(ExprFilterPlan::new_refine_only) + .unwrap_or(ExprFilterPlan::default()); + } else { + filter_plan.expr_filter_plan.make_refine_only(); + } + self.mask_as_take_input(mask)? + } else if let Some((take_op, remainder)) = take_op { // If there is any remainder use it as the filter (we don't even try and combine an indexed // search on the filter with a take as that seems excessive) filter_plan.expr_filter_plan = remainder @@ -2371,9 +2403,7 @@ impl Scanner { } } - fn u64s_as_take_input(&self, u64s: Vec) -> Result> { - let row_addrs = RowAddrTreeMap::from_iter(u64s); - let row_addr_mask = RowAddrMask::from_allowed(row_addrs); + fn mask_as_take_input(&self, row_addr_mask: RowAddrMask) -> Result> { let index_result = IndexExprResult::Exact(row_addr_mask); let fragments_covered = RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32)); @@ -2386,6 +2416,66 @@ impl Scanner { Ok(Arc::new(OneShotExec::new(stream))) } + fn u64s_as_take_input(&self, u64s: Vec) -> Result> { + let row_addrs = RowAddrTreeMap::from_iter(u64s); + let row_addr_mask = RowAddrMask::from_allowed(row_addrs); + self.mask_as_take_input(row_addr_mask) + } + + async fn row_addrs_to_row_ids(&self, row_addrs: &[u64]) -> Result> { + if !self.dataset.manifest.uses_stable_row_ids() { + return Ok(row_addrs.to_vec()); + } + + let mut addrs_by_frag: HashMap> = HashMap::new(); + for addr in row_addrs.iter().copied() { + if addr == RowAddress::TOMBSTONE_ROW { + continue; + } + let addr = RowAddress::new_from_u64(addr); + addrs_by_frag + .entry(addr.fragment_id()) + .or_default() + .push(addr.row_offset()); + } + + if addrs_by_frag.is_empty() { + return Ok(Vec::new()); + } + + let mut row_ids = Vec::new(); + for fragment in self.dataset.manifest.fragments.iter() { + let frag_id = fragment.id as u32; + let Some(offsets) = addrs_by_frag.remove(&frag_id) else { + continue; + }; + let sequence = load_row_id_sequence(self.dataset.as_ref(), fragment).await?; + row_ids.extend( + offsets + .into_iter() + .filter_map(|offset| sequence.get(offset as usize)), + ); + } + + Ok(row_ids) + } + + async fn take_op_as_row_id_mask(&self, take_op: TakeOperation) -> Result { + let row_ids = match take_op { + TakeOperation::RowIds(ids) => ids, + TakeOperation::RowAddrs(addrs) => self.row_addrs_to_row_ids(&addrs).await?, + TakeOperation::RowOffsets(offsets) => { + let mut addrs = + row_offsets_to_row_addresses(self.dataset.as_ref(), &offsets).await?; + addrs.retain(|addr| *addr != RowAddress::TOMBSTONE_ROW); + self.row_addrs_to_row_ids(&addrs).await? + } + }; + Ok(RowAddrMask::from_allowed(RowAddrTreeMap::from_iter( + row_ids, + ))) + } + async fn take_source(&self, take_op: TakeOperation) -> Result> { // We generally assume that late materialization does not make sense for take operations // so we can just use the physical projection From 48d98d0263046562d0a2b2d6ec80a7479da1f886 Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Fri, 23 Jan 2026 14:28:18 -0500 Subject: [PATCH 02/10] feat(prefilter): apply allowlist mask in fts/ann prefilters --- rust/lance/src/dataset/scanner.rs | 11 ++++++++++- rust/lance/src/index/prefilter.rs | 9 ++++++++- rust/lance/src/index/vector/fixture_test.rs | 1 + rust/lance/src/index/vector/ivf.rs | 7 ++++++- rust/lance/src/io/exec/fts.rs | 19 +++++++++++++++++++ rust/lance/src/io/exec/knn.rs | 9 +++++++++ rust/lance/src/io/exec/utils.rs | 2 ++ 7 files changed, 55 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index d47e9f92fcb..bab1ff58fe0 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3009,6 +3009,7 @@ impl Scanner { query.clone(), params.clone(), prefilter_source.clone(), + self.row_id_allowlist.clone(), ))) } @@ -3040,6 +3041,7 @@ impl Scanner { query.clone(), params.clone(), prefilter_source.clone(), + self.row_id_allowlist.clone(), )); let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; @@ -3868,7 +3870,13 @@ impl Scanner { let prefilter_source = self .prefilter_source(filter_plan, self.get_indexed_frags(index)) .await?; - let inner_fanout_search = new_knn_exec(self.dataset.clone(), index, q, prefilter_source)?; + let inner_fanout_search = new_knn_exec( + self.dataset.clone(), + index, + q, + prefilter_source, + self.row_id_allowlist.clone(), + )?; let sort_expr = PhysicalSortExpr { expr: expressions::col(DIST_COL, inner_fanout_search.schema().as_ref())?, options: SortOptions { @@ -3927,6 +3935,7 @@ impl Scanner { index, &query, prefilter_source.clone(), + self.row_id_allowlist.clone(), )?; let sort_expr = PhysicalSortExpr { expr: expressions::col(DIST_COL, ann_node.schema().as_ref())?, diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index 917cfe12b45..d43fd2d70d6 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -49,6 +49,8 @@ pub struct DatasetPreFilter { // these tasks only when we've done as much work as we can without them. pub(super) deleted_ids: Option>>>, pub(super) filtered_ids: Option>>, + // Optional allowlist mask to intersect with filter + deletion masks. + pub(super) extra_mask: Option>, // When the tasks are finished this is the combined filter pub(super) final_mask: Mutex>>, } @@ -58,6 +60,7 @@ impl DatasetPreFilter { dataset: Arc, indices: &[IndexMetadata], filter: Option>, + extra_mask: Option>, ) -> Self { let mut fragments = RoaringBitmap::new(); if indices.iter().any(|idx| idx.fragment_bitmap.is_none()) { @@ -74,6 +77,7 @@ impl DatasetPreFilter { Self { deleted_ids, filtered_ids, + extra_mask, final_mask: Mutex::new(OnceCell::new()), } } @@ -238,6 +242,9 @@ impl PreFilter for DatasetPreFilter { let final_mask = self.final_mask.lock().unwrap(); final_mask.get_or_init(|| { let mut combined = RowAddrMask::default(); + if let Some(extra_mask) = &self.extra_mask { + combined = combined & extra_mask.as_ref().clone(); + } if let Some(filtered_ids) = &self.filtered_ids { combined = combined & filtered_ids.get_ready(); } @@ -251,7 +258,7 @@ impl PreFilter for DatasetPreFilter { } fn is_empty(&self) -> bool { - self.deleted_ids.is_none() && self.filtered_ids.is_none() + self.deleted_ids.is_none() && self.filtered_ids.is_none() && self.extra_mask.is_none() } /// Get the row id mask for this prefilter diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index 3445a3cd5d4..acc75069508 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -278,6 +278,7 @@ mod test { Arc::new(DatasetPreFilter { deleted_ids: None, filtered_ids: None, + extra_mask: None, final_mask: Mutex::new(OnceCell::new()), }), &NoOpMetricsCollector, diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index c14cdeada81..6713e853ed1 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2672,7 +2672,12 @@ mod tests { base_id: None, }; - let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None)); + let prefilter = Arc::new(DatasetPreFilter::new( + dataset.clone(), + &[index_meta], + None, + None, + )); let is_not_remapped = Some; let is_remapped = |row_id| Some(row_id + BIG_OFFSET); diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 72ef63846df..154e8dea20c 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -20,6 +20,7 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; +use lance_core::utils::mask::RowAddrMask; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS_SEARCHED_METRIC}; @@ -80,6 +81,7 @@ pub struct MatchQueryExec { query: MatchQuery, params: FtsSearchParams, prefilter_source: PreFilterSource, + allowlist_mask: Option>, properties: PlanProperties, metrics: ExecutionPlanMetricsSet, @@ -114,6 +116,7 @@ impl MatchQueryExec { query: MatchQuery, params: FtsSearchParams, prefilter_source: PreFilterSource, + allowlist_mask: Option>, ) -> Self { let properties = PlanProperties::new( EquivalenceProperties::new(FTS_SCHEMA.clone()), @@ -126,6 +129,7 @@ impl MatchQueryExec { query, params, prefilter_source, + allowlist_mask, properties, metrics: ExecutionPlanMetricsSet::new(), } @@ -174,6 +178,7 @@ impl ExecutionPlan for MatchQueryExec { query: self.query.clone(), params: self.params.clone(), prefilter_source: PreFilterSource::None, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), } @@ -199,6 +204,7 @@ impl ExecutionPlan for MatchQueryExec { query: self.query.clone(), params: self.params.clone(), prefilter_source, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), } @@ -222,6 +228,7 @@ impl ExecutionPlan for MatchQueryExec { let params = self.params.clone(); let ds = self.dataset.clone(); let prefilter_source = self.prefilter_source.clone(); + let allowlist_mask = self.allowlist_mask.clone(); let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition)); let column = query.column.ok_or(DataFusionError::Execution(format!( "column not set for MatchQuery {}", @@ -247,6 +254,7 @@ impl ExecutionPlan for MatchQueryExec { &prefilter_source, ds, &[index_meta], + allowlist_mask, )?; let inverted_idx = index @@ -510,6 +518,7 @@ pub struct PhraseQueryExec { query: PhraseQuery, params: FtsSearchParams, prefilter_source: PreFilterSource, + allowlist_mask: Option>, properties: PlanProperties, metrics: ExecutionPlanMetricsSet, } @@ -543,6 +552,7 @@ impl PhraseQueryExec { query: PhraseQuery, mut params: FtsSearchParams, prefilter_source: PreFilterSource, + allowlist_mask: Option>, ) -> Self { let properties = PlanProperties::new( EquivalenceProperties::new(FTS_SCHEMA.clone()), @@ -557,6 +567,7 @@ impl PhraseQueryExec { query, params, prefilter_source, + allowlist_mask, properties, metrics: ExecutionPlanMetricsSet::new(), } @@ -598,6 +609,7 @@ impl ExecutionPlan for PhraseQueryExec { query: self.query.clone(), params: self.params.clone(), prefilter_source: PreFilterSource::None, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), }, @@ -621,6 +633,7 @@ impl ExecutionPlan for PhraseQueryExec { query: self.query.clone(), params: self.params.clone(), prefilter_source, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), } @@ -644,6 +657,7 @@ impl ExecutionPlan for PhraseQueryExec { let params = self.params.clone(); let ds = self.dataset.clone(); let prefilter_source = self.prefilter_source.clone(); + let allowlist_mask = self.allowlist_mask.clone(); let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition)); let stream = stream::once(async move { let _timer = metrics.baseline_metrics.elapsed_compute().timer(); @@ -669,6 +683,7 @@ impl ExecutionPlan for PhraseQueryExec { &prefilter_source, ds, &[index_meta], + allowlist_mask, )?; let index = index @@ -1210,6 +1225,7 @@ pub mod tests { MatchQuery::new("blah".to_string()).with_column(Some("text".to_string())), FtsSearchParams::default(), PreFilterSource::None, + None, ); match_query .execute(0, Arc::new(TaskContext::default())) @@ -1242,6 +1258,7 @@ pub mod tests { PhraseQuery::new("blah".to_string()), FtsSearchParams::new().with_phrase_slop(Some(0)), PreFilterSource::None, + None, ); phrase_query .execute(0, Arc::new(TaskContext::default())) @@ -1254,6 +1271,7 @@ pub mod tests { MatchQuery::new("blah".to_string()).with_column(Some("text".to_string())), FtsSearchParams::default(), PreFilterSource::None, + None, ); let boost_input_two = MatchQueryExec::new( @@ -1261,6 +1279,7 @@ pub mod tests { MatchQuery::new("blah".to_string()).with_column(Some("text".to_string())), FtsSearchParams::default(), PreFilterSource::None, + None, ); let boost_query = BoostQueryExec::new( diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index f8f8869617a..e8b162407ea 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -36,6 +36,7 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::{future, stream, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use lance_core::utils::futures::FinallyStreamExt; +use lance_core::utils::mask::RowAddrMask; use lance_core::ROW_ID; use lance_core::{utils::tokio::get_num_compute_intensive_cpus, ROW_ID_FIELD}; use lance_datafusion::utils::{ @@ -324,6 +325,7 @@ pub fn new_knn_exec( indices: &[IndexMetadata], query: &Query, prefilter_source: PreFilterSource, + allowlist_mask: Option>, ) -> Result> { let ivf_node = ANNIvfPartitionExec::try_new( dataset.clone(), @@ -337,6 +339,7 @@ pub fn new_knn_exec( indices.to_vec(), query.clone(), prefilter_source, + allowlist_mask, )?; Ok(Arc::new(sub_index)) @@ -599,6 +602,7 @@ pub struct ANNIvfSubIndexExec { /// Prefiltering input prefilter_source: PreFilterSource, + allowlist_mask: Option>, /// Datafusion Plan Properties properties: PlanProperties, @@ -613,6 +617,7 @@ impl ANNIvfSubIndexExec { indices: Vec, query: Query, prefilter_source: PreFilterSource, + allowlist_mask: Option>, ) -> Result { if input.schema().field_with_name(PART_ID_COLUMN).is_err() { return Err(Error::Index { @@ -635,6 +640,7 @@ impl ANNIvfSubIndexExec { indices, query, prefilter_source, + allowlist_mask, properties, metrics: ExecutionPlanMetricsSet::new(), }) @@ -955,6 +961,7 @@ impl ExecutionPlan for ANNIvfSubIndexExec { indices: self.indices.clone(), query: self.query.clone(), prefilter_source, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), } @@ -978,6 +985,7 @@ impl ExecutionPlan for ANNIvfSubIndexExec { let column = self.query.column.clone(); let indices = self.indices.clone(); let prefilter_source = self.prefilter_source.clone(); + let allowlist_mask = self.allowlist_mask.clone(); let metrics = Arc::new(AnnIndexMetrics::new(&self.metrics, partition)); let metrics_clone = metrics.clone(); let timer = Instant::now(); @@ -1034,6 +1042,7 @@ impl ExecutionPlan for ANNIvfSubIndexExec { ds.clone(), &indices, prefilter_loader, + allowlist_mask, )); let state = Arc::new(ANNIvfEarlySearchResults::new(indices.len(), query.k)); diff --git a/rust/lance/src/io/exec/utils.rs b/rust/lance/src/io/exec/utils.rs index 1cfa767d4aa..22bdec0b807 100644 --- a/rust/lance/src/io/exec/utils.rs +++ b/rust/lance/src/io/exec/utils.rs @@ -51,6 +51,7 @@ pub(crate) fn build_prefilter( prefilter_source: &PreFilterSource, ds: Arc, index_meta: &[IndexMetadata], + extra_mask: Option>, ) -> Result> { let prefilter_loader = match &prefilter_source { PreFilterSource::FilteredRowIds(src_node) => { @@ -67,6 +68,7 @@ pub(crate) fn build_prefilter( ds, index_meta, prefilter_loader, + extra_mask, ))) } From e02207747cbab836cf604d7244747a6d8d32c432 Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Fri, 23 Jan 2026 15:04:08 -0500 Subject: [PATCH 03/10] feat(flat-search): apply allowlist to flat fts/knn --- rust/lance/src/dataset/scanner.rs | 3 ++ rust/lance/src/io/exec/fts.rs | 51 ++++++++++++++++++++++++++++--- rust/lance/src/io/exec/knn.rs | 36 ++++++++++++++++++++++ 3 files changed, 86 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index bab1ff58fe0..0b8a0899a05 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3134,6 +3134,7 @@ impl Scanner { params.clone(), scan_node, FTS_SCHEMA.clone(), + self.row_id_allowlist.clone(), )); Ok(flat_match_plan) } @@ -3706,6 +3707,7 @@ impl Scanner { q.params(), input, schema, + self.row_id_allowlist.clone(), ))) } _ => { @@ -3763,6 +3765,7 @@ impl Scanner { &q.column, q.key.clone(), metric_type, + self.row_id_allowlist.clone(), )?); let lower: Option<(Expr, Arc)> = q diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 154e8dea20c..79fdf0ecee7 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::datatypes::{Float32Type, UInt64Type}; -use arrow_array::{Float32Array, RecordBatch, UInt64Array}; +use arrow_array::{BooleanArray, Float32Array, RecordBatch, UInt64Array}; use arrow_schema::SchemaRef; use datafusion::common::Statistics; use datafusion::error::{DataFusionError, Result as DataFusionResult}; @@ -18,7 +18,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Pla use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning}; use datafusion_physical_plan::metrics::{BaselineMetrics, Count}; use futures::stream::{self}; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::{future, FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use lance_core::utils::mask::RowAddrMask; use lance_core::{utils::tracing::StreamTracingExt, ROW_ID}; @@ -347,6 +347,7 @@ pub struct FlatMatchQueryExec { query: MatchQuery, params: FtsSearchParams, unindexed_input: Arc, + allowlist_mask: Option>, properties: PlanProperties, metrics: ExecutionPlanMetricsSet, @@ -382,6 +383,7 @@ impl FlatMatchQueryExec { params: FtsSearchParams, unindexed_input: Arc, schema: SchemaRef, + allowlist_mask: Option>, ) -> Self { let properties = PlanProperties::new( EquivalenceProperties::new(schema), @@ -394,6 +396,7 @@ impl FlatMatchQueryExec { query, params, unindexed_input, + allowlist_mask, properties, metrics: ExecutionPlanMetricsSet::new(), } @@ -428,6 +431,7 @@ impl ExecutionPlan for FlatMatchQueryExec { query: self.query.clone(), params: self.params.clone(), unindexed_input, + allowlist_mask: self.allowlist_mask.clone(), properties: self.properties.clone(), metrics: ExecutionPlanMetricsSet::new(), })) @@ -443,13 +447,18 @@ impl ExecutionPlan for FlatMatchQueryExec { let ds = self.dataset.clone(); let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition)); let metrics_clone = metrics.clone(); + let allowlist_mask = self.allowlist_mask.clone(); let column = query.column.ok_or(DataFusionError::Execution(format!( "column not set for MatchQuery {}", query.terms )))?; - let unindexed_input = - document_input(self.unindexed_input.execute(partition, context)?, &column)?; + let unindexed_input = self.unindexed_input.execute(partition, context)?; + let unindexed_input = match allowlist_mask { + Some(allowlist) => filter_stream_by_allowlist(unindexed_input, allowlist), + None => unindexed_input, + }; + let unindexed_input = document_input(unindexed_input, &column)?; let schema = self.schema(); let stream = stream::once(async move { @@ -512,6 +521,39 @@ impl ExecutionPlan for FlatMatchQueryExec { } } +fn filter_batch_by_allowlist( + batch: RecordBatch, + allowlist: &RowAddrMask, +) -> DataFusionResult { + let row_ids = batch.column_by_name(ROW_ID).ok_or_else(|| { + DataFusionError::Execution(format!( + "allowlist requires '{}' column in FTS input", + ROW_ID + )) + })?; + let row_ids = row_ids.as_primitive::(); + let mask = BooleanArray::from_iter( + row_ids + .iter() + .map(|value| Some(value.map(|id| allowlist.selected(id)).unwrap_or(false))), + ); + arrow::compute::filter_record_batch(&batch, &mask) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) +} + +fn filter_stream_by_allowlist( + input: SendableRecordBatchStream, + allowlist: Arc, +) -> SendableRecordBatchStream { + let schema = input.schema(); + let stream = input.and_then(move |batch| { + let allowlist = allowlist.clone(); + async move { filter_batch_by_allowlist(batch, allowlist.as_ref()) } + }); + let stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0)); + Box::pin(RecordBatchStreamAdapter::new(schema, stream)) +} + #[derive(Debug)] pub struct PhraseQueryExec { dataset: Arc, @@ -1246,6 +1288,7 @@ pub mod tests { FtsSearchParams::default(), flat_input, FTS_SCHEMA.clone(), + None, ); flat_match_query .execute(0, Arc::new(TaskContext::default())) diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index e8b162407ea..22c23a6f537 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -116,6 +116,7 @@ pub struct KNNVectorDistanceExec { pub query: ArrayRef, pub column: String, pub distance_type: DistanceType, + allowlist_mask: Option>, output_schema: SchemaRef, properties: PlanProperties, @@ -145,6 +146,7 @@ impl KNNVectorDistanceExec { column: &str, query: ArrayRef, distance_type: DistanceType, + allowlist_mask: Option>, ) -> Result { let mut output_schema = input.schema().as_ref().clone(); let (_, element_type) = get_vector_type(&(&output_schema).try_into()?, column)?; @@ -174,6 +176,7 @@ impl KNNVectorDistanceExec { query, column: column.to_string(), distance_type, + allowlist_mask, output_schema, properties, metrics: ExecutionPlanMetricsSet::new(), @@ -214,6 +217,7 @@ impl ExecutionPlan for KNNVectorDistanceExec { &self.column, self.query.clone(), self.distance_type, + self.allowlist_mask.clone(), )?)) } @@ -226,7 +230,18 @@ impl ExecutionPlan for KNNVectorDistanceExec { let key = self.query.clone(); let column = self.column.clone(); let dt = self.distance_type; + let allowlist_mask = self.allowlist_mask.clone(); let stream = input_stream + .and_then(move |batch| { + let allowlist_mask = allowlist_mask.clone(); + async move { + let batch = match allowlist_mask.as_ref() { + Some(allowlist) => filter_batch_by_allowlist(batch, allowlist)?, + None => batch, + }; + Ok(batch) + } + }) .try_filter(|batch| future::ready(batch.num_rows() > 0)) .map(move |batch| { let key = key.clone(); @@ -297,6 +312,26 @@ impl ExecutionPlan for KNNVectorDistanceExec { } } +fn filter_batch_by_allowlist( + batch: RecordBatch, + allowlist: &RowAddrMask, +) -> DataFusionResult { + let row_ids = batch.column_by_name(ROW_ID).ok_or_else(|| { + DataFusionError::Execution(format!( + "allowlist requires '{}' column in KNN input", + ROW_ID + )) + })?; + let row_ids = row_ids.as_primitive::(); + let mask = BooleanArray::from_iter( + row_ids + .iter() + .map(|value| Some(value.map(|id| allowlist.selected(id)).unwrap_or(false))), + ); + arrow::compute::filter_record_batch(&batch, &mask) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) +} + pub static KNN_INDEX_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(Schema::new(vec![ Field::new(DIST_COL, DataType::Float32, true), @@ -1536,6 +1571,7 @@ mod tests { "vector", Arc::new(generate_random_array(dim)), DistanceType::L2, + None, ) .unwrap(); assert_eq!( From 2794e251a13487bbaca13f3821a1d1e1d36dff62 Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Tue, 27 Jan 2026 12:07:16 -0500 Subject: [PATCH 04/10] feat(scanner): support row-id allowlist in filtered reads --- rust/lance/src/dataset/scanner.rs | 243 ++++++++++++++++++++---- rust/lance/src/io/exec/filtered_read.rs | 62 +++++- 2 files changed, 267 insertions(+), 38 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 0b8a0899a05..f7baabd4fec 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2074,7 +2074,36 @@ impl Scanner { } else { filter_plan.expr_filter_plan.make_refine_only(); } - self.mask_as_take_input(mask)? + let mut read_options = + FilteredReadOptions::new(self.dataset.empty_projection().with_row_id()) + .with_allowlist_mask(Arc::new(mask.clone())); + + if let Some(fragments) = self.fragments.as_ref() { + read_options = read_options.with_fragments(Arc::new(fragments.clone())); + } + + if let Some(batch_size) = self.batch_size { + read_options = read_options.with_batch_size(batch_size as u32); + } + + if let Some(fragment_readahead) = self.fragment_readahead { + read_options = read_options.with_fragment_readahead(fragment_readahead); + } + + if self.include_deleted_rows { + read_options = read_options.with_deleted_rows()?; + } + + if let Some(io_buffer_size_bytes) = self.io_buffer_size { + read_options = read_options.with_io_buffer_size(io_buffer_size_bytes); + } + + let index_input = self.mask_as_take_input(mask)?; + Arc::new(FilteredReadExec::try_new( + self.dataset.clone(), + read_options, + Some(index_input), + )?) } else if let Some((take_op, remainder)) = take_op { // If there is any remainder use it as the filter (we don't even try and combine an indexed // search on the filter with a take as that seems excessive) @@ -2349,6 +2378,10 @@ impl Scanner { read_options = read_options.with_io_buffer_size(io_buffer_size_bytes); } + if let Some(allowlist_mask) = &self.row_id_allowlist { + read_options = read_options.with_allowlist_mask(allowlist_mask.clone()); + } + let index_input = filter_plan.index_query.clone().map(|index_query| { Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query)) as Arc @@ -3110,23 +3143,48 @@ impl Scanner { let filter_columns = Planner::column_names_in_expr(expr); columns.extend(filter_columns); } - let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns).unwrap()); - let mut scan_node = self.scan_fragments( - true, - false, - false, - false, - false, - flat_fts_scan_schema, - Arc::new(fragments), - None, - false, - ); + let fragments = Arc::new(fragments); + let scan_node: Arc = if self.row_id_allowlist.is_some() + && !self.dataset.is_legacy_storage() + { + let mut read_filter = filter_plan.clone(); + read_filter.make_refine_only(); + let projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&columns, OnMissing::Error)?; + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + &read_filter, + projection, + false, + Some(fragments.clone()), + None, + false, + ) + .await?; + plan + } else { + let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns).unwrap()); + let mut scan_node = self.scan_fragments( + true, + false, + false, + false, + false, + flat_fts_scan_schema, + fragments.clone(), + None, + false, + ); - if let Some(expr) = filter_plan.full_expr.as_ref() { - // If there is a prefilter we need to manually apply it to the new data - scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); - } + if let Some(expr) = filter_plan.full_expr.as_ref() { + // If there is a prefilter we need to manually apply it to the new data + scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); + } + scan_node + }; let flat_match_plan = Arc::new(FlatMatchQueryExec::new( self.dataset.clone(), @@ -3317,29 +3375,54 @@ impl Scanner { let filter_columns = Planner::column_names_in_expr(expr); columns.extend(filter_columns); } - let vector_scan_projection = Arc::new(self.dataset.schema().project(&columns).unwrap()); + let fragments = Arc::new(unindexed_fragments); // Note: we could try and use the scalar indices here to reduce the scope of this scan but the // most common case is that fragments that are newer than the vector index are going to be newer - // than the scalar indices anyways - let mut scan_node = self.scan_fragments( - true, - false, - false, - false, - false, - vector_scan_projection, - Arc::new(unindexed_fragments), - // Can't pushdown limit/offset in an ANN search - None, - // We are re-ordering anyways, so no need to get data in data - // in a deterministic order. - false, - ); + // than the scalar indices anyways. + let scan_node: Arc = + if self.row_id_allowlist.is_some() && !self.dataset.is_legacy_storage() { + let mut read_filter = filter_plan.clone(); + read_filter.make_refine_only(); + let projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&columns, OnMissing::Error)?; + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + &read_filter, + projection, + false, + Some(fragments.clone()), + None, + false, + ) + .await?; + plan + } else { + let vector_scan_projection = + Arc::new(self.dataset.schema().project(&columns).unwrap()); + let mut scan_node = self.scan_fragments( + true, + false, + false, + false, + false, + vector_scan_projection, + fragments.clone(), + // Can't pushdown limit/offset in an ANN search + None, + // We are re-ordering anyways, so no need to get data in data + // in a deterministic order. + false, + ); - if let Some(expr) = filter_plan.full_expr.as_ref() { - // If there is a prefilter we need to manually apply it to the new data - scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); - } + if let Some(expr) = filter_plan.full_expr.as_ref() { + // If there is a prefilter we need to manually apply it to the new data + scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); + } + scan_node + }; // first we do flat search on just the new data let topk_appended = self.flat_knn(scan_node, &q)?; @@ -5533,6 +5616,44 @@ mod test { assert_eq!(expected_row_ids, actual_row_ids); } + #[tokio::test] + async fn test_row_id_allowlist_stable_row_ids() -> Result<()> { + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true).await?; + let dataset = &test_ds.dataset; + + let batch = dataset + .scan() + .project(&["i"])? + .with_row_id() + .try_into_batch() + .await?; + let row_ids = batch + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::(); + let row_id_values = row_ids.values(); + let allowlist = vec![row_id_values[0], row_id_values[150], row_id_values[250]]; + + let filtered = dataset + .scan() + .project(&["i"])? + .with_row_id() + .row_id_allowlist(allowlist.clone()) + .try_into_batch() + .await?; + let filtered_ids: Vec = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect(); + + assert_eq!(allowlist, filtered_ids); + Ok(()) + } + #[tokio::test] async fn test_scan_unordered_with_row_id() { // This test doesn't make sense for v2 files, there is no way to get an out-of-order scan @@ -7733,6 +7854,54 @@ mod test { ) .await?; + log::info!("Test case: Combined KNN/ANN with allowlist"); + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=6), expr=... + ANNSubIndex: name=..., k=6, deltas=1, metric=L2 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vec], num_fragments=1, range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=--, refine_filter=-- + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=6), expr=... + ANNSubIndex: name=..., k=6, deltas=1, metric=L2 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + }; + assert_plan_equals( + &dataset.dataset, + |scan| Ok(scan.nearest("vec", &q, 6)?.row_id_allowlist([0_u64])), + expected, + ) + .await?; + // new data and with filter log::info!("Test case: Combined KNN/ANN with postfilter"); let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0a32da7813a..da4c042b1fa 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -564,6 +564,14 @@ impl FilteredReadStream { } } + if let Some(allowlist_mask) = options.allowlist_mask.as_ref() { + let allowlist_ranges = row_id_sequence.mask_to_offset_ranges(allowlist_mask); + to_read = Self::intersect_ranges(&to_read, &allowlist_ranges); + if to_read.is_empty() { + continue; + } + } + // Apply index and apply scan range after filter if applicable Self::apply_index_to_fragment( evaluated_index, @@ -1220,6 +1228,8 @@ pub struct FilteredReadOptions { /// result to avoid applying this (and instead only apply the refine filter) but in some cases /// the index result does not cover all fragments or is not exact. pub full_filter: Option, + /// Optional allowlist mask to restrict scan ranges up front. + pub allowlist_mask: Option>, /// The threading mode to use for the scan pub threading_mode: FilteredReadThreadingMode, /// The size of the I/O buffer to use for the scan @@ -1250,6 +1260,7 @@ impl FilteredReadOptions { projection, refine_filter: None, full_filter: None, + allowlist_mask: None, io_buffer_size_bytes: None, threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads( get_num_compute_intensive_cpus(), @@ -1401,6 +1412,12 @@ impl FilteredReadOptions { self.io_buffer_size_bytes = Some(io_buffer_size); self } + + /// Restrict the scan to rows contained in the allowlist mask. + pub fn with_allowlist_mask(mut self, allowlist_mask: Arc) -> Self { + self.allowlist_mask = Some(allowlist_mask); + self + } } /// A plan node that reads a dataset, applying an optional filter and projection. @@ -1861,7 +1878,11 @@ mod tests { }; use itertools::Itertools; use lance_core::datatypes::OnMissing; - use lance_core::utils::tempfile::TempStrDir; + use lance_core::utils::{ + address::RowAddress, + mask::{RowAddrMask, RowAddrTreeMap}, + tempfile::TempStrDir, + }; use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount}; use lance_index::{ optimize::OptimizeOptions, @@ -2160,6 +2181,45 @@ mod tests { fixture.test_plan(options, &u32s(vec![])).await; } + #[test_log::test(tokio::test)] + async fn test_allowlist_mask() { + let fixture = TestFixture::new().await; + + let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([ + u64::from(RowAddress::new_from_parts(0, 1)), + u64::from(RowAddress::new_from_parts(0, 5)), + u64::from(RowAddress::new_from_parts(2, 60)), + u64::from(RowAddress::new_from_parts(3, 10)), + ])); + + let options = FilteredReadOptions::basic_full_read(&fixture.dataset) + .with_allowlist_mask(Arc::new(allowlist)); + + fixture + .test_plan(options, &u32s(vec![1..2, 5..6, 260..261, 310..311])) + .await; + } + + #[test_log::test(tokio::test)] + async fn test_allowlist_mask_with_scan_range() { + let fixture = TestFixture::new().await; + + let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([ + u64::from(RowAddress::new_from_parts(0, 20)), + u64::from(RowAddress::new_from_parts(2, 60)), + u64::from(RowAddress::new_from_parts(3, 10)), + ])); + + let options = FilteredReadOptions::basic_full_read(&fixture.dataset) + .with_allowlist_mask(Arc::new(allowlist)) + .with_scan_range_before_filter(10..130) + .unwrap(); + + fixture + .test_plan(options, &u32s(vec![20..21, 260..261])) + .await; + } + #[test_log::test(tokio::test)] async fn test_batch_size() { let fixture = TestFixture::new().await; From c11971fb57cafa8e9b44723b8d097fe1815cb10c Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Tue, 27 Jan 2026 13:43:22 -0500 Subject: [PATCH 05/10] test(scanner): add row_id allowlist runtime coverage --- rust/lance/src/dataset/scanner.rs | 228 ++++++++++++++++++++++++++++++ 1 file changed, 228 insertions(+) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f7baabd4fec..cb709fd3955 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -5089,6 +5089,59 @@ mod test { assert_eq!(expected_i, actual_i); } + #[tokio::test] + async fn test_knn_allowlist_runtime() -> Result<()> { + let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false).await?; + let key: Float32Array = (32..64).map(|v| v as f32).collect(); + + let all_row_ids = collect_row_ids(test_ds.dataset.scan()).await?; + assert!(all_row_ids.len() > 1); + + let baseline_flat = { + let mut scan = test_ds.dataset.scan(); + scan.nearest("vec", &key, 1).unwrap(); + scan.use_index(false); + collect_row_ids(scan).await? + }; + let mut candidate = *all_row_ids.last().unwrap(); + if candidate == baseline_flat[0] { + candidate = all_row_ids[all_row_ids.len() - 2]; + } + assert_ne!(candidate, baseline_flat[0]); + + let filtered_flat = { + let mut scan = test_ds.dataset.scan(); + scan.nearest("vec", &key, 1).unwrap(); + scan.use_index(false); + scan.row_id_allowlist([candidate]); + collect_row_ids(scan).await? + }; + assert_eq!(filtered_flat, vec![candidate]); + + test_ds.make_vector_index().await?; + let all_row_ids = collect_row_ids(test_ds.dataset.scan()).await?; + + let baseline_ann = { + let mut scan = test_ds.dataset.scan(); + scan.nearest("vec", &key, 1).unwrap(); + collect_row_ids(scan).await? + }; + let mut candidate = *all_row_ids.last().unwrap(); + if candidate == baseline_ann[0] { + candidate = all_row_ids[all_row_ids.len() - 2]; + } + assert_ne!(candidate, baseline_ann[0]); + + let filtered_ann = { + let mut scan = test_ds.dataset.scan(); + scan.nearest("vec", &key, 1).unwrap(); + scan.row_id_allowlist([candidate]); + collect_row_ids(scan).await? + }; + assert_eq!(filtered_ann, vec![candidate]); + Ok(()) + } + #[rstest] #[tokio::test] async fn test_can_project_distance() { @@ -5654,6 +5707,181 @@ mod test { Ok(()) } + async fn collect_row_ids(mut scan: Scanner) -> Result> { + scan.with_row_id(); + let stream = scan.try_into_stream().await?; + let schema = stream.schema(); + let batches = stream.try_collect::>().await?; + if batches.is_empty() { + return Ok(Vec::new()); + } + let batch = concat_batches(&schema, &batches)?; + let row_ids = batch + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values(); + Ok(row_ids.to_vec()) + } + + #[tokio::test] + async fn test_row_id_allowlist_unstable_row_ids() -> Result<()> { + let test_ds = TestVectorDataset::new(LanceFileVersion::Legacy, false).await?; + let dataset = &test_ds.dataset; + + let allowlist = vec![0_u64, 50_u64, (1_u64 << 32) + 10]; + let filtered = dataset + .scan() + .project(&["i"])? + .with_row_id() + .row_id_allowlist(allowlist.clone()) + .try_into_batch() + .await?; + let filtered_ids: BTreeSet = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect(); + + assert_eq!(BTreeSet::from_iter(allowlist), filtered_ids); + Ok(()) + } + + #[tokio::test] + async fn test_allowlist_with_deletions() -> Result<()> { + let write_params = WriteParams { + data_storage_version: Some(LanceFileVersion::Stable), + enable_stable_row_ids: false, + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }; + let mut dataset = gen_batch() + .col("i", array::step::()) + .into_ram_dataset_with_params( + FragmentCount::from(1), + FragmentRowCount::from(10), + Some(write_params), + ) + .await + .unwrap(); + dataset.delete("i = 1 OR i = 3").await.unwrap(); + + let allowlist = vec![1_u64, 3_u64, 4_u64]; + let filtered = dataset + .scan() + .project(&["i"])? + .with_row_id() + .row_id_allowlist(allowlist) + .try_into_batch() + .await?; + let filtered_ids: Vec = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect(); + + assert_eq!(filtered_ids, vec![4]); + Ok(()) + } + + #[tokio::test] + async fn test_fts_allowlist_runtime() -> Result<()> { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("s", DataType::Utf8, true), + ArrowField::new("i", DataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec![ + "hello world", + "goodbye", + "hello there", + ])), + Arc::new(Int32Array::from(vec![0, 1, 2])), + ], + )?; + let tmp_dir = TempStrDir::default(); + let mut dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + &tmp_dir, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Stable), + enable_stable_row_ids: false, + ..Default::default() + }), + ) + .await?; + let params = lance_index::scalar::inverted::tokenizer::InvertedIndexParams::default() + .with_position(true); + dataset + .create_index(&["s"], IndexType::Inverted, None, ¶ms, true) + .await?; + + let query = FullTextSearchQuery::new("hello".to_owned()); + let baseline_indexed = { + let mut scan = dataset.scan(); + scan.full_text_search(query.clone()).unwrap(); + collect_row_ids(scan).await? + }; + assert!(!baseline_indexed.is_empty()); + + let allowlist = baseline_indexed.iter().take(2).copied().collect::>(); + let filtered_indexed = { + let mut scan = dataset.scan(); + scan.full_text_search(query.clone()).unwrap(); + scan.row_id_allowlist(allowlist.clone()); + collect_row_ids(scan).await? + }; + assert_eq!( + BTreeSet::from_iter(allowlist), + BTreeSet::from_iter(filtered_indexed) + ); + + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["hello new", "other"])), + Arc::new(Int32Array::from(vec![3, 4])), + ], + )?; + dataset + .append(RecordBatchIterator::new(vec![Ok(new_batch)], schema), None) + .await?; + + let baseline_with_unindexed = { + let mut scan = dataset.scan(); + scan.full_text_search(query.clone()).unwrap(); + collect_row_ids(scan).await? + }; + let baseline_set = BTreeSet::from_iter(baseline_indexed.clone()); + let unindexed_only = BTreeSet::from_iter(baseline_with_unindexed) + .difference(&baseline_set) + .copied() + .collect::>(); + assert!(!unindexed_only.is_empty()); + + let allowlist = vec![unindexed_only[0], *baseline_indexed.last().unwrap()]; + let filtered_with_unindexed = { + let mut scan = dataset.scan(); + scan.full_text_search(query).unwrap(); + scan.row_id_allowlist(allowlist.clone()); + collect_row_ids(scan).await? + }; + assert_eq!( + BTreeSet::from_iter(allowlist), + BTreeSet::from_iter(filtered_with_unindexed) + ); + Ok(()) + } + #[tokio::test] async fn test_scan_unordered_with_row_id() { // This test doesn't make sense for v2 files, there is no way to get an out-of-order scan From 6075e88fbcb24e05d604df9ab8bb1e184d35f6ef Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Tue, 27 Jan 2026 14:17:39 -0500 Subject: [PATCH 06/10] test(scanner): add allowlist + filter plain scan case --- rust/lance/src/dataset/scanner.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index cb709fd3955..e43a4338e19 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -5791,6 +5791,36 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_row_id_allowlist_with_filter_plain_scan() -> Result<()> { + let dataset = gen_batch() + .col("i", array::step::()) + .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .await + .unwrap(); + + let allowlist = vec![1_u64, 3_u64, 5_u64, 7_u64, 9_u64]; + let filtered = dataset + .scan() + .project(&["i"])? + .with_row_id() + .row_id_allowlist(allowlist) + .filter("i >= 5")? + .try_into_batch() + .await?; + let filtered_ids: BTreeSet = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect(); + + assert_eq!(filtered_ids, BTreeSet::from_iter(vec![5, 7, 9])); + Ok(()) + } + #[tokio::test] async fn test_fts_allowlist_runtime() -> Result<()> { let schema = Arc::new(ArrowSchema::new(vec![ From 0667bbd3f9fee5cc5f0071c0fe62cc257ac356f4 Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Tue, 27 Jan 2026 18:49:04 -0500 Subject: [PATCH 07/10] test(scanner): cover row_id_allowlist paths --- rust/lance/src/dataset/scanner.rs | 53 +++++++++++++++++++ rust/lance/src/io/exec/fts.rs | 86 +++++++++++++++++++++++++++++++ rust/lance/src/io/exec/knn.rs | 43 ++++++++++++++-- 3 files changed, 179 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index e43a4338e19..62cf301c9f6 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -5821,6 +5821,59 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_row_id_allowlist_with_take_rowoffset_stable() -> Result<()> { + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true).await?; + let dataset = &test_ds.dataset; + + let batch = dataset + .scan() + .project(&[ROW_OFFSET])? + .with_row_id() + .try_into_batch() + .await?; + let row_offsets = batch + .column_by_name(ROW_OFFSET) + .unwrap() + .as_primitive::() + .values(); + let row_ids = batch + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values(); + + let allowlist = vec![row_ids[10], row_ids[20], row_ids[30]]; + let offsets = vec![row_offsets[20], row_offsets[30], row_offsets[40]]; + let filter = format!( + "_rowoffset IN ({}, {}, {}) AND i >= 0", + offsets[0], offsets[1], offsets[2] + ); + + let filtered = dataset + .scan() + .project(&["i"])? + .with_row_id() + .row_id_allowlist(allowlist) + .filter(&filter)? + .try_into_batch() + .await?; + let filtered_ids: BTreeSet = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect(); + + assert_eq!( + filtered_ids, + BTreeSet::from_iter(vec![row_ids[20], row_ids[30]]) + ); + Ok(()) + } + #[tokio::test] async fn test_fts_allowlist_runtime() -> Result<()> { let schema = Arc::new(ArrowSchema::new(vec![ diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 79fdf0ecee7..134f3878cff 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -1216,7 +1216,15 @@ impl ExecutionPlan for BooleanQueryExec { pub mod tests { use std::sync::{Arc, Mutex}; + use arrow::datatypes::UInt64Type; + use arrow_array::cast::AsArray; + use arrow_array::{Float32Array, Int32Array, RecordBatch, UInt64Array}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; + use futures::{stream, TryStreamExt}; + use lance_core::utils::mask::{RowAddrMask, RowAddrTreeMap}; + use lance_core::ROW_ID; use lance_datafusion::datagen::DatafusionDatagenExt; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datafusion::utils::PARTITIONS_SEARCHED_METRIC; @@ -1256,6 +1264,84 @@ pub mod tests { } } + #[test] + fn test_filter_batch_by_allowlist() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new("score", DataType::Float32, false), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt64Array::from(vec![1_u64, 2, 3])), + Arc::new(Float32Array::from(vec![0.1_f32, 0.2, 0.3])), + ], + ) + .unwrap(); + let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([2_u64, 3])); + let filtered = super::filter_batch_by_allowlist(batch, &allowlist).unwrap(); + let row_ids = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values(); + assert_eq!(row_ids.as_ref(), &[2_u64, 3]); + } + + #[test] + fn test_filter_batch_by_allowlist_missing_row_id() { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Int32, + false, + )])); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); + let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([1_u64])); + let err = super::filter_batch_by_allowlist(batch, &allowlist).unwrap_err(); + assert!(err.to_string().contains("allowlist requires")); + } + + #[tokio::test] + async fn test_filter_stream_by_allowlist_drops_empty() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new("score", DataType::Float32, false), + ])); + let batch_empty = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![1_u64])), + Arc::new(Float32Array::from(vec![0.1_f32])), + ], + ) + .unwrap(); + let batch_keep = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![2_u64, 3])), + Arc::new(Float32Array::from(vec![0.2_f32, 0.3])), + ], + ) + .unwrap(); + + let input = stream::iter(vec![Ok(batch_empty), Ok(batch_keep)]); + let input = Box::pin(RecordBatchStreamAdapter::new(schema, input)); + let allowlist = Arc::new(RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([ + 2_u64, + ]))); + let filtered = super::filter_stream_by_allowlist(input, allowlist); + let batches = filtered.try_collect::>().await.unwrap(); + + assert_eq!(batches.len(), 1); + let row_ids = batches[0] + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values(); + assert_eq!(row_ids.as_ref(), &[2_u64]); + } + #[test] fn execute_without_context() { // These tests ensure we can create nodes and call execute without a tokio Runtime diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 22c23a6f537..9f302f24e1a 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -1395,12 +1395,15 @@ mod tests { use super::*; use arrow::compute::{concat_batches, sort_to_indices, take_record_batch}; - use arrow::datatypes::Float32Type; + use arrow::datatypes::{Float32Type, UInt64Type}; use arrow_array::{ - ArrayRef, FixedSizeListArray, Float32Array, Int32Array, RecordBatchIterator, StringArray, + ArrayRef, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchIterator, + StringArray, UInt64Array, }; - use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_core::utils::mask::{RowAddrMask, RowAddrTreeMap}; use lance_core::utils::tempfile::TempStrDir; + use lance_core::ROW_ID; use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts}; use lance_datagen::{array, BatchCount, RowCount}; use lance_index::optimize::OptimizeOptions; @@ -1465,6 +1468,40 @@ mod tests { assert_eq!(query.maximum_nprobes, Some(50)); } + #[test] + fn test_filter_batch_by_allowlist() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new("score", DataType::Float32, false), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt64Array::from(vec![1_u64, 2, 3])), + Arc::new(Float32Array::from(vec![0.1_f32, 0.2, 0.3])), + ], + ) + .unwrap(); + let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([2_u64, 3])); + let filtered = filter_batch_by_allowlist(batch, &allowlist).unwrap(); + let row_ids = filtered + .column_by_name(ROW_ID) + .unwrap() + .as_primitive::() + .values(); + assert_eq!(row_ids.as_ref(), &[2_u64, 3]); + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Int32, + false, + )])); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); + let err = filter_batch_by_allowlist(batch, &allowlist).unwrap_err(); + assert!(err.to_string().contains("allowlist requires")); + } + #[tokio::test] async fn knn_flat_search() { let schema = Arc::new(ArrowSchema::new(vec![ From 69676c42497c8859da174538bb4ef6a10b5fedfa Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Tue, 27 Jan 2026 21:49:35 -0500 Subject: [PATCH 08/10] test(scanner): fix clippy in allowlist test --- rust/lance/src/dataset/scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 62cf301c9f6..22bc06f35dc 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -5844,7 +5844,7 @@ mod test { .values(); let allowlist = vec![row_ids[10], row_ids[20], row_ids[30]]; - let offsets = vec![row_offsets[20], row_offsets[30], row_offsets[40]]; + let offsets = [row_offsets[20], row_offsets[30], row_offsets[40]]; let filter = format!( "_rowoffset IN ({}, {}, {}) AND i >= 0", offsets[0], offsets[1], offsets[2] From a3aa23686a73b0f2dab57684022bb021b1b7700e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Larochelle?= Date: Wed, 28 Jan 2026 19:23:24 -0500 Subject: [PATCH 09/10] perf(scanner): preallocate row_ids buffer Co-authored-by: Will Jones --- rust/lance/src/dataset/scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 22bc06f35dc..6dcec2a1824 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2476,7 +2476,7 @@ impl Scanner { return Ok(Vec::new()); } - let mut row_ids = Vec::new(); + let mut row_ids = Vec::with_capacity(row_addrs.len()); for fragment in self.dataset.manifest.fragments.iter() { let frag_id = fragment.id as u32; let Some(offsets) = addrs_by_frag.remove(&frag_id) else { From 994f1748ee4c7f9eab8dab0e5f15117c95af6b8d Mon Sep 17 00:00:00 2001 From: fredlarochelle Date: Wed, 28 Jan 2026 21:20:07 -0500 Subject: [PATCH 10/10] test(fts/knn): use record_batch macro in allowlist tests --- rust/lance/src/io/exec/fts.rs | 47 +++++++---------------------------- rust/lance/src/io/exec/knn.rs | 25 +++++-------------- 2 files changed, 15 insertions(+), 57 deletions(-) diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 134f3878cff..728c1696754 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -1218,7 +1218,7 @@ pub mod tests { use arrow::datatypes::UInt64Type; use arrow_array::cast::AsArray; - use arrow_array::{Float32Array, Int32Array, RecordBatch, UInt64Array}; + use arrow_array::{record_batch, Float32Array, Int32Array, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::{execution::TaskContext, physical_plan::ExecutionPlan}; @@ -1266,16 +1266,9 @@ pub mod tests { #[test] fn test_filter_batch_by_allowlist() { - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new(ROW_ID, DataType::UInt64, false), - ArrowField::new("score", DataType::Float32, false), - ])); - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(UInt64Array::from(vec![1_u64, 2, 3])), - Arc::new(Float32Array::from(vec![0.1_f32, 0.2, 0.3])), - ], + let batch = record_batch!( + (ROW_ID, UInt64, [1_u64, 2_u64, 3_u64]), + ("score", Float32, [0.1_f32, 0.2_f32, 0.3_f32]) ) .unwrap(); let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([2_u64, 3])); @@ -1290,13 +1283,7 @@ pub mod tests { #[test] fn test_filter_batch_by_allowlist_missing_row_id() { - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "x", - DataType::Int32, - false, - )])); - let batch = - RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); + let batch = record_batch!(("x", Int32, [1_i32])).unwrap(); let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([1_u64])); let err = super::filter_batch_by_allowlist(batch, &allowlist).unwrap_err(); assert!(err.to_string().contains("allowlist requires")); @@ -1304,26 +1291,10 @@ pub mod tests { #[tokio::test] async fn test_filter_stream_by_allowlist_drops_empty() { - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new(ROW_ID, DataType::UInt64, false), - ArrowField::new("score", DataType::Float32, false), - ])); - let batch_empty = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt64Array::from(vec![1_u64])), - Arc::new(Float32Array::from(vec![0.1_f32])), - ], - ) - .unwrap(); - let batch_keep = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt64Array::from(vec![2_u64, 3])), - Arc::new(Float32Array::from(vec![0.2_f32, 0.3])), - ], - ) - .unwrap(); + let batch_empty = record_batch!((ROW_ID, UInt64, [1]), ("score", Float32, [0.1])).unwrap(); + let batch_keep = + record_batch!((ROW_ID, UInt64, [2, 3]), ("score", Float32, [0.2, 0.3])).unwrap(); + let schema = batch_empty.schema(); let input = stream::iter(vec![Ok(batch_empty), Ok(batch_keep)]); let input = Box::pin(RecordBatchStreamAdapter::new(schema, input)); diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 9f302f24e1a..5e6ab437c9a 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -1397,8 +1397,8 @@ mod tests { use arrow::compute::{concat_batches, sort_to_indices, take_record_batch}; use arrow::datatypes::{Float32Type, UInt64Type}; use arrow_array::{ - ArrayRef, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchIterator, - StringArray, UInt64Array, + record_batch, ArrayRef, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, + RecordBatchIterator, StringArray, UInt64Array, }; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use lance_core::utils::mask::{RowAddrMask, RowAddrTreeMap}; @@ -1470,16 +1470,9 @@ mod tests { #[test] fn test_filter_batch_by_allowlist() { - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new(ROW_ID, DataType::UInt64, false), - ArrowField::new("score", DataType::Float32, false), - ])); - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(UInt64Array::from(vec![1_u64, 2, 3])), - Arc::new(Float32Array::from(vec![0.1_f32, 0.2, 0.3])), - ], + let batch = record_batch!( + (ROW_ID, UInt64, [1_u64, 2_u64, 3_u64]), + ("score", Float32, [0.1_f32, 0.2_f32, 0.3_f32]) ) .unwrap(); let allowlist = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter([2_u64, 3])); @@ -1491,13 +1484,7 @@ mod tests { .values(); assert_eq!(row_ids.as_ref(), &[2_u64, 3]); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "x", - DataType::Int32, - false, - )])); - let batch = - RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap(); + let batch = record_batch!(("x", Int32, [1_i32])).unwrap(); let err = filter_batch_by_allowlist(batch, &allowlist).unwrap_err(); assert!(err.to_string().contains("allowlist requires")); }