Skip to content

Commit

Permalink
feat: handle in-flight exits deletions
Browse files Browse the repository at this point in the history
  • Loading branch information
pgebal authored Sep 16, 2020
1 parent 27379ec commit 6227b79
Show file tree
Hide file tree
Showing 19 changed files with 406 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ start-watcher:
rm -f ./_build/${BAREBUILD_ENV}/rel/watcher/var/sys.config || true && \
echo "Init Watcher DBs" && \
_build/${BAREBUILD_ENV}/rel/watcher/bin/watcher eval "OMG.DB.ReleaseTasks.InitKeyValueDB.run()" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.DB.ReleaseTasks.InitKeysWithValues.run()" && \
echo "Run Watcher" && \
. ${OVERRIDING_VARIABLES} && \
PORT=${WATCHER_PORT} _build/${BAREBUILD_ENV}/rel/watcher/bin/watcher $(OVERRIDING_START)
Expand All @@ -449,6 +450,7 @@ start-watcher_info:
rm -f ./_build/${BAREBUILD_ENV}/rel/watcher_info/var/sys.config || true && \
echo "Init Watcher Info DBs" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.DB.ReleaseTasks.InitKeyValueDB.run()" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.DB.ReleaseTasks.InitKeysWithValues.run()" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.WatcherInfo.ReleaseTasks.InitPostgresqlDB.migrate()" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.WatcherInfo.ReleaseTasks.EthereumTasks.run()" && \
echo "Run Watcher Info" && \
Expand Down
2 changes: 2 additions & 0 deletions apps/omg_db/lib/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ defmodule OMG.DB do
:child_top_block_number,
# watcher
:last_block_getter_eth_height,
:last_ife_exit_deleted_eth_height,
# watcher and child chain
:last_depositor_eth_height,
:last_exiter_eth_height,
Expand All @@ -143,6 +144,7 @@ defmodule OMG.DB do
:last_exit_finalizer_eth_height,
:last_exit_challenger_eth_height,
:last_in_flight_exit_processor_eth_height,
:last_ife_exit_deleted_eth_height,
:last_piggyback_processor_eth_height,
:last_competitor_processor_eth_height,
:last_challenges_responds_processor_eth_height,
Expand Down
50 changes: 50 additions & 0 deletions apps/omg_db/lib/omg_db/release_tasks/init_keys_with_values.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2019-2019 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.DB.ReleaseTasks.InitKeysWithValues do
@moduledoc """
Sets values for keys stored in RocksDB, if they are not set.
"""
require Logger

@keys_to_values [last_ife_exit_deleted_eth_height: 0]

def run() do
{:ok, _} = Application.ensure_all_started(:logger)

path = Application.get_env(:omg_db, :path)
Application.put_env(:omg_db, :path, path)

case Application.ensure_all_started(:omg_db) do
{:ok, _} ->
Enum.each(@keys_to_values, &set_single_value/1)

{:error, _} ->
_ = Logger.info("DB not initialized yet, no action required")
:ok
end
end

defp set_single_value({key, init_val}) do
case OMG.DB.RocksDB.get_single_value(key) do
:not_found ->
:ok = OMG.DB.RocksDB.multi_update([{:put, key, init_val}])
_ = Logger.info("#{key} not set. Setting it to #{inspect(init_val)}")
:ok

{:ok, _} ->
:ok
end
end
end
2 changes: 1 addition & 1 deletion apps/omg_db/lib/omg_db/rocksdb/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule OMG.DB.RocksDB.Server do
name: GenServer.name()
}
@doc """
Initializes an empty LevelDB instance explicitly, so we can have control over it.
Initializes an empty RocksDB instance explicitly, so we can have control over it.
NOTE: `init` here is to init the GenServer and that assumes that `init_storage` has already been called
"""
@spec init_storage(binary) :: :ok | {:error, atom}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2019-2020 OmiseGO Pte Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.DB.ReleaseTasks.InitKeysWithValuesTest do
use ExUnit.Case, async: false
alias OMG.DB.ReleaseTasks.InitKeysWithValues
alias OMG.DB.RocksDB
alias OMG.DB.RocksDB.Server

setup do
{:ok, dir} = Briefly.create(directory: true)
:ok = Server.init_storage(dir)
:ok = Application.put_env(:omg_db, :path, dir, persistent: true)
{:ok, started_apps} = Application.ensure_all_started(:omg_db)

