Skip to content

Commit

Permalink
Add support for pool_count (elixir-ecto#636)
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim authored and dkuku committed Nov 4, 2024
1 parent 51628ab commit 980af9c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 61 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,16 @@ jobs:
- "11.11-alpine"
- "9.6-alpine"
- "9.5-alpine"
include:
- elixirbase: "1.11.4-erlang-23.3.4.9-alpine-3.16.9"
postgres: "16.2-alpine"
pool_count: "4"
steps:
- uses: earthly/actions-setup@v1
- uses: actions/checkout@v3
- name: test ecto_sql
env:
POOL_COUNT: ${{ matrix.pool_count || '1' }}
run: earthly -P --ci --build-arg ELIXIR_BASE=${{matrix.elixirbase}} --build-arg POSTGRES=${{matrix.postgres}} +integration-test-postgres

test-mysql:
Expand Down
15 changes: 9 additions & 6 deletions integration_test/myxql/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ Application.put_env(:ecto, :primary_key_type, :id)
Application.put_env(:ecto, :async_integration_tests, false)
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")

Code.require_file "../support/repo.exs", __DIR__
Code.require_file("../support/repo.exs", __DIR__)

# Configure MySQL connection
Application.put_env(:ecto_sql, :mysql_test_url,
Application.put_env(
:ecto_sql,
:mysql_test_url,
"ecto://" <> (System.get_env("MYSQL_URL") || "root@127.0.0.1")
)

Expand Down Expand Up @@ -53,7 +55,8 @@ alias Ecto.Integration.PoolRepo
Application.put_env(:ecto_sql, PoolRepo,
adapter: Ecto.Adapters.MyXQL,
url: Application.get_env(:ecto_sql, :mysql_test_url) <> "/ecto_test",
pool_size: 10,
pool_size: 5,
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
show_sensitive_data_on_connection_error: true
)

Expand All @@ -63,8 +66,8 @@ end

# Load support files
ecto = Mix.Project.deps_paths()[:ecto]
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
Code.require_file "../support/migration.exs", __DIR__
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
Code.require_file("../support/migration.exs", __DIR__)

defmodule Ecto.Integration.Case do
use ExUnit.CaseTemplate
Expand All @@ -77,7 +80,7 @@ end
{:ok, _} = Ecto.Adapters.MyXQL.ensure_all_started(TestRepo.config(), :temporary)

# Load up the repository, start it, and run migrations
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
:ok = Ecto.Adapters.MyXQL.storage_up(TestRepo.config())

{:ok, _pid} = TestRepo.start_link()
Expand Down
33 changes: 22 additions & 11 deletions integration_test/pg/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ Application.put_env(:ecto, :async_integration_tests, true)
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")

# Configure PG connection
Application.put_env(:ecto_sql, :pg_test_url,
Application.put_env(
:ecto_sql,
:pg_test_url,
"ecto://" <> (System.get_env("PG_URL") || "postgres:postgres@127.0.0.1")
)

Code.require_file "../support/repo.exs", __DIR__
Code.require_file("../support/repo.exs", __DIR__)

# Define type module
opts = if Code.ensure_loaded?(Duration), do: [interval_decode_type: Duration], else: []
Expand Down Expand Up @@ -59,21 +61,28 @@ end

pool_repo_config = [
url: Application.get_env(:ecto_sql, :pg_test_url) <> "/ecto_test",
pool_size: 10,
pool_size: 5,
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
max_restarts: 20,
max_seconds: 10
]

Application.put_env(:ecto_sql, PoolRepo, pool_repo_config)
Application.put_env(:ecto_sql, AdvisoryLockPoolRepo, pool_repo_config ++ [
migration_source: "advisory_lock_schema_migrations",
migration_lock: :pg_advisory_lock
])

Application.put_env(
:ecto_sql,
AdvisoryLockPoolRepo,
pool_repo_config ++
[
migration_source: "advisory_lock_schema_migrations",
migration_lock: :pg_advisory_lock
]
)

# Load support files
ecto = Mix.Project.deps_paths()[:ecto]
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
Code.require_file "../support/migration.exs", __DIR__
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
Code.require_file("../support/migration.exs", __DIR__)

defmodule Ecto.Integration.Case do
use ExUnit.CaseTemplate
Expand All @@ -86,7 +95,7 @@ end
{:ok, _} = Ecto.Adapters.Postgres.ensure_all_started(TestRepo.config(), :temporary)

# Load up the repository, start it, and run migrations
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
:ok = Ecto.Adapters.Postgres.storage_up(TestRepo.config())

{:ok, _pid} = TestRepo.start_link()
Expand All @@ -112,7 +121,9 @@ exclude_list = excludes ++ excludes_above_9_5

cond do
Version.match?(version, "< 9.6.0") ->
ExUnit.configure(exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0)
ExUnit.configure(
exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0
)

Version.match?(version, "< 12.0.0") ->
ExUnit.configure(exclude: exclude_list ++ excludes_below_12_0 ++ excludes_below_15_0)
Expand Down
115 changes: 71 additions & 44 deletions lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,22 @@ defmodule Ecto.Adapters.SQL do
disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts)
end

def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do
DBConnection.disconnect_all(pid, interval, opts)
def disconnect_all(adapter_meta, interval, opts) do
case adapter_meta do
%{partition_supervisor: {name, count}} ->
1..count
|> Enum.map(fn i ->
Task.async(fn ->
DBConnection.disconnect_all({:via, PartitionSupervisor, {name, i}}, interval, opts)
end)
end)
|> Task.await_many(:infinity)

:ok

%{pid: pool} ->
DBConnection.disconnect_all(pool, interval, opts)
end
end

@doc """
Expand Down Expand Up @@ -646,7 +660,7 @@ defmodule Ecto.Adapters.SQL do

defp sql_call(adapter_meta, callback, args, params, opts) do
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
conn = get_conn_or_pool(pool)
conn = get_conn_or_pool(pool, adapter_meta)
opts = with_log(telemetry, params, opts ++ default_opts)
args = args ++ [params, opts]
apply(sql, callback, [conn | args])
Expand All @@ -662,7 +676,7 @@ defmodule Ecto.Adapters.SQL do
end

@doc """
Check if the given `table` exists.
Checks if the given `table` exists.
Returns `true` if the `table` exists in the `repo`, otherwise `false`.
The table is checked against the current database/schema in the connection.
Expand Down Expand Up @@ -702,7 +716,7 @@ defmodule Ecto.Adapters.SQL do
def format_table(%{columns: columns, rows: rows}) do
column_widths =
[columns | rows]
|> List.zip()
|> Enum.zip()
|> Enum.map(&Tuple.to_list/1)
|> Enum.map(fn column_with_rows ->
column_with_rows |> Enum.map(&binary_length/1) |> Enum.max()
Expand Down Expand Up @@ -733,7 +747,7 @@ defmodule Ecto.Adapters.SQL do
defp cells(items, widths) do
cell =
[items, widths]
|> List.zip()
|> Enum.zip()
|> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width), " "] end)

