Skip to content

Commit

Permalink
fix: allow eth with flat blocks_tx table
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Feb 9, 2024
1 parent 48a5821 commit 2920a76
Showing 1 changed file with 48 additions and 30 deletions.
78 changes: 48 additions & 30 deletions gsrest/db/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ def merge_address_txs_subquery_results(
]
results = heapq.nlargest(fetch_size,
candidates,
key=partial(transaction_ordering_key,
tx_id_keys))
key=partial(transaction_ordering_key, tx_id_keys))

border_tx_id = results[-1][tx_id_keys] if results else None
return results, border_tx_id
Expand Down Expand Up @@ -234,6 +233,7 @@ def build_select_address_txs_statement(network: str, node_type: NodeType,


class Cassandra:

def eth(func):

def check(*args, **kwargs):
Expand Down Expand Up @@ -367,11 +367,23 @@ def load_parameters(self, keyspace):
"token_config"] = self.load_token_configuration(keyspace)

# check schema for compatibility and set parameter flags
keyspace_name = self.get_keyspace_mapping(keyspace, "transformed")

if keyspace == "eth":
query = ("SELECT column_name FROM system_schema.columns "
"WHERE keyspace_name = %s AND "
"table_name = 'block_transactions';")
result = self.session.execute(query, (keyspace_name, ))
self.parameters[currency]["use_flat_block_txs"] = ("tx_id" in [
x["column_name"] for x in result
]) and keyspace == "eth"
else:
self.parameters[currency]["use_flat_block_txs"] = False
query = (
"SELECT column_name FROM system_schema.columns "
"WHERE keyspace_name = %s AND table_name = 'address_transactions';"
)
keyspace_name = self.get_keyspace_mapping(keyspace, "transformed")

result = self.session.execute(query, (keyspace_name, ))
self.parameters[currency]["use_legacy_log_index"] = ("log_index" in [
x["column_name"] for x in result
Expand Down Expand Up @@ -729,7 +741,8 @@ async def list_txs_by_node_type(self,
id=id,
tx_id_lower_bound=first_tx_id,
tx_id_upper_bound=upper_bound,
is_outgoing=(direction == 'out' if direction is not None else None),
is_outgoing=(direction == 'out'
if direction is not None else None),
include_assets=include_assets,
page=page,
fetch_size=fetch_size)
Expand Down Expand Up @@ -772,17 +785,20 @@ async def list_block_txs_ids(self, currency, height):
return [tx.tx_id for tx in result.one()['txs']]

async def list_block_txs_ids_eth(self, currency, height):
if height is None:
return None
height_group = self.get_id_group(currency, height)
query = ("SELECT txs FROM block_transactions "
"WHERE block_id_group=%s and block_id=%s")
result = await self.execute_async(
currency, 'transformed', query,
[height_group, int(height)])
if one(result) is None:
return None
return result.one()['txs']
if not self.parameters[currency]["use_flat_block_txs"]:
if height is None:
return None
height_group = self.get_id_group(currency, height)
query = ("SELECT txs FROM block_transactions "
"WHERE block_id_group=%s and block_id=%s")
result = await self.execute_async(
currency, 'transformed', query,
[height_group, int(height)])
if one(result) is None:
return None
return result.one()['txs']
else:
return await self.list_block_txs_ids_trx(currency, height)

async def list_block_txs_ids_trx(self, currency, height):
if height is None:
Expand Down Expand Up @@ -1921,19 +1937,19 @@ async def get_id_secondary_group_eth(self, currency, table, id_group):
return 0 if result is None else \
result['max_secondary_id']

async def list_address_txs_ordered(
self,
network: str,
node_type: NodeType,
id,
tx_id_lower_bound: Optional[int],
tx_id_upper_bound: Optional[int],
is_outgoing: Optional[bool],
include_assets: Sequence[Tuple[str, bool]],
page: Optional[int],
fetch_size: int,
cols: Optional[Sequence[str]] = None,
tx_ids: Optional[Sequence[int]] = None):
async def list_address_txs_ordered(self,
network: str,
node_type: NodeType,
id,
tx_id_lower_bound: Optional[int],
tx_id_upper_bound: Optional[int],
is_outgoing: Optional[bool],
include_assets: Sequence[Tuple[str,
bool]],
page: Optional[int],
fetch_size: int,
cols: Optional[Sequence[str]] = None,
tx_ids: Optional[Sequence[int]] = None):
"""Loads a address transactions in execution order
it allows to only get out- or incoming transaction or only
transactions of a certain asset (token), for a given address id
Expand Down Expand Up @@ -1973,7 +1989,8 @@ async def list_address_txs_ordered(

item_id_secondary_group = self.sec_in(secondary_id_group)

directions = [is_outgoing] if is_outgoing is not None else [False, True]
directions = [is_outgoing
] if is_outgoing is not None else [False, True]
results = []
"""
Keep retrieving pages until the demanded fetch_size is fulfilled
Expand Down Expand Up @@ -2087,7 +2104,8 @@ async def list_txs_by_node_type_eth(self,
id=address,
tx_id_lower_bound=first_tx_id,
tx_id_upper_bound=upper_bound,
is_outgoing=(direction == 'out' if direction is not None else None),
is_outgoing=(direction == 'out'
if direction is not None else None),
include_assets=include_assets,
page=page,
fetch_size=fetch_size)
Expand Down

0 comments on commit 2920a76

Please sign in to comment.