feat (sync service): multi tenancy #1886

Draft · wants to merge 44 commits into base: main

Commits (44)
a7635b4
Add TenantManager that spawns the necessary processes per tenant.
kevin-dp Oct 3, 2024
5d043dd
Initialize separate storage backend for each tenant
alco Oct 17, 2024
bdeaa7d
Fix tests
alco Oct 17, 2024
b5e82c3
mix format
alco Oct 17, 2024
411920d
Fix missing :storage and :tenant_id options where needed
alco Oct 17, 2024
a474bb2
Support configuring a default tenant when running in any environment,…
alco Oct 17, 2024
36706cc
Use default tenant for TS integration tests on CI
alco Oct 17, 2024
fa9a30f
Adjust the health check endpoint for CI
alco Oct 17, 2024
0ec2d47
fixup! Support configuring a default tenant when running in any envir…
alco Oct 17, 2024
967eda1
(tmp) Load requested or default tenant in DeleteShapePlug
alco Oct 17, 2024
73a97d1
Add tenant id to .env.dev
msfstef Oct 21, 2024
92cffbb
Fix typo in options passed to plug.
kevin-dp Oct 21, 2024
3635359
Remove commented tests because they are either obsolete or included i…
kevin-dp Oct 22, 2024
3e56275
Fix unit tests.
kevin-dp Oct 23, 2024
b3c1b6d
Make default tenant optional
kevin-dp Oct 24, 2024
d3e576a
rename TENANT_ID env var to DATABASE_ID
kevin-dp Oct 24, 2024
236416f
Modify health check endpoint to take a database_id
kevin-dp Oct 24, 2024
0164741
Restore allow_shape_deletion config option needed for delete shape plug.
kevin-dp Oct 24, 2024
8658cde
Fix react hooks test
kevin-dp Oct 24, 2024
59ac1e5
Fix rolling deploy integration test
kevin-dp Oct 24, 2024
57bb217
Unit tests for add database plug.
kevin-dp Oct 28, 2024
ae23e4e
Add optional databaseId parameter to the ShapeStream for selecting a DB …
kevin-dp Oct 28, 2024
9039c2a
Integration test for multi tenancy
kevin-dp Oct 28, 2024
1d270ec
Fix error message in delete shape plug
kevin-dp Oct 28, 2024
9908d56
Plug for deleting a tenant
kevin-dp Oct 28, 2024
64977ae
Fix clearShape in test setup
kevin-dp Oct 28, 2024
c20052c
Do not take the tenant ID of the 2nd tenant for the integration test …
kevin-dp Oct 28, 2024
944a136
Rename delete DB plug to remove DB plug.
kevin-dp Oct 28, 2024
1a4911a
Unit tests for remove DB plug
kevin-dp Oct 28, 2024
d65aa58
Remove obsolete comment
kevin-dp Oct 28, 2024
b39272d
Extract duplicated functions for loading tenant to utility module.
kevin-dp Oct 29, 2024
f41a678
Return 404 if tenant is not found
kevin-dp Oct 29, 2024
f2e8bb0
Update OpenAPI spec
kevin-dp Oct 29, 2024
f797d47
Rename id parameter to database_id in add DB plug
kevin-dp Oct 29, 2024
a33d72b
Disable file parallelism in vitest to avoid flaky tests due to some u…
kevin-dp Oct 29, 2024
c068bb3
Handle failure to parse connection string
kevin-dp Oct 29, 2024
02bc39b
Store references to per-tenant ETS tables in a global ETS table.
kevin-dp Oct 30, 2024
305a3c3
WIP shutting down
kevin-dp Oct 30, 2024
1fdcb2a
Reverse error messages
msfstef Oct 30, 2024
cdda115
Shutdown tenant processes and clean up ETS table when tenant is deleted.
kevin-dp Oct 30, 2024
ed2dbd9
Introduce a with_supervised_tenant in component_setup.
kevin-dp Oct 31, 2024
fc9ec7c
Rebase on top of main
kevin-dp Oct 31, 2024
de5360f
Pass tenant_id option to stop_tenant
kevin-dp Oct 31, 2024
6fd3b59
WIP tenant persistence
kevin-dp Oct 31, 2024

Files changed

4 changes: 3 additions & 1 deletion .github/workflows/ts_test.yml
@@ -70,6 +70,8 @@ jobs:
defaults:
run:
working-directory: ${{ matrix.package_dir }}
env:
DATABASE_ID: ci_test_tenant
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
@@ -111,7 +113,7 @@
mix run --no-halt &

wait-on: |
http-get://localhost:3000/v1/health
http-get://localhost:3000/v1/health?database_id=${{ env.DATABASE_ID }}

tail: true
log-output-resume: stderr
2 changes: 1 addition & 1 deletion integration-tests/tests/_macros.luxinc
@@ -91,7 +91,7 @@
[shell $shell_name]
-$fail_pattern

!DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
!DATABASE_ID=integration_test_tenant DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
[endmacro]

[macro teardown]
2 changes: 1 addition & 1 deletion integration-tests/tests/invalidated-replication-slot.lux
@@ -6,7 +6,7 @@

[my invalidated_slot_error=
"""
[error] GenServer Electric.Connection.Manager terminating
[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
@@ -75,15 +75,15 @@
?Txn received in Shapes.Consumer: %Electric.Replication.Changes.Transaction{xid: $xid

# Both consumers hit their call limit and exit with simulated storage failures.
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure

# The log collector process and the replication client both exit, as their lifetimes are tied
# together by the supervision tree design.
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default}} terminating
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default, "integration_test_tenant"}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating

# Observe that both shape consumers and the replication client have restarted.
??[debug] Found existing replication slot
8 changes: 4 additions & 4 deletions integration-tests/tests/rolling-deploy.lux
@@ -19,7 +19,7 @@

# First service should be healthy and active
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}

## Start the second sync service.
@@ -35,9 +35,9 @@

# Second service should be in waiting state, ready to take over
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"waiting"}

## Terminate first electric
@@ -55,7 +55,7 @@

# Second service is now healthy and active
[shell orchestator]
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"active"}

[cleanup]
3 changes: 2 additions & 1 deletion packages/react-hooks/test/support/global-setup.ts
@@ -3,6 +3,7 @@ import { makePgClient } from './test-helpers'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.DATABASE_ID ?? `test_tenant`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
@@ -29,7 +30,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health`)
fetch(`${url}/v1/health?database_id=${databaseId}`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
3 changes: 3 additions & 0 deletions packages/sync-service/.env.dev
@@ -5,3 +5,6 @@ CACHE_MAX_AGE=1
CACHE_STALE_AGE=3
# using a small chunk size of 10kB for dev to speed up tests
LOG_CHUNK_BYTES_THRESHOLD=10000
DATABASE_ID=test_tenant
# configuring a second database for multi-tenancy integration testing
OTHER_DATABASE_URL=postgresql://postgres:password@localhost:54322/electric?sslmode=disable
3 changes: 3 additions & 0 deletions packages/sync-service/.env.test
@@ -0,0 +1,3 @@
LOG_LEVEL=info
DATABASE_URL=postgresql://postgres:password@localhost:54321/postgres?sslmode=disable
DATABASE_ID=test_tenant
44 changes: 24 additions & 20 deletions packages/sync-service/config/runtime.exs
@@ -25,8 +25,7 @@ end
# handle_sasl_reports: true

if config_env() == :test do
config(:logger, level: :info)
config(:electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001))
config :electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001)
end

electric_instance_id = :default
@@ -81,28 +80,32 @@ otel_simple_processor =
config :opentelemetry,
processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1)

connection_opts =
if Config.config_env() == :test do
[
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
env!("DATABASE_URL", :string)
|> Electric.ConfigParser.parse_postgresql_uri()
database_url = env!("DATABASE_URL", :string, nil)
default_tenant = env!("DATABASE_ID", :string, nil)

case {database_url, default_tenant} do
{nil, nil} ->
# No default tenant provided
:ok

{nil, _} ->
raise "DATABASE_URL must be provided when DATABASE_ID is set"

{_, nil} ->
raise "DATABASE_ID must be provided when DATABASE_URL is set"

{_, _} ->
# A default tenant is provided
{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

database_ipv6_config =
env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]
end
connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
config :electric, default_connection_opts: Electric.Utils.obfuscate_password(connection_opts)
config :electric, default_tenant: default_tenant
end

enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("CACHE_MAX_AGE", :integer, 60)
@@ -201,4 +204,5 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false)
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false),
tenant_tables_name: :tenant_tables
18 changes: 18 additions & 0 deletions packages/sync-service/dev/docker-compose.yml
@@ -20,6 +20,24 @@ services:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
postgres2:
image: postgres:16-alpine
environment:
POSTGRES_DB: electric
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "54322:5433"
volumes:
- ./postgres2.conf:/etc/postgresql.conf:ro
- ./init.sql:/docker-entrypoint-initdb.d/00_shared_init.sql:ro
tmpfs:
- /var/lib/postgresql/data
- /tmp
entrypoint:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
nginx:
image: nginx:latest
ports:
3 changes: 3 additions & 0 deletions packages/sync-service/dev/postgres2.conf
@@ -0,0 +1,3 @@
listen_addresses = '*'
wal_level = logical # minimal, replica, or logical
port = 5433
Review comment (Contributor) on packages/sync-service/dev/postgres2.conf:

nitpick: why do you need to change the internal listening port instead of using the existing pg conf? The two Postgres instances are always distinguishable anyway: by mapped port from the host machine's point of view, or by hostname from the point of view of the other containers in the compose file.
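
A minimal sketch of the alternative the comment suggests, assuming the first service's config file is ./postgres.conf and that it does not override the port (both are assumptions, not shown in this diff): keep the second container on Postgres's stock 5432 and vary only the host-side mapping, so no separate postgres2.conf is needed. The 54322 host port matches the OTHER_DATABASE_URL added to .env.dev, and other containers on the compose network would reach this instance as postgres2:5432.

services:
  postgres2:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: electric
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "54322:5432"  # only the host-side mapping differs; the container keeps Postgres's default 5432
    volumes:
      # assumption: reuse the first service's ./postgres.conf instead of adding postgres2.conf
      - ./postgres.conf:/etc/postgresql.conf:ro
      - ./init.sql:/docker-entrypoint-initdb.d/00_shared_init.sql:ro
    tmpfs:
      - /var/lib/postgresql/data
      - /tmp
    entrypoint:
      - docker-entrypoint.sh
      - -c
      - config_file=/etc/postgresql.conf
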

107 changes: 33 additions & 74 deletions packages/sync-service/lib/electric/application.ex
@@ -1,17 +1,18 @@
defmodule Electric.Application do
use Application
require Config

@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
end

@spec process_name(atom(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, id}}}
@spec process_name(atom(), String.t(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id, id}}}
end

@impl true
@@ -20,27 +21,14 @@ defmodule Electric.Application do

config = configure()

shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)
tenant_id = Application.get_env(:electric, :default_tenant)
tenant_opts = [electric_instance_id: config.electric_instance_id]

connection_manager_opts = [
router_opts = [
electric_instance_id: config.electric_instance_id,
connection_opts: config.connection_opts,
replication_opts: [
publication_name: config.replication_opts.publication_name,
try_creating_publication?: true,
slot_name: config.replication_opts.slot_name,
slot_temporary?: config.replication_opts.slot_temporary?,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
{Electric.Replication.ShapeLogCollector, :handle_relation_msg, [shape_log_collector]}
],
pool_opts: [
name: Electric.DbPool,
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
persistent_kv: config.persistent_kv
tenant_manager: Electric.TenantManager.name(tenant_opts),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false),
registry: @process_registry_name
]

# The root application supervisor starts the core global processes, including the HTTP
@@ -61,41 +49,38 @@
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
Electric.TenantSupervisor,
{Electric.TenantManager, router_opts},
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port))
])

Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)
{:ok, sup_pid} =
Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)

if tenant_id do
connection_opts = Application.fetch_env!(:electric, :default_connection_opts)
Electric.TenantManager.create_tenant(tenant_id, connection_opts, tenant_opts)
end

{:ok, sup_pid}
end

# This function is called once in the application's start() callback. It reads configuration
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)
tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name)
:ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}])

{storage_module, storage_in_opts} = Application.fetch_env!(:electric, :storage)
storage_opts = storage_module.shared_opts(storage_in_opts)
storage = {storage_module, storage_opts}
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])
@@ -105,33 +90,9 @@ defmodule Electric.Application do
slot_name = "electric_slot_#{replication_stream_id}"
slot_temporary? = Application.get_env(:electric, :replication_slot_temporary?, false)

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
end

prepare_tables_mfa =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version_fn, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

config = %Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
storage: storage,
persistent_kv: persistent_kv,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
@@ -140,9 +101,7 @@
},
pool_opts: %{
size: Application.fetch_env!(:electric, :db_pool_size)
},
inspector: inspector,
shape_cache_opts: shape_cache_opts
}
}

Electric.Application.Configuration.save(config)
@@ -6,13 +6,9 @@ defmodule Electric.Application.Configuration do

defstruct ~w[
electric_instance_id
storage
persistent_kv
connection_opts
replication_opts
pool_opts
inspector
shape_cache_opts
]a

@type t :: %__MODULE__{}