diff --git a/etl/src/workers/apply.rs b/etl/src/workers/apply.rs
index 47e578b5b..26acdb00a 100644
--- a/etl/src/workers/apply.rs
+++ b/etl/src/workers/apply.rs
@@ -9,6 +9,7 @@ use tokio_postgres::types::PgLsn;
 use tracing::warn;
 use tracing::{Instrument, debug, error, info};
 
+use crate::bail;
 use crate::concurrency::shutdown::ShutdownRx;
 use crate::concurrency::signal::SignalTx;
 use crate::concurrency::signal::create_signal;
@@ -16,7 +17,7 @@ use crate::destination::Destination;
 use crate::error::{ErrorKind, EtlError, EtlResult};
 use crate::etl_error;
 use crate::replication::apply::{ApplyLoopAction, ApplyLoopHook, start_apply_loop};
-use crate::replication::client::PgReplicationClient;
+use crate::replication::client::{GetOrCreateSlotResult, PgReplicationClient};
 use crate::replication::common::get_active_table_replication_states;
 use crate::state::table::{
     TableReplicationError, TableReplicationPhase, TableReplicationPhaseType,
@@ -135,7 +136,8 @@ where
             publication_name = self.config.publication_name
         );
         let apply_worker = async move {
-            let start_lsn = get_start_lsn(self.pipeline_id, &self.replication_client).await?;
+            let start_lsn =
+                get_start_lsn(self.pipeline_id, &self.replication_client, &self.store).await?;
 
             // We create the signal used to notify the apply worker that it should force syncing tables.
             let (force_syncing_tables_tx, force_syncing_tables_rx) = create_signal();
@@ -181,25 +183,74 @@
 /// This function implements critical replication consistency logic by managing the apply worker's
 /// replication slot. The slot serves as a persistent marker in Postgres's WAL (Write-Ahead Log)
 /// that tracks the apply worker's progress and prevents WAL deletion of unreplicated data.
-async fn get_start_lsn(
+///
+/// When creating a new slot, this function also validates that all tables are in the Init
+/// state. If any table has progressed beyond Init, it means data was synchronized based on a
+/// different apply worker lineage (a different slot), which would break replication
+/// correctness.
+async fn get_start_lsn<S: StateStore>(
     pipeline_id: PipelineId,
     replication_client: &PgReplicationClient,
+    store: &S,
 ) -> EtlResult<PgLsn> {
     let slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id).try_into()?;
 
-    // TODO: validate that we only create the slot when we first start replication which
-    // means when all tables are in the Init state. In any other case we should raise an
-    // error because that means the apply slot was deleted and creating a fresh slot now
-    // could cause inconsistent data to be read.
-    // Addendum: this might be hard to detect in all cases. E.g. what if the apply worker
-    // starts bunch of table sync workers and before creating a slot the process crashes?
-    // In this case, the apply worker slot is missing not because someone deleted it but
-    // because it was never created in the first place. The answer here might be to create
-    // the apply worker slot as the first thing, before starting table sync workers.
+    // We try to get or create the slot. Both operations will return an LSN that we can use to start
+    // streaming events.
     let slot = replication_client.get_or_create_slot(&slot_name).await?;
-    let start_lsn = slot.get_start_lsn();
 
-    Ok(start_lsn)
+    // When creating a new apply worker slot, all tables must be in the `Init` state. If any table
+    // is not in `Init`, it means the table was synchronized based on another apply worker
+    // lineage (a different slot), which would break correctness.
+    if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
+        if let Err(err) = validate_tables_in_init_state(store).await {
+            // Delete the slot before failing, otherwise the system will restart and skip validation
+            // since the slot will already exist.
+            replication_client.delete_slot(&slot_name).await?;
+
+            return Err(err);
+        }
+    }
+
+    // We return the LSN from which we will start streaming events.
+    Ok(slot.get_start_lsn())
+}
+
+/// Validates that all tables are in the Init state.
+///
+/// This validation is required when creating a new apply worker slot to ensure replication
+/// correctness. If any table has progressed beyond Init state, it indicates the table was
+/// synchronized based on a different apply worker lineage.
+async fn validate_tables_in_init_state<S: StateStore>(store: &S) -> EtlResult<()> {
+    let table_states = store.get_table_replication_states().await?;
+
+    let non_init_tables: Vec<_> = table_states
+        .iter()
+        .filter(|(_, phase)| phase.as_type() != TableReplicationPhaseType::Init)
+        .map(|(table_id, phase)| (*table_id, phase.as_type()))
+        .collect();
+
+    if non_init_tables.is_empty() {
+        return Ok(());
+    }
+
+    let table_details: Vec<String> = non_init_tables
+        .iter()
+        .map(|(id, phase)| format!("table {id} in state {phase}"))
+        .collect();
+
+    bail!(
+        ErrorKind::InvalidState,
+        "Cannot create apply worker slot when tables are not in Init state",
+        format!(
+            "Creating a new apply worker replication slot requires all tables to be in Init state, \
+             but found {} table(s) in non-Init states: {}. This indicates that tables were \
+             synchronized based on a different apply worker lineage. To fix this, either restore \
+             the original apply worker slot or reset all tables to Init state.",
+            non_init_tables.len(),
+            table_details.join(", ")
+        )
+    );
 }
 
 /// Internal coordination hook that implements apply loop integration with table sync workers.
diff --git a/etl/tests/pipeline.rs b/etl/tests/pipeline.rs
index 931a896c9..27fcfd83f 100644
--- a/etl/tests/pipeline.rs
+++ b/etl/tests/pipeline.rs
@@ -26,6 +26,106 @@ use rand::random;
 use std::time::Duration;
 use tokio::time::sleep;
 
+#[tokio::test(flavor = "multi_thread")]
+async fn pipeline_fails_when_slot_deleted_with_non_init_tables() {
+    init_test_tracing();
+
+    let database = spawn_source_database().await;
+    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;
+
+    let store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        database_schema.publication_name(),
+        store.clone(),
+        destination.clone(),
+    );
+
+    // Wait for the table to finish syncing (not in Init state anymore).
+    let sync_done_notify = store
+        .notify_on_table_state_type(
+            database_schema.users_schema().id,
+            TableReplicationPhaseType::SyncDone,
+        )
+        .await;
+
+    pipeline.start().await.unwrap();
+
+    sync_done_notify.notified().await;
+
+    pipeline.shutdown_and_wait().await.unwrap();
+
+    // Verify that the table is in SyncDone state (not Init).
+    let table_states = store.get_table_replication_states().await;
+    assert_eq!(
+        table_states
+            .get(&database_schema.users_schema().id)
+            .unwrap()
+            .as_type(),
+        TableReplicationPhaseType::SyncDone
+    );
+
+    // Verify that the replication slot for the apply worker exists.
+    let apply_slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id)
+        .try_into()
+        .unwrap();
+    let slot_exists = database
+        .replication_slot_exists(&apply_slot_name)
+        .await
+        .unwrap();
+    assert!(slot_exists, "Apply slot should exist after pipeline start");
+
+    // Delete the apply worker slot to simulate slot loss.
+    database
+        .run_sql(&format!(
+            "select pg_drop_replication_slot('{apply_slot_name}')"
+        ))
+        .await
+        .unwrap();
+    let slot_exists = database
+        .replication_slot_exists(&apply_slot_name)
+        .await
+        .unwrap();
+    assert!(!slot_exists, "Apply slot should not exist after deletion");
+
+    // Restart the pipeline: it should fail because tables are not in Init state.
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        database_schema.publication_name(),
+        store.clone(),
+        destination.clone(),
+    );
+
+    // The pipeline starts successfully (the actual work happens in a spawned task).
+    pipeline.start().await.unwrap();
+
+    // The error surfaces when we wait for the pipeline to complete.
+    let wait_result = pipeline.wait().await;
+    assert!(wait_result.is_err(), "Pipeline wait should fail");
+
+    let err = wait_result.unwrap_err();
+    assert!(
+        err.kinds().contains(&ErrorKind::InvalidState),
+        "Error should be InvalidState, got: {:?}",
+        err.kinds()
+    );
+
+    // Verify that the slot was cleaned up (deleted) after the validation failure.
+    let slot_exists = database
+        .replication_slot_exists(&apply_slot_name)
+        .await
+        .unwrap();
+    assert!(
+        !slot_exists,
+        "Apply slot should be deleted after validation failure"
+    );
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn table_schema_copy_survives_pipeline_restarts() {
     init_test_tracing();
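Review note: the invariant the apply.rs change enforces is that a freshly created apply worker slot is only trustworthy when every table is still in Init; any table past Init must have been synchronized against a different slot lineage. Below is a minimal standalone sketch of that rule for reviewers. The names `Phase` and `check_fresh_slot_is_safe` and the simplified types are illustrative assumptions, not the crate's `StateStore` / `TableReplicationPhaseType` API.

```rust
use std::collections::HashMap;

// Simplified stand-in for the crate's table replication phases (illustration only;
// the real phase type has more states than shown here).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    Init,
    SyncDone,
}

// Mirrors the intent of `validate_tables_in_init_state`: when the apply worker slot had to be
// freshly created, any table that already progressed past Init was synchronized against a
// different slot lineage, so the pipeline should refuse to stream rather than start from an
// inconsistent LSN.
fn check_fresh_slot_is_safe(table_states: &HashMap<u32, Phase>) -> Result<(), String> {
    let non_init: Vec<String> = table_states
        .iter()
        .filter(|(_, phase)| **phase != Phase::Init)
        .map(|(id, phase)| format!("table {id} in state {phase:?}"))
        .collect();

    if non_init.is_empty() {
        Ok(())
    } else {
        Err(format!(
            "cannot create a new apply slot: {} table(s) past Init: {}",
            non_init.len(),
            non_init.join(", ")
        ))
    }
}

fn main() {
    // A table already in SyncDone (the scenario exercised by the test above) makes a fresh
    // slot unsafe.
    let states = HashMap::from([(1_u32, Phase::SyncDone), (2_u32, Phase::Init)]);
    assert!(check_fresh_slot_is_safe(&states).is_err());

    // With every table still in Init, creating the slot for the first time is fine.
    let states = HashMap::from([(1_u32, Phase::Init), (2_u32, Phase::Init)]);
    assert!(check_fresh_slot_is_safe(&states).is_ok());
}
```

The diff also deletes the just-created slot before returning the error; without that cleanup, a restart would find the slot already present and skip the validation entirely, which is the condition the test asserts at the end.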