133 commits
a22dc44 Improve (iambriccardo, Nov 27, 2025)
34a21c3 Improve (iambriccardo, Nov 27, 2025)
7b55c75 Improve (iambriccardo, Nov 27, 2025)
b0477a0 Improve (iambriccardo, Nov 27, 2025)
2921c5a Improve (iambriccardo, Nov 27, 2025)
6f7202d Improve (iambriccardo, Nov 27, 2025)
9c6eb9c Improve (iambriccardo, Nov 27, 2025)
695b4e5 Improve (iambriccardo, Nov 27, 2025)
330f304 Improve (iambriccardo, Nov 28, 2025)
6859b19 Improve (iambriccardo, Nov 28, 2025)
c124deb Improve (iambriccardo, Nov 28, 2025)
d978ac0 Improve (iambriccardo, Nov 28, 2025)
c4f7573 Improve (iambriccardo, Nov 28, 2025)
5d1bfd1 Improve (iambriccardo, Nov 28, 2025)
5d8806b Improve (iambriccardo, Nov 28, 2025)
afd21e1 Improve (iambriccardo, Nov 28, 2025)
d357538 Improve (iambriccardo, Nov 28, 2025)
e06a009 Improve (iambriccardo, Nov 28, 2025)
da19127 Improve (iambriccardo, Nov 28, 2025)
77741ef Improve (iambriccardo, Nov 28, 2025)
65768e7 Improve (iambriccardo, Nov 28, 2025)
9241fe9 Improve (iambriccardo, Nov 28, 2025)
aa25d22 Improve (iambriccardo, Nov 28, 2025)
f7f2e79 Improve (iambriccardo, Dec 1, 2025)
639a3f0 Merge remote-tracking branch 'origin/main' into riccardo/feat/ddl-sup… (iambriccardo, Dec 1, 2025)
b716388 Improve (iambriccardo, Dec 1, 2025)
7d3b043 Improve (iambriccardo, Dec 1, 2025)
f201605 Improve (iambriccardo, Dec 1, 2025)
60fd3f3 Improve (iambriccardo, Dec 1, 2025)
cb76720 Improve (iambriccardo, Dec 1, 2025)
d968c21 Improve (iambriccardo, Dec 1, 2025)
8709be0 Improve (iambriccardo, Dec 1, 2025)
bc001e6 Improve (iambriccardo, Dec 1, 2025)
a072abf Improve (iambriccardo, Dec 1, 2025)
c3346c5 Improve (iambriccardo, Dec 1, 2025)
ab87226 Improve (iambriccardo, Dec 1, 2025)
af61344 Improve (iambriccardo, Dec 1, 2025)
23483c0 Improve (iambriccardo, Dec 1, 2025)
486539b Improve (iambriccardo, Dec 2, 2025)
f6be7b1 Improve (iambriccardo, Dec 2, 2025)
2de1c17 Improve (iambriccardo, Dec 2, 2025)
a50a244 Improve (iambriccardo, Dec 2, 2025)
09a869a Improve (iambriccardo, Dec 3, 2025)
cb06bde Improve (iambriccardo, Dec 5, 2025)
dca47b7 Improve (iambriccardo, Dec 5, 2025)
43b49df Improve (iambriccardo, Dec 9, 2025)
d4b9168 Improve (iambriccardo, Dec 10, 2025)
26503f3 Improve event trigger (iambriccardo, Dec 15, 2025)
5fd2070 Improve (iambriccardo, Dec 15, 2025)
1818967 Improve DDL event (iambriccardo, Dec 16, 2025)
2d0c26e Improve (iambriccardo, Dec 16, 2025)
8f67f87 Improve (iambriccardo, Dec 16, 2025)
c794beb Improve (iambriccardo, Dec 9, 2025)
6477658 Improve (iambriccardo, Dec 9, 2025)
e5e7af6 Improve (iambriccardo, Dec 9, 2025)
75c49b2 Improve (iambriccardo, Dec 9, 2025)
58a8be6 Improve (iambriccardo, Dec 9, 2025)
8a66244 Improve (iambriccardo, Dec 9, 2025)
b7d4aa1 Improve (iambriccardo, Dec 10, 2025)
b15c280 Improve (iambriccardo, Dec 10, 2025)
026483e Improve (iambriccardo, Dec 10, 2025)
7d4efd7 Improve (iambriccardo, Dec 10, 2025)
d85c83a Improve (iambriccardo, Dec 11, 2025)
28b79bb Improve (iambriccardo, Dec 11, 2025)
f2b97b1 Improve (iambriccardo, Dec 11, 2025)
7ad40ac Improve (iambriccardo, Dec 12, 2025)
13a794c Improve (iambriccardo, Dec 12, 2025)
c6d722c Improve (iambriccardo, Dec 12, 2025)
ea761d7 Improve (iambriccardo, Dec 12, 2025)
289fdfd Improve (iambriccardo, Dec 12, 2025)
3435515 Improve (iambriccardo, Dec 15, 2025)
8b784ee Improve (iambriccardo, Dec 16, 2025)
0553620 Improve (iambriccardo, Dec 17, 2025)
c6be346 Improve (iambriccardo, Dec 17, 2025)
4c9db78 Improve (iambriccardo, Dec 17, 2025)
c897c20 Improve (iambriccardo, Dec 17, 2025)
7991e85 feat(experimental): Add DDL support to BigQuery (iambriccardo, Dec 11, 2025)
ee56674 Improve (iambriccardo, Dec 11, 2025)
b0c9037 Improve (iambriccardo, Dec 11, 2025)
634ee86 Improve (iambriccardo, Dec 11, 2025)
5235216 Improve (iambriccardo, Dec 12, 2025)
9dcddb5 Improve (iambriccardo, Dec 12, 2025)
763a5aa Improve (iambriccardo, Dec 12, 2025)
6c95f5b Improve (iambriccardo, Dec 12, 2025)
74595dd Improve (iambriccardo, Dec 12, 2025)
2131f91 Improve (iambriccardo, Dec 16, 2025)
002878f Improve (iambriccardo, Dec 16, 2025)
b041543 Improve (iambriccardo, Dec 16, 2025)
be95534 Improve (iambriccardo, Dec 16, 2025)
e40e15a Improve (iambriccardo, Dec 16, 2025)
f21cde8 Improve (iambriccardo, Dec 16, 2025)
707b134 Improve (iambriccardo, Dec 16, 2025)
e9d5ace Improve (iambriccardo, Dec 17, 2025)
a8c2c7d Improve (iambriccardo, Dec 17, 2025)
93f89a6 Improve (iambriccardo, Dec 17, 2025)
95e91a7 Improve (iambriccardo, Dec 17, 2025)
b5f560c Improve (iambriccardo, Dec 17, 2025)
aee3443 Improve (iambriccardo, Dec 17, 2025)
c9abb34 Improve (iambriccardo, Dec 17, 2025)
1ccbd70 Improve (iambriccardo, Dec 17, 2025)
90fc1d3 Improve (iambriccardo, Dec 17, 2025)
c374c17 Improve (iambriccardo, Dec 17, 2025)
ec19115 Improve (iambriccardo, Dec 17, 2025)
cd96120 Improve (iambriccardo, Dec 18, 2025)
f759fb8 Merge branch 'main' into riccardo/feat/ddl-support (iambriccardo, Dec 19, 2025)
c66b2f6 Improve (iambriccardo, Dec 19, 2025)
82c8a38 Improve (iambriccardo, Dec 19, 2025)
dc9f733 Improve (iambriccardo, Dec 9, 2025)
c3961ec Improve (iambriccardo, Dec 9, 2025)
f9f50ba Improve (iambriccardo, Dec 9, 2025)
6a85fb9 Improve (iambriccardo, Dec 9, 2025)
f5d7c90 Improve (iambriccardo, Dec 9, 2025)
262533d Improve (iambriccardo, Dec 9, 2025)
c659fec Improve (iambriccardo, Dec 10, 2025)
2bf09be Improve (iambriccardo, Dec 10, 2025)
18f6861 Improve (iambriccardo, Dec 10, 2025)
f77c5a0 Improve (iambriccardo, Dec 10, 2025)
4b53d93 Improve (iambriccardo, Dec 11, 2025)
3f72091 Improve (iambriccardo, Dec 11, 2025)
46dfdec Improve (iambriccardo, Dec 11, 2025)
052a477 Improve (iambriccardo, Dec 12, 2025)
cbdbe86 Improve (iambriccardo, Dec 12, 2025)
e9a02e5 Improve (iambriccardo, Dec 12, 2025)
42212ac Improve (iambriccardo, Dec 12, 2025)
166af27 Improve (iambriccardo, Dec 12, 2025)
f2dbb51 Improve (iambriccardo, Dec 15, 2025)
3e1c842 Improve (iambriccardo, Dec 16, 2025)
d7ac6e1 Improve (iambriccardo, Dec 17, 2025)
e2a4073 Improve (iambriccardo, Dec 17, 2025)
cb952fe Improve (iambriccardo, Dec 17, 2025)
607d30c Improve (iambriccardo, Dec 17, 2025)
4e71f6c fix: modified CI to push docker images to AWS ECR (#501) (Hamm1, Dec 15, 2025)
5b91e14 Improve (iambriccardo, Dec 19, 2025)
8 changes: 2 additions & 6 deletions .github/workflows/ci.yml
@@ -94,9 +94,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_PORT: 5430
POSTGRES_HOST: localhost
run: |
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh
run: ./scripts/run_migrations.sh etl-api

- name: Run Tests
run: |
@@ -153,9 +151,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_PORT: 5430
POSTGRES_HOST: localhost
run: |
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh
run: ./scripts/run_migrations.sh etl-api

- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -46,7 +46,7 @@ const-oid = { version = "0.9.6", default-features = false }
constant_time_eq = { version = "0.4.2" }
fail = { version = "0.5.1", default-features = false }
futures = { version = "0.3.31", default-features = false }
gcp-bigquery-client = { version = "0.27.0", default-features = false }
gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "a1cc7895afce36c0c86cd71bab94253fef04f05c" }


🟠 Severity: HIGH

Supply Chain Risk: Unverified Git Dependency

Replacing the official gcp-bigquery-client crate (v0.27.0) with a dependency from a personal GitHub fork introduces supply chain security risks. The code at this commit hasn't undergone crates.io vetting, the repository owner could force-push or delete it, and there's no audit trail. Use the official published crate or fork to your organization's repository with proper security review processes.

💡 Fix Suggestion

Suggestion: Replace the Git dependency from the personal fork with the official published crate from crates.io version 0.27.0. This eliminates supply chain risks by using the vetted, published version. If custom changes are absolutely required, fork the repository to your organization's GitHub account and establish proper security review processes before using it as a dependency.

⚠️ Experimental Feature: This code suggestion is automatically generated. Please review carefully.

Suggested change
gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "a1cc7895afce36c0c86cd71bab94253fef04f05c" }
gcp-bigquery-client = { version = "0.27.0", default-features = false }

iceberg = { version = "0.7.0", default-features = false }
iceberg-catalog-rest = { version = "0.7.0", default-features = false }
insta = { version = "1.43.1", default-features = false }
@@ -57,7 +57,7 @@ metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
parquet = { version = "55.0", default-features = false }
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
postgres-replication = { git = "https://github.com/iambriccardo/rust-postgres", default-features = false, rev = "31acf55c7e5c2244e5bb3a36e7afa2a01bf52c38" }
prost = { version = "0.14.1", default-features = false }
rand = { version = "0.9.2", default-features = false }
reqwest = { version = "0.12.22", default-features = false }
@@ -74,7 +74,7 @@ thiserror = "2.0.12"
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
tikv-jemallocator = { version = "0.6.1", default-features = false, features = ["background_threads_runtime_support", "unprefixed_malloc_on_supported_platforms"] }
tokio = { version = "1.47.0", default-features = false }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
tokio-postgres = { git = "https://github.com/iambriccardo/rust-postgres", default-features = false, rev = "31acf55c7e5c2244e5bb3a36e7afa2a01bf52c38" }
tokio-rustls = { version = "0.26.2", default-features = false }
tracing = { version = "0.1.41", default-features = false }
tracing-actix-web = { version = "0.7.19", default-features = false }
31 changes: 15 additions & 16 deletions DEVELOPMENT.md
@@ -99,14 +99,13 @@ If you prefer manual setup or have an existing PostgreSQL instance:

#### Single Database Setup

If using one database for both the API and replicator state:
If using one database for both the API and etl state:

```bash
export DATABASE_URL=postgres://USER:PASSWORD@HOST:PORT/DB

# Run both migrations on the same database
./etl-api/scripts/run_migrations.sh
./etl-replicator/scripts/run_migrations.sh
# Run all migrations on the same database
./scripts/run_migrations.sh
```

#### Separate Database Setup
@@ -116,16 +115,16 @@ If using separate databases (recommended for production):
```bash
# API migrations on the control plane database
export DATABASE_URL=postgres://USER:PASSWORD@API_HOST:PORT/API_DB
./etl-api/scripts/run_migrations.sh
./scripts/run_migrations.sh etl-api

# Replicator migrations on the source database
# ETL migrations on the source database
export DATABASE_URL=postgres://USER:PASSWORD@SOURCE_HOST:PORT/SOURCE_DB
./etl-replicator/scripts/run_migrations.sh
./scripts/run_migrations.sh etl
```

This separation allows you to:
- Scale the control plane independently from replication workloads
- Keep the replicator state close to the source data
- Keep the etl state close to the source data
- Isolate concerns between infrastructure management and data replication

## Database Migrations
@@ -140,7 +139,7 @@ Located in `etl-api/migrations/`, these create the control plane schema (`app` s

```bash
# From project root
./etl-api/scripts/run_migrations.sh
./scripts/run_migrations.sh etl-api

# Or manually with SQLx CLI
sqlx migrate run --source etl-api/migrations
@@ -167,19 +166,19 @@ cd etl-api
cargo sqlx prepare
```

### ETL Replicator Migrations
### ETL Migrations

Located in `etl-replicator/migrations/`, these create the replicator's state store schema (`etl` schema) for tracking replication state, table schemas, and mappings.
Located in `etl/migrations/`, these create the etl state store schema (`etl` schema) for tracking replication state, table schemas, and mappings.

**Running replicator migrations:**
**Running etl migrations:**

```bash
# From project root
./etl-replicator/scripts/run_migrations.sh
./scripts/run_migrations.sh etl

# Or manually with SQLx CLI (requires setting search_path)
psql $DATABASE_URL -c "create schema if not exists etl;"
sqlx migrate run --source etl-replicator/migrations --database-url "${DATABASE_URL}?options=-csearch_path%3Detl"
sqlx migrate run --source etl/migrations --database-url "${DATABASE_URL}?options=-csearch_path%3Detl"
```

**Important:** Migrations are run automatically when using the `etl-replicator` binary (see `etl-replicator/src/migrations.rs:16`). However, if you integrate the `etl` crate directly into your own application as a library, you should run these migrations manually before starting your pipeline. This design decision ensures:
@@ -193,10 +192,10 @@ sqlx migrate run --source etl-replicator/migrations --database-url "${DATABASE_U
- Testing migrations independently
- CI/CD pipelines that separate migration and deployment steps

**Creating a new replicator migration:**
**Creating a new etl migration:**

```bash
cd etl-replicator
cd etl
sqlx migrate add <migration_name>
```

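The updated DEVELOPMENT.md notes that migrations run automatically in the `etl-replicator` binary but must be run manually when the `etl` crate is embedded as a library. A minimal sketch of that manual step, combining the `create schema` and `search_path` commands from the docs above with the `sqlx::migrate!` call this PR uses in its test support; the pool setup, error type, and URL handling are assumptions:

```rust
use sqlx::postgres::PgPoolOptions;

// Sketch only: run the etl migrations before starting a pipeline when the
// `etl` crate is used as a library. The schema name and migration path come
// from this PR; everything else is assumed.
async fn run_etl_migrations(database_url: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Mirrors `psql $DATABASE_URL -c "create schema if not exists etl;"`.
    let admin = PgPoolOptions::new()
        .max_connections(1)
        .connect(database_url)
        .await?;
    sqlx::query("create schema if not exists etl;")
        .execute(&admin)
        .await?;

    // Mirrors the `?options=-csearch_path%3Detl` trick from the docs so the
    // migration tables land in the `etl` schema (assumes the URL has no
    // query string yet).
    let url = format!("{database_url}?options=-csearch_path%3Detl");
    let pool = PgPoolOptions::new().max_connections(1).connect(&url).await?;

    // Compile-time embedded migrations; the path is resolved relative to the
    // crate invoking the macro, so adjust it to your own layout.
    sqlx::migrate!("../etl/migrations").run(&pool).await?;

    Ok(())
}
```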
30 changes: 0 additions & 30 deletions etl-api/scripts/run_migrations.sh

This file was deleted.

6 changes: 3 additions & 3 deletions etl-api/src/db/pipelines.rs
@@ -12,7 +12,7 @@ use crate::db::replicators::{Replicator, ReplicatorsDbError, create_replicator};
use crate::db::sources::Source;
use crate::routes::connect_to_source_database_with_defaults;
use crate::routes::pipelines::PipelineError;
use etl_postgres::replication::{health, schema, slots, state, table_mappings};
use etl_postgres::replication::{destination_metadata, health, schema, slots, state};
use sqlx::{PgExecutor, PgTransaction};
use std::ops::DerefMut;
use thiserror::Error;
@@ -247,13 +247,13 @@ pub async fn delete_pipeline_cascading(
None
};

// Delete state, schema, and table mappings from the source database, only if ETL tables exist.
// Delete state, schema, and destination metadata from the source database, only if ETL tables exist.
if etl_present {
let _ = state::delete_replication_state_for_all_tables(source_txn.deref_mut(), pipeline.id)
.await?;
let _ = schema::delete_table_schemas_for_all_tables(source_txn.deref_mut(), pipeline.id)
.await?;
let _ = table_mappings::delete_table_mappings_for_all_tables(
let _ = destination_metadata::delete_destination_tables_metadata_for_all_tables(
source_txn.deref_mut(),
pipeline.id,
)
44 changes: 22 additions & 22 deletions etl-api/tests/pipelines.rs
@@ -1153,15 +1153,15 @@ async fn rollback_table_state_with_full_reset_succeeds() {
.await
.unwrap();

// Insert a table mapping for this table
sqlx::query("INSERT INTO etl.table_mappings (pipeline_id, source_table_id, destination_table_id) VALUES ($1, $2, 'dest_test_users')")
// Insert destination metadata for this table
sqlx::query("INSERT INTO etl.destination_tables_metadata (pipeline_id, table_id, destination_table_id, snapshot_id, schema_status, replication_mask) VALUES ($1, $2, 'dest_test_users', '0/0'::pg_lsn, 'applied', '\\x01')")
.bind(pipeline_id)
.bind(table_oid)
.execute(&source_db_pool)
.await
.unwrap();

// Verify table schema and mapping exist before reset
// Verify table schema and metadata exist before reset
let schema_count_before: i64 = sqlx::query_scalar(
"select count(*) from etl.table_schemas where pipeline_id = $1 and table_id = $2",
)
@@ -1172,15 +1172,15 @@
.unwrap();
assert_eq!(schema_count_before, 1);

let mapping_count_before: i64 = sqlx::query_scalar(
"select count(*) from etl.table_mappings where pipeline_id = $1 and source_table_id = $2",
let metadata_count_before: i64 = sqlx::query_scalar(
"select count(*) from etl.destination_tables_metadata where pipeline_id = $1 and table_id = $2",
)
.bind(pipeline_id)
.bind(table_oid)
.fetch_one(&source_db_pool)
.await
.unwrap();
assert_eq!(mapping_count_before, 1);
assert_eq!(metadata_count_before, 1);

let response = test_rollback(
&app,
@@ -1222,22 +1222,22 @@
.unwrap();
assert_eq!(schema_count_after, 0);

// Verify table mapping was deleted
let mapping_count_after: i64 = sqlx::query_scalar(
"select count(*) from etl.table_mappings where pipeline_id = $1 and source_table_id = $2",
// Verify destination metadata was deleted
let metadata_count_after: i64 = sqlx::query_scalar(
"select count(*) from etl.destination_tables_metadata where pipeline_id = $1 and table_id = $2",
)
.bind(pipeline_id)
.bind(table_oid)
.fetch_one(&source_db_pool)
.await
.unwrap();
assert_eq!(mapping_count_after, 0);
assert_eq!(metadata_count_after, 0);

drop_pg_database(&source_db_config).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn rollback_to_init_cleans_up_schemas_and_mappings() {
async fn rollback_to_init_cleans_up_schemas_and_metadata() {
init_test_tracing();
let (app, tenant_id, pipeline_id, source_db_pool, source_db_config) =
setup_pipeline_with_source_db().await;
@@ -1273,7 +1273,7 @@
.await
.unwrap();

sqlx::query("INSERT INTO etl.table_mappings (pipeline_id, source_table_id, destination_table_id) VALUES ($1, $2, 'dest_test_users')")
sqlx::query("INSERT INTO etl.destination_tables_metadata (pipeline_id, table_id, destination_table_id, snapshot_id, schema_status, replication_mask) VALUES ($1, $2, 'dest_test_users', '0/0'::pg_lsn, 'applied', '\\x01')")
.bind(pipeline_id)
.bind(table_oid)
.execute(&source_db_pool)
@@ -1308,22 +1308,22 @@
.unwrap();
assert_eq!(schema_count, 0);

// Verify table mapping was deleted
let mapping_count: i64 = sqlx::query_scalar(
"select count(*) from etl.table_mappings where pipeline_id = $1 and source_table_id = $2",
// Verify destination metadata was deleted
let metadata_count: i64 = sqlx::query_scalar(
"select count(*) from etl.destination_tables_metadata where pipeline_id = $1 and table_id = $2",
)
.bind(pipeline_id)
.bind(table_oid)
.fetch_one(&source_db_pool)
.await
.unwrap();
assert_eq!(mapping_count, 0);
assert_eq!(metadata_count, 0);

drop_pg_database(&source_db_config).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn rollback_to_non_starting_state_keeps_schemas_and_mappings() {
async fn rollback_to_non_starting_state_keeps_schemas_and_metadata() {
init_test_tracing();
let (app, tenant_id, pipeline_id, source_db_pool, source_db_config) =
setup_pipeline_with_source_db().await;
@@ -1359,7 +1359,7 @@
.await
.unwrap();

sqlx::query("INSERT INTO etl.table_mappings (pipeline_id, source_table_id, destination_table_id) VALUES ($1, $2, 'dest_test_users')")
sqlx::query("INSERT INTO etl.destination_tables_metadata (pipeline_id, table_id, destination_table_id, snapshot_id, schema_status, replication_mask) VALUES ($1, $2, 'dest_test_users', '0/0'::pg_lsn, 'applied', '\\x01')")
.bind(pipeline_id)
.bind(table_oid)
.execute(&source_db_pool)
@@ -1394,16 +1394,16 @@
.unwrap();
assert_eq!(schema_count, 1);

// Verify table mapping was NOT deleted
let mapping_count: i64 = sqlx::query_scalar(
"select count(*) from etl.table_mappings where pipeline_id = $1 and source_table_id = $2",
// Verify destination metadata was NOT deleted
let metadata_count: i64 = sqlx::query_scalar(
"select count(*) from etl.destination_tables_metadata where pipeline_id = $1 and table_id = $2",
)
.bind(pipeline_id)
.bind(table_oid)
.fetch_one(&source_db_pool)
.await
.unwrap();
assert_eq!(mapping_count, 1);
assert_eq!(metadata_count, 1);

drop_pg_database(&source_db_config).await;
}
4 changes: 2 additions & 2 deletions etl-api/tests/support/database.rs
@@ -89,8 +89,8 @@ pub async fn run_etl_migrations_on_source_database(source_db_config: &PgConnecti
.await
.expect("failed to set search path");

// Run replicator migrations to create the state store tables.
sqlx::migrate!("../etl-replicator/migrations")
// Run migrations to create the etl tables.
sqlx::migrate!("../etl/migrations")
.run(&source_pool)
.await
.expect("failed to run etl migrations");
30 changes: 21 additions & 9 deletions etl-benchmarks/benches/table_copies.rs
@@ -4,7 +4,7 @@ use etl::error::EtlResult;
use etl::pipeline::Pipeline;
use etl::state::table::TableReplicationPhaseType;
use etl::test_utils::notify::NotifyingStore;
use etl::types::{Event, TableRow};
use etl::types::{Event, ReplicatedTableSchema, TableRow};
use etl_config::Environment;
use etl_config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig};
use etl_destinations::bigquery::BigQueryDestination;
@@ -413,21 +413,30 @@ impl Destination for BenchDestination {
"bench_destination"
}

async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
async fn truncate_table(
&self,
replicated_table_schema: &ReplicatedTableSchema,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.truncate_table(table_id).await,
BenchDestination::BigQuery(dest) => dest.truncate_table(table_id).await,
BenchDestination::Null(dest) => dest.truncate_table(replicated_table_schema).await,
BenchDestination::BigQuery(dest) => dest.truncate_table(replicated_table_schema).await,
}
}

async fn write_table_rows(
&self,
table_id: TableId,
replicated_table_schema: &ReplicatedTableSchema,
table_rows: Vec<TableRow>,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_table_rows(table_id, table_rows).await,
BenchDestination::BigQuery(dest) => dest.write_table_rows(table_id, table_rows).await,
BenchDestination::Null(dest) => {
dest.write_table_rows(replicated_table_schema, table_rows)
.await
}
BenchDestination::BigQuery(dest) => {
dest.write_table_rows(replicated_table_schema, table_rows)
.await
}
}
}

@@ -444,13 +453,16 @@ impl Destination for NullDestination {
"null"
}

async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
async fn truncate_table(
&self,
_replicated_table_schema: &ReplicatedTableSchema,
) -> EtlResult<()> {
Ok(())
}

async fn write_table_rows(
&self,
_table_id: TableId,
_replicated_table_schema: &ReplicatedTableSchema,
_table_rows: Vec<TableRow>,
) -> EtlResult<()> {
Ok(())
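As the benchmark diff shows, `Destination::truncate_table` and `Destination::write_table_rows` now receive a `&ReplicatedTableSchema` instead of a bare `TableId`. A rough sketch of a custom destination under the new signatures, using only the two methods visible in this diff; the real trait very likely has further members (such as the `name` method whose signature is folded out of view), and the import path for `Destination` is assumed:

```rust
use etl::destination::Destination; // exact path assumed; not visible in the diff
use etl::error::EtlResult;
use etl::types::{ReplicatedTableSchema, TableRow};

/// A hypothetical no-op destination, mirroring `NullDestination` above,
/// used only to illustrate the new method signatures.
struct SketchDestination;

impl Destination for SketchDestination {
    async fn truncate_table(
        &self,
        _replicated_table_schema: &ReplicatedTableSchema,
    ) -> EtlResult<()> {
        // The full replicated schema is now available here, not just an id.
        Ok(())
    }

    async fn write_table_rows(
        &self,
        replicated_table_schema: &ReplicatedTableSchema,
        table_rows: Vec<TableRow>,
    ) -> EtlResult<()> {
        // A destination can now consult the schema while mapping rows, e.g.
        // to resolve destination column names before writing.
        let _ = (replicated_table_schema, table_rows);
        Ok(())
    }

    // ...remaining trait members elided; see the diff's folded regions.
}
```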