Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add block processing queue to watcher info #1560

Merged
merged 27 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/omg_db/test/omg_db/models/payment_exit_info_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule OMG.DB.PaymentExitInfoTest do

describe "exit_infos" do
test "should return empty list if given empty list of positions", %{db_dir: _dir, db_pid: pid} do
db_writes = create_write(:exit_info, pid)
_db_writes = create_write(:exit_info, pid)

{:ok, exits} = PaymentExitInfo.exit_infos([], pid)

Expand All @@ -52,7 +52,7 @@ defmodule OMG.DB.PaymentExitInfoTest do
db_writes = create_write(:exit_info, pid)
sliced_db_writes = Enum.slice(db_writes, test_range)

utxo_pos_list = Enum.map(sliced_db_writes, fn {utxo_pos, _} = write -> utxo_pos end)
utxo_pos_list = Enum.map(sliced_db_writes, fn {utxo_pos, _} = _write -> utxo_pos end)

{:ok, exits} = PaymentExitInfo.exit_infos(utxo_pos_list, pid)

Expand Down
5 changes: 5 additions & 0 deletions apps/omg_status/lib/omg_status/metric/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ defmodule OMG.Status.Metric.Event do
"""
def name(:db_message_queue_len), do: "db_message_queue_len"

@doc """
OMG.WatcherInfo.DB.PendingBlock queue length
"""
def name(:pending_block_queue_length), do: "pending_block_queue_length"

@doc """
OMG.DB KV layer has three types of actions: write, read, multiread
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule OMG.Status.Monitor.MemoryMonitorTest do
assert_receive :got_clear_alarm
end

test "works with :buffered_memory and :cached_memory values are not provided", context do
test "works with :buffered_memory and :cached_memory values are not provided" do
set_memsup(total_memory: 1000, free_memory: 100)
assert_receive :got_raise_alarm
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ defmodule OMG.Watcher.Integration.TransactionSubmitTest do
"txindex" => tx_index
} = WatcherHelper.success?("transaction.submit_typed", typed_data_signed)

IntegrationTest.wait_for_block_fetch(tx_blknum, @timeout)
IntegrationTest.wait_for_block_inserted_in_db(tx_blknum, @timeout)

assert [
%{
Expand Down
22 changes: 19 additions & 3 deletions apps/omg_watcher/test/support/integration/test_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ defmodule OMG.Watcher.Integration.TestHelper do
|> WaitFor.ok(timeout)
end

def wait_for_block_fetch(block_nr, timeout) do
def wait_for_block_fetch(block_number, timeout) do
# TODO query to State used in tests instead of an event system, remove when event system is here
fn ->
if State.get_status() |> elem(0) <= block_nr,
if elem(State.get_status(), 0) <= block_number,
do: :repeat,
else: {:ok, block_nr}
else: {:ok, block_number}
end
|> WaitFor.ok(timeout)

Expand All @@ -51,6 +51,22 @@ defmodule OMG.Watcher.Integration.TestHelper do
Process.sleep(100)
end

@doc """
The above wait_for_block_fetch/2 function only waits for a block to appear in the
state and add some "random" sleep to give the database time to process and write the block.
This function will instead poll for block.get until found (or timeout).
"""
def wait_for_block_inserted_in_db(block_number, timeout) do
func = fn ->
case WatcherHelper.get_block(block_number) do
{:error, _} -> :repeat
{:ok, %{"blknum" => ^block_number}} -> {:ok, block_number}
end
end

WaitFor.ok(func, timeout)
end

@doc """
We need to wait on both a margin of eth blocks and exit processing
"""
Expand Down
9 changes: 9 additions & 0 deletions apps/omg_watcher/test/support/watcher_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ defmodule Support.WatcherHelper do
success?("/account.get_balance", %{"address" => Encoding.to_hex(address)})
end

def get_block(blknum) do
response_body = rpc_call("block.get", %{blknum: blknum}, 200)

case response_body do
%{"success" => false, "data" => error} -> {:error, error}
%{"success" => true, "data" => block} -> {:ok, block}
end
end

def get_exit_data(blknum, txindex, oindex),
do: get_exit_data(Utxo.Position.encode(Utxo.position(blknum, txindex, oindex)))

Expand Down
13 changes: 13 additions & 0 deletions apps/omg_watcher_info/lib/omg_watcher_info/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,17 @@ defmodule OMG.WatcherInfo.Application do

Supervisor.start_link(children, opts)
end

def start_phase(:attach_telemetry, :normal, _phase_args) do
handlers = [
["measure-watcher-info", OMG.WatcherInfo.Measure.supported_events(), &OMG.WatcherInfo.Measure.handle_event/4, nil]
]

Enum.each(handlers, fn handler ->
case apply(:telemetry, :attach_many, handler) do
:ok -> :ok
{:error, :already_exists} -> :ok
end
end)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,45 +14,51 @@

defmodule OMG.WatcherInfo.BlockApplicationConsumer do
@moduledoc """
Subscribes for new blocks and inserts them to WatcherInfo.DB.
Subscribes for new blocks and inserts them into a pending queue of blocks waiting to be processed.
"""
alias OMG.WatcherInfo.DB
require Logger

### Client

def start_link(_args) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
use GenServer

### Server
alias OMG.WatcherInfo.DB

use GenServer
@default_bus_module OMG.Bus

def init(:ok) do
:ok = OMG.Bus.subscribe({:child_chain, "block.get"}, link: true)
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: Keyword.get(args, :name, __MODULE__))
end

def init(args) do
_ = Logger.info("Started #{inspect(__MODULE__)}")

bus_module = Keyword.get(args, :bus_module, @default_bus_module)
:ok = bus_module.subscribe({:child_chain, "block.get"}, link: true)

{:ok, %{}}
end

# Listens for blocks and insert them to the WatcherDB.
def handle_info({:internal_event_bus, :block_received, block_application}, state) do
_ =
{:ok, _} =
block_application
|> to_mined_block()
|> DB.Block.insert_with_transactions()
|> to_pending_block()
|> DB.PendingBlock.insert()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PendingBlock is somewhat different naming then I would expect. the world queue is not used anywhere :)))


