adjust logic
RogerKSI committed Jul 5, 2023
1 parent 176f629 commit e86052b
Showing 4 changed files with 78 additions and 17 deletions.
24 changes: 16 additions & 8 deletions flusher/flusher/db.py
@@ -59,8 +59,7 @@ class CustomDateTime(sa.types.TypeDecorator):
impl = sa.DateTime

def process_bind_param(self, value, dialect):
return datetime.fromtimestamp(value / 1e9)

return datetime.fromtimestamp(value / 1e9) if value is not None else None

class CustomBase64(sa.types.TypeDecorator):
"""Custom LargeBinary type that accepts base64-encoded string."""
@@ -161,6 +160,7 @@ def Column(*args, **kwargs):
Column("fee", sa.String),
Column("transaction_id", sa.Integer, sa.ForeignKey("transactions.id"), nullable=True),
Column("accumulated_revenue", sa.BigInteger),
Column("last_request", CustomDateTime, nullable=True),
)

oracle_scripts = sa.Table(
@@ -175,6 +175,7 @@ def Column(*args, **kwargs):
Column("source_code_url", sa.String),
Column("transaction_id", sa.Integer, sa.ForeignKey("transactions.id"), nullable=True),
Column("version", sa.Integer, nullable=True),
Column("last_request", CustomDateTime, nullable=True),
)

requests = sa.Table(
@@ -394,27 +395,34 @@ def Column(*args, **kwargs):
Column("count", sa.Integer),
)

data_source_requests_per_days = sa.Table(
"data_source_requests_per_days",
metadata,
Column("date", CustomDate, primary_key=True),
Column("data_source_id", sa.Integer, sa.ForeignKey("data_sources.id"), primary_key=True),
Column("count", sa.Integer),
)

oracle_script_requests = sa.Table(
"oracle_script_requests",
metadata,
Column("oracle_script_id", sa.Integer, sa.ForeignKey("oracle_scripts.id"), primary_key=True),
Column("count", sa.Integer),
)

request_count_per_days = sa.Table(
"request_count_per_days",
oracle_script_requests_per_days = sa.Table(
"oracle_script_requests_per_days",
metadata,
Column("date", CustomDate, primary_key=True),
Column("oracle_script_id", sa.Integer, sa.ForeignKey("oracle_scripts.id"), primary_key=True),
Column("count", sa.Integer),
)

request_count_per_oracle_script_and_days = sa.Table(
"request_count_per_oracle_script_and_days",
request_count_per_days = sa.Table(
"request_count_per_days",
metadata,
Column("date", CustomDate, primary_key=True),
Column("oracle_script_id", sa.Integer, sa.ForeignKey("oracle_scripts.id"), primary_key=True),
Column("count", sa.Integer),
Column("last_update", CustomDateTime),
)

incoming_packets = sa.Table(
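For orientation (not part of this commit), here is a minimal sketch of how the new per-day table could be read back in the same SQLAlchemy 1.x style used above; the import path flusher.db and the connection URL are assumptions for illustration.

    import sqlalchemy as sa

    # Assumed import path for the table definitions shown in db.py above.
    from flusher.db import data_source_requests_per_days

    # Assumed local database; the real flusher gets its connection string elsewhere.
    engine = sa.create_engine("postgresql://localhost/flusher_db")

    with engine.connect() as conn:
        # Daily request counts for data source 1, most recent days first.
        rows = conn.execute(
            sa.select([
                data_source_requests_per_days.c.date,
                data_source_requests_per_days.c.count,
            ])
            .where(data_source_requests_per_days.c.data_source_id == 1)
            .order_by(data_source_requests_per_days.c.date.desc())
            .limit(7)
        ).fetchall()
        for day, count in rows:
            print(day, count)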
55 changes: 46 additions & 9 deletions flusher/flusher/handler.py
@@ -29,9 +29,10 @@
related_data_source_oracle_scripts,
historical_oracle_statuses,
data_source_requests,
data_source_requests_per_days,
oracle_script_requests,
oracle_script_requests_per_days,
request_count_per_days,
request_count_per_oracle_script_and_days,
incoming_packets,
outgoing_packets,
counterparty_chains,
@@ -64,9 +65,14 @@ def get_request_count(self, date):
select([request_count_per_days.c.count]).where(request_count_per_days.c.date == date)
).scalar()

def get_request_per_oracle_script_count(self, date, oracle_script_id):
def get_oracle_script_requests_count_per_day(self, date, oracle_script_id):
return self.conn.execute(
select([request_count_per_oracle_script_and_days.c.count]).where((request_count_per_oracle_script_and_days.c.date == date) & (request_count_per_oracle_script_and_days.c.oracle_script_id == oracle_script_id))
select([oracle_script_requests_per_days.c.count]).where((oracle_script_requests_per_days.c.date == date) & (oracle_script_requests_per_days.c.oracle_script_id == oracle_script_id))
).scalar()

def get_data_source_requests_count_per_day(self, date, data_source_id):
return self.conn.execute(
select([data_source_requests_per_days.c.count]).where((data_source_requests_per_days.c.date == date) & (data_source_requests_per_days.c.data_source_id == data_source_id))
).scalar()

def get_data_source_id(self, id):
@@ -150,7 +156,8 @@ def handle_new_request(self, msg):
del msg["tx_hash"]
if "timestamp" in msg:
self.handle_set_request_count_per_day({"date": msg["timestamp"]})
self.handle_set_request_count_per_oracle_script_and_day({"date": msg["timestamp"], "oracle_script_id": msg["oracle_script_id"], "last_update": msg["timestamp"]})
self.handle_update_oracle_script_requests_count_per_day({"date": msg["timestamp"], "oracle_script_id": msg["oracle_script_id"]})
self.update_oracle_script_last_request(msg["oracle_script_id"], msg["timestamp"])
del msg["timestamp"]
self.conn.execute(requests.insert(), msg)
self.increase_oracle_script_count(msg["oracle_script_id"])
@@ -173,6 +180,10 @@ def handle_update_related_ds_os(self, msg):

def handle_new_raw_request(self, msg):
self.increase_data_source_count(msg["data_source_id"])
if "timestamp" in msg:
self.handle_update_data_source_requests_count_per_day({"date": msg["timestamp"], "data_source_id": msg["data_source_id"]})
self.update_data_source_last_request(msg["data_source_id"], msg["timestamp"])
del msg["timestamp"]
self.handle_update_related_ds_os(
{
"oracle_script_id": self.conn.execute(
@@ -398,16 +409,28 @@ def handle_set_request_count_per_day(self, msg):
request_count_per_days.update(condition).values(count=request_count_per_days.c.count + 1)
)

def handle_set_request_count_per_oracle_script_and_day(self, msg):
if self.get_request_per_oracle_script_count(msg["date"], msg["oracle_script_id"]) is None:
def handle_update_oracle_script_requests_count_per_day(self, msg):
if self.get_oracle_script_requests_count_per_day(msg["date"], msg["oracle_script_id"]) is None:
msg["count"] = 1
self.conn.execute(oracle_script_requests_per_days.insert(), msg)
else:
condition = True
for col in oracle_script_requests_per_days.primary_key.columns.values():
condition = (col == msg[col.name]) & condition
self.conn.execute(
oracle_script_requests_per_days.update(condition).values(count=oracle_script_requests_per_days.c.count + 1)
)

def handle_update_data_source_requests_count_per_day(self, msg):
if self.get_data_source_requests_count_per_day(msg["date"], msg["data_source_id"]) is None:
msg["count"] = 1
self.conn.execute(request_count_per_oracle_script_and_days.insert(), msg)
self.conn.execute(data_source_requests_per_days.insert(), msg)
else:
condition = True
for col in request_count_per_oracle_script_and_days.primary_key.columns.values():
for col in data_source_requests_per_days.primary_key.columns.values():
condition = (col == msg[col.name]) & condition
self.conn.execute(
request_count_per_oracle_script_and_days.update(condition).values(count=request_count_per_oracle_script_and_days.c.count + 1)
data_source_requests_per_days.update(condition).values(count=data_source_requests_per_days.c.count + 1)
)

def handle_new_incoming_packet(self, msg):
@@ -447,6 +470,20 @@ def increase_oracle_script_count(self, id):
)
)

def update_oracle_script_last_request(self, id, timestamp):
self.conn.execute(
oracle_scripts.update(oracle_scripts.c.id == id).values(
last_request=timestamp
)
)

def update_data_source_last_request(self, id, timestamp):
self.conn.execute(
data_sources.update(data_sources.c.id == id).values(
last_request=timestamp
)
)

def handle_new_historical_bonded_token_on_validator(self, msg):
self.conn.execute(
insert(historical_bonded_token_on_validators)
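As a design note rather than part of the commit: the select-then-insert-or-increment pattern in handle_update_data_source_requests_count_per_day (and its oracle-script twin) could also be written as a single PostgreSQL upsert. A hedged sketch against the same table, assuming SQLAlchemy's postgresql dialect helpers and the flusher.db import path:

    from sqlalchemy.dialects.postgresql import insert as pg_insert

    # Assumed import path for the table defined in db.py above.
    from flusher.db import data_source_requests_per_days

    def upsert_data_source_request_count(conn, date, data_source_id):
        # Insert a count of 1 for this (date, data_source_id) pair, or bump the
        # existing row in the same statement if the pair is already present.
        stmt = pg_insert(data_source_requests_per_days).values(
            date=date, data_source_id=data_source_id, count=1
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["date", "data_source_id"],
            set_={"count": data_source_requests_per_days.c.count + 1},
        )
        conn.execute(stmt)

Because the conflict target is the table's composite primary key, the counter stays correct even with concurrent writers; the two-step pattern in the handler is fine as long as a single flusher process is the only writer.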
15 changes: 15 additions & 0 deletions flusher/flusher/init.py
@@ -168,6 +168,21 @@ def init(chain_id, topic, replay_topic, db):
requests.resolve_status;
"""
)
engine.execute(
"""
CREATE VIEW data_source_statistic_last_1_day
AS
SELECT data_sources.id,
count(*) AS count
FROM data_sources
join raw_requests
ON data_sources.id = raw_requests.data_source_id
join requests
ON raw_requests.request_id = requests.id
WHERE requests.request_time >= CAST(EXTRACT(epoch FROM NOW()) AS INT) - 86400
GROUP BY data_sources.id;
"""
)
# TODO: replace select&group_by d.validator_id with d.delegator_id
engine.execute(
"""
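Purely for illustration (not part of the commit), the new data_source_statistic_last_1_day view can be consumed with the same raw engine.execute style init.py uses for its DDL; the connection URL is an assumption.

    import sqlalchemy as sa

    # Assumed local flusher database.
    engine = sa.create_engine("postgresql://localhost/flusher_db")

    # Data sources ranked by raw requests referencing them in the last 24 hours.
    rows = engine.execute(
        "SELECT id, count FROM data_source_statistic_last_1_day "
        "ORDER BY count DESC LIMIT 10"
    ).fetchall()
    for data_source_id, count in rows:
        print(data_source_id, count)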
1 change: 1 addition & 0 deletions hooks/emitter/oracle.go
@@ -69,6 +69,7 @@ func (h *Hook) emitRawRequestAndValRequest(
"data_source_id": raw.DataSourceID,
"fee": fee.Amount,
"calldata": parseBytes(raw.Calldata),
"timestamp": ctx.BlockTime().UnixNano(),
})
ds := h.oracleKeeper.MustGetDataSource(ctx, raw.DataSourceID)
h.AddAccountsInTx(ds.Treasury)
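The one-line emitter change above sends the block time as UnixNano; the CustomDateTime bind in db.py then divides by 1e9 to build a datetime and, after this commit, passes None through unchanged. A small standalone check of that conversion, with a made-up nanosecond value:

    from datetime import datetime

    def to_db_datetime(value):
        # Mirrors the CustomDateTime bind shown in db.py: nanoseconds -> seconds
        # -> datetime; fromtimestamp interprets the value in the host's local timezone.
        return datetime.fromtimestamp(value / 1e9) if value is not None else None

    print(to_db_datetime(1_688_540_400_000_000_000))  # hypothetical UnixNano from July 2023
    print(to_db_datetime(None))                       # rows without a timestamp stay NULL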
