diff --git a/creator-node/compose/env/base.env b/creator-node/compose/env/base.env index c90935fc737..872e6d4a6af 100644 --- a/creator-node/compose/env/base.env +++ b/creator-node/compose/env/base.env @@ -74,3 +74,6 @@ discoveryNodeUnhealthyBlockDiff=500 maxBatchClockStatusBatchSize=5 reconfigSPIdBlacklistString= + +entityManagerAddress=0x5b9b42d6e4B2e4Bf8d42Eba32D46918e10899B66 +entityManagerReplicaSetEnabled=true diff --git a/creator-node/src/config.js b/creator-node/src/config.js index b19097a0a9e..0011e270d66 100644 --- a/creator-node/src/config.js +++ b/creator-node/src/config.js @@ -392,6 +392,12 @@ const config = convict({ env: 'dataRegistryAddress', default: null }, + entityManagerAddress: { + doc: 'entity manager registry address', + format: String, + env: 'entityManagerAddress', + default: '0x2F99338637F027CFB7494E46B49987457beCC6E3' + }, dataProviderUrl: { doc: 'data contracts web3 provider url', format: String, @@ -470,6 +476,12 @@ const config = convict({ env: 'considerNodeUnhealthy', default: false }, + entityManagerReplicaSetEnabled: { + doc: 'whether or not to use entity manager to update the replica set', + format: Boolean, + env: 'entityManagerReplicaSetEnabled', + default: false + }, /** sync / snapback configs */ diff --git a/creator-node/src/middlewares.js b/creator-node/src/middlewares.js index c715c997b4b..c8e099362fa 100644 --- a/creator-node/src/middlewares.js +++ b/creator-node/src/middlewares.js @@ -752,7 +752,6 @@ async function getReplicaSetSpIDs({ const RETRY_TIMEOUT_MS = 1000 // 1 seconds let errorMsg = null - let blockNumberIndexed = false for (let retry = 1; retry <= MAX_RETRIES; retry++) { logger.info( `${logPrefix} retry #${retry}/${MAX_RETRIES} || time from start: ${ @@ -761,15 +760,40 @@ async function getReplicaSetSpIDs({ ) try { - // will throw error if blocknumber not found - replicaSet = - await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber( - userId, + if 
(config.get('entityManagerReplicaSetEnabled')) { + // will throw error if blocknumber not found + const encodedUserId = libs.Utils.encodeHashId(userId) + const spResponse = await libs.discoveryProvider.getUserReplicaSet({ + encodedUserId, blockNumber - ) - errorMsg = null - blockNumberIndexed = true - break + }) + + if (spResponse) { + if (spResponse.primarySpID) { + replicaSet = { + primaryId: spResponse.primarySpID, + secondaryIds: [ + spResponse.secondary1SpID, + spResponse.secondary2SpID + ] + } + errorMsg = null + break + } else { + // The blocknumber was indexed by discovery, but there's still no user replica set returned + errorMsg = `User replica not found in discovery` + break + } + } + } else { + replicaSet = + await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber( + userId, + blockNumber + ) + errorMsg = null + break + } } catch (e) { errorMsg = e.message } // Ignore all errors until MAX_RETRIES exceeded @@ -779,22 +803,14 @@ async function getReplicaSetSpIDs({ // Error if indexed blockNumber but didn't find any replicaSet for user if ( - blockNumberIndexed && - (!replicaSet || - !replicaSet.hasOwnProperty('primaryId') || - !replicaSet.primaryId) + !replicaSet || + !replicaSet.hasOwnProperty('primaryId') || + !replicaSet.primaryId ) { throw new Error( `${logPrefix} ERROR || Failed to retrieve user from UserReplicaSetManager after ${MAX_RETRIES} retries. Aborting.` ) } - - // Error if failed to index target blockNumber - if (!blockNumberIndexed) { - throw new Error( - `${logPrefix} ERROR || Web3 provider failed to index target blockNumber ${blockNumber} after ${MAX_RETRIES} retries. Aborting. 
Error ${errorMsg}` - ) - } } else if (ensurePrimary && selfSpID) { /** * If ensurePrimary required but no blockNumber provided, poll URSM until returned primary = selfSpID @@ -814,12 +830,30 @@ async function getReplicaSetSpIDs({ ) try { - replicaSet = - await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( - userId - ) - - errorMsg = null + if (config.get('entityManagerReplicaSetEnabled')) { + const encodedUserId = libs.Utils.encodeHashId(userId) + const spResponse = await libs.discoveryProvider.getUserReplicaSet({ + encodedUserId + }) + + if (spResponse && spResponse.primarySpID) { + replicaSet = { + primaryId: spResponse.primarySpID, + secondaryIds: [ + spResponse.secondary1SpID, + spResponse.secondary2SpID + ] + } + errorMsg = null + } else { + errorMsg = `User replica not found in discovery` + } + } else { + replicaSet = + await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( + userId + ) + } if ( replicaSet && @@ -863,10 +897,24 @@ async function getReplicaSetSpIDs({ let errorMsg = null try { - replicaSet = - await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( - userId - ) + if (config.get('entityManagerReplicaSetEnabled')) { + const encodedUserId = libs.Utils.encodeHashId(userId) + const spResponse = await libs.discoveryProvider.getUserReplicaSet({ + encodedUserId + }) + + if (spResponse && spResponse.primarySpID) { + replicaSet = { + primaryId: spResponse.primarySpID, + secondaryIds: [spResponse.secondary1SpID, spResponse.secondary2SpID] + } + } + } else { + replicaSet = + await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( + userId + ) + } } catch (e) { errorMsg = e.message } diff --git a/creator-node/src/routes/audiusUsers.js b/creator-node/src/routes/audiusUsers.js index ca98eaad932..9294d9ae17f 100644 --- a/creator-node/src/routes/audiusUsers.js +++ b/creator-node/src/routes/audiusUsers.js @@ -1,6 +1,7 @@ const express = require('express') const fs = require('fs-extra') +const config = 
require('../config') const models = require('../models') const { saveFileFromBufferToDisk } = require('../fileManager') const { @@ -113,16 +114,32 @@ router.post( // Verify that wallet of the user on the blockchain for the given ID matches the user attempting to update const serviceRegistry = req.app.get('serviceRegistry') const { libs } = serviceRegistry - const userResp = await libs.contracts.UserFactoryClient.getUser( - blockchainUserId - ) - if ( - !userResp?.wallet || - userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase() - ) { - throw new Error( - `Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}` + if (config.get('entityManagerReplicaSetEnabled')) { + const encodedUserId = libs.Utils.encodeHashId(blockchainUserId) + const spResponse = await libs.discoveryProvider.getUserReplicaSet({ + encodedUserId, + blockNumber + }) + if ( + (spResponse?.wallet ?? '').toLowerCase() !== + req.session.wallet.toLowerCase() + ) { + throw new Error( + `Owner wallet ${spResponse?.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}` + ) + } + } else { + const userResp = await libs.contracts.UserFactoryClient.getUser( + blockchainUserId ) + if ( + !userResp?.wallet || + userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase() + ) { + throw new Error( + `Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}` + ) + } } const cnodeUserUUID = req.session.cnodeUserUUID diff --git a/creator-node/src/services/initAudiusLibs.js b/creator-node/src/services/initAudiusLibs.js index 96dc887acfc..6038c282b26 100644 --- a/creator-node/src/services/initAudiusLibs.js +++ b/creator-node/src/services/initAudiusLibs.js @@ -31,6 +31,7 @@ module.exports = async ({ const 
ethRegistryAddress = config.get('ethRegistryAddress') const ethOwnerWallet = config.get('ethOwnerWallet') const dataRegistryAddress = config.get('dataRegistryAddress') + const entityManagerAddress = config.get('entityManagerAddress') const dataProviderUrl = config.get('dataProviderUrl') const delegatePrivateKey = config.get('delegatePrivateKey') const oldDelegatePrivateKey = config.get('oldDelegatePrivateKey') @@ -73,7 +74,8 @@ module.exports = async ({ // pass as array [dataProviderUrl], // TODO - formatting this private key here is not ideal - (oldDelegatePrivateKey || delegatePrivateKey).replace('0x', '') + (oldDelegatePrivateKey || delegatePrivateKey).replace('0x', ''), + entityManagerAddress ) : null, discoveryProviderConfig: enableDiscovery diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts index 09943698fed..ab8638bc557 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts @@ -97,7 +97,7 @@ const updateReplicaSetJobProcessor = async function ({ audiusLibs = await initAudiusLibs({ enableEthContracts: true, enableContracts: true, - enableDiscovery: false, + enableDiscovery: true, enableIdentity: true, logger }) @@ -695,17 +695,53 @@ const _issueUpdateReplicaSetOp = async ( return response } - await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet( - userId, - newReplicaSetSPIds[0], // new primary - newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2] - // This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain - oldPrimarySpId || chainPrimarySpId, - [ - oldSecondary1SpId || 
chainSecondarySpIds?.[0], - oldSecondary2SpId || chainSecondarySpIds?.[1] - ] - ) + if (config.get('entityManagerReplicaSetEnabled')) { + logger.info( + `[_issueUpdateReplicaSetOp] updating replica set now ${ + Date.now() - startTimeMs + }ms for userId=${userId} wallet=${wallet}` + ) + + const { blockNumber } = + await audiusLibs.User.updateEntityManagerReplicaSet({ + userId, + primary: newReplicaSetSPIds[0], // new primary + secondaries: newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2] + // This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain + oldPrimary: oldPrimarySpId || chainPrimarySpId, + oldSecondaries: [ + oldSecondary1SpId || chainSecondarySpIds?.[0], + oldSecondary2SpId || chainSecondarySpIds?.[1] + ] + }) + logger.info( + `[_issueUpdateReplicaSetOp] did call audiusLibs.User.updateEntityManagerReplicaSet waiting for ${blockNumber}` + ) + // Wait for blockhash/blockNumber to be indexed + try { + await audiusLibs.User.waitForReplicaSetDiscoveryIndexing( + userId, + newReplicaSetSPIds, + blockNumber + ) + } catch (err) { + throw new Error( + `[_issueUpdateReplicaSetOp] waitForReplicaSetDiscovery Indexing Unable to confirm updated replica set for user ${userId}` + ) + } + } else { + await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet( + userId, + newReplicaSetSPIds[0], // new primary + newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2] + // This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain + oldPrimarySpId || chainPrimarySpId, + [ + oldSecondary1SpId || chainSecondarySpIds?.[0], + oldSecondary2SpId || chainSecondarySpIds?.[1] + ] + ) + } response.issuedReconfig = true logger.info( @@ -807,8 +843,25 @@ const _canReconfig = async ({ }: CanReconfigParams): 
Promise => { let error try { - const { primaryId: chainPrimarySpId, secondaryIds: chainSecondarySpIds } = - await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(userId) + let chainPrimarySpId, chainSecondarySpIds + if (config.get('entityManagerReplicaSetEnabled')) { + const encodedUserId = libs.Utils.encodeHashId(userId) + const spResponse = await libs.discoveryProvider.getUserReplicaSet({ + encodedUserId + }) + chainPrimarySpId = spResponse?.primarySpID + chainSecondarySpIds = [ + spResponse?.secondary1SpID, + spResponse?.secondary2SpID + ] + } else { + const response = + await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( + userId + ) + chainPrimarySpId = response.primaryId + chainSecondarySpIds = response.secondaryIds + } if ( !chainPrimarySpId || diff --git a/creator-node/test/lib/libsMock.js b/creator-node/test/lib/libsMock.js index ccb82698656..b4d2165ff3a 100644 --- a/creator-node/test/lib/libsMock.js +++ b/creator-node/test/lib/libsMock.js @@ -1,4 +1,5 @@ const sinon = require('sinon') +const { encode, decode } = require('../../src/hashids') function getLibsMock() { const libsMock = { @@ -76,14 +77,37 @@ function getLibsMock() { Playlist: { getPlaylists: sinon.mock().atLeast(1) }, + Utils: { + encodeHashId: sinon.mock().atLeast(1) + }, discoveryProvider: { - discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000' + discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000', + getUserReplicaSet: sinon.mock().atLeast(1) }, web3Manager: { verifySignature: () => '0x7c95A677106218A296EcEF1F577c3aE27f0340cd' } } + libsMock.Utils.encodeHashId.callsFake((id) => { + return encode(id) + }) + + libsMock.discoveryProvider.getUserReplicaSet.callsFake(({ encodedUserId }) => { + const user_id = decode(encodedUserId) + return { + user_id, + "wallet": '0xadd36bad12002f1097cdb7ee24085c28e960fc32', + "primary": 'http://mock-cn1.audius.co', + "secondary1": 'http://mock-cn2.audius.co', + "secondary2": 'http://mock-cn3.audius.co', + 
"primarySpID": 1, + "secondary1SpID": 2, + "secondary2SpID": 3 + } + }) + + libsMock.contracts.UserReplicaSetManagerClient.getUserReplicaSet.returns({ primaryId: 1, secondaryIds: [2, 3] diff --git a/creator-node/test/updateReplicaSet.jobProcessor.test.js b/creator-node/test/updateReplicaSet.jobProcessor.test.js index 890dfb6bc8a..c16b76da3fc 100644 --- a/creator-node/test/updateReplicaSet.jobProcessor.test.js +++ b/creator-node/test/updateReplicaSet.jobProcessor.test.js @@ -8,6 +8,7 @@ const { getApp } = require('./lib/app') const { getLibsMock } = require('./lib/libsMock') const config = require('../src/config') +const { encode, decode } = require('../src/hashids') const { SyncType, RECONFIG_MODES, @@ -82,15 +83,47 @@ describe('test updateReplicaSet job processor', function () { const autoSelectCreatorNodesStub = sandbox .stub() .resolves({ services: healthyNodes }) + const _updateReplicaSet = sandbox + .stub() + .resolves({ blocknumber: 10 }) + const audiusLibsStub = { ServiceProvider: { autoSelectCreatorNodes: autoSelectCreatorNodesStub }, + User: { + updateEntityManagerReplicaSet: _updateReplicaSet, + waitForReplicaSetDiscoveryIndexing: sandbox.stub() + }, contracts: { UserReplicaSetManagerClient: { updateReplicaSet: updateReplicaSetStub, - _updateReplicaSet: updateReplicaSetStub + _updateReplicaSet } + }, + Utils: { + encodeHashId: sandbox + .mock() + .callsFake((id) => { + return encode(id) + }) + }, + discoveryProvider: { + getUserReplicaSet: sandbox + .mock() + .callsFake(({ encodedUserId }) => { + const user_id = decode(encodedUserId) + return { + user_id, + "wallet": '0x123456789', + "primary": 'http://mock-cn1.audius.co', + "secondary1": 'http://mock-cn2.audius.co', + "secondary2": 'http://mock-cn3.audius.co', + "primarySpID": 1, + "secondary1SpID": 2, + "secondary2SpID": 3 + } + }) } } return proxyquire( diff --git a/discovery-provider/integration_tests/challenges/index_helpers.py b/discovery-provider/integration_tests/challenges/index_helpers.py index 
cd492d2b332..8967be601a7 100644 --- a/discovery-provider/integration_tests/challenges/index_helpers.py +++ b/discovery-provider/integration_tests/challenges/index_helpers.py @@ -19,8 +19,16 @@ def get_metadata(self, multihash, format, endpoint): class UpdateTask: - def __init__(self, cid_metadata_client, web3, challenge_event_bus, redis=None): + def __init__( + self, + cid_metadata_client, + web3, + challenge_event_bus, + redis=None, + eth_manager=None, + ): self.cid_metadata_client = cid_metadata_client self.web3 = web3 self.challenge_event_bus: ChallengeEventBus = challenge_event_bus self.redis = redis + self.eth_manager = eth_manager diff --git a/discovery-provider/integration_tests/tasks/entity_manager/test_ursm_entity_manager.py b/discovery-provider/integration_tests/tasks/entity_manager/test_ursm_entity_manager.py new file mode 100644 index 00000000000..6f672912155 --- /dev/null +++ b/discovery-provider/integration_tests/tasks/entity_manager/test_ursm_entity_manager.py @@ -0,0 +1,205 @@ +from typing import List + +from integration_tests.challenges.index_helpers import UpdateTask +from integration_tests.utils import populate_mock_db +from src.challenges.challenge_event_bus import ChallengeEventBus, setup_challenge_bus +from src.models.users.user import User +from src.tasks.entity_manager.entity_manager import entity_manager_update +from src.utils.db_session import get_db +from src.utils.eth_manager import EthManager +from web3 import Web3 +from web3.datastructures import AttributeDict + + +def set_patches(mocker): + mocker.patch( + "src.tasks.entity_manager.user_replica_set.get_endpoint_string_from_sp_ids", + return_value="https://cn.io,https://cn2.io,https://cn3.io", + autospec=True, + ) + + def fetch_node_info(self, sp_id, sp_type, redis): + return { + "operator_wallet": "wallet1", + "endpoint": "http://endpoint.io", + "block_number": sp_id, + "delegator_wallet": f"spid{sp_id}", + } + + mocker.patch( + "src.utils.eth_manager.EthManager.fetch_node_info", + 
side_effect=fetch_node_info, + autospec=True, + ) + + +def test_index_update_user_replica_set_from_sp(app, mocker): + "Tests valid batch of ursm update actions" + + set_patches(mocker) + + tx_receipts = { + "UpdateUserReplicaSet1Tx": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "UserReplicaSet", + "_userId": 1, + "_action": "Update", + "_metadata": "1,2,3:2,3,4", + "_signer": "spid2", + } + ) + }, + ], + } + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt.transactionHash.decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + eth_manager = EthManager(None, None, None) + update_task = UpdateTask(None, web3, challenge_event_bus, None, eth_manager) + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.toBytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + entities = { + "users": [ + { + "user_id": 1, + "handle": "user-1", + "wallet": "user1wallet", + "primary_id": 1, + "secondary_ids": [2, 3], + } + ] + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + + # index transactions + entity_manager_update( + None, + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=0, + metadata={}, + ) + + with db.scoped_session() as session: + + # validate db records + all_users: List[User] = session.query(User).all() + assert len(all_users) == 2 + + user_1: User = ( + session.query(User) + .filter(User.is_current == True, User.user_id == 1) + .first() + ) + assert user_1.primary_id == 2 + assert user_1.secondary_ids == [3, 4] + + +def test_index_update_user_replica_set(app, mocker): + "Tests valid batch of ursm update actions" + + set_patches(mocker) + 
+ def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt.transactionHash.decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + eth_manager = EthManager(None, None, None) + update_task = UpdateTask(None, web3, challenge_event_bus, None, eth_manager) + + tx_receipts = { + "UpdateUserReplicaSet1Tx": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "UserReplicaSet", + "_userId": 1, + "_action": "Update", + "_metadata": "1,2,3:2,3,4", + "_signer": "spid2", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.toBytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + entities = { + "users": [ + { + "user_id": 1, + "handle": "user-1", + "wallet": "user1wallet", + "primary_id": 1, + "secondary_ids": [2, 3], + } + ] + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + + # index transactions + entity_manager_update( + None, + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=0, + metadata={}, + ) + + with db.scoped_session() as session: + + # validate db records + all_users: List[User] = session.query(User).all() + assert len(all_users) == 2 + + user_1: User = ( + session.query(User) + .filter(User.is_current == True, User.user_id == 1) + .first() + ) + assert user_1.primary_id == 2 + assert user_1.secondary_ids == [3, 4] diff --git a/discovery-provider/integration_tests/tasks/entity_manager/test_user_entity_manager.py b/discovery-provider/integration_tests/tasks/entity_manager/test_user_entity_manager.py new file mode 100644 index 00000000000..d4d7fdb27b3 --- /dev/null +++ 
b/discovery-provider/integration_tests/tasks/entity_manager/test_user_entity_manager.py @@ -0,0 +1,520 @@ +from typing import List + +from integration_tests.challenges.index_helpers import UpdateTask +from integration_tests.utils import populate_mock_db +from src.challenges.challenge_event_bus import ChallengeEventBus, setup_challenge_bus +from src.models.users.user import User +from src.tasks.entity_manager.entity_manager import entity_manager_update +from src.tasks.entity_manager.utils import USER_ID_OFFSET +from src.utils.db_session import get_db +from web3 import Web3 +from web3.datastructures import AttributeDict + + +def set_patches(mocker): + mocker.patch( + "src.tasks.entity_manager.user.get_endpoint_string_from_sp_ids", + return_value="https://cn.io,https://cn2.io,https://cn3.io", + autospec=True, + ) + + def fetch_node_info(self, sp_id, sp_type, redis): + return { + "operator_wallet": "wallet1", + "endpoint": "http://endpoint.io", + "block_number": sp_id, + "delegator_wallet": f"spid{sp_id}", + } + + mocker.patch( + "src.utils.eth_manager.EthManager.fetch_node_info", + side_effect=fetch_node_info, + autospec=True, + ) + + +def test_index_valid_user(app, mocker): + "Tests valid batch of users create/update/delete actions" + + set_patches(mocker) + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + challenge_event_bus: ChallengeEventBus = setup_challenge_bus() + update_task = UpdateTask(None, web3, challenge_event_bus) + + tx_receipts = { + "CreateUser1Tx": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": USER_ID_OFFSET, + "_action": "Create", + "_metadata": "1,2,3", + "_signer": "user1wallet", + } + ) + }, + ], + "UpdateUser1Tx": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": USER_ID_OFFSET, + "_action": "Update", + "_metadata": "QmUpdateUser1", + "_signer": "user1wallet", + } + ) + }, + ], + "CreateUser2Tx": 
[ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET + 1, + "_entityType": "User", + "_userId": USER_ID_OFFSET + 1, + "_action": "Create", + "_metadata": "2,3,4", + "_signer": "user2wallet", + } + ) + }, + ], + "UpdateUser2Tx": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET + 1, + "_entityType": "User", + "_userId": USER_ID_OFFSET + 1, + "_action": "Update", + "_metadata": "QmCreateUser2", + "_signer": "user2wallet", + } + ) + }, + ], + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.toBytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt.transactionHash.decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + test_metadata = { + "QmCreateUser1": { + "is_verified": False, + "is_deactivated": False, + "name": "raymont", + "handle": "rayjacobson", + "profile_picture": None, + "profile_picture_sizes": "QmYRHAJ4YuLjT4fLLRMg5STnQA4yDpiBmzk5R3iCDTmkmk", + "cover_photo": None, + "cover_photo_sizes": "QmUk61QDUTzhNqjnCAWipSp3jnMmXBmtTUC2mtF5F6VvUy", + "bio": "🌞👄🌞", + "location": "chik fil yay!!", + "creator_node_endpoint": "https://creatornode.audius.co,https://content-node.audius.co,https://blockdaemon-audius-content-06.bdnodes.net", + "associated_wallets": None, + "associated_sol_wallets": None, + "playlist_library": { + "contents": [ + {"playlist_id": "Audio NFTs", "type": "explore_playlist"}, + {"playlist_id": 4327, "type": "playlist"}, + {"playlist_id": 52792, "type": "playlist"}, + {"playlist_id": 63949, "type": "playlist"}, + { + "contents": [ + {"playlist_id": 6833, "type": "playlist"}, + {"playlist_id": 4735, "type": "playlist"}, + {"playlist_id": 114799, "type": "playlist"}, + {"playlist_id": 115049, "type": "playlist"}, + {"playlist_id": 89495, "type": "playlist"}, + ], + "id": 
"d515f4db-1db2-41df-9e0c-0180302a24f9", + "name": "WIP", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 9616, "type": "playlist"}, + {"playlist_id": 112826, "type": "playlist"}, + ], + "id": "a0da6552-ddc4-4d13-a19e-ecc63ca23e90", + "name": "Community", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 128608, "type": "playlist"}, + {"playlist_id": 90778, "type": "playlist"}, + {"playlist_id": 94395, "type": "playlist"}, + {"playlist_id": 97193, "type": "playlist"}, + ], + "id": "1163fbab-e710-4d33-8769-6fcb02719d7b", + "name": "Actually Albums", + "type": "folder", + }, + {"playlist_id": 131423, "type": "playlist"}, + {"playlist_id": 40151, "type": "playlist"}, + ] + }, + "events": {"is_mobile_user": True}, + "user_id": USER_ID_OFFSET, + }, + "QmCreateUser2": { + "is_verified": False, + "is_deactivated": False, + "name": "Forrest", + "handle": "forrest", + "profile_picture": None, + "profile_picture_sizes": "QmNmzMoiLYSAgrLbAAnaPW9q3YZwZvHybbbs59QamzUQxg", + "cover_photo": None, + "cover_photo_sizes": "QmR2fSFvtpWg7nfdYtoJ3KgDNf4YgcuSzKjwZjansW9wcj", + "bio": "On the lookout for that next breakout track... 
👀", + "location": "Los Angeles, CA", + "creator_node_endpoint": "https://creatornode2.audius.co,https://creatornode3.audius.co,https://content-node.audius.co", + "associated_wallets": None, + "associated_sol_wallets": None, + "playlist_library": { + "contents": [ + {"playlist_id": "Audio NFTs", "type": "explore_playlist"}, + {"playlist_id": 11363, "type": "playlist"}, + {"playlist_id": 129218, "type": "playlist"}, + ] + }, + "events": None, + "user_id": USER_ID_OFFSET + 1, + }, + "QmUpdateUser1": { + "is_verified": False, + "is_deactivated": False, + "name": "raymont updated", + "handle": "rayjacobson", + "profile_picture": None, + "profile_picture_sizes": "QmYRHAJ4YuLjT4fLLRMg5STnQA4yDpiBmzk5R3iCDTmkmk", + "cover_photo": None, + "cover_photo_sizes": "QmUk61QDUTzhNqjnCAWipSp3jnMmXBmtTUC2mtF5F6VvUy", + "bio": "🌞👄🌞", + "location": "chik fil yay!!", + "creator_node_endpoint": "https://creatornode.audius.co,https://content-node.audius.co,https://blockdaemon-audius-content-06.bdnodes.net", + "associated_wallets": None, + "associated_sol_wallets": None, + "playlist_library": { + "contents": [ + {"playlist_id": "Audio NFTs", "type": "explore_playlist"}, + {"playlist_id": 4327, "type": "playlist"}, + {"playlist_id": 52792, "type": "playlist"}, + {"playlist_id": 63949, "type": "playlist"}, + { + "contents": [ + {"playlist_id": 6833, "type": "playlist"}, + {"playlist_id": 4735, "type": "playlist"}, + {"playlist_id": 114799, "type": "playlist"}, + {"playlist_id": 115049, "type": "playlist"}, + {"playlist_id": 89495, "type": "playlist"}, + ], + "id": "d515f4db-1db2-41df-9e0c-0180302a24f9", + "name": "WIP", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 9616, "type": "playlist"}, + {"playlist_id": 112826, "type": "playlist"}, + ], + "id": "a0da6552-ddc4-4d13-a19e-ecc63ca23e90", + "name": "Community", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 128608, "type": "playlist"}, + {"playlist_id": 90778, "type": "playlist"}, + {"playlist_id": 94395, 
"type": "playlist"}, + {"playlist_id": 97193, "type": "playlist"}, + ], + "id": "1163fbab-e710-4d33-8769-6fcb02719d7b", + "name": "Actually Albums", + "type": "folder", + }, + {"playlist_id": 131423, "type": "playlist"}, + {"playlist_id": 40151, "type": "playlist"}, + ] + }, + "events": {"is_mobile_user": True}, + "user_id": USER_ID_OFFSET, + }, + } + + entities = { + "users": [ + {"user_id": 1, "handle": "user-1", "wallet": "user1wallet"}, + {"user_id": 2, "handle": "user-1", "wallet": "User2Wallet"}, + ] + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + + # index transactions + entity_manager_update( + None, + update_task, + session, + entity_manager_txs, + block_number=1, + block_timestamp=1585336422, + block_hash=0, + metadata=test_metadata, + ) + + with db.scoped_session() as session: + + # validate db records + all_users: List[User] = session.query(User).all() + assert len(all_users) == 6 + + user_1: User = ( + session.query(User) + .filter(User.is_current == True, User.user_id == USER_ID_OFFSET) + .first() + ) + assert user_1.name == "raymont updated" + + user_2: User = ( + session.query(User) + .filter( + User.is_current == True, + User.user_id == USER_ID_OFFSET + 1, + ) + .first() + ) + assert user_2.name == "Forrest" + + +def test_index_invalid_users(app, mocker): + "Tests invalid batch of useres create/update/delete actions" + set_patches(mocker) + + # setup db and mocked txs + with app.app_context(): + db = get_db() + web3 = Web3() + update_task = UpdateTask(None, web3, None) + + tx_receipts = { + # invalid create + "CreateUserBelowOffset": [ + { + "args": AttributeDict( + { + "_entityId": 1, + "_entityType": "User", + "_userId": 1, + "_action": "Create", + "_metadata": "", + "_signer": "user1wallet", + } + ) + }, + ], + "CreateUserWithBadMetadataDoesNotExist": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET + 1, + "_entityType": "User", + "_userId": 2, + "_action": "Create", + "_metadata": 
"QmInvalidUserMetadata", + "_signer": "user1wallet", + } + ) + }, + ], + "CreateUserDoesNotMatchSigner": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET + 1, + "_entityType": "User", + "_userId": 1, + "_action": "Create", + "_metadata": "", + "_signer": "InvalidWallet", + } + ) + }, + ], + "CreateUser": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": USER_ID_OFFSET, + "_action": "Create", + "_metadata": "3,4,5", + "_signer": "user1wallet", + } + ) + }, + ], + "CreateUserAlreadyExists": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": USER_ID_OFFSET, + "_action": "Create", + "_metadata": "QmCreateUser1", + "_signer": "user1wallet", + } + ) + }, + ], + # invalid updates + "UpdateUserInvalidSigner": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": 1, + "_action": "Update", + "_metadata": "", + "_signer": "InvalidWallet", + } + ) + }, + ], + "UpdateUserInvalidOwner": [ + { + "args": AttributeDict( + { + "_entityId": USER_ID_OFFSET, + "_entityType": "User", + "_userId": 2, + "_action": "Update", + "_metadata": "", + "_signer": "User2Wallet", + } + ) + }, + ], + } + test_metadata = { + "QmCreateUser1": { + "is_verified": False, + "is_deactivated": False, + "name": "raymont", + "handle": "rayjacobson", + "profile_picture": None, + "profile_picture_sizes": "QmYRHAJ4YuLjT4fLLRMg5STnQA4yDpiBmzk5R3iCDTmkmk", + "cover_photo": None, + "cover_photo_sizes": "QmUk61QDUTzhNqjnCAWipSp3jnMmXBmtTUC2mtF5F6VvUy", + "bio": "🌞👄🌞", + "location": "chik fil yay!!", + "creator_node_endpoint": "https://creatornode.audius.co,https://content-node.audius.co,https://blockdaemon-audius-content-06.bdnodes.net", + "associated_wallets": None, + "associated_sol_wallets": None, + "playlist_library": { + "contents": [ + {"playlist_id": "Audio NFTs", "type": "explore_playlist"}, + {"playlist_id": 4327, "type": "playlist"}, + 
{"playlist_id": 52792, "type": "playlist"}, + {"playlist_id": 63949, "type": "playlist"}, + { + "contents": [ + {"playlist_id": 6833, "type": "playlist"}, + {"playlist_id": 4735, "type": "playlist"}, + {"playlist_id": 114799, "type": "playlist"}, + {"playlist_id": 115049, "type": "playlist"}, + {"playlist_id": 89495, "type": "playlist"}, + ], + "id": "d515f4db-1db2-41df-9e0c-0180302a24f9", + "name": "WIP", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 9616, "type": "playlist"}, + {"playlist_id": 112826, "type": "playlist"}, + ], + "id": "a0da6552-ddc4-4d13-a19e-ecc63ca23e90", + "name": "Community", + "type": "folder", + }, + { + "contents": [ + {"playlist_id": 128608, "type": "playlist"}, + {"playlist_id": 90778, "type": "playlist"}, + {"playlist_id": 94395, "type": "playlist"}, + {"playlist_id": 97193, "type": "playlist"}, + ], + "id": "1163fbab-e710-4d33-8769-6fcb02719d7b", + "name": "Actually Albums", + "type": "folder", + }, + {"playlist_id": 131423, "type": "playlist"}, + {"playlist_id": 40151, "type": "playlist"}, + ] + }, + "events": {"is_mobile_user": True}, + "user_id": USER_ID_OFFSET, + } + } + + entity_manager_txs = [ + AttributeDict({"transactionHash": update_task.web3.toBytes(text=tx_receipt)}) + for tx_receipt in tx_receipts + ] + + def get_events_side_effect(_, tx_receipt): + return tx_receipts[tx_receipt.transactionHash.decode("utf-8")] + + mocker.patch( + "src.tasks.entity_manager.entity_manager.get_entity_manager_events_tx", + side_effect=get_events_side_effect, + autospec=True, + ) + entities = { + "users": [ + {"user_id": 1, "handle": "user-1", "wallet": "user1wallet"}, + ] + } + populate_mock_db(db, entities) + + with db.scoped_session() as session: + # index transactions + entity_manager_update( + None, + update_task, + session, + entity_manager_txs, + block_number=0, + block_timestamp=1585336422, + block_hash=0, + metadata=test_metadata, + ) + + # validate db records + all_users: List[User] = session.query(User).all() + 
assert len(all_users) == 2 # no new users indexed diff --git a/discovery-provider/src/api/v1/users.py b/discovery-provider/src/api/v1/users.py index 071511343ad..31f1e973d1a 100644 --- a/discovery-provider/src/api/v1/users.py +++ b/discovery-provider/src/api/v1/users.py @@ -77,6 +77,7 @@ GetUserListeningHistoryArgs, get_user_listening_history, ) +from src.queries.get_user_replica_set import get_user_replica_set from src.queries.get_user_with_wallet import get_user_with_wallet from src.queries.get_users import get_users from src.queries.get_users_cnode import ReplicaType, get_users_cnode @@ -1470,3 +1471,49 @@ def get(self): # 5. Send back the decoded payload return success_response(payload) + + +GET_REPLICA_SET = "//replica_set" +user_replica_set_full_response = make_full_response( + "users_by_content_node", full_ns, fields.Nested(user_replica_set) +) +user_replica_set_response = make_response( + "users_by_content_node", ns, fields.Nested(user_replica_set) +) + + +@full_ns.route(GET_REPLICA_SET) +class FullGetReplicaSet(Resource): + @record_metrics + @cache(ttl_sec=5) + def _get(self, id: str): + decoded_id = decode_with_abort(id, full_ns) + args = {"user_id": decoded_id} + replica_set = get_user_replica_set(args) + return success_response(replica_set) + + @full_ns.doc( + id="""Get User Replica Set""", + description="""Gets the user's replica set""", + params={ + "id": "A User ID", + }, + ) + @full_ns.expect(current_user_parser) + @full_ns.marshal_with(user_replica_set_full_response) + def get(self, id: str): + return self._get(id) + + +@ns.route(GET_REPLICA_SET) +class GetReplicaSet(FullGetReplicaSet): + @ns.doc( + id="""Get User Replica Set""", + description="""Gets the user's replica set""", + params={ + "id": "A User ID", + }, + ) + @ns.marshal_with(user_replica_set_response) + def get(self, id: str): + return super()._get(id) diff --git a/discovery-provider/src/app.py b/discovery-provider/src/app.py index f648835a69a..cb20f9b9195 100644 --- 
a/discovery-provider/src/app.py +++ b/discovery-provider/src/app.py @@ -42,6 +42,7 @@ from src.utils import helpers from src.utils.cid_metadata_client import CIDMetadataClient from src.utils.config import ConfigIni, config_files, shared_config +from src.utils.eth_manager import EthManager from src.utils.multi_provider import MultiProvider from src.utils.redis_metrics import METRICS_INTERVAL, SYNCHRONIZE_METRICS_INTERVAL from src.utils.session_manager import SessionManager @@ -550,6 +551,12 @@ def configure_celery(celery, test_config=None): cid_metadata_client, ) + registry_address = web3.toChecksumAddress( + shared_config["eth_contracts"]["registry"] + ) + eth_manager = EthManager(eth_web3, eth_abi_values, registry_address) + eth_manager.init_contracts() + # Clear existing locks used in tasks if present redis_inst.delete("disc_prov_lock") redis_inst.delete("network_peers_lock") @@ -591,6 +598,7 @@ def __init__(self, *args, **kwargs): solana_client_manager=solana_client_manager, challenge_event_bus=setup_challenge_bus(), anchor_program_indexer=anchor_program_indexer, + eth_manager=eth_manager, ) celery.autodiscover_tasks(["src.tasks"], "index", True) diff --git a/discovery-provider/src/database_task.py b/discovery-provider/src/database_task.py index dd0c2bc68c4..5dbacc141c8 100644 --- a/discovery-provider/src/database_task.py +++ b/discovery-provider/src/database_task.py @@ -1,6 +1,7 @@ from celery import Task from redis import Redis from src.challenges.challenge_event_bus import ChallengeEventBus +from src.utils.eth_manager import EthManager from src.utils.session_manager import SessionManager @@ -19,6 +20,7 @@ def __init__( solana_client_manager=None, challenge_event_bus=None, anchor_program_indexer=None, + eth_manager=None, ): self._db = db self._db_read_replica = db_read_replica @@ -32,6 +34,7 @@ def __init__( self._solana_client_manager = solana_client_manager self._challenge_event_bus = challenge_event_bus self._anchor_program_indexer = anchor_program_indexer 
+ self._eth_manager = eth_manager @property def abi_values(self): @@ -80,3 +83,7 @@ def challenge_event_bus(self) -> ChallengeEventBus: @property def anchor_program_indexer(self): return self._anchor_program_indexer + + @property + def eth_manager(self) -> EthManager: + return self._eth_manager diff --git a/discovery-provider/src/queries/get_user_replica_set.py b/discovery-provider/src/queries/get_user_replica_set.py new file mode 100644 index 00000000000..71a0fa6be6e --- /dev/null +++ b/discovery-provider/src/queries/get_user_replica_set.py @@ -0,0 +1,38 @@ +import logging +from typing import List + +from src.models.users.user import User +from src.utils.db_session import get_db_read_replica + +logger = logging.getLogger(__name__) + + +def get_user_replica_set(args): + user_id = args.get("user_id") + + db = get_db_read_replica() + with db.scoped_session() as session: + + # Don't return the user if they have no wallet (user creation did not finish properly on chain) + users: List[User] = ( + session.query(User) + .filter( + User.is_current == True, User.wallet != None, User.user_id == user_id + ) + .all() + ) + if len(users) != 1: + return {} + + user = users[0] + endpoints = user.creator_node_endpoint.split(",") + return { + "user_id": user.user_id, + "wallet": user.wallet, + "primary": endpoints[0], + "secondary1": endpoints[1], + "secondary2": endpoints[2], + "primarySpID": user.primary_id, + "secondary1SpID": user.secondary_ids[0], + "secondary2SpID": user.secondary_ids[1], + } diff --git a/discovery-provider/src/queries/get_users.py b/discovery-provider/src/queries/get_users.py index 8d245eff6aa..a661408570c 100644 --- a/discovery-provider/src/queries/get_users.py +++ b/discovery-provider/src/queries/get_users.py @@ -37,9 +37,7 @@ def get_users_and_ids(): # Create initial query base_query = session.query(User) # Don't return the user if they have no wallet or handle (user creation did not finish properly on chain) - base_query = base_query.filter( - 
User.is_current == True, User.wallet != None, User.handle != None - ) + base_query = base_query.filter(User.is_current == True, User.wallet != None) # Process filters if "wallet" in args: diff --git a/discovery-provider/src/tasks/entity_manager/entity_manager.py b/discovery-provider/src/tasks/entity_manager/entity_manager.py index 1d52d924c45..a6cd8357006 100644 --- a/discovery-provider/src/tasks/entity_manager/entity_manager.py +++ b/discovery-provider/src/tasks/entity_manager/entity_manager.py @@ -27,6 +27,8 @@ delete_social_record, ) from src.tasks.entity_manager.track import create_track, delete_track, update_track +from src.tasks.entity_manager.user import create_user, update_user +from src.tasks.entity_manager.user_replica_set import update_user_replica_set from src.tasks.entity_manager.utils import ( MANAGE_ENTITY_EVENT_TYPE, Action, @@ -43,7 +45,7 @@ logger = logging.getLogger(__name__) # Please toggle below variable to true for development -ENABLE_DEVELOPMENT_FEATURES = False +ENABLE_DEVELOPMENT_FEATURES = True def entity_manager_update( @@ -102,12 +104,15 @@ def entity_manager_update( start_time_tx = time.time() params = ManageEntityParameters( session, + update_task.redis, challenge_bus, event, new_records, # actions below populate these records existing_records, pending_track_routes, metadata, + update_task.eth_manager, + update_task.web3, block_timestamp, block_number, event_blockhash, @@ -151,6 +156,24 @@ def entity_manager_update( create_social_record(params) elif params.action in delete_social_action_types: delete_social_record(params) + elif ( + params.action == Action.CREATE + and params.entity_type == EntityType.USER + and ENABLE_DEVELOPMENT_FEATURES + ): + create_user(params) + elif ( + params.action == Action.UPDATE + and params.entity_type == EntityType.USER + and ENABLE_DEVELOPMENT_FEATURES + ): + update_user(params) + elif ( + params.action == Action.UPDATE + and params.entity_type == EntityType.USER_REPLICA_SET + and 
ENABLE_DEVELOPMENT_FEATURES + ): + update_user_replica_set(params) except Exception as e: # swallow exception to keep indexing logger.info( @@ -208,6 +231,9 @@ def copy_original_records(existing_records): return original_records +entity_types_to_fetch = set([EntityType.USER, EntityType.TRACK, EntityType.PLAYLIST]) + + def collect_entities_to_fetch( update_task, entity_manager_txs, @@ -220,9 +246,10 @@ def collect_entities_to_fetch( entity_id = helpers.get_tx_arg(event, "_entityId") entity_type = helpers.get_tx_arg(event, "_entityType") user_id = helpers.get_tx_arg(event, "_userId") - action = helpers.get_tx_arg(event, "_action") - entities_to_fetch[entity_type].add(entity_id) + if entity_type in entity_types_to_fetch: + entities_to_fetch[entity_type].add(entity_id) entities_to_fetch[EntityType.USER].add(user_id) + action = helpers.get_tx_arg(event, "_action") # Query follow operations as needed if action in action_to_record_type.keys(): diff --git a/discovery-provider/src/tasks/entity_manager/user.py b/discovery-provider/src/tasks/entity_manager/user.py new file mode 100644 index 00000000000..4ef77954d09 --- /dev/null +++ b/discovery-provider/src/tasks/entity_manager/user.py @@ -0,0 +1,135 @@ +import logging +from typing import Dict + +from src.models.users.user import User +from src.tasks.entity_manager.user_replica_set import parse_sp_ids +from src.tasks.entity_manager.utils import ( + USER_ID_OFFSET, + Action, + EntityType, + ManageEntityParameters, + copy_user_record, +) +from src.tasks.user_replica_set import get_endpoint_string_from_sp_ids +from src.tasks.users import ( + update_legacy_user_images, + update_user_metadata, + validate_user_record, +) + +logger = logging.getLogger(__name__) + + +def validate_user_tx(params: ManageEntityParameters): + user_id = params.user_id + + if params.entity_type != EntityType.USER: + raise Exception("Invalid User Transaction, wrong entity type") + + if params.action == Action.CREATE: + if user_id in 
params.existing_records[EntityType.USER]: + raise Exception("Invalid User Transaction, user does not exist") + if user_id < USER_ID_OFFSET: + raise Exception("Invalid User Transaction, user id offset incorrect") + else: + # update / delete specific validations + if user_id not in params.existing_records[EntityType.USER]: + raise Exception("Invalid User Transaction, user does not exist") + wallet = params.existing_records[EntityType.USER][user_id].wallet + if wallet and wallet.lower() != params.signer.lower(): + raise Exception( + "Invalid User Transaction, user wallet signer does not match" + ) + + +def update_user_record(params: ManageEntityParameters, user: User, metadata: Dict): + update_user_metadata( + params.session, params.redis, user, metadata, params.web3, params.challenge_bus + ) + user.metadata_multihash = params.metadata_cid + user = update_legacy_user_images(user) + user = validate_user_record(user) + return user + + +def create_user(params: ManageEntityParameters): + validate_user_tx(params) + + user_id = params.user_id + + user_record = User( + user_id=user_id, + wallet=params.signer.lower(), + txhash=params.txhash, + blockhash=params.event_blockhash, + blocknumber=params.block_number, + created_at=params.block_datetime, + updated_at=params.block_datetime, + is_current=False, + ) + + sp_ids = parse_sp_ids(params.metadata_cid) + + # Update the user's new replica set in the model and save! 
+ user_record.primary_id = sp_ids[0] + user_record.secondary_ids = sp_ids[1:] + + # Update cnode endpoint string reconstructed from sp ID + creator_node_endpoint_str = get_endpoint_string_from_sp_ids( + params.redis, sp_ids[0], sp_ids[1:] + ) + user_record.creator_node_endpoint = creator_node_endpoint_str + + user_record = validate_user_record(user_record) + params.add_user_record(user_id, user_record) + return user_record + + +def update_user(params: ManageEntityParameters): + validate_user_tx(params) + + user_metadata = params.metadata[params.metadata_cid] + user_id = params.entity_id + existing_user = params.existing_records[EntityType.USER][user_id] + existing_user.is_current = False # invalidate + if ( + user_id in params.new_records[EntityType.USER] + and params.new_records[EntityType.USER][user_id] + ): # override with last updated user is in this block + existing_user = params.new_records[EntityType.USER][user_id][-1] + + user_record = copy_user_record( + existing_user, + params.block_number, + params.event_blockhash, + params.txhash, + params.block_datetime, + ) + + # If the user's handle is not set, validate that it is unique + if not user_record.handle: + user_handle_exists = params.session.query( + params.session.query(User) + .filter(User.handle == user_metadata["handle"]) + .exists() + ).scalar() + if user_handle_exists: + # Invalid user handle - should not continue to save... 
+ return + user_record.handle = user_metadata["handle"] + user_record.handle_lc = user_metadata["handle"].lower() + + user_record = update_user_metadata( + params.session, + params.redis, + user_record, + user_metadata, + params.web3, + params.challenge_bus, + ) + user_record.metadata_multihash = params.metadata_cid + user_record = update_legacy_user_images(user_record) + user_record = validate_user_record(user_record) + params.add_user_record(user_id, user_record) + + return user_record diff --git a/discovery-provider/src/tasks/entity_manager/user_replica_set.py b/discovery-provider/src/tasks/entity_manager/user_replica_set.py new file mode 100644 index 00000000000..6e4d81f80f1 --- /dev/null +++ b/discovery-provider/src/tasks/entity_manager/user_replica_set.py @@ -0,0 +1,118 @@ +import logging +from typing import List, Tuple + +from src.tasks.entity_manager.utils import ( + Action, + EntityType, + ManageEntityParameters, + copy_user_record, +) +from src.tasks.user_replica_set import get_endpoint_string_from_sp_ids +from src.utils.eth_manager import ServiceProviderType + +logger = logging.getLogger(__name__) + + +def parse_update_sp_id(params) -> Tuple[List[int], List[int]]: + sp_ids = params.metadata_cid.split(":") + if len(sp_ids) != 2: + raise Exception('Invalid format entity_id should be ":" separated') + return parse_sp_ids(sp_ids[0]), parse_sp_ids(sp_ids[1]) + + +def parse_sp_ids(sp_ids_str: str) -> List[int]: + sp_ids = sp_ids_str.split(",") + for sp_id in sp_ids: + if not sp_id.isdigit(): + raise Exception(f"sp id of {sp_id} is not a digit") + if len(sp_ids) < 3: + raise Exception("Too few updated sp ids") + + return [int(id) for id in sp_ids] + + +def is_valid_user_replica_set_tx(params: ManageEntityParameters) -> None: + user_id = params.user_id + if user_id not in params.existing_records[EntityType.USER]: + # user does not exist + raise Exception("User does not exist") + # Validate the signer is the user or in the current replica set of content nodes + 
user = params.existing_records[EntityType.USER][user_id] + user_sp_ids = [user.primary_id] + if user.secondary_ids: + user_sp_ids = user_sp_ids + user.secondary_ids + if user.wallet and user.wallet.lower() != params.signer.lower(): + # user does not match signer + # check the content nodes + valid_cn_signer = False + user_replica_set_sp_ids = set(user_sp_ids) + for sp_id in user_replica_set_sp_ids: + sp_info_cached = params.eth_manager.fetch_node_info( + sp_id, ServiceProviderType.CONTENT, params.redis + ) + + delegator_wallet = sp_info_cached["delegator_wallet"] + if delegator_wallet.lower() == params.signer.lower(): + valid_cn_signer = True + if not valid_cn_signer: + raise Exception("Invalid tx signer") + + current_sp_ids, updated_sp_ids = parse_update_sp_id(params) + if current_sp_ids[0] != user_sp_ids[0] or set(current_sp_ids[1:]) != set( + user_sp_ids[1:] + ): + raise Exception( + f"Current sp ids does not match parameters, current: {current_sp_ids} and requested {user_sp_ids}" + ) + + if len(set(updated_sp_ids)) != len(updated_sp_ids): + raise Exception("Duplicate sp ids not allowed") + + for sp_id in updated_sp_ids: + sp_info_cached = params.eth_manager.fetch_node_info( + sp_id, ServiceProviderType.CONTENT, params.redis + ) + if not sp_info_cached or sp_info_cached["endpoint"] == "": + raise Exception( + "Cannot set sp ids to invalid set with unregistered service" + ) + + if params.entity_type != EntityType.USER_REPLICA_SET: + raise Exception("Invalid entity type") + + if params.action != Action.UPDATE: + raise Exception("Invalid tx action") + + +def update_user_replica_set(params: ManageEntityParameters): + is_valid_user_replica_set_tx(params) + + user_id = params.user_id + existing_user = params.existing_records[EntityType.USER][user_id] + existing_user.is_current = False # invalidate + if ( + user_id in params.new_records[EntityType.USER] + ): # override with last updated user is in this block + existing_user = 
params.new_records[EntityType.USER][user_id][-1] + updated_user = copy_user_record( + existing_user, + params.block_number, + params.event_blockhash, + params.txhash, + params.block_datetime, + ) + # Validate the new replica set is valid + _, updated_sp_ids = parse_update_sp_id(params) + + # Update the user's new replica set in the model and save! + updated_user.primary_id = updated_sp_ids[0] + updated_user.secondary_ids = updated_sp_ids[1:] + updated_user.replica_set_update_signer = params.signer + + # Update cnode endpoint string reconstructed from sp ID + creator_node_endpoint_str = get_endpoint_string_from_sp_ids( + params.redis, updated_sp_ids[0], updated_sp_ids[1:] + ) + updated_user.creator_node_endpoint = creator_node_endpoint_str + + params.add_user_record(user_id, updated_user) diff --git a/discovery-provider/src/tasks/entity_manager/user_replica_set_unit_test.py b/discovery-provider/src/tasks/entity_manager/user_replica_set_unit_test.py new file mode 100644 index 00000000000..b5441db95ae --- /dev/null +++ b/discovery-provider/src/tasks/entity_manager/user_replica_set_unit_test.py @@ -0,0 +1,33 @@ +from src.tasks.entity_manager.user_replica_set import parse_update_sp_id + + +class Params: + def __init__(self, metadata_cid): + self.metadata_cid = metadata_cid + + +def test_parse_update_sp_id(): + params = Params("1,2,3:2,3,4") + result = parse_update_sp_id(params) + assert result == ([1, 2, 3], [2, 3, 4]) + + try: + params = Params("1,2,3:a,3,4") + result = parse_update_sp_id(params) + assert False + except Exception as e: + assert str(e) == "sp id of a is not a digit" + + try: + params = Params("1,2,3,4") + result = parse_update_sp_id(params) + assert False + except Exception as e: + assert str(e) == 'Invalid format entity_id should be ":" separated' + + try: + params = Params("1,2,3:4") + result = parse_update_sp_id(params) + assert False + except Exception as e: + assert str(e) == "Too few updated sp ids" diff --git 
a/discovery-provider/src/tasks/entity_manager/utils.py b/discovery-provider/src/tasks/entity_manager/utils.py index e98e1bfd0d4..560bc6514e2 100644 --- a/discovery-provider/src/tasks/entity_manager/utils.py +++ b/discovery-provider/src/tasks/entity_manager/utils.py @@ -11,10 +11,13 @@ from src.models.tracks.track_route import TrackRoute from src.models.users.user import User from src.utils import helpers +from src.utils.eth_manager import EthManager +from web3 import Web3 from web3.datastructures import AttributeDict PLAYLIST_ID_OFFSET = 400_000 -TRACK_ID_OFFSET = 1_000_000 +TRACK_ID_OFFSET = 2_000_000 +USER_ID_OFFSET = 3_000_000 class Action(str, Enum): @@ -36,6 +39,7 @@ class EntityType(str, Enum): PLAYLIST = "Playlist" TRACK = "Track" USER = "User" + USER_REPLICA_SET = "UserReplicaSet" FOLLOW = "Follow" SAVE = "Save" REPOST = "Repost" @@ -47,6 +51,7 @@ def __str__(self) -> str: class RecordDict(TypedDict): Playlist: Dict[int, List[Playlist]] Track: Dict[int, List[Track]] + User: Dict[int, List[User]] Follow: Dict[Tuple, List[Follow]] Save: Dict[Tuple, List[Save]] Repost: Dict[Tuple, List[Repost]] @@ -75,12 +80,15 @@ class ManageEntityParameters: def __init__( self, session, + redis, challenge_bus: ChallengeEventBus, event: AttributeDict, new_records: RecordDict, existing_records: ExistingRecordDict, pending_track_routes: List[TrackRoute], metadata: Dict[str, Dict[str, Dict]], + eth_manager: EthManager, + web3: Web3, block_timestamp: int, block_number: int, event_blockhash: str, @@ -96,7 +104,10 @@ def __init__( self.block_integer_time = int(block_timestamp) self.session = session + self.redis = redis self.challenge_bus = challenge_bus + self.web3 = web3 + self.eth_manager = eth_manager self.pending_track_routes = pending_track_routes self.event = event @@ -126,6 +137,50 @@ def add_social_feature_record( key = get_record_key(user_id, entity_type, entity_id) self.new_records[record_type][key].append(record) # type: ignore + def add_user_record(self, user_id: int, 
user: User): + self.new_records[EntityType.USER][user_id].append(user) # type: ignore + self.existing_records[EntityType.USER][user_id] = user # type: ignore + def get_record_key(user_id: int, entity_type: str, entity_id: int): return (user_id, entity_type.capitalize(), entity_id) + + +def copy_user_record( + old_user: User, + block_number: int, + event_blockhash: str, + txhash: str, + block_datetime: datetime, +): + return User( + user_id=old_user.user_id, + wallet=old_user.wallet, + created_at=old_user.created_at, + handle=old_user.handle, + name=old_user.name, + profile_picture=old_user.profile_picture, + cover_photo=old_user.cover_photo, + bio=old_user.bio, + location=old_user.location, + metadata_multihash=old_user.metadata_multihash, + creator_node_endpoint=old_user.creator_node_endpoint, + is_verified=old_user.is_verified, + handle_lc=old_user.handle_lc, + cover_photo_sizes=old_user.cover_photo_sizes, + profile_picture_sizes=old_user.profile_picture_sizes, + primary_id=old_user.primary_id, + secondary_ids=old_user.secondary_ids, + replica_set_update_signer=old_user.replica_set_update_signer, + has_collectibles=old_user.has_collectibles, + playlist_library=old_user.playlist_library, + is_deactivated=old_user.is_deactivated, + slot=old_user.slot, + user_storage_account=old_user.user_storage_account, + user_authority_account=old_user.user_authority_account, + updated_at=block_datetime, + blocknumber=block_number, + blockhash=event_blockhash, + txhash=txhash, + is_current=False, + ) diff --git a/discovery-provider/src/tasks/index.py b/discovery-provider/src/tasks/index.py index 7c586758310..c6ebb899431 100644 --- a/discovery-provider/src/tasks/index.py +++ b/discovery-provider/src/tasks/index.py @@ -32,7 +32,7 @@ from src.queries.skipped_transactions import add_network_level_skipped_transaction from src.tasks.celery_app import celery from src.tasks.entity_manager.entity_manager import entity_manager_update -from src.tasks.entity_manager.utils import EntityType 
+from src.tasks.entity_manager.utils import Action, EntityType from src.tasks.playlists import playlist_state_update from src.tasks.social_features import social_feature_state_update from src.tasks.sort_block_transactions import sort_block_transactions @@ -328,16 +328,20 @@ def fetch_cid_metadata(db, user_factory_txs, track_factory_txs, entity_manager_t event_args = entry["args"] user_id = event_args._userId cid = event_args._metadata - if not cid: + event_type = event_args._entityType + action = event_args._action + if not cid or event_type == EntityType.USER_REPLICA_SET: + continue + if action == Action.CREATE and event_type == EntityType.USER: continue cids_txhash_set.add((cid, txhash)) cid_to_user_id[cid] = user_id - if event_args._entityType == EntityType.PLAYLIST: + if event_type == EntityType.PLAYLIST: cid_type[cid] = "playlist_data" - elif event_args._entityType == EntityType.TRACK: + elif event_type == EntityType.TRACK: cid_type[cid] = "track" - elif event_args._entityType == EntityType.USER: + elif event_type == EntityType.USER: cid_type[cid] = "user" # user -> replica set string lookup, used to make user and track cid get_metadata fetches faster @@ -455,7 +459,6 @@ def get_contract_type_for_tx(tx_type_to_grouped_lists_map, tx, tx_receipt): tx_target_contract_address = tx["to"] contract_type = None for tx_type in tx_type_to_grouped_lists_map.keys(): - logger.info(f"index.py | checking {tx_type} vs {tx_target_contract_address}") tx_is_type = tx_target_contract_address == get_contract_addresses()[tx_type] if tx_is_type: contract_type = tx_type diff --git a/discovery-provider/src/tasks/user_replica_set.py b/discovery-provider/src/tasks/user_replica_set.py index 3e5113dd55f..0f404de6013 100644 --- a/discovery-provider/src/tasks/user_replica_set.py +++ b/discovery-provider/src/tasks/user_replica_set.py @@ -9,14 +9,15 @@ from src.models.users.user import User from src.queries.skipped_transactions import add_node_level_skipped_transaction from src.tasks.users 
import invalidate_old_user, lookup_user_record -from src.utils import helpers +from src.utils import helpers, web3_provider +from src.utils.config import shared_config from src.utils.eth_contracts_helpers import ( content_node_service_type, sp_factory_registry_key, ) from src.utils.indexing_errors import EntityMissingRequiredFieldError, IndexingError from src.utils.model_nullable_validator import all_required_fields_present -from src.utils.redis_cache import get_json_cached_key, get_sp_id_key +from src.utils.redis_cache import get_cn_sp_id_key, get_json_cached_key from src.utils.user_event_constants import ( user_replica_set_manager_event_types_arr, user_replica_set_manager_event_types_lookup, @@ -206,19 +207,19 @@ def get_user_replica_set_mgr_tx(update_task, event_type, tx_receipt): # creator_node_endpoint # If this discrepancy occurs, a client replica set health check sweep will # result in a client-initiated failover operation to a valid set of replicas -def get_endpoint_string_from_sp_ids(update_task, primary, secondaries): +def get_endpoint_string_from_sp_ids(redis, primary, secondaries): sp_factory_inst = None endpoint_string = None primary_endpoint = None try: sp_factory_inst, primary_endpoint = get_endpoint_from_id( - update_task, sp_factory_inst, primary + redis, sp_factory_inst, primary ) endpoint_string = f"{primary_endpoint}" for secondary_id in secondaries: secondary_endpoint = None sp_factory_inst, secondary_endpoint = get_endpoint_from_id( - update_task, sp_factory_inst, secondary_id + redis, sp_factory_inst, secondary_id ) # Conditionally log if endpoint is None after fetching if not secondary_endpoint: @@ -246,7 +247,7 @@ def get_ursm_cnode_endpoint(update_task, sp_id): sp_factory_inst = None try: sp_factory_inst, endpoint = get_endpoint_from_id( - update_task, sp_factory_inst, sp_id + update_task.redis, sp_factory_inst, sp_id ) except Exception as exc: logger.error( @@ -259,12 +260,12 @@ def get_ursm_cnode_endpoint(update_task, sp_id): # 
Initializes sp_factory if necessary and retrieves spID # Returns initialized instance of contract and endpoint -def get_endpoint_from_id(update_task, sp_factory_inst, sp_id): +def get_endpoint_from_id(redis, sp_factory_inst, sp_id): endpoint = None # Get sp_id cache key - cache_key = get_sp_id_key(sp_id) + cache_key = get_cn_sp_id_key(sp_id) # Attempt to fetch from cache - sp_info_cached = get_json_cached_key(update_task.redis, cache_key) + sp_info_cached = get_json_cached_key(redis, cache_key) if sp_info_cached: endpoint = sp_info_cached[1] logger.info( @@ -277,7 +278,7 @@ def get_endpoint_from_id(update_task, sp_factory_inst, sp_id): f"index.py | user_replica_set.py | CACHE MISS FOR {cache_key}, found {sp_info_cached}" ) if sp_factory_inst is None: - sp_factory_inst = get_sp_factory_inst(update_task) + sp_factory_inst = get_sp_factory_inst() cn_endpoint_info = sp_factory_inst.functions.getServiceEndpointInfo( content_node_service_type, sp_id @@ -291,9 +292,8 @@ def get_endpoint_from_id(update_task, sp_factory_inst, sp_id): # Return instance of ServiceProviderFactory initialized with configs -def get_sp_factory_inst(update_task): - shared_config = update_task.shared_config - eth_web3 = update_task.eth_web3 +def get_sp_factory_inst(): + eth_web3 = web3_provider.get_eth_web3() eth_registry_address = eth_web3.toChecksumAddress( shared_config["eth_contracts"]["registry"] ) @@ -352,7 +352,7 @@ def parse_user_record(update_task, entry, user_record, block_timestamp): # Update cnode endpoint string reconstructed from sp ID creator_node_endpoint_str = get_endpoint_string_from_sp_ids( - update_task, primary, secondaries + update_task.redis, primary, secondaries ) user_record.creator_node_endpoint = creator_node_endpoint_str diff --git a/discovery-provider/src/tasks/users.py b/discovery-provider/src/tasks/users.py index fc88dbe628f..b3a973b892b 100644 --- a/discovery-provider/src/tasks/users.py +++ b/discovery-provider/src/tasks/users.py @@ -22,6 +22,7 @@ from 
src.utils.model_nullable_validator import all_required_fields_present from src.utils.prometheus_metric import PrometheusMetric, PrometheusMetricNames from src.utils.user_event_constants import user_event_types_arr, user_event_types_lookup +from web3 import Web3 logger = logging.getLogger(__name__) @@ -338,74 +339,99 @@ def parse_user_event( if event_type == user_event_types_lookup["update_multihash"]: # Look up metadata multihash and override with metadata fields if metadata: - # Fields also stored on chain - if "profile_picture" in metadata and metadata["profile_picture"]: - user_record.profile_picture = metadata["profile_picture"] + user_record = update_user_metadata( + session, + update_task.redis, + user_record, + metadata, + update_task.web3, + update_task.challenge_event_bus, + ) - if "cover_photo" in metadata and metadata["cover_photo"]: - user_record.cover_photo = metadata["cover_photo"] + user_record = update_legacy_user_images(user_record) + user_record = validate_user_record(user_record) - if "bio" in metadata and metadata["bio"]: - user_record.bio = metadata["bio"] + return user_record - if "name" in metadata and metadata["name"]: - user_record.name = metadata["name"] - if "location" in metadata and metadata["location"]: - user_record.location = metadata["location"] +def update_user_metadata( + session, + redis, + user_record: User, + metadata: Dict, + web3: Web3, + challenge_event_bus: ChallengeEventBus, +): + # Fields also stored on chain + if "profile_picture" in metadata and metadata["profile_picture"]: + user_record.profile_picture = metadata["profile_picture"] - # Fields with no on-chain counterpart - if ( - "profile_picture_sizes" in metadata - and metadata["profile_picture_sizes"] - ): - user_record.profile_picture = metadata["profile_picture_sizes"] + if "cover_photo" in metadata and metadata["cover_photo"]: + user_record.cover_photo = metadata["cover_photo"] - if "cover_photo_sizes" in metadata and metadata["cover_photo_sizes"]: - 
user_record.cover_photo = metadata["cover_photo_sizes"] + if "bio" in metadata and metadata["bio"]: + user_record.bio = metadata["bio"] - if ( - "collectibles" in metadata - and metadata["collectibles"] - and isinstance(metadata["collectibles"], dict) - and metadata["collectibles"].items() - ): - user_record.has_collectibles = True - else: - user_record.has_collectibles = False + if "name" in metadata and metadata["name"]: + user_record.name = metadata["name"] - if "associated_wallets" in metadata: - update_user_associated_wallets( - session, - update_task, - user_record, - metadata["associated_wallets"], - "eth", - ) + if "location" in metadata and metadata["location"]: + user_record.location = metadata["location"] - if "associated_sol_wallets" in metadata: - update_user_associated_wallets( - session, - update_task, - user_record, - metadata["associated_sol_wallets"], - "sol", - ) + # Fields with no on-chain counterpart + if "profile_picture_sizes" in metadata and metadata["profile_picture_sizes"]: + user_record.profile_picture = metadata["profile_picture_sizes"] + + if "cover_photo_sizes" in metadata and metadata["cover_photo_sizes"]: + user_record.cover_photo = metadata["cover_photo_sizes"] - if "playlist_library" in metadata and metadata["playlist_library"]: - user_record.playlist_library = metadata["playlist_library"] + if ( + "collectibles" in metadata + and metadata["collectibles"] + and isinstance(metadata["collectibles"], dict) + and metadata["collectibles"].items() + ): + user_record.has_collectibles = True + else: + user_record.has_collectibles = False - if "is_deactivated" in metadata: - user_record.is_deactivated = metadata["is_deactivated"] + if "associated_wallets" in metadata: + update_user_associated_wallets( + session, + web3, + redis, + user_record, + metadata["associated_wallets"], + "eth", + ) - if "events" in metadata and metadata["events"]: - update_user_events( - session, - user_record, - metadata["events"], - 
update_task.challenge_event_bus, - ) + if "associated_sol_wallets" in metadata: + update_user_associated_wallets( + session, + web3, + redis, + user_record, + metadata["associated_sol_wallets"], + "sol", + ) + + if "playlist_library" in metadata and metadata["playlist_library"]: + user_record.playlist_library = metadata["playlist_library"] + + if "is_deactivated" in metadata: + user_record.is_deactivated = metadata["is_deactivated"] + if "events" in metadata and metadata["events"]: + update_user_events( + session, + user_record, + metadata["events"], + challenge_event_bus, + ) + return user_record + + +def update_legacy_user_images(user_record): # All incoming profile photos intended to be a directory # Any write to profile_picture field is replaced by profile_picture_sizes if user_record.profile_picture: @@ -424,6 +450,10 @@ def parse_user_event( user_record.cover_photo_sizes = user_record.cover_photo user_record.cover_photo = None + return user_record + + +def validate_user_record(user_record): if not all_required_fields_present(User, user_record): raise EntityMissingRequiredFieldError( "user", @@ -435,7 +465,7 @@ def parse_user_event( def update_user_associated_wallets( - session, update_task, user_record, associated_wallets, chain + session, web3, redis, user_record, associated_wallets, chain ): """Updates the user associated wallets table""" try: @@ -473,7 +503,7 @@ def update_user_associated_wallets( continue is_valid_signature = validate_signature( chain, - update_task.web3, + web3, user_record.user_id, associated_wallet, wallet_metadata["signature"], @@ -521,7 +551,7 @@ def update_user_associated_wallets( is_updated_wallets = set(previous_wallets) != added_associated_wallets if is_updated_wallets: - enqueue_immediate_balance_refresh(update_task.redis, [user_record.user_id]) + enqueue_immediate_balance_refresh(redis, [user_record.user_id]) except Exception as e: logger.error( f"index.py | users.py | Fatal updating user associated wallets while indexing {e}", 
diff --git a/discovery-provider/src/utils/eth_contracts_helpers.py b/discovery-provider/src/utils/eth_contracts_helpers.py index e1cec537d1e..7ede1820232 100644 --- a/discovery-provider/src/utils/eth_contracts_helpers.py +++ b/discovery-provider/src/utils/eth_contracts_helpers.py @@ -3,8 +3,8 @@ from src.utils.helpers import is_fqdn from src.utils.redis_cache import ( + get_cn_sp_id_key, get_json_cached_key, - get_sp_id_key, set_json_cached_key, ) @@ -17,7 +17,7 @@ def fetch_cnode_info(sp_id, sp_factory_instance, redis): - sp_id_key = get_sp_id_key(sp_id) + sp_id_key = get_cn_sp_id_key(sp_id) sp_info_cached = get_json_cached_key(redis, sp_id_key) if sp_info_cached: logger.info( diff --git a/discovery-provider/src/utils/eth_manager.py b/discovery-provider/src/utils/eth_manager.py new file mode 100644 index 00000000000..f96049db7cf --- /dev/null +++ b/discovery-provider/src/utils/eth_manager.py @@ -0,0 +1,126 @@ +import concurrent.futures +import logging +from enum import Enum +from typing import Any, TypedDict + +from redis import Redis +from src.utils.helpers import is_fqdn, load_eth_abi_values +from src.utils.redis_cache import ( + get_cn_sp_id_key, + get_dn_sp_id_key, + get_json_cached_key, + set_json_cached_key, +) +from web3 import Web3 + +logger = logging.getLogger(__name__) +eth_abi_values = load_eth_abi_values() + + +class ServiceProviderType(Enum): + DISCOVERY = bytes("discovery-node", "utf-8") + CONTENT = bytes("content-node", "utf-8") + + +class SPInfo(TypedDict): + operator_wallet: str + delegator_wallet: str + endpoint: str + block_number: int + + +class EthManager: + sp_factory_registry_key = bytes("ServiceProviderFactory", "utf-8") + sp_type = ServiceProviderType + + cnode_info_redis_ttl = 1800 + + def __init__(self, eth_web3: Web3, eth_abi_values: Any, registry_address: str): + self.eth_web3 = eth_web3 + self.eth_abi_values = eth_abi_values + self.registry_address = registry_address + + def init_contracts(self): + eth_registry = 
self.eth_web3.eth.contract( + address=self.registry_address, abi=eth_abi_values["Registry"]["abi"] + ) + + sp_factory_address = eth_registry.functions.getContract( + EthManager.sp_factory_registry_key + ).call() + sp_factory = self.eth_web3.eth.contract( + address=sp_factory_address, + abi=self.eth_abi_values["ServiceProviderFactory"]["abi"], + ) + + self.eth_registry = eth_registry + self.sp_factory = sp_factory + + def fetch_node_info( + self, sp_id: int, sp_type: ServiceProviderType, redis: Redis + ) -> SPInfo: + sp_id_key = ( + get_cn_sp_id_key(sp_id) + if ServiceProviderType.CONTENT == sp_type + else get_dn_sp_id_key(sp_id) + ) + sp_info_cached = get_json_cached_key(redis, sp_id_key) + + if sp_info_cached: + logger.info( + f"eth_manager.py | Found cached value for spID={sp_id} - {sp_info_cached}" + ) + return { + "operator_wallet": sp_info_cached[0], + "endpoint": sp_info_cached[1], + "block_number": sp_info_cached[2], + "delegator_wallet": sp_info_cached[3], + } + + endpoint_info = self.sp_factory.functions.getServiceEndpointInfo( + sp_type, sp_id + ).call() + set_json_cached_key(redis, sp_id_key, endpoint_info, self.cnode_info_redis_ttl) + logger.info( + f"eth_manager.py | Configured redis {sp_id_key} - {endpoint_info} - TTL {self.cnode_info_redis_ttl}" + ) + sp_info: SPInfo = { + "operator_wallet": endpoint_info[0], + "endpoint": endpoint_info[1], + "block_number": endpoint_info[2], + "delegator_wallet": endpoint_info[3], + } + return sp_info + + def fetch_all_registered_content_nodes( + self, sp_type: ServiceProviderType, redis: Redis + ) -> set: + total_providers = self.sp_factory.functions.getTotalServiceTypeProviders( + sp_type + ).call() + ids_list = list(range(1, total_providers + 1)) + eth_endpoints_set = set() + + # Given the total number of nodes in the network we can now fetch node info in parallel + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + fetch_cnode_futures = { + executor.submit(self.fetch_node_info, i, sp_type, 
redis): i + for i in ids_list + } + for future in concurrent.futures.as_completed(fetch_cnode_futures): + single_node_fetch_op = fetch_cnode_futures[future] + try: + endpoint_info = future.result() + # Validate the endpoint on chain + # As endpoints get deregistered, this peering system must not slow down with failed connections + # or unanticipated load + eth_sp_endpoint = endpoint_info["endpoint"] + valid_endpoint = is_fqdn(eth_sp_endpoint) + # Only valid FQDN strings are worth validating + if valid_endpoint: + eth_endpoints_set.add(eth_sp_endpoint) + except Exception as exc: + logger.error( + f"eth_contract_helpers.py | ERROR in fetch_node_futures {single_node_fetch_op} generated {exc}" + ) + return eth_endpoints_set diff --git a/discovery-provider/src/utils/redis_cache.py b/discovery-provider/src/utils/redis_cache.py index bd52b410ae6..459c68ac399 100644 --- a/discovery-provider/src/utils/redis_cache.py +++ b/discovery-provider/src/utils/redis_cache.py @@ -175,8 +175,12 @@ def get_playlist_id_cache_key(id): return f"playlist:id:{id}" -def get_sp_id_key(id): - return f"sp:id:{id}" +def get_cn_sp_id_key(id): + return f"sp:cn:id:{id}" + + +def get_dn_sp_id_key(id): + return f"sp:dn:id:{id}" def remove_cached_user_ids(redis, user_ids): diff --git a/libs/src/api/Account.ts b/libs/src/api/Account.ts index b783c7f2dd1..9b6a296775b 100644 --- a/libs/src/api/Account.ts +++ b/libs/src/api/Account.ts @@ -130,7 +130,8 @@ export class Account extends Base { handleUserBankOutcomes = (_outcome?: string, _errorCodes?: {}) => {}, userBankOutcomes: Partial = {}, feePayerOverride: Nullable = null, - generateRecoveryLink = true + generateRecoveryLink = true, + useEntityManager = false ) { const phases = { ADD_REPLICA_SET: 'ADD_REPLICA_SET', @@ -197,25 +198,36 @@ export class Account extends Base { } })() } - // Add user to chain - phase = phases.ADD_USER - const response = await this.User.addUser(metadata) - userId = response.userId - blockHash = response.blockHash - blockNumber = 
response.blockNumber - - // Assign replica set to user, updates creator_node_endpoint on chain, and then update metadata object on content node + chain (in this order) - phase = phases.ADD_REPLICA_SET - metadata = (await this.User.assignReplicaSet({ userId }))! - - // Upload profile pic and cover photo to primary Content Node and sync across secondaries - phase = phases.UPLOAD_PROFILE_IMAGES - await this.User.uploadProfileImages( - profilePictureFile!, - coverPhotoFile!, - metadata - ) + if (!useEntityManager) { + phase = phases.ADD_USER + const response = await this.User.addUser(metadata) + userId = response.userId + blockHash = response.blockHash + blockNumber = response.blockNumber + + // Assign replica set to user, updates creator_node_endpoint on chain, and then update metadata object on content node + chain (in this order) + phase = phases.ADD_REPLICA_SET + metadata = (await this.User.assignReplicaSet({ userId }))! + // Upload profile pic and cover photo to primary Content Node and sync across secondaries + phase = phases.UPLOAD_PROFILE_IMAGES + await this.User.uploadProfileImages( + profilePictureFile!, + coverPhotoFile!, + metadata, + useEntityManager + ) + } else { + const newMetadata = await this.User.createEntityManagerUser({ + metadata + }) + await this.User.uploadProfileImages( + profilePictureFile!, + coverPhotoFile!, + newMetadata, + useEntityManager + ) + } } catch (e: any) { return { error: e.message, diff --git a/libs/src/api/Users.ts b/libs/src/api/Users.ts index 1b7e9c4bc63..71b316ba1f6 100644 --- a/libs/src/api/Users.ts +++ b/libs/src/api/Users.ts @@ -7,6 +7,7 @@ import { setSpIDForEndpoint } from '../services/creatorNode' import type { ServiceProvider } from './ServiceProvider' +import { EntityManagerClient } from '../services/dataContracts/EntityManagerClient' // User metadata fields that are required on the metadata object and can have // null or non-null values @@ -57,6 +58,7 @@ export class Users extends Base { this.getSocialFeed = 
this.getSocialFeed.bind(this) this.getTopCreatorsByGenres = this.getTopCreatorsByGenres.bind(this) this.uploadProfileImages = this.uploadProfileImages.bind(this) + this.createEntityManagerUser = this.createEntityManagerUser.bind(this) this.addUser = this.addUser.bind(this) this.updateUser = this.updateUser.bind(this) this.updateCreator = this.updateCreator.bind(this) @@ -337,7 +339,6 @@ export class Users extends Base { const errorMsg = `assignReplicaSet() Error -- Phase ${phase} in ${ Date.now() - fnStartMs }ms: ${e}` - console.log(errorMsg) throw new Error(errorMsg) } @@ -355,7 +356,8 @@ export class Users extends Base { async uploadProfileImages( profilePictureFile: File, coverPhotoFile: File, - metadata: UserMetadata + metadata: UserMetadata, + useEntityManager: boolean ) { let didMetadataUpdate = false if (profilePictureFile) { @@ -372,13 +374,126 @@ export class Users extends Base { if (didMetadataUpdate) { await this.updateAndUploadMetadata({ newMetadata: metadata, - userId: metadata.user_id + userId: metadata.user_id, + useEntityManager }) } return metadata } + async createEntityManagerUser({ metadata }: { metadata: UserMetadata }) { + this.REQUIRES(Services.CREATOR_NODE) + const phases = { + CLEAN_AND_VALIDATE_METADATA: 'CLEAN_AND_VALIDATE_METADATA', + AUTOSELECT_CONTENT_NODES: 'AUTOSELECT_CONTENT_NODES', + SYNC_ACROSS_CONTENT_NODES: 'SYNC_ACROSS_CONTENT_NODES', + SET_PRIMARY: 'SET_PRIMARY', + UPLOAD_METADATA_AND_UPDATE_ON_CHAIN: 'UPLOAD_METADATA_AND_UPDATE_ON_CHAIN' + } + let phase = '' + + const logPrefix = `[User:assignReplicaSet()]` + const fnStartMs = Date.now() + let startMs = fnStartMs + + // The new metadata object that will contain the replica set + try { + // Create starter metadata and validate + phase = phases.CLEAN_AND_VALIDATE_METADATA + + // Autoselect a new replica set and update the metadata object with new content node endpoints + phase = phases.AUTOSELECT_CONTENT_NODES + const response = await 
this.ServiceProvider.autoSelectCreatorNodes({ + performSyncCheck: false, + preferHigherPatchForPrimary: this.preferHigherPatchForPrimary, + preferHigherPatchForSecondaries: this.preferHigherPatchForSecondaries + }) + console.log( + `${logPrefix} [phase: ${phase}] ServiceProvider.autoSelectCreatorNodes() completed in ${ + Date.now() - startMs + }ms` + ) + startMs = Date.now() + + // Ideally, 1 primary and n-1 secondaries are chosen. The best-worst case scenario is that at least 1 primary + // is chosen. If a primary was not selected (which also implies that secondaries were not chosen), throw + // an error. + const { primary, secondaries } = response + if (!primary) { + throw new Error('Could not select a primary.') + } + const spIds = await Promise.all( + [primary, ...secondaries].map( + async (endpoint) => await this._retrieveSpIDFromEndpoint(endpoint) + ) + ) + + // Create the user with entityMananer + const userId = Users.generateUserId() + const manageEntityResponse = + await this.contracts.EntityManagerClient!.manageEntity( + userId, + EntityManagerClient.EntityType.USER, + userId, + EntityManagerClient.Action.CREATE, + spIds.join(',') + ) + + await this.waitForReplicaSetDiscoveryIndexing( + userId, + spIds, + manageEntityResponse.txReceipt.blockNumber + ) + + // Upload metadata and call update user with the metadata + const newMetadata = this.cleanUserMetadata({ ...metadata }) + this._validateUserMetadata(newMetadata) + + const newContentNodeEndpoints = CreatorNode.buildEndpoint( + primary, + secondaries + ) + newMetadata.wallet = this.web3Manager.getWalletAddress() + newMetadata.user_id = userId + newMetadata.creator_node_endpoint = newContentNodeEndpoints + this.userStateManager.setCurrentUser({ + ...newMetadata, + // Initialize counts to be 0. We don't want to write this data to backends ever really + // (hence the cleanUserMetadata above), but we do want to make sure clients + // can properly "do math" on these numbers. 
+ followee_count: 0, + follower_count: 0, + repost_count: 0 + }) + + // Update the new primary to the auto-selected primary + phase = phases.SET_PRIMARY + await this.creatorNode.setEndpoint(primary) + + // Update metadata in CN and on chain of newly assigned replica set + phase = phases.UPLOAD_METADATA_AND_UPDATE_ON_CHAIN + await this.updateAndUploadMetadata({ + newMetadata, + userId, + useEntityManager: true + }) + console.log( + `${logPrefix} [phase: ${phase}] updateAndUploadMetadata() completed in ${ + Date.now() - startMs + }ms` + ) + + console.log(`${logPrefix} completed in ${Date.now() - fnStartMs}ms`) + return newMetadata + } catch (e) { + const errorMsg = `assignReplicaSet() Error -- Phase ${phase} in ${ + Date.now() - fnStartMs + }ms: ${e}` + throw new Error(errorMsg) + } + } + /** * Create an on-chain non-creator user. Some fields are restricted (ex. * creator_node_endpoint); this should error if the metadata given attempts to set them. @@ -398,8 +513,10 @@ export class Users extends Base { await this.contracts.UserFactoryClient.addUser(newMetadata.handle) ).userId } - const { latestBlockHash: blockHash, latestBlockNumber: blockNumber } = - await this._addUserOperations(userId, newMetadata) + + const result = await this._addUserOperations(userId, newMetadata) + const blockHash: string | undefined = result.latestBlockHash + const blockNumber: number = result.latestBlockNumber newMetadata.wallet = this.web3Manager.getWalletAddress() newMetadata.user_id = userId @@ -416,6 +533,37 @@ export class Users extends Base { return { blockHash, blockNumber, userId } } + async updateEntityManagerReplicaSet({ + userId, + primary, + secondaries, + oldPrimary, + oldSecondaries + }: { + userId: number + primary: number + secondaries: number[] + oldPrimary: number + oldSecondaries: number[] + }) { + const updateReplica = `${[oldPrimary, ...oldSecondaries].join(',')}:${[ + primary, + ...secondaries + ].join(',')}` + + const response = await 
this.contracts.EntityManagerClient!.manageEntity( + userId, + EntityManagerClient.EntityType.USER_REPLICA_SET, + userId, + EntityManagerClient.Action.UPDATE, + updateReplica + ) + return { + blockHash: response.txReceipt.blockHash, + blockNumber: response.txReceipt.blockNumber + } + } + /** * Updates a user */ @@ -449,13 +597,17 @@ export class Users extends Base { /** * Updates a creator (updates their data on the creator node) */ - async updateCreator(userId: number, metadata: UserMetadata) { + async updateCreator( + userId: number, + metadata: UserMetadata, + useEntityManager: boolean + ) { this.REQUIRES(Services.CREATOR_NODE, Services.DISCOVERY_PROVIDER) this.IS_OBJECT(metadata) const newMetadata = this.cleanUserMetadata(metadata) this._validateUserMetadata(newMetadata) - const logPrefix = `[User:updateCreator()] [userId: ${userId}]` + const logPrefix = `[User:updateCreator()] [userId: ${userId}] [useEntityManager: ${useEntityManager}]` const fnStartMs = Date.now() let startMs = fnStartMs @@ -490,7 +642,8 @@ export class Users extends Base { const { txReceipt: updateEndpointTxReceipt, replicaSetSPIDs } = await this._updateReplicaSetOnChain( userId, - newMetadata.creator_node_endpoint + newMetadata.creator_node_endpoint, + useEntityManager ) updateEndpointTxBlockNumber = updateEndpointTxReceipt?.blockNumber console.log( @@ -499,11 +652,18 @@ export class Users extends Base { }ms` ) startMs = Date.now() - - await this._waitForURSMCreatorNodeEndpointIndexing( - userId, - replicaSetSPIDs - ) + if (useEntityManager) { + await this.waitForReplicaSetDiscoveryIndexing( + userId, + replicaSetSPIDs, + updateEndpointTxBlockNumber + ) + } else { + await this._waitForURSMCreatorNodeEndpointIndexing( + userId, + replicaSetSPIDs + ) + } console.log( `${logPrefix} _waitForURSMCreatorNodeEndpointIndexing() completed in ${ Date.now() - startMs @@ -519,23 +679,49 @@ export class Users extends Base { updateEndpointTxBlockNumber ) - // Write metadata multihash to chain - const 
updatedMultihashDecoded = Utils.decodeMultihash(metadataMultihash) - const { txReceipt } = - await this.contracts.UserFactoryClient.updateMultihash( + let txReceipt + let latestBlockHash + let latestBlockNumber + + if (!useEntityManager) { + // Write metadata multihash to chain + + const updatedMultihashDecoded = Utils.decodeMultihash(metadataMultihash) + const updateMultiHashResp = + await this.contracts.UserFactoryClient.updateMultihash( + userId, + updatedMultihashDecoded.digest + ) + txReceipt = updateMultiHashResp.txReceipt + + // Write remaining metadata fields to chain + const updateUserResp = await this._updateUserOperations( + newMetadata, + oldMetadata, + userId + ) + latestBlockHash = updateUserResp.latestBlockHash + latestBlockNumber = Math.max( + txReceipt.blockNumber, + updateUserResp.latestBlockNumber + ) + } else { + const response = await this.contracts.EntityManagerClient!.manageEntity( + userId, + EntityManagerClient.EntityType.USER, userId, - updatedMultihashDecoded.digest + EntityManagerClient.Action.UPDATE, + metadataMultihash ) - - // Write remaining metadata fields to chain - let { latestBlockHash, latestBlockNumber } = - await this._updateUserOperations(newMetadata, oldMetadata, userId) + txReceipt = response.txReceipt + latestBlockNumber = txReceipt.blockNumber + } // Write to CN to associate blockchain user id with updated metadata and block number await this.creatorNode.associateCreator( userId, metadataFileUUID, - Math.max(txReceipt.blockNumber, latestBlockNumber) + latestBlockNumber ) // Update libs instance with new user metadata object @@ -605,10 +791,12 @@ export class Users extends Base { */ async updateAndUploadMetadata({ newMetadata, - userId + userId, + useEntityManager }: { newMetadata: UserMetadata userId: number + useEntityManager: boolean }) { this.REQUIRES(Services.CREATOR_NODE, Services.DISCOVERY_PROVIDER) this.IS_OBJECT(newMetadata) @@ -640,26 +828,45 @@ export class Users extends Base { newMetadata.creator_node_endpoint 
!== oldMetadata.creator_node_endpoint ) { phase = phases.UPDATE_CONTENT_NODE_ENDPOINT_ON_CHAIN - const { replicaSetSPIDs } = await this._updateReplicaSetOnChain( - userId, - newMetadata.creator_node_endpoint - ) + const { txReceipt, replicaSetSPIDs } = + await this._updateReplicaSetOnChain( + userId, + newMetadata.creator_node_endpoint, + useEntityManager + ) console.log( `${logPrefix} [phase: ${phase}] _updateReplicaSetOnChain() completed in ${ Date.now() - startMs }ms` ) - startMs = Date.now() + if (useEntityManager) { + startMs = Date.now() + await this.waitForReplicaSetDiscoveryIndexing( + userId, + replicaSetSPIDs, + txReceipt.blockNumber + ) + // @ts-expect-error + newMetadata.primary_id = replicaSetSPIDs[0] + newMetadata.secondary_ids = replicaSetSPIDs.slice(1) + console.log( + `${logPrefix} [phase: ${phase}] waitForReplicaSetDiscoveryIndexing() completed in ${ + Date.now() - startMs + }ms` + ) + } else { + startMs = Date.now() - await this._waitForURSMCreatorNodeEndpointIndexing( - userId, - replicaSetSPIDs - ) - console.log( - `${logPrefix} [phase: ${phase}] _waitForURSMCreatorNodeEndpointIndexing() completed in ${ - Date.now() - startMs - }ms` - ) + await this._waitForURSMCreatorNodeEndpointIndexing( + userId, + replicaSetSPIDs + ) + console.log( + `${logPrefix} [phase: ${phase}] _waitForURSMCreatorNodeEndpointIndexing() completed in ${ + Date.now() - startMs + }ms` + ) + } } // Upload new metadata object to CN @@ -676,32 +883,48 @@ export class Users extends Base { // Write metadata multihash to chain phase = phases.UPDATE_METADATA_ON_CHAIN - const updatedMultihashDecoded = Utils.decodeMultihash(metadataMultihash) - const { txReceipt } = - await this.contracts.UserFactoryClient.updateMultihash( + let txReceipt + let blockNumber + if (useEntityManager) { + const response = await this.contracts.EntityManagerClient!.manageEntity( + userId, + EntityManagerClient.EntityType.USER, + userId, + EntityManagerClient.Action.UPDATE, + metadataMultihash + ) + 
txReceipt = response.txReceipt + blockNumber = txReceipt.blockNumber + } else { + const updatedMultihashDecoded = Utils.decodeMultihash(metadataMultihash) + const res = await this.contracts.UserFactoryClient.updateMultihash( userId, updatedMultihashDecoded.digest ) - console.log( - `${logPrefix} [phase: ${phase}] UserFactoryClient.updateMultihash() completed in ${ - Date.now() - startMs - }ms` - ) - startMs = Date.now() + txReceipt = res.txReceipt + console.log( + `${logPrefix} [phase: ${phase}] UserFactoryClient.updateMultihash() completed in ${ + Date.now() - startMs + }ms` + ) + startMs = Date.now() + + // Write remaining metadata fields to chain + phase = phases.UPDATE_USER_ON_CHAIN_OPS + const { latestBlockNumber } = await this._updateUserOperations( + newMetadata, + oldMetadata, + userId, + ['creator_node_endpoint'] + ) + console.log( + `${logPrefix} [phase: ${phase}] _updateUserOperations() completed in ${ + Date.now() - startMs + }ms` + ) + blockNumber = Math.max(txReceipt.blockNumber, latestBlockNumber) + } - // Write remaining metadata fields to chain - phase = phases.UPDATE_USER_ON_CHAIN_OPS - const { latestBlockNumber } = await this._updateUserOperations( - newMetadata, - oldMetadata, - userId, - ['creator_node_endpoint'] - ) - console.log( - `${logPrefix} [phase: ${phase}] _updateUserOperations() completed in ${ - Date.now() - startMs - }ms` - ) startMs = Date.now() // Write to CN to associate blockchain user id with updated metadata and block number @@ -709,7 +932,7 @@ export class Users extends Base { await this.creatorNode.associateCreator( userId, metadataFileUUID, - Math.max(txReceipt.blockNumber, latestBlockNumber) + blockNumber ) console.log( `${logPrefix} [phase: ${phase}] creatorNode.associateCreator() completed in ${ @@ -727,7 +950,6 @@ export class Users extends Base { const errorMsg = `updateAndUploadMetadata() Error -- Phase ${phase} in ${ Date.now() - fnStartMs }ms: ${e}` - console.log(errorMsg) throw new Error(errorMsg) } } @@ -771,6 
+993,58 @@ export class Users extends Base { } } + /** + * Waits for the input replica set to be indexed by the discovery node + * If then replica set matches at the requested block number -> return null + * If the replica set response is null at the block number -> throw error + * If the replica set is mismatched at the block number -> throw error + * If the timeout is exceeded before replica set indexed -> throw error + */ + async waitForReplicaSetDiscoveryIndexing( + userId: number, + replicaSetSPIDs: number[], + blockNumber: number, + timeoutMs = 60000 + ): Promise { + const asyncFn = async () => { + while (true) { + const encodedUserId = Utils.encodeHashId(userId) + let replicaSet + try { + // If the discovery node has not yet indexed the blocknumber, + // this method will throw an error + // If the user replica set does not exist, it will return an empty object + // which should lead to the method throwing an error + replicaSet = await this.discoveryProvider.getUserReplicaSet({ + encodedUserId: encodedUserId!, + blockNumber + }) + } catch (err) { + // Do nothing on error + } + if (replicaSet) { + if ( + replicaSet.primarySpID === replicaSetSPIDs[0] && + replicaSet.secondary1SpID === replicaSetSPIDs[1] && + replicaSet.secondary2SpID === replicaSetSPIDs[2] + ) { + break + } else { + throw new Error( + `[User:waitForReplicaSetDiscoveryIndexing()] Indexed block ${blockNumber}, but did not find matching sp ids` + ) + } + } + await Utils.wait(500) + } + } + await Utils.racePromiseWithTimeout( + asyncFn(), + timeoutMs, + `[User:waitForReplicaSetDiscoveryIndexing()] Timeout error after ${timeoutMs}ms` + ) + } + async _waitForURSMCreatorNodeEndpointIndexing( userId: number, replicaSetSPIDs: number[], @@ -964,7 +1238,11 @@ export class Users extends Base { // Perform replica set update // Conditionally write to UserFactory contract, else write to UserReplicaSetManager // This behavior is to ensure backwards compatibility prior to contract deploy - async 
_updateReplicaSetOnChain(userId: number, creatorNodeEndpoint: string) { + async _updateReplicaSetOnChain( + userId: number, + creatorNodeEndpoint: string, + useEntityManager: boolean + ) { // Attempt to update through UserReplicaSetManagerClient if present if (!this.contracts.UserReplicaSetManagerClient) { await this.contracts.initUserReplicaSetManagerClient() @@ -984,14 +1262,50 @@ export class Users extends Base { this._retrieveSpIDFromEndpoint(secondaries[0]!), this._retrieveSpIDFromEndpoint(secondaries[1]!) ]) + let txReceipt + const currentUser = this.userStateManager.getCurrentUser() + if (!currentUser) throw new Error('Current user missing') // Update in new contract - const txReceipt = - await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet( - userId, - primarySpID, - [secondary1SpID, secondary2SpID] + if (useEntityManager) { + const currentPrimaryEndpoint = CreatorNode.getPrimary( + currentUser.creator_node_endpoint + ) + const currentSecondaries = CreatorNode.getSecondaries( + currentUser.creator_node_endpoint ) + + if (currentSecondaries.length < 2) { + throw new Error( + `Invalid number of secondaries found - received ${currentSecondaries}` + ) + } + + const [oldPrimary, oldSecondary1SpID, oldSecondary2SpID] = + await Promise.all([ + this._retrieveSpIDFromEndpoint(currentPrimaryEndpoint!), + this._retrieveSpIDFromEndpoint(currentSecondaries[0]!), + this._retrieveSpIDFromEndpoint(currentSecondaries[1]!) 
+ ]) + + txReceipt = await this.updateEntityManagerReplicaSet({ + userId, + primary: primarySpID, + secondaries: [secondary1SpID, secondary2SpID], + oldPrimary: oldPrimary, + oldSecondaries: [oldSecondary1SpID, oldSecondary2SpID] + }) + } else { + txReceipt = + await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet( + userId, + primarySpID, + [secondary1SpID, secondary2SpID] + ) + } + if (!txReceipt) { + throw new Error('Unable to update replica set on chain') + } const replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID] return { txReceipt, @@ -1019,4 +1333,14 @@ export class Users extends Base { } return spID } + + // Minimum user ID, intentionally higher than legacy user ID range + static MIN_USER_ID = 2000000 + + // Maximum user ID, reflects postgres max integer value + static MAX_USER_ID = 2147483647 + + static generateUserId(): number { + return Utils.getRandomInt(Users.MIN_USER_ID, Users.MAX_USER_ID) + } } diff --git a/libs/src/data-contracts/signatureSchemas.ts b/libs/src/data-contracts/signatureSchemas.js similarity index 59% rename from libs/src/data-contracts/signatureSchemas.ts rename to libs/src/data-contracts/signatureSchemas.js index 921929523ac..7d5537e4252 100644 --- a/libs/src/data-contracts/signatureSchemas.ts +++ b/libs/src/data-contracts/signatureSchemas.js @@ -7,22 +7,9 @@ * modeled off: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-712.md */ -import type { - EIP712Domain, - EIP712Message, - EIP712TypedData, - EIP712TypeProperty, - EIP712Types -} from 'eth-sig-util' - -type DomainFn = (chainId: number, contactAddress: string) => EIP712Domain - -function getDomainData( - contractName: string, - signatureVersion: string, - chainId: number, - contractAddress: string -): EIP712Domain { +const domains = {} + +function getDomainData (contractName, signatureVersion, chainId, contractAddress) { return { name: contractName, version: signatureVersion, @@ -31,56 +18,42 @@ function getDomainData( } } -const 
getSocialFeatureFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getSocialFeatureFactoryDomain = function (chainId, contractAddress) { return getDomainData('Social Feature Factory', '1', chainId, contractAddress) } -const getUserFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getUserFactoryDomain = function (chainId, contractAddress) { return getDomainData('User Factory', '1', chainId, contractAddress) } -const getTrackFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getTrackFactoryDomain = function (chainId, contractAddress) { return getDomainData('Track Factory', '1', chainId, contractAddress) } -const getPlaylistFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getPlaylistFactoryDomain = function (chainId, contractAddress) { return getDomainData('Playlist Factory', '1', chainId, contractAddress) } -const getUserLibraryFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getUserLibraryFactoryDomain = function (chainId, contractAddress) { return getDomainData('User Library Factory', '1', chainId, contractAddress) } -const getIPLDBlacklistFactoryDomain: DomainFn = (chainId, contractAddress) => { +domains.getIPLDBlacklistFactoryDomain = function (chainId, contractAddress) { return getDomainData('IPLD Blacklist Factory', '1', chainId, contractAddress) } -const getUserReplicaSetManagerDomain: DomainFn = (chainId, contractAddress) => { - return getDomainData( - 'User Replica Set Manager', - '1', - chainId, - contractAddress - ) +domains.getUserReplicaSetManagerDomain = function (chainId, contractAddress) { + return getDomainData('User Replica Set Manager', '1', chainId, contractAddress) } -const getEntityManagerDomain: DomainFn = (chainId, contractAddress) => { - return getDomainData('Entity Manager', '1', chainId, contractAddress) +domains.getEntityManagerDomain = function (chainId, contractAddress) { + return getDomainData("Entity Manager", "1", chainId, contractAddress) } -export 
const domains = { - getSocialFeatureFactoryDomain, - getUserFactoryDomain, - getTrackFactoryDomain, - getPlaylistFactoryDomain, - getUserLibraryFactoryDomain, - getIPLDBlacklistFactoryDomain, - getUserReplicaSetManagerDomain, - getEntityManagerDomain -} +const schemas = {} /* contract signing domain */ -const domain = [ +schemas.domain = [ { name: 'name', type: 'string' }, { name: 'version', type: 'string' }, { name: 'chainId', type: 'uint256' }, @@ -88,33 +61,33 @@ const domain = [ ] /* user factory requests */ -const addUserRequest = [ +schemas.addUserRequest = [ { name: 'handle', type: 'bytes16' }, { name: 'nonce', type: 'bytes32' } ] /* rather than having a schema type for every update op, we have a type for each unique * structure */ -const updateUserBytes32 = [ +schemas.updateUserBytes32 = [ { name: 'userId', type: 'uint' }, { name: 'newValue', type: 'bytes32' }, { name: 'nonce', type: 'bytes32' } ] -const updateUserString = [ +schemas.updateUserString = [ { name: 'userId', type: 'uint' }, { name: 'newValue', type: 'string' }, { name: 'nonce', type: 'bytes32' } ] -const updateUserBool = [ +schemas.updateUserBool = [ { name: 'userId', type: 'uint' }, { name: 'newValue', type: 'bool' }, { name: 'nonce', type: 'bytes32' } ] /* track factory requests */ -const addTrackRequest = [ +schemas.addTrackRequest = [ { name: 'trackOwnerId', type: 'uint' }, { name: 'multihashDigest', type: 'bytes32' }, { name: 'multihashHashFn', type: 'uint8' }, @@ -122,7 +95,7 @@ const addTrackRequest = [ { name: 'nonce', type: 'bytes32' } ] -const updateTrackRequest = [ +schemas.updateTrackRequest = [ { name: 'trackId', type: 'uint' }, { name: 'trackOwnerId', type: 'uint' }, { name: 'multihashDigest', type: 'bytes32' }, @@ -131,37 +104,37 @@ const updateTrackRequest = [ { name: 'nonce', type: 'bytes32' } ] -const deleteTrackRequest = [ +schemas.deleteTrackRequest = [ { name: 'trackId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] /* social features */ -const addTrackRepostRequest 
= [ +schemas.addTrackRepostRequest = [ { name: 'userId', type: 'uint' }, { name: 'trackId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deleteTrackRepostRequest = addTrackRepostRequest +schemas.deleteTrackRepostRequest = schemas.addTrackRepostRequest -const addPlaylistRepostRequest = [ +schemas.addPlaylistRepostRequest = [ { name: 'userId', type: 'uint' }, { name: 'playlistId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deletePlaylistRepostRequest = addPlaylistRepostRequest +schemas.deletePlaylistRepostRequest = schemas.addPlaylistRepostRequest -const userFollowRequest = [ +schemas.userFollowRequest = [ { name: 'followerUserId', type: 'uint' }, { name: 'followeeUserId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deleteUserFollowRequest = userFollowRequest +schemas.deleteUserFollowRequest = schemas.userFollowRequest -const createPlaylistRequest = [ +schemas.createPlaylistRequest = [ { name: 'playlistOwnerId', type: 'uint' }, { name: 'playlistName', type: 'string' }, { name: 'isPrivate', type: 'bool' }, @@ -170,83 +143,83 @@ const createPlaylistRequest = [ { name: 'nonce', type: 'bytes32' } ] -const deletePlaylistRequest = [ +schemas.deletePlaylistRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const addPlaylistTrackRequest = [ +schemas.addPlaylistTrackRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'addedTrackId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deletePlaylistTrackRequest = [ +schemas.deletePlaylistTrackRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'deletedTrackId', type: 'uint' }, { name: 'deletedTrackTimestamp', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const orderPlaylistTracksRequest = [ +schemas.orderPlaylistTracksRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'trackIdsHash', type: 'bytes32' }, { name: 'nonce', type: 'bytes32' } ] -const updatePlaylistPrivacyRequest = [ 
+schemas.updatePlaylistPrivacyRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'updatedPlaylistPrivacy', type: 'bool' }, { name: 'nonce', type: 'bytes32' } ] -const updatePlaylistNameRequest = [ +schemas.updatePlaylistNameRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'updatedPlaylistName', type: 'string' }, { name: 'nonce', type: 'bytes32' } ] -const updatePlaylistCoverPhotoRequest = [ +schemas.updatePlaylistCoverPhotoRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'playlistImageMultihashDigest', type: 'bytes32' }, { name: 'nonce', type: 'bytes32' } ] -const updatePlaylistDescriptionRequest = [ +schemas.updatePlaylistDescriptionRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'playlistDescription', type: 'string' }, { name: 'nonce', type: 'bytes32' } ] -const updatePlaylistUPCRequest = [ +schemas.updatePlaylistUPCRequest = [ { name: 'playlistId', type: 'uint' }, { name: 'playlistUPC', type: 'bytes32' }, { name: 'nonce', type: 'bytes32' } ] -const trackSaveRequest = [ +schemas.trackSaveRequest = [ { name: 'userId', type: 'uint' }, { name: 'trackId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deleteTrackSaveRequest = trackSaveRequest +schemas.deleteTrackSaveRequest = schemas.trackSaveRequest -const playlistSaveRequest = [ +schemas.playlistSaveRequest = [ { name: 'userId', type: 'uint' }, { name: 'playlistId', type: 'uint' }, { name: 'nonce', type: 'bytes32' } ] -const deletePlaylistSaveRequest = playlistSaveRequest +schemas.deletePlaylistSaveRequest = schemas.playlistSaveRequest -const addIPLDBlacklist = [ +schemas.addIPLDBlacklist = [ { name: 'multihashDigest', type: 'bytes32' }, { name: 'nonce', type: 'bytes32' } ] // User replica set manager schemas -const proposeAddOrUpdateContentNode = [ +schemas.proposeAddOrUpdateContentNode = [ { name: 'cnodeSpId', type: 'uint' }, { name: 'cnodeDelegateOwnerWallet', type: 'address' }, { name: 'cnodeOwnerWallet', type: 'address' }, @@ -254,7 +227,7 @@ const 
proposeAddOrUpdateContentNode = [ { name: 'nonce', type: 'bytes32' } ] -const updateReplicaSet = [ +schemas.updateReplicaSet = [ { name: 'userId', type: 'uint' }, { name: 'primaryId', type: 'uint' }, { name: 'secondaryIdsHash', type: 'bytes32' }, @@ -263,62 +236,27 @@ const updateReplicaSet = [ { name: 'nonce', type: 'bytes32' } ] -const manageEntity = [ +schemas.manageUser = [ { name: 'userId', type: 'uint' }, - { name: 'entityType', type: 'string' }, - { name: 'entityId', type: 'uint' }, { name: 'action', type: 'string' }, - { name: 'metadata', type: 'string' }, - { name: 'nonce', type: 'bytes32' } + { name: 'metadata', type: 'string'}, + { name: 'nonce', type: 'bytes32'} ] -export const schemas = { - domain, - addUserRequest, - updateUserBytes32, - updateUserString, - updateUserBool, - addTrackRequest, - updateTrackRequest, - deleteTrackRequest, - addTrackRepostRequest, - deleteTrackRepostRequest, - addPlaylistRepostRequest, - deletePlaylistRepostRequest, - userFollowRequest, - deleteUserFollowRequest, - createPlaylistRequest, - deletePlaylistRequest, - addPlaylistTrackRequest, - deletePlaylistTrackRequest, - orderPlaylistTracksRequest, - updatePlaylistPrivacyRequest, - updatePlaylistNameRequest, - updatePlaylistCoverPhotoRequest, - updatePlaylistDescriptionRequest, - updatePlaylistUPCRequest, - trackSaveRequest, - deleteTrackSaveRequest, - playlistSaveRequest, - deletePlaylistSaveRequest, - addIPLDBlacklist, - proposeAddOrUpdateContentNode, - updateReplicaSet, - manageEntity -} +schemas.manageEntity = [ + { name: 'userId', type: 'uint'}, + { name: 'entityType', type: 'string'}, + { name: 'entityId', type: 'uint'}, + { name: 'action', type: 'string'}, + { name: 'metadata', type: 'string'}, + { name: 'nonce', type: 'bytes32'}, +] -type MessageSchema = readonly EIP712TypeProperty[] +const generators = {} -function getRequestData( - domainDataFn: DomainFn, - chainId: number, - contractAddress: string, - messageTypeName: string, - messageSchema: MessageSchema, - 
message: EIP712Message -): EIP712TypedData { +function getRequestData (domainDataFn, chainId, contractAddress, messageTypeName, messageSchema, message) { const domainData = domainDataFn(chainId, contractAddress) - const types: EIP712Types = { + const types = { EIP712Domain: schemas.domain } types[messageTypeName] = messageSchema @@ -331,12 +269,7 @@ function getRequestData( } /* User Factory Generators */ -const getAddUserRequestData = ( - chainId: number, - contractAddress: string, - handle: string, - nonce: string -) => { +generators.getAddUserRequestData = function (chainId, contractAddress, handle, nonce) { const message = { handle: handle, nonce: nonce @@ -351,15 +284,7 @@ const getAddUserRequestData = ( ) } -function _getUpdateUserRequestData( - chainId: number, - contractAddress: string, - messageTypeName: string, - schema: MessageSchema, - userId: number, - newValue: unknown, - nonce: string -) { +function _getUpdateUserRequestData (chainId, contractAddress, messageTypeName, schema, userId, newValue, nonce) { const message = { userId: userId, newValue: newValue, @@ -375,21 +300,7 @@ function _getUpdateUserRequestData( ) } -export type UserUpdateRequestFn = ( - chainId: number, - contactAddress: string, - userId: number, - newValue: unknown, - nonce: string -) => EIP712TypedData - -const getUpdateUserMultihashRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserMultihashRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -401,13 +312,7 @@ const getUpdateUserMultihashRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserNameRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserNameRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, 
@@ -419,13 +324,7 @@ const getUpdateUserNameRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserLocationRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserLocationRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -437,13 +336,7 @@ const getUpdateUserLocationRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserProfilePhotoRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserProfilePhotoRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -455,13 +348,7 @@ const getUpdateUserProfilePhotoRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserCoverPhotoRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserCoverPhotoRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -473,13 +360,7 @@ const getUpdateUserCoverPhotoRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserBioRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserBioRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -491,13 +372,7 @@ const getUpdateUserBioRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserCreatorNodeRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserCreatorNodeRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -509,13 +384,7 @@ const 
getUpdateUserCreatorNodeRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserCreatorRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserCreatorRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -527,13 +396,7 @@ const getUpdateUserCreatorRequestData: UserUpdateRequestFn = ( ) } -const getUpdateUserVerifiedRequestData: UserUpdateRequestFn = ( - chainId, - contractAddress, - userId, - newValue, - nonce -) => { +generators.getUpdateUserVerifiedRequestData = function (chainId, contractAddress, userId, newValue, nonce) { return _getUpdateUserRequestData( chainId, contractAddress, @@ -546,15 +409,7 @@ const getUpdateUserVerifiedRequestData: UserUpdateRequestFn = ( } /* Track Factory Generators */ -const getAddTrackRequestData = ( - chainId: number, - contractAddress: string, - trackOwnerId: number, - multihashDigest: string, - multihashHashFn: number, - multihashSize: number, - nonce: string -) => { +generators.getAddTrackRequestData = function (chainId, contractAddress, trackOwnerId, multihashDigest, multihashHashFn, multihashSize, nonce) { const message = { trackOwnerId: trackOwnerId, multihashDigest: multihashDigest, @@ -572,16 +427,7 @@ const getAddTrackRequestData = ( ) } -const getUpdateTrackRequestData = ( - chainId: number, - contractAddress: string, - trackId: number, - trackOwnerId: number, - multihashDigest: string, - multihashHashFn: number, - multihashSize: number, - nonce: string -) => { +generators.getUpdateTrackRequestData = function (chainId, contractAddress, trackId, trackOwnerId, multihashDigest, multihashHashFn, multihashSize, nonce) { const message = { trackId: trackId, trackOwnerId: trackOwnerId, @@ -600,12 +446,7 @@ const getUpdateTrackRequestData = ( ) } -const getDeleteTrackRequestData = ( - chainId: number, - contractAddress: string, - trackId: number, - nonce: string -) => 
{ +generators.getDeleteTrackRequestData = function (chainId, contractAddress, trackId, nonce) { const message = { trackId: trackId, nonce: nonce @@ -621,13 +462,7 @@ const getDeleteTrackRequestData = ( } /* Social Feature Factory Generators */ -const getAddTrackRepostRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - trackId: number, - nonce: string -) => { +generators.getAddTrackRepostRequestData = function (chainId, contractAddress, userId, trackId, nonce) { const message = { userId: userId, trackId: trackId, @@ -643,13 +478,7 @@ const getAddTrackRepostRequestData = ( ) } -const getDeleteTrackRepostRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - trackId: number, - nonce: string -) => { +generators.getDeleteTrackRepostRequestData = function (chainId, contractAddress, userId, trackId, nonce) { const message = { userId: userId, trackId: trackId, @@ -665,13 +494,7 @@ const getDeleteTrackRepostRequestData = ( ) } -const getAddPlaylistRepostRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - playlistId: number, - nonce: string -) => { +generators.getAddPlaylistRepostRequestData = function (chainId, contractAddress, userId, playlistId, nonce) { const message = { userId: userId, playlistId: playlistId, @@ -687,13 +510,7 @@ const getAddPlaylistRepostRequestData = ( ) } -const getDeletePlaylistRepostRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - playlistId: number, - nonce: string -) => { +generators.getDeletePlaylistRepostRequestData = function (chainId, contractAddress, userId, playlistId, nonce) { const message = { userId: userId, playlistId: playlistId, @@ -709,13 +526,7 @@ const getDeletePlaylistRepostRequestData = ( ) } -const getUserFollowRequestData = ( - chainId: number, - contractAddress: string, - followerUserId: number, - followeeUserId: number, - nonce: string -) => { +generators.getUserFollowRequestData = function (chainId, 
contractAddress, followerUserId, followeeUserId, nonce) { const message = { followerUserId: followerUserId, followeeUserId: followeeUserId, @@ -731,13 +542,7 @@ const getUserFollowRequestData = ( ) } -const getDeleteUserFollowRequestData = ( - chainId: number, - contractAddress: string, - followerUserId: number, - followeeUserId: number, - nonce: string -) => { +generators.getDeleteUserFollowRequestData = function (chainId, contractAddress, followerUserId, followeeUserId, nonce) { const message = { followerUserId: followerUserId, followeeUserId: followeeUserId, @@ -753,13 +558,7 @@ const getDeleteUserFollowRequestData = ( ) } -const getTrackSaveRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - trackId: number, - nonce: string -) => { +generators.getTrackSaveRequestData = function (chainId, contractAddress, userId, trackId, nonce) { const message = { userId: userId, trackId: trackId, @@ -776,13 +575,7 @@ const getTrackSaveRequestData = ( ) } -const getDeleteTrackSaveRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - trackId: number, - nonce: string -) => { +generators.getDeleteTrackSaveRequestData = function (chainId, contractAddress, userId, trackId, nonce) { const message = { userId: userId, trackId: trackId, @@ -799,13 +592,7 @@ const getDeleteTrackSaveRequestData = ( ) } -const getPlaylistSaveRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - playlistId: number, - nonce: string -) => { +generators.getPlaylistSaveRequestData = function (chainId, contractAddress, userId, playlistId, nonce) { const message = { userId: userId, playlistId: playlistId, @@ -822,13 +609,7 @@ const getPlaylistSaveRequestData = ( ) } -const getDeletePlaylistSaveRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - playlistId: number, - nonce: string -) => { +generators.getDeletePlaylistSaveRequestData = function (chainId, contractAddress, userId, playlistId, nonce) { 
const message = { userId: userId, playlistId: playlistId, @@ -850,16 +631,7 @@ const getDeletePlaylistSaveRequestData = ( /* NOTE: Ensure the value for trackIds hash is generated using the following snippet prior to calling this generator function: * web3New.utils.soliditySha3(web3New.eth.abi.encodeParameter('uint[]', trackIds)) */ -const getCreatePlaylistRequestData = ( - chainId: number, - contractAddress: string, - playlistOwnerId: number, - playlistName: string, - isPrivate: boolean, - isAlbum: boolean, - trackIdsHash: string | null, - nonce: string -) => { +generators.getCreatePlaylistRequestData = function (chainId, contractAddress, playlistOwnerId, playlistName, isPrivate, isAlbum, trackIdsHash, nonce) { const message = { playlistOwnerId: playlistOwnerId, playlistName: playlistName, @@ -879,12 +651,7 @@ const getCreatePlaylistRequestData = ( ) } -const getDeletePlaylistRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - nonce: string -) => { +generators.getDeletePlaylistRequestData = function (chainId, contractAddress, playlistId, nonce) { const message = { playlistId: playlistId, nonce: nonce @@ -899,13 +666,7 @@ const getDeletePlaylistRequestData = ( ) } -const getAddPlaylistTrackRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - addedTrackId: number, - nonce: string -) => { +generators.getAddPlaylistTrackRequestData = function (chainId, contractAddress, playlistId, addedTrackId, nonce) { const message = { playlistId: playlistId, addedTrackId: addedTrackId, @@ -922,14 +683,7 @@ const getAddPlaylistTrackRequestData = ( ) } -const getDeletePlaylistTrackRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - deletedTrackId: number, - deletedTrackTimestamp: number, - nonce: string -) => { +generators.getDeletePlaylistTrackRequestData = function (chainId, contractAddress, playlistId, deletedTrackId, deletedTrackTimestamp, nonce) { const message = { playlistId: 
playlistId, deletedTrackId: deletedTrackId, @@ -947,13 +701,7 @@ const getDeletePlaylistTrackRequestData = ( ) } -const getOrderPlaylistTracksRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - trackIdsHash: string | null, - nonce: string -) => { +generators.getOrderPlaylistTracksRequestData = function (chainId, contractAddress, playlistId, trackIdsHash, nonce) { const message = { playlistId: playlistId, trackIdsHash: trackIdsHash, @@ -970,13 +718,7 @@ const getOrderPlaylistTracksRequestData = ( ) } -const getUpdatePlaylistNameRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - updatedPlaylistName: string, - nonce: string -) => { +generators.getUpdatePlaylistNameRequestData = function (chainId, contractAddress, playlistId, updatedPlaylistName, nonce) { const message = { playlistId: playlistId, updatedPlaylistName: updatedPlaylistName, @@ -993,13 +735,7 @@ const getUpdatePlaylistNameRequestData = ( ) } -const getUpdatePlaylistPrivacyRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - updatedPlaylistPrivacy: boolean, - nonce: string -) => { +generators.getUpdatePlaylistPrivacyRequestData = function (chainId, contractAddress, playlistId, updatedPlaylistPrivacy, nonce) { const message = { playlistId: playlistId, updatedPlaylistPrivacy: updatedPlaylistPrivacy, @@ -1016,13 +752,7 @@ const getUpdatePlaylistPrivacyRequestData = ( ) } -const getUpdatePlaylistCoverPhotoRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - playlistImageMultihashDigest: string, - nonce: string -) => { +generators.getUpdatePlaylistCoverPhotoRequestData = function (chainId, contractAddress, playlistId, playlistImageMultihashDigest, nonce) { const message = { playlistId: playlistId, playlistImageMultihashDigest: playlistImageMultihashDigest, @@ -1035,17 +765,10 @@ const getUpdatePlaylistCoverPhotoRequestData = ( contractAddress, 'UpdatePlaylistCoverPhotoRequest', 
schemas.updatePlaylistCoverPhotoRequest, - message - ) + message) } -const getUpdatePlaylistUPCRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - playlistUPC: string, - nonce: string -) => { +generators.getUpdatePlaylistUPCRequestData = function (chainId, contractAddress, playlistId, playlistUPC, nonce) { const message = { playlistId: playlistId, playlistUPC: playlistUPC, @@ -1058,17 +781,10 @@ const getUpdatePlaylistUPCRequestData = ( contractAddress, 'UpdatePlaylistUPCRequest', schemas.updatePlaylistUPCRequest, - message - ) + message) } -const getUpdatePlaylistDescriptionRequestData = ( - chainId: number, - contractAddress: string, - playlistId: number, - playlistDescription: string, - nonce: string -) => { +generators.getUpdatePlaylistDescriptionRequestData = function (chainId, contractAddress, playlistId, playlistDescription, nonce) { const message = { playlistId: playlistId, playlistDescription: playlistDescription, @@ -1081,16 +797,10 @@ const getUpdatePlaylistDescriptionRequestData = ( contractAddress, 'UpdatePlaylistDescriptionRequest', schemas.updatePlaylistDescriptionRequest, - message - ) + message) } -const addIPLDToBlacklistRequestData = ( - chainId: number, - contractAddress: string, - multihashDigest: string, - nonce: string -) => { +generators.addIPLDToBlacklistRequestData = function (chainId, contractAddress, multihashDigest, nonce) { const message = { multihashDigest: multihashDigest, nonce: nonce @@ -1106,15 +816,15 @@ const addIPLDToBlacklistRequestData = ( } /* User Replica Set Manager Generators */ -const getProposeAddOrUpdateContentNodeRequestData = ( - chainId: number, - contractAddress: string, - cnodeSpId: number, - cnodeDelegateOwnerWallet: string, - cnodeOwnerWallet: string, - proposerSpId: number, - nonce: string -) => { +generators.getProposeAddOrUpdateContentNodeRequestData = function ( + chainId, + contractAddress, + cnodeSpId, + cnodeDelegateOwnerWallet, + cnodeOwnerWallet, + proposerSpId, + nonce 
+) { const message = { cnodeSpId, cnodeDelegateOwnerWallet, @@ -1132,16 +842,16 @@ const getProposeAddOrUpdateContentNodeRequestData = ( ) } -const getUpdateReplicaSetRequestData = ( - chainId: number, - contractAddress: string, - userId: number, - primaryId: number, - secondaryIdsHash: string | null, - oldPrimaryId: number, - oldSecondaryIdsHash: string | null, - nonce: string -) => { +generators.getUpdateReplicaSetRequestData = function ( + chainId, + contractAddress, + userId, + primaryId, + secondaryIdsHash, + oldPrimaryId, + oldSecondaryIdsHash, + nonce +) { const message = { userId, primaryId, @@ -1160,16 +870,40 @@ const getUpdateReplicaSetRequestData = ( ) } -const getManageEntityData = ( - chainId: number, - contractAddress: string, - userId: number, - entityType: string, - entityId: number, - action: string, - metadata: string, - nonce: string -) => { +generators.getManageUserData = function ( + chainId, + contractAddress, + userId, + action, + metadata, + nonce +) { + const message = { + userId, + action, + metadata, + nonce, + } + return getRequestData( + domains.getEntityManagerDomain, + chainId, + contractAddress, + 'ManageUser', + schemas.manageUser, + message + ) +} + +generators.getManageEntityData = function( + chainId, + contractAddress, + userId, + entityType, + entityId, + action, + metadata, + nonce +) { const message = { userId, entityType, @@ -1188,71 +922,29 @@ const getManageEntityData = ( ) } -export const generators = { - getUpdateUserMultihashRequestData, - getAddUserRequestData, - getUpdateUserNameRequestData, - getUpdateUserLocationRequestData, - getUpdateUserProfilePhotoRequestData, - getUpdateUserCoverPhotoRequestData, - getUpdateUserBioRequestData, - getUpdateUserCreatorNodeRequestData, - getUpdateUserCreatorRequestData, - getUpdateUserVerifiedRequestData, - getAddTrackRequestData, - getUpdateTrackRequestData, - getDeleteTrackRequestData, - getAddTrackRepostRequestData, - getDeleteTrackRepostRequestData, - 
getAddPlaylistRepostRequestData, - getDeletePlaylistRepostRequestData, - getUserFollowRequestData, - getDeleteUserFollowRequestData, - getTrackSaveRequestData, - getDeleteTrackSaveRequestData, - getPlaylistSaveRequestData, - getDeletePlaylistSaveRequestData, - getCreatePlaylistRequestData, - getDeletePlaylistRequestData, - getAddPlaylistTrackRequestData, - getDeletePlaylistTrackRequestData, - getOrderPlaylistTracksRequestData, - getUpdatePlaylistNameRequestData, - getUpdatePlaylistPrivacyRequestData, - getUpdatePlaylistCoverPhotoRequestData, - getUpdatePlaylistUPCRequestData, - getUpdatePlaylistDescriptionRequestData, - addIPLDToBlacklistRequestData, - getProposeAddOrUpdateContentNodeRequestData, - getUpdateReplicaSetRequestData, - getManageEntityData -} - -type NodeCrypto = { randomBytes: (size: number) => Buffer } - /** Return a secure random hex string of nChar length in a browser-compatible way * Taken from https://stackoverflow.com/questions/37378237/how-to-generate-a-random-token-of-32-bit-in-javascript */ -function browserRandomHash(nChar: number) { +function browserRandomHash (nChar) { // convert number of characters to number of bytes - const nBytes = Math.ceil((nChar = (+nChar || 8) / 2)) + var nBytes = Math.ceil(nChar = (+nChar || 8) / 2) // create a typed array of that many bytes - const u = new Uint8Array(nBytes) + var u = new Uint8Array(nBytes) // populate it wit crypto-random values window.crypto.getRandomValues(u) // convert it to an Array of Strings (e.g. '01', 'AF', ..) 
- const zpad = function (str: string) { + var zpad = function (str) { return '00'.slice(str.length) + str } - const a = Array.prototype.map.call(u, function (x) { + var a = Array.prototype.map.call(u, function (x) { return zpad(x.toString(16)) }) // Array of String to String - let str = a.join('').toLowerCase() + var str = a.join('').toLowerCase() // and snip off the excess digit if we want an odd number if (nChar % 2) str = str.slice(1) @@ -1262,19 +954,20 @@ function browserRandomHash(nChar: number) { // We need to detect whether the nodejs crypto module is available to determine how to // generate secure random numbers below -let nodeCrypto: NodeCrypto | null - +let nodeCrypto try { nodeCrypto = require('crypto') } catch (e) { nodeCrypto = null } -export function getNonce() { +function getNonce () { // detect whether we are in browser or in nodejs, and use the correct csprng if (typeof window === 'undefined' || window === null) { - return '0x' + (nodeCrypto as NodeCrypto).randomBytes(32).toString('hex') + return '0x' + nodeCrypto.randomBytes(32).toString('hex') } else { return '0x' + browserRandomHash(64) } } + +module.exports = { domains, schemas, generators, getNonce } diff --git a/libs/src/sdk/api/generated/default/.openapi-generator/FILES b/libs/src/sdk/api/generated/default/.openapi-generator/FILES index 0c1544ea690..125225be896 100644 --- a/libs/src/sdk/api/generated/default/.openapi-generator/FILES +++ b/libs/src/sdk/api/generated/default/.openapi-generator/FILES @@ -44,8 +44,10 @@ models/TracksResponse.ts models/TrendingPlaylistsResponse.ts models/User.ts models/UserAssociatedWalletResponse.ts +models/UserReplicaSet.ts models/UserResponse.ts models/UserSearch.ts +models/UsersByContentNode.ts models/VerifyToken.ts models/VersionMetadata.ts models/index.ts diff --git a/libs/src/sdk/api/generated/default/apis/UsersApi.ts b/libs/src/sdk/api/generated/default/apis/UsersApi.ts index 33ee44e1e82..55ae6c224af 100644 --- 
a/libs/src/sdk/api/generated/default/apis/UsersApi.ts +++ b/libs/src/sdk/api/generated/default/apis/UsersApi.ts @@ -67,6 +67,9 @@ import { UserSearch, UserSearchFromJSON, UserSearchToJSON, + UsersByContentNode, + UsersByContentNodeFromJSON, + UsersByContentNodeToJSON, VerifyToken, VerifyTokenFromJSON, VerifyTokenToJSON, @@ -377,6 +380,13 @@ export interface GetUserIDFromWalletRequest { associatedWallet: string; } +export interface GetUserReplicaSetRequest { + /** + * A User ID + */ + id: string; +} + export interface GetUsersTrackHistoryRequest { /** * A User ID @@ -984,6 +994,26 @@ export class UsersApi extends runtime.BaseAPI { }) as Promise>; } + /** + * Gets the user\'s replica set + */ + async getUserReplicaSet(requestParameters: GetUserReplicaSetRequest): Promise> { + if (requestParameters.id === null || requestParameters.id === undefined) { + throw new runtime.RequiredError('id','Required parameter requestParameters.id was null or undefined when calling getUserReplicaSet.'); + } + + const queryParameters: any = {}; + + const headerParameters: runtime.HTTPHeaders = {}; + + return this.request({ + path: `/users/{id}/replica_set`.replace(`{${"id"}}`, encodeURIComponent(String(requestParameters.id))), + method: 'GET', + headers: headerParameters, + query: queryParameters, + }) as Promise>; + } + /** * Get the tracks the user recently listened to. */ diff --git a/libs/src/sdk/api/generated/default/models/UserReplicaSet.ts b/libs/src/sdk/api/generated/default/models/UserReplicaSet.ts new file mode 100644 index 00000000000..cfb94a9529d --- /dev/null +++ b/libs/src/sdk/api/generated/default/models/UserReplicaSet.ts @@ -0,0 +1,71 @@ +// @ts-nocheck +/* tslint:disable */ +/* eslint-disable */ +/** + * API + * Audius V1 API + * + * The version of the OpenAPI document: 1.0 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. 
+ */ + +/** + * + * @export + * @interface UserReplicaSet + */ +export interface UserReplicaSet { + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + user_id: number; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + wallet: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + primary?: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + secondary1?: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + secondary2?: string; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + primarySpID?: number; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + secondary1SpID?: number; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + secondary2SpID?: number; +} + diff --git a/libs/src/sdk/api/generated/default/models/UsersByContentNode.ts b/libs/src/sdk/api/generated/default/models/UsersByContentNode.ts new file mode 100644 index 00000000000..3264db89919 --- /dev/null +++ b/libs/src/sdk/api/generated/default/models/UsersByContentNode.ts @@ -0,0 +1,36 @@ +// @ts-nocheck +/* tslint:disable */ +/* eslint-disable */ +/** + * API + * Audius V1 API + * + * The version of the OpenAPI document: 1.0 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. 
+ */ + +import { + UserReplicaSet, + UserReplicaSetFromJSON, + UserReplicaSetFromJSONTyped, + UserReplicaSetToJSON, +} from './UserReplicaSet'; + +/** + * + * @export + * @interface UsersByContentNode + */ +export interface UsersByContentNode { + /** + * + * @type {UserReplicaSet} + * @memberof UsersByContentNode + */ + data?: UserReplicaSet; +} + diff --git a/libs/src/sdk/api/generated/default/models/index.ts b/libs/src/sdk/api/generated/default/models/index.ts index 09c6d9625c1..f0953aa9993 100644 --- a/libs/src/sdk/api/generated/default/models/index.ts +++ b/libs/src/sdk/api/generated/default/models/index.ts @@ -39,7 +39,9 @@ export * from './TracksResponse'; export * from './TrendingPlaylistsResponse'; export * from './User'; export * from './UserAssociatedWalletResponse'; +export * from './UserReplicaSet'; export * from './UserResponse'; export * from './UserSearch'; +export * from './UsersByContentNode'; export * from './VerifyToken'; export * from './VersionMetadata'; diff --git a/libs/src/sdk/api/generated/full/.openapi-generator/FILES b/libs/src/sdk/api/generated/full/.openapi-generator/FILES index 0592f4a878c..5ace9804088 100644 --- a/libs/src/sdk/api/generated/full/.openapi-generator/FILES +++ b/libs/src/sdk/api/generated/full/.openapi-generator/FILES @@ -63,6 +63,8 @@ models/TrackSegment.ts models/TrendingIdsResponse.ts models/TrendingTimesIds.ts models/UserFull.ts +models/UserReplicaSet.ts +models/UsersByContentNode.ts models/VersionMetadata.ts models/index.ts runtime.ts diff --git a/libs/src/sdk/api/generated/full/apis/UsersApi.ts b/libs/src/sdk/api/generated/full/apis/UsersApi.ts index 0a525654c0a..510f98d9725 100644 --- a/libs/src/sdk/api/generated/full/apis/UsersApi.ts +++ b/libs/src/sdk/api/generated/full/apis/UsersApi.ts @@ -55,6 +55,9 @@ import { TopUsersResponseFull, TopUsersResponseFullFromJSON, TopUsersResponseFullToJSON, + UsersByContentNode, + UsersByContentNodeFromJSON, + UsersByContentNodeToJSON, } from '../models'; export interface 
GetFavoritesRequest { @@ -369,6 +372,17 @@ export interface GetUserByHandleRequest { userId?: string; } +export interface GetUserReplicaSetRequest { + /** + * A User ID + */ + id: string; + /** + * The user ID of the user making the request + */ + userId?: string; +} + export interface GetUsersTrackHistoryRequest { /** * A User ID @@ -925,6 +939,30 @@ export class UsersApi extends runtime.BaseAPI { }) as Promise>; } + /** + * Gets the user\'s replica set + */ + async getUserReplicaSet(requestParameters: GetUserReplicaSetRequest): Promise> { + if (requestParameters.id === null || requestParameters.id === undefined) { + throw new runtime.RequiredError('id','Required parameter requestParameters.id was null or undefined when calling getUserReplicaSet.'); + } + + const queryParameters: any = {}; + + if (requestParameters.userId !== undefined) { + queryParameters['user_id'] = requestParameters.userId; + } + + const headerParameters: runtime.HTTPHeaders = {}; + + return this.request({ + path: `/users/{id}/replica_set`.replace(`{${"id"}}`, encodeURIComponent(String(requestParameters.id))), + method: 'GET', + headers: headerParameters, + query: queryParameters, + }) as Promise>; + } + /** * Get the tracks the user recently listened to. */ diff --git a/libs/src/sdk/api/generated/full/models/UserReplicaSet.ts b/libs/src/sdk/api/generated/full/models/UserReplicaSet.ts new file mode 100644 index 00000000000..eab3f0acffa --- /dev/null +++ b/libs/src/sdk/api/generated/full/models/UserReplicaSet.ts @@ -0,0 +1,71 @@ +// @ts-nocheck +/* tslint:disable */ +/* eslint-disable */ +/** + * API + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: 1.0 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. 
+ */ + +/** + * + * @export + * @interface UserReplicaSet + */ +export interface UserReplicaSet { + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + user_id: number; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + wallet: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + primary?: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + secondary1?: string; + /** + * + * @type {string} + * @memberof UserReplicaSet + */ + secondary2?: string; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + primarySpID?: number; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + secondary1SpID?: number; + /** + * + * @type {number} + * @memberof UserReplicaSet + */ + secondary2SpID?: number; +} + diff --git a/libs/src/sdk/api/generated/full/models/UsersByContentNode.ts b/libs/src/sdk/api/generated/full/models/UsersByContentNode.ts new file mode 100644 index 00000000000..db97aa14dd9 --- /dev/null +++ b/libs/src/sdk/api/generated/full/models/UsersByContentNode.ts @@ -0,0 +1,84 @@ +// @ts-nocheck +/* tslint:disable */ +/* eslint-disable */ +/** + * API + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: 1.0 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. 
+ */ + +import { + UserReplicaSet, + UserReplicaSetFromJSON, + UserReplicaSetFromJSONTyped, + UserReplicaSetToJSON, +} from './UserReplicaSet'; +import { + VersionMetadata, + VersionMetadataFromJSON, + VersionMetadataFromJSONTyped, + VersionMetadataToJSON, +} from './VersionMetadata'; + +/** + * + * @export + * @interface UsersByContentNode + */ +export interface UsersByContentNode { + /** + * + * @type {number} + * @memberof UsersByContentNode + */ + latest_chain_block: number; + /** + * + * @type {number} + * @memberof UsersByContentNode + */ + latest_indexed_block: number; + /** + * + * @type {number} + * @memberof UsersByContentNode + */ + latest_chain_slot_plays: number; + /** + * + * @type {number} + * @memberof UsersByContentNode + */ + latest_indexed_slot_plays: number; + /** + * + * @type {string} + * @memberof UsersByContentNode + */ + signature: string; + /** + * + * @type {string} + * @memberof UsersByContentNode + */ + timestamp: string; + /** + * + * @type {VersionMetadata} + * @memberof UsersByContentNode + */ + version: VersionMetadata; + /** + * + * @type {UserReplicaSet} + * @memberof UsersByContentNode + */ + data?: UserReplicaSet; +} + diff --git a/libs/src/sdk/api/generated/full/models/index.ts b/libs/src/sdk/api/generated/full/models/index.ts index 4559a7bb14d..cbc2bf49e6d 100644 --- a/libs/src/sdk/api/generated/full/models/index.ts +++ b/libs/src/sdk/api/generated/full/models/index.ts @@ -57,4 +57,6 @@ export * from './TrackSegment'; export * from './TrendingIdsResponse'; export * from './TrendingTimesIds'; export * from './UserFull'; +export * from './UserReplicaSet'; +export * from './UsersByContentNode'; export * from './VersionMetadata'; diff --git a/libs/src/services/dataContracts/EntityManagerClient.ts b/libs/src/services/dataContracts/EntityManagerClient.ts index 7c7f1ea1f45..d3a5b7a55af 100644 --- a/libs/src/services/dataContracts/EntityManagerClient.ts +++ b/libs/src/services/dataContracts/EntityManagerClient.ts @@ -13,7 +13,8 @@ 
export enum Action { export enum EntityType { PLAYLIST = 'Playlist', TRACK = 'Track', - USER = 'User' + USER = 'User', + USER_REPLICA_SET = 'UserReplicaSet' } /** diff --git a/libs/src/services/discoveryProvider/DiscoveryProvider.ts b/libs/src/services/discoveryProvider/DiscoveryProvider.ts index 7874e91503a..f85ba733137 100644 --- a/libs/src/services/discoveryProvider/DiscoveryProvider.ts +++ b/libs/src/services/discoveryProvider/DiscoveryProvider.ts @@ -1035,12 +1035,15 @@ export class DiscoveryProvider { * } * @param retry whether to retry on failure * @param attemptedRetries number of attempted retries (stops retrying at max) + * @param throwError whether to throw the error on a failed request (otherwise resolves null) + * @param blockNumber If provided, throws an error if the discovery node has not yet indexed this block */ async _makeRequest( requestObj: Record, retry = true, attemptedRetries = 0, - throwError = false + throwError = false, + blockNumber?: number ): Promise { const returnOrThrow = (e: ErrorType) => { if (throwError) { @@ -1126,6 +1129,11 @@ export class DiscoveryProvider { this.ethContracts && !this.ethContracts.isInRegressedMode() const blockDiff = await this._getBlocksBehind(parsedResponse) + if (blockNumber && parsedResponse.latest_indexed_block < blockNumber) { + throw new Error( + `Requested blocknumber ${blockNumber}, but discovery is behind at ${parsedResponse.latest_indexed_block}` + ) + } if (notInRegressedMode && blockDiff) { const errorMessage = `${this.discoveryProviderEndpoint} is too far behind [block diff: ${blockDiff}]` if (retry) { @@ -1166,6 +1174,30 @@ export class DiscoveryProvider { return parsedResponse.data } + /** + * Retrieves the user's replica set + * @param params.encodedUserId string of the encoded user id + * @param params.blockNumber optional integer; pass to wait until the discovery node has indexed that block number + * @return object containing the user replica set + */ + async getUserReplicaSet({ + encodedUserId, +
blockNumber + }: { + encodedUserId: string + blockNumber?: number + }): Promise { + const req = Requests.getUserReplicaSet(encodedUserId) + + return await this._makeRequest( + req, + true, + 0, + false, + blockNumber + ) + } + /** * Gets the healthy discovery provider endpoint used in creating the axios request later. * If the number of retries is over the max count for retires, clear the cache and reselect diff --git a/libs/src/services/discoveryProvider/requests.ts b/libs/src/services/discoveryProvider/requests.ts index 47b7caeb828..f09248d42fb 100644 --- a/libs/src/services/discoveryProvider/requests.ts +++ b/libs/src/services/discoveryProvider/requests.ts @@ -717,3 +717,10 @@ export const verifyToken = (token: string) => { } } } + +export const getUserReplicaSet = (encodedUserId: string) => { + return { + endpoint: `/v1/full/users/${encodedUserId}/replica_set`, + timeout: 5000 + } +} diff --git a/libs/src/utils/types.ts b/libs/src/utils/types.ts index 532f3a11f23..61526b7c50b 100644 --- a/libs/src/utils/types.ts +++ b/libs/src/utils/types.ts @@ -56,6 +56,8 @@ export type UserMetadata = { metadata_multihash: Nullable has_collectibles: boolean collectiblesOrderUnset?: boolean + primary_id: number + secondary_ids: number[] // Only present on the "current" account does_follow_current_user?: boolean