Skip to content

Commit

Permalink
Merge pull request #18 from rkallos/perf/ets-striping
Browse files Browse the repository at this point in the history
Add Peep.Storage.Striped
  • Loading branch information
rkallos authored Oct 31, 2024
2 parents 42e8428 + 02c11a5 commit a0e84c8
Show file tree
Hide file tree
Showing 13 changed files with 1,371 additions and 781 deletions.
76 changes: 60 additions & 16 deletions lib/peep.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ defmodule Peep do
"""
use GenServer
require Logger
alias Peep.{EventHandler, Options, Storage, Statsd}
alias Peep.{EventHandler, Options, Statsd}

defmodule State do
@moduledoc false
defstruct tid: nil,
defstruct name: nil,
interval: nil,
handler_ids: nil,
statsd_opts: nil,
Expand All @@ -94,19 +94,63 @@ defmodule Peep do
end
end

def insert_metric(name, metric, value, tags) do
case Peep.Persistent.storage(name) do
{storage_mod, storage} ->
storage_mod.insert_metric(storage, metric, value, tags)

_ ->
nil
end
end

@doc """
Returns measurements about the size of a running Peep's storage, in number of
ETS elements and in bytes of memory.
"""
def storage_size(name) do
case Peep.Persistent.storage(name) do
{storage_mod, storage} ->
storage_mod.storage_size(storage)

_ ->
nil
end
end

@doc """
Fetches all metrics from the worker. Called when preparing Prometheus or
StatsD data.
"""
defdelegate get_all_metrics(name_or_pid), to: Peep.Storage
def get_all_metrics(name) do
case Peep.Persistent.storage(name) do
{storage_mod, storage} ->
storage_mod.get_all_metrics(storage)

_ ->
nil
end
end

@doc """
Fetches a single metric from storage. Currently only used in tests.
"""
def get_metric(name, metric, tags) do
case Peep.Persistent.storage(name) do
{storage_mod, storage} ->
storage_mod.get_metric(storage, metric, tags)

_ ->
nil
end
end

@impl true
def init(options) do
Process.flag(:trap_exit, true)
tid = Storage.new(options.name)

name = options.name
metrics = options.metrics
handler_ids = EventHandler.attach(metrics, tid, options.global_tags)
handler_ids = EventHandler.attach(metrics, name, options.global_tags)

statsd_opts = options.statsd
statsd_flush_interval = statsd_opts[:flush_interval_ms]
Expand All @@ -122,8 +166,12 @@ defmodule Peep do
nil
end

:ok =
Peep.Persistent.new(options)
|> Peep.Persistent.store()

state = %State{
tid: tid,
name: name,
handler_ids: handler_ids,
statsd_opts: statsd_opts,
statsd_state: statsd_state
Expand All @@ -132,22 +180,17 @@ defmodule Peep do
{:ok, state}
end

@impl true
def handle_call(:get_all_metrics, _from, %State{tid: tid} = state) do
{:reply, Storage.get_all_metrics(tid), state}
end

@impl true
def handle_info(:statsd_flush, %State{statsd_state: nil} = state) do
{:noreply, state}
end

def handle_info(
:statsd_flush,
%State{tid: tid, statsd_state: statsd_state, statsd_opts: statsd_opts} = state
%State{name: name, statsd_state: statsd_state, statsd_opts: statsd_opts} = state
) do
new_statsd_state =
Storage.get_all_metrics(tid)
Peep.get_all_metrics(name)
|> Statsd.make_and_send_packets(statsd_state)

set_statsd_timer(statsd_opts[:flush_interval_ms])
Expand All @@ -158,14 +201,15 @@ defmodule Peep do
# In particular, OTP can sometimes leak `:inet_reply` messages when a UDS datagram
# socket blocks, and Peep should not terminate the server and lose state when that
# happens.
#
#
# https://github.com/rkallos/peep/pull/17
# https://github.com/erlang/otp/issues/8989
{:noreply, state}
end

@impl true
def terminate(_reason, %{handler_ids: handler_ids}) do
def terminate(_reason, %{name: name, handler_ids: handler_ids}) do
Peep.Persistent.erase(name)
EventHandler.detach(handler_ids)
end

Expand Down
15 changes: 7 additions & 8 deletions lib/peep/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ defmodule Peep.EventHandler do
@moduledoc false
require Logger

alias Peep.Storage
alias Telemetry.Metrics.{Counter, Summary, Distribution}

def attach(metrics, tid, global_tags) do
def attach(metrics, name, global_tags) do
metrics_by_event = Enum.group_by(metrics, & &1.event_name)

for {event_name, metrics} <- metrics_by_event do
filtered_metrics = Enum.filter(metrics, &allow_metric?/1)
handler_id = handler_id(event_name, tid)
handler_id = handler_id(event_name, name)

:ok =
:telemetry.attach(
handler_id,
event_name,
&__MODULE__.handle_event/4,
%{
tid: tid,
name: name,
metrics: filtered_metrics,
global_tags: global_tags
}
Expand All @@ -33,7 +32,7 @@ defmodule Peep.EventHandler do
end

def handle_event(_event, measurements, metadata, %{
tid: tid,
name: name,
metrics: metrics,
global_tags: global_tags
}) do
Expand All @@ -45,13 +44,13 @@ defmodule Peep.EventHandler do

tags = Map.new(metric.tags, &{&1, Map.get(tag_values, &1, "")})

Storage.insert_metric(tid, metric, value, tags)
Peep.insert_metric(name, metric, value, tags)
end
end
end

defp handler_id(event_name, tid) do
{__MODULE__, tid, event_name}
defp handler_id(event_name, peep_name) do
{__MODULE__, peep_name, event_name}
end

defp keep?(%{keep: nil}, _metadata), do: true
Expand Down
10 changes: 10 additions & 0 deletions lib/peep/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ defmodule Peep.Options do
doc:
"Additional tags published with every metric. " <>
"Global tags are overriden by the tags specified in the metric definition."
],
storage: [
type: {:in, [:default, :striped]},
default: :default,
doc:
"Which storage implementation to use. " <>
"`:default` uses a single ETS table, with some optimizations for concurrent writing. " <>
"`:striped` uses one ETS table per scheduler thread, " <>
"which trades memory for less lock contention for concurrent writes."
]
]

Expand All @@ -69,6 +78,7 @@ defmodule Peep.Options do
"""

