Skip to content

Commit

Permalink
feat(electric): Allow the migrations proxy to accept WebSocket connec…
Browse files Browse the repository at this point in the history
…tions (electric-sql#738)

Closes VAX-1417.
  • Loading branch information
alco authored Dec 12, 2023
1 parent 071175d commit b57ec92
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 136 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-wolves-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

[VAX-1417] Add the option to tunnel TCP connections to the migrations proxy over regular WebSocket connections.
46 changes: 24 additions & 22 deletions components/electric/README.md

Large diffs are not rendered by default.

40 changes: 1 addition & 39 deletions components/electric/config/runtime.dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,4 @@ config :logger, level: :debug
auth_provider = System.get_env("AUTH_MODE", "secure") |> Electric.Satellite.Auth.build_provider!()
config :electric, Electric.Satellite.Auth, provider: auth_provider

proxy_port = System.get_env("PG_PROXY_PORT", "65432") |> String.to_integer()

proxy_password =
System.get_env("PG_PROXY_PASSWORD", "password")

config :electric, Electric.Replication.Connectors,
postgres_1: [
producer: Electric.Replication.Postgres.LogicalReplicationProducer,
connection: [
host: ~c"localhost",
port: 54321,
database: ~c"electric",
username: ~c"postgres",
password: ~c"password",
replication: ~c"database",
ssl: false
],
replication: [
electric_connection: [
host: "host.docker.internal",
port: 5433,
dbname: "test"
]
],
proxy: [
# listen opts are ThousandIsland.options()
# https://hexdocs.pm/thousand_island/ThousandIsland.html#t:options/0
listen: [
port: proxy_port,
transport_options: [:inet6]
],
password: proxy_password,
log_level: :info
]
]

enable_proxy_tracing? = System.get_env("PROXY_TRACING_ENABLE", "false") in ["yes", "true"]

config :electric, Electric.Postgres.Proxy.Handler.Tracing, enable: enable_proxy_tracing?
config :electric, Electric.Postgres.Proxy.Handler.Tracing, colour: true
146 changes: 88 additions & 58 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,25 @@ default_database_require_ssl = "false"
default_database_use_ipv6 = "false"
default_write_to_pg_mode = "logical_replication"

# These defaults are only set for the dev environment, so in prod errors will still get raises if any of the required
# configuration options aren't set by the user.
default_database_url =
if config_env() == :dev, do: "postgresql://postgres:password@localhost:54321/electric"

default_logical_publisher_host = if config_env() == :dev, do: "host.docker.internal"
default_pg_proxy_password = if config_env() == :dev, do: "password"

###

# We only want to raise in prod because this config is also loaded in the test environment where we do not define
# default values for all required options.
raise_in_prod =
if config_env() == :prod do
fn msg -> raise msg end
else
fn _ -> nil end
end

get_env_bool = fn name, default ->
case String.downcase(System.get_env(name, default)) do
truthy when truthy in ~w[yes true] -> true
Expand Down Expand Up @@ -119,66 +136,85 @@ config :electric, Electric.Features,
proxy_ddlx_assign: false,
proxy_ddlx_unassign: false

# The :prod environment is inlined here because by default Mix won't copy any config/runtime.*.exs files when assembling
# a release, and we want a single configuration file in our release.
if config_env() == :prod do
auth_provider =
System.get_env("AUTH_MODE", default_auth_mode) |> Electric.Satellite.Auth.build_provider!()
require_ssl? = get_env_bool.("DATABASE_REQUIRE_SSL", default_database_require_ssl)
use_ipv6? = get_env_bool.("DATABASE_USE_IPV6", default_database_use_ipv6)

postgresql_connection =
case System.get_env("DATABASE_URL", default_database_url) do
nil ->
raise_in_prod.("Required environment variable DATABASE_URL is not set")
nil

database_url ->
database_url
|> Electric.Utils.parse_postgresql_uri()
|> Keyword.put_new(:ssl, require_ssl?)
|> Keyword.put(:ipv6, use_ipv6?)
|> Keyword.update(:timeout, 5_000, &String.to_integer/1)
|> Keyword.put(:replication, "database")
end

config :electric, Electric.Satellite.Auth, provider: auth_provider
pg_server_host =
if write_to_pg_mode == :logical_replication do
System.get_env("LOGICAL_PUBLISHER_HOST", default_logical_publisher_host) ||
raise_in_prod.("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
end

require_ssl? = get_env_bool.("DATABASE_REQUIRE_SSL", default_database_require_ssl)
use_ipv6? = get_env_bool.("DATABASE_USE_IPV6", default_database_use_ipv6)

postgresql_connection =
System.fetch_env!("DATABASE_URL")
|> Electric.Utils.parse_postgresql_uri()
|> Keyword.put_new(:ssl, require_ssl?)
|> Keyword.put(:ipv6, use_ipv6?)
|> Keyword.update(:timeout, 5_000, &String.to_integer/1)
|> Keyword.put(:replication, "database")

pg_server_host =
if write_to_pg_mode == :logical_replication do
System.get_env("LOGICAL_PUBLISHER_HOST") ||
raise("Required environment variable LOGICAL_PUBLISHER_HOST is not set")
end
{use_http_tunnel?, proxy_port_str} =
case String.downcase(System.get_env("PG_PROXY_PORT", default_pg_proxy_port)) do
"http" -> {true, default_pg_proxy_port}
"http:" <> port_str -> {true, port_str}
port_str -> {false, port_str}
end

proxy_port = get_env_int.("PG_PROXY_PORT", default_pg_proxy_port)
proxy_port = String.to_integer(proxy_port_str)

proxy_password =
System.get_env("PG_PROXY_PASSWORD") ||
raise("Required environment variable PG_PROXY_PASSWORD is not set")
proxy_password =
System.get_env("PG_PROXY_PASSWORD", default_pg_proxy_password) ||
raise_in_prod.("Required environment variable PG_PROXY_PASSWORD is not set")

proxy_listener_opts =
if listen_on_ipv6? do
[transport_options: [:inet6]]
else
[]
end
proxy_listener_opts =
if listen_on_ipv6? do
[transport_options: [:inet6]]
else
[]
end

connectors = [
{"postgres_1",
producer: Electric.Replication.Postgres.LogicalReplicationProducer,
connection: postgresql_connection,
replication: [
electric_connection: [
host: pg_server_host,
port: pg_server_port,
dbname: "electric",
connect_timeout: postgresql_connection[:timeout]
]
],
proxy: [
# listen opts are ThousandIsland.options()
# https://hexdocs.pm/thousand_island/ThousandIsland.html#t:options/0
listen: [port: proxy_port] ++ proxy_listener_opts,
password: proxy_password,
log_level: log_level
]}
config :electric, Electric.Replication.Connectors,
postgres_1: [
producer: Electric.Replication.Postgres.LogicalReplicationProducer,
connection: postgresql_connection,
replication: [
electric_connection: [
host: pg_server_host,
port: pg_server_port,
dbname: "electric",
connect_timeout: postgresql_connection[:timeout]
]
],
proxy: [
# listen opts are ThousandIsland.options()
# https://hexdocs.pm/thousand_island/ThousandIsland.html#t:options/0
listen: [port: proxy_port] ++ proxy_listener_opts,
use_http_tunnel?: use_http_tunnel?,
password: proxy_password,
log_level: log_level
]
]

config :electric, Electric.Replication.Connectors, connectors
enable_proxy_tracing? = System.get_env("PROXY_TRACING_ENABLE", "false") in ["yes", "true"]

config :electric, Electric.Postgres.Proxy.Handler.Tracing,
enable: enable_proxy_tracing?,
colour: false

# The :prod environment is inlined here because by default Mix won't copy any config/runtime.*.exs files when assembling
# a release, and we want a single configuration file in our release.
if config_env() == :prod do
auth_provider =
System.get_env("AUTH_MODE", default_auth_mode) |> Electric.Satellite.Auth.build_provider!()

config :electric, Electric.Satellite.Auth, provider: auth_provider

# This is intentionally an atom and not a boolean - we expect to add `:extended` state
telemetry =
Expand All @@ -190,12 +226,6 @@ if config_env() == :prod do
end

config :electric, :telemetry, telemetry

enable_proxy_tracing? = System.get_env("PROXY_TRACING_ENABLE", "false") in ["yes", "true"]

config :electric, Electric.Postgres.Proxy.Handler.Tracing,
enable: enable_proxy_tracing?,
colour: false
else
config :electric, :telemetry, :disabled
Code.require_file("runtime.#{config_env()}.exs", __DIR__)
Expand Down
9 changes: 3 additions & 6 deletions components/electric/lib/electric/plug/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ defmodule Electric.Plug.Migrations do

alias Electric.Postgres.Extension.SchemaCache

import Plug.Conn

require Logger

plug(:match)
plug :match

plug(Plug.Parsers,
plug Plug.Parsers,
parsers: [:json],
pass: ["application/json"],
json_decoder: Jason
)

plug(:dispatch)
plug :dispatch

get "/" do
conn = fetch_query_params(conn)
Expand Down
62 changes: 62 additions & 0 deletions components/electric/lib/electric/plug/proxy_websocket_plug.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule Electric.Plug.ProxyWebsocketPlug do
@moduledoc """
This plug handles requests to establish a WebSocket connection to the migrations proxy.
On hosting providers that only allow inbound HTTP traffic on port 80 (and HTTPS traffic on port 443), Electric's proxy
cannot open its own dedicated port to listen on incoming TCP connections. This plug, together with the server
implementation in `Electric.Postgres.Proxy.WebsocketSerevr`, enables Electric to tunnel TCP traffic over the WebSocket
protocol provided that a matching tunnel is set up on the client side.
Note that unless the migrations proxy's port is configured with the special value "http", this plug will reject any
incoming requests with a 404 status code.
"""

@behaviour Plug

import Plug.Conn

alias Electric.Replication.Connectors

require Logger

def init(handler_opts), do: handler_opts

def call(conn, handler_opts) do
conn_config = conn_config()
proxy_config = Connectors.get_proxy_opts(conn_config)

if proxy_config.use_http_tunnel? do
upgrade_to_websocket(conn, Keyword.put_new(handler_opts, :proxy_config, proxy_config))
else
Logger.warning(
"Attempted WebSocket connection to the migrations proxy but it wasn't enabled."
)

send_resp(conn, 404, "Migrations proxy is not configured to accept WebSocket connections")
end
end

defp upgrade_to_websocket(conn, websocket_opts) do
with {:ok, conn} <- check_if_valid_upgrade(conn) do
conn
|> upgrade_adapter(
:websocket,
{Electric.Postgres.Proxy.WebsocketServer, websocket_opts, []}
)
else
{:error, code, body} ->
Logger.debug("Clients WebSocket connection failed with reason: #{body}")
send_resp(conn, code, body)
end
end

defp check_if_valid_upgrade(%Plug.Conn{} = conn) do
if Bandit.WebSocket.Handshake.valid_upgrade?(conn) do
{:ok, conn}
else
{:error, 400, "Bad request"}
end
end

defp conn_config, do: Electric.Application.pg_connection_opts()
end
3 changes: 1 addition & 2 deletions components/electric/lib/electric/plug/router.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
defmodule Electric.Plug.Router do
use Plug.Router
import Plug.Conn
require Logger

plug :match
plug Plug.Logger
plug :dispatch

forward "/api/migrations", to: Electric.Plug.Migrations
forward "/api/status", to: Electric.Plug.Status
forward "/proxy", to: Electric.Plug.ProxyWebsocketPlug
forward "/ws", to: Electric.Plug.SatelliteWebsocketPlug

match _ do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
defmodule Electric.Plug.SatelliteWebsocketPlug do
@behaviour Plug

import Plug.Conn

require Logger
use Plug.Builder

@protocol_prefix "electric."

Expand Down
5 changes: 2 additions & 3 deletions components/electric/lib/electric/plug/status.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Electric.Plug.Status do
use Plug.Router

alias Electric.Replication.PostgresConnector
alias Electric.Replication.PostgresConnectorMng

use Plug.Router
import Plug.Conn

plug :match
plug :dispatch

Expand Down
8 changes: 3 additions & 5 deletions components/electric/lib/electric/postgres/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ defmodule Electric.Postgres.Proxy do
@spec child_spec(options()) :: Supervisor.child_spec()
def child_spec(args) do
{:ok, conn_config} = Keyword.fetch(args, :conn_config)
handler_config = Keyword.get(args, :handler_config, default_handler_config())

proxy_opts = Connectors.get_proxy_opts(conn_config)

{:ok, listen_opts} = Map.fetch(proxy_opts, :listen)

if !is_integer(listen_opts[:port]),
Expand All @@ -123,10 +121,10 @@ defmodule Electric.Postgres.Proxy do
# ThousandIsland.Logger.attach_logger(log_level)
# end

handler_state =
Handler.initial_state(conn_config, handler_config)
handler_config = Keyword.get(args, :handler_config, default_handler_config())
handler_state = Handler.initial_state(conn_config, handler_config)

Logger.info("Starting Proxy server listening at port #{listen_opts[:port]}")
Logger.info("Starting Proxy server listening on port #{listen_opts[:port]}")

ThousandIsland.child_spec(
Keyword.merge(listen_opts, handler_module: Handler, handler_options: handler_state)
Expand Down
Loading

0 comments on commit b57ec92

Please sign in to comment.