Skip to content

[FLINK-39027][pipeline-connector][postgres] Support DDL for postgres pipeline connector#4259

Open
zml1206 wants to merge 18 commits intoapache:masterfrom
zml1206:pg_ddl
Open

[FLINK-39027][pipeline-connector][postgres] Support DDL for postgres pipeline connector#4259
zml1206 wants to merge 18 commits intoapache:masterfrom
zml1206:pg_ddl

Conversation

@zml1206
Copy link
Contributor

@zml1206 zml1206 commented Feb 4, 2026

Postgres WAL log cannot parse table structure change records, we can support this through event triggers. This approach performs better than merge schema.

Preliminary work

Create an event listener table in the synchronized database and associate it with event triggers to listen for the ddl_command_end event.

CREATE SCHEMA IF NOT EXISTS audit;

CREATE TABLE audit.ddl_log (
  id BIGSERIAL PRIMARY KEY,          -- use BIGSERIAL to prevent overflow after long-term operation
  ddl_tag TEXT NOT NULL,             -- DDL command types (CREATE, ALTER, DROP, etc.)
  object_type TEXT,                  -- object type (TABLE, INDEX, FUNCTION, etc.)
  object_identity TEXT,              -- object identity
  command_text TEXT,                 -- complete DDL command text
  executing_user TEXT DEFAULT CURRENT_USER, -- user executing DDL
  created_at TIMESTAMP DEFAULT clock_timestamp()
);

-- Create a trigger function to handle DDL operations
CREATE OR REPLACE FUNCTION audit.log_ddl_event()
RETURNS event_trigger
SECURITY DEFINER
AS $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
    LOOP
        -- Prevent recursive triggering! Ignore DDL operations on the audit table itself.
        IF r.schema_name = 'audit' AND r.object_identity LIKE 'audit.ddl_log%' THEN
            RAISE DEBUG 'Ignoring DDL operation on audit table: %', r.object_identity;
            CONTINUE;
        END IF;

        INSERT INTO audit.ddl_log
            (ddl_tag, object_type, object_identity, command_text)
        VALUES
            (
              r.command_tag,
              r.object_type,
              r.object_identity,
              current_query()
            );
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Create event trigger
CREATE EVENT TRIGGER capture_ddl_trigger
    ON ddl_command_end
    EXECUTE FUNCTION audit.log_ddl_event();

Supported DDL statements

alter add column
alter drop column
alter rename column
alter column datatype

Truncate table is not supported because PostgreSQL event triggers currently do not support the truncate table command.

fix

fix
@zml1206 zml1206 marked this pull request as ready for review February 5, 2026 13:39
@zml1206
Copy link
Contributor Author

zml1206 commented Feb 9, 2026

@lvyanquan Can you help take a look? Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant