Skip to content

Commit

Permalink
feat(electric): Add a new write-to-pg mode that applies changes direc…
Browse files Browse the repository at this point in the history
…tly to Postgres (electric-sql#698)

When Electric is started in this write mode, instead of creating a
subscription in Postgres and streaming client writes to it over a
logical replication connection, it will use a regular connection to
apply client writes as regular DML statements.

The `direct_writes` mode can be enabled by setting
`ELECTRIC_WRITE_TO_PG_MODE` environment variable to a
 `direct_writes` value. By default, old behavior with creating a
reverse subscription is preserved.
  • Loading branch information
alco authored Dec 7, 2023
1 parent 7d7685e commit 2f3feff
Show file tree
Hide file tree
Showing 29 changed files with 704 additions and 219 deletions.
14 changes: 9 additions & 5 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ on:
pull_request:
paths-ignore:
# Root files
- '*'
- '!pnpm-lock.yaml'
- "*"
- "!pnpm-lock.yaml"
# CI files not related to GH actions
- '.buildkite/**'
- '**/README.md'
- 'docs/**'
- ".buildkite/**"
- "**/README.md"
- "docs/**"

env:
OTP_VERSION: "25.3"
Expand All @@ -25,11 +25,15 @@ jobs:
e2e_tests:
name: E2E tests
runs-on: electric-e2e-8-32
strategy:
matrix:
write_to_pg_mode: [logical_replication, direct_writes]
defaults:
run:
working-directory: e2e
env:
BUILDKITE_ANALYTICS_TOKEN: ${{ secrets.BUILDKITE_TEST_ANALYTICS_E2E }}
ELECTRIC_WRITE_TO_PG_MODE: ${{ matrix.write_to_pg_mode }}
steps:
- uses: actions/checkout@v3
with:
Expand Down
16 changes: 13 additions & 3 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ default_pg_proxy_port = "65432"
default_listen_on_ipv6 = "false"
default_database_require_ssl = "false"
default_database_use_ipv6 = "false"
default_write_to_pg_mode = "logical_replication"

###

Expand Down Expand Up @@ -78,12 +79,19 @@ config :logger, :console,
pg_server_port = get_env_int.("LOGICAL_PUBLISHER_PORT", default_pg_server_port)
listen_on_ipv6? = get_env_bool.("ELECTRIC_USE_IPV6", default_listen_on_ipv6)

write_to_pg_mode =
case System.get_env("ELECTRIC_WRITE_TO_PG_MODE", default_write_to_pg_mode) do
"logical_replication" -> :logical_replication
"direct_writes" -> :direct_writes
end

config :electric,
# Used in telemetry, and to identify the server to the client
instance_id: System.get_env("ELECTRIC_INSTANCE_ID", Electric.Utils.uuid4()),
http_port: get_env_int.("HTTP_PORT", default_http_server_port),
pg_server_port: pg_server_port,
listen_on_ipv6?: listen_on_ipv6?
listen_on_ipv6?: listen_on_ipv6?,
write_to_pg_mode: write_to_pg_mode

config :electric, Electric.Replication.Postgres,
pg_client: Electric.Replication.Postgres.Client,
Expand Down Expand Up @@ -127,8 +135,10 @@ if config_env() == :prod do
|> Keyword.put(:replication, "database")

pg_server_host =
System.get_env("LOGICAL_PUBLISHER_HOST") ||
raise("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
if write_to_pg_mode == :logical_replication do
System.get_env("LOGICAL_PUBLISHER_HOST") ||
raise("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
end

proxy_port = get_env_int.("PG_PROXY_PORT", default_pg_proxy_port)

Expand Down
4 changes: 4 additions & 0 deletions components/electric/lib/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Electric do
@moduledoc false

@type reg_name :: {:via, :gproc, {:n, :l, term()}}
@type write_to_pg_mode :: :logical_replication | :direct_writes

@doc """
Register process with the given name
Expand Down Expand Up @@ -99,4 +100,7 @@ defmodule Electric do
def vsn do
@current_vsn
end

@spec write_to_pg_mode :: write_to_pg_mode
def write_to_pg_mode, do: Application.fetch_env!(:electric, :write_to_pg_mode)
end
21 changes: 16 additions & 5 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ defmodule Electric.Application do
Electric.Postgres.Proxy.SASL.SCRAMLockedCache,
Electric.Satellite.SubscriptionManager,
Electric.Satellite.ClientManager,
Electric.Replication.Connectors,
{ThousandIsland,
[port: pg_server_port(), handler_module: Electric.Replication.Postgres.TcpServer] ++
listener_opts()}
Electric.Replication.Connectors
]

children =
children ++
if Electric.write_to_pg_mode() == :logical_replication do
[
{ThousandIsland,
[port: pg_server_port(), handler_module: Electric.Replication.Postgres.TcpServer] ++
listener_opts()}
]
else
[]
end

children =
children ++
unless Application.get_env(:electric, :disable_listeners, false) do
Expand All @@ -41,7 +50,9 @@ defmodule Electric.Application do
|> Enum.each(fn {name, config} ->
Connectors.start_connector(
PostgresConnector,
Keyword.put(config, :origin, to_string(name))
config
|> Keyword.put(:origin, to_string(name))
|> Keyword.put(:write_to_pg_mode, Electric.write_to_pg_mode())
)
end)

Expand Down
3 changes: 2 additions & 1 deletion components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ defmodule Electric.Postgres.Extension do
Migrations.Migration_20230921161418_ProxyCompatibility,
Migrations.Migration_20231009121515_AllowLargeMigrations,
Migrations.Migration_20231010123118_AddPriorityToVersion,
Migrations.Migration_20231016141000_ConvertFunctionToProcedure
Migrations.Migration_20231016141000_ConvertFunctionToProcedure,
Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways
]
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This function augments the builtin [`session_replication_role`][1] setting to emulate its behaviour for the direct_writes mode.
--
-- With ELECTRIC_WRITE_TO_PG_MODE is set to `logical_replication`, the value of the builtin setting is looked up. On the
-- Electric->PG replication connection its value is `replica`, whereas on regular connections to Postgres it is
-- `origin`.
--
-- When ELECTRIC_WRITE_TO_PG_MODE is set to `direct_writes`, the value of the [custom `electric.session_replication_role` option][2]
-- is looked up. It is used to emulate the same Electric->PG trigger activation behaviour even though a regular connection is
-- used to apply writes to Postgres in this write mode.
--
-- [1]: https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-SESSION-REPLICATION-ROLE
-- [2]: https://www.postgresql.org/docs/14/runtime-config-custom.html
CREATE OR REPLACE FUNCTION <%= @schema %>.__session_replication_role(OUT role text) AS $$
BEGIN
SELECT INTO role current_setting('electric.session_replication_role');
EXCEPTION WHEN undefined_object THEN
SELECT INTO role current_setting('session_replication_role');
END;
$$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
CREATE OR REPLACE FUNCTION <%= @schema %>.install_functions_and_triggers(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[])
RETURNS VOID
LANGUAGE PLPGSQL
AS $function$
DECLARE
shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name;
full_table_identifier TEXT := format('%I.%I', schema_name, table_name);
generated_functions JSONB;
BEGIN
-- Install function to be used in the triggers
generated_functions := <%= @schema %>.install_conflict_resolution_functions(schema_name, table_name, primary_key_list, non_pk_column_list);

-- Install actual triggers
EXECUTE format($$
CREATE OR REPLACE TRIGGER as_first__save_deleted_rows_to_tombstone_table
AFTER DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE <%= @schema %>.%I()
$$, full_table_identifier, generated_functions->>'generate_tombstone_entry');
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER as_first__save_deleted_rows_to_tombstone_table $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows
BEFORE DELETE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON <%= @schema %>.%I $$, shadow_table_name);
EXECUTE format($$
CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags
AFTER UPDATE ON <%= @schema %>.%I
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() <> 'replica' AND NOT NEW._resolved)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'write_correct_max_tag');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__upsert_rows
BEFORE INSERT ON <%= @schema %>.%I
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__upsert_rows $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags
BEFORE UPDATE ON <%= @schema %>.%I
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, shadow_table_name, generated_functions->>'resolve_observed_tags');

EXECUTE format($$ ALTER TABLE <%= @schema %>.%I ENABLE ALWAYS TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (<%= @schema %>.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1)
EXECUTE PROCEDURE <%= @schema %>.%I();
$$, full_table_identifier, generated_functions->>'reorder_main_op');

EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
END
$function$;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli
# This function definition is included here because it is referenced in the definition of
# "trigger_function_installers" below it.
Extension.Functions.by_name(:perform_reordered_op_installer_function),
Extension.Functions.by_name(:__session_replication_role),
@contents["trigger_function_installers"],
@contents["shadow_table_creation_and_update"]
# We need to actually run shadow table creation/updates, but that's handled in the next migration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,86 +482,3 @@ BEGIN
RETURN function_names;
END
$function$;

CREATE OR REPLACE FUNCTION electric.install_functions_and_triggers(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[])
RETURNS VOID
LANGUAGE PLPGSQL
AS $function$
DECLARE
shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name;
full_table_identifier TEXT := format('%I.%I', schema_name, table_name);
generated_functions JSONB;
BEGIN
-- Install function to be used in the triggers
generated_functions := electric.install_conflict_resolution_functions(schema_name, table_name, primary_key_list, non_pk_column_list);

-- Install actual triggers
EXECUTE format($$
CREATE OR REPLACE TRIGGER as_first__save_deleted_rows_to_tombstone_table
AFTER DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I()
$$, full_table_identifier, generated_functions->>'generate_tombstone_entry');
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER as_first__save_deleted_rows_to_tombstone_table $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows
BEFORE DELETE ON %s
FOR EACH ROW
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON electric.%I $$, shadow_table_name);
EXECUTE format($$
CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags
AFTER UPDATE ON electric.%I
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (NOT NEW._resolved)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'write_correct_max_tag');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__upsert_rows
BEFORE INSERT ON electric.%I
FOR EACH ROW
WHEN (pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__upsert_rows $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags
BEFORE UPDATE ON electric.%I
FOR EACH ROW
WHEN (NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'resolve_observed_tags');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (pg_trigger_depth() < 1)
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'reorder_main_op');

EXECUTE format($$ ALTER TABLE %s ENABLE REPLICA TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
END
$function$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Electric.Postgres.Extension.Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways do
alias Electric.Postgres.Extension

@behaviour Extension.Migration

sql_file =
Path.expand(
"20231206130400_convert_replica_triggers_to_always/replace_replica_triggers.sql",
__DIR__
)

@external_resource sql_file

@migration_sql File.read!(sql_file)

@impl true
def version, do: 2023_12_06_13_04_00

@impl true
def up(schema) do
[
Extension.Functions.by_name(:__session_replication_role),
String.replace(@migration_sql, "electric", schema)
]
end

@impl true
def down(_schema) do
[]
end
end
Loading

0 comments on commit 2f3feff

Please sign in to comment.