From 8c642008224fb10c2fc3aefea6b8fa5bddf5383e Mon Sep 17 00:00:00 2001 From: Shayon Mukherjee Date: Sun, 21 Jan 2024 16:58:01 -0500 Subject: [PATCH] Extend config check to assert for REPLICA IDENTITY on tables and drop index bug fix - config check now performs an additional check to ensure all tables replicating (or about to be) have some form of replica identity set up. This way, when the subscription starts, it doesn't fail hard if one of the tables cannot be replicated. This usually involves having a PK, an index, or FULL REPLICA IDENTITY - Bug fix with dropping indices when a custom set of tables is passed - Refactor a bit of how the tables list is derived --- lib/pg_easy_replicate.rb | 82 ++++++++++++++++++- lib/pg_easy_replicate/cli.rb | 10 +++ lib/pg_easy_replicate/helper.rb | 27 +++++++ lib/pg_easy_replicate/index_manager.rb | 6 +- lib/pg_easy_replicate/orchestrate.rb | 85 +++++++------------- spec/pg_easy_replicate/index_manager_spec.rb | 66 ++++++++++++--- spec/pg_easy_replicate/orchestrate_spec.rb | 34 +++++--- spec/pg_easy_replicate_spec.rb | 38 ++++++++- 8 files changed, 265 insertions(+), 83 deletions(-) diff --git a/lib/pg_easy_replicate.rb b/lib/pg_easy_replicate.rb index 4fbd5a7..bbbfff6 100644 --- a/lib/pg_easy_replicate.rb +++ b/lib/pg_easy_replicate.rb @@ -26,7 +26,12 @@ class Error < StandardError extend Helper class << self - def config(special_user_role: nil, copy_schema: false) + def config( + special_user_role: nil, + copy_schema: false, + tables: "", + schema_name: nil + ) abort_with("SOURCE_DB_URL is missing") if source_db_url.nil? abort_with("TARGET_DB_URL is missing") if target_db_url.nil? 
@@ -56,15 +61,31 @@ def config(special_user_role: nil, copy_schema: false) user: db_user(target_db_url), ), pg_dump_exists: pg_dump_exists, + tables_have_replica_identity: + tables_have_replica_identity?( + conn_string: source_db_url, + tables: tables, + schema_name: schema_name, + ), } rescue => e abort_with("Unable to check config: #{e.message}") end end - def assert_config(special_user_role: nil, copy_schema: false) + def assert_config( + special_user_role: nil, + copy_schema: false, + tables: "", + schema_name: nil + ) config_hash = - config(special_user_role: special_user_role, copy_schema: copy_schema) + config( + special_user_role: special_user_role, + copy_schema: copy_schema, + tables: tables, + schema_name: schema_name, + ) if copy_schema && !config_hash.dig(:pg_dump_exists) abort_with("pg_dump must exist if copy_schema (-c) is passed") @@ -82,6 +103,19 @@ def assert_config(special_user_role: nil, copy_schema: false) abort_with("User on source database does not have super user privilege") end + if tables.split(",").size > 0 && (schema_name.nil? || schema_name == "") + abort_with("Schema name is required if tables are passed") + end + + unless config_hash.dig(:tables_have_replica_identity) + abort_with( + "Ensure all tables involved in logical replication have an appropriate replica identity set. This can be done using: + 1. Default (Primary Key): `ALTER TABLE table_name REPLICA IDENTITY DEFAULT;` + 2. Unique Index: `ALTER TABLE table_name REPLICA IDENTITY USING INDEX index_name;` + 3. Full (All Columns): `ALTER TABLE table_name REPLICA IDENTITY FULL;`", + ) + end + return if config_hash.dig(:target_db_is_super_user) abort_with("User on target database does not have super user privilege") end @@ -352,5 +386,47 @@ def user_exists?(conn_string:, user: internal_user_name) ) .any? 
{ |q| q[:username] == user } end + + def tables_have_replica_identity?( + conn_string:, + tables: "", + schema_name: nil + ) + schema_name ||= "public" + + table_list = + determine_tables( + schema: schema_name, + conn_string: source_db_url, + list: tables, + ) + return false if table_list.empty? + + formatted_table_list = table_list.map { |table| "'#{table}'" }.join(", ") + + sql = <<~SQL + SELECT t.relname AS table_name, + CASE + WHEN t.relreplident = 'd' THEN 'default' + WHEN t.relreplident = 'n' THEN 'nothing' + WHEN t.relreplident = 'i' THEN 'index' + WHEN t.relreplident = 'f' THEN 'full' + END AS replica_identity + FROM pg_class t + JOIN pg_namespace ns ON t.relnamespace = ns.oid + WHERE ns.nspname = '#{schema_name}' + AND t.relkind = 'r' + AND t.relname IN (#{formatted_table_list}) + SQL + + results = + Query.run( + query: sql, + connection_url: conn_string, + user: db_user(conn_string), + ) + + results.all? { |r| r[:replica_identity] != "nothing" } + end end end diff --git a/lib/pg_easy_replicate/cli.rb b/lib/pg_easy_replicate/cli.rb index 4dc25be..3b36e06 100644 --- a/lib/pg_easy_replicate/cli.rb +++ b/lib/pg_easy_replicate/cli.rb @@ -16,10 +16,20 @@ class CLI < Thor aliases: "-c", boolean: true, desc: "Copy schema to the new database" + method_option :tables, + aliases: "-t", + desc: + "Comma separated list of table names. Default: All tables" + method_option :schema_name, + aliases: "-s", + desc: + "Name of the schema tables are in, only required if passing list of tables" def config_check PgEasyReplicate.assert_config( special_user_role: options[:special_user_role], copy_schema: options[:copy_schema], + tables: options[:tables], + schema_name: options[:schema_name], ) puts "✅ Config is looking good." diff --git a/lib/pg_easy_replicate/helper.rb b/lib/pg_easy_replicate/helper.rb index acae51f..b384f39 100644 --- a/lib/pg_easy_replicate/helper.rb +++ b/lib/pg_easy_replicate/helper.rb @@ -72,5 +72,32 @@ def abort_with(msg) raise(msg) if test_env? 
abort(msg) end + + def determine_tables(conn_string:, list: "", schema: nil) + schema ||= "public" + + tables = list&.split(",") || [] + if tables.size > 0 + tables + else + list_all_tables(schema: schema, conn_string: conn_string) + end + end + + def list_all_tables(schema:, conn_string:) + Query + .run( + query: + "SELECT table_name + FROM information_schema.tables + WHERE table_schema = '#{schema}' AND + table_type = 'BASE TABLE' + ORDER BY table_name", + connection_url: conn_string, + user: db_user(conn_string), + ) + .map(&:values) + .flatten + end end end diff --git a/lib/pg_easy_replicate/index_manager.rb b/lib/pg_easy_replicate/index_manager.rb index aae7da4..19db25c 100644 --- a/lib/pg_easy_replicate/index_manager.rb +++ b/lib/pg_easy_replicate/index_manager.rb @@ -17,7 +17,7 @@ def self.drop_indices( tables: tables, schema: schema, ).each do |index| - drop_sql = "DROP INDEX CONCURRENTLY #{index[:index_name]};" + drop_sql = "DROP INDEX CONCURRENTLY #{schema}.#{index[:index_name]};" Query.run( query: drop_sql, @@ -56,8 +56,8 @@ def self.recreate_indices( end def self.fetch_indices(conn_string:, tables:, schema:) - return [] if tables.split(",").empty? - table_list = tables.split(",").map { |table| "'#{table}'" }.join(",") + return [] if tables.empty? 
+ table_list = tables.map { |table| "'#{table}'" }.join(",") sql = <<-SQL SELECT diff --git a/lib/pg_easy_replicate/orchestrate.rb b/lib/pg_easy_replicate/orchestrate.rb index 4816e35..b1965c4 100644 --- a/lib/pg_easy_replicate/orchestrate.rb +++ b/lib/pg_easy_replicate/orchestrate.rb @@ -46,7 +46,7 @@ def start_sync(options) Group.create( name: options[:group_name], - table_names: tables, + table_names: tables.join(","), schema_name: schema_name, started_at: Time.now.utc, recreate_indices_post_copy: options[:recreate_indices_post_copy], @@ -63,7 +63,7 @@ def start_sync(options) else Group.create( name: options[:group_name], - table_names: tables, + table_names: tables.join(","), schema_name: schema_name, started_at: Time.now.utc, failed_at: Time.now.utc, @@ -92,42 +92,24 @@ def add_tables_to_publication( schema:, group_name:, conn_string:, - tables: "" + tables: [] ) logger.info( "Adding tables up publication", { publication_name: publication_name(group_name) }, ) - tables - .split(",") - .map do |table_name| - Query.run( - query: - "ALTER PUBLICATION #{quote_ident(publication_name(group_name))} - ADD TABLE #{quote_ident(table_name)}", - connection_url: conn_string, - schema: schema, - ) - end - rescue => e - raise "Unable to add tables to publication: #{e.message}" - end - - def list_all_tables(schema:, conn_string:) - Query - .run( + tables.map do |table_name| + Query.run( query: - "SELECT table_name - FROM information_schema.tables - WHERE table_schema = '#{schema}' AND - table_type = 'BASE TABLE' - ORDER BY table_name", + "ALTER PUBLICATION #{quote_ident(publication_name(group_name))} + ADD TABLE #{quote_ident(table_name)}", connection_url: conn_string, + schema: schema, ) - .map(&:values) - .flatten - .join(",") + end + rescue => e + raise "Unable to add tables to publication: #{e.message}" end def drop_publication(group_name:, conn_string:) @@ -225,10 +207,11 @@ def switchover( lag_delta_size: nil ) group = Group.find(group_name) + tables_list = 
group[:table_names].split(",") run_vacuum_analyze( conn_string: target_conn_string, - tables: group[:table_names], + tables: tables_list, schema: group[:schema_name], ) @@ -239,7 +222,7 @@ def switchover( IndexManager.recreate_indices( source_conn_string: source_db_url, target_conn_string: target_db_url, - tables: group[:table_names], + tables: tables_list, schema: group[:schema_name], ) end @@ -257,7 +240,7 @@ def switchover( # Run vacuum analyze to refresh the planner post switchover run_vacuum_analyze( conn_string: target_conn_string, - tables: group[:table_names], + tables: tables_list, schema: group[:schema_name], ) drop_subscription( @@ -369,21 +352,19 @@ def refresh_sequences(conn_string:, schema: nil) end def run_vacuum_analyze(conn_string:, tables:, schema:) - tables - .split(",") - .each do |t| - logger.info( - "Running vacuum analyze on #{t}", - schema: schema, - table: t, - ) - Query.run( - query: "VACUUM VERBOSE ANALYZE #{t}", - connection_url: conn_string, - schema: schema, - transaction: false, - ) - end + tables.each do |t| + logger.info( + "Running vacuum analyze on #{t}", + schema: schema, + table: t, + ) + Query.run( + query: "VACUUM VERBOSE ANALYZE #{t}", + connection_url: conn_string, + schema: schema, + transaction: false, + ) + end rescue => e raise "Unable to run vacuum and analyze: #{e.message}" end @@ -391,16 +372,6 @@ def run_vacuum_analyze(conn_string:, tables:, schema:) def mark_switchover_complete(group_name) Group.update(group_name: group_name, switchover_completed_at: Time.now) end - - private - - def determine_tables(schema:, conn_string:, list: "") - tables = list&.split(",") || [] - unless tables.size > 0 - return list_all_tables(schema: schema, conn_string: conn_string) - end - "" - end end end end diff --git a/spec/pg_easy_replicate/index_manager_spec.rb b/spec/pg_easy_replicate/index_manager_spec.rb index f862a2b..ad16e13 100644 --- a/spec/pg_easy_replicate/index_manager_spec.rb +++ 
b/spec/pg_easy_replicate/index_manager_spec.rb @@ -18,12 +18,24 @@ result = described_class.fetch_indices( conn_string: connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) expect(result).to eq( [ + { + table_name: "items", + index_name: "items_id_index", + index_definition: + "CREATE INDEX items_id_index ON pger_test.items USING btree (id)", + }, + { + table_name: "items", + index_name: "items_seller_id_index", + index_definition: + "CREATE INDEX items_seller_id_index ON pger_test.items USING btree (seller_id)", + }, { table_name: "sellers", index_name: "sellers_id_index", @@ -57,12 +69,24 @@ result = described_class.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) expect(result).to eq( [ + { + table_name: "items", + index_name: "items_id_index", + index_definition: + "CREATE INDEX items_id_index ON pger_test.items USING btree (id)", + }, + { + table_name: "items", + index_name: "items_seller_id_index", + index_definition: + "CREATE INDEX items_seller_id_index ON pger_test.items USING btree (seller_id)", + }, { table_name: "sellers", index_name: "sellers_id_index", @@ -81,14 +105,14 @@ described_class.drop_indices( source_conn_string: connection_url, target_conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) result = described_class.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) @@ -112,12 +136,24 @@ result = described_class.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) expect(result).to eq( [ + { + table_name: "items", + index_name: "items_id_index", + index_definition: + "CREATE INDEX items_id_index ON pger_test.items USING btree (id)", + }, + { + table_name: "items", + index_name: "items_seller_id_index", + 
index_definition: + "CREATE INDEX items_seller_id_index ON pger_test.items USING btree (seller_id)", + }, { table_name: "sellers", index_name: "sellers_id_index", @@ -136,7 +172,7 @@ described_class.drop_indices( source_conn_string: connection_url, target_conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) @@ -145,7 +181,7 @@ result = described_class.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) @@ -154,7 +190,7 @@ described_class.recreate_indices( source_conn_string: connection_url, target_conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) @@ -162,12 +198,24 @@ result = described_class.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) expect(result).to eq( [ + { + table_name: "items", + index_name: "items_id_index", + index_definition: + "CREATE INDEX items_id_index ON pger_test.items USING btree (id)", + }, + { + table_name: "items", + index_name: "items_seller_id_index", + index_definition: + "CREATE INDEX items_seller_id_index ON pger_test.items USING btree (seller_id)", + }, { table_name: "sellers", index_name: "sellers_id_index", diff --git a/spec/pg_easy_replicate/orchestrate_spec.rb b/spec/pg_easy_replicate/orchestrate_spec.rb index f90d06c..3dc55dd 100644 --- a/spec/pg_easy_replicate/orchestrate_spec.rb +++ b/spec/pg_easy_replicate/orchestrate_spec.rb @@ -77,7 +77,7 @@ group_name: "cluster1", schema: test_schema, conn_string: connection_url, - tables: "items,sellers", + tables: %w[items sellers], ) expect(pg_publication_tables(connection_url: connection_url)).to eq( @@ -101,7 +101,7 @@ group_name: "cluster1", schema: test_schema, conn_string: connection_url, - tables: "sellers,", + tables: ["sellers"], ) expect(pg_publication_tables(connection_url: 
connection_url)).to eq( @@ -133,7 +133,7 @@ schema: test_schema, conn_string: connection_url, ) - expect(r).to eq("items,sellers") + expect(r.sort).to eq(%w[items sellers]) end end @@ -333,7 +333,8 @@ described_class.run_vacuum_analyze( conn_string: target_connection_url, schema: test_schema, - tables: PgEasyReplicate::Group.find("cluster1")[:table_names], + tables: + PgEasyReplicate::Group.find("cluster1")[:table_names].split(","), ) sleep 2 @@ -441,12 +442,24 @@ result = PgEasyReplicate::IndexManager.fetch_indices( conn_string: target_connection_url, - tables: "sellers, items", + tables: %w[sellers items], schema: test_schema, ) expect(result).to eq( [ + { + table_name: "items", + index_name: "items_id_index", + index_definition: + "CREATE INDEX items_id_index ON pger_test.items USING btree (id)", + }, + { + table_name: "items", + index_name: "items_seller_id_index", + index_definition: + "CREATE INDEX items_seller_id_index ON pger_test.items USING btree (seller_id)", + }, { table_name: "sellers", index_name: "sellers_id_index", @@ -497,7 +510,10 @@ setup_roles setup_tables("james-bond_role_regular") - PgEasyReplicate.assert_config(special_user_role: "james-bond_super_role") + PgEasyReplicate.assert_config( + special_user_role: "james-bond_super_role", + schema_name: test_schema, + ) PgEasyReplicate.bootstrap( { group_name: "cluster1", @@ -516,7 +532,7 @@ ENV["TARGET_DB_URL"] = target_connection_url end - it "succesfully raises create subscription super user error" do + it "succesfully raises lack of super user error" do conn1 = PgEasyReplicate::Query.connect( connection_url: connection_url("james-bond_role_regular"), @@ -545,9 +561,7 @@ schema_name: test_schema, recreate_indices_post_copy: true, ) - end.to raise_error( - /Starting sync failed: Unable to create subscription: PG::InsufficientPrivilege: ERROR: must be superuser to create subscriptions/, - ) + end.to raise_error(/Starting sync failed: PG::InsufficientPrivilege/) # 
expect(PgEasyReplicate::Group.find("cluster1")).to include( # switchover_completed_at: nil, diff --git a/spec/pg_easy_replicate_spec.rb b/spec/pg_easy_replicate_spec.rb index 1cea2a4..c149642 100644 --- a/spec/pg_easy_replicate_spec.rb +++ b/spec/pg_easy_replicate_spec.rb @@ -6,8 +6,12 @@ end describe ".config" do + before { setup_tables } + + after { teardown_tables } + it "returns the config for both databases" do - result = described_class.config + result = described_class.config(schema_name: test_schema) expect(result).to eq( { source_db_is_super_user: true, @@ -19,6 +23,7 @@ { name: "max_worker_processes", setting: "8" }, { name: "wal_level", setting: "logical" }, ], + tables_have_replica_identity: true, target_db: [ { name: "max_logical_replication_workers", setting: "4" }, { name: "max_replication_slots", setting: "10" }, @@ -84,6 +89,7 @@ target_db_is_super_user: false, target_db: [{ name: "wal_level", setting: "logical" }], source_db: [{ name: "wal_level", setting: "logical" }], + tables_have_replica_identity: true, }, ) expect { described_class.assert_config }.to raise_error( @@ -106,6 +112,36 @@ described_class.assert_config(copy_schema: true) }.to raise_error("pg_dump must exist if copy_schema (-c) is passed") end + + it "raises error when tables don't have replicat identity" do + allow(described_class).to receive(:config).and_return( + { + source_db_is_super_user: true, + target_db_is_super_user: true, + target_db: [{ name: "wal_level", setting: "logical" }], + source_db: [{ name: "wal_level", setting: "logical" }], + tables_have_replica_identity: false, + }, + ) + expect { described_class.assert_config }.to raise_error( + /Ensure all tables involved in logical replication have an appropriate replica identity/, + ) + end + + it "raises error when table is provided but schema isn't" do + allow(described_class).to receive(:config).and_return( + { + source_db_is_super_user: true, + target_db_is_super_user: true, + target_db: [{ name: "wal_level", setting: 
"logical" }], + source_db: [{ name: "wal_level", setting: "logical" }], + tables_have_replica_identity: true, + }, + ) + expect { + described_class.assert_config(tables: "items") + }.to raise_error(/Schema name is required if tables are passed/) + end end describe ".is_super_user?" do