Add block processing queue to watcher info #1560
|> to_mined_block()
|> DB.Block.insert_with_transactions()
|> to_pending_block()
|> DB.PendingBlock.insert()
PendingBlock is somewhat different naming than I would expect. The word queue is not used anywhere :)))
what ino said
Force-pushed from 2d07a51 to 8e0835e.
state and add some "random" sleep to give the database time to process and write the block.
This function will instead poll for block.get until found (or timeout).
"""
def wait_for_block_inserted_in_db(block_nr, timeout) do
here's my take on these helper functions: they're bloating the test suite and making the tests themselves harder to read. I prefer my tests self-contained, with everything about a test living in the test module itself.
couple of complaints, but good progress overall!
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule OMG.WatcherInfo.PendingBlockProcessor do
all the OTP goodies. really clean.
def change() do
  create table(:pending_blocks, primary_key: false) do
    add :blknum, :bigint, null: false, primary_key: true
null false! 👍
alice = TestHelper.generate_entity()
bob = TestHelper.generate_entity()

tx_1 = TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 300}])
tx_2 = TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 500}])
are these perhaps unnecessarily complex? the data that block_application_consumer can accept is really arbitrary, and the test should only prove that term_to_binary and binary_to_term do what they need to do
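A round-trip check along these lines needs only an arbitrary term. A minimal sketch, assuming a made-up sample map rather than the PR's recovered-transaction fixtures:

```elixir
# Hypothetical sample data: any Erlang term works here, so no recovered
# transactions or test entities are needed.
block = %{blknum: 1000, blkhash: <<0::256>>, transactions: ["tx_1", "tx_2"]}

# Serialize the term into one opaque binary, then read it back.
binary = :erlang.term_to_binary(block)
restored = :erlang.binary_to_term(binary)

# The serialized form is a binary and the round trip is lossless.
true = is_binary(binary)
^block = restored
```

The pinned match on the last line fails loudly if the restored term differs from the original in any way.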
Process.send(pid, {:internal_event_bus, :block_received, block}, [:noconnect])

# this waits for all messages in process inbox is processed
_ = :sys.get_state(pid)
👍
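The :sys.get_state(pid) call in the snippet above works as a synchronization point because messages from one sender are handled in arrival order. A minimal sketch of the same idea, using a hypothetical Collector module that is not part of the PR:

```elixir
defmodule Collector do
  use GenServer

  # Hypothetical process that only records the events it receives.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, [])

  @impl true
  def init(events), do: {:ok, events}

  # Plain messages, like the event bus tuple in the test, arrive here.
  @impl true
  def handle_info({:event, payload}, events), do: {:noreply, [payload | events]}
end

{:ok, pid} = Collector.start_link()

# Asynchronous sends: nothing guarantees they were handled when send/2 returns.
for n <- 1..3, do: send(pid, {:event, n})

# :sys.get_state/1 is a synchronous call queued behind the three events,
# so by the time it replies all of them have been processed.
state = :sys.get_state(pid)
IO.inspect(state)
# prints [3, 2, 1]
```

This avoids sleeping for an arbitrary amount of time before asserting on side effects.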
end

test "calls get_next_pending_block/0 when triggered", %{pid: pid} do
  storage_pid = start_storage_mock()
hm... I don't understand how this works.
you're calling start_storage_mock/0 in every test, but this process is a named process: GenServer.start_link(__MODULE__, args, name: __MODULE__).
The internal state of this module is being updated and accessed from all tests (except the ones below). So the only explanation is that these are not executed async?
okay, so the reason is that everything inside a describe is executed top-down, which wasn't obvious to me.
here's a couple of suggestions:
- move the DumbStorage to the bottom
- move setup_with_storage/0 to the bottom
- couple DumpStorage with the instance of PendingBlockProcessor with an Agent. Agent holds key pairs %{pending block processor pid => dump storage pid}. then you can drop the DumpStorage naming.
To me, it's easier to reason about your tests. I didn't understand why all tests start new PendingBlockProcessor but they all funnel towards the same instance of storage.
Also, it seems you could have accomplished the same thing with Postgres Sandbox?
Will remove the mocked storage, the genserver is too simple now to require mocking.
  Application.fetch_env!(@app, :pending_block_processing_interval)
end

def pending_block_queue_length_check_interval() do
should we find a compromise in naming?
would block_queue_check_interval suffice?
similarly above. The word queue implies pending. And queue is a lot more common naming standard.
# TODO query to State used in tests instead of an event system, remove when event system is here
fn ->
- if State.get_status() |> elem(0) <= block_nr,
+ if State.get_status() |> elem(0) <= block_number,
Not sure what this is, but we need to be careful in comparisons.
number < atom < reference < fun < port < pid < tuple < map < nil < list < bit string
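The ordering above is the Erlang one, where nil means the empty list. In Elixir, nil is an atom, so it sorts after every number and a comparison such as nil <= 1000 quietly returns false instead of failing. One way to be careful is to guard on the types first. A minimal sketch, assuming a hypothetical SafeCompare helper that is not part of the PR:

```elixir
defmodule SafeCompare do
  # Hypothetical helper: compare only when both sides are integers and
  # report anything else instead of relying on cross-type term ordering.
  @spec at_most?(term(), term()) :: {:ok, boolean()} | {:error, :not_an_integer}
  def at_most?(left, right) when is_integer(left) and is_integer(right),
    do: {:ok, left <= right}

  def at_most?(_left, _right), do: {:error, :not_an_integer}
end

# Mixed terms sort by type first: number < atom < tuple < map < list < binary.
sorted = Enum.sort([nil, "1000", [1], %{}, {1}, :ok, 1000])
IO.inspect(sorted)
# prints [1000, nil, :ok, {1}, %{}, [1], "1000"]

{:ok, true} = SafeCompare.at_most?(999, 1000)
{:error, :not_an_integer} = SafeCompare.at_most?(nil, 1000)
```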
pending_block_processing_interval: 1000,
block_queue_check_interval: 10_000
There's no particular reason why these two constants live in a configuration, right? They're not reused or configurable from outside. They could live in the module itself.
It helps with seeing an overview of all the rhythms in the system though!
…nfo-block-processing
lgtm, approving with some minor comments
def start_phase(:attach_telemetry, :normal, _phase_args) do
  handlers = [
    ["measure-global", OMG.WatcherInfo.Measure.supported_events(), &OMG.WatcherInfo.Measure.handle_event/4, nil]
- ["measure-global", OMG.WatcherInfo.Measure.supported_events(), &OMG.WatcherInfo.Measure.handle_event/4, nil]
+ ["measure-watcher-info", OMG.WatcherInfo.Measure.supported_events(), &OMG.WatcherInfo.Measure.handle_event/4, nil]
to follow the existing pattern:
"measure-state" -> OMG.State.Measure.*
"measure-db" -> OMG.DB.Measure.*
# TODO query to State used in tests instead of an event system, remove when event system is here
fn ->
- if State.get_status() |> elem(0) <= block_nr,
+ if State.get_status() |> elem(0) <= block_number,
- if State.get_status() |> elem(0) <= block_number,
+ if elem(State.get_status(), 0) <= block_number,
@@ -170,6 +174,7 @@ defmodule OMG.WatcherInfo.DB.Block do
|> prepare_inserts(db_txs_stream, "db_txs_", DB.Transaction)
|> prepare_inserts(db_outputs_stream, "db_outputs_", DB.TxOutput)
|> DB.TxOutput.spend_utxos(db_inputs)
|> Ecto.Multi.delete("pending_block", pending_block, [])
:mindblown: Interesting approach
(pending_block in __MODULE__)
|> from(order_by: :blknum, limit: 1)
|> Repo.all()
|> Enum.at(0)
(pending_block in __MODULE__)
|> from(order_by: :blknum, limit: 1)
|> Repo.one()
a bit less implementation to worry about
you can also remove the limit: 1 ;)
I can go with Repo.one but need to keep the limit: 1, otherwise it'll raise when there is more than 1 pending block in the table, https://hexdocs.pm/ecto/Ecto.Repo.html#c:one/2 :)
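To illustrate why the limit matters, here is a minimal sketch that mimics the documented contract on a plain list. OneSemantics is a hypothetical stand-in, not Ecto code, and it raises ArgumentError where the real Repo.one/2 raises Ecto.MultipleResultsError:

```elixir
defmodule OneSemantics do
  # Hypothetical stand-in for the Repo.one/2 contract on an already-fetched
  # result list: nil for no rows, the row for exactly one, raise otherwise.
  def one([]), do: nil
  def one([row]), do: row

  def one(rows) when is_list(rows),
    do: raise(ArgumentError, "expected at most one result, got #{length(rows)}")
end

# Made-up rows standing in for the pending_blocks table.
pending = [%{blknum: 3000}, %{blknum: 1000}, %{blknum: 2000}]

# Equivalent of order_by: :blknum, limit: 1 followed by one/1.
oldest = pending |> Enum.sort_by(& &1.blknum) |> Enum.take(1) |> OneSemantics.one()

# Without the limit, several rows reach one/1 and it raises.
unlimited =
  try do
    pending |> Enum.sort_by(& &1.blknum) |> OneSemantics.one()
  rescue
    error in ArgumentError -> {:raised, error.message}
  end

IO.inspect({oldest, unlimited})
```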
}) do
# sobelow_skip ["Misc.BinToTerm"]
@spec insert_pending_block(PendingBlock.t()) :: {:ok, %__MODULE__{}} | {:error, any()}
def insert_pending_block(pending_block) do
Not sure if this is just me. I got confused whether it's inserting a new pending block vs. inserting an existing pending block into a proper block. Especially when reading from its caller, e.g:
elixir-omg/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor/storage.ex
Lines 29 to 31 in 0ee6d37
def process_block(block) do
  Block.insert_pending_block(block)
end
and when there are similar calls next to each other:
elixir-omg/apps/omg_watcher_info/test/omg_watcher_info/db/block_test.exs
Lines 334 to 336 in 0ee6d37
pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 1000})
{:ok, block} = DB.Block.insert_pending_block(pending_block)
Can we name it insert_from_pending_block() instead?
Correct, will update 👍
:erlang.trace(pid, true, [:receive])

assert get_all_pending_blocks() == []
assert get_all_blocks() == []

%{blknum: blknum_1} = insert(:pending_block)
%{blknum: blknum_2} = insert(:pending_block)
%{blknum: blknum_3} = insert(:pending_block)

assert_receive {:trace, ^pid, :receive, :timeout}, @interval + 1
get_state(pid)

assert [%{blknum: ^blknum_1}] = get_all_blocks()
assert [%{blknum: ^blknum_2}, %{blknum: ^blknum_3}] = get_all_pending_blocks()

assert_receive {:trace, ^pid, :receive, :timeout}
get_state(pid)

assert [%{blknum: ^blknum_3}] = get_all_pending_blocks()
assert [%{blknum: ^blknum_1}, %{blknum: ^blknum_2}] = get_all_blocks()

assert_receive {:trace, ^pid, :receive, :timeout}
get_state(pid)
Oh interesting use of trace & assert
Thanks Ino for the suggestion!
%{blknum: blknum_2} = insert(:pending_block)
%{blknum: blknum_3} = insert(:pending_block)

assert_receive {:trace, ^pid, :receive, :timeout}, @interval + 1
What does + 1 represent? I'm assuming it's for the DB insert but not too sure and will be afraid to change.
Maybe name it in a variable or add as comment to help future maintainers?
It's actually not needed, just need to wait for at least @interval, will remove.
handler_id = {__MODULE__, :rand.uniform(100)}

on_exit(fn ->
  :telemetry.detach(handler_id)
end)

Map.put(tags, :handler_id, handler_id)
nice nice nice
* chore: merge master back to v1.0.2 (#1606) * feat: sync v1.0.1 changes back to master (#1599) * Inomurko/reorg block getter (#1554) * dont store blockgetter events * dont fetch blockgetter events through aggregator * get_block_submitted_events * get_block_submitted_events * return tuple * the right contract encoding * unused api function * prevent race condition for status cache (#1558) * prevent race condition for status cache * Changelog for v1.0.0 (#1556) * restart strategy (#1565) * restart strategy * restart strategy * restart strategy * Update changelog for v1.0.0 * global block get interval (#1576) * Update changelog for v1.0.0 * feat: increase ExitProcessor timeouts (#1592) * increase timeouts * docs: changelog Co-authored-by: Unnawut Leepaisalsuwanna <unnawut@omisego.co> * chore: update watcher docker-compose to v1.0.1 * docs: small non-content fix to changelog Co-authored-by: Ino Murko <ino.murko@outlook.com> Co-authored-by: Thibault <thibault@omisego.co> * Add block processing queue to watcher info (#1560) * [WIP] initial queuing work * refactor: queue processing * catch db timeouts * update existing tests * add tests * continue with tests * finish tests * Fix integration test * refactor * refactor * refactor tests * remove retry_count * remove on exist genserver shutdown * add telemetry queue length event * sobelow skip BinToTerm * remove pending block status * naming * fix tests * fix tests * rename config * remove unused alias * fix PR minor comments * missing file * Add Transaction filter by end_datetime (#1595) * add new query * add if case * format * refactor * add test * fix feature spec * use address * fix assert * fix test * dummy assert * add get single tx * add wait tx * working waiter pooler * add wait helper cabbage test * test end_datetime querty * add swagger generator spec * use second * fix allow constraints * update constraints validator * fix test * fix test * edit name * add empty line' * more fixes * fix warning * refactor use the 
elixir way * improve test * fix test and typo * use cheap fn * sort * use any * use any boolean return * use Enum.all * final credo * format * use default params * update constaints test for end_datetime * mix format * @default_paging constant * fix test end_datetime * fix test and rename feature file' * mix format * Revert "explain analyze updates (#1569)" (#1601) This reverts commit 3431f26. This commit is intended to be only deployed to develop env, thus a short life commit. Reverting this in preparation of release. * release artifacts (#1597) * release artifacts Co-authored-by: Unnawut Leepaisalsuwanna <921194+unnawut@users.noreply.github.com> Co-authored-by: Ino Murko <ino.murko@outlook.com> Co-authored-by: Thibault <thibault@omisego.co> Co-authored-by: Mederic <32560642+mederic-p@users.noreply.github.com> Co-authored-by: Jarindr Thitadilaka <jarindr23@gmail.com> * docs: v1.0.2 change logs (#1611) * chore: bump version in VERSION file (#1613) Co-authored-by: Unnawut Leepaisalsuwanna <921194+unnawut@users.noreply.github.com> Co-authored-by: Ino Murko <ino.murko@outlook.com> Co-authored-by: Thibault <thibault@omisego.co> Co-authored-by: Mederic <32560642+mederic-p@users.noreply.github.com> Co-authored-by: Jarindr Thitadilaka <jarindr23@gmail.com>
Overview
This PR adds a pending block queue to the watcher info.
Watcher info can take a long time to process received blocks, so we store each block's data as one big binary in a queue and then process the queue one block at a time.
Queue length is regularly polled and sent to Datadog under the pending_block_queue_length metric name to check that we're not falling behind. The monitor should be updated.
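The flow described in this overview can be sketched roughly as follows. This is a minimal illustration, assuming an in-memory list in place of the pending_blocks table; QueueSketch and all of its functions are hypothetical names, not the PR's modules:

```elixir
defmodule QueueSketch do
  use GenServer

  # Hypothetical in-memory stand-in for the pending_blocks table: the state
  # holds entries of %{blknum: integer, data: binary} plus processed blocks.

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def enqueue(pid, blknum, block), do: GenServer.call(pid, {:enqueue, blknum, block})
  def process_next(pid), do: GenServer.call(pid, :process_next)
  def queue_length(pid), do: GenServer.call(pid, :queue_length)
  def processed(pid), do: GenServer.call(pid, :processed)

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :interval, 1_000)
    Process.send_after(self(), :tick, interval)
    {:ok, %{interval: interval, pending: [], processed: []}}
  end

  @impl true
  def handle_call({:enqueue, blknum, block}, _from, state) do
    # Store the whole block as one opaque binary, keyed by block number.
    entry = %{blknum: blknum, data: :erlang.term_to_binary(block)}
    {:reply, :ok, %{state | pending: [entry | state.pending]}}
  end

  def handle_call(:process_next, _from, state), do: {:reply, :ok, process_one(state)}
  def handle_call(:queue_length, _from, state), do: {:reply, length(state.pending), state}
  def handle_call(:processed, _from, state), do: {:reply, Enum.reverse(state.processed), state}

  @impl true
  def handle_info(:tick, state) do
    # Process at most one block per tick, then schedule the next tick.
    Process.send_after(self(), :tick, state.interval)
    {:noreply, process_one(state)}
  end

  # Takes the pending entry with the lowest blknum, if any, and decodes it.
  defp process_one(state) do
    case Enum.sort_by(state.pending, & &1.blknum) do
      [] ->
        state

      [next | rest] ->
        block = :erlang.binary_to_term(next.data)
        %{state | pending: rest, processed: [block | state.processed]}
    end
  end
end

# A long interval keeps the timer out of the way; processing is triggered
# by hand here so the result does not depend on timing.
{:ok, pid} = QueueSketch.start_link(interval: 60_000)
:ok = QueueSketch.enqueue(pid, 2000, %{blknum: 2000})
:ok = QueueSketch.enqueue(pid, 1000, %{blknum: 1000})
:ok = QueueSketch.process_next(pid)
length_after_one = QueueSketch.queue_length(pid)
:ok = QueueSketch.process_next(pid)
processed = QueueSketch.processed(pid)
IO.inspect({length_after_one, processed})
```

Polling queue_length/1 on a separate interval is the piece that would feed a metric such as pending_block_queue_length.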
Changes
- pending_blocks table
- PendingBlockProcessor genserver to process pending blocks
- PendingBlockQueueLengthChecker genserver to poll queue length
Testing
Run tests.