defstruct Keyword.keys(@schema)
@type t() :: %__MODULE__{}

@spec docs() :: String.t()
def docs do
Expand Down
66 changes: 66 additions & 0 deletions lib/peep/persistent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Peep.Persistent do
@moduledoc false
defstruct [:name, :storage]

@type name() :: atom()

@typep storage_default() :: {:default, :ets.tid()}
@typep storage_striped() :: {:striped, %{pos_integer() => :ets.tid()}}
@typep storage() :: storage_default() | storage_striped()

@type t() :: %__MODULE__{name: name(), storage: storage()}

@spec new(Peep.Options.t()) :: t()
def new(%Peep.Options{} = options) do
%Peep.Options{name: name, storage: storage_impl} = options

storage =
case storage_impl do
:default ->
{:default, Peep.Storage.ETS.new()}

:striped ->
{:striped, Peep.Storage.Striped.new()}
end

%__MODULE__{
name: name,
storage: storage
}
end

@spec store(t()) :: :ok
def store(%__MODULE__{} = term) do
%__MODULE__{name: name} = term
:persistent_term.put(key(name), term)
end

@spec fetch(name()) :: t() | nil
def fetch(name) when is_atom(name) do
:persistent_term.get(key(name), nil)
end

@spec erase(name()) :: :ok
def erase(name) when is_atom(name) do
:persistent_term.erase(name)
:ok
end

@spec storage(name()) :: {module(), term()} | nil
def storage(name) when is_atom(name) do
case fetch(name) do
%__MODULE__{storage: {:default, tid}} ->
{Peep.Storage.ETS, tid}

%__MODULE__{storage: {:striped, tids}} ->
{Peep.Storage.Striped, tids}

_ ->
nil
end
end

defp key(name) when is_atom(name) do
{Peep, name}
end
end
Loading

0 comments on commit a0e84c8

Please sign in to comment.