[cell | [?|]]
Expand Down Expand Up @@ -827,6 +841,8 @@ defmodule Ecto.Adapters.SQL do
@pool_opts [:timeout, :pool, :pool_size] ++
[:queue_target, :queue_interval, :ownership_timeout, :repo]

@valid_log_levels ~w(false debug info notice warning error critical alert emergency)a

@doc false
def init(connection, driver, config) do
unless Code.ensure_loaded?(connection) do
Expand All @@ -845,24 +861,12 @@ defmodule Ecto.Adapters.SQL do

log = Keyword.get(config, :log, :debug)

valid_log_levels = [
false,
:debug,
:info,
:notice,
:warning,
:error,
:critical,
:alert,
:emergency
]

if log not in valid_log_levels do
if log not in @valid_log_levels do
raise """
invalid value for :log option in Repo config
The accepted values for the :log option are:
#{Enum.map_join(valid_log_levels, ", ", &inspect/1)}
#{Enum.map_join(@valid_log_levels, ", ", &inspect/1)}
See https://hexdocs.pm/ecto/Ecto.Repo.html for more information.
"""
Expand All @@ -872,35 +876,49 @@ defmodule Ecto.Adapters.SQL do
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}

config = adapter_config(config)
opts = Keyword.take(config, @pool_opts)
meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts}
{:ok, connection.child_spec(config), meta}
end
{name, config} = Keyword.pop(config, :name, config[:repo])
{pool_count, config} = Keyword.pop(config, :pool_count, 1)
{pool, config} = pool_config(config)
child_spec = connection.child_spec(config)

defp adapter_config(config) do
if Keyword.has_key?(config, :pool_timeout) do
message = """
:pool_timeout option no longer has an effect and has been replaced with an improved queuing system.
See \"Queue config\" in DBConnection.start_link/2 documentation for more information.
"""
meta = %{
telemetry: telemetry,
sql: connection,
stacktrace: stacktrace,
opts: Keyword.take(config, @pool_opts)
}

IO.warn(message)
end
if pool_count > 1 do
if name == nil do
raise ArgumentError, "the option :pool_count requires a :name"
end

config
|> Keyword.delete(:name)
|> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1)
end
if pool == DBConnection.Ownership do
raise ArgumentError, "the option :pool_count does not work with the SQL sandbox"
end

defp normalize_pool(pool) do
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
DBConnection.Ownership
name = Module.concat(name, PartitionSupervisor)
partition_opts = [name: name, child_spec: child_spec, partitions: pool_count]
child_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, [])
{:ok, child_spec, Map.put(meta, :partition_supervisor, {name, pool_count})}
else
pool
{:ok, child_spec, meta}
end
end

defp pool_config(config) do
{pool, config} = Keyword.pop(config, :pool, DBConnection.ConnectionPool)

pool =
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
DBConnection.Ownership
else
pool
end

{pool, [pool: pool] ++ config}
end

@doc false
def checkout(adapter_meta, opts, callback) do
checkout_or_transaction(:run, adapter_meta, opts, callback)
Expand Down Expand Up @@ -1385,11 +1403,20 @@ defmodule Ecto.Adapters.SQL do
end
end

apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts])
apply(DBConnection, fun, [get_conn_or_pool(pool, adapter_meta), callback, opts])
end

defp get_conn_or_pool(pool) do
Process.get(key(pool), pool)
defp get_conn_or_pool(pool, adapter_meta) do
case :erlang.get(key(pool)) do
:undefined ->
case adapter_meta do
%{partition_supervisor: {name, _}} -> {:via, PartitionSupervisor, {name, self()}}
_ -> pool
end

conn ->
conn
end
end

defp get_conn(pool) do
Expand Down

0 comments on commit 980af9c

Please sign in to comment.