diff --git a/docs/graphman.md b/docs/graphman.md index 8c857703dda..5dc03044b92 100644 --- a/docs/graphman.md +++ b/docs/graphman.md @@ -8,6 +8,7 @@ - [Drop](#drop) - [Chain Check Blocks](#check-blocks) - [Chain Call Cache Remove](#chain-call-cache-remove) +- [Chain Rebuild Storage](#chain-rebuild-storage) # ⌘ Info @@ -439,3 +440,71 @@ Remove stale contracts from the call cache that have not been accessed in the la Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts: graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100 + +# ⌘ Chain Rebuild Storage + +### SYNOPSIS + + Rebuild a chain's storage schema and reset head metadata + + If the storage schema is missing, rebuilds it silently. + If the storage already exists, prompts for confirmation before + dropping and rebuilding it (use --force to skip the prompt). + + USAGE: + graphman --config <CONFIG> chain rebuild-storage [OPTIONS] <CHAIN_NAME> + + ARGS: + <CHAIN_NAME> Chain name (must be an existing chain, see 'chain list') + + OPTIONS: + -f, --force Skip confirmation prompt when storage already exists + -h, --help Print help information + +### DESCRIPTION + +The `chain rebuild-storage` command recovers from a situation where a chain's storage schema +(e.g. `chain42`) has been dropped or corrupted on the shard but the chain's metadata in +`public.chains` still exists. This can happen after manual database operations or partial failures. + +> **Operational requirement:** Stop graph-node before running this command. + +The command behaves differently depending on the state of the storage: + +**Storage missing** (non-destructive): the command silently rebuilds the schema and resets +head metadata. No confirmation is required. + +**Storage exists** (destructive): the command prompts for confirmation before dropping the +existing schema and rebuilding it from scratch. Use `--force` to skip the prompt. 
+ +In both cases, the command performs the following steps in a single transaction: + +1. Drops the existing storage schema if present (`DROP SCHEMA ... CASCADE`). +2. Upserts the chain's row in `ethereum_networks` on the shard: inserts if missing, or repairs + identity metadata (`namespace`, `net_version`, `genesis_block_hash`) and resets head tracking + columns (`head_block_hash`, `head_block_number`, `head_block_cursor`) to `NULL` if the row exists. +3. Rebuilds the storage schema with empty `blocks`, `call_cache`, and `call_meta` tables. + +After this, graph-node will treat the chain as freshly added and begin syncing from scratch. + +The `public.chains` metadata row is never modified by this command. + +### CONSTRAINTS + +- The chain must already exist in `public.chains`. This command does not create new chains. +- Chains using shared storage (`public`) are not supported. + +### EXAMPLES + +Rebuild missing storage for a chain: + + graphman --config config.toml chain rebuild-storage mainnet + +Force-rebuild existing storage (skips confirmation): + + graphman --config config.toml chain rebuild-storage mainnet --force + +Check what chains are available: + + graphman --config config.toml chain list + diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 42f86730b8e..edae289e399 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -604,6 +604,20 @@ pub enum ChainCommand { /// The block number to ingest number: BlockNumber, }, + + /// Rebuild a chain's storage schema and reset head metadata. + /// + /// If the storage schema is missing, rebuilds it silently. + /// If the storage already exists, prompts for confirmation before + /// dropping and rebuilding it (use --force to skip the prompt). 
+ RebuildStorage { + /// Chain name (must be an existing chain, see 'chain list') + #[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())] + chain_name: String, + /// Skip confirmation prompt when storage already exists + #[clap(long, short)] + force: bool, + }, } #[derive(Clone, Debug, Subcommand)] @@ -1586,6 +1600,10 @@ async fn main() -> anyhow::Result<()> { ctx.chain_store_and_adapter(&name).await?; commands::chain::ingest(&logger, chain_store, ethereum_adapter, number).await } + RebuildStorage { chain_name, force } => { + let (block_store, primary) = ctx.block_store_and_primary_pool().await; + commands::chain::rebuild_storage(primary, block_store, chain_name, force).await + } } } Stats(cmd) => { diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2f344bdeb7b..f80ef7e9151 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -13,6 +13,7 @@ use graph::components::store::StoreError; use graph::prelude::BlockNumber; use graph::prelude::ChainStore as _; use graph::prelude::LightEthereumBlock; +use graph::prelude::anyhow::Context as _; use graph::prelude::{anyhow, anyhow::bail}; use graph::slog::Logger; use graph::{ @@ -27,11 +28,13 @@ use graph_store_postgres::ChainStore; use graph_store_postgres::PoolCoordinator; use graph_store_postgres::ScopedFutureExt; use graph_store_postgres::Shard; +use graph_store_postgres::Storage; use graph_store_postgres::add_chain; use graph_store_postgres::find_chain; use graph_store_postgres::update_chain_name; use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store}; +use crate::manager::prompt::prompt_for_confirmation; use crate::network_setup::Networks; pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> { @@ -329,3 +332,56 @@ pub async fn ingest( } Ok(()) } + +pub async fn rebuild_storage( + primary: ConnectionPool, + store: BlockStore, + name: String, + force: bool, +) -> Result<(), 
Error> { + let mut conn = primary.get().await?; + + let chain = block_store::find_chain(&mut conn, &name) + .await? + .ok_or_else(|| { + anyhow!( + "Chain {} not found in public.chains.\n\ + This command only supports chains already present in metadata.", + name + ) + })?; + + if matches!(chain.storage, Storage::Shared) { + bail!( + "Chain {} uses shared storage public and cannot be rebuilt with this command.", + name + ); + } + + let namespace = chain.storage.to_string(); + let shard = &chain.shard; + let ident = chain.network_identifier()?; + + let drop_schema = store.has_namespace(&chain).await?; + if drop_schema { + let prompt = format!( + "Storage {namespace} for chain {name} already exists on shard {shard}.\n\ + This will drop and rebuild chain storage. All cached blocks and call cache \ + data in that namespace will be permanently deleted.\n\ + Proceed?" + ); + if !force && !prompt_for_confirmation(&prompt)? { + println!("Aborting."); + return Ok(()); + } + } + + store + .rebuild_chain_storage(&name, &ident, drop_schema) + .await + .with_context(|| format!("Failed to rebuild storage {namespace} for chain {name}"))?; + + println!("Successfully rebuilt storage {namespace} for chain {name} on shard {shard}."); + + Ok(()) +} diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 0ae4b05597a..b70f3ed00e0 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -484,6 +484,30 @@ impl BlockStore { Ok(()) } + pub async fn has_namespace(&self, chain: &primary::Chain) -> Result<bool, StoreError> { + let pool = self + .pools + .get(&chain.shard) + .ok_or_else(|| internal_error!("no pool for shard {}", chain.shard))?; + let nsp = crate::primary::Namespace::special(chain.storage.to_string()); + let mut conn = pool.get_permitted().await?; + crate::catalog::has_namespace(&mut conn, &nsp).await + } + + pub async fn rebuild_chain_storage( + &self, + chain: &str, + ident: &ChainIdentifier, + drop_schema: bool, + ) -> Result<(), 
StoreError> { + let chain_store = self + .store(chain) + .await + .ok_or_else(|| internal_error!("No chain store found for {}", chain))?; + + Ok(chain_store.rebuild_storage(ident, drop_schema).await?) + } + // Helper to clone the list of chain stores to avoid holding the lock // while awaiting fn stores(&self) -> Vec<Arc<ChainStore>> { diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 3fe39462a98..407c9c047ae 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -12,7 +12,7 @@ use graph::parking_lot::RwLock; use graph::prelude::MetricsRegistry; use graph::prelude::alloy::primitives::B256; use graph::prometheus::{CounterVec, GaugeVec}; -use graph::slog::{Logger, info, o}; +use graph::slog::{Logger, debug, info, o}; use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; @@ -2479,6 +2479,60 @@ impl ChainStore { .await } + /// Drop the chain's storage schema (if it exists), reset head + /// metadata in `ethereum_networks`, and rebuild the schema with + /// empty tables. If the `ethereum_networks` row is missing, it is + /// created from the provided `ident`. 
+ pub(crate) async fn rebuild_storage( + &self, + ident: &ChainIdentifier, + drop_schema: bool, + ) -> Result<(), Error> { + use public::ethereum_networks as n; + + let nsp = self.storage.to_string(); + + debug!(&self.logger, "Rebuilding storage for chain"; "chain" => &self.chain, "namespace" => &nsp); + + let mut conn = self.pool.get_permitted().await?; + conn.transaction(|conn| { + async { + if drop_schema { + debug!(&self.logger, "Dropping existing schema"; "namespace" => &nsp); + self.storage.drop_storage(conn, &self.chain).await?; + } + + debug!(&self.logger, "Upserting ethereum_networks row"; "chain" => &self.chain); + insert_into(n::table) + .values(( + n::name.eq(&self.chain), + n::namespace.eq(&self.storage), + n::net_version.eq(&ident.net_version), + n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), + )) + .on_conflict(n::name) + .do_update() + .set(( + n::namespace.eq(&self.storage), + n::net_version.eq(&ident.net_version), + n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()), + n::head_block_hash.eq(None::<String>), + n::head_block_number.eq(None::<i64>), + n::head_block_cursor.eq(None::<String>), + )) + .execute(conn) + .await?; + + debug!(&self.logger, "Creating storage schema and tables"; "namespace" => &nsp); + self.storage.create(conn).await?; + + Ok(()) + } + .scope_boxed() + }) + .await + } + pub async fn chain_head_pointers( conn: &mut AsyncPgConnection, ) -> Result<HashMap<String, BlockPtr>, StoreError> { diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index d72641b29b6..7336f3bd3e2 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -660,3 +660,228 @@ fn test_transaction_receipts_in_block_function() { assert!(receipts.is_empty()) }) } + +// ---- rebuild_storage tests ---- + +/// Helper that runs a test only on NETWORK_NAME (private storage). +/// rebuild_storage is not supported on shared storage. 
+fn run_rebuild_test<F, R>(chain: FakeBlockList, test: F) +where + F: Fn(Arc<DieselChainStore>, Arc<DieselStore>) -> R + Send + Sync + 'static, + R: Future<Output = ()> + Send + 'static, +{ + run_test_sequentially(|store| async move { + block_store::set_chain(chain.clone(), NETWORK_NAME).await; + + let chain_store = store + .block_store() + .chain_store(NETWORK_NAME) + .await + .expect("chain store for NETWORK_NAME"); + + test(chain_store, store).await; + }); +} + +#[test] +fn rebuild_storage_with_existing_namespace() { + let chain = vec![&*GENESIS_BLOCK, &*BLOCK_ONE, &*BLOCK_TWO]; + + run_rebuild_test(chain, |chain_store, store| async move { + let block_store = store.block_store(); + + // Look up the chain from primary metadata + let mut conn = PRIMARY_POOL.get().await.unwrap(); + let chain = graph_store_postgres::find_chain(&mut conn, NETWORK_NAME) + .await + .unwrap() + .expect("chain exists in public.chains"); + drop(conn); + + let ident = chain.network_identifier().unwrap(); + + // Verify blocks are present before rebuild + let hashes = chain_store.block_hashes_by_block_number(1).await.unwrap(); + assert!(!hashes.is_empty(), "block 1 should exist before rebuild"); + + // Verify namespace exists + assert!( + block_store.has_namespace(&chain).await.unwrap(), + "namespace should exist before rebuild" + ); + + // Rebuild storage (should drop and recreate) + block_store + .rebuild_chain_storage(NETWORK_NAME, &ident, true) + .await + .expect("rebuild_chain_storage succeeds"); + + // Verify namespace still exists after rebuild + assert!( + block_store.has_namespace(&chain).await.unwrap(), + "namespace should exist after rebuild" + ); + + // Verify blocks are gone after rebuild (tables are fresh) + let hashes = chain_store.block_hashes_by_block_number(1).await.unwrap(); + assert!(hashes.is_empty(), "blocks should be gone after rebuild"); + + // Verify chain identity is intact + let ident_after = chain_store.chain_identifier().await.unwrap(); + assert_eq!(ident.net_version, ident_after.net_version); + 
assert_eq!(ident.genesis_block_hash, ident_after.genesis_block_hash); + + // Verify head is reset to null + let head = chain_store + .clone() + .chain_head_ptr() + .await + .expect("chain_head_ptr succeeds"); + assert!(head.is_none(), "head should be null after rebuild"); + }); +} + +#[test] +fn rebuild_storage_with_missing_namespace() { + let chain = vec![&*GENESIS_BLOCK, &*BLOCK_ONE]; + + run_rebuild_test(chain, |chain_store, store| async move { + let block_store = store.block_store(); + + let mut conn = PRIMARY_POOL.get().await.unwrap(); + let chain = graph_store_postgres::find_chain(&mut conn, NETWORK_NAME) + .await + .unwrap() + .expect("chain exists in public.chains"); + drop(conn); + + let ident = chain.network_identifier().unwrap(); + let nsp = chain.storage.to_string(); + + // Drop the namespace manually to simulate missing storage + { + let mut conn = chain_store.get_conn_for_test().await.unwrap(); + diesel::sql_query(format!("DROP SCHEMA IF EXISTS {nsp} CASCADE")) + .execute(&mut conn) + .await + .unwrap(); + } + + // Verify namespace is gone + assert!( + !block_store.has_namespace(&chain).await.unwrap(), + "namespace should be gone after manual drop" + ); + + // Rebuild should recreate the missing namespace + block_store + .rebuild_chain_storage(NETWORK_NAME, &ident, false) + .await + .expect("rebuild_chain_storage succeeds on missing namespace"); + + // Verify namespace exists again + assert!( + block_store.has_namespace(&chain).await.unwrap(), + "namespace should exist after rebuild" + ); + + // Verify chain identity is correct + let ident_after = chain_store.chain_identifier().await.unwrap(); + assert_eq!(ident.net_version, ident_after.net_version); + assert_eq!(ident.genesis_block_hash, ident_after.genesis_block_hash); + }); +} + +#[test] +fn rebuild_storage_repairs_ethereum_networks_row() { + let chain = vec![&*GENESIS_BLOCK]; + + run_rebuild_test(chain, |chain_store, store| async move { + let block_store = store.block_store(); + + let mut conn = 
PRIMARY_POOL.get().await.unwrap(); + let chain = graph_store_postgres::find_chain(&mut conn, NETWORK_NAME) + .await + .unwrap() + .expect("chain exists in public.chains"); + drop(conn); + + let ident = chain.network_identifier().unwrap(); + let nsp = chain.storage.to_string(); + + // Drop the namespace AND delete the ethereum_networks row to + // simulate a fully broken state + { + let mut conn = chain_store.get_conn_for_test().await.unwrap(); + diesel::sql_query(format!("DROP SCHEMA IF EXISTS {nsp} CASCADE")) + .execute(&mut conn) + .await + .unwrap(); + diesel::sql_query(format!( + "DELETE FROM public.ethereum_networks WHERE name = '{}'", + NETWORK_NAME + )) + .execute(&mut conn) + .await + .unwrap(); + } + + // Rebuild should recreate both the namespace and the + // ethereum_networks row via upsert + block_store + .rebuild_chain_storage(NETWORK_NAME, &ident, false) + .await + .expect("rebuild_chain_storage succeeds with missing ethereum_networks row"); + + // Verify the chain identity was restored from the ident we passed in + let ident_after = chain_store.chain_identifier().await.unwrap(); + assert_eq!(ident.net_version, ident_after.net_version); + assert_eq!(ident.genesis_block_hash, ident_after.genesis_block_hash); + + // Verify namespace exists + assert!( + block_store.has_namespace(&chain).await.unwrap(), + "namespace should exist after rebuild" + ); + }); +} + +#[test] +fn has_namespace_returns_false_for_missing_schema() { + let chain = vec![&*GENESIS_BLOCK]; + + run_rebuild_test(chain, |chain_store, store| async move { + let block_store = store.block_store(); + + let mut conn = PRIMARY_POOL.get().await.unwrap(); + let chain = graph_store_postgres::find_chain(&mut conn, NETWORK_NAME) + .await + .unwrap() + .expect("chain exists"); + drop(conn); + + // Namespace should exist initially + assert!(block_store.has_namespace(&chain).await.unwrap()); + + let nsp = chain.storage.to_string(); + + // Drop the schema + { + let mut conn = 
chain_store.get_conn_for_test().await.unwrap(); + diesel::sql_query(format!("DROP SCHEMA IF EXISTS {nsp} CASCADE")) + .execute(&mut conn) + .await + .unwrap(); + } + + // Should return false now + assert!(!block_store.has_namespace(&chain).await.unwrap()); + + // Rebuild to leave things clean for other tests + let ident = chain.network_identifier().unwrap(); + block_store + .rebuild_chain_storage(NETWORK_NAME, &ident, false) + .await + .unwrap(); + }); +}