diff --git a/discovery-provider/src/tasks/audius_data.py b/discovery-provider/src/tasks/audius_data.py index 68fe3de9b99..2a19f57d079 100644 --- a/discovery-provider/src/tasks/audius_data.py +++ b/discovery-provider/src/tasks/audius_data.py @@ -1,27 +1,36 @@ import logging +from collections import defaultdict from datetime import datetime from enum import Enum -from typing import Any, Dict, Set, Tuple +from typing import Any, Dict, List, Set, Tuple from sqlalchemy.orm.session import Session, make_transient from src.database_task import DatabaseTask from src.models.playlists.playlist import Playlist +from src.models.users.user import User from src.tasks.playlists import invalidate_old_playlist from src.utils import helpers from src.utils.user_event_constants import audius_data_event_types_arr +from web3.datastructures import AttributeDict logger = logging.getLogger(__name__) -class Action(Enum): +class Action(str, Enum): CREATE = "Create" UPDATE = "Update" DELETE = "Delete" + def __str__(self) -> str: + return str.__str__(self) -class EntityType(Enum): + +class EntityType(str, Enum): PLAYLIST = "Playlist" + def __str__(self) -> str: + return str.__str__(self) + def audius_data_state_update( self, @@ -31,166 +40,287 @@ def audius_data_state_update( block_number, block_timestamp, block_hash, - ipfs_metadata, + ipfs_metadata ) -> Tuple[int, Dict[str, Set[(int)]]]: - num_total_changes = 0 + try: + num_total_changes = 0 + event_blockhash = update_task.web3.toHex(block_hash) - changed_entity_ids: Dict[str, Set[(int)]] = {} + changed_entity_ids: Dict[str, Set[(int)]] = defaultdict(set) - if not audius_data_txs: - return num_total_changes, changed_entity_ids + if not audius_data_txs: + return num_total_changes, changed_entity_ids - playlist_events_lookup: Dict[int, Dict[str, Any]] = {} - # This stores the playlist_ids created or updated in the set of transactions - playlist_ids: Set[int] = set() + entities_to_fetch: Dict[EntityType, Set[int]] = defaultdict(set) + users_to_fetch: Set[int] = set() - for tx_receipt in audius_data_txs: - logger.info(f"AudiusData.py | Processing {tx_receipt}") - txhash = update_task.web3.toHex(tx_receipt.transactionHash) - for event_type in audius_data_event_types_arr: - audius_data_event_tx = get_audius_data_events_tx( - update_task, event_type, tx_receipt - ) + # collect events by entity type and action + collect_entities_to_fetch( + update_task, audius_data_txs, entities_to_fetch, users_to_fetch + ) - processed_entries = 0 - # TODO: Batch reject operations for mismatched signer/userId - for entry in audius_data_event_tx: - user_id = helpers.get_tx_arg(entry, "_userId") - entity_id = helpers.get_tx_arg(entry, "_entityId") - entity_type = helpers.get_tx_arg(entry, "_entityType") - action = helpers.get_tx_arg(entry, "_action") - metadata_cid = helpers.get_tx_arg(entry, "_metadata") - signer = helpers.get_tx_arg(entry, "_signer") - metadata = ( - ipfs_metadata[metadata_cid] - if metadata_cid in ipfs_metadata - else None - ) - logger.info( - f"index.py | AudiusData state update: {user_id}, entity_id={entity_id}, entity_type={entity_type}, action={action}, metadata_cid={metadata_cid}, metadata={metadata} signer={signer}" + # fetch existing playlists + existing_playlist_id_to_playlist: Dict[int, Playlist] = fetch_existing_entities( + session, entities_to_fetch + ) + + # fetch users + existing_user_id_to_user: Dict[int, User] = fetch_users(session, users_to_fetch) + + playlists_to_save: Dict[int, List[Playlist]] = defaultdict(list) + # process in tx order and populate playlists_to_save + for tx_receipt in audius_data_txs: + txhash = update_task.web3.toHex(tx_receipt.transactionHash) + for event_type in audius_data_event_types_arr: + audius_data_event_tx = get_audius_data_events_tx( + update_task, event_type, tx_receipt ) - # Handle playlist creation - if entity_type == EntityType.PLAYLIST.value: - playlist_id = entity_id - logger.info( - f"index.py | AudiusData - playlist detected, id={playlist_id}" + for event in audius_data_event_tx: + params = ManagePlaylistParameters( + event, + playlists_to_save, # actions below populate these records + existing_playlist_id_to_playlist, + existing_user_id_to_user, + ipfs_metadata, + block_timestamp, + block_number, + event_blockhash, + txhash, ) - # look up or populate existing record - if playlist_id in playlist_events_lookup: - existing_playlist_record = playlist_events_lookup[playlist_id][ - "playlist" - ] - else: - existing_playlist_record = lookup_playlist_data_record( - update_task, - session, - playlist_id, - block_number, - block_hash, - txhash, - ) - - if action == Action.CREATE.value or Action.UPDATE.value: - logger.info( - f"index.py | AudiusData - handling {action}, events_lookup={playlist_events_lookup}" - ) - playlist_record = parse_playlist_create_data_event( - update_task, - entry, - user_id, - existing_playlist_record, - metadata, - block_timestamp, - session, - ) - - elif Action.DELETE.value: - existing_playlist_record.is_delete = True - playlist_record = existing_playlist_record - - if playlist_record is not None: - if playlist_id not in playlist_events_lookup: - playlist_events_lookup[playlist_id] = { - "playlist": playlist_record, - "events": [], - } - else: - playlist_events_lookup[playlist_id][ - "playlist" - ] = playlist_record - playlist_events_lookup[playlist_id]["events"].append(event_type) - playlist_ids.add(playlist_id) - processed_entries += 1 - - num_total_changes += processed_entries - - # Update changed entity dictionary - changed_entity_ids["playlist"] = playlist_ids - - for playlist_id, value_obj in playlist_events_lookup.items(): - logger.info( - f"index.py | AudiusData | playlists.py | Adding {value_obj['playlist']})" - ) - if value_obj["events"]: - invalidate_old_playlist(session, playlist_id) - session.add(value_obj["playlist"]) + if ( + params.action == Action.CREATE + and params.entity_type == EntityType.PLAYLIST + ): + create_playlist(params) + elif ( + params.action == Action.UPDATE + and params.entity_type == EntityType.PLAYLIST + ): + update_playlist(params) + + elif ( + params.action == Action.DELETE + and params.entity_type == EntityType.PLAYLIST + ): + delete_playlist(params) + + # compile records_to_save + records_to_save = [] + for _, playlist_records in playlists_to_save.items(): + # flip is_current to true for the last tx in each playlist + playlist_records[-1].is_current = True + records_to_save.extend(playlist_records) + # insert/update all playlist records in this block + session.bulk_save_objects(records_to_save) + num_total_changes += len(records_to_save) + + except Exception as e: + logger.error(f"Exception occurred {e}", exc_info=True) return num_total_changes, changed_entity_ids -def get_audius_data_events_tx(update_task, event_type, tx_receipt): - return getattr( - update_task.audius_data_contract.events, event_type - )().processReceipt(tx_receipt) +class ManagePlaylistParameters: + def __init__( + self, + event: AttributeDict, + playlists_to_save: Dict[int, List[Playlist]], + existing_playlist_id_to_playlist: Dict[int, Playlist], + existing_user_id_to_user: Dict[int, User], + ipfs_metadata: Dict[str, Dict[str, Any]], + block_timestamp: int, + block_number: int, + event_blockhash: str, + txhash: str, + ): + self.user_id = helpers.get_tx_arg(event, "_userId") + self.entity_id = helpers.get_tx_arg(event, "_entityId") + self.entity_type = helpers.get_tx_arg(event, "_entityType") + self.action = helpers.get_tx_arg(event, "_action") + self.metadata_cid = helpers.get_tx_arg(event, "_metadata") + self.signer = helpers.get_tx_arg(event, "_signer") + self.block_datetime = datetime.utcfromtimestamp(block_timestamp) + self.block_integer_time = int(block_timestamp) + self.event = event + self.ipfs_metadata = ipfs_metadata + self.existing_playlist_id_to_playlist = existing_playlist_id_to_playlist + self.block_number = block_number + self.event_blockhash = event_blockhash + self.txhash = txhash + self.existing_user_id_to_user = existing_user_id_to_user + self.playlists_to_save = playlists_to_save -def lookup_playlist_data_record( - update_task, session, playlist_id, block_number, block_hash, txhash -): - event_blockhash = update_task.web3.toHex(block_hash) - # Check if playlist record is in the DB - playlist_exists = ( - session.query(Playlist).filter_by(playlist_id=playlist_id).count() > 0 + +def create_playlist(params: ManagePlaylistParameters): + metadata = params.ipfs_metadata[params.metadata_cid] + # check if playlist already exists + if params.entity_id in params.existing_playlist_id_to_playlist: + return + + track_ids = metadata.get("playlist_contents", []) + playlist_contents = [] + for track_id in track_ids: + playlist_contents.append({"track": track_id, "time": params.block_integer_time}) + create_playlist_record = Playlist( + playlist_id=params.entity_id, + playlist_owner_id=params.user_id, + is_album=metadata.get("is_album", False), + description=metadata["description"], + playlist_image_multihash=metadata["playlist_image_sizes_multihash"], + playlist_image_sizes_multihash=metadata["playlist_image_sizes_multihash"], + playlist_name=metadata["playlist_name"], + is_private=metadata.get("is_private", False), + playlist_contents={"track_ids": playlist_contents}, + created_at=params.block_datetime, + updated_at=params.block_datetime, + blocknumber=params.block_number, + blockhash=params.event_blockhash, + txhash=params.txhash, + is_current=False, + is_delete=False, + ) + + params.playlists_to_save[params.entity_id].append(create_playlist_record) + + +def update_playlist(params: ManagePlaylistParameters): + metadata = params.ipfs_metadata[params.metadata_cid] + # check user owns playlist + if ( + params.signer.lower() + != params.existing_user_id_to_user[params.user_id].wallet.lower() + ): + return + existing_playlist = params.existing_playlist_id_to_playlist[params.entity_id] + existing_playlist.is_current = False # invalidate + if ( + params.entity_id in params.playlists_to_save + ): # override with last updated playlist is in this block + existing_playlist = params.playlists_to_save[params.entity_id][-1] + + updated_playlist = copy_record( + existing_playlist, params.block_number, params.event_blockhash, params.txhash + ) + parse_playlist_create_data_event( + updated_playlist, + metadata, + params.block_integer_time, + params.block_datetime, + ) + params.playlists_to_save[params.entity_id].append(updated_playlist) + + +def delete_playlist(params: ManagePlaylistParameters): + # check user owns playlist + if ( + params.signer.lower() + != params.existing_user_id_to_user[params.user_id].wallet.lower() + ): + return + existing_playlist = params.existing_playlist_id_to_playlist[params.entity_id] + existing_playlist.is_current = False # invalidate old playlist + if params.entity_id in params.playlists_to_save: + # override with last updated playlist is in this block + existing_playlist = params.playlists_to_save[params.entity_id][-1] + + deleted_playlist = copy_record( + existing_playlist, params.block_number, params.event_blockhash, params.txhash ) + deleted_playlist.is_delete = True + + params.playlists_to_save[params.entity_id].append(deleted_playlist) + + +def collect_entities_to_fetch( + update_task, + audius_data_txs, + entities_to_fetch: Dict[EntityType, Set[int]], + users_to_fetch: Set[int], +): + for tx_receipt in audius_data_txs: + for event_type in audius_data_event_types_arr: + audius_data_event_tx = get_audius_data_events_tx( + update_task, event_type, tx_receipt + ) + for event in audius_data_event_tx: + entity_id = helpers.get_tx_arg(event, "_entityId") + entity_type = helpers.get_tx_arg(event, "_entityType") + user_id = helpers.get_tx_arg(event, "_userId") + + entities_to_fetch[entity_type].add(entity_id) + users_to_fetch.add(user_id) - playlist_record = None - if playlist_exists: - playlist_record = ( - session.query(Playlist) - .filter(Playlist.playlist_id == playlist_id, Playlist.is_current == True) - .first() + +def fetch_existing_entities( + session: Session, entities_to_fetch: Dict[EntityType, Set[int]] +): + existing_playlist_id_to_playlist: Dict[int, Playlist] = {} + existing_playlists_query = ( + session.query(Playlist) + .filter( + Playlist.playlist_id.in_(entities_to_fetch[EntityType.PLAYLIST]), + Playlist.is_current == True, ) + .all() + ) + for existing_playlist in existing_playlists_query: + existing_playlist_id_to_playlist[ + existing_playlist.playlist_id + ] = existing_playlist + return existing_playlist_id_to_playlist + - # expunge the result from sqlalchemy so we can modify it without UPDATE statements being made - # https://stackoverflow.com/questions/28871406/how-to-clone-a-sqlalchemy-db-object-with-new-primary-key - session.expunge(playlist_record) - make_transient(playlist_record) - else: - playlist_record = Playlist( - playlist_id=playlist_id, is_current=True, is_delete=False +def fetch_users(session: Session, users_to_fetch: Set[int]): + existing_user_id_to_user: Dict[int, User] = {} + existing_users_query = ( + session.query(User) + .filter( + User.user_id.in_(users_to_fetch), + User.is_current == True, ) + .all() + ) + for user in existing_users_query: + existing_user_id_to_user[user.user_id] = user + return existing_user_id_to_user - # update these fields regardless of type - playlist_record.blocknumber = block_number - playlist_record.blockhash = event_blockhash - playlist_record.txhash = txhash - return playlist_record +def copy_record(old_playlist: Playlist, block_number, event_blockhash, txhash): + new_playlist = Playlist( + playlist_id=old_playlist.playlist_id, + playlist_owner_id=old_playlist.playlist_owner_id, + is_album=old_playlist.is_album, + description=old_playlist.description, + playlist_image_multihash=old_playlist.playlist_image_multihash, + playlist_image_sizes_multihash=old_playlist.playlist_image_sizes_multihash, + playlist_name=old_playlist.playlist_name, + is_private=old_playlist.is_private, + playlist_contents=old_playlist.playlist_contents, + created_at=old_playlist.created_at, + updated_at=old_playlist.updated_at, + blocknumber=block_number, + blockhash=event_blockhash, + txhash=txhash, + is_current=False, + is_delete=old_playlist.is_delete, + ) + return new_playlist + + +def get_audius_data_events_tx(update_task, event_type, tx_receipt): + return getattr( + update_task.audius_data_contract.events, event_type + )().processReceipt(tx_receipt) # Create playlist specific def parse_playlist_create_data_event( - update_task, - entry, - playlist_owner_id, - playlist_record, + playlist_record: Playlist, playlist_metadata, - block_timestamp, - session, + block_integer_time, + block_datetime, ): - block_datetime = datetime.utcfromtimestamp(block_timestamp) - block_integer_time = int(block_timestamp) - playlist_record.playlist_owner_id = playlist_owner_id playlist_record.is_album = ( playlist_metadata["is_album"] if "is_album" in playlist_metadata else False ) diff --git a/discovery-provider/src/tasks/index.py b/discovery-provider/src/tasks/index.py index cf3348243ec..dfc322234a1 100644 --- a/discovery-provider/src/tasks/index.py +++ b/discovery-provider/src/tasks/index.py @@ -328,7 +328,8 @@ def fetch_cid_metadata(db, user_factory_txs, track_factory_txs, audius_data_txs) user_id = event_args._userId entity_type = event_args._entityType cid = event_args._metadata - # TODO - skip if not a multihash + if not cid: + continue logger.info( f"index.py | newcontract {txhash}, {event_args}, {entity_type}, {cid}" ) diff --git a/libs/eth-contracts/ABIs/Wormhole.json b/libs/eth-contracts/ABIs/Wormhole.json index 830697357d2..7dc4a28bee0 100644 --- a/libs/eth-contracts/ABIs/Wormhole.json +++ b/libs/eth-contracts/ABIs/Wormhole.json @@ -1,12 +1,71 @@ { "contractName": "Wormhole", "abi": [ + { + "constant": true, + "inputs": [], + "name": "DOMAIN_SEPARATOR", + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "LOCK_ASSETS_TYPEHASH", + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "internalType": "address", + "name": "_tokenAddress", + "type": "address" + }, + { + "internalType": "address", + "name": "_wormholeAddress", + "type": "address" + } + ], + "name": "initialize", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": false, + "inputs": [], + "name": "initialize", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, { "constant": false, "inputs": [ { "internalType": "address", - "name": "token", + "name": "from", "type": "address" }, { @@ -14,32 +73,83 @@ "name": "amount", "type": "uint256" }, - { - "internalType": "uint16", - "name": "recipientChain", - "type": "uint16" - }, { "internalType": "bytes32", "name": "recipient", "type": "bytes32" }, + { + "internalType": "uint8", + "name": "targetChain", + "type": "uint8" + }, + { + "internalType": "bool", + "name": "refundDust", + "type": "bool" + }, { "internalType": "uint256", - "name": "arbiterFee", + "name": "deadline", "type": "uint256" }, { - "internalType": "uint32", - "name": "nonce", - "type": "uint32" + "internalType": "uint8", + "name": "v", + "type": "uint8" + }, + { + "internalType": "bytes32", + "name": "r", + "type": "bytes32" + }, + { + "internalType": "bytes32", + "name": "s", + "type": "bytes32" } ], - "name": "transferTokens", + "name": "lockAssets", "outputs": [], "payable": false, "stateMutability": "nonpayable", "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "internalType": "address", + "name": "", + "type": "address" + } + ], + "name": "nonces", + "outputs": [ + { + "internalType": "uint32", + "name": "", + "type": "uint32" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "token", + "outputs": [ + { + "internalType": "address", + "name": "", + "type": "address" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" } ] } \ No newline at end of file diff --git a/libs/src/api/entityManager.ts b/libs/src/api/entityManager.ts index 612a71ae834..e4be874474c 100644 --- a/libs/src/api/entityManager.ts +++ b/libs/src/api/entityManager.ts @@ -167,15 +167,14 @@ export class EntityManager extends Base { */ async deletePlaylist({ playlistId, - userId + logger = console }: { playlistId: number - userId: number logger: any - }): Promise { - const responseValues: PlaylistOperationResponse = - this.getDefaultPlaylistReponseValues() + }): Promise<{ blockHash: any; blockNumber: any }> { + const userId: number = parseInt(this.userStateManager.getCurrentUserId()) try { + const resp = await this.manageEntity({ userId, entityType: EntityType.PLAYLIST,