-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add block processing queue to watcher info (#1560)
* [WIP] initial queuing work * refactor: queue processing * catch db timeouts * update existing tests * add tests * continue with tests * finish tests * Fix integration test * refactor * refactor * refactor tests * remove retry_count * remove on exist genserver shutdown * add telemetry queue length event * sobelow skip BinToTerm * remove pending block status * naming * fix tests * fix tests * rename config * remove unused alias * fix PR minor comments * missing file
- Loading branch information
Showing
33 changed files
with
933 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
apps/omg_watcher_info/lib/omg_watcher_info/configuration.ex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Copyright 2019-2020 OmiseGO Pte Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
defmodule OMG.WatcherInfo.Configuration do | ||
@moduledoc """ | ||
Provides access to applications configuration | ||
""" | ||
@app :omg_watcher_info | ||
|
||
def pending_block_processing_interval() do | ||
Application.fetch_env!(@app, :pending_block_processing_interval) | ||
end | ||
|
||
def block_queue_check_interval() do | ||
Application.fetch_env!(@app, :block_queue_check_interval) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
66 changes: 66 additions & 0 deletions
66
apps/omg_watcher_info/lib/omg_watcher_info/db/pending_block.ex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
# Copyright 2019-2020 OmiseGO Pte Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
defmodule OMG.WatcherInfo.DB.PendingBlock do | ||
@moduledoc """ | ||
Ecto schema for pending block data | ||
These are valid block data received by the internal bus that should be stored in the database. | ||
This intermediate table is needed as the messages received by the bus are not persisted | ||
and if for some reason the writing to the database fails, we would lose these data. | ||
""" | ||
use Ecto.Schema | ||
use OMG.Utils.LoggerExt | ||
|
||
import Ecto.Changeset | ||
import Ecto.Query, only: [from: 2] | ||
|
||
alias OMG.WatcherInfo.DB.Repo | ||
|
||
@type t() :: %{ | ||
blknum: pos_integer(), | ||
data: binary() | ||
} | ||
|
||
@primary_key {:blknum, :integer, []} | ||
|
||
schema "pending_blocks" do | ||
field(:data, :binary) | ||
|
||
timestamps(type: :utc_datetime_usec) | ||
end | ||
|
||
@spec insert(map()) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()} | ||
def insert(params) do | ||
params | ||
|> insert_changeset() | ||
|> Repo.insert() | ||
end | ||
|
||
@spec get_next_to_process() :: nil | %__MODULE__{} | ||
def get_next_to_process() do | ||
(pending_block in __MODULE__) | ||
|> from(order_by: :blknum, limit: 1) | ||
|> Repo.one() | ||
end | ||
|
||
@spec get_count() :: non_neg_integer() | ||
def get_count(), do: Repo.aggregate(__MODULE__, :count) | ||
|
||
defp insert_changeset(params) do | ||
%__MODULE__{} | ||
|> cast(params, [:blknum, :data]) | ||
|> validate_required([:blknum, :data]) | ||
|> unique_constraint(:blknum, name: :pending_blocks_pkey) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Copyright 2019-2020 OmiseGO Pte Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
defmodule OMG.WatcherInfo.Measure do | ||
@moduledoc """ | ||
Counting business metrics sent to Datadog. | ||
""" | ||
import OMG.Status.Metric.Event, only: [name: 1] | ||
|
||
alias OMG.Status.Metric.Datadog | ||
alias OMG.WatcherInfo.PendingBlockQueueLengthChecker | ||
|
||
@supported_events [ | ||
[:pending_block_queue_length, PendingBlockQueueLengthChecker] | ||
] | ||
|
||
def supported_events(), do: @supported_events | ||
|
||
def handle_event([:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: length}, _state, _config) do | ||
_ = Datadog.gauge(name(:pending_block_queue_length), length) | ||
end | ||
end |
Oops, something went wrong.