diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index afbc900ac0a..13eafb01e63 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, bail}; use std::borrow::Borrow; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::fmt::{self, Debug}; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; @@ -291,114 +291,52 @@ impl EntityCache { causality_region: eref.causality_region, }; - let mut entity_map = self.store.get_derived(&query).await?; + // Entities that satisfied the query at the start of this block. + let stored = self.store.get_derived(&query).await?; - for (key, entity) in entity_map.iter() { - // Only insert to the cache if it's not already there + for (key, entity) in &stored { if !self.current.contains_key(key) { self.current .insert(key.clone(), Some(Arc::new(entity.clone()))); } } - let mut keys_to_remove = Vec::new(); - - // Apply updates from `updates` and `handler_updates` directly to entities in `entity_map` that match the query - for (key, entity) in entity_map.iter_mut() { - let op = match ( - self.updates.get(key).cloned(), - self.handler_updates.get(key).cloned(), - ) { - (Some(op), None) | (None, Some(op)) => op, - (Some(mut op), Some(op2)) => { - op.accumulate(op2); - op - } - (None, None) => continue, - }; - - let updated_entity = op - .apply_to(&Some(&*entity)) - .map_err(|e| key.unknown_attribute(e))?; - - if let Some(updated_entity) = updated_entity { - *entity = updated_entity; - } else { - // if entity_arc is None, it means that the entity was removed by an update - // mark the key for removal from the map - keys_to_remove.push(key.clone()); + // Candidate set: keys that were matching at baseline, plus keys + // any in-block write has touched whose entity_type and causality + // region are compatible with the query. 
The latter catches
+        // entities an in-block write has moved into the matching set or
+        // created fresh in this block.
+        let mut candidates: BTreeSet<EntityKey> = stored.keys().cloned().collect();
+        for key in self.updates.keys().chain(self.handler_updates.keys()) {
+            if key.entity_type == query.entity_type
+                && key.causality_region == query.causality_region
+            {
+                candidates.insert(key.clone());
             }
         }
 