on_exit(fn ->
:ok = Application.put_env(:omg_db, :path, nil)
Enum.map(started_apps, fn app -> _ = Application.stop(app) end)
end)

{:ok, %{}}
end

test ":last_ife_exit_deleted_eth_height is set if it wasn't set previously" do
:ok = RocksDB.multi_update([{:delete, :last_ife_exit_deleted_eth_height, 0}])

assert InitKeysWithValues.run() == :ok
assert RocksDB.get_single_value(:last_ife_exit_deleted_eth_height) == {:ok, 0}
end

test "value under :last_ife_exit_deleted_eth_height is not changed if it was already set" do
initial_value = 5
:ok = RocksDB.multi_update([{:put, :last_ife_exit_deleted_eth_height, initial_value}])

assert InitKeysWithValues.run() == :ok
assert RocksDB.get_single_value(:last_ife_exit_deleted_eth_height) == {:ok, initial_value}
end

test "does not fail when omg db is not started" do
:ok = Application.stop(:omg_db)
:ok = Application.put_env(:omg_db, :path, nil)

assert InitKeysWithValues.run() == :ok
end
end
2 changes: 1 addition & 1 deletion apps/omg_eth/lib/omg_eth/root_chain/abi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
defmodule OMG.Eth.RootChain.Abi do
@moduledoc """
Functions that provide ethereum log decoding
Functions that provide ethereum log decoding
"""
alias OMG.Eth.Encoding
alias OMG.Eth.RootChain.AbiEventSelector
Expand Down
13 changes: 13 additions & 0 deletions apps/omg_eth/lib/omg_eth/root_chain/abi_event_selector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ defmodule OMG.Eth.RootChain.AbiEventSelector do
}
end

@spec in_flight_exit_deleted() :: ABI.FunctionSelector.t()
def in_flight_exit_deleted() do
%ABI.FunctionSelector{
function: "InFlightExitDeleted",
input_names: ["exitId"],
inputs_indexed: [true],
method_id: <<25, 145, 196, 195>>,
returns: [],
type: :event,
types: [uint: 160]
}
end

