diff --git a/apps/indexer/lib/indexer/application.ex b/apps/indexer/lib/indexer/application.ex index f261ba8057e5..a65a2660f834 100644 --- a/apps/indexer/lib/indexer/application.ex +++ b/apps/indexer/lib/indexer/application.ex @@ -6,9 +6,13 @@ defmodule Indexer.Application do use Application alias Indexer.Memory + alias Indexer.Prometheus.PendingBlockOperationsCollector + alias Prometheus.Registry @impl Application def start(_type, _args) do + Registry.register_collector(PendingBlockOperationsCollector) + memory_monitor_options = case Application.get_env(:indexer, :memory_limit) do nil -> %{} diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index a870077a5a5a..93dfeca104e3 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -26,6 +26,7 @@ defmodule Indexer.Block.Catchup.Fetcher do alias Indexer.{Block, Tracer} alias Indexer.Block.Catchup.Sequence alias Indexer.Memory.Shrinkable + alias Indexer.Prometheus @behaviour Block.Fetcher @@ -93,6 +94,8 @@ defmodule Indexer.Block.Catchup.Fetcher do |> Stream.map(&Enum.count/1) |> Enum.sum() + Prometheus.Instrumenter.missing_blocks(missing_block_count) + Logger.debug(fn -> "Missed blocks in ranges." end, missing_block_range_count: range_count, missing_block_count: missing_block_count @@ -202,7 +205,11 @@ defmodule Indexer.Block.Catchup.Fetcher do ) do Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last) - case fetch_and_import_range(block_fetcher, range) do + {fetch_duration, result} = :timer.tc(fn -> fetch_and_import_range(block_fetcher, range) end) + + Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__) + + case result do {:ok, %{inserted: inserted, errors: errors}} -> errors = cap_seq(sequence, errors) retry(sequence, errors) @@ -210,6 +217,7 @@ defmodule Indexer.Block.Catchup.Fetcher do {:ok, inserted: inserted} {:error, {:import = step, [%Changeset{} | _] = changesets}} = error -> + Prometheus.Instrumenter.import_errors() Logger.error(fn -> ["failed to validate: ", inspect(changesets), ". Retrying."] end, step: step) push_back(sequence, range) @@ -217,6 +225,7 @@ defmodule Indexer.Block.Catchup.Fetcher do error {:error, {:import = step, reason}} = error -> + Prometheus.Instrumenter.import_errors() Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step) push_back(sequence, range) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 591e8bc9ed96..2fb3c7713dd2 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -29,7 +29,7 @@ defmodule Indexer.Block.Fetcher do UncleBlock } - alias Indexer.Tracer + alias Indexer.{Prometheus, Tracer} alias Indexer.Transform.{ AddressCoinBalances, @@ -121,6 +121,8 @@ defmodule Indexer.Block.Fetcher do _.._ = range ) when callback_module != nil do + {fetch_time, fetched_blocks} = + :timer.tc(fn -> EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments) end) with {:blocks, {:ok, %Blocks{ @@ -128,7 +130,7 @@ defmodule Indexer.Block.Fetcher do transactions_params: transactions_params_without_receipts, block_second_degree_relations_params: block_second_degree_relations_params, errors: blocks_errors - }}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, + }}} <- {:blocks, fetched_blocks}, blocks = TransformBlocks.transform_blocks(blocks_params), {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)}, %{logs: logs, receipts: receipts} = receipt_params, @@ -182,6 +184,7 @@ defmodule Indexer.Block.Fetcher do transactions: %{params: transactions_with_receipts} } ) do + Prometheus.Instrumenter.block_batch_fetch(fetch_time, callback_module) result = {:ok, %{inserted: inserted, errors: blocks_errors}} update_block_cache(inserted[:blocks]) update_transactions_cache(inserted[:transactions]) @@ -233,7 +236,11 @@ defmodule Indexer.Block.Fetcher do } ) - callback_module.import(state, options_with_broadcast) + {import_time, result} = :timer.tc(fn -> callback_module.import(state, options_with_broadcast) end) + + no_blocks_to_import = length(options_with_broadcast.blocks.params) + Prometheus.Instrumenter.block_import(import_time / no_blocks_to_import, callback_module) + result end def async_import_token_instances(%{token_transfers: token_transfers}) do diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index ea96a3e34443..e91da43ce7d8 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -32,6 +32,7 @@ defmodule Indexer.Block.Realtime.Fetcher do alias Explorer.Counters.AverageBlockTime alias Indexer.{Block, Tracer} alias Indexer.Block.Realtime.TaskSupervisor + alias Indexer.Prometheus alias Indexer.Transform.Addresses alias Timex.Duration @@ -281,8 +282,16 @@ defmodule Indexer.Block.Realtime.Fetcher do @decorate span(tracer: Tracer) defp do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry) do - case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do - {:ok, %{inserted: _, errors: []}} -> + time_before = Timex.now() + + {fetch_duration, result} = + :timer.tc(fn -> fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) end) + + Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__) + + case result do + {:ok, %{inserted: inserted, errors: []}} -> + log_import_timings(inserted, fetch_duration, time_before) Logger.debug("Fetched and imported.") {:ok, %{inserted: _, errors: [_ | _] = errors}} -> @@ -295,6 +304,8 @@ defmodule Indexer.Block.Realtime.Fetcher do end) {:error, {:import = step, [%Changeset{} | _] = changesets}} -> + Prometheus.Instrumenter.import_errors() + params = %{ changesets: changesets, block_number_to_fetch: block_number_to_fetch, @@ -318,6 +329,7 @@ defmodule Indexer.Block.Realtime.Fetcher do end {:error, {:import = step, reason}} -> + Prometheus.Instrumenter.import_errors() Logger.error(fn -> inspect(reason) end, step: step) {:error, {step, reason}} -> @@ -346,6 +358,17 @@ defmodule Indexer.Block.Realtime.Fetcher do end end + defp log_import_timings(%{blocks: [%{number: number, timestamp: timestamp}]}, fetch_duration, time_before) do + node_delay = Timex.diff(time_before, timestamp, :seconds) + Prometheus.Instrumenter.node_delay(node_delay) + + Logger.debug("Block #{number} fetching duration: #{fetch_duration / 1_000_000}s. Node delay: #{node_delay}s.", + fetcher: :block_import_timings + ) + end + + defp log_import_timings(_inserted, _duration, _time_before), do: nil + defp retry_fetch_and_import_block(%{retry: retry}) when retry < 1, do: :ignore defp retry_fetch_and_import_block(%{changesets: changesets} = params) do diff --git a/apps/indexer/lib/indexer/prometheus/instrumenter.ex b/apps/indexer/lib/indexer/prometheus/instrumenter.ex new file mode 100644 index 000000000000..d9dfda5dd8a3 --- /dev/null +++ b/apps/indexer/lib/indexer/prometheus/instrumenter.ex @@ -0,0 +1,61 @@ +defmodule Indexer.Prometheus.Instrumenter do + @moduledoc """ + Blocks fetch and import metrics for `Prometheus`. + """ + + use Prometheus.Metric + + @histogram [ + name: :block_full_processing_duration_microseconds, + labels: [:fetcher], + buckets: [1000, 5000, 10000, 100_000], + duration_unit: :microseconds, + help: "Block whole processing time including fetch and import" + ] + + @histogram [ + name: :block_import_duration_microseconds, + labels: [:fetcher], + buckets: [1000, 5000, 10000, 100_000], + duration_unit: :microseconds, + help: "Block import time" + ] + + @histogram [ + name: :block_batch_fetch_request_duration_microseconds, + labels: [:fetcher], + buckets: [1000, 5000, 10000, 100_000], + duration_unit: :microseconds, + help: "Block fetch batch request processing time" + ] + + @gauge [name: :missing_block_count, help: "Number of missing blocks in the database"] + + @gauge [name: :delay_from_last_node_block, help: "Delay from the last block on the node in seconds"] + + @counter [name: :import_errors_count, help: "Number of database import errors"] + + def block_full_process(time, fetcher) do + Histogram.observe([name: :block_full_processing_duration_microseconds, labels: [fetcher]], time) + end + + def block_import(time, fetcher) do + Histogram.observe([name: :block_import_duration_microseconds, labels: [fetcher]], time) + end + + def block_batch_fetch(time, fetcher) do + Histogram.observe([name: :block_batch_fetch_request_duration_microseconds, labels: [fetcher]], time) + end + + def missing_blocks(missing_block_count) do + Gauge.set([name: :missing_block_count], missing_block_count) + end + + def node_delay(delay) do + Gauge.set([name: :delay_from_last_node_block], delay) + end + + def import_errors(error_count \\ 1) do + Counter.inc([name: :import_errors_count], error_count) + end +end diff --git a/apps/indexer/lib/indexer/prometheus/pending_block_operations_collector.ex b/apps/indexer/lib/indexer/prometheus/pending_block_operations_collector.ex new file mode 100644 index 000000000000..e1e836a6bcfa --- /dev/null +++ b/apps/indexer/lib/indexer/prometheus/pending_block_operations_collector.ex @@ -0,0 +1,29 @@ +defmodule Indexer.Prometheus.PendingBlockOperationsCollector do + @moduledoc """ + Custom collector to count number of records in pending_block_operations table. + """ + + use Prometheus.Collector + + alias Explorer.Chain.PendingBlockOperation + alias Explorer.Repo + alias Prometheus.Model + + def collect_mf(_registry, callback) do + callback.( + create_gauge( + :pending_block_operations_count, + "Number of records in pending_block_operations table", + Repo.aggregate(PendingBlockOperation, :count) + ) + ) + end + + def collect_metrics(:pending_block_operations_count, count) do + Model.gauge_metrics([{count}]) + end + + defp create_gauge(name, help, data) do + Model.create_mf(name, help, :gauge, __MODULE__, data) + end +end