-        // A helper function that checks if an update matches the query and returns the updated entity if it does
-        fn matches_query(
-            op: &EntityOp,
-            query: &DerivedEntityQuery,
-            key: &EntityKey,
-        ) -> Result<Option<Entity>, anyhow::Error> {
-            match op {
-                EntityOp::Update(entity) | EntityOp::Overwrite(entity)
-                    if query.matches(key, entity) =>
-                {
-                    Ok(Some(entity.clone()))
-                }
-                EntityOp::Remove => Ok(None),
-                _ => Ok(None),
+        let mut result = Vec::new();
+        for key in candidates {
+            // Resolve the entity's final in-block state by layering
+            // store baseline, then self.updates, then self.handler_updates.
+            // Each layer's op may mutate, replace, or remove the entity.
+            let mut entity: Option<Entity> = stored.get(&key).cloned();
+            if let Some(op) = self.updates.get(&key).cloned() {
+                entity = op.apply_to(&entity).map_err(|e| key.unknown_attribute(e))?;
             }
-        }
-
-        // Iterate over self.updates to find entities that:
-        // - Aren't already present in the entity_map
-        // - Match the query
-        // If these conditions are met:
-        // - Check if there's an update for the same entity in handler_updates and apply it.
-        // - Add the entity to entity_map.
-        for (key, op) in self.updates.iter() {
-            if !entity_map.contains_key(key)
-                && let Some(entity) = matches_query(op, &query, key)?
- { - if let Some(handler_op) = self.handler_updates.get(key).cloned() { - // If there's a corresponding update in handler_updates, apply it to the entity - // and insert the updated entity into entity_map - let mut entity = Some(entity); - entity = handler_op - .apply_to(&entity) - .map_err(|e| key.unknown_attribute(e))?; - - if let Some(updated_entity) = entity { - entity_map.insert(key.clone(), updated_entity); - } - } else { - // If there isn't a corresponding update in handler_updates or the update doesn't match the query, just insert the entity from self.updates - entity_map.insert(key.clone(), entity); - } + if let Some(op) = self.handler_updates.get(&key).cloned() { + entity = op.apply_to(&entity).map_err(|e| key.unknown_attribute(e))?; } - } - // Iterate over handler_updates to find entities that: - // - Aren't already present in the entity_map. - // - Aren't present in self.updates. - // - Match the query. - // If these conditions are met, add the entity to entity_map. - for (key, handler_op) in self.handler_updates.iter() { - if !entity_map.contains_key(key) - && !self.updates.contains_key(key) - && let Some(entity) = matches_query(handler_op, &query, key)? + // Include the entity only if its final state still matches the query. + if let Some(entity) = entity + && query.matches(&key, &entity) { - entity_map.insert(key.clone(), entity); + result.push(entity); } } - // Remove entities that are in the store but have been removed by an update. 
- // We do this last since the loops over updates and handler_updates are only - // concerned with entities that are not in the store yet and by leaving removed - // keys in entity_map we avoid processing these updates a second time when we - // already looked at them when we went through entity_map - for key in keys_to_remove { - entity_map.remove(&key); - } - - Ok(entity_map.into_values().collect()) + Ok(result) } pub fn remove(&mut self, key: EntityKey) { diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 412037f7c1b..39053757f5a 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -2,9 +2,10 @@ use async_trait::async_trait; use graph::blockchain::BlockTime; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ - DeploymentCursorTracker, DerivedEntityQuery, GetScope, LoadRelatedRequest, ReadStore, - StoredDynamicDataSource, WritableStore, + DeploymentCursorTracker, DerivedEntityQuery, EntityLfuCache, GetScope, LoadRelatedRequest, + ReadStore, StoredDynamicDataSource, WritableStore, }; +use graph::components::subgraph::BlockState; use graph::data::store::Id; use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth}; use graph::data_source::CausalityRegion; @@ -983,3 +984,379 @@ fn no_interface_mods() { cache.set(key, entity, None).await.unwrap_err(); }) } + +// --------------------------------------------------------------------------- +// Tests for `EntityCache::load_related` under same-block changes to +// derived field membership. Each test exercises a write that should +// move an entity into or out of an `account.wallets` derived +// collection (reassignment, removal, or fresh creation), asserting +// that the returned collection reflects the entity's final state. 
+// +// Fixture (from `insert_test_data`): +// - account "1" owns wallets "1", "2", "3" +// - account "2" owns wallet "4" +// - account "3" owns no wallets +// +// Tests that need writes in `EntityCache::handler_updates` (rather than +// `updates`) drive a handler lifecycle through `BlockState::enter_handler` +// / `exit_handler`. `EntityCache` has two in-block write layers: writes +// outside a handler go to `self.updates`; writes inside an active +// handler go to `self.handler_updates`. `exit_handler` merges +// `handler_updates` into `updates`. Driving this lifecycle requires +// `BlockState` because the `enter`/`exit` methods on `EntityCache` are +// crate-private. +// --------------------------------------------------------------------------- + +/// Wallet "1" is reassigned from account "1" to account "3" within a +/// single handler. Account "1"'s derived `wallets` collection must +/// reflect the final state and exclude wallet "1". +#[test] +fn load_related_excludes_wallet_that_leaves_collection() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let acc_1 = ACCOUNT_TYPE.parse_id("1").unwrap(); + let acc_3 = ACCOUNT_TYPE.parse_id("3").unwrap(); + let wallet_1_key = WALLET_TYPE.parse_key("1").unwrap(); + + // Precondition: fixture placed wallet "1" under account "1". + // The `set` below is a reassignment of an existing row. 
+ let pre = cache + .get(&wallet_1_key, GetScope::Store) + .await + .unwrap() + .expect("fixture should have inserted wallet 1"); + assert_eq!(pre.get("account").unwrap(), &Value::from(acc_1.clone())); + + let reassigned = create_wallet_entity_no_vid("1", &acc_3, 67_i32); + cache.set(wallet_1_key, reassigned, None).await.unwrap(); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_1.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).await.unwrap(); + + let result_ids: Vec<_> = result + .iter() + .map(|e| e.get("id").unwrap().clone()) + .collect(); + + assert_eq!( + result.len(), + 2, + "account 1 should own 2 wallets after reassignment, got {:?}", + result_ids, + ); + for w in &result { + assert_eq!( + w.get("account").unwrap(), + &Value::from(acc_1.clone()), + "every wallet in account 1's collection must have account=1, got {:?}", + w, + ); + } + }); +} + +/// Wallet "4" is moved from account "2" to account "3" in handler 1, +/// then back to account "2" in handler 2. Account "3"'s derived +/// `wallets` collection must reflect the final state and exclude +/// wallet "4". +#[test] +fn load_related_excludes_wallet_that_leaves_collection_across_handlers() { + run_store_test(|_cache, _store, _deployment, writable| async move { + let acc_2 = ACCOUNT_TYPE.parse_id("2").unwrap(); + let acc_3 = ACCOUNT_TYPE.parse_id("3").unwrap(); + let wallet_4_key = WALLET_TYPE.parse_key("4").unwrap(); + + let mut state = BlockState::new( + writable.clone(), + EntityLfuCache::new(), + SeqGenerator::new(10), + ); + + // Precondition: fixture placed wallet "4" under account "2". + let pre = writable + .get(&wallet_4_key) + .await + .unwrap() + .expect("fixture should have inserted wallet 4"); + assert_eq!(pre.get("account").unwrap(), &Value::from(acc_2.clone())); + + // Handler 1: move wallet "4" to account "3". 
On exit, this + // write is promoted from handler_updates into updates. + state.enter_handler(); + state + .entity_cache + .set( + wallet_4_key.clone(), + create_wallet_entity_no_vid("4", &acc_3, 32_i32), + None, + ) + .await + .unwrap(); + state.exit_handler(); + + // Handler 2: move wallet "4" back to account "2". This write + // is in handler_updates; the prior handler's write is in updates. + state.enter_handler(); + state + .entity_cache + .set( + wallet_4_key, + create_wallet_entity_no_vid("4", &acc_2, 32_i32), + None, + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_3.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = state.entity_cache.load_related(&request).await.unwrap(); + + assert!( + result.is_empty(), + "account 3 should own no wallets (wallet 4's final account is 2), got {:?}", + result, + ); + }); +} + +/// Wallet "4" is touched in handler 1 without changing its `account`, +/// then reassigned from account "2" to account "3" in handler 2. +/// Account "3"'s derived `wallets` collection must reflect the final +/// state and include wallet "4". +#[test] +fn load_related_includes_wallet_that_joins_collection_across_handlers() { + run_store_test(|_cache, _store, _deployment, writable| async move { + let acc_2 = ACCOUNT_TYPE.parse_id("2").unwrap(); + let acc_3 = ACCOUNT_TYPE.parse_id("3").unwrap(); + let wallet_4_key = WALLET_TYPE.parse_key("4").unwrap(); + + let mut state = BlockState::new( + writable.clone(), + EntityLfuCache::new(), + SeqGenerator::new(10), + ); + + // Precondition: fixture placed wallet "4" under account "2". + let pre = writable + .get(&wallet_4_key) + .await + .unwrap() + .expect("fixture should have inserted wallet 4"); + assert_eq!(pre.get("account").unwrap(), &Value::from(acc_2.clone())); + + // Handler 1: touch wallet "4" (bump balance) but keep its account. 
+ // On exit, this write is promoted into updates. + state.enter_handler(); + state + .entity_cache + .set( + wallet_4_key.clone(), + create_wallet_entity_no_vid("4", &acc_2, 999_i32), + None, + ) + .await + .unwrap(); + state.exit_handler(); + + // Handler 2: reassign wallet "4" to account "3". This write is + // in handler_updates; handler 1's write remains in updates. + state.enter_handler(); + state + .entity_cache + .set( + wallet_4_key.clone(), + create_wallet_entity_no_vid("4", &acc_3, 999_i32), + None, + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_3.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = state.entity_cache.load_related(&request).await.unwrap(); + + assert_eq!( + result.len(), + 1, + "account 3 should own wallet 4 after reassignment, got {:?}", + result, + ); + assert_eq!(result[0].get("id").unwrap(), &Value::from("4"),); + assert_eq!( + result[0].get("account").unwrap(), + &Value::from(acc_3.clone()), + "wallet 4 must carry handler 2's post-merge account=3", + ); + }); +} + +/// Wallet "1" is removed via a write that lands directly in +/// `EntityCache::updates` (no active handler). Account "1"'s derived +/// `wallets` collection must reflect the final state and exclude +/// wallet "1". +#[test] +fn load_related_excludes_wallet_removed_via_updates() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let acc_1 = ACCOUNT_TYPE.parse_id("1").unwrap(); + let wallet_1_key = WALLET_TYPE.parse_key("1").unwrap(); + + // Precondition: fixture placed wallet "1" under account "1". 
+ let pre = cache + .get(&wallet_1_key, GetScope::Store) + .await + .unwrap() + .expect("fixture should have inserted wallet 1"); + assert_eq!(pre.get("account").unwrap(), &Value::from(acc_1.clone())); + + cache.remove(wallet_1_key); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_1.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).await.unwrap(); + + let result_ids: Vec<_> = result + .iter() + .map(|e| e.get("id").unwrap().clone()) + .collect(); + + assert_eq!( + result.len(), + 2, + "account 1 should own 2 wallets after wallet 1 is removed, got {:?}", + result_ids, + ); + assert!( + !result_ids.contains(&Value::from("1")), + "removed wallet 1 must not appear, got {:?}", + result_ids, + ); + }); +} + +/// Wallet "1" is removed inside an active handler so the Remove op +/// sits in `EntityCache::handler_updates` at query time. Account "1"'s +/// derived `wallets` collection must reflect the final state and +/// exclude wallet "1". +#[test] +fn load_related_excludes_wallet_removed_via_handler_updates() { + run_store_test(|_cache, _store, _deployment, writable| async move { + let acc_1 = ACCOUNT_TYPE.parse_id("1").unwrap(); + let wallet_1_key = WALLET_TYPE.parse_key("1").unwrap(); + + let mut state = BlockState::new( + writable.clone(), + EntityLfuCache::new(), + SeqGenerator::new(10), + ); + + // Precondition: fixture placed wallet "1" under account "1". + let pre = writable + .get(&wallet_1_key) + .await + .unwrap() + .expect("fixture should have inserted wallet 1"); + assert_eq!(pre.get("account").unwrap(), &Value::from(acc_1.clone())); + + // Remove wallet "1" inside an active handler. The Remove op + // sits in handler_updates and is queried before exit_handler. 
+ state.enter_handler(); + state.entity_cache.remove(wallet_1_key); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_1.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = state.entity_cache.load_related(&request).await.unwrap(); + + let result_ids: Vec<_> = result + .iter() + .map(|e| e.get("id").unwrap().clone()) + .collect(); + + assert_eq!( + result.len(), + 2, + "account 1 should own 2 wallets after wallet 1 is removed, got {:?}", + result_ids, + ); + assert!( + !result_ids.contains(&Value::from("1")), + "removed wallet 1 must not appear, got {:?}", + result_ids, + ); + }); +} + +/// A wallet is created inside a handler with no prior store baseline. +/// `load_related` called from the same handler must observe the new +/// wallet in its parent account's derived collection. +#[test] +fn load_related_includes_wallet_created_in_handler() { + run_store_test(|_cache, _store, _deployment, writable| async move { + let acc_3 = ACCOUNT_TYPE.parse_id("3").unwrap(); + let new_wallet_key = WALLET_TYPE.parse_key("99").unwrap(); + + let mut state = BlockState::new( + writable.clone(), + EntityLfuCache::new(), + SeqGenerator::new(10), + ); + + // Precondition: account "3" starts with no wallets and + // wallet "99" does not exist in the store. + assert!( + writable.get(&new_wallet_key).await.unwrap().is_none(), + "wallet 99 should not exist in the store baseline", + ); + + // Create a fresh wallet under account "3" inside a handler. 
+ state.enter_handler(); + state + .entity_cache + .set( + new_wallet_key, + create_wallet_entity_no_vid("99", &acc_3, 500_i32), + None, + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: ACCOUNT_TYPE.clone(), + entity_field: "wallets".into(), + entity_id: acc_3.clone(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = state.entity_cache.load_related(&request).await.unwrap(); + + assert_eq!( + result.len(), + 1, + "account 3 should own the newly created wallet 99, got {:?}", + result, + ); + assert_eq!(result[0].get("id").unwrap(), &Value::from("99")); + assert_eq!( + result[0].get("account").unwrap(), + &Value::from(acc_3.clone()), + ); + }); +}