From a736f28b97691c50eb6524bc4da840510debefdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Sun, 6 Oct 2024 23:39:56 +0200 Subject: [PATCH 1/2] Add support for pool_count --- .github/workflows/ci.yml | 6 ++ integration_test/myxql/test_helper.exs | 15 ++-- integration_test/pg/test_helper.exs | 33 +++++--- lib/ecto/adapters/sql.ex | 113 +++++++++++++++---------- 4 files changed, 105 insertions(+), 62 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b5b76be8..b63ce299 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,10 +58,16 @@ jobs: - "11.11-alpine" - "9.6-alpine" - "9.5-alpine" + include: + - elixirbase: "1.11.4-erlang-23.3.4.9-alpine-3.16.9" + postgres: "16.2-alpine" + pool_count: "4" steps: - uses: earthly/actions-setup@v1 - uses: actions/checkout@v3 - name: test ecto_sql + env: + POOL_COUNT: ${{ matrix.pool_count || '1' }} run: earthly -P --ci --build-arg ELIXIR_BASE=${{matrix.elixirbase}} --build-arg POSTGRES=${{matrix.postgres}} +integration-test-postgres test-mysql: diff --git a/integration_test/myxql/test_helper.exs b/integration_test/myxql/test_helper.exs index fafeb700..922f4d68 100644 --- a/integration_test/myxql/test_helper.exs +++ b/integration_test/myxql/test_helper.exs @@ -5,10 +5,12 @@ Application.put_env(:ecto, :primary_key_type, :id) Application.put_env(:ecto, :async_integration_tests, false) Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE") -Code.require_file "../support/repo.exs", __DIR__ +Code.require_file("../support/repo.exs", __DIR__) # Configure MySQL connection -Application.put_env(:ecto_sql, :mysql_test_url, +Application.put_env( + :ecto_sql, + :mysql_test_url, "ecto://" <> (System.get_env("MYSQL_URL") || "root@127.0.0.1") ) @@ -53,7 +55,8 @@ alias Ecto.Integration.PoolRepo Application.put_env(:ecto_sql, PoolRepo, adapter: Ecto.Adapters.MyXQL, url: Application.get_env(:ecto_sql, :mysql_test_url) <> "/ecto_test", - pool_size: 10, + pool_size: 5, + pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")), show_sensitive_data_on_connection_error: true ) @@ -63,8 +66,8 @@ end # Load support files ecto = Mix.Project.deps_paths()[:ecto] -Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__ -Code.require_file "../support/migration.exs", __DIR__ +Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__) +Code.require_file("../support/migration.exs", __DIR__) defmodule Ecto.Integration.Case do use ExUnit.CaseTemplate @@ -77,7 +80,7 @@ end {:ok, _} = Ecto.Adapters.MyXQL.ensure_all_started(TestRepo.config(), :temporary) # Load up the repository, start it, and run migrations -_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config()) +_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config()) :ok = Ecto.Adapters.MyXQL.storage_up(TestRepo.config()) {:ok, _pid} = TestRepo.start_link() diff --git a/integration_test/pg/test_helper.exs b/integration_test/pg/test_helper.exs index 0a21ace3..76145740 100644 --- a/integration_test/pg/test_helper.exs +++ b/integration_test/pg/test_helper.exs @@ -6,11 +6,13 @@ Application.put_env(:ecto, :async_integration_tests, true) Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE") # Configure PG connection -Application.put_env(:ecto_sql, :pg_test_url, +Application.put_env( + :ecto_sql, + :pg_test_url, "ecto://" <> (System.get_env("PG_URL") || "postgres:postgres@127.0.0.1") ) -Code.require_file "../support/repo.exs", __DIR__ +Code.require_file("../support/repo.exs", __DIR__) # Define type module opts = if Code.ensure_loaded?(Duration), do: [interval_decode_type: Duration], else: [] @@ -59,21 +61,28 @@ end pool_repo_config = [ url: Application.get_env(:ecto_sql, :pg_test_url) <> "/ecto_test", - pool_size: 10, + pool_size: 5, + pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")), max_restarts: 20, max_seconds: 10 ] Application.put_env(:ecto_sql, PoolRepo, pool_repo_config) -Application.put_env(:ecto_sql, AdvisoryLockPoolRepo, pool_repo_config ++ [ - migration_source: "advisory_lock_schema_migrations", - migration_lock: :pg_advisory_lock -]) + +Application.put_env( + :ecto_sql, + AdvisoryLockPoolRepo, + pool_repo_config ++ + [ + migration_source: "advisory_lock_schema_migrations", + migration_lock: :pg_advisory_lock + ] +) # Load support files ecto = Mix.Project.deps_paths()[:ecto] -Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__ -Code.require_file "../support/migration.exs", __DIR__ +Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__) +Code.require_file("../support/migration.exs", __DIR__) defmodule Ecto.Integration.Case do use ExUnit.CaseTemplate @@ -86,7 +95,7 @@ end {:ok, _} = Ecto.Adapters.Postgres.ensure_all_started(TestRepo.config(), :temporary) # Load up the repository, start it, and run migrations -_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config()) +_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config()) :ok = Ecto.Adapters.Postgres.storage_up(TestRepo.config()) {:ok, _pid} = TestRepo.start_link() @@ -112,7 +121,9 @@ exclude_list = excludes ++ excludes_above_9_5 cond do Version.match?(version, "< 9.6.0") -> - ExUnit.configure(exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0) + ExUnit.configure( + exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0 + ) Version.match?(version, "< 12.0.0") -> ExUnit.configure(exclude: exclude_list ++ excludes_below_12_0 ++ excludes_below_15_0) diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 96ff0e59..50552342 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -480,8 +480,22 @@ defmodule Ecto.Adapters.SQL do disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts) end - def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do - DBConnection.disconnect_all(pid, interval, opts) + def disconnect_all(adapter_meta, interval, opts) do + case adapter_meta do + %{partition_supervisor: {name, count}} -> + 1..count + |> Enum.map(fn i -> + Task.async(fn -> + DBConnection.disconnect_all({:via, PartitionSupervisor, {name, i}}, interval, opts) + end) + end) + |> Task.await_many(:infinity) + + :ok + + %{pid: pool} -> + DBConnection.disconnect_all(pool, interval, opts) + end end @doc """ @@ -646,7 +660,7 @@ defmodule Ecto.Adapters.SQL do defp sql_call(adapter_meta, callback, args, params, opts) do %{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta - conn = get_conn_or_pool(pool) + conn = get_conn_or_pool(pool, adapter_meta) opts = with_log(telemetry, params, opts ++ default_opts) args = args ++ [params, opts] apply(sql, callback, [conn | args]) @@ -662,7 +676,7 @@ defmodule Ecto.Adapters.SQL do end @doc """ - Check if the given `table` exists. + Checks if the given `table` exists. Returns `true` if the `table` exists in the `repo`, otherwise `false`. The table is checked against the current database/schema in the connection. @@ -702,7 +716,7 @@ defmodule Ecto.Adapters.SQL do def format_table(%{columns: columns, rows: rows}) do column_widths = [columns | rows] - |> List.zip() + |> Enum.zip() |> Enum.map(&Tuple.to_list/1) |> Enum.map(fn column_with_rows -> column_with_rows |> Enum.map(&binary_length/1) |> Enum.max() @@ -733,7 +747,7 @@ defmodule Ecto.Adapters.SQL do defp cells(items, widths) do cell = [items, widths] - |> List.zip() + |> Enum.zip() |> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width), " "] end) [cell | [?|]] @@ -827,6 +841,8 @@ defmodule Ecto.Adapters.SQL do @pool_opts [:timeout, :pool, :pool_size] ++ [:queue_target, :queue_interval, :ownership_timeout, :repo] + @valid_log_levels ~w(false debug info notice warning error critical alert emergency)a + @doc false def init(connection, driver, config) do unless Code.ensure_loaded?(connection) do @@ -845,24 +861,12 @@ defmodule Ecto.Adapters.SQL do log = Keyword.get(config, :log, :debug) - valid_log_levels = [ - false, - :debug, - :info, - :notice, - :warning, - :error, - :critical, - :alert, - :emergency - ] - - if log not in valid_log_levels do + if log not in @valid_log_levels do raise """ invalid value for :log option in Repo config The accepted values for the :log option are: - #{Enum.map_join(valid_log_levels, ", ", &inspect/1)} + #{Enum.map_join(@valid_log_levels, ", ", &inspect/1)} See https://hexdocs.pm/ecto/Ecto.Repo.html for more information. """ @@ -872,35 +876,45 @@ defmodule Ecto.Adapters.SQL do telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix) telemetry = {config[:repo], log, telemetry_prefix ++ [:query]} - config = adapter_config(config) - opts = Keyword.take(config, @pool_opts) - meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts} - {:ok, connection.child_spec(config), meta} - end - - defp adapter_config(config) do - if Keyword.has_key?(config, :pool_timeout) do - message = """ - :pool_timeout option no longer has an effect and has been replaced with an improved queuing system. - See \"Queue config\" in DBConnection.start_link/2 documentation for more information. - """ + {name, config} = Keyword.pop(config, :name, config[:repo]) + {pool_count, config} = Keyword.pop(config, :pool_count, 1) + {pool, config} = pool_config(config) + child_spec = connection.child_spec(config) - IO.warn(message) - end + meta = %{ + telemetry: telemetry, + sql: connection, + stacktrace: stacktrace, + opts: Keyword.take(config, @pool_opts) + } - config - |> Keyword.delete(:name) - |> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1) - end + if pool_count > 1 and pool != DBConnection.Ownership do + if name == nil do + raise ArgumentError, "the option :pool_count requires a :name" + end - defp normalize_pool(pool) do - if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do - DBConnection.Ownership + name = Module.concat(name, PartitionSupervisor) + partition_opts = [name: name, child_spec: child_spec, partitions: pool_count] + child_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, []) + {:ok, child_spec, Map.put(meta, :partition_supervisor, {name, pool_count})} else - pool + {:ok, child_spec, meta} end end + defp pool_config(config) do + {pool, config} = Keyword.pop(config, :pool, DBConnection.ConnectionPool) + + pool = + if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do + DBConnection.Ownership + else + pool + end + + {pool, [pool: pool] ++ config} + end + @doc false def checkout(adapter_meta, opts, callback) do checkout_or_transaction(:run, adapter_meta, opts, callback) @@ -1385,11 +1399,20 @@ defmodule Ecto.Adapters.SQL do end end - apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts]) + apply(DBConnection, fun, [get_conn_or_pool(pool, adapter_meta), callback, opts]) end - defp get_conn_or_pool(pool) do - Process.get(key(pool), pool) + defp get_conn_or_pool(pool, adapter_meta) do + case :erlang.get(key(pool)) do + :undefined -> + case adapter_meta do + %{partition_supervisor: {name, _}} -> {:via, PartitionSupervisor, {name, self()}} + _ -> pool + end + + conn -> + conn + end end defp get_conn(pool) do From 1efedb3a112ce4ada7af2ab3ca2ea9803c5fb697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Mon, 7 Oct 2024 00:19:13 +0200 Subject: [PATCH 2/2] Raise if used with the sandbox --- lib/ecto/adapters/sql.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/ecto/adapters/sql.ex b/lib/ecto/adapters/sql.ex index 50552342..ab43205c 100644 --- a/lib/ecto/adapters/sql.ex +++ b/lib/ecto/adapters/sql.ex @@ -888,11 +888,15 @@ defmodule Ecto.Adapters.SQL do opts: Keyword.take(config, @pool_opts) } - if pool_count > 1 and pool != DBConnection.Ownership do + if pool_count > 1 do if name == nil do raise ArgumentError, "the option :pool_count requires a :name" end + if pool == DBConnection.Ownership do + raise ArgumentError, "the option :pool_count does not work with the SQL sandbox" + end + name = Module.concat(name, PartitionSupervisor) partition_opts = [name: name, child_spec: child_spec, partitions: pool_count] child_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, [])