Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

user and replica set entity manager #3897

Merged
merged 40 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
daa65bc
add user entity manager
Aug 24, 2022
a2ff380
Update discovery user replica route and entity manager
jowlee Sep 20, 2022
abcd2f1
WIP CN check discovery vs chain
jowlee Sep 20, 2022
226f990
Update libs bindings for entity manager
jowlee Sep 20, 2022
f3d146b
lint discovery
jowlee Sep 20, 2022
bb3cf8f
Fix lint
isaacsolo Sep 20, 2022
9f712af
Turn on dev features
jowlee Sep 20, 2022
41d62d6
removediscovery logs
jowlee Sep 20, 2022
6ff89b4
Add wait for discovery indexing to cn replica set update
jowlee Sep 20, 2022
62cbca4
Fix libs type
jowlee Sep 20, 2022
124efa0
Fix lint
jowlee Sep 20, 2022
2cfe98a
fix api unit test
isaacsolo Sep 20, 2022
6df44fd
Update ns docs for api change
jowlee Sep 20, 2022
7d2ad97
Update gen types
jowlee Sep 20, 2022
8a9a2bf
FIx cn tests
jowlee Sep 21, 2022
fc0deac
Update sdk api
jowlee Sep 21, 2022
61b8686
Update user replica endpoint to return empty when not found
jowlee Sep 23, 2022
0007140
Address comments on cn middleware ensure primary
jowlee Sep 23, 2022
c02f193
Update libs to return err on discovery blocknumber makereq
jowlee Sep 23, 2022
bcc5bab
Put content entity manager and discovery validation behind env var
jowlee Sep 23, 2022
8185a26
Fix discovery lint
jowlee Sep 23, 2022
3678b38
Update creator node
jowlee Sep 26, 2022
432502f
Fix discovery indexing
jowlee Sep 26, 2022
f5d6606
fix libs
jowlee Sep 26, 2022
d93c12c
lint fix discovery
jowlee Sep 26, 2022
c4a27e8
add more entity manager flags and fix bugs w updates
isaacsolo Sep 26, 2022
fcc0ade
update entityManagerAddress in CN config and add handle_lc
isaacsolo Sep 26, 2022
4a838ba
fix types
isaacsolo Sep 27, 2022
b30e79b
Update local cn entity manager addr
jowlee Sep 27, 2022
10f976e
Add timeout to get user replica set
jowlee Sep 27, 2022
3b76a13
Update local env vs prod config for entity manager
jowlee Sep 27, 2022
2ddbf89
Address comments
jowlee Sep 28, 2022
db8523f
Merge branch 'master' into jowlee-user-entity-manager
jowlee Sep 28, 2022
47cc666
Fix merge err
jowlee Sep 28, 2022
536941c
Fix libs bug
jowlee Sep 28, 2022
2de10b3
fix libs lint
jowlee Sep 28, 2022
eaec22f
add comment on wait for repica set indexing
jowlee Sep 28, 2022
61f3516
Merge branch 'master' into jowlee-user-entity-manager
jowlee Sep 28, 2022
a032582
lint fix discovery after merge
jowlee Sep 28, 2022
af07a88
Fix metadata change test
jowlee Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions creator-node/compose/env/base.env
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@ discoveryNodeUnhealthyBlockDiff=500
maxBatchClockStatusBatchSize=5

reconfigSPIdBlacklistString=

