Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async stream + timeout #1593

Merged
merged 6 commits into from
Jun 20, 2020
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions apps/omg_watcher/lib/omg_watcher/exit_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_exits([]), do: {:ok, []}

def new_exits(exits) do
GenServer.call(__MODULE__, {:new_exits, exits})
GenServer.call(__MODULE__, {:new_exits, exits}, 60_000)
end

@doc """
Expand All @@ -74,7 +74,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_in_flight_exits([]), do: {:ok, []}

def new_in_flight_exits(in_flight_exit_started_events) do
GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events})
GenServer.call(__MODULE__, {:new_in_flight_exits, in_flight_exit_started_events}, 60_000)
end

@doc """
Expand All @@ -86,7 +86,7 @@ defmodule OMG.Watcher.ExitProcessor do
def finalize_exits([]), do: {:ok, []}

def finalize_exits(finalizations) do
GenServer.call(__MODULE__, {:finalize_exits, finalizations})
GenServer.call(__MODULE__, {:finalize_exits, finalizations}, 60_000)
end

@doc """
Expand All @@ -98,7 +98,7 @@ defmodule OMG.Watcher.ExitProcessor do
def piggyback_exits([]), do: {:ok, []}

def piggyback_exits(piggybacks) do
GenServer.call(__MODULE__, {:piggyback_exits, piggybacks})
GenServer.call(__MODULE__, {:piggyback_exits, piggybacks}, 60_000)
end

@doc """
Expand All @@ -110,7 +110,7 @@ defmodule OMG.Watcher.ExitProcessor do
def challenge_exits([]), do: {:ok, []}

def challenge_exits(challenges) do
GenServer.call(__MODULE__, {:challenge_exits, challenges})
GenServer.call(__MODULE__, {:challenge_exits, challenges}, 60_000)
end

@doc """
Expand All @@ -126,7 +126,7 @@ defmodule OMG.Watcher.ExitProcessor do
def new_ife_challenges([]), do: {:ok, []}

def new_ife_challenges(challenges) do
GenServer.call(__MODULE__, {:new_ife_challenges, challenges})
GenServer.call(__MODULE__, {:new_ife_challenges, challenges}, 60_000)
end

@doc """
Expand All @@ -140,7 +140,7 @@ defmodule OMG.Watcher.ExitProcessor do
def respond_to_in_flight_exits_challenges([]), do: {:ok, []}

def respond_to_in_flight_exits_challenges(responds) do
GenServer.call(__MODULE__, {:respond_to_in_flight_exits_challenges, responds})
GenServer.call(__MODULE__, {:respond_to_in_flight_exits_challenges, responds}, 60_000)
end

@doc """
Expand All @@ -152,7 +152,7 @@ defmodule OMG.Watcher.ExitProcessor do
def challenge_piggybacks([]), do: {:ok, []}

def challenge_piggybacks(challenges) do
GenServer.call(__MODULE__, {:challenge_piggybacks, challenges})
GenServer.call(__MODULE__, {:challenge_piggybacks, challenges}, 60_000)
end

@doc """
Expand All @@ -164,7 +164,7 @@ defmodule OMG.Watcher.ExitProcessor do
def finalize_in_flight_exits([]), do: {:ok, []}

def finalize_in_flight_exits(finalizations) do
GenServer.call(__MODULE__, {:finalize_in_flight_exits, finalizations})
GenServer.call(__MODULE__, {:finalize_in_flight_exits, finalizations}, 60_000)
end

