Skip to content

Commit

Permalink
Add a connection listener that emits telemetry events (#311)
Browse files Browse the repository at this point in the history
* Add a connection listener that emits telemetry events

* Add documentation

* Address review comment

* Add usage instructions

* Fix wrong module name

* Documentation improvements
  • Loading branch information
v0idpwn authored Jul 1, 2024
1 parent 700cbf6 commit f0bbc4b
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 0 deletions.
112 changes: 112 additions & 0 deletions integration_test/cases/connection_listeners_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,116 @@ defmodule ConnectionListenersTest do
assert is_pid(conn3)
refute conn1 == conn2 == conn3
end

describe "telemetry listener" do
test "emits events with no tag" do
attach_telemetry_forwarding_handler()
err = RuntimeError.exception("oops")

stack = [
{:ok, :state},
{:disconnect, err, :discon},
:ok,
{:error, err}
]

{:ok, agent} = A.start_link(stack)
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()

{:ok, pool} =
P.start_link(
agent: agent,
parent: self(),
connection_listeners: [telemetry_listener],
backoff_min: 1_000
)

assert_receive {:telemetry, :connected, %{tag: nil}}
assert P.close(pool, %Q{})
assert_receive {:telemetry, :disconnected, %{tag: nil}}
after
detach_telemetry_forwarding_handler()
end

test "emits events with tag" do
attach_telemetry_forwarding_handler()
err = RuntimeError.exception("oops")

stack = [
{:ok, :state},
{:disconnect, err, :discon},
:ok,
{:error, err}
]

{:ok, agent} = A.start_link(stack)
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()

tag = make_ref()

{:ok, pool} =
P.start_link(
agent: agent,
parent: self(),
connection_listeners: {[telemetry_listener], tag},
backoff_min: 1_000
)

assert_receive {:telemetry, :connected, %{tag: ^tag}}
assert P.close(pool, %Q{})
assert_receive {:telemetry, :disconnected, %{tag: ^tag}}
after
detach_telemetry_forwarding_handler()
end

test "handles non-graceful disconnects" do
attach_telemetry_forwarding_handler()

stack = [
fn opts ->
send(opts[:parent], {:hi, self()})
{:ok, :state}
end,
{:ok, :state}
]

{:ok, agent} = A.start_link(stack)
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()

{:ok, _pool} =
P.start_link(
agent: agent,
parent: self(),
connection_listeners: [telemetry_listener],
backoff_min: 1_000
)

assert_receive {:hi, pid}
Process.exit(pid, :kill)

assert_receive {:telemetry, :disconnected, %{pid: ^pid}}
after
detach_telemetry_forwarding_handler()
end
end

defp attach_telemetry_forwarding_handler() do
test_pid = self()

:telemetry.attach_many(
"TestHandler",
[
[:db_connection, :connected],
[:db_connection, :disconnected]
],
fn [:db_connection, action], _, metadata, _ ->
send(test_pid, {:telemetry, action, metadata})
end,
%{}
)
end

defp detach_telemetry_forwarding_handler() do
:telemetry.detach("TestHandler")
end
end
8 changes: 8 additions & 0 deletions lib/db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,12 @@ defmodule DBConnection do
This feature is available since v2.6.0. Before this version `:connection_listeners` only
accepted a list of listener processes.
## Telemetry listener
DBConnection provides a connection listener that emits telemetry events upon
connection and disconnection, see the `DBConnection.TelemetryListener` module
for more info.
## Connection Configuration Callback
The `:configure` function will be called before each individual connection to the
Expand Down Expand Up @@ -560,6 +566,8 @@ defmodule DBConnection do
* `:opts` - All options given to the pool operation
See `DBConnection.TelemetryListener` for enabling `[:db_connection, :connected]`
and `[:db_connection, :disconnected]` events.
"""
@spec start_link(module, [start_option()] | Keyword.t()) :: GenServer.on_start()
def start_link(conn_mod, opts) do
Expand Down
109 changes: 109 additions & 0 deletions lib/db_connection/telemetry_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule DBConnection.TelemetryListener do
@moduledoc """
A connection listener that emits telemetry events for connection and disconnection
It monitors connection processes and ensures that disconnection events are
always emitted.
## Usage
Start the listener, and pass it under the `:connection_listeners` option when
starting DBConnection:
{:ok, pid} = TelemetryListener.start_link()
{:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: [pid])
# Using a tag, which will be sent in telemetry metadata
{:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: {[pid], :my_tag})
# Or, with a Supervisor:
Supervisor.start_link([
{TelemetryListener, [name: MyListener]},
DBConnection.child_spec(SomeModule, connection_listeners: {[MyListener], :my_tag})
])
## Telemetry events
### Connected
`[:db_connection, :connected]` - Executed after a connection is established.
#### Measurements
* `:count` - Always 1
#### Metadata
* `:pid` - The connection pid
* `:tag` - The connection pool tag
### Disconnected
`[:db_connection, :disconnected]` - Executed after a disconnect.
#### Measurements
* `:count` - Always 1
#### Metadata
* `:pid` - The connection pid
* `:tag` - The connection pool tag
"""

use GenServer

@doc "Starts a telemetry listener"
@spec start_link(GenServer.options()) :: {:ok, pid()}
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, nil, opts)
end

@impl GenServer
def init(nil) do
{:ok, %{monitoring: %{}}}
end

@impl GenServer
def handle_info({:connected, pid, tag}, state) do
handle_connected(pid, tag, state)
end

def handle_info({:connected, pid}, state) do
handle_connected(pid, nil, state)
end

def handle_info({:disconnected, pid, _}, state) do
handle_disconnected(pid, state)
end

def handle_info({:disconnected, pid}, state) do
handle_disconnected(pid, state)
end

def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
handle_disconnected(pid, state)
end

defp handle_connected(pid, tag, state) do
:telemetry.execute([:db_connection, :connected], %{count: 1}, %{tag: tag, pid: pid})
ref = Process.monitor(pid)

{:noreply, put_in(state.monitoring[pid], {ref, tag})}
end

defp handle_disconnected(pid, state) do
case state.monitoring[pid] do
# Already handled. We may receive two messages: one from monitor and one
# from listener. For this reason, we need to handle both.
nil ->
{:noreply, state}

{ref, tag} ->
Process.demonitor(ref, [:flush])
:telemetry.execute([:db_connection, :disconnected], %{count: 1}, %{tag: tag, pid: pid})
{:noreply, %{state | monitoring: Map.delete(state.monitoring, pid)}}
end
end
end

0 comments on commit f0bbc4b

Please sign in to comment.