etl-destinations/src/bigquery/client.rs (2 changes: 1 addition & 1 deletion)
@@ -837,7 +837,7 @@ mod tests {
TableName::new("public".to_string(), "test_table".to_string()),
columns,
));
let replication_mask = ReplicationMask::build(&table_schema, &column_names).unwrap();
let replication_mask = ReplicationMask::build_or_all(&table_schema, &column_names);

ReplicatedTableSchema::from_mask(table_schema, replication_mask)
}
etl-postgres/src/replication/schema.rs (137 changes: 120 additions & 17 deletions)
@@ -4,7 +4,7 @@ use sqlx::{PgExecutor, PgPool, Row};
use std::collections::HashMap;
use tokio_postgres::types::Type as PgType;

use crate::types::{ColumnSchema, TableId, TableName, TableSchema};
use crate::types::{ColumnSchema, SnapshotId, TableId, TableName, TableSchema};

macro_rules! define_type_mappings {
(
@@ -134,34 +134,33 @@ define_type_mappings! {
DATE_RANGE => "DATE_RANGE"
}

/// Stores a table schema in the database.
/// Stores a table schema in the database with a specific snapshot ID.
///
/// Inserts or updates table schema and column information in schema storage tables
/// using a transaction to ensure atomicity.
/// Upserts table schema and replaces all column information in schema storage tables
/// using a transaction to ensure atomicity. If a schema version already exists for
/// the same (pipeline_id, table_id, snapshot_id), columns are deleted and re-inserted.
pub async fn store_table_schema(
pool: &PgPool,
pipeline_id: i64,
table_schema: &TableSchema,
) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;

// Insert or update table schema record
// Upsert table schema version
let table_schema_id: i64 = sqlx::query(
r#"
insert into etl.table_schemas (pipeline_id, table_id, schema_name, table_name)
values ($1, $2, $3, $4)
on conflict (pipeline_id, table_id)
do update set
schema_name = excluded.schema_name,
table_name = excluded.table_name,
updated_at = now()
insert into etl.table_schemas (pipeline_id, table_id, schema_name, table_name, snapshot_id)
values ($1, $2, $3, $4, $5::pg_lsn)
on conflict (pipeline_id, table_id, snapshot_id)
do update set schema_name = excluded.schema_name, table_name = excluded.table_name
returning id
"#,
)
.bind(pipeline_id)
.bind(table_schema.id.into_inner() as i64)
.bind(&table_schema.name.schema)
.bind(&table_schema.name.name)
.bind(table_schema.snapshot_id.to_pg_lsn_string())
.fetch_one(&mut *tx)
.await?
.get(0);
@@ -199,20 +198,34 @@ pub async fn store_table_schema(
Ok(())
}
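
The new `on conflict (pipeline_id, table_id, snapshot_id)` target implies schema storage now keeps one row per schema version rather than one row per table, and Postgres requires a matching unique index for that inference clause. The project's migration is not part of this diff, so the following is only a minimal sketch of the assumed storage shape (column default, index name, and `if not exists` guards are guesses):

```rust
use sqlx::PgPool;

/// Hypothetical migration sketch: add a pg_lsn `snapshot_id` column and the
/// unique index that the new ON CONFLICT target in store_table_schema needs.
async fn apply_assumed_migration(pool: &PgPool) -> Result<(), sqlx::Error> {
    // '0/0' is the smallest pg_lsn; used here only as a backfill default.
    sqlx::query(
        "alter table etl.table_schemas
             add column if not exists snapshot_id pg_lsn not null default '0/0'",
    )
    .execute(pool)
    .await?;

    // ON CONFLICT (pipeline_id, table_id, snapshot_id) needs a unique index;
    // the previous (pipeline_id, table_id) constraint would be widened like this.
    sqlx::query(
        "create unique index if not exists table_schemas_pipeline_table_snapshot_key
             on etl.table_schemas (pipeline_id, table_id, snapshot_id)",
    )
    .execute(pool)
    .await?;

    Ok(())
}
```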

/// Loads all table schemas for a pipeline from the database.
/// Loads all table schemas for a pipeline from the database at the latest snapshot.
///
/// Retrieves table schemas and columns from schema storage tables,
/// reconstructing complete [`TableSchema`] objects.
/// reconstructing complete [`TableSchema`] objects. This is equivalent to
/// calling [`load_table_schemas_at_snapshot`] with the maximum LSN value.
pub async fn load_table_schemas(
pool: &PgPool,
pipeline_id: i64,
) -> Result<Vec<TableSchema>, sqlx::Error> {
load_table_schemas_at_snapshot(pool, pipeline_id, SnapshotId::max()).await
}
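
This module leans on a small `SnapshotId` surface: `max()`, `to_pg_lsn_string()`, `from_pg_lsn_string()`, and `TableSchema::with_snapshot_id`. A hedged sketch of what that type plausibly looks like, assuming it is a newtype over a 64-bit LSN; the real definition lives in `crate::types` and may differ:

```rust
/// Hypothetical stand-in for crate::types::SnapshotId, assumed to wrap an LSN.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SnapshotId(u64);

impl SnapshotId {
    /// Sentinel for "latest": the largest representable LSN.
    pub fn max() -> Self {
        SnapshotId(u64::MAX)
    }

    /// Renders as Postgres pg_lsn text, e.g. "16/B374D848".
    pub fn to_pg_lsn_string(&self) -> String {
        format!("{:X}/{:X}", self.0 >> 32, self.0 & 0xFFFF_FFFF)
    }

    /// Parses the pg_lsn text produced by the `snapshot_id::text` casts below.
    pub fn from_pg_lsn_string(s: &str) -> Result<Self, String> {
        let (hi, lo) = s.split_once('/').ok_or("pg_lsn is missing '/'")?;
        let hi = u64::from_str_radix(hi, 16).map_err(|e| e.to_string())?;
        let lo = u64::from_str_radix(lo, 16).map_err(|e| e.to_string())?;
        Ok(SnapshotId((hi << 32) | lo))
    }
}
```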

/// Loads a single table schema, selecting the version with the largest `snapshot_id` that is <= the requested snapshot.
///
/// Returns `None` if no schema version exists for the table at or before the given snapshot.
pub async fn load_table_schema_at_snapshot(
pool: &PgPool,
pipeline_id: i64,
table_id: TableId,
snapshot_id: SnapshotId,
) -> Result<Option<TableSchema>, sqlx::Error> {
let rows = sqlx::query(
r#"
select
ts.table_id,
ts.schema_name,
ts.table_name,
ts.snapshot_id::text as snapshot_id,
tc.column_name,
tc.column_type,
tc.type_modifier,
@@ -222,11 +235,93 @@ pub async fn load_table_schemas(
tc.primary_key_ordinal_position
from etl.table_schemas ts
inner join etl.table_columns tc on ts.id = tc.table_schema_id
where ts.pipeline_id = $1
order by ts.table_id, tc.column_order
where ts.id = (
select id from etl.table_schemas
where pipeline_id = $1 and table_id = $2 and snapshot_id <= $3::pg_lsn
order by snapshot_id desc
limit 1
)
order by tc.column_order
"#,
)
.bind(pipeline_id)
.bind(SqlxTableId(table_id.into_inner()))
.bind(snapshot_id.to_pg_lsn_string())
.fetch_all(pool)
.await?;

if rows.is_empty() {
return Ok(None);
}

let first_row = &rows[0];
let table_oid: SqlxTableId = first_row.get("table_id");
let table_id = TableId::new(table_oid.0);
let schema_name: String = first_row.get("schema_name");
let table_name: String = first_row.get("table_name");
let snapshot_id_str: String = first_row.get("snapshot_id");
let snapshot_id = SnapshotId::from_pg_lsn_string(&snapshot_id_str)
.map_err(|e| sqlx::Error::Protocol(e.to_string()))?;

let mut table_schema = TableSchema::with_snapshot_id(
table_id,
TableName::new(schema_name, table_name),
vec![],
snapshot_id,
);

for row in rows {
table_schema.add_column_schema(parse_column_schema(&row));
}

Ok(Some(table_schema))
}
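
A hedged usage sketch of the point lookup; the pool, pipeline ID, table OID, and LSN below are all hypothetical, and only the function signature comes from the code above:

```rust
use sqlx::PgPool;

async fn print_schema_at(pool: &PgPool) -> Result<(), sqlx::Error> {
    let snapshot = SnapshotId::from_pg_lsn_string("16/B374D848")
        .map_err(|e| sqlx::Error::Protocol(e.to_string()))?;

    match load_table_schema_at_snapshot(pool, 1, TableId::new(16384), snapshot).await? {
        // `name.schema` / `name.name` mirror the fields bound in store_table_schema.
        Some(schema) => println!("found {}.{}", schema.name.schema, schema.name.name),
        None => println!("no schema version at or before 16/B374D848"),
    }

    Ok(())
}
```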

/// Loads all table schemas for a pipeline at a specific snapshot point.
///
/// For each table, retrieves the schema version with the largest snapshot_id
/// that is <= the requested snapshot_id. Tables without any schema version
/// at or before the snapshot are excluded from the result.
pub async fn load_table_schemas_at_snapshot(
pool: &PgPool,
pipeline_id: i64,
snapshot_id: SnapshotId,
) -> Result<Vec<TableSchema>, sqlx::Error> {
// Use DISTINCT ON to efficiently find the latest schema version for each table.
// PostgreSQL optimizes DISTINCT ON with ORDER BY using index scans when possible.
let rows = sqlx::query(
r#"
with latest_schemas as (
select distinct on (ts.table_id)
ts.id,
ts.table_id,
ts.schema_name,
ts.table_name,
ts.snapshot_id
from etl.table_schemas ts
where ts.pipeline_id = $1
and ts.snapshot_id <= $2::pg_lsn
order by ts.table_id, ts.snapshot_id desc
)
select
ls.table_id,
ls.schema_name,
ls.table_name,
ls.snapshot_id::text as snapshot_id,
tc.column_name,
tc.column_type,
tc.type_modifier,
tc.nullable,
tc.primary_key,
tc.column_order,
tc.primary_key_ordinal_position
from latest_schemas ls
inner join etl.table_columns tc on ls.id = tc.table_schema_id
order by ls.table_id, tc.column_order
"#,
)
.bind(pipeline_id)
.bind(snapshot_id.to_pg_lsn_string())
.fetch_all(pool)
.await?;

Expand All @@ -237,9 +332,17 @@ pub async fn load_table_schemas(
let table_id = TableId::new(table_oid.0);
let schema_name: String = row.get("schema_name");
let table_name: String = row.get("table_name");
let snapshot_id_str: String = row.get("snapshot_id");
let row_snapshot_id = SnapshotId::from_pg_lsn_string(&snapshot_id_str)
.map_err(|e| sqlx::Error::Protocol(e.to_string()))?;

let entry = table_schemas.entry(table_id).or_insert_with(|| {
TableSchema::new(table_id, TableName::new(schema_name, table_name), vec![])
TableSchema::with_snapshot_id(
table_id,
TableName::new(schema_name, table_name),
vec![],
row_snapshot_id,
)
});

entry.add_column_schema(parse_column_schema(&row));
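
Taken together, the two loaders give time-travel reads over schema history. A sketch of the intended semantics, assuming a pipeline whose tables have schema versions stored at LSNs 0/1000 and 0/2000 (all values hypothetical):

```rust
use sqlx::PgPool;

async fn compare_snapshots(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mid = SnapshotId::from_pg_lsn_string("0/1500")
        .map_err(|e| sqlx::Error::Protocol(e.to_string()))?;

    // Picks each table's 0/1000 version: the largest snapshot_id <= 0/1500.
    let at_mid = load_table_schemas_at_snapshot(pool, 1, mid).await?;

    // Equivalent to loading at SnapshotId::max(): each table's 0/2000 version.
    let latest = load_table_schemas(pool, 1).await?;

    // Tables whose first schema version landed after 0/1500 are simply
    // absent from the earlier view.
    assert!(at_mid.len() <= latest.len());
    Ok(())
}
```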
etl-postgres/src/tokio/test_utils.rs (10 changes: 9 additions & 1 deletion)
@@ -22,11 +22,16 @@ pub enum TableModification<'a> {
DropColumn {
name: &'a str,
},
/// Alter an existing column with the specified alteration.
/// Alter an existing column with the specified alteration (e.g., "type bigint").
AlterColumn {
name: &'a str,
alteration: &'a str,
},
/// Rename an existing column.
RenameColumn {
old_name: &'a str,
new_name: &'a str,
},
ReplicaIdentity {
value: &'a str,
},
@@ -211,6 +216,9 @@ impl<G: GenericClient> PgDatabase<G> {
TableModification::AlterColumn { name, alteration } => {
format!("alter column {name} {alteration}")
}
TableModification::RenameColumn { old_name, new_name } => {
format!("rename column {old_name} to {new_name}")
}
TableModification::ReplicaIdentity { value } => {
format!("replica identity {value}")
}
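
A hedged usage sketch of the new variant. The `alter_table` method name and signature are assumed (this diff only shows the match arm that renders the clause), and one Postgres caveat applies: `RENAME COLUMN` is a standalone `ALTER TABLE` form that cannot be comma-combined with other actions, so a rename should be issued as the sole modification in a call:

```rust
// Assumed shape: PgDatabase exposes something like
// alter_table(table_name, &[TableModification]); the real API may differ.
async fn rename_email_column<G: GenericClient>(db: &PgDatabase<G>) {
    db.alter_table(
        "users", // hypothetical table
        &[TableModification::RenameColumn {
            old_name: "email",
            new_name: "contact_email",
        }],
    )
    .await
    .expect("rename column should succeed");
    // Renders roughly as: alter table users rename column email to contact_email
}
```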