diff --git a/apps/omg/lib/omg/state.ex b/apps/omg/lib/omg/state.ex index d7016ffff5..774f36b276 100644 --- a/apps/omg/lib/omg/state.ex +++ b/apps/omg/lib/omg/state.ex @@ -261,10 +261,10 @@ defmodule OMG.State do - pushes the new block to subscribers of `"blocks"` internal event bus topic """ def handle_cast(:form_block, state) do - _ = Logger.debug("Forming new block...") + _ = Logger.info("Forming new block...") state = Core.claim_fees(state) {:ok, {%Block{number: blknum} = block, db_updates}, new_state} = Core.form_block(state) - _ = Logger.debug("Formed new block ##{blknum}") + _ = Logger.info("Formed new block ##{blknum}") # persistence is required to be here, since propagating the block onwards requires restartability including the # new block diff --git a/apps/omg_child_chain/lib/omg_child_chain/block_queue.ex b/apps/omg_child_chain/lib/omg_child_chain/block_queue.ex index 6f8a2dfa09..a756db5cf3 100644 --- a/apps/omg_child_chain/lib/omg_child_chain/block_queue.ex +++ b/apps/omg_child_chain/lib/omg_child_chain/block_queue.ex @@ -194,6 +194,11 @@ defmodule OMG.ChildChain.BlockQueue do {:noreply, state1} end + @doc false + # Ignore unrelated events + def handle_info({:internal_event_bus, :block_submitting, _}, state), do: {:noreply, state} + def handle_info({:internal_event_bus, :block_submitted, _}, state), do: {:noreply, state} + # private (server) @spec submit_blocks(Core.t()) :: :ok @@ -205,6 +210,7 @@ defmodule OMG.ChildChain.BlockQueue do defp submit(submission) do _ = Logger.info("Submitting: #{inspect(submission)}") + _ = publish_block_submitting_event(submission.num) submit_result = Eth.submit_block(submission.hash, submission.nonce, submission.gas_price) newest_mined_blknum = RootChain.get_mined_child_block() @@ -218,6 +224,7 @@ defmodule OMG.ChildChain.BlockQueue do error {:ok, txhash} -> + _ = publish_block_submitted_event(submission.num) _ = GasAnalyzer.enqueue(txhash) _ = Balance.check() :ok @@ -229,6 +236,34 @@ defmodule OMG.ChildChain.BlockQueue do :ok = final_result end + # Publishes each time a block is being submitted, regardless of failing or being successful. + # This differs from `:enqueue_block` which publishes only once when the block is formed. + # For example, when a block is re-submitted 3 times before it got accepted, there would be + # 1 x `:enqueue_block` and 3 x `:block_submitting` events published. + # + # The telemetry event is emitted for raw metrics propagation. The bus event is published so + # a consumer, potentially a GenServer or other processes can perform extra operations on the data. + defp publish_block_submitting_event(blknum) do + _ = :telemetry.execute([:blknum_submitting, __MODULE__], %{blknum: blknum}) + + {:child_chain, "blocks"} + |> OMG.Bus.Event.new(:block_submitting, blknum) + |> OMG.Bus.direct_local_broadcast() + end + + # Publishes when a block is successfully submitted. Only 1 `block_submitted` event will ever + # be published for each block. + # + # The telemetry event is emitted for raw metrics propagation. The bus event is published so + # a consumer, potentially a GenServer or other processes can perform extra operations on the data. + defp publish_block_submitted_event(blknum) do + _ = :telemetry.execute([:blknum_submitted, __MODULE__], %{blknum: blknum}) + + {:child_chain, "blocks"} + |> OMG.Bus.Event.new(:block_submitted, blknum) + |> OMG.Bus.direct_local_broadcast() + end + defp log_init_error(fields) do fields = Keyword.update!(fields, :known_hashes, fn hashes -> Enum.map(hashes, &Encoding.to_hex/1) end) diff --git a/apps/omg_child_chain/lib/omg_child_chain/block_queue/measure.ex b/apps/omg_child_chain/lib/omg_child_chain/block_queue/measure.ex index 6629db99f9..e61f33688b 100644 --- a/apps/omg_child_chain/lib/omg_child_chain/block_queue/measure.ex +++ b/apps/omg_child_chain/lib/omg_child_chain/block_queue/measure.ex @@ -27,16 +27,23 @@ defmodule OMG.ChildChain.BlockQueue.Measure do alias OMG.ChildChain.BlockQueue alias OMG.ChildChain.BlockQueue.Balance alias OMG.ChildChain.BlockQueue.GasAnalyzer + alias OMG.ChildChain.BlockQueue.SubmissionMonitor alias OMG.Status.Metric.Datadog @supported_events [ [:process, BlockQueue], + # Events triggered when a block is submitting or submitted + [:blknum_submitting, BlockQueue], + [:blknum_submitted, BlockQueue], + # Events providing collections of blocks being submitted or stalled + [:blocks_submitting, SubmissionMonitor], + [:blocks_stalled, SubmissionMonitor], [:gas, GasAnalyzer], [:authority_balance, Balance] ] def supported_events(), do: @supported_events - def handle_event([:process, BlockQueue], _, _state, _config) do + def handle_event([:process, BlockQueue], _measurements, _metadata, _config) do value = self() |> Process.info(:message_queue_len) @@ -47,11 +54,25 @@ defmodule OMG.ChildChain.BlockQueue.Measure do def handle_event([:gas, GasAnalyzer], %{gas: gas}, _, _config) do gwei = div(gas, 1_000_000_000) - _ = Datadog.gauge(name(:block_submission), gwei) + _ = Datadog.gauge(name(:block_submission_gas), gwei) end def handle_event([:authority_balance, Balance], %{authority_balance: authority_balance}, _, _config) do gwei = div(authority_balance, 1_000_000_000) _ = Datadog.gauge(name(:authority_balance), gwei) end + + def handle_event([:blknum_submitting, BlockQueue], %{blknum: blknum}, _, _config) do + _ = Datadog.gauge(name(:block_queue_blknum_submitting), blknum) + _ = Datadog.increment(name(:block_submission_attempt), 1) + end + + def handle_event([:blknum_submitted, BlockQueue], %{blknum: blknum}, _, _config) do + _ = Datadog.gauge(name(:block_queue_blknum_submitted), blknum) + _ = Datadog.increment(name(:block_submission_success), 1) + end + + def handle_event([:blocks_stalled, SubmissionMonitor], %{blocks: blocks}, _, _config) do + _ = Datadog.gauge(name(:block_queue_num_blocks_stalled), length(blocks)) + end end diff --git a/apps/omg_child_chain/lib/omg_child_chain/block_queue/monitor/alarm_handler.ex b/apps/omg_child_chain/lib/omg_child_chain/block_queue/monitor/alarm_handler.ex new file mode 100644 index 0000000000..b8c6aadddd --- /dev/null +++ b/apps/omg_child_chain/lib/omg_child_chain/block_queue/monitor/alarm_handler.ex @@ -0,0 +1,48 @@ +# Copyright 2019-2020 OmiseGO Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.ChildChain.BlockQueue.Monitor.AlarmHandler do + @moduledoc """ + Listens for :block_submit_stalled alarms and reflect the alarm's state back to the monitor. + """ + require Logger + + # The alarm reporter and monitor happen to be the same module here because we are just + # reflecting the alarm's state back to the reporter. + @reporter OMG.ChildChain.BlockQueue.Monitor + @monitor OMG.ChildChain.BlockQueue.Monitor + + def init(_args) do + {:ok, %{}} + end + + def handle_call(_request, state), do: {:ok, :ok, state} + + def handle_event({:set_alarm, {:block_submit_stalled, %{reporter: @reporter}}}, state) do + _ = Logger.warn(":block_submit_stalled alarm raised.") + :ok = GenServer.cast(@monitor, {:set_alarm, :block_submit_stalled}) + {:ok, state} + end + + def handle_event({:clear_alarm, {:block_submit_stalled, %{reporter: @reporter}}}, state) do + _ = Logger.warn(":block_submit_stalled alarm cleared.") + :ok = GenServer.cast(@monitor, {:clear_alarm, :block_submit_stalled}) + {:ok, state} + end + + def handle_event(event, state) do + _ = Logger.info("#{__MODULE__} got event: #{inspect(event)}. Ignoring.") + {:ok, state} + end +end diff --git a/apps/omg_child_chain/lib/omg_child_chain/block_queue/submission_monitor.ex b/apps/omg_child_chain/lib/omg_child_chain/block_queue/submission_monitor.ex new file mode 100644 index 0000000000..dcca892066 --- /dev/null +++ b/apps/omg_child_chain/lib/omg_child_chain/block_queue/submission_monitor.ex @@ -0,0 +1,172 @@ +# Copyright 2019-2020 OmiseGO Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.ChildChain.BlockQueue.SubmissionMonitor do + @moduledoc """ + Listens to block events and raises :block_submit_stalled alarm when a pending block + doesn't get successfully submitted within the specified time threshold. + """ + use GenServer + require Logger + + defstruct pending_blocks: [], + root_chain_height: 0, + stall_threshold_blocks: 4, + alarm_module: nil, + alarm_raised: false + + @typep blknum() :: pos_integer() + @typep pending_block() :: {blknum :: blknum(), first_submit_height :: pos_integer()} + + @type t() :: %__MODULE__{ + pending_blocks: [pending_block()], + root_chain_height: non_neg_integer(), + stall_threshold_blocks: pos_integer(), + alarm_module: module(), + alarm_raised: boolean() + } + + # + # GenServer APIs + # + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + # + # GenServer behaviors + # + + def init(opts) do + _ = Logger.info("Starting #{__MODULE__}") + _ = install_alarm_handler() + event_bus = Keyword.fetch!(opts, :event_bus_module) + check_interval_ms = Keyword.fetch!(opts, :check_interval_ms) + + state = %__MODULE__{ + pending_blocks: [], + stall_threshold_blocks: Keyword.fetch!(opts, :stall_threshold_blocks), + alarm_module: Keyword.fetch!(opts, :alarm_module), + alarm_raised: false + } + + :ok = event_bus.subscribe({:child_chain, "blocks"}, link: true) + :ok = event_bus.subscribe({:root_chain, "ethereum_new_height"}, link: true) + + {:ok, _} = :timer.send_interval(check_interval_ms, self(), :check_stall) + {:ok, state} + end + + def handle_info(:check_stall, state) do + stalled_blocks = + Enum.filter(state.pending_blocks, fn {_blknum, first_submit_height} -> + state.root_chain_height - first_submit_height >= state.stall_threshold_blocks + end) + + _ = :telemetry.execute([:blocks_stalled, __MODULE__], %{blocks: stalled_blocks}) + _ = log_stalled_blocks(stalled_blocks, state.root_chain_height) + _ = trigger_alarm(state.alarm_module, state.alarm_raised, stalled_blocks) + + {:noreply, state} + end + + # Keeps track of the latest root chain height + def handle_info({:internal_event_bus, :ethereum_new_height, new_height}, state) do + {:noreply, %{state | root_chain_height: new_height}} + end + + # Listens for a block being submitted and add it to monitoring if it hasn't been tracked + def handle_info({:internal_event_bus, :block_submitting, blknum}, state) do + pending_blocks = add_new_blknum(state.pending_blocks, blknum, state.root_chain_height) + {:noreply, %{state | pending_blocks: pending_blocks}} + end + + # Listens for a block that got submitted and drop it from monitoring + def handle_info({:internal_event_bus, :block_submitted, blknum}, state) do + pending_blocks = remove_blknum(state.pending_blocks, blknum) + {:noreply, %{state | pending_blocks: pending_blocks}} + end + + # Ignore unrelated events + def handle_info({:internal_event_bus, :enqueue_block, _}, state) do + {:noreply, state} + end + + # + # Handle incoming alarms + # + # These functions are called by the AlarmHandler so that this monitor process can update + # its internal state according to the raised alarms. + # + def handle_cast({:set_alarm, :block_submit_stalled}, state) do + {:noreply, %{state | alarm_raised: true}} + end + + def handle_cast({:clear_alarm, :block_submit_stalled}, state) do + {:noreply, %{state | alarm_raised: false}} + end + + # + # Private functions + # + + # Add the blknum to tracking only if it is not already tracked + @spec add_new_blknum([{blknum(), any()}], blknum(), non_neg_integer()) :: [pending_block()] + defp add_new_blknum(pending_blocks, blknum, root_chain_height) do + case Enum.any?(pending_blocks, fn {pending_blknum, _} -> pending_blknum == blknum end) do + true -> pending_blocks + false -> [{blknum, root_chain_height} | pending_blocks] + end + end + + @spec remove_blknum([{blknum(), any()}], blknum()) :: [pending_block()] + defp remove_blknum(pending_blocks, blknum) do + Enum.reject(pending_blocks, fn {pending_blknum, _} -> pending_blknum == blknum end) + end + + # + # Alarms management + # + + defp install_alarm_handler() do + case Enum.member?(:gen_event.which_handlers(:alarm_handler), __MODULE__.AlarmHandler) do + true -> :ok + _ -> :alarm_handler.add_alarm_handler(__MODULE__.AlarmHandler) + end + end + + @spec trigger_alarm(alarm :: module(), alarm_raised :: boolean(), stalled_blocks :: [blknum()]) :: :ok + defp trigger_alarm(alarm_module, false, stalled_blocks) when length(stalled_blocks) >= 1 do + alarm_module.set(alarm_module.block_submit_stalled(__MODULE__)) + end + + defp trigger_alarm(alarm_module, true, []) do + alarm_module.clear(alarm_module.block_submit_stalled(__MODULE__)) + end + + defp trigger_alarm(_alarm_module, _alarm_raised, _stalled_blocks), do: :ok + + # + # Logging + # + defp log_stalled_blocks([], _), do: :ok + + defp log_stalled_blocks(stalled_blocks, root_chain_height) do + Logger.warn( + "#{__MODULE__}: Stalled blocks: #{inspect(stalled_blocks)}. " <> + "Current height: #{inspect(root_chain_height)}" + ) + end +end diff --git a/apps/omg_child_chain/lib/omg_child_chain/configuration.ex b/apps/omg_child_chain/lib/omg_child_chain/configuration.ex index 6f9a78d611..a47b004b6b 100644 --- a/apps/omg_child_chain/lib/omg_child_chain/configuration.ex +++ b/apps/omg_child_chain/lib/omg_child_chain/configuration.ex @@ -18,35 +18,40 @@ defmodule OMG.ChildChain.Configuration do """ @app :omg_child_chain - @spec metrics_collection_interval() :: no_return | pos_integer() + @spec metrics_collection_interval() :: no_return() | pos_integer() def metrics_collection_interval() do Application.fetch_env!(@app, :metrics_collection_interval) end - @spec block_queue_eth_height_check_interval_ms() :: no_return | pos_integer() + @spec block_queue_eth_height_check_interval_ms() :: no_return() | pos_integer() def block_queue_eth_height_check_interval_ms() do Application.fetch_env!(@app, :block_queue_eth_height_check_interval_ms) end - @spec submission_finality_margin() :: no_return | pos_integer() + @spec submission_finality_margin() :: no_return() | pos_integer() def submission_finality_margin() do Application.fetch_env!(@app, :submission_finality_margin) end - @spec block_submit_every_nth() :: no_return | pos_integer() + @spec block_submit_every_nth() :: no_return() | pos_integer() def block_submit_every_nth() do Application.fetch_env!(@app, :block_submit_every_nth) end - @spec block_submit_max_gas_price() :: no_return | pos_integer() + @spec block_submit_max_gas_price() :: no_return() | pos_integer() def block_submit_max_gas_price() do Application.fetch_env!(@app, :block_submit_max_gas_price) end + @spec block_submit_stall_threshold_blocks() :: pos_integer() | no_return() + def block_submit_stall_threshold_blocks() do + Application.fetch_env!(@app, :block_submit_stall_threshold_blocks) + end + @doc """ Prepares options Keyword for the FeeServer process """ - @spec fee_server_opts() :: no_return | Keyword.t() + @spec fee_server_opts() :: no_return() | Keyword.t() def fee_server_opts() do fee_server_opts = [ fee_adapter_check_interval_ms: Application.fetch_env!(@app, :fee_adapter_check_interval_ms), @@ -58,7 +63,7 @@ defmodule OMG.ChildChain.Configuration do Keyword.merge(fee_server_opts, fee_adapter: adapter, fee_adapter_opts: adapter_opts) end - @spec fee_adapter_opts() :: no_return | tuple() + @spec fee_adapter_opts() :: no_return() | tuple() defp fee_adapter_opts() do Application.fetch_env!(@app, :fee_adapter) end diff --git a/apps/omg_child_chain/lib/omg_child_chain/sync_supervisor.ex b/apps/omg_child_chain/lib/omg_child_chain/sync_supervisor.ex index 1b0c9bb5c5..218958eb8e 100644 --- a/apps/omg_child_chain/lib/omg_child_chain/sync_supervisor.ex +++ b/apps/omg_child_chain/lib/omg_child_chain/sync_supervisor.ex @@ -20,9 +20,11 @@ defmodule OMG.ChildChain.SyncSupervisor do use Supervisor use OMG.Utils.LoggerExt + alias OMG.Bus alias OMG.ChildChain.BlockQueue alias OMG.ChildChain.BlockQueue.Balance alias OMG.ChildChain.BlockQueue.GasAnalyzer + alias OMG.ChildChain.BlockQueue.SubmissionMonitor alias OMG.ChildChain.ChildManager alias OMG.ChildChain.Configuration alias OMG.ChildChain.CoordinatorSetup @@ -31,6 +33,7 @@ defmodule OMG.ChildChain.SyncSupervisor do alias OMG.EthereumEventListener alias OMG.RootChainCoordinator alias OMG.State + alias OMG.Status.Alert.Alarm @events_bucket :events_bucket def start_link(args) do @@ -64,6 +67,7 @@ defmodule OMG.ChildChain.SyncSupervisor do submission_finality_margin = Configuration.submission_finality_margin() block_submit_every_nth = Configuration.block_submit_every_nth() block_submit_max_gas_price = Configuration.block_submit_max_gas_price() + block_submit_stall_threshold_blocks = OMG.ChildChain.Configuration.block_submit_stall_threshold_blocks() ethereum_events_check_interval_ms = OMG.Configuration.ethereum_events_check_interval_ms() coordinator_eth_height_check_interval_ms = OMG.Configuration.coordinator_eth_height_check_interval_ms() deposit_finality_margin = OMG.Configuration.deposit_finality_margin() @@ -84,6 +88,13 @@ defmodule OMG.ChildChain.SyncSupervisor do block_submit_max_gas_price: block_submit_max_gas_price, child_block_interval: child_block_interval ]}, + {SubmissionMonitor, + [ + check_interval_ms: ethereum_events_check_interval_ms, + stall_threshold_blocks: block_submit_stall_threshold_blocks, + event_bus_module: Bus, + alarm_module: Alarm + ]}, {RootChainCoordinator, CoordinatorSetup.coordinator_setup( metrics_collection_interval, diff --git a/apps/omg_child_chain/test/omg_child_chain/block_queue/submission_monitor_test.exs b/apps/omg_child_chain/test/omg_child_chain/block_queue/submission_monitor_test.exs new file mode 100644 index 0000000000..52764f43ed --- /dev/null +++ b/apps/omg_child_chain/test/omg_child_chain/block_queue/submission_monitor_test.exs @@ -0,0 +1,192 @@ +# Copyright 2019-2020 OmiseGO Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.ChildChain.BlockQueue.SubmissionMonitorTest do + use ExUnit.Case, async: true + import ExUnit.CaptureLog, only: [capture_log: 1] + alias OMG.ChildChain.BlockQueue.SubmissionMonitor + + setup_all do + {:ok, apps} = Application.ensure_all_started(:omg_status) + + on_exit(fn -> + apps |> Enum.reverse() |> Enum.each(&Application.stop/1) + end) + + :ok + end + + setup do + {:ok, alarm} = __MODULE__.Alarm.start(self()) + stall_threshold_blocks = 10 + check_interval_ms = 10 + + {:ok, monitor} = + SubmissionMonitor.start_link( + alarm_module: __MODULE__.Alarm, + event_bus_module: __MODULE__.BusMock, + stall_threshold_blocks: stall_threshold_blocks, + check_interval_ms: check_interval_ms + ) + + :ok = + on_exit(fn -> + _ = Process.exit(alarm, :test_cleanup) + _ = Process.exit(monitor, :test_cleanup) + _ = Process.sleep(10) + end) + + {:ok, + %{ + alarm: alarm, + monitor: monitor, + stall_threshold_blocks: stall_threshold_blocks, + check_interval_ms: check_interval_ms + }} + end + + test "does not raise :block_submit_stalled alarm when block is below stall threshold", context do + capture_log(fn -> + # Inform the monitor of a pending block + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + + # Push the height just below the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks - 1}) + + # Wait for 10x the check interval to make sure it really does not get raised again. + refute_receive(:got_raise_alarm, context.check_interval_ms * 10) + end) + end + + test "raises :block_submit_stalled alarm when blocks is at stall threshold", context do + capture_log(fn -> + # Inform the monitor of a pending block + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + + # Push the height to the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks}) + + assert_receive(:got_raise_alarm) + end) + end + + test "raises :block_submit_stalled alarm when blocks is above stall threshold", context do + capture_log(fn -> + # Inform the monitor of a pending block + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + + # Push the height pass the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks + 1}) + + assert_receive(:got_raise_alarm) + end) + end + + test "does not raise :block_submit_stalled alarm when it is already raised", context do + # Set the monitor in a raised state + :sys.replace_state(context.monitor, fn state -> %{state | alarm_raised: true} end) + + capture_log(fn -> + # Inform the monitor of a pending block + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + + # Push the height pass the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks}) + + # Wait for 10x the check interval to make sure it really does not get raised again. + refute_receive(:got_raise_alarm, context.check_interval_ms * 10) + end) + end + + test "clears :block_submit_stalled alarm when the stalled block no longer stalls", context do + # Set the monitor in a raised state + :sys.replace_state(context.monitor, fn state -> %{state | alarm_raised: true} end) + + capture_log(fn -> + # Inform the monitor of a pending block + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + + # Push the height pass the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks}) + + # Now we inform the monitor that block #1000 is submitted + _ = send(context.monitor, {:internal_event_bus, :block_submitted, 1000}) + + # Expecting the alarm to be cleared + assert_receive(:got_clear_alarm) + end) + end + + test "does not clear :block_submit_stalled alarm when some but not all stalled blocks got submitted", context do + # Set the monitor in a raised state + :sys.replace_state(context.monitor, fn state -> %{state | alarm_raised: true} end) + + capture_log(fn -> + # Inform the monitor of two pending blocks + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 1000}) + _ = send(context.monitor, {:internal_event_bus, :block_submitting, 2000}) + + # Push the height pass the stalling height + _ = send(context.monitor, {:internal_event_bus, :ethereum_new_height, context.stall_threshold_blocks}) + + # Now we inform the monitor that block #1000 is submitted, leaving #2000 still stalled + _ = send(context.monitor, {:internal_event_bus, :block_submitted, 1000}) + + # Because #2000 is still stalled, the alarm must not be cleared. + # Wait for 10x the check interval to make sure it really does not get cleared. + refute_receive(:got_clear_alarm, context.check_interval_ms * 10) + end) + end + + defmodule Alarm do + @moduledoc """ + Mocks `OMG.Status.Alert.Alarm` so we can observe it for test assertions. + """ + use GenServer + + def start(listener) do + GenServer.start(__MODULE__, [listener], name: __MODULE__) + end + + def init([listener]) do + {:ok, %{listener: listener}} + end + + def block_submit_stalled(reporter) do + {:block_submit_stalled, %{node: Node.self(), reporter: reporter}} + end + + def set({:block_submit_stalled, _details}) do + GenServer.call(__MODULE__, :got_raise_alarm) + end + + def clear({:block_submit_stalled, _details}) do + GenServer.call(__MODULE__, :got_clear_alarm) + end + + def handle_call(:got_raise_alarm, _, state) do + {:reply, send(state.listener, :got_raise_alarm), state} + end + + def handle_call(:got_clear_alarm, _, state) do + {:reply, send(state.listener, :got_clear_alarm), state} + end + end + + defmodule BusMock do + def subscribe(_, _) do + :ok + end + end +end diff --git a/apps/omg_eth/lib/omg_eth/configuration.ex b/apps/omg_eth/lib/omg_eth/configuration.ex index 98a33f9c88..84c30d5a95 100644 --- a/apps/omg_eth/lib/omg_eth/configuration.ex +++ b/apps/omg_eth/lib/omg_eth/configuration.ex @@ -66,10 +66,12 @@ defmodule OMG.Eth.Configuration do Application.fetch_env!(@app, :eth_node) end + @spec ethereum_events_check_interval_ms() :: pos_integer | no_return def ethereum_events_check_interval_ms() do Application.fetch_env!(@app, :ethereum_events_check_interval_ms) end + @spec ethereum_stalled_sync_threshold_ms() :: pos_integer | no_return def ethereum_stalled_sync_threshold_ms() do Application.fetch_env!(@app, :ethereum_stalled_sync_threshold_ms) end diff --git a/apps/omg_status/lib/omg_status/alert/alarm.ex b/apps/omg_status/lib/omg_status/alert/alarm.ex index 7ac9cddae3..dbf34a5db8 100644 --- a/apps/omg_status/lib/omg_status/alert/alarm.ex +++ b/apps/omg_status/lib/omg_status/alert/alarm.ex @@ -33,7 +33,8 @@ defmodule OMG.Status.Alert.Alarm do | :invalid_fee_source | :statsd_client_connection | :main_supervisor_halted - | :system_memory_too_high, alarm_detail} + | :system_memory_too_high + | :block_submit_stalled, alarm_detail} def alarm_types(), do: [ @@ -43,7 +44,8 @@ defmodule OMG.Status.Alert.Alarm do :invalid_fee_source, :statsd_client_connection, :main_supervisor_halted, - :system_memory_too_high + :system_memory_too_high, + :block_submit_stalled ] @spec statsd_client_connection(module()) :: {:statsd_client_connection, alarm_detail} @@ -74,6 +76,10 @@ defmodule OMG.Status.Alert.Alarm do def system_memory_too_high(reporter), do: {:system_memory_too_high, %{node: Node.self(), reporter: reporter}} + @spec block_submit_stalled(module()) :: {:block_submit_stalled, alarm_detail} + def block_submit_stalled(reporter), + do: {:block_submit_stalled, %{node: Node.self(), reporter: reporter}} + @spec set(alarms()) :: :ok | :duplicate def set(alarm), do: do_raise(alarm) diff --git a/apps/omg_status/lib/omg_status/metric/event.ex b/apps/omg_status/lib/omg_status/metric/event.ex index 5f956a0ae0..84f632845c 100644 --- a/apps/omg_status/lib/omg_status/metric/event.ex +++ b/apps/omg_status/lib/omg_status/metric/event.ex @@ -59,10 +59,35 @@ defmodule OMG.Status.Metric.Event do """ def name(:block_transactions), do: "block_transactions" + @doc """ + Childchain Block submission attempted + """ + def name(:block_submission_attempt), do: "block_submission_attempt" + + @doc """ + Childchain Block successfully submitted + """ + def name(:block_submission_success), do: "block_submission_success" + @doc """ Child Chain Block queue gas usage metric """ - def name(:block_submission), do: "block_submission_gas" + def name(:block_submission_gas), do: "block_submission_gas" + + @doc """ + Child Chain BlockQueue's blknum of the block being submitted + """ + def name(:block_queue_blknum_submitting), do: "block_queue_blknum_submitting" + + @doc """ + Child Chain BlockQueue's blknum of the block submitted + """ + def name(:block_queue_blknum_submitted), do: "block_queue_blknum_submitted" + + @doc """ + Child Chain BlockQueue's number of blocks currently being submitted and stalled + """ + def name(:block_queue_num_blocks_stalled), do: "block_queue_num_blocks_stalled" @doc """ Child Chain authority address balance diff --git a/config/config.exs b/config/config.exs index 25030c6cd1..6bdcf92d7e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -46,6 +46,7 @@ config :omg_child_chain, block_queue_eth_height_check_interval_ms: 6_000, block_submit_every_nth: 1, block_submit_max_gas_price: 20_000_000_000, + block_submit_stall_threshold_blocks: 4, metrics_collection_interval: 60_000, fee_adapter_check_interval_ms: 10_000, fee_buffer_duration_ms: 30_000, diff --git a/config/releases.exs b/config/releases.exs index 412baea707..d309b4fb88 100644 --- a/config/releases.exs +++ b/config/releases.exs @@ -5,6 +5,9 @@ import Config # # See https://hexdocs.pm/mix/1.9.0/Mix.Tasks.Release.html#module-runtime-configuration +config :omg_child_chain, + block_submit_stall_threshold_blocks: String.to_integer(System.get_env("BLOCK_SUBMIT_STALL_THRESHOLD_BLOCKS") || "4") + config :omg_watcher_info, OMG.WatcherInfo.DB.Repo, # Have at most `:pool_size` DB connections on standby and serving DB queries. pool_size: String.to_integer(System.get_env("WATCHER_INFO_DB_POOL_SIZE") || "10"), diff --git a/docker-compose.yml b/docker-compose.yml index f99ea73713..c85a3b97b2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,7 +41,6 @@ services: --datadir data/ \ --syncmode 'full' \ --networkid 1337 \ - --gasprice '1' \ --keystore=./data/geth/keystore/ \ --password /tmp/geth-blank-password \ --unlock "0,1" \ diff --git a/docs/deployment_configuration.md b/docs/deployment_configuration.md index 593ca68588..4a40bdfbb4 100644 --- a/docs/deployment_configuration.md +++ b/docs/deployment_configuration.md @@ -22,7 +22,8 @@ ***Child Chain only*** -- "BLOCK_SUBMIT_MAX_GAS_PRICE" - The maximum gas price to use for block submission. The first block submission after application boot will use the max price. The gas price gradually adjusts on subsequent blocks to reach the current optimum price . Defaults to `20000000000` (20 Gwei). +- "BLOCK_SUBMIT_MAX_GAS_PRICE" - The maximum gas price to use for block submission. The first block submission after application boot will use the max price. The gas price gradually adjusts on subsequent blocks to reach the current optimum price. Defaults to `20000000000` (20 Gwei). +- "BLOCK_SUBMIT_STALL_THRESHOLD_BLOCKS" - The number of root chain blocks passed until a child chain block pending submission is considered stalled. Defaults to `4` root chain blocks. - "FEE_ADAPTER" - The adapter to use to populate the fee specs. Either `file` or `feed` (case-insensitive). Defaults to `file` with an empty fee specs. - "FEE_CLAIMER_ADDRESS" - 20-bytes HEX-encoded string of Ethereum address of Fee Claimer. - "FEE_BUFFER_DURATION_MS" - Buffer period during which a fee is still valid after being updated.