diff --git a/gsrest/db/cassandra.py b/gsrest/db/cassandra.py index f250356e..cfb0a542 100644 --- a/gsrest/db/cassandra.py +++ b/gsrest/db/cassandra.py @@ -187,8 +187,7 @@ def merge_address_txs_subquery_results( ] results = heapq.nlargest(fetch_size, candidates, - key=partial(transaction_ordering_key, - tx_id_keys)) + key=partial(transaction_ordering_key, tx_id_keys)) border_tx_id = results[-1][tx_id_keys] if results else None return results, border_tx_id @@ -234,6 +233,7 @@ def build_select_address_txs_statement(network: str, node_type: NodeType, class Cassandra: + def eth(func): def check(*args, **kwargs): @@ -367,11 +367,23 @@ def load_parameters(self, keyspace): "token_config"] = self.load_token_configuration(keyspace) # check schema for compatibility and set parameter flags + keyspace_name = self.get_keyspace_mapping(keyspace, "transformed") + + if keyspace == "eth": + query = ("SELECT column_name FROM system_schema.columns " + "WHERE keyspace_name = %s AND " + "table_name = 'block_transactions';") + result = self.session.execute(query, (keyspace_name, )) + self.parameters[currency]["use_flat_block_txs"] = ("tx_id" in [ + x["column_name"] for x in result + ]) and keyspace == "eth" + else: + self.parameters[currency]["use_flat_block_txs"] = False query = ( "SELECT column_name FROM system_schema.columns " "WHERE keyspace_name = %s AND table_name = 'address_transactions';" ) - keyspace_name = self.get_keyspace_mapping(keyspace, "transformed") + result = self.session.execute(query, (keyspace_name, )) self.parameters[currency]["use_legacy_log_index"] = ("log_index" in [ x["column_name"] for x in result @@ -729,7 +741,8 @@ async def list_txs_by_node_type(self, id=id, tx_id_lower_bound=first_tx_id, tx_id_upper_bound=upper_bound, - is_outgoing=(direction == 'out' if direction is not None else None), + is_outgoing=(direction == 'out' + if direction is not None else None), include_assets=include_assets, page=page, fetch_size=fetch_size) @@ -772,17 +785,20 @@ async def list_block_txs_ids(self, currency, height): return [tx.tx_id for tx in result.one()['txs']] async def list_block_txs_ids_eth(self, currency, height): - if height is None: - return None - height_group = self.get_id_group(currency, height) - query = ("SELECT txs FROM block_transactions " - "WHERE block_id_group=%s and block_id=%s") - result = await self.execute_async( - currency, 'transformed', query, - [height_group, int(height)]) - if one(result) is None: - return None - return result.one()['txs'] + if not self.parameters[currency]["use_flat_block_txs"]: + if height is None: + return None + height_group = self.get_id_group(currency, height) + query = ("SELECT txs FROM block_transactions " + "WHERE block_id_group=%s and block_id=%s") + result = await self.execute_async( + currency, 'transformed', query, + [height_group, int(height)]) + if one(result) is None: + return None + return result.one()['txs'] + else: + return await self.list_block_txs_ids_trx(currency, height) async def list_block_txs_ids_trx(self, currency, height): if height is None: @@ -1921,19 +1937,19 @@ async def get_id_secondary_group_eth(self, currency, table, id_group): return 0 if result is None else \ result['max_secondary_id'] - async def list_address_txs_ordered( - self, - network: str, - node_type: NodeType, - id, - tx_id_lower_bound: Optional[int], - tx_id_upper_bound: Optional[int], - is_outgoing: Optional[bool], - include_assets: Sequence[Tuple[str, bool]], - page: Optional[int], - fetch_size: int, - cols: Optional[Sequence[str]] = None, - tx_ids: Optional[Sequence[int]] = None): + async def list_address_txs_ordered(self, + network: str, + node_type: NodeType, + id, + tx_id_lower_bound: Optional[int], + tx_id_upper_bound: Optional[int], + is_outgoing: Optional[bool], + include_assets: Sequence[Tuple[str, + bool]], + page: Optional[int], + fetch_size: int, + cols: Optional[Sequence[str]] = None, + tx_ids: Optional[Sequence[int]] = None): """Loads a address transactions in execution order it allows to only get out- or incoming transaction or only transactions of a certain asset (token), for a given address id @@ -1973,7 +1989,8 @@ async def list_address_txs_ordered( item_id_secondary_group = self.sec_in(secondary_id_group) - directions = [is_outgoing] if is_outgoing is not None else [False, True] + directions = [is_outgoing + ] if is_outgoing is not None else [False, True] results = [] """ Keep retrieving pages until the demanded fetch_size is fulfilled @@ -2087,7 +2104,8 @@ async def list_txs_by_node_type_eth(self, id=address, tx_id_lower_bound=first_tx_id, tx_id_upper_bound=upper_bound, - is_outgoing=(direction == 'out' if direction is not None else None), + is_outgoing=(direction == 'out' + if direction is not None else None), include_assets=include_assets, page=page, fetch_size=fetch_size)