{:noreply, state}
end

defp to_mined_block(%{} = block) do
%{
defp to_pending_block(%{} = block) do
data = %{
eth_height: block.eth_height,
blknum: block.number,
blkhash: block.hash,
timestamp: block.timestamp,
transactions: block.transactions
}

%{
data: :erlang.term_to_binary(data),
blknum: block.number
}
end
end
28 changes: 28 additions & 0 deletions apps/omg_watcher_info/lib/omg_watcher_info/configuration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.WatcherInfo.Configuration do
@moduledoc """
Provides access to applications configuration
"""
@app :omg_watcher_info

def pending_block_processing_interval() do
Application.fetch_env!(@app, :pending_block_processing_interval)
end

def block_queue_check_interval() do
Application.fetch_env!(@app, :block_queue_check_interval)
end
end
23 changes: 14 additions & 9 deletions apps/omg_watcher_info/lib/omg_watcher_info/db/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule OMG.WatcherInfo.DB.Block do
alias OMG.Utils.Paginator
alias OMG.WatcherInfo.DB
alias OMG.WatcherInfo.DB.Block.Chunk
alias OMG.WatcherInfo.DB.PendingBlock

import Ecto.Query, only: [from: 2]

Expand Down Expand Up @@ -142,16 +143,19 @@ defmodule OMG.WatcherInfo.DB.Block do
end

@doc """
Inserts complete and sorted enumerable of transactions for particular block number
Takes a pending block, decode its data and inserts its content in the database.
"""
@spec insert_with_transactions(mined_block()) :: {:ok, %__MODULE__{}}
def insert_with_transactions(%{
transactions: transactions,
blknum: block_number,
blkhash: blkhash,
timestamp: timestamp,
eth_height: eth_height
}) do
# sobelow_skip ["Misc.BinToTerm"]
mederic-p marked this conversation as resolved.
Show resolved Hide resolved
@spec insert_from_pending_block(PendingBlock.t()) :: {:ok, %__MODULE__{}} | {:error, any()}
def insert_from_pending_block(pending_block) do
%{
transactions: transactions,
blknum: block_number,
blkhash: blkhash,
timestamp: timestamp,
eth_height: eth_height
} = :erlang.binary_to_term(pending_block.data)

{db_txs, db_outputs, db_inputs} = prepare_db_transactions(transactions, block_number)

current_block = %{
Expand All @@ -170,6 +174,7 @@ defmodule OMG.WatcherInfo.DB.Block do
|> prepare_inserts(db_txs_stream, "db_txs_", DB.Transaction)
|> prepare_inserts(db_outputs_stream, "db_outputs_", DB.TxOutput)
|> DB.TxOutput.spend_utxos(db_inputs)
|> Ecto.Multi.delete("pending_block", pending_block, [])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:mindblown: Interesting approach


{insert_duration, result} = :timer.tc(DB.Repo, :transaction, [multi])

Expand Down
66 changes: 66 additions & 0 deletions apps/omg_watcher_info/lib/omg_watcher_info/db/pending_block.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.WatcherInfo.DB.PendingBlock do
@moduledoc """
Ecto schema for pending block data
These are valid block data received by the internal bus that should be stored in the database.
This intermediate table is needed as the messages received by the bus are not persisted
and if for some reason the writing to the database fails, we would lose these data.
"""
use Ecto.Schema
use OMG.Utils.LoggerExt

import Ecto.Changeset
import Ecto.Query, only: [from: 2]

alias OMG.WatcherInfo.DB.Repo

@type t() :: %{
blknum: pos_integer(),
data: binary()
}

@primary_key {:blknum, :integer, []}

schema "pending_blocks" do
field(:data, :binary)

timestamps(type: :utc_datetime_usec)
end

@spec insert(map()) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()}
def insert(params) do
params
|> insert_changeset()
|> Repo.insert()
end

@spec get_next_to_process() :: nil | %__MODULE__{}
def get_next_to_process() do
(pending_block in __MODULE__)
|> from(order_by: :blknum, limit: 1)
|> Repo.one()
end

@spec get_count() :: non_neg_integer()
def get_count(), do: Repo.aggregate(__MODULE__, :count)

defp insert_changeset(params) do
%__MODULE__{}
|> cast(params, [:blknum, :data])
|> validate_required([:blknum, :data])
|> unique_constraint(:blknum, name: :pending_blocks_pkey)
end
end
33 changes: 33 additions & 0 deletions apps/omg_watcher_info/lib/omg_watcher_info/measure.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.WatcherInfo.Measure do
@moduledoc """
Counting business metrics sent to Datadog.
"""
import OMG.Status.Metric.Event, only: [name: 1]

alias OMG.Status.Metric.Datadog
alias OMG.WatcherInfo.PendingBlockQueueLengthChecker

@supported_events [
[:pending_block_queue_length, PendingBlockQueueLengthChecker]
]

def supported_events(), do: @supported_events

def handle_event([:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: length}, _state, _config) do
_ = Datadog.gauge(name(:pending_block_queue_length), length)
end
end
Loading