Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async stream + timeout #1593

Merged
merged 6 commits into from
Jun 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ jobs:
_counter=$(mix credo --only Credo.Check.Readability.SinglePipe | grep -c "Use a function call when a pipeline is only one function long")
echo "Current Credo.Check.Readability.SinglePipe occurrences:"
echo $_counter
if [ $_counter -gt 343 ]; then
if [ $_counter -gt 341 ]; then
echo "Have you been naughty or nice? Find out if Santa knows."
exit 1
fi
Expand Down
116 changes: 65 additions & 51 deletions apps/omg_watcher/lib/omg_watcher/exit_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ defmodule OMG.Watcher.ExitProcessor do
use OMG.Utils.LoggerExt
require Utxo

@timeout 60_000

### Client

@doc """
Expand All @@ -62,7 +64,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_exits([]), do: {:ok, []}

def new_exits(exits) do
GenServer.call(__MODULE__, {:new_exits, exits})
GenServer.call(__MODULE__, {:new_exits, exits}, @timeout)
end

@doc """
Expand All @@ -74,7 +76,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_in_flight_exits([]), do: {:ok, []}

def new_in_flight_exits(in_flight_exit_started_events) do
GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events})
GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events}, @timeout)
end

@doc """
Expand All @@ -86,7 +88,7 @@ defmodule OMG.Watcher.ExitProcessor do
def finalize_exits([]), do: {:ok, []}

def finalize_exits(finalizations) do
GenServer.call(__MODULE__, {:finalize_exits, finalizations})
GenServer.call(__MODULE__, {:finalize_exits, finalizations}, @timeout)
end

@doc """
Expand All @@ -98,7 +100,7 @@ defmodule OMG.Watcher.ExitProcessor do
def piggyback_exits([]), do: {:ok, []}

def piggyback_exits(piggybacks) do
GenServer.call(__MODULE__, {:piggyback_exits, piggybacks})
GenServer.call(__MODULE__, {:piggyback_exits, piggybacks}, @timeout)
end

@doc """
Expand All @@ -110,7 +112,7 @@ defmodule OMG.Watcher.ExitProcessor do
def challenge_exits([]), do: {:ok, []}

def challenge_exits(challenges) do
GenServer.call(__MODULE__, {:challenge_exits, challenges})
GenServer.call(__MODULE__, {:challenge_exits, challenges}, @timeout)
end

@doc """
Expand All @@ -126,7 +128,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_ife_challenges([]), do: {:ok, []}

def new_ife_challenges(challenges) do
GenServer.call(__MODULE__, {:new_ife_challenges, challenges})
GenServer.call(__MODULE__, {:new_ife_challenges, challenges}, @timeout)
end

@doc """
Expand All @@ -140,7 +142,7 @@ defmodule OMG.Watcher.ExitProcessor do
def respond_to_in_flight_exits_challenges([]), do: {:ok, []}

def respond_to_in_flight_exits_challenges(responds) do
GenServer.call(__MODULE__, {:respond_to_in_flight_exits_challenges, responds})
GenServer.call(__MODULE__, {:respond_to_in_flight_exits_challenges, responds}, @timeout)
end

@doc """
Expand All @@ -152,7 +154,7 @@ defmodule OMG.Watcher.ExitProcessor do
def challenge_piggybacks([]), do: {:ok, []}

def challenge_piggybacks(challenges) do
GenServer.call(__MODULE__, {:challenge_piggybacks, challenges})
GenServer.call(__MODULE__, {:challenge_piggybacks, challenges}, @timeout)
end

