From e86052b78439a4d96f6becac206997c4242403c6 Mon Sep 17 00:00:00 2001 From: Kitipong Sirirueangsakul Date: Wed, 5 Jul 2023 19:51:31 +0700 Subject: [PATCH] adjust logic --- flusher/flusher/db.py | 24 +++++++++++------ flusher/flusher/handler.py | 55 +++++++++++++++++++++++++++++++------- flusher/flusher/init.py | 15 +++++++++++ hooks/emitter/oracle.go | 1 + 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/flusher/flusher/db.py b/flusher/flusher/db.py index 4fb493319..4f4f41d8c 100644 --- a/flusher/flusher/db.py +++ b/flusher/flusher/db.py @@ -59,8 +59,7 @@ class CustomDateTime(sa.types.TypeDecorator): impl = sa.DateTime def process_bind_param(self, value, dialect): - return datetime.fromtimestamp(value / 1e9) - + return datetime.fromtimestamp(value / 1e9) if value != None else None class CustomBase64(sa.types.TypeDecorator): """Custom LargeBinary type that accepts base64-encoded string.""" @@ -161,6 +160,7 @@ def Column(*args, **kwargs): Column("fee", sa.String), Column("transaction_id", sa.Integer, sa.ForeignKey("transactions.id"), nullable=True), Column("accumulated_revenue", sa.BigInteger), + Column("last_request", CustomDateTime, nullable=True), ) oracle_scripts = sa.Table( @@ -175,6 +175,7 @@ def Column(*args, **kwargs): Column("source_code_url", sa.String), Column("transaction_id", sa.Integer, sa.ForeignKey("transactions.id"), nullable=True), Column("version", sa.Integer, nullable=True), + Column("last_request", CustomDateTime, nullable=True), ) requests = sa.Table( @@ -394,6 +395,14 @@ def Column(*args, **kwargs): Column("count", sa.Integer), ) +data_source_requests_per_days = sa.Table( + "data_source_requests_per_days", + metadata, + Column("date", CustomDate, primary_key=True), + Column("data_source_id", sa.Integer, sa.ForeignKey("data_sources.id"), primary_key=True), + Column("count", sa.Integer), +) + oracle_script_requests = sa.Table( "oracle_script_requests", metadata, @@ -401,20 +410,19 @@ def Column(*args, **kwargs): Column("count", 
sa.Integer), ) -request_count_per_days = sa.Table( - "request_count_per_days", +oracle_script_requests_per_days = sa.Table( + "oracle_script_requests_per_days", metadata, Column("date", CustomDate, primary_key=True), + Column("oracle_script_id", sa.Integer, sa.ForeignKey("oracle_scripts.id"), primary_key=True), Column("count", sa.Integer), ) -request_count_per_oracle_script_and_days = sa.Table( - "request_count_per_oracle_script_and_days", +request_count_per_days = sa.Table( + "request_count_per_days", metadata, Column("date", CustomDate, primary_key=True), - Column("oracle_script_id", sa.Integer, sa.ForeignKey("oracle_scripts.id"), primary_key=True), Column("count", sa.Integer), - Column("last_update", CustomDateTime), ) incoming_packets = sa.Table( diff --git a/flusher/flusher/handler.py b/flusher/flusher/handler.py index 7121d5139..c2b3b7509 100644 --- a/flusher/flusher/handler.py +++ b/flusher/flusher/handler.py @@ -29,9 +29,10 @@ related_data_source_oracle_scripts, historical_oracle_statuses, data_source_requests, + data_source_requests_per_days, oracle_script_requests, + oracle_script_requests_per_days, request_count_per_days, - request_count_per_oracle_script_and_days, incoming_packets, outgoing_packets, counterparty_chains, @@ -64,9 +65,14 @@ def get_request_count(self, date): select([request_count_per_days.c.count]).where(request_count_per_days.c.date == date) ).scalar() - def get_request_per_oracle_script_count(self, date, oracle_script_id): + def get_oracle_script_requests_count_per_day(self, date, oracle_script_id): return self.conn.execute( - select([request_count_per_oracle_script_and_days.c.count]).where((request_count_per_oracle_script_and_days.c.date == date) & (request_count_per_oracle_script_and_days.c.oracle_script_id == oracle_script_id)) + select([oracle_script_requests_per_days.c.count]).where((oracle_script_requests_per_days.c.date == date) & (oracle_script_requests_per_days.c.oracle_script_id == oracle_script_id)) + ).scalar() + + def 
get_data_source_requests_count_per_day(self, date, data_source_id): + return self.conn.execute( + select([data_source_requests_per_days.c.count]).where((data_source_requests_per_days.c.date == date) & (data_source_requests_per_days.c.data_source_id == data_source_id)) ).scalar() def get_data_source_id(self, id): @@ -150,7 +156,8 @@ def handle_new_request(self, msg): del msg["tx_hash"] if "timestamp" in msg: self.handle_set_request_count_per_day({"date": msg["timestamp"]}) - self.handle_set_request_count_per_oracle_script_and_day({"date": msg["timestamp"], "oracle_script_id": msg["oracle_script_id"], "last_update": msg["timestamp"]}) + self.handle_update_oracle_script_requests_count_per_day({"date": msg["timestamp"], "oracle_script_id": msg["oracle_script_id"]}) + self.update_oracle_script_last_request(msg["oracle_script_id"], msg["timestamp"]) del msg["timestamp"] self.conn.execute(requests.insert(), msg) self.increase_oracle_script_count(msg["oracle_script_id"]) @@ -173,6 +180,10 @@ def handle_update_related_ds_os(self, msg): def handle_new_raw_request(self, msg): self.increase_data_source_count(msg["data_source_id"]) + if "timestamp" in msg: + self.handle_update_data_source_requests_count_per_day({"date": msg["timestamp"], "data_source_id": msg["data_source_id"]}) + self.update_data_source_last_request(msg["data_source_id"], msg["timestamp"]) + del msg["timestamp"] self.handle_update_related_ds_os( { "oracle_script_id": self.conn.execute( @@ -398,16 +409,28 @@ def handle_set_request_count_per_day(self, msg): request_count_per_days.update(condition).values(count=request_count_per_days.c.count + 1) ) - def handle_set_request_count_per_oracle_script_and_day(self, msg): - if self.get_request_per_oracle_script_count(msg["date"], msg["oracle_script_id"]) is None: + def handle_update_oracle_script_requests_count_per_day(self, msg): + if self.get_oracle_script_requests_count_per_day(msg["date"], msg["oracle_script_id"]) is None: + msg["count"] = 1 + 
self.conn.execute(oracle_script_requests_per_days.insert(), msg) +    else: +        condition = True +        for col in oracle_script_requests_per_days.primary_key.columns.values(): +            condition = (col == msg[col.name]) & condition +        self.conn.execute( +            oracle_script_requests_per_days.update(condition).values(count=oracle_script_requests_per_days.c.count + 1) +        ) + +    def handle_update_data_source_requests_count_per_day(self, msg): +        if self.get_data_source_requests_count_per_day(msg["date"], msg["data_source_id"]) is None: msg["count"] = 1 -            self.conn.execute(request_count_per_oracle_script_and_days.insert(), msg) +            self.conn.execute(data_source_requests_per_days.insert(), msg) else: condition = True -        for col in request_count_per_oracle_script_and_days.primary_key.columns.values(): +        for col in data_source_requests_per_days.primary_key.columns.values(): condition = (col == msg[col.name]) & condition self.conn.execute( -            request_count_per_oracle_script_and_days.update(condition).values(count=request_count_per_oracle_script_and_days.c.count + 1) +            data_source_requests_per_days.update(condition).values(count=data_source_requests_per_days.c.count + 1) ) def handle_new_incoming_packet(self, msg): @@ -447,6 +470,20 @@ def increase_oracle_script_count(self, id): ) ) + def update_oracle_script_last_request(self, id, timestamp): + self.conn.execute( + oracle_scripts.update(oracle_scripts.c.id == id).values( + last_request=timestamp + ) + ) + + def update_data_source_last_request(self, id, timestamp): + self.conn.execute( + data_sources.update(data_sources.c.id == id).values( + last_request=timestamp + ) + ) + def handle_new_historical_bonded_token_on_validator(self, msg): self.conn.execute( insert(historical_bonded_token_on_validators) diff --git a/flusher/flusher/init.py b/flusher/flusher/init.py index d874d5f77..7a0ad4a75 100644 --- a/flusher/flusher/init.py +++ b/flusher/flusher/init.py @@ -168,6 +168,21 @@ def init(chain_id, topic, replay_topic, db): 
requests.resolve_status; """ ) + engine.execute( + """ +CREATE VIEW data_source_statistic_last_1_day +AS + SELECT data_sources.id, + count(*) AS count + FROM data_sources + join raw_requests + ON data_sources.id = raw_requests.data_source_id + join requests + ON raw_requests.request_id = requests.id + WHERE requests.request_time >= CAST(EXTRACT(epoch FROM NOW()) AS INT) - 86400 + GROUP BY data_sources.id; + """ + ) # TODO: replace select&group_by d.validator_id with d.delegator_id engine.execute( """ diff --git a/hooks/emitter/oracle.go b/hooks/emitter/oracle.go index 3a9d57ece..4693d98cd 100644 --- a/hooks/emitter/oracle.go +++ b/hooks/emitter/oracle.go @@ -69,6 +69,7 @@ func (h *Hook) emitRawRequestAndValRequest( "data_source_id": raw.DataSourceID, "fee": fee.Amount, "calldata": parseBytes(raw.Calldata), + "timestamp": ctx.BlockTime().UnixNano(), }) ds := h.oracleKeeper.MustGetDataSource(ctx, raw.DataSourceID) h.AddAccountsInTx(ds.Treasury)