Skip to content

Commit

Permalink
Merge pull request #22 from matter-labs/indexer-prometheus-metrics
Browse files Browse the repository at this point in the history
Indexer prometheus metrics
  • Loading branch information
hatemosphere authored Nov 30, 2022
2 parents 13f9760 + ab5a41f commit 3d611ac
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 6 deletions.
4 changes: 4 additions & 0 deletions apps/indexer/lib/indexer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ defmodule Indexer.Application do
use Application

alias Indexer.Memory
alias Indexer.Prometheus.PendingBlockOperationsCollector
alias Prometheus.Registry

@impl Application
def start(_type, _args) do
Registry.register_collector(PendingBlockOperationsCollector)

memory_monitor_options =
case Application.get_env(:indexer, :memory_limit) do
nil -> %{}
Expand Down
11 changes: 10 additions & 1 deletion apps/indexer/lib/indexer/block/catchup/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.Sequence
alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus

@behaviour Block.Fetcher

Expand Down Expand Up @@ -93,6 +94,8 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> Stream.map(&Enum.count/1)
|> Enum.sum()

Prometheus.Instrumenter.missing_blocks(missing_block_count)

Logger.debug(fn -> "Missed blocks in ranges." end,
missing_block_range_count: range_count,
missing_block_count: missing_block_count
Expand Down Expand Up @@ -202,21 +205,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
) do
Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last)

case fetch_and_import_range(block_fetcher, range) do
{fetch_duration, result} = :timer.tc(fn -> fetch_and_import_range(block_fetcher, range) end)

Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__)

case result do
{:ok, %{inserted: inserted, errors: errors}} ->
errors = cap_seq(sequence, errors)
retry(sequence, errors)

{:ok, inserted: inserted}

{:error, {:import = step, [%Changeset{} | _] = changesets}} = error ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> ["failed to validate: ", inspect(changesets), ". Retrying."] end, step: step)

push_back(sequence, range)

error

{:error, {:import = step, reason}} = error ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step)

push_back(sequence, range)
Expand Down
13 changes: 10 additions & 3 deletions apps/indexer/lib/indexer/block/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Indexer.Block.Fetcher do
UncleBlock
}

alias Indexer.Tracer
alias Indexer.{Prometheus, Tracer}

alias Indexer.Transform.{
AddressCoinBalances,
Expand Down Expand Up @@ -121,14 +121,16 @@ defmodule Indexer.Block.Fetcher do
_.._ = range
)
when callback_module != nil do
{fetch_time, fetched_blocks} =
:timer.tc(fn -> EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments) end)
with {:blocks,
{:ok,
%Blocks{
blocks_params: blocks_params,
transactions_params: transactions_params_without_receipts,
block_second_degree_relations_params: block_second_degree_relations_params,
errors: blocks_errors
}}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
}}} <- {:blocks, fetched_blocks},
blocks = TransformBlocks.transform_blocks(blocks_params),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
Expand Down Expand Up @@ -182,6 +184,7 @@ defmodule Indexer.Block.Fetcher do
transactions: %{params: transactions_with_receipts}
}
) do
Prometheus.Instrumenter.block_batch_fetch(fetch_time, callback_module)
result = {:ok, %{inserted: inserted, errors: blocks_errors}}
update_block_cache(inserted[:blocks])
update_transactions_cache(inserted[:transactions])
Expand Down Expand Up @@ -233,7 +236,11 @@ defmodule Indexer.Block.Fetcher do
}
)

callback_module.import(state, options_with_broadcast)
{import_time, result} = :timer.tc(fn -> callback_module.import(state, options_with_broadcast) end)

no_blocks_to_import = length(options_with_broadcast.blocks.params)
Prometheus.Instrumenter.block_import(import_time / no_blocks_to_import, callback_module)
result
end

def async_import_token_instances(%{token_transfers: token_transfers}) do
Expand Down
27 changes: 25 additions & 2 deletions apps/indexer/lib/indexer/block/realtime/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Explorer.Counters.AverageBlockTime
alias Indexer.{Block, Tracer}
alias Indexer.Block.Realtime.TaskSupervisor
alias Indexer.Prometheus
alias Indexer.Transform.Addresses
alias Timex.Duration

Expand Down Expand Up @@ -281,8 +282,16 @@ defmodule Indexer.Block.Realtime.Fetcher do

@decorate span(tracer: Tracer)
defp do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry) do
case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do
{:ok, %{inserted: _, errors: []}} ->
time_before = Timex.now()

{fetch_duration, result} =
:timer.tc(fn -> fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) end)

Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__)