@doc """
Expand All @@ -164,7 +166,7 @@ defmodule OMG.Watcher.ExitProcessor do
def finalize_in_flight_exits([]), do: {:ok, []}

def finalize_in_flight_exits(finalizations) do
GenServer.call(__MODULE__, {:finalize_in_flight_exits, finalizations})
GenServer.call(__MODULE__, {:finalize_in_flight_exits, finalizations}, @timeout)
end

@doc """
Expand All @@ -175,7 +177,7 @@ defmodule OMG.Watcher.ExitProcessor do
but under unchanged conditions, it should have unchanged behavior from POV of an outside caller.
"""
def check_validity() do
GenServer.call(__MODULE__, :check_validity)
GenServer.call(__MODULE__, :check_validity, @timeout)
end

def check_validity(timeout) do
Expand All @@ -187,7 +189,7 @@ defmodule OMG.Watcher.ExitProcessor do
"""
@spec get_active_in_flight_exits() :: {:ok, Core.in_flight_exits_response_t()}
def get_active_in_flight_exits() do
GenServer.call(__MODULE__, :get_active_in_flight_exits)
GenServer.call(__MODULE__, :get_active_in_flight_exits, @timeout)
end

@doc """
Expand All @@ -199,7 +201,7 @@ defmodule OMG.Watcher.ExitProcessor do
| {:error, :competitor_not_found}
| {:error, :no_viable_competitor_found}
def get_competitor_for_ife(txbytes) do
GenServer.call(__MODULE__, {:get_competitor_for_ife, txbytes})
GenServer.call(__MODULE__, {:get_competitor_for_ife, txbytes}, @timeout)
end

@doc """
Expand All @@ -209,7 +211,7 @@ defmodule OMG.Watcher.ExitProcessor do
@spec prove_canonical_for_ife(binary()) ::
{:ok, ExitProcessor.Canonicity.prove_canonical_data_t()} | {:error, :no_viable_canonical_proof_found}
def prove_canonical_for_ife(txbytes) do
GenServer.call(__MODULE__, {:prove_canonical_for_ife, txbytes})
GenServer.call(__MODULE__, {:prove_canonical_for_ife, txbytes}, @timeout)
end

@doc """
Expand All @@ -219,7 +221,7 @@ defmodule OMG.Watcher.ExitProcessor do
{:ok, ExitProcessor.Piggyback.input_challenge_data()}
| {:error, ExitProcessor.Piggyback.piggyback_challenge_data_error()}
def get_input_challenge_data(txbytes, input_index) do
GenServer.call(__MODULE__, {:get_input_challenge_data, txbytes, input_index})
GenServer.call(__MODULE__, {:get_input_challenge_data, txbytes, input_index}, @timeout)
end

@doc """
Expand All @@ -229,7 +231,7 @@ defmodule OMG.Watcher.ExitProcessor do
{:ok, ExitProcessor.Piggyback.output_challenge_data()}
| {:error, ExitProcessor.Piggyback.piggyback_challenge_data_error()}
def get_output_challenge_data(txbytes, output_index) do
GenServer.call(__MODULE__, {:get_output_challenge_data, txbytes, output_index})
GenServer.call(__MODULE__, {:get_output_challenge_data, txbytes, output_index}, @timeout)
end

