Skip to content

Commit

Permalink
Support table rename (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
shayonj authored Aug 31, 2024
1 parent 4193915 commit b72d031
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 8 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ Battle tested in production at [Tines](https://www.tines.com/) 🚀
- [Config Check](#config-check-1)
- [Bootstrap](#bootstrap-1)
- [Start sync](#start-sync)
- [DDL Changes Management](#ddl-changes-management)
- [Listing DDL Changes](#listing-ddl-changes)
- [Applying DDL Changes](#applying-ddl-changes)
- [Stats](#stats)
- [Performing switchover](#performing-switchover)
- [Replicating single database with custom tables](#replicating-single-database-with-custom-tables)
- [Exclude tables from replication](#exclude-tables-from-replication)
- [Cleanup](#cleanup)
- [Switchover strategies with minimal downtime](#switchover-strategies-with-minimal-downtime)
- [Rolling restart strategy](#rolling-restart-strategy)
- [DNS Failover strategy](#dns-failover-strategy)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.7"
services:
source_db:
image: postgres:12
image: postgres:14
ports:
- "5432:5432"
environment:
Expand Down
19 changes: 12 additions & 7 deletions lib/pg_easy_replicate/ddl_audit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def create_trigger_function(conn, group_name)
sanitized_group_name = sanitize_identifier(group_name)

full_table_names = tables.map { |table| "#{schema_name}.#{table}" }
puts "full_table_names: #{full_table_names}"
table_pattern = full_table_names.join("|")

conn.run(<<~SQL)
CREATE OR REPLACE FUNCTION #{internal_schema_name}.pger_ddl_trigger_#{sanitized_group_name}() RETURNS event_trigger AS $$
DECLARE
Expand All @@ -126,12 +127,12 @@ def create_trigger_function(conn, group_name)
IF TG_EVENT = 'ddl_command_end' THEN
FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
LOOP
IF obj.object_type = 'table' AND obj.object_identity = ANY(ARRAY['#{full_table_names.join("','")}']) THEN
IF obj.object_identity ~ '^(#{table_pattern})' THEN
INSERT INTO #{internal_schema_name}.#{table_name} (group_name, event_type, object_type, object_identity, ddl_command)
VALUES ('#{group_name}', TG_EVENT, obj.object_type, obj.object_identity, ddl_command);
ELSIF obj.object_type = 'index' THEN
SELECT (regexp_match(ddl_command, 'ON\\s+(\\S+)'))[1] INTO affected_table;
IF affected_table = ANY(ARRAY['#{full_table_names.join("','")}']) THEN
IF affected_table IN ('#{full_table_names.join("','")}') THEN
INSERT INTO #{internal_schema_name}.#{table_name} (group_name, event_type, object_type, object_identity, ddl_command)
VALUES ('#{group_name}', TG_EVENT, obj.object_type, obj.object_identity, ddl_command);
END IF;
Expand All @@ -140,8 +141,7 @@ def create_trigger_function(conn, group_name)
ELSIF TG_EVENT = 'sql_drop' THEN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
IF obj.object_type IN ('table', 'index') AND
(obj.object_identity = ANY(ARRAY['#{full_table_names.join("','")}']) OR
IF (obj.object_identity = ANY(ARRAY['#{full_table_names.join("','")}']) OR
obj.object_identity ~ ('^' || '#{schema_name}' || '\\.(.*?)_.*$'))
THEN
INSERT INTO #{internal_schema_name}.#{table_name} (group_name, event_type, object_type, object_identity, ddl_command)
Expand All @@ -151,9 +151,14 @@ def create_trigger_function(conn, group_name)
ELSIF TG_EVENT = 'table_rewrite' THEN
FOR obj IN SELECT * FROM pg_event_trigger_table_rewrite_oid()
LOOP
IF obj.oid::regclass::text = ANY(ARRAY['#{full_table_names.join("','")}']) THEN
SELECT c.relname, n.nspname INTO affected_table
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.oid = obj.oid;
IF affected_table IN ('#{full_table_names.join("','")}') THEN
INSERT INTO #{internal_schema_name}.#{table_name} (group_name, event_type, object_type, object_identity, ddl_command)
VALUES ('#{group_name}', TG_EVENT, 'table', obj.oid::regclass::text, ddl_command);
VALUES ('#{group_name}', TG_EVENT, 'table', affected_table, 'table_rewrite');
END IF;
END LOOP;
END IF;
Expand Down
39 changes: 39 additions & 0 deletions spec/pg_easy_replicate/ddl_audit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
execute_ddl("DROP INDEX #{schema_name}.idx_sellers_name")

changes = described_class.list_changes(group_name)

expect(changes.size).to eq(2)

create_index_change =
Expand All @@ -155,6 +156,44 @@
)
expect(drop_index_change[:ddl_command]).to include("DROP INDEX")
end

it "captures ALTER TABLE DDL for adding and renaming a column" do
execute_ddl(
"ALTER TABLE #{schema_name}.sellers ADD COLUMN temp_email VARCHAR(255)",
)
execute_ddl(
"ALTER TABLE #{schema_name}.sellers RENAME COLUMN temp_email TO permanent_email",
)

changes = described_class.list_changes(group_name)

expect(changes.size).to eq(2)

sorted_changes = changes.sort_by { |change| change[:created_at] }

add_column_change = sorted_changes[0]
rename_column_change = sorted_changes[1]

expect(add_column_change[:event_type]).to eq("ddl_command_end")
expect(add_column_change[:object_type]).to eq("table")
expect(add_column_change[:object_identity]).to eq(
"#{schema_name}.sellers",
)
expect(add_column_change[:ddl_command]).to include("ALTER TABLE")
expect(add_column_change[:ddl_command]).to include(
"ADD COLUMN temp_email",
)

expect(rename_column_change[:event_type]).to eq("ddl_command_end")
expect(rename_column_change[:object_type]).to eq("table column")
expect(rename_column_change[:object_identity]).to eq(
"#{schema_name}.sellers.permanent_email",
)
expect(rename_column_change[:ddl_command]).to include("ALTER TABLE")
expect(rename_column_change[:ddl_command]).to include(
"RENAME COLUMN temp_email TO permanent_email",
)
end
end

describe ".list_changes" do
Expand Down

0 comments on commit b72d031

Please sign in to comment.