Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
664 changes: 623 additions & 41 deletions rust/lance/src/dataset/scanner.rs

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion rust/lance/src/index/prefilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct DatasetPreFilter {
// these tasks only when we've done as much work as we can without them.
pub(super) deleted_ids: Option<Arc<SharedPrerequisite<Arc<RowAddrMask>>>>,
pub(super) filtered_ids: Option<Arc<SharedPrerequisite<RowAddrMask>>>,
// Optional allowlist mask to intersect with filter + deletion masks.
pub(super) extra_mask: Option<Arc<RowAddrMask>>,
// When the tasks are finished this is the combined filter
pub(super) final_mask: Mutex<OnceCell<Arc<RowAddrMask>>>,
}
Expand All @@ -58,6 +60,7 @@ impl DatasetPreFilter {
dataset: Arc<Dataset>,
indices: &[IndexMetadata],
filter: Option<Box<dyn FilterLoader>>,
extra_mask: Option<Arc<RowAddrMask>>,
) -> Self {
let mut fragments = RoaringBitmap::new();
if indices.iter().any(|idx| idx.fragment_bitmap.is_none()) {
Expand All @@ -74,6 +77,7 @@ impl DatasetPreFilter {
Self {
deleted_ids,
filtered_ids,
extra_mask,
final_mask: Mutex::new(OnceCell::new()),
}
}
Expand Down Expand Up @@ -238,6 +242,9 @@ impl PreFilter for DatasetPreFilter {
let final_mask = self.final_mask.lock().unwrap();
final_mask.get_or_init(|| {
let mut combined = RowAddrMask::default();
if let Some(extra_mask) = &self.extra_mask {
combined = combined & extra_mask.as_ref().clone();
}
if let Some(filtered_ids) = &self.filtered_ids {
combined = combined & filtered_ids.get_ready();
}
Expand All @@ -251,7 +258,7 @@ impl PreFilter for DatasetPreFilter {
}

fn is_empty(&self) -> bool {
self.deleted_ids.is_none() && self.filtered_ids.is_none()
self.deleted_ids.is_none() && self.filtered_ids.is_none() && self.extra_mask.is_none()
}

/// Get the row id mask for this prefilter
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/index/vector/fixture_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ mod test {
Arc::new(DatasetPreFilter {
deleted_ids: None,
filtered_ids: None,
extra_mask: None,
final_mask: Mutex::new(OnceCell::new()),
}),
&NoOpMetricsCollector,
Expand Down
7 changes: 6 additions & 1 deletion rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2672,7 +2672,12 @@ mod tests {
base_id: None,
};

let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None));
let prefilter = Arc::new(DatasetPreFilter::new(
dataset.clone(),
&[index_meta],
None,
None,
));

let is_not_remapped = Some;
let is_remapped = |row_id| Some(row_id + BIG_OFFSET);
Expand Down
62 changes: 61 additions & 1 deletion rust/lance/src/io/exec/filtered_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,14 @@ impl FilteredReadStream {
}
}

if let Some(allowlist_mask) = options.allowlist_mask.as_ref() {
let allowlist_ranges = row_id_sequence.mask_to_offset_ranges(allowlist_mask);
to_read = Self::intersect_ranges(&to_read, &allowlist_ranges);
if to_read.is_empty() {
continue;
}
}

// Apply index and apply scan range after filter if applicable
Self::apply_index_to_fragment(
evaluated_index,
Expand Down Expand Up @@ -1220,6 +1228,8 @@ pub struct FilteredReadOptions {
/// result to avoid applying this (and instead only apply the refine filter) but in some cases
/// the index result does not cover all fragments or is not exact.
pub full_filter: Option<Expr>,
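/// Optional allowlist mask. When set, each fragment's ranges to read are intersected with the
/// rows this mask allows, and fragments left with nothing to read are skipped.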
pub allowlist_mask: Option<Arc<RowAddrMask>>,
/// The threading mode to use for the scan
pub threading_mode: FilteredReadThreadingMode,
/// The size of the I/O buffer to use for the scan
Expand Down Expand Up @@ -1250,6 +1260,7 @@ impl FilteredReadOptions {
projection,
refine_filter: None,
full_filter: None,
allowlist_mask: None,
io_buffer_size_bytes: None,
threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads(
get_num_compute_intensive_cpus(),
Expand Down Expand Up @@ -1401,6 +1412,12 @@ impl FilteredReadOptions {
self.io_buffer_size_bytes = Some(io_buffer_size);
self
}

/// Restrict the scan to rows contained in the allowlist mask.
pub fn with_allowlist_mask(mut self, allowlist_mask: Arc<RowAddrMask>) -> Self {
self.allowlist_mask = Some(allowlist_mask);
self
}
}

/// A plan node that reads a dataset, applying an optional filter and projection.
Expand Down Expand Up @@ -1861,7 +1878,11 @@ mod tests {
};
use itertools::Itertools;
use lance_core::datatypes::OnMissing;
use lance_core::utils::tempfile::TempStrDir;
use lance_core::utils::{
address::RowAddress,
mask::{RowAddrMask, RowAddrTreeMap},
tempfile::TempStrDir,
};
use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount};
use lance_index::{
optimize::OptimizeOptions,
Expand Down Expand Up @@ -2160,6 +2181,45 @@ mod tests {
fixture.test_plan(options, &u32s(vec![])).await;
}

/// A full read restricted by an allowlist mask should return only the allowed rows.
#[test_log::test(tokio::test)]
async fn test_allowlist_mask() {
    let fixture = TestFixture::new().await;

    // Each pair is handed to `RowAddress::new_from_parts` — presumably
    // (fragment id, row offset within the fragment); confirm against `RowAddress`.
    let allowed_addrs = [(0, 1), (0, 5), (2, 60), (3, 10)]
        .map(|(fragment_id, offset)| u64::from(RowAddress::new_from_parts(fragment_id, offset)));
    let allowlist = Arc::new(RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(
        allowed_addrs,
    )));

    let options =
        FilteredReadOptions::basic_full_read(&fixture.dataset).with_allowlist_mask(allowlist);

    // NOTE(review): the expected values depend on how `TestFixture` lays out its
    // fragments (they look like fragment_id * 100 + offset) — confirm against the fixture.
    let expected = u32s(vec![1..2, 5..6, 260..261, 310..311]);
    fixture.test_plan(options, &expected).await;
}

/// An allowlist mask combined with a before-filter scan range should return only the
/// allowed rows that also fall inside the scan range.
#[test_log::test(tokio::test)]
async fn test_allowlist_mask_with_scan_range() {
    let fixture = TestFixture::new().await;

    // Each pair is handed to `RowAddress::new_from_parts` — presumably
    // (fragment id, row offset within the fragment); confirm against `RowAddress`.
    let allowed_addrs = [(0, 20), (2, 60), (3, 10)]
        .map(|(fragment_id, offset)| u64::from(RowAddress::new_from_parts(fragment_id, offset)));
    let allowlist = Arc::new(RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(
        allowed_addrs,
    )));

    let base_options = FilteredReadOptions::basic_full_read(&fixture.dataset);
    let options = base_options
        .with_allowlist_mask(allowlist)
        .with_scan_range_before_filter(10..130)
        .unwrap();

    // The third allowed address is expected to be excluded by the scan range.
    // NOTE(review): which rows the range 10..130 covers depends on the fixture's
    // fragment layout and any rows it deletes — confirm against `TestFixture`.
    let expected = u32s(vec![20..21, 260..261]);
    fixture.test_plan(options, &expected).await;
}

#[test_log::test(tokio::test)]
async fn test_batch_size() {
let fixture = TestFixture::new().await;
Expand Down
Loading
Loading