case result do
{:ok, %{inserted: inserted, errors: []}} ->
log_import_timings(inserted, fetch_duration, time_before)
Logger.debug("Fetched and imported.")

{:ok, %{inserted: _, errors: [_ | _] = errors}} ->
Expand All @@ -295,6 +304,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
end)

{:error, {:import = step, [%Changeset{} | _] = changesets}} ->
Prometheus.Instrumenter.import_errors()

params = %{
changesets: changesets,
block_number_to_fetch: block_number_to_fetch,
Expand All @@ -318,6 +329,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
end

{:error, {:import = step, reason}} ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> inspect(reason) end, step: step)

{:error, {step, reason}} ->
Expand Down Expand Up @@ -346,6 +358,17 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
end

defp log_import_timings(%{blocks: [%{number: number, timestamp: timestamp}]}, fetch_duration, time_before) do
node_delay = Timex.diff(time_before, timestamp, :seconds)
Prometheus.Instrumenter.node_delay(node_delay)

Logger.debug("Block #{number} fetching duration: #{fetch_duration / 1_000_000}s. Node delay: #{node_delay}s.",
fetcher: :block_import_timings
)
end

defp log_import_timings(_inserted, _duration, _time_before), do: nil

defp retry_fetch_and_import_block(%{retry: retry}) when retry < 1, do: :ignore

defp retry_fetch_and_import_block(%{changesets: changesets} = params) do
Expand Down
61 changes: 61 additions & 0 deletions apps/indexer/lib/indexer/prometheus/instrumenter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Indexer.Prometheus.Instrumenter do
  @moduledoc """
  Blocks fetch and import metrics for `Prometheus`.

  Declares histograms for per-stage block processing durations, gauges for
  missing-block count and node delay, and a counter for import errors, then
  exposes thin wrappers that record observations against them.
  """

  use Prometheus.Metric

  @histogram [
    name: :block_full_processing_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block whole processing time including fetch and import"
  ]

  @histogram [
    name: :block_import_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block import time"
  ]

  @histogram [
    name: :block_batch_fetch_request_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block fetch batch request processing time"
  ]

  @gauge [name: :missing_block_count, help: "Number of missing blocks in the database"]

  @gauge [name: :delay_from_last_node_block, help: "Delay from the last block on the node in seconds"]

  @counter [name: :import_errors_count, help: "Number of database import errors"]

  @doc "Records the total fetch-plus-import duration (µs) for `fetcher`."
  def block_full_process(time, fetcher),
    do: Histogram.observe([name: :block_full_processing_duration_microseconds, labels: [fetcher]], time)

  @doc "Records the database import duration (µs) for `fetcher`."
  def block_import(time, fetcher),
    do: Histogram.observe([name: :block_import_duration_microseconds, labels: [fetcher]], time)

  @doc "Records the JSON-RPC batch fetch duration (µs) for `fetcher`."
  def block_batch_fetch(time, fetcher),
    do: Histogram.observe([name: :block_batch_fetch_request_duration_microseconds, labels: [fetcher]], time)

  @doc "Sets the gauge of blocks currently missing from the database."
  def missing_blocks(missing_block_count),
    do: Gauge.set([name: :missing_block_count], missing_block_count)

  @doc "Sets the gauge of delay (seconds) behind the node's latest block."
  def node_delay(delay),
    do: Gauge.set([name: :delay_from_last_node_block], delay)

  @doc "Increments the import-error counter by `error_count` (default 1)."
  def import_errors(error_count \\ 1),
    do: Counter.inc([name: :import_errors_count], error_count)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Indexer.Prometheus.PendingBlockOperationsCollector do
  @moduledoc """
  Custom collector to count number of records in pending_block_operations table.
  """

  use Prometheus.Collector

  alias Explorer.Chain.PendingBlockOperation
  alias Explorer.Repo
  alias Prometheus.Model

  # Queries the row count on every scrape and hands the resulting
  # metric family to the registry callback.
  def collect_mf(_registry, callback) do
    row_count = Repo.aggregate(PendingBlockOperation, :count)

    :pending_block_operations_count
    |> create_gauge("Number of records in pending_block_operations table", row_count)
    |> callback.()
  end

  # Wraps the collected count into the gauge-metrics wire format.
  def collect_metrics(:pending_block_operations_count, count) do
    Model.gauge_metrics([{count}])
  end

  # Builds a gauge metric family owned by this collector module.
  defp create_gauge(name, help, data) do
    Model.create_mf(name, help, :gauge, __MODULE__, data)
  end
end

0 comments on commit 3d611ac

Please sign in to comment.