@doc """
Expand All @@ -238,7 +240,7 @@ defmodule OMG.Watcher.ExitProcessor do
@spec create_challenge(Utxo.Position.t()) ::
{:ok, StandardExit.Challenge.t()} | {:error, :utxo_not_spent | :exit_not_found}
def create_challenge(exiting_utxo_pos) do
GenServer.call(__MODULE__, {:create_challenge, exiting_utxo_pos})
GenServer.call(__MODULE__, {:create_challenge, exiting_utxo_pos}, @timeout)
end

### Server
Expand Down Expand Up @@ -305,14 +307,21 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`
"""
def handle_call({:new_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized exits: #{inspect(exits)}")
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} exits: #{inspect(exits)}")

{:ok, exit_contract_statuses} = Eth.RootChain.get_standard_exit_structs(get_in(exits, [Access.all(), :exit_id]))

exit_maps =
Enum.map(exits, fn exit_event ->
put_timestamp_and_sft(exit_event, state.min_exit_period_seconds, state.child_block_interval)
end)
exits
|> Task.async_stream(
fn exit_event ->
put_timestamp_and_sft(exit_event, state.min_exit_period_seconds, state.child_block_interval)
end,
timeout: 50_000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this Task.async_stream's :timeout is very handy 👏

on_timeout: :exit,
max_concurrency: System.schedulers_online() * 2
)
|> Enum.map(fn {:ok, result} -> result end)

{new_state, db_updates} = Core.new_exits(state, exit_maps, exit_contract_statuses)
{:reply, {:ok, db_updates}, new_state}
Expand All @@ -327,20 +336,17 @@ defmodule OMG.Watcher.ExitProcessor do
- updates the `ExitProcessor`'s state
- returns `db_updates`
"""
def handle_call({:new_in_flight_exits, events}, _from, state) do
_ = if not Enum.empty?(events), do: Logger.info("Recognized in-flight exits: #{inspect(events)}")
def handle_call({:new_in_flight_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} in-flight exits: #{inspect(exits)}")

contract_ife_ids =
Enum.map(
events,
fn %{call_data: %{in_flight_tx: txbytes}} ->
ExPlasma.InFlightExit.txbytes_to_id(txbytes)
end
)
Enum.map(exits, fn %{call_data: %{in_flight_tx: txbytes}} ->
ExPlasma.InFlightExit.txbytes_to_id(txbytes)
end)

# Prepare events data for internal bus
:ok =
events
exits
|> Enum.map(fn %{call_data: %{input_utxos_pos: inputs}} = event ->
{event, inputs}
end)
Expand All @@ -349,7 +355,7 @@ defmodule OMG.Watcher.ExitProcessor do

{:ok, statuses} = Eth.RootChain.get_in_flight_exit_structs(contract_ife_ids)
ife_contract_statuses = Enum.zip(statuses, contract_ife_ids)
{new_state, db_updates} = Core.new_in_flight_exits(state, events, ife_contract_statuses)
{new_state, db_updates} = Core.new_in_flight_exits(state, exits, ife_contract_statuses)
{:reply, {:ok, db_updates}, new_state}
end

Expand All @@ -363,7 +369,7 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`, concatenated with those related to the call to `OMG.State`
"""
def handle_call({:finalize_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized finalizations: #{inspect(exits)}")
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} finalizations: #{inspect(exits)}")

{:ok, db_updates_from_state, validities} =
exits |> Enum.map(&Core.exit_key_by_exit_id(state, &1.exit_id)) |> State.exit_utxos()
Expand All @@ -381,7 +387,7 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`
"""
def handle_call({:piggyback_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized piggybacks: #{inspect(exits)}")
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} piggybacks: #{inspect(exits)}")
{new_state, db_updates} = Core.new_piggybacks(state, exits)

:ok =
Expand All @@ -400,7 +406,7 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`
"""
def handle_call({:challenge_exits, exits}, _from, state) do
_ = if not Enum.empty?(exits), do: Logger.info("Recognized challenges: #{inspect(exits)}")
_ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} challenges: #{inspect(exits)}")
{new_state, db_updates} = Core.challenge_exits(state, exits)
{:reply, {:ok, db_updates}, new_state}
end
Expand All @@ -413,7 +419,10 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`
"""
def handle_call({:new_ife_challenges, challenges}, _from, state) do
_ = if not Enum.empty?(challenges), do: Logger.info("Recognized ife challenges: #{inspect(challenges)}")
_ =
if not Enum.empty?(challenges),
do: Logger.info("Recognized #{Enum.count(challenges)} ife challenges: #{inspect(challenges)}")

{new_state, db_updates} = Core.new_ife_challenges(state, challenges)
{:reply, {:ok, db_updates}, new_state}
end
Expand All @@ -427,7 +436,10 @@ defmodule OMG.Watcher.ExitProcessor do
"""

def handle_call({:challenge_piggybacks, challenges}, _from, state) do
_ = if not Enum.empty?(challenges), do: Logger.info("Recognized piggyback challenges: #{inspect(challenges)}")
_ =
if not Enum.empty?(challenges),
do: Logger.info("Recognized #{Enum.count(challenges)} piggyback challenges: #{inspect(challenges)}")

{new_state, db_updates} = Core.challenge_piggybacks(state, challenges)
{:reply, {:ok, db_updates}, new_state}
end
Expand All @@ -440,7 +452,10 @@ defmodule OMG.Watcher.ExitProcessor do
- returns `db_updates`
"""
def handle_call({:respond_to_in_flight_exits_challenges, responds}, _from, state) do
_ = if not Enum.empty?(responds), do: Logger.info("Recognized response to IFE challenge: #{inspect(responds)}")
_ =
if not Enum.empty?(responds),
do: Logger.info("Recognized #{Enum.count(responds)} response to IFE challenge: #{inspect(responds)}")

{new_state, db_updates} = Core.respond_to_in_flight_exits_challenges(state, responds)
{:reply, {:ok, db_updates}, new_state}
end
Expand All @@ -456,8 +471,9 @@ defmodule OMG.Watcher.ExitProcessor do
- reflects this result in the `ExitProcessor`'s state
- returns `db_updates`, concatenated with those related to the call to `OMG.State`
"""

def handle_call({:finalize_in_flight_exits, finalizations}, _from, state) do
_ = if not Enum.empty?(finalizations), do: Logger.info("Recognized ife finalizations: #{inspect(finalizations)}")
_ = Logger.info("Recognized #{Enum.count(finalizations)} ife finalizations: #{inspect(finalizations)}")

# necessary, so that the processor knows the current state of inclusion of exiting IFE txs
state2 = update_with_ife_txs_from_blocks(state)
Expand Down Expand Up @@ -501,8 +517,9 @@ defmodule OMG.Watcher.ExitProcessor do
@doc """
See `get_active_in_flight_exits/0`.
"""
def handle_call(:get_active_in_flight_exits, _from, state),
do: {:reply, {:ok, Core.get_active_in_flight_exits(state)}, state}
def handle_call(:get_active_in_flight_exits, _from, state) do
{:reply, {:ok, Core.get_active_in_flight_exits(state)}, state}
end

@doc """
See `get_competitor_for_ife/1`. Flow:
Expand Down Expand Up @@ -645,19 +662,21 @@ defmodule OMG.Watcher.ExitProcessor do
do: %{request | ife_input_utxo_exists_result: do_utxo_exists?(positions)}

defp do_utxo_exists?(positions) do
result = positions |> Enum.map(&State.utxo_exists?/1)
result = Enum.map(positions, &State.utxo_exists?/1)
_ = Logger.debug("utxos_to_check: #{inspect(positions)}, utxo_exists_result: #{inspect(result)}")
result
end

defp get_spending_blocks(%ExitProcessor.Request{spends_to_get: positions} = request),
do: %{request | blocks_result: do_get_spending_blocks(positions)}
defp get_spending_blocks(%ExitProcessor.Request{spends_to_get: positions} = request) do
%{request | blocks_result: do_get_spending_blocks(positions)}
end

defp get_ife_input_spending_blocks(%ExitProcessor.Request{ife_input_spends_to_get: positions} = request),
do: %{request | ife_input_spending_blocks_result: do_get_spending_blocks(positions)}
defp get_ife_input_spending_blocks(%ExitProcessor.Request{ife_input_spends_to_get: positions} = request) do
%{request | ife_input_spending_blocks_result: do_get_spending_blocks(positions)}
end

defp do_get_spending_blocks(spent_positions_to_get) do
blknums = spent_positions_to_get |> Enum.map(&do_get_spent_blknum/1)
blknums = Enum.map(spent_positions_to_get, &do_get_spent_blknum/1)
_ = Logger.debug("spends_to_get: #{inspect(spent_positions_to_get)}, spent_blknum_result: #{inspect(blknums)}")

blknums
Expand Down Expand Up @@ -693,12 +712,7 @@ defmodule OMG.Watcher.ExitProcessor do
{invalidities_by_ife_id, state_db_updates}
end

@spec put_timestamp_and_sft(
exit_event :: map(),
min_exit_period_seconds :: pos_integer(),
child_block_interval :: pos_integer()
) ::
map()
@spec put_timestamp_and_sft(map(), pos_integer(), pos_integer()) :: map()
defp put_timestamp_and_sft(
%{eth_height: eth_height, call_data: %{utxo_pos: utxo_pos_enc}} = exit_event,
min_exit_period_seconds,
Expand Down