Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend config check to assert for REPLICA IDENTITY on tables and drop index bug fix #89

Merged
merged 1 commit into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions lib/pg_easy_replicate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ class Error < StandardError
extend Helper

class << self
def config(special_user_role: nil, copy_schema: false)
def config(
special_user_role: nil,
copy_schema: false,
tables: "",
schema_name: nil
)
abort_with("SOURCE_DB_URL is missing") if source_db_url.nil?
abort_with("TARGET_DB_URL is missing") if target_db_url.nil?

Expand Down Expand Up @@ -56,15 +61,31 @@ def config(special_user_role: nil, copy_schema: false)
user: db_user(target_db_url),
),
pg_dump_exists: pg_dump_exists,
tables_have_replica_identity:
tables_have_replica_identity?(
conn_string: source_db_url,
tables: tables,
schema_name: schema_name,
),
}
rescue => e
abort_with("Unable to check config: #{e.message}")
end
end

def assert_config(special_user_role: nil, copy_schema: false)
def assert_config(
special_user_role: nil,
copy_schema: false,
tables: "",
schema_name: nil
)
config_hash =
config(special_user_role: special_user_role, copy_schema: copy_schema)
config(
special_user_role: special_user_role,
copy_schema: copy_schema,
tables: tables,
schema_name: schema_name,
)

if copy_schema && !config_hash.dig(:pg_dump_exists)
abort_with("pg_dump must exist if copy_schema (-c) is passed")
Expand All @@ -82,6 +103,19 @@ def assert_config(special_user_role: nil, copy_schema: false)
abort_with("User on source database does not have super user privilege")
end

if tables.split(",").size > 0 && (schema_name.nil? || schema_name == "")
abort_with("Schema name is required if tables are passed")
end

unless config_hash.dig(:tables_have_replica_identity)
abort_with(
"Ensure all tables involved in logical replication have an appropriate replica identity set. This can be done using:
1. Default (Primary Key): `ALTER TABLE table_name REPLICA IDENTITY DEFAULT;`
2. Unique Index: `ALTER TABLE table_name REPLICA IDENTITY USING INDEX index_name;`
3. Full (All Columns): `ALTER TABLE table_name REPLICA IDENTITY FULL;`",
)
end

return if config_hash.dig(:target_db_is_super_user)
abort_with("User on target database does not have super user privilege")
end
Expand Down Expand Up @@ -352,5 +386,47 @@ def user_exists?(conn_string:, user: internal_user_name)
)
.any? { |q| q[:username] == user }
end