@doc """
Expand All @@ -175,7 +175,7 @@ defmodule OMG.Watcher.ExitProcessor do
but under unchanged conditions, it should have unchanged behavior from POV of an outside caller.
"""
def check_validity() do
GenServer.call(__MODULE__, :check_validity)
GenServer.call(__MODULE__, :check_validity, 60_000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this value to module attribute and reuse it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah!

end

def check_validity(timeout) do
Expand All @@ -187,7 +187,7 @@ defmodule OMG.Watcher.ExitProcessor do
"""
@spec get_active_in_flight_exits() :: {:ok, Core.in_flight_exits_response_t()}
def get_active_in_flight_exits() do
GenServer.call(__MODULE__, :get_active_in_flight_exits)
GenServer.call(__MODULE__, :get_active_in_flight_exits, 60_000)
end

@doc """
Expand All @@ -199,7 +199,7 @@ defmodule OMG.Watcher.ExitProcessor do
| {:error, :competitor_not_found}
| {:error, :no_viable_competitor_found}
def get_competitor_for_ife(txbytes) do
GenServer.call(__MODULE__, {:get_competitor_for_ife, txbytes})
GenServer.call(__MODULE__, {:get_competitor_for_ife, txbytes}, 60_000)
end

@doc """
Expand All @@ -209,7 +209,7 @@ defmodule OMG.Watcher.ExitProcessor do
@spec prove_canonical_for_ife(binary()) ::
{:ok, ExitProcessor.Canonicity.prove_canonical_data_t()} | {:error, :no_viable_canonical_proof_found}
def prove_canonical_for_ife(txbytes) do
GenServer.call(__MODULE__, {:prove_canonical_for_ife, txbytes})
GenServer.call(__MODULE__, {:prove_canonical_for_ife, txbytes}, 60_000)
end

@doc """
Expand All @@ -219,7 +219,7 @@ defmodule OMG.Watcher.ExitProcessor do
{:ok, ExitProcessor.Piggyback.input_challenge_data()}
| {:error, ExitProcessor.Piggyback.piggyback_challenge_data_error()}
def get_input_challenge_data(txbytes, input_index) do
GenServer.call(__MODULE__, {:get_input_challenge_data, txbytes, input_index})
GenServer.call(__MODULE__, {:get_input_challenge_data, txbytes, input_index}, 60_000)
end

@doc """
Expand All @@ -229,7 +229,7 @@ defmodule OMG.Watcher.ExitProcessor do
{:ok, ExitProcessor.Piggyback.output_challenge_data()}
| {:error, ExitProcessor.Piggyback.piggyback_challenge_data_error()}
def get_output_challenge_data(txbytes, output_index) do
GenServer.call(__MODULE__, {:get_output_challenge_data, txbytes, output_index})
GenServer.call(__MODULE__, {:get_output_challenge_data, txbytes, output_index}, 60_000)
end

@doc """
Expand All @@ -238,7 +238,7 @@ defmodule OMG.Watcher.ExitProcessor do
@spec create_challenge(Utxo.Position.t()) ::
{:ok, StandardExit.Challenge.t()} | {:error, :utxo_not_spent | :exit_not_found}
def create_challenge(exiting_utxo_pos) do
GenServer.call(__MODULE__, {:create_challenge, exiting_utxo_pos})
GenServer.call(__MODULE__, {:create_challenge, exiting_utxo_pos}, 60_000)
end

### Server
Expand Down Expand Up @@ -310,9 +310,16 @@ defmodule OMG.Watcher.ExitProcessor do
{:ok, exit_contract_statuses} = Eth.RootChain.get_standard_exit_structs(get_in(exits, [Access.all(), :exit_id]))

exit_maps =
Enum.map(exits, fn exit_event ->
put_timestamp_and_sft(exit_event, state.min_exit_period_seconds, state.child_block_interval)
end)
exits
|> Task.async_stream(
fn exit_event ->
put_timestamp_and_sft(exit_event, state.min_exit_period_seconds, state.child_block_interval)
end,
timeout: 50_000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this Task.async_stream's :timeout is very handy 👏

on_timeout: :exit,
max_concurrency: System.schedulers_online() * 2
)
|> Enum.map(fn {:ok, result} -> result end)

{new_state, db_updates} = Core.new_exits(state, exit_maps, exit_contract_statuses)
{:reply, {:ok, db_updates}, new_state}
Expand All @@ -331,12 +338,16 @@ defmodule OMG.Watcher.ExitProcessor do
_ = if not Enum.empty?(events), do: Logger.info("Recognized in-flight exits: #{inspect(events)}")

contract_ife_ids =
Enum.map(
events,
events
|> Task.async_stream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExPlasma.InFlightExit.txbytes_to_id(txbytes) is already a local computation though, no longer making RPC calls.

Maybe at least we can remove the custom timeout and so defaults back to 5000ms which is plenty enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh geez. I didn't even check. Now I remember you did this!

fn %{call_data: %{in_flight_tx: txbytes}} ->
ExPlasma.InFlightExit.txbytes_to_id(txbytes)
end
end,
timeout: 50_000,
on_timeout: :exit,
max_concurrency: System.schedulers_online() * 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would not be larger then our rate limit to infura right (is there a rate limit)? (btw, I actually have no idea what both number are 😅)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the beam installs a scheduler per CPU core. so if you have 8 cores times 2, you'd do at most 16 exits at the same time.

)
|> Enum.map(fn {:ok, result} -> result end)

# Prepare events data for internal bus
:ok =
Expand Down