diff --git a/apps/omg_db/lib/db.ex b/apps/omg_db/lib/db.ex index 094d86a8a8..5bd44484b9 100644 --- a/apps/omg_db/lib/db.ex +++ b/apps/omg_db/lib/db.ex @@ -143,6 +143,7 @@ defmodule OMG.DB do :last_exit_finalizer_eth_height, :last_exit_challenger_eth_height, :last_in_flight_exit_processor_eth_height, + :last_ife_exit_deleted_eth_height, :last_piggyback_processor_eth_height, :last_competitor_processor_eth_height, :last_challenges_responds_processor_eth_height, diff --git a/apps/omg_eth/lib/omg_eth/root_chain/abi.ex b/apps/omg_eth/lib/omg_eth/root_chain/abi.ex index 5cdbb18afb..b1c71b9638 100644 --- a/apps/omg_eth/lib/omg_eth/root_chain/abi.ex +++ b/apps/omg_eth/lib/omg_eth/root_chain/abi.ex @@ -13,7 +13,7 @@ # limitations under the License. defmodule OMG.Eth.RootChain.Abi do @moduledoc """ - Functions that provide ethereum log decoding + Functions that provide ethereum log decoding """ alias OMG.Eth.Encoding alias OMG.Eth.RootChain.AbiEventSelector diff --git a/apps/omg_eth/lib/omg_eth/root_chain/abi_event_selector.ex b/apps/omg_eth/lib/omg_eth/root_chain/abi_event_selector.ex index 146fd68bb5..e6a7e9dca7 100644 --- a/apps/omg_eth/lib/omg_eth/root_chain/abi_event_selector.ex +++ b/apps/omg_eth/lib/omg_eth/root_chain/abi_event_selector.ex @@ -45,6 +45,19 @@ defmodule OMG.Eth.RootChain.AbiEventSelector do } end + @spec in_flight_exit_deleted() :: ABI.FunctionSelector.t() + def in_flight_exit_deleted() do + %ABI.FunctionSelector{ + function: "InFlightExitDeleted", + input_names: ["exitId"], + inputs_indexed: [true], + method_id: <<25, 145, 196, 195>>, + returns: [], + type: :event, + types: [uint: 160] + } + end + @spec in_flight_exit_challenged() :: ABI.FunctionSelector.t() def in_flight_exit_challenged() do %ABI.FunctionSelector{ diff --git a/apps/omg_eth/lib/omg_eth/root_chain/fields.ex b/apps/omg_eth/lib/omg_eth/root_chain/fields.ex index 93402ed429..b6650b301c 100644 --- a/apps/omg_eth/lib/omg_eth/root_chain/fields.ex +++ b/apps/omg_eth/lib/omg_eth/root_chain/fields.ex @@ -75,6 +75,11 @@ defmodule OMG.Eth.RootChain.Fields do reduce_naming(data, contracts_naming) end + def rename(data, %ABI.FunctionSelector{function: "InFlightExitDeleted"}) do + contracts_naming = [{"exitId", :exit_id}] + reduce_naming(data, contracts_naming) + end + def rename(data, %ABI.FunctionSelector{function: "InFlightExitChallenged"}) do contracts_naming = [ {"challenger", :challenger}, diff --git a/apps/omg_eth/test/omg_eth/root_chain/abi_test.exs b/apps/omg_eth/test/omg_eth/root_chain/abi_test.exs index daff1b809f..7222875d34 100644 --- a/apps/omg_eth/test/omg_eth/root_chain/abi_test.exs +++ b/apps/omg_eth/test/omg_eth/root_chain/abi_test.exs @@ -503,8 +503,36 @@ defmodule OMG.Eth.RootChain.AbiTest do } end - test "if in flight exit finalized can be decoded" do - in_flight_exit_finalized_log = %{ + test "if in flight exit deleted can be decoded" do + in_flight_exit_deleted_log = %{ + :event_signature => "InFlightExitDeleted(uint160)", + "address" => "0x92ce4d7773c57d96210c46a07b89acf725057f21", + "blockHash" => "0xcafbc4b710c5fab8f3d719f65053637407231ecde31a859f1709e3478a2eda54", + "blockNumber" => "0x14a", + "data" => "0x", + "logIndex" => "0x2", + "removed" => false, + "topics" => [ + "0x1991c4c350498b0cc937c6a08bc5bdecf2e4fdd9d918052a880f102e43dbe45c", + "0x000000000000000000000000003fd275046f2823936fd97c1e3c8b225464d7f1" + ], + "transactionHash" => "0xbe310ade41278c5607620311b79363aa520ac46c7ba754bf3027d501c5a95f40", + "transactionIndex" => "0x0" + } + + assert Abi.decode_log(in_flight_exit_deleted_log) == %{ + eth_height: 330, + event_signature: "InFlightExitDeleted(uint160)", + exit_id: 1_423_280_346_484_099_708_949_144_162_169_101_241_792_387_057, + log_index: 2, + root_chain_txhash: + <<190, 49, 10, 222, 65, 39, 140, 86, 7, 98, 3, 17, 183, 147, 99, 170, 82, 10, 196, 108, 123, 167, 84, + 191, 48, 39, 213, 1, 197, 169, 95, 64>> + } + end + + test "if in flight exit output withdrawn can be decoded" do + in_flight_exit_output_withdrawn_log = %{ :event_signature => "InFlightExitOutputWithdrawn(uint160,uint16)", "address" => "0x92ce4d7773c57d96210c46a07b89acf725057f21", "blockHash" => "0x2218cd9358fd6ed3b720b512b645a88a9a3ed9f472e6192fae202f60e40ac7a2", @@ -520,7 +548,7 @@ defmodule OMG.Eth.RootChain.AbiTest do "transactionIndex" => "0x0" } - assert Abi.decode_log(in_flight_exit_finalized_log) == %{ + assert Abi.decode_log(in_flight_exit_output_withdrawn_log) == %{ eth_height: 335, event_signature: "InFlightExitOutputWithdrawn(uint160,uint16)", in_flight_exit_id: 3_853_567_223_408_339_354_111_409_210_931_346_801_537_991_844, diff --git a/apps/omg_watcher/lib/omg_watcher/coordinator_setup.ex b/apps/omg_watcher/lib/omg_watcher/coordinator_setup.ex index 5d2aea1acb..170a0ac10f 100644 --- a/apps/omg_watcher/lib/omg_watcher/coordinator_setup.ex +++ b/apps/omg_watcher/lib/omg_watcher/coordinator_setup.ex @@ -52,6 +52,7 @@ defmodule OMG.Watcher.CoordinatorSetup do ], exit_challenger: [waits_for: :exit_processor, finality_margin: finality_margin], in_flight_exit_processor: [waits_for: :depositor, finality_margin: finality_margin], + in_flight_exit_deleted_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin], piggyback_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin], competitor_processor: [waits_for: :in_flight_exit_processor, finality_margin: finality_margin], challenges_responds_processor: [waits_for: :competitor_processor, finality_margin: finality_margin], diff --git a/apps/omg_watcher/lib/omg_watcher/ethereum_event_aggregator.ex b/apps/omg_watcher/lib/omg_watcher/ethereum_event_aggregator.ex index a68bf0b23b..d4be5f2594 100644 --- a/apps/omg_watcher/lib/omg_watcher/ethereum_event_aggregator.ex +++ b/apps/omg_watcher/lib/omg_watcher/ethereum_event_aggregator.ex @@ -14,7 +14,7 @@ defmodule OMG.Watcher.EthereumEventAggregator do @moduledoc """ This process combines all plasma contract events we're interested in and does eth_getLogs + enriches them if needed - for all Ethereum Event Listener processes. + for all Ethereum Event Listener processes. """ use GenServer require Logger @@ -52,6 +52,11 @@ defmodule OMG.Watcher.EthereumEventAggregator do forward_call(server, :in_flight_exit_started, from_block, to_block, @timeout) end + @spec in_flight_exit_deleted(GenServer.server(), pos_integer(), pos_integer()) :: result() + def in_flight_exit_deleted(server \\ __MODULE__, from_block, to_block) do + forward_call(server, :in_flight_exit_deleted, from_block, to_block, @timeout) + end + @spec in_flight_exit_piggybacked(GenServer.server(), pos_integer(), pos_integer()) :: result() def in_flight_exit_piggybacked(server \\ __MODULE__, from_block, to_block) do # input and output diff --git a/apps/omg_watcher/lib/omg_watcher/exit_processor.ex b/apps/omg_watcher/lib/omg_watcher/exit_processor.ex index 83b865db37..1593433412 100644 --- a/apps/omg_watcher/lib/omg_watcher/exit_processor.ex +++ b/apps/omg_watcher/lib/omg_watcher/exit_processor.ex @@ -79,6 +79,18 @@ defmodule OMG.Watcher.ExitProcessor do GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events}, @timeout) end + @doc """ + Accepts events and processes them in the state - new in flight exits are tracked. + + Returns `db_updates` to be sent to `OMG.DB` by the caller + """ + # empty list clause to not block the server for no-ops + def delete_in_flight_exits([]), do: {:ok, []} + + def delete_in_flight_exits(in_flight_exit_deleted_events) do + GenServer.call(__MODULE__, {:in_flight_exits_deleted, in_flight_exit_deleted_events}, @timeout) + end + @doc """ Accepts events and processes them in the state - finalized exits are untracked _if valid_ otherwise raises alert @@ -359,6 +371,22 @@ defmodule OMG.Watcher.ExitProcessor do {:reply, {:ok, db_updates}, new_state} end + @doc """ + See `delete_in_flight_exits/1`. Flow: + - spends input utxos + - deletes in-flight exits from state + """ + def handle_call({:delete_in_flight_exits, deletions}, _from, state) do + _ = + if not Enum.empty?(deletions), + do: Logger.info("Recognized #{Enum.count(deletions)} deletions: #{inspect(deletions)}") + + {new_state, deleted_utxos, db_updates} = Core.delete_in_flight_exits(state, deletions) + {:ok, db_updates_from_state, _validities} = State.exit_utxos(deleted_utxos) + + {:reply, {:ok, db_updates ++ db_updates_from_state}, new_state} + end + @doc """ See `finalize_exits/1`. Flow: diff --git a/apps/omg_watcher/lib/omg_watcher/exit_processor/core.ex b/apps/omg_watcher/lib/omg_watcher/exit_processor/core.ex index 65bbfc6619..d8c0240148 100644 --- a/apps/omg_watcher/lib/omg_watcher/exit_processor/core.ex +++ b/apps/omg_watcher/lib/omg_watcher/exit_processor/core.ex @@ -641,4 +641,32 @@ defmodule OMG.Watcher.ExitProcessor.Core do defp sla_margin_safe?(exit_processor_sla_margin, min_exit_period_seconds, ethereum_block_time_seconds), do: exit_processor_sla_margin * ethereum_block_time_seconds < min_exit_period_seconds + + @doc """ + Deletes in-flight exits from state and returns deleted exits + """ + @spec delete_in_flight_exits(__MODULE__.t(), list(map)) :: {__MODULE__.t(), list(InFlightExitInfo.t()), list(any())} + def delete_in_flight_exits(state, deletions) do + exit_ids = + deletions + |> Enum.map(fn %{exit_id: exit_id} -> InFlightExitInfo.to_contract_id(exit_id) end) + |> MapSet.new() + + deleted_ifes_by_key = + state.in_flight_exits + |> Enum.filter(fn {_, ife} -> MapSet.member?(exit_ids, ife.contract_id) end) + |> Map.new() + + deleted_keys = Map.keys(deleted_ifes_by_key) + updated_ifes = Map.drop(state.in_flight_exits, deleted_keys) + + deleted_utxos = + deleted_ifes_by_key + |> Map.values() + |> InFlightExitInfo.get_input_utxos() + + db_updates = Enum.map(deleted_keys, fn key -> {:delete, :in_flight_exit_info, key} end) + + {%{state | in_flight_exits: updated_ifes}, deleted_utxos, db_updates} + end end diff --git a/apps/omg_watcher/lib/omg_watcher/exit_processor/in_flight_exit_info.ex b/apps/omg_watcher/lib/omg_watcher/exit_processor/in_flight_exit_info.ex index 886a23f150..bd5db6bc36 100644 --- a/apps/omg_watcher/lib/omg_watcher/exit_processor/in_flight_exit_info.ex +++ b/apps/omg_watcher/lib/omg_watcher/exit_processor/in_flight_exit_info.ex @@ -193,6 +193,16 @@ defmodule OMG.Watcher.ExitProcessor.InFlightExitInfo do {:put, :in_flight_exit_info, {ife_hash, value}} end + @doc """ + Returns all input utxos for given in-flight exits + """ + @spec get_input_utxos(list(t())) :: list(Utxo.Position.t()) + def get_input_utxos(in_flight_exits) do + in_flight_exits + |> Enum.map(& &1.input_utxos_pos) + |> List.flatten() + end + defp assert_utxo_pos_type(Utxo.position(blknum, txindex, oindex)) when is_integer(blknum) and is_integer(txindex) and is_integer(oindex), do: :ok @@ -417,6 +427,12 @@ defmodule OMG.Watcher.ExitProcessor.InFlightExitInfo do }), do: is_older?(seen_in_pos, oldest_competitor_pos) + @doc """ + Converts integer to in-flight exit contract id + """ + @spec to_contract_id(non_neg_integer) :: <<_::192>> + def to_contract_id(id), do: <> + @doc """ Checks if the competitor being seen at `competitor_pos` (`nil` if unseen) is viable to challenge with, considering the current state of the IFE - that is, only if it is older than IFE tx's inclusion and other competitors diff --git a/apps/omg_watcher/lib/omg_watcher/sync_supervisor.ex b/apps/omg_watcher/lib/omg_watcher/sync_supervisor.ex index 5920227b63..9397d20297 100644 --- a/apps/omg_watcher/lib/omg_watcher/sync_supervisor.ex +++ b/apps/omg_watcher/lib/omg_watcher/sync_supervisor.ex @@ -114,6 +114,7 @@ defmodule OMG.Watcher.SyncSupervisor do [name: :exit_finalized, enrich: false], [name: :exit_challenged, enrich: false], [name: :in_flight_exit_started, enrich: true], + [name: :in_flight_exit_deleted, enrich: false], [name: :in_flight_exit_input_piggybacked, enrich: false], [name: :in_flight_exit_output_piggybacked, enrich: false], [name: :in_flight_exit_challenged, enrich: true], @@ -213,6 +214,15 @@ defmodule OMG.Watcher.SyncSupervisor do get_events_callback: &EthereumEventAggregator.in_flight_exit_withdrawn/2, process_events_callback: &Watcher.ExitProcessor.finalize_in_flight_exits/1 ), + EthereumEventListener.prepare_child( + metrics_collection_interval: metrics_collection_interval, + ethereum_events_check_interval_ms: ethereum_events_check_interval_ms, + contract_deployment_height: contract_deployment_height, + service_name: :in_flight_exit_deleted_processor, + synced_height_update_key: :last_ife_exit_deleted_eth_height, + get_events_callback: &EthereumEventAggregator.in_flight_exit_deleted/2, + process_events_callback: &Watcher.ExitProcessor.delete_in_flight_exits/1 + ), {StatusCache, [event_bus: OMG.Bus, ets: status_cache()]}, {ChildManager, [monitor: Monitor]} ] diff --git a/apps/omg_watcher/test/omg_watcher/exit_processor/core/state_interaction_test.exs b/apps/omg_watcher/test/omg_watcher/exit_processor/core/state_interaction_test.exs index 2eb35b4dec..aaafd75688 100644 --- a/apps/omg_watcher/test/omg_watcher/exit_processor/core/state_interaction_test.exs +++ b/apps/omg_watcher/test/omg_watcher/exit_processor/core/state_interaction_test.exs @@ -289,6 +289,23 @@ defmodule OMG.Watcher.ExitProcessor.Core.StateInteractionTest do assert [_] = Core.get_active_in_flight_exits(processor) end + test "deleting in-flight exits works with State", + %{processor_empty: processor, state_empty: state, alice: alice} do + ife_exit_tx = TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 9}]) + ife_id = 1 + + state = TestHelper.do_deposit(state, alice, %{amount: 10, currency: @eth, blknum: 1}) + {:ok, _, state} = State.Core.form_block(state) + + {_processor, deleted_utxos, _db_updates} = + processor + |> start_ife_from(ife_exit_tx, exit_id: ife_id) + |> Core.delete_in_flight_exits([%{exit_id: ife_id}]) + + assert {:ok, {[{:delete, :utxo, _}], {[{:utxo_position, 1, 0, 0}], []}}, _} = + State.Core.exit_utxos(deleted_utxos, state) + end + defp mock_utxo_exists(%ExitProcessor.Request{utxos_to_check: positions} = request, state) do %{request | utxo_exists_result: positions |> Enum.map(&State.Core.utxo_exists?(&1, state))} end diff --git a/apps/omg_watcher/test/omg_watcher/exit_processor/core_test.exs b/apps/omg_watcher/test/omg_watcher/exit_processor/core_test.exs index d9451bbf8c..2fd9fda0d5 100644 --- a/apps/omg_watcher/test/omg_watcher/exit_processor/core_test.exs +++ b/apps/omg_watcher/test/omg_watcher/exit_processor/core_test.exs @@ -335,4 +335,42 @@ defmodule OMG.Watcher.ExitProcessor.CoreTest do db_value_map end end + + describe "delete_in_flight_exits/2" do + test "returns deleted utxos and database updates", %{processor_empty: processor, alice: alice} do + ife_exit_tx1 = TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 9}]) + ife_id1 = 1 + tx_hash1 = Transaction.raw_txhash(ife_exit_tx1) + ife_exit_tx2 = TestHelper.create_recovered([{2, 0, 1, alice}, {2, 0, 2, alice}], @eth, [{alice, 9}]) + ife_id2 = 2 + ife_exit_tx3 = TestHelper.create_recovered([{3, 0, 1, alice}, {3, 0, 2, alice}], @eth, [{alice, 9}]) + ife_id3 = 3 + tx_hash3 = Transaction.raw_txhash(ife_exit_tx3) + + {_processor, deleted_utxos, db_updates} = + processor + |> start_ife_from(ife_exit_tx1, exit_id: ife_id1) + |> start_ife_from(ife_exit_tx2, exit_id: ife_id2) + |> start_ife_from(ife_exit_tx3, exit_id: ife_id3) + |> Core.delete_in_flight_exits([%{exit_id: ife_id1}, %{exit_id: ife_id3}]) + + assert Enum.sort(deleted_utxos) == + Enum.sort([{:utxo_position, 3, 0, 1}, {:utxo_position, 3, 0, 2}, {:utxo_position, 1, 0, 0}]) + + assert Enum.sort(db_updates) == + Enum.sort([{:delete, :in_flight_exit_info, tx_hash1}, {:delete, :in_flight_exit_info, tx_hash3}]) + end + + test "deletes in-flight exits from processor", %{processor_empty: processor, alice: alice} do + ife_exit_tx = TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 9}]) + ife_id = 1 + + {processor, _deleted_utxos, _db_updates} = + processor + |> start_ife_from(ife_exit_tx, exit_id: ife_id) + |> Core.delete_in_flight_exits([%{exit_id: ife_id}]) + + assert Enum.empty?(processor.in_flight_exits) + end + end end diff --git a/apps/omg_watcher/test/omg_watcher/exit_processor/in_flight_exit_info_test.exs b/apps/omg_watcher/test/omg_watcher/exit_processor/in_flight_exit_info_test.exs new file mode 100644 index 0000000000..f1111674cd --- /dev/null +++ b/apps/omg_watcher/test/omg_watcher/exit_processor/in_flight_exit_info_test.exs @@ -0,0 +1,63 @@ +# Copyright 2019-2020 OmiseGO Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.Watcher.ExitProcessor.InFlightExitInfoTest do + @moduledoc false + + use OMG.Watcher.ExitProcessor.Case, async: true + + alias OMG.State.Transaction + alias OMG.Utxo.Position + alias OMG.Watcher.ExitProcessor.InFlightExitInfo + + @eth OMG.Eth.zero_address() + + describe "get_input_utxos/1" do + test "returns a list of input utxos" do + inputs1 = [ + Position.encode({:utxo_position, 1, 0, 0}), + Position.encode({:utxo_position, 1, 0, 1}) + ] + + inputs2 = [Position.encode({:utxo_position, 1, 0, 2})] + + ife_infos = [ + ife_info_with_inputs(inputs1), + ife_info_with_inputs(inputs2) + ] + + expected = inputs1 ++ inputs2 + + assert InFlightExitInfo.get_input_utxos(ife_infos) == expected + end + end + + defp ife_info_with_inputs(inputs) do + tx = + Transaction.Payment.new( + [{1, 0, 0}], + [{"alice", @eth, 1}, {"alice", @eth, 2}], + <<0::256>> + ) + + %InFlightExitInfo{ + tx: %Transaction.Signed{raw_tx: tx, sigs: <<1::520>>}, + timestamp: 1, + contract_id: <<1::160>>, + eth_height: 1, + is_active: true, + input_utxos_pos: inputs + } + end +end diff --git a/priv/cabbage b/priv/cabbage index 96d67f4e06..cd262a3b62 160000 --- a/priv/cabbage +++ b/priv/cabbage @@ -1 +1 @@ -Subproject commit 96d67f4e065aad020dfde111419e95e397f443e8 +Subproject commit cd262a3b624a347b05208b154e0c2d7f85dd92a9