entityManagerAddress=0x5b9b42d6e4B2e4Bf8d42Eba32D46918e10899B66
jowlee marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions creator-node/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ const config = convict({
env: 'dataRegistryAddress',
default: null
},
entityManagerAddress: {
doc: 'entity manager registry address',
format: String,
env: 'entityManagerAddress',
default: '0x2F99338637F027CFB7494E46B49987457beCC6E3'
jowlee marked this conversation as resolved.
Show resolved Hide resolved
},
dataProviderUrl: {
doc: 'data contracts web3 provider url',
format: String,
Expand Down Expand Up @@ -464,6 +470,12 @@ const config = convict({
env: 'considerNodeUnhealthy',
default: false
},
entityManagerReplicaSetEnabled: {
doc: 'whether or not to use entity manager to update the replica set',
format: Boolean,
env: 'entityManagerReplicaSetEnabled',
default: true
},

/** sync / snapback configs */

Expand Down
108 changes: 78 additions & 30 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,6 @@ async function getReplicaSetSpIDs({
const RETRY_TIMEOUT_MS = 1000 // 1 seconds

let errorMsg = null
let blockNumberIndexed = false
for (let retry = 1; retry <= MAX_RETRIES; retry++) {
logger.info(
`${logPrefix} retry #${retry}/${MAX_RETRIES} || time from start: ${
Expand All @@ -761,15 +760,40 @@ async function getReplicaSetSpIDs({
)

try {
// will throw error if blocknumber not found
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber(
userId,
if (config.get('entityManagerReplicaSetEnabled')) {
// will throw error if blocknumber not found
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
)
errorMsg = null
blockNumberIndexed = true
break
})

if (spResponse) {
if (spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
break
} else {
// The blocknumber was indexed by discovery, but there's still no user replica set returned
errorMsg = `User replica not found in discovery`
break
}
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber(
userId,
blockNumber
)
errorMsg = null
break
}
} catch (e) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded
Expand All @@ -779,22 +803,14 @@ async function getReplicaSetSpIDs({

// Error if indexed blockNumber but didn't find any replicaSet for user
if (
blockNumberIndexed &&
(!replicaSet ||
!replicaSet.hasOwnProperty('primaryId') ||
!replicaSet.primaryId)
!replicaSet ||
!replicaSet.hasOwnProperty('primaryId') ||
!replicaSet.primaryId
) {
throw new Error(
`${logPrefix} ERROR || Failed to retrieve user from UserReplicaSetManager after ${MAX_RETRIES} retries. Aborting.`
)
}

// Error if failed to index target blockNumber
if (!blockNumberIndexed) {
throw new Error(
`${logPrefix} ERROR || Web3 provider failed to index target blockNumber ${blockNumber} after ${MAX_RETRIES} retries. Aborting. Error ${errorMsg}`
)
}
} else if (ensurePrimary && selfSpID) {
/**
* If ensurePrimary required but no blockNumber provided, poll URSM until returned primary = selfSpID
Expand All @@ -814,12 +830,30 @@ async function getReplicaSetSpIDs({
)

try {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)

errorMsg = null
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse && spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
} else {
errorMsg = `User replica not found in discovery`
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
}

if (
replicaSet &&
Expand Down Expand Up @@ -863,10 +897,24 @@ async function getReplicaSetSpIDs({

let errorMsg = null
try {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse && spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [spResponse.secondary1SpID, spResponse.secondary2SpID]
}
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
}
} catch (e) {
errorMsg = e.message
}
Expand Down
35 changes: 26 additions & 9 deletions creator-node/src/routes/audiusUsers.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const express = require('express')
const fs = require('fs-extra')

const config = require('../config')
const models = require('../models')
const { saveFileFromBufferToDisk } = require('../fileManager')
const {
Expand Down Expand Up @@ -113,16 +114,32 @@ router.post(
// Verify that wallet of the user on the blockchain for the given ID matches the user attempting to update
const serviceRegistry = req.app.get('serviceRegistry')
const { libs } = serviceRegistry
const userResp = await libs.contracts.UserFactoryClient.getUser(
blockchainUserId
)
if (
!userResp?.wallet ||
userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(blockchainUserId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
})
if (
(spResponse?.wallet ?? '').toLowerCase() !==
req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${spResponse?.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
)
}
} else {
const userResp = await libs.contracts.UserFactoryClient.getUser(
blockchainUserId
)
if (
!userResp?.wallet ||
userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
)
}
}

const cnodeUserUUID = req.session.cnodeUserUUID
Expand Down
4 changes: 3 additions & 1 deletion creator-node/src/services/initAudiusLibs.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = async ({
const ethRegistryAddress = config.get('ethRegistryAddress')
const ethOwnerWallet = config.get('ethOwnerWallet')
const dataRegistryAddress = config.get('dataRegistryAddress')
const entityManagerAddress = config.get('entityManagerAddress')
const dataProviderUrl = config.get('dataProviderUrl')
const delegatePrivateKey = config.get('delegatePrivateKey')
const oldDelegatePrivateKey = config.get('oldDelegatePrivateKey')
Expand Down Expand Up @@ -73,7 +74,8 @@ module.exports = async ({
// pass as array
[dataProviderUrl],
// TODO - formatting this private key here is not ideal
(oldDelegatePrivateKey || delegatePrivateKey).replace('0x', '')
(oldDelegatePrivateKey || delegatePrivateKey).replace('0x', ''),
entityManagerAddress
)
: null,
discoveryProviderConfig: enableDiscovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const updateReplicaSetJobProcessor = async function ({
audiusLibs = await initAudiusLibs({
enableEthContracts: true,
enableContracts: true,
enableDiscovery: false,
enableDiscovery: true,
enableIdentity: true,
logger
})
Expand Down Expand Up @@ -695,17 +695,53 @@ const _issueUpdateReplicaSetOp = async (
return response
}

await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
if (config.get('entityManagerReplicaSetEnabled')) {
logger.info(
`[_issueUpdateReplicaSetOp] updating replica set now ${
Date.now() - startTimeMs
}ms for userId=${userId} wallet=${wallet}`
)

const { blockNumber } =
await audiusLibs.User.updateEntityManagerReplicaSet({
userId,
primary: newReplicaSetSPIds[0], // new primary
secondaries: newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimary: oldPrimarySpId || chainPrimarySpId,
oldSecondaries: [
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
})
logger.info(
`[_issueUpdateReplicaSetOp] did call audiusLibs.User.updateEntityManagerReplicaSet waiting for ${blockNumber}`
)
// Wait for blockhash/blockNumber to be indexed
try {
await audiusLibs.User.waitForReplicaSetDiscoveryIndexing(
userId,
newReplicaSetSPIds,
blockNumber
)
} catch (err) {
throw new Error(
`[_issueUpdateReplicaSetOp] waitForReplicaSetDiscovery Indexing Unable to confirm updated replica set for user ${userId}`
)
}
} else {
await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
}

response.issuedReconfig = true
logger.info(
Expand Down Expand Up @@ -807,8 +843,25 @@ const _canReconfig = async ({
}: CanReconfigParams): Promise<CanReconfigReturnValue> => {
let error
try {
const { primaryId: chainPrimarySpId, secondaryIds: chainSecondarySpIds } =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(userId)
let chainPrimarySpId, chainSecondarySpIds
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})
chainPrimarySpId = spResponse?.primarySpID
chainSecondarySpIds = [
spResponse?.secondary1SpID,
spResponse?.secondary2SpID
]
} else {
const response =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
chainPrimarySpId = response.primaryId
chainSecondarySpIds = response.secondaryIds
}

if (
!chainPrimarySpId ||
Expand Down
26 changes: 25 additions & 1 deletion creator-node/test/lib/libsMock.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const sinon = require('sinon')
const { encode, decode } = require('../../src/hashids')

function getLibsMock() {
const libsMock = {
Expand Down Expand Up @@ -67,11 +68,34 @@ function getLibsMock() {
Playlist: {
getPlaylists: sinon.mock().atLeast(1)
},
Utils: {
encodeHashId: sinon.mock().atLeast(1)
},
discoveryProvider: {
discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000'
discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000',
getUserReplicaSet: sinon.mock().atLeast(1)
}
}

libsMock.Utils.encodeHashId.callsFake((id) => {
return encode(id)
})

libsMock.discoveryProvider.getUserReplicaSet.callsFake(({ encodedUserId }) => {
const user_id = decode(encodedUserId)
return {
user_id,
"wallet": '0xadd36bad12002f1097cdb7ee24085c28e960fc32',
"primary": 'http://mock-cn1.audius.co',
"secondary1": 'http://mock-cn2.audius.co',
"secondary2": 'http://mock-cn3.audius.co',
"primarySpID": 1,
"secondary1SpID": 2,
"secondary2SpID": 3
}
})


libsMock.contracts.UserReplicaSetManagerClient.getUserReplicaSet.returns({
primaryId: 1,
secondaryIds: [2, 3]
Expand Down
Loading