Skip to content

Commit

Permalink
all changes
Browse files Browse the repository at this point in the history
  • Loading branch information
SidSethi committed Aug 4, 2022
1 parent 01f924a commit 3e135fd
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 16 deletions.
4 changes: 2 additions & 2 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 17 additions & 3 deletions creator-node/src/components/replicaSet/exportComponentService.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const exportComponentService = async ({
// since clockRecords are returned by clock ASC, clock val at last index is largest clock val
cnodeUser.clock = requestedClockRangeMax
logger.info(
`nodeSync.js#export - cnodeUser clock val ${curCnodeUserClockVal} is higher than requestedClockRangeMax, reset to ${requestedClockRangeMax}`
`exportComponentService() - cnodeUserUUID:${cnodeUser.cnodeUserUUID} - cnodeUser clock val ${curCnodeUserClockVal} is higher than requestedClockRangeMax, reset to ${requestedClockRangeMax}`
)
}

Expand All @@ -114,7 +114,9 @@ const exportComponentService = async ({
)
if (!_.isEmpty(clockRecords) && cnodeUser.clock !== maxClockRecord) {
const errorMsg = `Cannot export - exported data is not consistent. Exported max clock val = ${cnodeUser.clock} and exported max ClockRecord val ${maxClockRecord}. Fixing and trying again...`
logger.error(errorMsg)
logger.error(
`exportComponentService() - cnodeUserUUID:${cnodeUser.cnodeUserUUID} - ${errorMsg}`
)

if (!forceExport) {
throw new Error(errorMsg)
Expand Down Expand Up @@ -146,8 +148,20 @@ const exportComponentService = async ({
return cnodeUsersDict
} catch (e) {
await transaction.rollback()

for (const cnodeUserUUID in cnodeUsersDict) {
await DBManager.fixInconsistentUser(cnodeUserUUID)
try {
const numRowsUpdated = await DBManager.fixInconsistentUser(
cnodeUserUUID
)
logger.warn(
`exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() executed - numRowsUpdated:${numRowsUpdated}`
)
} catch (e) {
logger.error(
`exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() error - ${e.message}`
)
}
}
throw new Error(e)
}
Expand Down
13 changes: 8 additions & 5 deletions creator-node/src/dbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,25 +313,28 @@ class DBManager {
* Given a user's UUID, this function will set their clock value equal to the max clock value
* found in the ClockRecords table for that same user
*
* @param userUUID the UUID for the user whose clock needs to be made consistent
* @param cnodeUserUUID the UUID for the user whose clock needs to be made consistent
*/
static async fixInconsistentUser(userUUID) {
sequelize.query(
static async fixInconsistentUser(cnodeUserUUID) {
const [, metadata] = await sequelize.query(
`
UPDATE "CNodeUsers" as cnodeusers
SET clock = subquery.max_clock
FROM (
SELECT "cnodeUserUUID", MAX(clock) AS max_clock
FROM "ClockRecords"
WHERE "cnodeUserUUID" = :userUUID
WHERE "cnodeUserUUID" = :cnodeUserUUID
GROUP BY "cnodeUserUUID"
) AS subquery
WHERE cnodeusers."cnodeUserUUID" = subquery."cnodeUserUUID"
`,
{
replacements: { userUUID }
replacements: { cnodeUserUUID }
}
)

const numRowsUpdated = metadata?.rowCount || 0
return numRowsUpdated
}
}

Expand Down
15 changes: 14 additions & 1 deletion creator-node/src/services/sync/secondarySyncFromPrimary.js
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,20 @@ const handleSyncFromPrimary = async (

await transaction.rollback()

await DBManager.fixInconsistentUser(fetchedCNodeUser.cnodeUserUUID)
try {
const numRowsUpdated = await DBManager.fixInconsistentUser(
fetchedCNodeUser.cnodeUserUUID
)
logger.warn(
logPrefix,
`fixInconsistentUser() executed for ${fetchedCNodeUser.cnodeUserUUID} - numRowsUpdated:${numRowsUpdated}`
)
} catch (e) {
logger.error(
logPrefix,
`fixInconsistentUser() error for ${fetchedCNodeUser.cnodeUserUUID} - ${e.message}`
)
}

return {
error: new Error(e),
Expand Down
237 changes: 235 additions & 2 deletions creator-node/test/dbManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ const DBManager = require('../src/dbManager')
const BlacklistManager = require('../src/blacklistManager')
const FileManager = require('../src/fileManager')
const DiskManager = require('../src/diskManager')
const { libs } = require('@audius/sdk')
const Utils = libs.Utils
const utils = require('../src/utils')
const {
createStarterCNodeUser,
Expand All @@ -25,9 +23,38 @@ const {
const { getApp } = require('./lib/app')
const { getLibsMock } = require('./lib/libsMock')
const { saveFileToStorage, computeFilesHash } = require('./lib/helpers')
const { fetchDBStateForWallet, assertTableEquality } = require('./lib/utils')

const TestAudioFilePath = path.resolve(__dirname, 'testTrack.mp3')

/** Add state to AudiusUsers table for given userId */
const uploadAudiusUserState = async function ({
app,
userId,
sessionToken,
metadataObj,
audiusUserBlockNumber
}) {
const audiusUserMetadataResp = await request(app)
.post('/audius_users/metadata')
.set('X-Session-ID', sessionToken)
.set('User-Id', userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata: metadataObj })
.expect(200)

await request(app)
.post('/audius_users')
.set('X-Session-ID', sessionToken)
.set('User-Id', userId)
.send({
blockchainUserId: userId,
blockNumber: audiusUserBlockNumber,
metadataFileUUID: audiusUserMetadataResp.body.data.metadataFileUUID
})
.expect(200)
}

describe('Test createNewDataRecord()', async function () {
const req = {
logger: {
Expand Down Expand Up @@ -1068,3 +1095,209 @@ describe('Test fetchFilesHashFromDB()', async function () {
assert.deepEqual(actualResp, expectedResp)
})
})

describe('Test fixInconsistentUser()', async function () {
const userId = 1

let server, app

/** Init server to run DB migrations */
before(async function () {
const appInfo = await getApp(getLibsMock())
server = appInfo.server
app = appInfo.app
})

beforeEach(async function () {
await destroyUsers()
})

/** Wipe all CNodeUsers + dependent data */
after(async function () {
await destroyUsers()

await server.close()
})

it('Confirm no change to healthy users DB state', async function () {
const { cnodeUserUUID, walletPublicKey, sessionToken } = await createStarterCNodeUser(userId)

// Upload some state for user
const audiusUserBlockNumber = 10
const audiusUserMetadata = { test: 'field1' }
const metadataCID = 'QmQMHXPMuey2AT6fPTKnzKQCrRjPS7AbaQdDTM8VXbHC8W'
await uploadAudiusUserState({
app,
userId,
sessionToken,
metadataObj: audiusUserMetadata,
audiusUserBlockNumber
})
const expectedCNodeUserClock = 2

// Confirm expected initial state
const {
cnodeUser: initialCNodeUser,
audiusUsers: initialAudiusUsers,
tracks: initialTracks,
files: initialFiles,
clockRecords: initialClockRecords
} = await fetchDBStateForWallet(walletPublicKey, models)
assertTableEquality(
[initialCNodeUser],
[{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }],
['createdAt', 'updatedAt', 'lastLogin']
)
assertTableEquality(
initialAudiusUsers,
[{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }],
['createdAt', 'updatedAt', 'metadataFileUUID']
)
assertTableEquality(initialTracks, [])
assertTableEquality(
initialFiles,
[{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }],
['fileUUID', 'createdAt', 'updatedAt']
)
assertTableEquality(
initialClockRecords,
[
{ cnodeUserUUID, clock: 1, sourceTable: "File" },
{ cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" }
],
['createdAt', 'updatedAt']
)

// Call fixInconsistentUser()
const numRowsUpdated = await DBManager.fixInconsistentUser(cnodeUserUUID)
assert.strictEqual(numRowsUpdated, 1)

// Confirm final initial state is unchanged
const {
cnodeUser: finalCNodeUser,
audiusUsers: finalAudiusUsers,
tracks: finalTracks,
files: finalFiles,
clockRecords: finalClockRecords
} = await fetchDBStateForWallet(walletPublicKey, models)
assertTableEquality(
[finalCNodeUser],
[{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }],
['createdAt', 'updatedAt', 'lastLogin']
)
assertTableEquality(
finalAudiusUsers,
[{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }],
['createdAt', 'updatedAt', 'metadataFileUUID']
)
assertTableEquality(finalTracks, [])
assertTableEquality(
finalFiles,
[{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }],
['fileUUID', 'createdAt', 'updatedAt']
)
assertTableEquality(
finalClockRecords,
[
{ cnodeUserUUID, clock: 1, sourceTable: "File" },
{ cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" }
],
['createdAt', 'updatedAt']
)
})

it('Confirm inconsistent users state is correctly fixed', async function () {
const { cnodeUserUUID, walletPublicKey, sessionToken } = await createStarterCNodeUser(userId)

// Upload some state for user
const audiusUserBlockNumber = 10
const audiusUserMetadata = { test: 'field1' }
const metadataCID = 'QmQMHXPMuey2AT6fPTKnzKQCrRjPS7AbaQdDTM8VXbHC8W'
await uploadAudiusUserState({
app,
userId,
sessionToken,
metadataObj: audiusUserMetadata,
audiusUserBlockNumber
})
const expectedCNodeUserClock = 2
const actualCNodeUserClock = 1

// Change cnodeUser.clock to be inconsistent with ClockRecords
await models.CNodeUser.update(
{ clock: actualCNodeUserClock },
{ where: { cnodeUserUUID }}
)

// Confirm expected initial state
const {
cnodeUser: initialCNodeUser,
audiusUsers: initialAudiusUsers,
tracks: initialTracks,
files: initialFiles,
clockRecords: initialClockRecords
} = await fetchDBStateForWallet(walletPublicKey, models)
assertTableEquality(
[initialCNodeUser],
[{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: actualCNodeUserClock }],
['createdAt', 'updatedAt', 'lastLogin']
)
assertTableEquality(
initialAudiusUsers,
[{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }],
['createdAt', 'updatedAt', 'metadataFileUUID']
)
assertTableEquality(initialTracks, [])
assertTableEquality(
initialFiles,
[{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }],
['fileUUID', 'createdAt', 'updatedAt']
)
assertTableEquality(
initialClockRecords,
[
{ cnodeUserUUID, clock: 1, sourceTable: "File" },
{ cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" }
],
['createdAt', 'updatedAt']
)

// Call fixInconsistentUser()
const numRowsUpdated = await DBManager.fixInconsistentUser(cnodeUserUUID)
assert.strictEqual(numRowsUpdated, 1)

// Confirm final initial state where CNodeUser clock is consistent with ClockRecords
const {
cnodeUser: finalCNodeUser,
audiusUsers: finalAudiusUsers,
tracks: finalTracks,
files: finalFiles,
clockRecords: finalClockRecords
} = await fetchDBStateForWallet(walletPublicKey, models)
assertTableEquality(
[finalCNodeUser],
[{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }],
['createdAt', 'updatedAt', 'lastLogin']
)
assertTableEquality(
finalAudiusUsers,
[{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }],
['createdAt', 'updatedAt', 'metadataFileUUID']
)
assertTableEquality(finalTracks, [])
assertTableEquality(
finalFiles,
[{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }],
['fileUUID', 'createdAt', 'updatedAt']
)
assertTableEquality(
finalClockRecords,
[
{ cnodeUserUUID, clock: 1, sourceTable: "File" },
{ cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" }
],
['createdAt', 'updatedAt']
)
})

})
1 change: 0 additions & 1 deletion creator-node/test/issueSyncRequest.jobProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const models = require('../src/models')
const config = require('../src/config')
const stateMachineConstants = require('../src/services/stateMachineManager/stateMachineConstants')
const { SyncType, QUEUE_NAMES, SYNC_MODES } = stateMachineConstants
const issueSyncRequestJobProcessor = require('../src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor')
const {
METRIC_NAMES
} = require('../src/services/prometheusMonitoring/prometheus.constants')
Expand Down
2 changes: 1 addition & 1 deletion creator-node/test/lib/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async function getApp(
libsClient,
blacklistManager = BlacklistManager,
setMockFn = null,
spId = null
spId = 1
) {
// we need to clear the cache that commonjs require builds, otherwise it uses old values for imports etc
// eg if you set a new env var, it doesn't propogate well unless you clear the cache for the config file as well
Expand Down
Loading

0 comments on commit 3e135fd

Please sign in to comment.