@spec in_flight_exit_challenged() :: ABI.FunctionSelector.t()
def in_flight_exit_challenged() do
%ABI.FunctionSelector{
Expand Down
5 changes: 5 additions & 0 deletions apps/omg_eth/lib/omg_eth/root_chain/fields.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ defmodule OMG.Eth.RootChain.Fields do
reduce_naming(data, contracts_naming)
end

def rename(data, %ABI.FunctionSelector{function: "InFlightExitDeleted"}) do
contracts_naming = [{"exitId", :exit_id}]
reduce_naming(data, contracts_naming)
end

def rename(data, %ABI.FunctionSelector{function: "InFlightExitChallenged"}) do
contracts_naming = [
{"challenger", :challenger},
Expand Down
34 changes: 31 additions & 3 deletions apps/omg_eth/test/omg_eth/root_chain/abi_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,36 @@ defmodule OMG.Eth.RootChain.AbiTest do
}
end

test "if in flight exit finalized can be decoded" do
in_flight_exit_finalized_log = %{
test "if in flight exit deleted can be decoded" do
in_flight_exit_deleted_log = %{
:event_signature => "InFlightExitDeleted(uint160)",
"address" => "0x89afce326e7da55647d22e24336c6a2816c99f6b",
"blockHash" => "0xa27ed6299f3d74954e2c32629a5d807743627f8e57f83c8cbeaa4351da73f597",
"blockNumber" => "0x3e8",
"data" => "0x",
"logIndex" => "0x0",
"removed" => false,
"topics" => [
"0x1991c4c350498b0cc937c6a08bc5bdecf2e4fdd9d918052a880f102e43dbe45c",
"0x00000000000000000000000000d1d291fd21f1899f4c9d621f65dd1e0aa2355d"
],
"transactionHash" => "0xbe310ade41278c5607620311b79363aa520ac46c7ba754bf3027d501c5a95f40",
"transactionIndex" => "0x0"
}

assert Abi.decode_log(in_flight_exit_deleted_log) == %{
eth_height: 1000,
event_signature: "InFlightExitDeleted(uint160)",
exit_id: 4_679_199_003_952_701_118_642_806_135_853_996_264_334_177_629,
log_index: 0,
root_chain_txhash:
<<190, 49, 10, 222, 65, 39, 140, 86, 7, 98, 3, 17, 183, 147, 99, 170, 82, 10, 196, 108, 123, 167, 84,
191, 48, 39, 213, 1, 197, 169, 95, 64>>
}
end

test "if in flight exit output withdrawn can be decoded" do
in_flight_exit_output_withdrawn_log = %{
:event_signature => "InFlightExitOutputWithdrawn(uint160,uint16)",
"address" => "0x92ce4d7773c57d96210c46a07b89acf725057f21",
"blockHash" => "0x2218cd9358fd6ed3b720b512b645a88a9a3ed9f472e6192fae202f60e40ac7a2",
Expand All @@ -520,7 +548,7 @@ defmodule OMG.Eth.RootChain.AbiTest do
"transactionIndex" => "0x0"
}

assert Abi.decode_log(in_flight_exit_finalized_log) == %{
assert Abi.decode_log(in_flight_exit_output_withdrawn_log) == %{
eth_height: 335,
event_signature: "InFlightExitOutputWithdrawn(uint160,uint16)",
in_flight_exit_id: 3_853_567_223_408_339_354_111_409_210_931_346_801_537_991_844,
Expand Down
1 change: 1 addition & 0 deletions apps/omg_watcher/lib/omg_watcher/coordinator_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule OMG.Watcher.CoordinatorSetup do
],
exit_challenger: [waits_for: :exit_processor, finality_margin: finality_margin],
in_flight_exit_processor: [waits_for: :depositor, finality_margin: finality_margin],
in_flight_exit_deleted_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin],
piggyback_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin],
competitor_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin],
challenges_responds_processor: [waits_for: :competitor_processor, finality_margin: finality_margin],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
defmodule OMG.Watcher.EthereumEventAggregator do
@moduledoc """
This process combines all plasma contract events we're interested in and does eth_getLogs + enriches them if needed
for all Ethereum Event Listener processes.
for all Ethereum Event Listener processes.
"""
use GenServer
require Logger
Expand Down Expand Up @@ -52,6 +52,11 @@ defmodule OMG.Watcher.EthereumEventAggregator do
forward_call(server, :in_flight_exit_started, from_block, to_block, @timeout)
end

@spec in_flight_exit_deleted(GenServer.server(), pos_integer(), pos_integer()) :: result()
def in_flight_exit_deleted(server \\ __MODULE__, from_block, to_block) do
forward_call(server, :in_flight_exit_deleted, from_block, to_block, @timeout)
end

@spec in_flight_exit_piggybacked(GenServer.server(), pos_integer(), pos_integer()) :: result()
def in_flight_exit_piggybacked(server \\ __MODULE__, from_block, to_block) do
# input and output
Expand Down
28 changes: 28 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/exit_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ defmodule OMG.Watcher.ExitProcessor do
GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events}, @timeout)
end

@doc """
Accepts events and processes them in the state - new in flight exits are tracked.
Returns `db_updates` to be sent to `OMG.DB` by the caller
"""
# empty list clause to not block the server for no-ops
def delete_in_flight_exits([]), do: {:ok, []}

def delete_in_flight_exits(in_flight_exit_deleted_events) do
GenServer.call(__MODULE__, {:delete_in_flight_exits, in_flight_exit_deleted_events}, @timeout)
end

@doc """
Accepts events and processes them in the state - finalized exits are untracked _if valid_ otherwise raises alert
Expand Down Expand Up @@ -359,6 +371,22 @@ defmodule OMG.Watcher.ExitProcessor do
{:reply, {:ok, db_updates}, new_state}
end

@doc """
See `delete_in_flight_exits/1`. Flow:
- spends input utxos
- deletes in-flight exits from state
"""
def handle_call({:delete_in_flight_exits, deletions}, _from, state) do
_ =
if not Enum.empty?(deletions),
do: Logger.info("Recognized #{Enum.count(deletions)} deletions: #{inspect(deletions)}")

{new_state, deleted_utxos, db_updates} = Core.delete_in_flight_exits(state, deletions)
{:ok, db_updates_from_state, _validities} = State.exit_utxos(deleted_utxos)

{:reply, {:ok, db_updates ++ db_updates_from_state}, new_state}
end

@doc """
See `finalize_exits/1`. Flow:
Expand Down
28 changes: 28 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/exit_processor/core.ex
Original file line number Diff line number Diff line change
Expand Up @@ -641,4 +641,32 @@ defmodule OMG.Watcher.ExitProcessor.Core do