# Checks that every table involved in logical replication has a usable
# replica identity (anything other than "nothing"), which PostgreSQL
# requires for UPDATE/DELETE replication.
#
# @param conn_string [String] connection URL of the database to inspect
# @param tables [String] comma separated table names; empty means all
#   tables in the schema
# @param schema_name [String, nil] schema to look in; defaults to "public"
# @return [Boolean] false when no tables are found or when any table has
#   replica identity "nothing"
def tables_have_replica_identity?(
  conn_string:,
  tables: "",
  schema_name: nil
)
  schema_name ||= "public"

  # Bug fix: previously this passed source_db_url here, ignoring the
  # conn_string argument. Honor the caller-supplied connection string.
  table_list =
    determine_tables(
      schema: schema_name,
      conn_string: conn_string,
      list: tables,
    )
  return false if table_list.empty?

  formatted_table_list = table_list.map { |table| "'#{table}'" }.join(", ")

  # NOTE(review): schema_name and table names are interpolated directly
  # into SQL. They come from trusted CLI/config input today, but consider
  # proper quoting/escaping if that ever changes.
  sql = <<~SQL
    SELECT t.relname AS table_name,
      CASE
        WHEN t.relreplident = 'd' THEN 'default'
        WHEN t.relreplident = 'n' THEN 'nothing'
        WHEN t.relreplident = 'i' THEN 'index'
        WHEN t.relreplident = 'f' THEN 'full'
      END AS replica_identity
    FROM pg_class t
    JOIN pg_namespace ns ON t.relnamespace = ns.oid
    WHERE ns.nspname = '#{schema_name}'
      AND t.relkind = 'r'
      AND t.relname IN (#{formatted_table_list})
  SQL

  results =
    Query.run(
      query: sql,
      connection_url: conn_string,
      user: db_user(conn_string),
    )

  # Every queried table must report an identity other than "nothing".
  results.all? { |r| r[:replica_identity] != "nothing" }
end
end
end
10 changes: 10 additions & 0 deletions lib/pg_easy_replicate/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ class CLI < Thor
aliases: "-c",
boolean: true,
desc: "Copy schema to the new database"
method_option :tables,
aliases: "-t",
desc:
"Comma separated list of table names. Default: All tables"
method_option :schema_name,
aliases: "-s",
desc:
"Name of the schema tables are in, only required if passing list of tables"
def config_check
PgEasyReplicate.assert_config(
special_user_role: options[:special_user_role],
copy_schema: options[:copy_schema],
tables: options[:tables],
schema_name: options[:schema_name],
)

puts "✅ Config is looking good."
Expand Down
27 changes: 27 additions & 0 deletions lib/pg_easy_replicate/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,32 @@ def abort_with(msg)
raise(msg) if test_env?
abort(msg)
end

# Resolves the set of tables to operate on.
#
# @param conn_string [String] connection URL; only used when no explicit
#   list is given (to enumerate all tables in the schema)
# @param list [String, nil] comma separated table names; nil/empty means
#   "all tables in the schema"
# @param schema [String, nil] schema name; defaults to "public"
# @return [Array<String>] resolved table names
def determine_tables(conn_string:, list: "", schema: nil)
  schema ||= "public"

  # Strip whitespace and drop empty segments so inputs such as
  # "users, posts" or "users,,posts" resolve to clean table names
  # instead of names with stray spaces or empty strings.
  tables = (list || "").split(",").map(&:strip).reject(&:empty?)
  return tables unless tables.empty?

  list_all_tables(schema: schema, conn_string: conn_string)
end

# Returns the names of all base tables in the given schema, sorted
# alphabetically, by querying information_schema on the given connection.
def list_all_tables(schema:, conn_string:)
  rows =
    Query.run(
      query:
        "SELECT table_name
          FROM information_schema.tables
          WHERE table_schema = '#{schema}' AND
            table_type = 'BASE TABLE'
          ORDER BY table_name",
      connection_url: conn_string,
      user: db_user(conn_string),
    )
  # Each row is a single-column record; collect the bare table names.
  rows.flat_map(&:values)
end
end
end
6 changes: 3 additions & 3 deletions lib/pg_easy_replicate/index_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def self.drop_indices(
tables: tables,
schema: schema,
).each do |index|
drop_sql = "DROP INDEX CONCURRENTLY #{index[:index_name]};"
drop_sql = "DROP INDEX CONCURRENTLY #{schema}.#{index[:index_name]};"

Query.run(
query: drop_sql,
Expand Down Expand Up @@ -56,8 +56,8 @@ def self.recreate_indices(
end

def self.fetch_indices(conn_string:, tables:, schema:)
return [] if tables.split(",").empty?
table_list = tables.split(",").map { |table| "'#{table}'" }.join(",")
return [] if tables.empty?
table_list = tables.map { |table| "'#{table}'" }.join(",")

sql = <<-SQL
SELECT
Expand Down
85 changes: 28 additions & 57 deletions lib/pg_easy_replicate/orchestrate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def start_sync(options)

Group.create(
name: options[:group_name],
table_names: tables,
table_names: tables.join(","),
schema_name: schema_name,
started_at: Time.now.utc,
recreate_indices_post_copy: options[:recreate_indices_post_copy],
Expand All @@ -63,7 +63,7 @@ def start_sync(options)
else
Group.create(
name: options[:group_name],
table_names: tables,
table_names: tables.join(","),
schema_name: schema_name,
started_at: Time.now.utc,
failed_at: Time.now.utc,
Expand Down Expand Up @@ -92,42 +92,24 @@ def add_tables_to_publication(
schema:,
group_name:,
conn_string:,
tables: ""
tables: []
)
logger.info(
"Adding tables up publication",
{ publication_name: publication_name(group_name) },
)

tables
.split(",")
.map do |table_name|
Query.run(
query:
"ALTER PUBLICATION #{quote_ident(publication_name(group_name))}
ADD TABLE #{quote_ident(table_name)}",
connection_url: conn_string,
schema: schema,
)
end
rescue => e
raise "Unable to add tables to publication: #{e.message}"
end

def list_all_tables(schema:, conn_string:)
Query
.run(
tables.map do |table_name|
Query.run(
query:
"SELECT table_name
FROM information_schema.tables
WHERE table_schema = '#{schema}' AND
table_type = 'BASE TABLE'
ORDER BY table_name",
"ALTER PUBLICATION #{quote_ident(publication_name(group_name))}
ADD TABLE #{quote_ident(table_name)}",
connection_url: conn_string,
schema: schema,
)
.map(&:values)
.flatten
.join(",")
end
rescue => e
raise "Unable to add tables to publication: #{e.message}"
end

def drop_publication(group_name:, conn_string:)
Expand Down Expand Up @@ -225,10 +207,11 @@ def switchover(
lag_delta_size: nil
)
group = Group.find(group_name)
tables_list = group[:table_names].split(",")

run_vacuum_analyze(
conn_string: target_conn_string,
tables: group[:table_names],
tables: tables_list,
schema: group[:schema_name],
)

Expand All @@ -239,7 +222,7 @@ def switchover(
IndexManager.recreate_indices(
source_conn_string: source_db_url,
target_conn_string: target_db_url,
tables: group[:table_names],
tables: tables_list,
schema: group[:schema_name],
)
end
Expand All @@ -257,7 +240,7 @@ def switchover(
# Run vacuum analyze to refresh the planner post switchover
run_vacuum_analyze(
conn_string: target_conn_string,
tables: group[:table_names],
tables: tables_list,
schema: group[:schema_name],
)
drop_subscription(
Expand Down Expand Up @@ -369,38 +352,26 @@ def refresh_sequences(conn_string:, schema: nil)
end

def run_vacuum_analyze(conn_string:, tables:, schema:)
tables
.split(",")
.each do |t|
logger.info(
"Running vacuum analyze on #{t}",
schema: schema,
table: t,
)
Query.run(
query: "VACUUM VERBOSE ANALYZE #{t}",
connection_url: conn_string,
schema: schema,
transaction: false,
)
end
tables.each do |t|
logger.info(
"Running vacuum analyze on #{t}",
schema: schema,
table: t,
)
Query.run(
query: "VACUUM VERBOSE ANALYZE #{t}",
connection_url: conn_string,
schema: schema,
transaction: false,
)
end
rescue => e
raise "Unable to run vacuum and analyze: #{e.message}"
end

def mark_switchover_complete(group_name)
Group.update(group_name: group_name, switchover_completed_at: Time.now)
end

private

def determine_tables(schema:, conn_string:, list: "")
tables = list&.split(",") || []
unless tables.size > 0
return list_all_tables(schema: schema, conn_string: conn_string)
end
""
end
end
end
end
Loading