diff --git a/integration_test/cases/connection_listeners_test.exs b/integration_test/cases/connection_listeners_test.exs index d2d0f9a..d208574 100644 --- a/integration_test/cases/connection_listeners_test.exs +++ b/integration_test/cases/connection_listeners_test.exs @@ -203,4 +203,116 @@ defmodule ConnectionListenersTest do assert is_pid(conn3) refute conn1 == conn2 == conn3 end + + describe "telemetry listener" do + test "emits events with no tag" do + attach_telemetry_forwarding_handler() + err = RuntimeError.exception("oops") + + stack = [ + {:ok, :state}, + {:disconnect, err, :discon}, + :ok, + {:error, err} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() + + {:ok, pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: [telemetry_listener], + backoff_min: 1_000 + ) + + assert_receive {:telemetry, :connected, %{tag: nil}} + assert P.close(pool, %Q{}) + assert_receive {:telemetry, :disconnected, %{tag: nil}} + after + detach_telemetry_forwarding_handler() + end + + test "emits events with tag" do + attach_telemetry_forwarding_handler() + err = RuntimeError.exception("oops") + + stack = [ + {:ok, :state}, + {:disconnect, err, :discon}, + :ok, + {:error, err} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() + + tag = make_ref() + + {:ok, pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: {[telemetry_listener], tag}, + backoff_min: 1_000 + ) + + assert_receive {:telemetry, :connected, %{tag: ^tag}} + assert P.close(pool, %Q{}) + assert_receive {:telemetry, :disconnected, %{tag: ^tag}} + after + detach_telemetry_forwarding_handler() + end + + test "handles non-graceful disconnects" do + attach_telemetry_forwarding_handler() + + stack = [ + fn opts -> + send(opts[:parent], {:hi, self()}) + {:ok, :state} + end, + {:ok, :state} + ] + + {:ok, agent} = A.start_link(stack) + {:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link() + + {:ok, _pool} = + P.start_link( + agent: agent, + parent: self(), + connection_listeners: [telemetry_listener], + backoff_min: 1_000 + ) + + assert_receive {:hi, pid} + Process.exit(pid, :kill) + + assert_receive {:telemetry, :disconnected, %{pid: ^pid}} + after + detach_telemetry_forwarding_handler() + end + end + + defp attach_telemetry_forwarding_handler() do + test_pid = self() + + :telemetry.attach_many( + "TestHandler", + [ + [:db_connection, :connected], + [:db_connection, :disconnected] + ], + fn [:db_connection, action], _, metadata, _ -> + send(test_pid, {:telemetry, action, metadata}) + end, + %{} + ) + end + + defp detach_telemetry_forwarding_handler() do + :telemetry.detach("TestHandler") + end end diff --git a/lib/db_connection.ex b/lib/db_connection.ex index c863819..188ec48 100644 --- a/lib/db_connection.ex +++ b/lib/db_connection.ex @@ -524,6 +524,12 @@ defmodule DBConnection do This feature is available since v2.6.0. Before this version `:connection_listeners` only accepted a list of listener processes. + ## Telemetry listener + + DBConnection provides a connection listener that emits telemetry events upon + connection and disconnection, see the `DBConnection.TelemetryListener` module + for more info. + ## Connection Configuration Callback The `:configure` function will be called before each individual connection to the @@ -560,6 +566,8 @@ defmodule DBConnection do * `:opts` - All options given to the pool operation + See `DBConnection.TelemetryListener` for enabling `[:db_connection, :connected]` + and `[:db_connection, :disconnected]` events. """ @spec start_link(module, [start_option()] | Keyword.t()) :: GenServer.on_start() def start_link(conn_mod, opts) do diff --git a/lib/db_connection/telemetry_listener.ex b/lib/db_connection/telemetry_listener.ex new file mode 100644 index 0000000..5e1f16e --- /dev/null +++ b/lib/db_connection/telemetry_listener.ex @@ -0,0 +1,109 @@ +defmodule DBConnection.TelemetryListener do + @moduledoc """ + A connection listener that emits telemetry events for connection and disconnection + + It monitors connection processes and ensures that disconnection events are + always emitted. + + ## Usage + + Start the listener, and pass it under the `:connection_listeners` option when + starting DBConnection: + + {:ok, pid} = TelemetryListener.start_link() + {:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: [pid]) + + # Using a tag, which will be sent in telemetry metadata + {:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: {[pid], :my_tag}) + + # Or, with a Supervisor: + Supervisor.start_link([ + {TelemetryListener, [name: MyListener]}, + DBConnection.child_spec(SomeModule, connection_listeners: {[MyListener], :my_tag}) + ]) + + + ## Telemetry events + + ### Connected + + `[:db_connection, :connected]` - Executed after a connection is established. + + #### Measurements + + * `:count` - Always 1 + + #### Metadata + + * `:pid` - The connection pid + * `:tag` - The connection pool tag + + ### Disconnected + + `[:db_connection, :disconnected]` - Executed after a disconnect. + + #### Measurements + + * `:count` - Always 1 + + #### Metadata + + * `:pid` - The connection pid + * `:tag` - The connection pool tag + """ + + use GenServer + + @doc "Starts a telemetry listener" + @spec start_link(GenServer.options()) :: {:ok, pid()} + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, nil, opts) + end + + @impl GenServer + def init(nil) do + {:ok, %{monitoring: %{}}} + end + + @impl GenServer + def handle_info({:connected, pid, tag}, state) do + handle_connected(pid, tag, state) + end + + def handle_info({:connected, pid}, state) do + handle_connected(pid, nil, state) + end + + def handle_info({:disconnected, pid, _}, state) do + handle_disconnected(pid, state) + end + + def handle_info({:disconnected, pid}, state) do + handle_disconnected(pid, state) + end + + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + handle_disconnected(pid, state) + end + + defp handle_connected(pid, tag, state) do + :telemetry.execute([:db_connection, :connected], %{count: 1}, %{tag: tag, pid: pid}) + ref = Process.monitor(pid) + + {:noreply, put_in(state.monitoring[pid], {ref, tag})} + end + + defp handle_disconnected(pid, state) do + case state.monitoring[pid] do + # Already handled. We may receive two messages: one from monitor and one + # from listener. For this reason, we need to handle both. + nil -> + {:noreply, state} + + {ref, tag} -> + Process.demonitor(ref, [:flush]) + :telemetry.execute([:db_connection, :disconnected], %{count: 1}, %{tag: tag, pid: pid}) + {:noreply, %{state | monitoring: Map.delete(state.monitoring, pid)}} + end + end +end