defp sla_margin_safe?(exit_processor_sla_margin, min_exit_period_seconds, ethereum_block_time_seconds),
do: exit_processor_sla_margin * ethereum_block_time_seconds < min_exit_period_seconds

@doc """
Deletes in-flight exits from state and returns deleted exits
"""
@spec delete_in_flight_exits(__MODULE__.t(), list(map)) :: {__MODULE__.t(), list(InFlightExitInfo.t()), list(any())}
def delete_in_flight_exits(state, deletions) do
exit_ids =
deletions
|> Enum.map(fn %{exit_id: exit_id} -> InFlightExitInfo.to_contract_id(exit_id) end)
|> MapSet.new()

deleted_ifes_by_key =
state.in_flight_exits
|> Enum.filter(fn {_, ife} -> MapSet.member?(exit_ids, ife.contract_id) end)
|> Map.new()

deleted_keys = Map.keys(deleted_ifes_by_key)
updated_ifes = Map.drop(state.in_flight_exits, deleted_keys)

deleted_utxos =
deleted_ifes_by_key
|> Map.values()
|> InFlightExitInfo.get_input_utxos()

db_updates = Enum.map(deleted_keys, fn key -> {:delete, :in_flight_exit_info, key} end)

{%{state | in_flight_exits: updated_ifes}, deleted_utxos, db_updates}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ defmodule OMG.Watcher.ExitProcessor.InFlightExitInfo do
{:put, :in_flight_exit_info, {ife_hash, value}}
end

@doc """
Returns all input utxos for given in-flight exits
"""
@spec get_input_utxos(list(t())) :: list(Utxo.Position.t())
def get_input_utxos(in_flight_exits) do
in_flight_exits
|> Enum.map(& &1.input_utxos_pos)
|> List.flatten()
end

defp assert_utxo_pos_type(Utxo.position(blknum, txindex, oindex))
when is_integer(blknum) and is_integer(txindex) and is_integer(oindex),
do: :ok
Expand Down Expand Up @@ -417,6 +427,12 @@ defmodule OMG.Watcher.ExitProcessor.InFlightExitInfo do
}),
do: is_older?(seen_in_pos, oldest_competitor_pos)

@doc """
Converts integer to contract's in-flight exit id
"""
@spec to_contract_id(non_neg_integer) :: <<_::192>>
def to_contract_id(id), do: <<id::192>>

@doc """
Checks if the competitor being seen at `competitor_pos` (`nil` if unseen) is viable to challenge with, considering the
current state of the IFE - that is, only if it is older than IFE tx's inclusion and other competitors
Expand Down
10 changes: 10 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/sync_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ defmodule OMG.Watcher.SyncSupervisor do
[name: :exit_finalized, enrich: false],
[name: :exit_challenged, enrich: false],
[name: :in_flight_exit_started, enrich: true],
[name: :in_flight_exit_deleted, enrich: false],
[name: :in_flight_exit_input_piggybacked, enrich: false],
[name: :in_flight_exit_output_piggybacked, enrich: false],
[name: :in_flight_exit_challenged, enrich: true],
Expand Down Expand Up @@ -213,6 +214,15 @@ defmodule OMG.Watcher.SyncSupervisor do
get_events_callback: &EthereumEventAggregator.in_flight_exit_withdrawn/2,
process_events_callback: &Watcher.ExitProcessor.finalize_in_flight_exits/1
),
EthereumEventListener.prepare_child(
metrics_collection_interval: metrics_collection_interval,
ethereum_events_check_interval_ms: ethereum_events_check_interval_ms,
contract_deployment_height: contract_deployment_height,
service_name: :in_flight_exit_deleted_processor,
synced_height_update_key: :last_ife_exit_deleted_eth_height,
get_events_callback: &EthereumEventAggregator.in_flight_exit_deleted/2,
process_events_callback: &Watcher.ExitProcessor.delete_in_flight_exits/1
),
{StatusCache, [event_bus: OMG.Bus, ets: status_cache()]},
{ChildManager, [monitor: Monitor]}
]
Expand Down
Loading

0 comments on commit 6227b79

Please sign in to comment.