62 changes: 50 additions & 12 deletions etl/src/workers/apply.rs
@@ -9,14 +9,15 @@ use tokio_postgres::types::PgLsn;
use tracing::warn;
use tracing::{Instrument, debug, error, info};

use crate::bail;
use crate::concurrency::shutdown::ShutdownRx;
use crate::concurrency::signal::SignalTx;
use crate::concurrency::signal::create_signal;
use crate::destination::Destination;
use crate::error::{ErrorKind, EtlError, EtlResult};
use crate::etl_error;
use crate::replication::apply::{ApplyLoopAction, ApplyLoopHook, start_apply_loop};
use crate::replication::client::PgReplicationClient;
use crate::replication::client::{GetOrCreateSlotResult, PgReplicationClient};
use crate::replication::common::get_active_table_replication_states;
use crate::state::table::{
TableReplicationError, TableReplicationPhase, TableReplicationPhaseType,
@@ -135,7 +136,8 @@ where
publication_name = self.config.publication_name
);
let apply_worker = async move {
let start_lsn = get_start_lsn(self.pipeline_id, &self.replication_client).await?;
let start_lsn =
get_start_lsn(self.pipeline_id, &self.replication_client, &self.store).await?;

// We create the signal used to notify the apply worker that it should force syncing tables.
let (force_syncing_tables_tx, force_syncing_tables_rx) = create_signal();
@@ -181,22 +183,58 @@ where
/// This function implements critical replication consistency logic by managing the apply worker's
/// replication slot. The slot serves as a persistent marker in Postgres's WAL (Write-Ahead Log)
/// that tracks the apply worker's progress and prevents WAL deletion of unreplicated data.
async fn get_start_lsn(
///
/// When creating a new slot, this function validates that all tables are in the Init state.
/// If any table is not in Init state when creating a new slot, it indicates that data was
/// synchronized based on a different apply worker lineage, which would break replication
/// correctness.
async fn get_start_lsn<S: StateStore>(
pipeline_id: PipelineId,
replication_client: &PgReplicationClient,
store: &S,
) -> EtlResult<PgLsn> {
let slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id).try_into()?;

// TODO: validate that we only create the slot when we first start replication which
// means when all tables are in the Init state. In any other case we should raise an
// error because that means the apply slot was deleted and creating a fresh slot now
// could cause inconsistent data to be read.
// Addendum: this might be hard to detect in all cases. E.g. what if the apply worker
// starts bunch of table sync workers and before creating a slot the process crashes?
// In this case, the apply worker slot is missing not because someone deleted it but
// because it was never created in the first place. The answer here might be to create
// the apply worker slot as the first thing, before starting table sync workers.
// We try to get or create the slot. Both operations will return an LSN that we can use to start
// streaming events.
let slot = replication_client.get_or_create_slot(&slot_name).await?;

// When creating a new apply worker slot, all tables must be in the `Init` state. If any table
// is not in Init state, it means the table was synchronized based on another apply worker
// lineage (different slot) which will break correctness.
if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
let table_states = store.get_table_replication_states().await?;

🟠 Severity: HIGH

TOCTOU Race Condition: Validation only runs for newly created slots (line 205 checks the CreateSlot variant). If the process crashes after slot creation but before the validation and its cleanup path (line 216) can run, the slot persists. On restart, get_or_create_slot returns the GetSlot variant, completely bypassing this validation block, which allows replication to proceed from an inconsistent state. The comment on line 216 explicitly warns about this issue. Fix: Validate table states BEFORE calling get_or_create_slot(), or validate for BOTH CreateSlot and GetSlot cases.

💡 Fix Suggestion

Suggestion: To fix the TOCTOU race condition, validate table states BEFORE calling get_or_create_slot() on line 200. Move the table state validation logic (lines 206-235) to occur before line 200, and fail fast if any tables are not in Init state. This ensures validation happens regardless of whether the slot already exists or is newly created. Alternatively, remove the 'if let CreateSlot' conditional check on line 205 and validate for both CreateSlot AND GetSlot cases, but note that this would still require deleting the slot if validation fails for GetSlot, which may have other implications.
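
One possible shape for that validate-first flow, sketched against the names already used in this diff. It is not the PR's implementation, and it assumes a hypothetical `slot_exists` lookup on `PgReplicationClient` that the diff does not provide, so treat it as an illustration rather than a drop-in patch:

```rust
// Sketch of get_start_lsn's body with the Init check moved ahead of slot
// creation. `slot_exists` is a hypothetical helper, not part of this diff.
let slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id).try_into()?;

if !replication_client.slot_exists(&slot_name).await? {
    // No apply slot yet: we are about to start a new lineage, so every table
    // must still be in Init before the slot is created.
    let table_states = store.get_table_replication_states().await?;
    let past_init = table_states
        .iter()
        .filter(|(_, phase)| phase.as_type() != TableReplicationPhaseType::Init)
        .count();

    if past_init > 0 {
        bail!(
            ErrorKind::InvalidState,
            "Cannot create apply worker slot when tables are not in Init state",
            format!("{past_init} table(s) are already past Init")
        );
    }
}

// By the time the slot is created, the invariant has already been checked,
// so a crash right after creation no longer skips the validation on restart.
let slot = replication_client.get_or_create_slot(&slot_name).await?;
Ok(slot.get_start_lsn())
```

The trade-off is one extra round trip to check for the slot, but the check then runs on every start where the slot is genuinely missing.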

Comment on lines +205 to +206


P1: Clean up slot on validation failure

When get_or_create_slot returns CreateSlot, the code immediately calls get_table_replication_states() and will return the error if that fetch fails, but it never deletes the freshly created apply slot in that failure path. On restart the slot will already exist, so the Init-state validation is skipped and replication can proceed on tables that were already past Init, undermining the new consistency check. Consider dropping the slot whenever the validation step cannot complete so that the check always reruns.

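A minimal sketch of that cleanup path, reusing only names already present in the diff; whether a best-effort delete is acceptable here, or whether a failed delete should also be surfaced, is a design call for the authors:

```rust
// Sketch: drop the freshly created slot when the state fetch itself fails,
// so the Init check is not silently skipped on the next start.
if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
    let states_result = store.get_table_replication_states().await;
    if states_result.is_err() {
        // Best-effort cleanup; the fetch error propagated below is what the
        // caller ultimately sees.
        let _ = replication_client.delete_slot(&slot_name).await;
    }
    let table_states = states_result?;

    // ...the existing non-Init check from this diff, including its own
    // delete_slot call before bailing, would follow here unchanged.
}
```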

let non_init_tables: Vec<_> = table_states
.iter()
.filter(|(_, phase)| phase.as_type() != TableReplicationPhaseType::Init)
.map(|(table_id, phase)| (*table_id, phase.as_type()))
.collect();

if !non_init_tables.is_empty() {
// We need to delete the slot before failing, otherwise the system will be restarted, and
// since the slot will be already there, it will skip validation.
replication_client.delete_slot(&slot_name).await?;

let table_details: Vec<String> = non_init_tables
.iter()
.map(|(id, phase)| format!("table {id} in state {phase}"))
.collect();

bail!(
ErrorKind::InvalidState,
"Cannot create apply worker slot when tables are not in Init state",
format!(
"Creating a new apply worker replication slot requires all tables to be in Init state, \
but found {} table(s) in non-Init states: {}. This indicates that tables were \
synchronized based on a different apply worker lineage. To fix this, either restore \
the original apply worker slot or reset all tables to Init state.",
non_init_tables.len(),
table_details.join(", ")
)
);
}
}

let start_lsn = slot.get_start_lsn();

Ok(start_lsn)