Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: block queue metrics and stalled submission alarm #1649

Merged
merged 42 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b057f25
feat: change block formation log from debug to info
Jul 16, 2020
f637985
feat: BlockQueue publishes :block_submitting and :block_submitted events
Jul 16, 2020
5c771dd
feat: BlockQueue.Monitor that raises/clears :block_submission_stalled…
Jul 16, 2020
b8a8c81
refactor: tracks root chain height directly from the bus
Jul 16, 2020
17c76c4
refactor: revert variable naming
Jul 16, 2020
a55a7e5
fix: remove remaining EtherereumHeight.get()
Jul 16, 2020
ec310b3
fix: typespec
Jul 16, 2020
fb97563
feat: add telemetry and statsd metric for blknum_submitting and blknu…
Jul 16, 2020
244f608
feat: add telemetry and statsd metric for blocks_submitting and block…
Jul 16, 2020
782c3e3
fix: rename :block_submission to :block_submission_gas
Jul 16, 2020
de16c8e
feat: add statsd metric for :block_submission_success
Jul 16, 2020
c5da3ea
fix: remove non services
Jul 16, 2020
c4e437f
docs: clearer explanation of block queue metrics
Jul 16, 2020
b87a712
docs: more explanation on event publishing
Jul 16, 2020
03b983b
feat: add :block_submission_attempt statsd metric
Jul 16, 2020
8773021
fix: dialyzer
Jul 16, 2020
0f127a8
feat: block submission stall config and supervisor setup
Jul 16, 2020
04a5754
format: formatting and naming alignment
Jul 17, 2020
d1594d3
test: add BlockQueue.Monitor test
Jul 17, 2020
0916252
refactor: shorten config name
Jul 17, 2020
44c5ebc
test: fix tests
Jul 17, 2020
1771b48
Merge branch 'master' into unnawut/block-submit-stall-monitor
Jul 17, 2020
14c6f37
fix: broken service startup
Jul 17, 2020
d4079fc
feat: configurable block submit stall from env var
Jul 17, 2020
047bd98
fix: move config namespace
Jul 17, 2020
d8d3bf3
refactor: brackets everywhere
Jul 17, 2020
69fe633
fix: config naming
Jul 17, 2020
2a20535
fix: wrong namespace
Jul 17, 2020
79b2b22
test: fix app name
Jul 17, 2020
e452b96
test: fix init
Jul 17, 2020
7ce931d
refactor: use :block_submit_stalled everywhere
Jul 17, 2020
91f4d4b
fix: ignore unrelated events
Jul 17, 2020
bd4b2c1
fix: telemetry deprecation
Jul 17, 2020
aaacf5c
fix: remove misleading metric :block_queue_num_blocks_submitting due …
Jul 17, 2020
5faa0a2
fix: remove dup geth argument
Jul 17, 2020
365d029
fix: remove unused block_submit_stall_check_interval_ms
Jul 17, 2020
0a3ae70
Merge branch 'master' into unnawut/block-submit-stall-monitor
Aug 17, 2020
d6543e5
refactor: add alarm function naming
Aug 17, 2020
d676020
refactor: reduce alarm function pattern matching cases
Aug 17, 2020
02a541f
refactor: move release task to releases.exs
Aug 25, 2020
1fb5f4d
fix: remove obsolete tests
Aug 25, 2020
b0f8748
Merge branch 'master' into unnawut/block-submit-stall-monitor
unnawut Aug 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/omg/lib/omg/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,10 @@ defmodule OMG.State do
- pushes the new block to subscribers of `"blocks"` internal event bus topic
"""
def handle_cast(:form_block, state) do
_ = Logger.debug("Forming new block...")
_ = Logger.info("Forming new block...")
unnawut marked this conversation as resolved.
Show resolved Hide resolved
state = Core.claim_fees(state)
{:ok, {%Block{number: blknum} = block, db_updates}, new_state} = Core.form_block(state)
_ = Logger.debug("Formed new block ##{blknum}")
_ = Logger.info("Formed new block ##{blknum}")

# persistence is required to be here, since propagating the block onwards requires restartability including the
# new block
Expand Down
35 changes: 35 additions & 0 deletions apps/omg_child_chain/lib/omg_child_chain/block_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ defmodule OMG.ChildChain.BlockQueue do
{:noreply, state1}
end

@doc false
# Ignore unrelated events
def handle_info({:internal_event_bus, :block_submitting, _}, state), do: {:noreply, state}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda curious, all OMG.Bus subscriber need to implement listener for all topics? Otherwise why we need to implement a noreply?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sharing the same "blocks" topic in :ok = OMG.Bus.subscribe({:child_chain, "blocks"}, link: true).

We can make the topic more granular but it opens a different can of worms, i.e. setting a new convention, to include in this PR 😢

def handle_info({:internal_event_bus, :block_submitted, _}, state), do: {:noreply, state}

# private (server)

@spec submit_blocks(Core.t()) :: :ok
Expand All @@ -205,6 +210,7 @@ defmodule OMG.ChildChain.BlockQueue do

defp submit(submission) do
_ = Logger.info("Submitting: #{inspect(submission)}")
_ = publish_block_submitting_event(submission.num)

submit_result = Eth.submit_block(submission.hash, submission.nonce, submission.gas_price)
newest_mined_blknum = RootChain.get_mined_child_block()
Expand All @@ -218,6 +224,7 @@ defmodule OMG.ChildChain.BlockQueue do
error

{:ok, txhash} ->
_ = publish_block_submitted_event(submission.num)
_ = GasAnalyzer.enqueue(txhash)
_ = Balance.check()
:ok
Expand All @@ -229,6 +236,34 @@ defmodule OMG.ChildChain.BlockQueue do
:ok = final_result
end

# Publishes each time a block is being submitted, regardless of failing or being successful.
# This differs from `:enqueue_block` which publishes only once when the block is formed.
# For example, when a block is re-submitted 3 times before it got accepted, there would be
# 1 x `:enqueue_block` and 3 x `:block_submitting` events published.
#
# The telemetry event is emitted for raw metrics propagation. The bus event is published so
# a consumer, potentially a GenServer or other processes can perform extra operations on the data.
defp publish_block_submitting_event(blknum) do
_ = :telemetry.execute([:blknum_submitting, __MODULE__], %{blknum: blknum})

{:child_chain, "blocks"}
|> OMG.Bus.Event.new(:block_submitting, blknum)
|> OMG.Bus.direct_local_broadcast()
end

# Publishes when a block is successfully submitted. Only 1 `block_submitted` event will ever
# be published for each block.
#
# The telemetry event is emitted for raw metrics propagation. The bus event is published so
# a consumer, potentially a GenServer or other processes can perform extra operations on the data.
defp publish_block_submitted_event(blknum) do
_ = :telemetry.execute([:blknum_submitted, __MODULE__], %{blknum: blknum})

{:child_chain, "blocks"}
|> OMG.Bus.Event.new(:block_submitted, blknum)
|> OMG.Bus.direct_local_broadcast()
end

defp log_init_error(fields) do
fields = Keyword.update!(fields, :known_hashes, fn hashes -> Enum.map(hashes, &Encoding.to_hex/1) end)

Expand Down
25 changes: 23 additions & 2 deletions apps/omg_child_chain/lib/omg_child_chain/block_queue/measure.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@ defmodule OMG.ChildChain.BlockQueue.Measure do
alias OMG.ChildChain.BlockQueue
alias OMG.ChildChain.BlockQueue.Balance
alias OMG.ChildChain.BlockQueue.GasAnalyzer
alias OMG.ChildChain.BlockQueue.SubmissionMonitor
alias OMG.Status.Metric.Datadog

@supported_events [
[:process, BlockQueue],
# Events triggered when a block is submitting or submitted
[:blknum_submitting, BlockQueue],
[:blknum_submitted, BlockQueue],
# Events providing collections of blocks being submitted or stalled
[:blocks_submitting, SubmissionMonitor],
[:blocks_stalled, SubmissionMonitor],
[:gas, GasAnalyzer],
[:authority_balance, Balance]
]
def supported_events(), do: @supported_events

def handle_event([:process, BlockQueue], _, _state, _config) do
def handle_event([:process, BlockQueue], _measurements, _metadata, _config) do
value =
self()
|> Process.info(:message_queue_len)
Expand All @@ -47,11 +54,25 @@ defmodule OMG.ChildChain.BlockQueue.Measure do

def handle_event([:gas, GasAnalyzer], %{gas: gas}, _, _config) do
gwei = div(gas, 1_000_000_000)
_ = Datadog.gauge(name(:block_submission), gwei)
_ = Datadog.gauge(name(:block_submission_gas), gwei)
end

def handle_event([:authority_balance, Balance], %{authority_balance: authority_balance}, _, _config) do
gwei = div(authority_balance, 1_000_000_000)
_ = Datadog.gauge(name(:authority_balance), gwei)
end

def handle_event([:blknum_submitting, BlockQueue], %{blknum: blknum}, _, _config) do
_ = Datadog.gauge(name(:block_queue_blknum_submitting), blknum)
_ = Datadog.increment(name(:block_submission_attempt), 1)
end

def handle_event([:blknum_submitted, BlockQueue], %{blknum: blknum}, _, _config) do
_ = Datadog.gauge(name(:block_queue_blknum_submitted), blknum)
_ = Datadog.increment(name(:block_submission_success), 1)
end

def handle_event([:blocks_stalled, SubmissionMonitor], %{blocks: blocks}, _, _config) do
_ = Datadog.gauge(name(:block_queue_num_blocks_stalled), length(blocks))
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.ChildChain.BlockQueue.Monitor.AlarmHandler do
@moduledoc """
Listens for :block_submit_stalled alarms and reflect the alarm's state back to the monitor.
"""
require Logger

# The alarm reporter and monitor happen to be the same module here because we are just
# reflecting the alarm's state back to the reporter.
@reporter OMG.ChildChain.BlockQueue.Monitor
@monitor OMG.ChildChain.BlockQueue.Monitor

def init(_args) do
{:ok, %{}}
end

def handle_call(_request, state), do: {:ok, :ok, state}

def handle_event({:set_alarm, {:block_submit_stalled, %{reporter: @reporter}}}, state) do
_ = Logger.warn(":block_submit_stalled alarm raised.")
:ok = GenServer.cast(@monitor, {:set_alarm, :block_submit_stalled})
{:ok, state}
end

def handle_event({:clear_alarm, {:block_submit_stalled, %{reporter: @reporter}}}, state) do
_ = Logger.warn(":block_submit_stalled alarm cleared.")
:ok = GenServer.cast(@monitor, {:clear_alarm, :block_submit_stalled})
{:ok, state}
end

def handle_event(event, state) do
_ = Logger.info("#{__MODULE__} got event: #{inspect(event)}. Ignoring.")
{:ok, state}
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.ChildChain.BlockQueue.SubmissionMonitor do
@moduledoc """
Listens to block events and raises :block_submit_stalled alarm when a pending block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so essentially what you want to figure out is:
the difference between blocks mined minus blocks formed, correct?

