diff --git a/.changeset/strong-spoons-promise.md b/.changeset/strong-spoons-promise.md new file mode 100644 index 000000000..3ce98d0ec --- /dev/null +++ b/.changeset/strong-spoons-promise.md @@ -0,0 +1,6 @@ +--- +"loro-crdt": patch +"loro-crdt-map": patch +--- + +feat: add JSONPath subscription #883 diff --git a/crates/loro-internal/examples/event.rs b/crates/loro-internal/examples/event.rs index f5b584bfb..b1fb85e46 100644 --- a/crates/loro-internal/examples/event.rs +++ b/crates/loro-internal/examples/event.rs @@ -1,9 +1,10 @@ use std::sync::Arc; use loro_internal::{ + cursor::PosType, event::Diff, handler::{Handler, ValueOrHandler}, - cursor::PosType, ListHandler, LoroDoc, MapHandler, TextHandler, ToJson, TreeHandler, + ListHandler, LoroDoc, MapHandler, TextHandler, ToJson, TreeHandler, }; fn main() { diff --git a/crates/loro-internal/src/container/richtext/richtext_state.rs b/crates/loro-internal/src/container/richtext/richtext_state.rs index a3ea92be8..18708701b 100644 --- a/crates/loro-internal/src/container/richtext/richtext_state.rs +++ b/crates/loro-internal/src/container/richtext/richtext_state.rs @@ -1302,11 +1302,7 @@ impl RichtextState { .unwrap_or(false) } - pub(crate) fn range_has_style_key( - &mut self, - range: Range, - key: &StyleKey, - ) -> bool { + pub(crate) fn range_has_style_key(&mut self, range: Range, key: &StyleKey) -> bool { self.check_cache(); let result = match self.style_ranges.as_ref() { Some(s) => s.range_contains_key(range, key), diff --git a/crates/loro-internal/src/cursor.rs b/crates/loro-internal/src/cursor.rs index 0df80e8d3..f3c355853 100644 --- a/crates/loro-internal/src/cursor.rs +++ b/crates/loro-internal/src/cursor.rs @@ -94,4 +94,4 @@ pub enum PosType { Event, /// The index is based on the entity index. 
Entity, -} \ No newline at end of file +} diff --git a/crates/loro-internal/src/jsonpath/ast.rs b/crates/loro-internal/src/jsonpath/ast.rs index 0d7a5dafa..f484755d2 100644 --- a/crates/loro-internal/src/jsonpath/ast.rs +++ b/crates/loro-internal/src/jsonpath/ast.rs @@ -2,12 +2,12 @@ use crate::jsonpath::errors::JSONPathError; use crate::jsonpath::JSONPathParser; use std::fmt::{self, Write}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Query { pub segments: Segment, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Segment { Root {}, Child { @@ -20,7 +20,7 @@ pub enum Segment { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Selector { Name { name: String, @@ -39,7 +39,7 @@ pub enum Selector { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum FilterExpression { True_ {}, False_ {}, @@ -81,13 +81,13 @@ pub enum FilterExpression { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum LogicalOperator { And, Or, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ComparisonOperator { Eq, Ne, diff --git a/crates/loro-internal/src/jsonpath/mod.rs b/crates/loro-internal/src/jsonpath/mod.rs index 330a4513e..9bbcdee9b 100644 --- a/crates/loro-internal/src/jsonpath/mod.rs +++ b/crates/loro-internal/src/jsonpath/mod.rs @@ -1,6 +1,9 @@ pub mod ast; pub mod errors; pub mod jsonpath_impl; +pub mod subscription; + +pub use subscription::SubscribeJsonPathCallback; pub mod parser; pub use ast::Query; diff --git a/crates/loro-internal/src/jsonpath/subscription.rs b/crates/loro-internal/src/jsonpath/subscription.rs new file mode 100644 index 000000000..599bc18a1 --- /dev/null +++ b/crates/loro-internal/src/jsonpath/subscription.rs @@ -0,0 +1,551 @@ +//! # JSONPath Subscription Module +//! +//! This module provides a mechanism to subscribe to changes that may affect a JSONPath query +//! result, without re-evaluating the query on every change. +//! +//! ## Design Goals +//! +//! 
- **Conservative matching**: The matcher may produce false positives (triggering when
+//!   the query result hasn't actually changed) but must never produce false negatives
+//!   (missing a notification when the result did change).
+//! - **Lightweight notifications**: Callbacks receive no payload; applications can
+//!   debounce/throttle and evaluate the JSONPath themselves.
+//! - **Efficient matching**: Uses an NFA (non-deterministic finite automaton) approach
+//!   to match event paths against compiled JSONPath queries in O(path_len × steps) time.
+//!
+//! ## Algorithm Overview
+//!
+//! 1. **Compilation**: JSONPath is parsed and converted into a sequence of lightweight
+//!    `MatchSelector`s (no AST storage needed for matching).
+//!
+//! 2. **NFA Simulation**: When an event arrives, we simulate an NFA where:
+//!    - States are positions in the step sequence (0..steps.len())
+//!    - Transitions occur when a selector matches the current path element
+//!    - Recursive steps allow staying at the same state while consuming input
+//!    - Acceptance occurs when any state reaches or exceeds `steps.len()`
+
+use std::sync::Arc;
+
+use loro_common::TreeID;
+use smallvec::SmallVec;
+
+use crate::{
+    event::{Diff, Index},
+    jsonpath::{
+        ast::{Query, Segment, Selector},
+        JSONPathParser,
+    },
+    utils::subscription::Subscription,
+    LoroDoc, LoroError, LoroResult,
+};
+
+/// Callback used by `subscribe_jsonpath`.
+///
+/// Note: the callback does **not** carry the query result. It is intended as a
+/// lightweight notification so applications can debounce/throttle and evaluate
+/// JSONPath themselves if needed.
+pub type SubscribeJsonPathCallback = Arc<dyn Fn() + Send + Sync>; // NOTE(review): bounds assumed from use inside the root subscriber - confirm
+
+/// Represents a single path segment used for matching against events.
+///
+/// Path elements are derived from event paths and represent the location of a change
+/// in the document tree. They form the "input alphabet" for our NFA simulation.
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum PathElem {
+    /// A map key (e.g., "title" in `$.books[0].title`)
+    Key(Arc<str>),
+    /// A sequence (list/array) index.
+    /// - `Some(i)` means a specific index changed
+    /// - `None` means "some index changed" (used for list/tree child mutations
+    ///   where we don't know which specific indices are affected)
+    Seq(Option<usize>),
+    /// A tree node identifier (for Loro's tree CRDT)
+    Node(TreeID),
+}
+
+impl From<&Index> for PathElem {
+    fn from(value: &Index) -> Self {
+        match value {
+            Index::Key(k) => PathElem::Key(k.as_ref().into()),
+            Index::Seq(i) => PathElem::Seq(Some(*i)),
+            Index::Node(n) => PathElem::Node(*n),
+        }
+    }
+}
+
+// =============================================================================
+// Lightweight Matching Types
+// =============================================================================
+//
+// Instead of storing the full Selector AST (which includes heavyweight
+// FilterExpression trees), we convert selectors to a simple matching-only enum.
+// This reduces memory usage and code complexity.
+
+/// Simplified selector for matching only - no AST storage needed.
+///
+/// All complex selectors (Slice, Wild, Filter, negative Index) become `Wildcard`
+/// since they can match any element during conservative path matching.
+#[derive(Debug, Clone)]
+enum MatchSelector {
+    /// Exact key match (from `Selector::Name`)
+    Name(Arc<str>),
+    /// Exact non-negative index match (from `Selector::Index` where index >= 0)
+    Index(usize),
+    /// Matches anything (from Wild, Slice, Filter, or negative Index)
+    Wildcard,
+}
+
+/// A single step in the linearized JSONPath query.
+#[derive(Debug, Clone)]
+struct Step {
+    /// If true, this step uses recursive descent (`..`) and can match at any depth.
+    recursive: bool,
+    /// Simplified selectors for matching.
+    selectors: SmallVec<[MatchSelector; 2]>,
+}
+
+/// A compiled matcher that conservatively checks whether a change at some path
+/// can affect the JSONPath query.
+#[derive(Debug, Clone)]
+struct JsonPathMatcher {
+    /// Linearized sequence of matching steps
+    steps: Vec<Step>,
+}
+
+impl JsonPathMatcher {
+    /// Compile a JSONPath query into a matcher.
+    fn new(query: &Query) -> Self {
+        let mut steps = Vec::new();
+        build_steps(&query.segments, &mut steps);
+        JsonPathMatcher { steps }
+    }
+
+    /// Returns true if the provided path (from root) could affect the query result.
+    fn may_match(&self, path: &[PathElem]) -> bool {
+        if self.steps.is_empty() {
+            return true;
+        }
+        let positions = self.positions_after(path);
+        positions.iter().any(|&p| p >= self.steps.len())
+    }
+
+    /// Check if any position was reached via a Wildcard step (filter/wild/slice
+    /// selectors), in which case any key change under it could affect the result.
+    /// NOTE(review): only `steps[pos - 1]` is inspected; verify deeper filter paths (`[?@.a.b]`).
+    #[inline]
+    fn passed_through_wildcard(&self, positions: &[usize]) -> bool {
+        positions.iter().any(|&pos| {
+            pos > 0
+                && self.steps[pos - 1]
+                    .selectors
+                    .iter()
+                    .any(|s| matches!(s, MatchSelector::Wildcard))
+        })
+    }
+
+    /// Simulate the NFA on a path and return all reachable positions after consuming it.
+    ///
+    /// - Position `== steps.len()`: Query fully matched
+    /// - Position `< steps.len()`: Partial match
+    /// - Empty result: No match possible
+    fn positions_after(&self, path: &[PathElem]) -> SmallVec<[usize; 8]> {
+        let mut positions = SmallVec::<[usize; 8]>::new();
+        positions.push(0);
+
+        for elem in path {
+            let mut next = SmallVec::<[usize; 8]>::new();
+
+            for &pos in positions.iter() {
+                if pos >= self.steps.len() {
+                    next.push(pos);
+                    continue;
+                }
+
+                let step = &self.steps[pos];
+
+                // Recursive descent: can stay at same position
+                if step.recursive {
+                    next.push(pos);
+                }
+
+                // If selector matches, advance to next step
+                if selector_matches(&step.selectors, elem) {
+                    next.push(pos + 1);
+                }
+            }
+
+            dedup_positions(&mut next);
+            if next.is_empty() {
+                return next;
+            }
+            positions = next;
+        }
+
+        positions
+    }
+}
+
+/// Check if any selector matches the given path element.
+fn selector_matches(selectors: &[MatchSelector], elem: &PathElem) -> bool {
+    selectors.iter().any(|sel| match sel {
+        MatchSelector::Name(name) => matches!(elem, PathElem::Key(k) if k == name),
+        MatchSelector::Index(idx) => match elem {
+            PathElem::Seq(Some(i)) => *i == *idx,
+            PathElem::Seq(None) => true, // Unknown index - conservative match
+            _ => false,
+        },
+        MatchSelector::Wildcard => true,
+    })
+}
+
+/// Convert AST Selector to lightweight MatchSelector.
+fn to_match_selector(sel: &Selector) -> MatchSelector {
+    match sel {
+        Selector::Name { name } => MatchSelector::Name(name.as_str().into()),
+        Selector::Index { index } if *index >= 0 => MatchSelector::Index(*index as usize),
+        // Negative index, Slice, Wild, Filter all become Wildcard
+        _ => MatchSelector::Wildcard,
+    }
+}
+
+/// Linearize the JSONPath AST into a sequence of matching steps.
+fn build_steps(segment: &Segment, steps: &mut Vec<Step>) {
+    match segment {
+        Segment::Root {} => {}
+        Segment::Child { left, selectors } => {
+            build_steps(left, steps);
+            steps.push(Step {
+                recursive: false,
+                selectors: selectors.iter().map(to_match_selector).collect(),
+            });
+        }
+        Segment::Recursive { left, selectors } => {
+            build_steps(left, steps);
+            steps.push(Step {
+                recursive: true,
+                selectors: selectors.iter().map(to_match_selector).collect(),
+            });
+        }
+    }
+}
+
+/// Deduplicate positions in-place.
+#[inline]
+fn dedup_positions(v: &mut SmallVec<[usize; 8]>) {
+    v.sort_unstable();
+    v.dedup();
+}
+
+// =============================================================================
+// Public API
+// =============================================================================
+
+impl LoroDoc {
+    /// Subscribe to updates that may affect the given JSONPath query.
+    ///
+    /// ## Behavior
+    ///
+    /// - **Conservative matching**: The callback may fire for changes that don't
+    ///   actually affect the query result (false positives), but it will never
+    ///   miss a change that does affect it (no false negatives).
+    /// - **Lightweight notifications**: The callback receives no payload. Applications
+    ///   should debounce/throttle and re-evaluate the JSONPath if needed.
+    ///
+    /// ## Returns
+    ///
+    /// A `Subscription` handle. Drop it to unsubscribe.
+    #[cfg(feature = "jsonpath")]
+    pub fn subscribe_jsonpath(
+        &self,
+        jsonpath: &str,
+        callback: SubscribeJsonPathCallback,
+    ) -> LoroResult<Subscription> {
+        let query = JSONPathParser::new()
+            .parse(jsonpath)
+            .map_err(|e| LoroError::ArgErr(e.to_string().into_boxed_str()))?;
+
+        let matcher = Arc::new(JsonPathMatcher::new(&query));
+
+        let sub = self.subscribe_root(Arc::new(move |event| {
+            if event.events.is_empty() {
+                return;
+            }
+
+            let matcher = &matcher;
+            let mut fired = false;
+
+            for container_diff in event.events.iter() {
+                if fired {
+                    break;
+                }
+
+                // Convert the container's path to PathElem representation
+                let base_path: SmallVec<[PathElem; 8]> = container_diff
+                    .path
+                    .iter()
+                    .map(|(_, idx)| PathElem::from(idx))
+                    .collect();
+
+                // Check 1: Does the container path itself match the query?
+                if matcher.may_match(&base_path) {
+                    fired = true;
+                    break;
+                }
+
+                // Check 2: Map-specific handling - check each changed key
+                if let Diff::Map(map) = &container_diff.diff {
+                    let base_positions = matcher.positions_after(&base_path);
+                    if base_positions.is_empty() {
+                        continue;
+                    }
+
+                    // If we passed through a Wildcard (filter/wild/slice), any key
+                    // change could affect the result - be conservative.
+                    let past_wildcard = matcher.passed_through_wildcard(&base_positions);
+
+                    // Trigger if we're inside a wildcard selection OR the key path could match.
+                    // `past_wildcard` is loop-invariant, so test it before building the key path.
+                    fired = map.updated.keys().any(|key| {
+                        if past_wildcard {
+                            return true;
+                        }
+
+                        let mut extended: SmallVec<[PathElem; 8]> = base_path.clone();
+                        extended.push(PathElem::Key(key.as_ref().into()));
+
+                        !matcher.positions_after(&extended).is_empty()
+                    });
+
+                    if fired {
+                        break;
+                    }
+                }
+
+                // Check 3: List/Tree/Counter child mutations
+                // These container types can have child changes at unknown indices
+                let has_child_changes = matches!(
+                    &container_diff.diff,
+                    Diff::List(_) | Diff::Tree(_) | Diff::Unknown
+                );
+                #[cfg(feature = "counter")]
+                let has_child_changes =
+                    has_child_changes || matches!(&container_diff.diff, Diff::Counter(_));
+
+                if has_child_changes {
+                    let mut extended: SmallVec<[PathElem; 8]> = base_path.clone();
+                    extended.push(PathElem::Seq(None)); // "some child changed"
+                    if !matcher.positions_after(&extended).is_empty() {
+                        fired = true;
+                    }
+                }
+            }
+
+            if fired {
+                (callback)();
+            }
+        }));
+
+        Ok(sub)
+    }
+}
+
+// =============================================================================
+// Tests
+// =============================================================================
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{LoroDoc, MapHandler};
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    fn make_book(
+        doc: &LoroDoc,
+        idx: usize,
+        title: &str,
+        available: bool,
+        price: i64,
+    ) -> MapHandler {
+        let books = doc.get_list("books");
+        let book = books
+            .insert_container(idx, MapHandler::new_detached())
+            .unwrap();
+        book.insert("title", title).unwrap();
+        book.insert("available", available).unwrap();
+        book.insert("price", price).unwrap();
+        book
+    }
+
+    #[test]
+    fn jsonpath_subscribe_triggers_on_specific_key() {
+        let doc = LoroDoc::new_auto_commit();
+        let first_book = make_book(&doc, 0, "Old", true, 10);
doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[0].title", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + first_book.insert("title", "New").unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_wildcard_on_list() { + let doc = LoroDoc::new_auto_commit(); + make_book(&doc, 0, "A", true, 10); + let second_book = make_book(&doc, 1, "B", true, 20); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[*].price", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + second_book.insert("price", 25).unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_negative_index() { + let doc = LoroDoc::new_auto_commit(); + make_book(&doc, 0, "A", true, 10); + make_book(&doc, 1, "B", true, 20); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[-1].title", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + let books = doc.get_list("books"); + let last = books.get_child_handler(1).unwrap(); + last.as_map().unwrap().insert("title", "B updated").unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_slice_range() { + let doc = LoroDoc::new_auto_commit(); + make_book(&doc, 0, "A", true, 10); + let second_book = make_book(&doc, 1, "B", true, 20); + make_book(&doc, 2, "C", true, 30); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[0:2].title", + Arc::new(move || { + 
hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + second_book.insert("title", "B updated").unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_recursive() { + let doc = LoroDoc::new_auto_commit(); + let store = doc.get_map("store"); + let nested = store + .insert_container("inventory", MapHandler::new_detached()) + .unwrap(); + nested.insert("total", 3).unwrap(); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$..total", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + nested.insert("total", 4).unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_filter_treated_as_wildcard() { + let doc = LoroDoc::new_auto_commit(); + make_book(&doc, 0, "A", true, 10); + let second_book = make_book(&doc, 1, "B", false, 20); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[?@.available].title", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + second_book.insert("available", true).unwrap(); + doc.commit_then_renew(); + + assert!(hit.load(Ordering::SeqCst) >= 1); + } + + #[test] + fn jsonpath_subscribe_triggers_once_per_commit() { + let doc = LoroDoc::new_auto_commit(); + let book = make_book(&doc, 0, "A", true, 10); + doc.commit_then_renew(); + + let hit = Arc::new(AtomicUsize::new(0)); + let hit_ref = hit.clone(); + let _sub = doc + .subscribe_jsonpath( + "$.books[0].title", + Arc::new(move || { + hit_ref.fetch_add(1, Ordering::SeqCst); + }), + ) + .unwrap(); + + book.insert("title", "X").unwrap(); + book.insert("title", "Y").unwrap(); + doc.commit_then_renew(); + + assert_eq!(hit.load(Ordering::SeqCst), 1); + } +} diff --git 
a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 3e08ea67e..db696acd0 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -2115,8 +2115,8 @@ impl Default for CommitOptions { #[cfg(test)] mod test { - use loro_common::ID; use crate::{cursor::PosType, loro::ExportMode, version::Frontiers, LoroDoc, ToJson}; + use loro_common::ID; #[test] fn test_sync() { diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index ecb75d79a..b79fe6550 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -1550,11 +1550,11 @@ impl ChangesBlockBytes { #[cfg(test)] mod test { + use crate::cursor::PosType; use crate::{ loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler, LoroDoc, MovableListHandler, TextHandler, TreeHandler, }; - use crate::cursor::PosType; use super::*; diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index 0ca09d594..48fec3793 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -683,13 +683,9 @@ mod test { .insert(2, "He", PosType::Unicode) .unwrap(); diagnose(&doc); - doc.get_text("text") - .delete(1, 4, PosType::Unicode) - .unwrap(); + doc.get_text("text").delete(1, 4, PosType::Unicode).unwrap(); diagnose(&doc); - doc.get_text("text") - .delete(0, 2, PosType::Unicode) - .unwrap(); + doc.get_text("text").delete(0, 2, PosType::Unicode).unwrap(); diagnose(&doc); } diff --git a/crates/loro-internal/tests/richtext.rs b/crates/loro-internal/tests/richtext.rs index 92ab6f9c6..553c8e594 100644 --- a/crates/loro-internal/tests/richtext.rs +++ b/crates/loro-internal/tests/richtext.rs @@ -78,13 +78,7 @@ fn unmark(doc: &LoroDoc, range: Range, kind: Kind) { fn mark_kv(doc: &LoroDoc, range: Range, key: 
&str, value: impl Into) { let richtext = doc.get_text("r"); richtext - .mark( - range.start, - range.end, - key, - value.into(), - PosType::Event, - ) + .mark(range.start, range.end, key, value.into(), PosType::Event) .unwrap(); } diff --git a/crates/loro-internal/tests/test.rs b/crates/loro-internal/tests/test.rs index 6e8d3ca99..09886b1f5 100644 --- a/crates/loro-internal/tests/test.rs +++ b/crates/loro-internal/tests/test.rs @@ -1177,10 +1177,7 @@ fn test_text_splice() { let doc = LoroDoc::new_auto_commit(); let text = doc.get_text("text"); text.insert_unicode(0, "你好").unwrap(); - assert_eq!( - text.splice(1, 1, "世界", PosType::Unicode).unwrap(), - "好" - ); + assert_eq!(text.splice(1, 1, "世界", PosType::Unicode).unwrap(), "好"); assert_eq!(text.to_string(), "你世界"); } diff --git a/crates/loro-internal/tests/undo.rs b/crates/loro-internal/tests/undo.rs index 2465b5bab..0641e5993 100644 --- a/crates/loro-internal/tests/undo.rs +++ b/crates/loro-internal/tests/undo.rs @@ -1,6 +1,8 @@ use std::borrow::Cow; -use loro_internal::{cursor::PosType, handler::UpdateOptions, loro::ExportMode, LoroDoc, UndoManager}; +use loro_internal::{ + cursor::PosType, handler::UpdateOptions, loro::ExportMode, LoroDoc, UndoManager, +}; #[test] fn test_basic_undo_group_checkpoint() { @@ -141,8 +143,7 @@ fn test_undo_group_start_with_remote_ops() { .unwrap(); doc2.import(&doc.export(ExportMode::Snapshot).unwrap()) .unwrap(); - doc2 - .get_text("text") + doc2.get_text("text") .insert(0, "test", PosType::Unicode) .unwrap(); doc.import(&doc2.export(ExportMode::Snapshot).unwrap()) diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 21ca83dd3..47bffe1ce 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -1301,6 +1301,24 @@ impl LoroDoc { Ok(ans) } + /// Subscribe to changes that may affect a JSONPath query. 
+ /// + /// The callback receives no query result; it is a lightweight notifier and may + /// fire false positives so callers can debounce/throttle before running JSONPath + /// themselves. + #[wasm_bindgen(js_name = "subscribeJsonpath", skip_typescript)] + pub fn subscribe_jsonpath(&self, jsonpath: &str, f: js_sys::Function) -> JsResult { + let observer = observer::Observer::new(f); + let sub = self.doc.subscribe_jsonpath( + jsonpath, + Arc::new(move || { + enqueue_pending_call(observer.clone(), Vec::new()); + }), + )?; + + Ok(subscription_to_js_function_callback(sub)) + } + /// Get the version vector of the current document state. /// /// If you checkout to a specific version, the version vector will change. @@ -6481,6 +6499,12 @@ interface Listener { interface LoroDoc { subscribe(listener: Listener): Subscription; + /** + * Subscribe to changes that may affect a JSONPath query. + * Callback may fire false positives and carries no query result. + * You can debounce/throttle the callback before running `JSONPath(...)` to optimize heavy reads. + */ + subscribeJsonpath(path: string, callback: () => void): Subscription; } interface UndoManager { @@ -6518,6 +6542,12 @@ interface UndoManager { groupEnd(): void; } interface LoroDoc = Record> { + /** + * Subscribe to changes that may affect a JSONPath query. + * Callback may fire false positives and carries no query result. + * You can debounce/throttle the callback before running `JSONPath(...)` to optimize heavy reads. 
+ */ + subscribeJsonpath(path: string, callback: () => void): Subscription; /** * Get a LoroMap by container id * diff --git a/crates/loro-wasm/tests/jsonpath.test.ts b/crates/loro-wasm/tests/jsonpath.test.ts index 0293cb542..ee2da8b64 100644 --- a/crates/loro-wasm/tests/jsonpath.test.ts +++ b/crates/loro-wasm/tests/jsonpath.test.ts @@ -1,4 +1,4 @@ -import { LoroDoc } from "../bundler/index"; +import { LoroDoc, LoroMap } from "../bundler/index"; import { beforeEach, describe, expect, it } from "vitest"; describe("JSONPath", () => { @@ -311,6 +311,46 @@ describe("JSONPath", () => { }); }); + describe("jsonpath subscribe", () => { + it("triggers on matching change and can unsubscribe", () => { + let hit = 0; + const unsubscribe = doc.subscribeJsonpath("$.store.books[0].title", () => { + hit += 1; + }); + + const store = doc.getMap("store"); + const books = store.get("books") as any[]; + books[0] = { ...books[0], title: "Nineteen Eighty-Four" }; + books[0] = { ...books[0], title: "1984 (second)" }; // same commit second change + store.set("books", books); + doc.commit(); + expect(hit).toBe(1); // coalesced within commit + + unsubscribe(); + books[0] = { ...books[0], title: "1984 (second)" }; + store.set("books", books); + doc.commit(); + expect(hit).toBe(1); // no further callbacks after unsubscribe + }); + + it("callback carries no args so caller can debounce/throttle then run JSONPath", () => { + const received: unknown[][] = []; + doc.subscribeJsonpath("$.store.books[*].title", (...args: unknown[]) => { + received.push(args); + }); + + const store = doc.getMap("store"); + const books = store.get("books") as any[]; + books.push({ title: "New Book", author: "Tester", price: 1, available: true }); + books.push({ title: "Another", author: "Tester", price: 2, available: true }); + store.set("books", books); + doc.commit(); + + expect(received.length).toBe(1); + expect(received[0].length).toBe(0); + }); + }); + describe("edge cases and error handling", () => { it.todo("handles quoted keys with 
special characters", () => { const specialDoc = new LoroDoc(); diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index df61c1b61..99a5b0a33 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -83,6 +83,8 @@ pub use loro_kv_store as kv_store; #[cfg(feature = "jsonpath")] pub use loro_internal::jsonpath; +#[cfg(feature = "jsonpath")] +pub use loro_internal::jsonpath::SubscribeJsonPathCallback; #[cfg(feature = "counter")] mod counter; @@ -1284,6 +1286,22 @@ impl LoroDoc { .map(|vec| vec.into_iter().map(ValueOrContainer::from).collect()) } + /// Subscribe to updates that might affect the given JSONPath query. + /// + /// The callback: + /// - may fire **false positives** (never false negatives) to stay lightweight; + /// - does **not** include the query result so the caller can debounce/throttle + /// and run JSONPath themselves if desired; + /// - can be debounced/throttled before executing an expensive JSONPath read. + #[cfg(feature = "jsonpath")] + pub fn subscribe_jsonpath( + &self, + jsonpath: &str, + callback: SubscribeJsonPathCallback, + ) -> LoroResult { + self.doc.subscribe_jsonpath(jsonpath, callback) + } + /// Get the number of operations in the pending transaction. /// /// The pending transaction is the one that is not committed yet. It will be committed @@ -2307,8 +2325,7 @@ impl LoroText { /// Delete specified character and insert string at the same position at given unicode position. pub fn splice(&self, pos: usize, len: usize, s: &str) -> LoroResult { - self.handler - .splice(pos, len, s, cursor::PosType::Unicode) + self.handler.splice(pos, len, s, cursor::PosType::Unicode) } /// Delete specified range and insert a string at the same UTF-16 position. 
@@ -2395,14 +2412,13 @@ impl LoroText { key: &str, value: impl Into, ) -> LoroResult<()> { - self.handler - .mark( - range.start, - range.end, - key, - value.into(), - cursor::PosType::Unicode, - ) + self.handler.mark( + range.start, + range.end, + key, + value.into(), + cursor::PosType::Unicode, + ) } /// Mark a range of text with a key-value pair using UTF-8 byte offsets. @@ -2412,14 +2428,13 @@ impl LoroText { key: &str, value: impl Into, ) -> LoroResult<()> { - self.handler - .mark( - range.start, - range.end, - key, - value.into(), - cursor::PosType::Bytes, - ) + self.handler.mark( + range.start, + range.end, + key, + value.into(), + cursor::PosType::Bytes, + ) } /// Mark a range of text with a key-value pair using UTF-16 code unit offsets. @@ -2429,14 +2444,13 @@ impl LoroText { key: &str, value: impl Into, ) -> LoroResult<()> { - self.handler - .mark( - range.start, - range.end, - key, - value.into(), - cursor::PosType::Utf16, - ) + self.handler.mark( + range.start, + range.end, + key, + value.into(), + cursor::PosType::Utf16, + ) } /// Unmark a range of text with a key and a value. 
diff --git a/crates/loro/tests/text.rs b/crates/loro/tests/text.rs index 3408ad08d..4570d803c 100644 --- a/crates/loro/tests/text.rs +++ b/crates/loro/tests/text.rs @@ -20,31 +20,31 @@ fn test_slice_delta() { let text = doc.get_text("text"); text.insert(0, "Hello world!").unwrap(); text.mark(0..5, "bold", true).unwrap(); - + // Test slice_delta let delta = text.slice_delta(0, 12, PosType::Unicode).unwrap(); println!("{:?}", delta); - + assert_eq!(delta.len(), 2); - + match &delta[0] { TextDelta::Insert { insert, attributes } => { - assert_eq!(insert, "Hello"); - let attrs = attributes.as_ref().unwrap(); - assert!(attrs.contains_key("bold")); - assert_eq!(attrs.get("bold").unwrap(), &true.into()); + assert_eq!(insert, "Hello"); + let attrs = attributes.as_ref().unwrap(); + assert!(attrs.contains_key("bold")); + assert_eq!(attrs.get("bold").unwrap(), &true.into()); } _ => panic!("Expected Insert, got {:?}", delta[0]), } - + match &delta[1] { TextDelta::Insert { insert, attributes } => { - assert_eq!(insert, " world!"); - assert!(attributes.is_none()); + assert_eq!(insert, " world!"); + assert!(attributes.is_none()); } _ => panic!("Expected Insert, got {:?}", delta[1]), } - + // Test slice_delta with partial range let delta = text.slice_delta(2, 8, PosType::Unicode).unwrap(); // "llo wo" @@ -52,16 +52,16 @@ fn test_slice_delta() { assert_eq!(delta.len(), 2); match &delta[0] { TextDelta::Insert { insert, attributes } => { - assert_eq!(insert, "llo"); - let attrs = attributes.as_ref().unwrap(); - assert!(attrs.contains_key("bold")); + assert_eq!(insert, "llo"); + let attrs = attributes.as_ref().unwrap(); + assert!(attrs.contains_key("bold")); } _ => panic!("Expected Insert, got {:?}", delta[0]), } match &delta[1] { TextDelta::Insert { insert, attributes } => { - assert_eq!(insert, " wo"); - assert!(attributes.is_none()); + assert_eq!(insert, " wo"); + assert!(attributes.is_none()); } _ => panic!("Expected Insert, got {:?}", delta[1]), } @@ -82,7 +82,7 @@ fn 
test_slice_delta_overlapping() { // 234: bold, italic // 56: italic // 7: none - + let delta = text.slice_delta(1, 8, PosType::Unicode).unwrap(); assert_eq!(delta.len(), 4); @@ -92,7 +92,9 @@ fn test_slice_delta_overlapping() { let attrs = attributes.as_ref().unwrap(); assert!(attrs.contains_key("bold")); assert!(!attrs.contains_key("italic")); - } else { panic!("Expected segment 1") } + } else { + panic!("Expected segment 1") + } // "234" if let TextDelta::Insert { insert, attributes } = &delta[1] { @@ -100,7 +102,9 @@ fn test_slice_delta_overlapping() { let attrs = attributes.as_ref().unwrap(); assert!(attrs.contains_key("bold")); assert!(attrs.contains_key("italic")); - } else { panic!("Expected segment 234") } + } else { + panic!("Expected segment 234") + } // "56" if let TextDelta::Insert { insert, attributes } = &delta[2] { @@ -108,13 +112,17 @@ fn test_slice_delta_overlapping() { let attrs = attributes.as_ref().unwrap(); assert!(!attrs.contains_key("bold")); assert!(attrs.contains_key("italic")); - } else { panic!("Expected segment 56") } + } else { + panic!("Expected segment 56") + } // "7" if let TextDelta::Insert { insert, attributes } = &delta[3] { assert_eq!(insert, "7"); assert!(attributes.is_none()); - } else { panic!("Expected segment 7") } + } else { + panic!("Expected segment 7") + } } #[test] @@ -129,18 +137,22 @@ fn test_slice_delta_unicode() { // Slice "好W" (index 1 to 3) let delta = text.slice_delta(1, 3, PosType::Unicode).unwrap(); assert_eq!(delta.len(), 2); - + // "好" if let TextDelta::Insert { insert, attributes } = &delta[0] { assert_eq!(insert, "好"); assert!(attributes.as_ref().unwrap().contains_key("bold")); - } else { panic!("Expected segment '好'") } + } else { + panic!("Expected segment '好'") + } // "W" if let TextDelta::Insert { insert, attributes } = &delta[1] { assert_eq!(insert, "W"); assert!(attributes.is_none()); - } else { panic!("Expected segment 'W'") } + } else { + panic!("Expected segment 'W'") + } } #[test] @@ -150,15 
+162,17 @@ fn test_slice_delta_with_deletion() { text.insert(0, "01234").unwrap(); text.mark(0..5, "bold", true).unwrap(); text.delete(2, 2).unwrap(); // delete "23" - // Now "014" (all bold) - + // Now "014" (all bold) + let delta = text.slice_delta(0, 3, PosType::Unicode).unwrap(); assert_eq!(delta.len(), 1); - + if let TextDelta::Insert { insert, attributes } = &delta[0] { assert_eq!(insert, "014"); assert!(attributes.as_ref().unwrap().contains_key("bold")); - } else { panic!("Expected combined segment after deletion") } + } else { + panic!("Expected combined segment after deletion") + } } #[test] @@ -167,30 +181,36 @@ fn test_slice_delta_unicode_boundaries() { let text = doc.get_text("text"); // "😀" is 1 char (scalar) in Rust chars(). text.insert(0, "A😀B").unwrap(); - + // Mark "😀" (index 1 to 2) text.mark(1..2, "bold", true).unwrap(); - + let delta = text.slice_delta(0, 3, PosType::Unicode).unwrap(); assert_eq!(delta.len(), 3); - + // "A" if let TextDelta::Insert { insert, attributes } = &delta[0] { assert_eq!(insert, "A"); assert!(attributes.is_none()); - } else { panic!("Expected 'A'") } - + } else { + panic!("Expected 'A'") + } + // "😀" if let TextDelta::Insert { insert, attributes } = &delta[1] { assert_eq!(insert, "😀"); assert!(attributes.as_ref().unwrap().contains_key("bold")); - } else { panic!("Expected Emoji") } - + } else { + panic!("Expected Emoji") + } + // "B" if let TextDelta::Insert { insert, attributes } = &delta[2] { assert_eq!(insert, "B"); assert!(attributes.is_none()); - } else { panic!("Expected 'B'") } + } else { + panic!("Expected 'B'") + } } #[test] @@ -200,21 +220,32 @@ fn test_slice_delta_discontinuous_styles() { text.insert(0, "AB").unwrap(); text.mark(0..1, "bold", true).unwrap(); // A bold text.mark(1..2, "bold", true).unwrap(); // B bold - // Even though they are applied separately, they might merge if they are adjacent and same. - // Let's see if Loro merges adjacent identical styles. 
- // Usually they should merge into one span if attributes are equal. - + // Even though they are applied separately, they might merge if they are adjacent and same. + // Let's see if Loro merges adjacent identical styles. + // Usually they should merge into one span if attributes are equal. + let delta = text.slice_delta(0, 2, PosType::Unicode).unwrap(); // Depends on implementation. Loro text delta usually merges adjacent same attributes. // If so, len is 1. if delta.len() == 1 { - if let TextDelta::Insert { insert, attributes } = &delta[0] { - assert_eq!(insert, "AB", "Expected merged segment 'AB', got '{}'", insert); + if let TextDelta::Insert { insert, attributes } = &delta[0] { + assert_eq!( + insert, "AB", + "Expected merged segment 'AB', got '{}'", + insert + ); assert!(attributes.as_ref().unwrap().contains_key("bold")); - } else { panic!("Expected merged segment") } + } else { + panic!("Expected merged segment") + } } else { // If not merged - assert_eq!(delta.len(), 2, "Expected 1 or 2 segments, got {}", delta.len()); + assert_eq!( + delta.len(), + 2, + "Expected 1 or 2 segments, got {}", + delta.len() + ); if let TextDelta::Insert { insert, attributes } = &delta[0] { assert_eq!(insert, "A"); assert!(attributes.as_ref().unwrap().contains_key("bold")); @@ -403,10 +434,7 @@ fn convert_pos_across_coord_systems() { ); // Out of bounds yields None - assert_eq!( - text.convert_pos(10, PosType::Unicode, PosType::Utf16), - None - ); + assert_eq!(text.convert_pos(10, PosType::Unicode, PosType::Utf16), None); } #[test]