If yes, why are you storing pending blocks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly to track {blknum, root_chain_height}. There was a lot of false alarms when I tracked just the mined - formed because of the large swing in block time. And it's a large swing all the time, not just once in a while, so the alarm is barely useful with an alarm every few minutes.

Some supporting info. Below is rootchain blocktime over a single day. You can see the spikes throughout the day, multiple times each hour:

blocktime

And in the diagram below, if we use the average blocktime of 12.5 - 15s as usual, only 6% (369/6401) of all blocks in a day will fall in the range.

Histogram of blocktime

So in my trial it was impossible to stick to mined - formed, because over a single day mined - formed >= 4 could be from 10 seconds to 188 seconds. So tracking the chch blknum against the first root chain height it was submitted at was the most reliable way to track stalled submissions as far as I tried.


Source data: https://docs.google.com/spreadsheets/d/1wzniBoO2nonRo83im0VRcaSPj_pr-ZZXKV-yiOOFVOo/edit

doesn't get successfully submitted within the specified time threshold.
"""
use GenServer
require Logger

defstruct pending_blocks: [],
root_chain_height: 0,
stall_threshold_blocks: 4,
alarm_module: nil,
alarm_raised: false

@typep blknum() :: pos_integer()
@typep pending_block() :: {blknum :: blknum(), first_submit_height :: pos_integer()}

@type t() :: %__MODULE__{
pending_blocks: [pending_block()],
root_chain_height: non_neg_integer(),
stall_threshold_blocks: pos_integer(),
alarm_module: module(),
alarm_raised: boolean()
}

#
# GenServer APIs
#

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

#
# GenServer behaviors
#

def init(opts) do
_ = Logger.info("Starting #{__MODULE__}")
_ = install_alarm_handler()
event_bus = Keyword.fetch!(opts, :event_bus_module)
check_interval_ms = Keyword.fetch!(opts, :check_interval_ms)

state = %__MODULE__{
pending_blocks: [],
stall_threshold_blocks: Keyword.fetch!(opts, :stall_threshold_blocks),
alarm_module: Keyword.fetch!(opts, :alarm_module),
alarm_raised: false
}

:ok = event_bus.subscribe({:child_chain, "blocks"}, link: true)
:ok = event_bus.subscribe({:root_chain, "ethereum_new_height"}, link: true)

{:ok, _} = :timer.send_interval(check_interval_ms, self(), :check_stall)
{:ok, state}
end

def handle_info(:check_stall, state) do
stalled_blocks =
Enum.filter(state.pending_blocks, fn {_blknum, first_submit_height} ->
state.root_chain_height - first_submit_height >= state.stall_threshold_blocks
end)

_ = :telemetry.execute([:blocks_stalled, __MODULE__], %{blocks: stalled_blocks})
_ = log_stalled_blocks(stalled_blocks, state.root_chain_height)
_ = trigger_alarm(state.alarm_module, state.alarm_raised, stalled_blocks)

{:noreply, state}
end

# Keeps track of the latest root chain height
def handle_info({:internal_event_bus, :ethereum_new_height, new_height}, state) do
{:noreply, %{state | root_chain_height: new_height}}
end

# Listens for a block being submitted and add it to monitoring if it hasn't been tracked
def handle_info({:internal_event_bus, :block_submitting, blknum}, state) do
pending_blocks = add_new_blknum(state.pending_blocks, blknum, state.root_chain_height)
{:noreply, %{state | pending_blocks: pending_blocks}}
end

# Listens for a block that got submitted and drop it from monitoring
def handle_info({:internal_event_bus, :block_submitted, blknum}, state) do
pending_blocks = remove_blknum(state.pending_blocks, blknum)
{:noreply, %{state | pending_blocks: pending_blocks}}
end

# Ignore unrelated events
def handle_info({:internal_event_bus, :enqueue_block, _}, state) do
{:noreply, state}
end

#
# Handle incoming alarms
#
# These functions are called by the AlarmHandler so that this monitor process can update
# its internal state according to the raised alarms.
#
def handle_cast({:set_alarm, :block_submit_stalled}, state) do
{:noreply, %{state | alarm_raised: true}}
end

def handle_cast({:clear_alarm, :block_submit_stalled}, state) do
{:noreply, %{state | alarm_raised: false}}
end

#
# Private functions
#

# Add the blknum to tracking only if it is not already tracked
@spec add_new_blknum([{blknum(), any()}], blknum(), non_neg_integer()) :: [pending_block()]
defp add_new_blknum(pending_blocks, blknum, root_chain_height) do
case Enum.any?(pending_blocks, fn {pending_blknum, _} -> pending_blknum == blknum end) do
true -> pending_blocks
false -> [{blknum, root_chain_height} | pending_blocks]
end
end

@spec remove_blknum([{blknum(), any()}], blknum()) :: [pending_block()]
defp remove_blknum(pending_blocks, blknum) do
Enum.reject(pending_blocks, fn {pending_blknum, _} -> pending_blknum == blknum end)
end

#
# Alarms management
#

defp install_alarm_handler() do
case Enum.member?(:gen_event.which_handlers(:alarm_handler), __MODULE__.AlarmHandler) do
true -> :ok
_ -> :alarm_handler.add_alarm_handler(__MODULE__.AlarmHandler)
end
end

@spec trigger_alarm(alarm :: module(), alarm_raised :: boolean(), stalled_blocks :: [blknum()]) :: :ok
defp trigger_alarm(alarm_module, false, stalled_blocks) when length(stalled_blocks) >= 1 do
alarm_module.set(alarm_module.block_submit_stalled(__MODULE__))
end

defp trigger_alarm(alarm_module, true, []) do
alarm_module.clear(alarm_module.block_submit_stalled(__MODULE__))
end

defp trigger_alarm(_alarm_module, _alarm_raised, _stalled_blocks), do: :ok

#
# Logging
#
defp log_stalled_blocks([], _), do: :ok

defp log_stalled_blocks(stalled_blocks, root_chain_height) do
Logger.warn(
"#{__MODULE__}: Stalled blocks: #{inspect(stalled_blocks)}. " <>
"Current height: #{inspect(root_chain_height)}"
)
end
end
Loading