From 3e135fd920f42bd6d20b4270612be21991b11b24 Mon Sep 17 00:00:00 2001 From: Sid Sethi <3323835+SidSethi@users.noreply.github.com> Date: Thu, 4 Aug 2022 15:15:44 +0000 Subject: [PATCH] all changes --- creator-node/package-lock.json | 4 +- .../replicaSet/exportComponentService.js | 20 +- creator-node/src/dbManager.js | 13 +- .../services/sync/secondarySyncFromPrimary.js | 15 +- creator-node/test/dbManager.test.js | 237 +++++++++++++++++- .../issueSyncRequest.jobProcessor.test.js | 1 - creator-node/test/lib/app.js | 2 +- creator-node/test/lib/utils.js | 83 +++++- 8 files changed, 359 insertions(+), 16 deletions(-) diff --git a/creator-node/package-lock.json b/creator-node/package-lock.json index b62667fcfbe..8e8a01d5695 100644 --- a/creator-node/package-lock.json +++ b/creator-node/package-lock.json @@ -4001,7 +4001,7 @@ "bmp-js": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/bmp-js/-/bmp-js-0.1.0.tgz", - "integrity": "sha1-4Fpj95amwf8l9Hcex62twUjAcjM=" + "integrity": "sha512-vHdS19CnY3hwiNdkaqk93DvjVLfbEcI8mys4UjuWrlX1haDmroo8o4xCzh4wD6DGV6HxRCyauwhHRqMTfERtjw==" }, "bn.js": { "version": "4.12.0", @@ -4259,7 +4259,7 @@ "buffer-equal": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/buffer-equal/-/buffer-equal-0.0.1.tgz", - "integrity": "sha1-kbx0sR6kBbyRa8aqkI+q+ltKrEs=" + "integrity": "sha512-RgSV6InVQ9ODPdLWJ5UAqBqJBOg370Nz6ZQtRzpt6nUjc8v0St97uJ4PYC6NztqIScrAXafKM3mZPMygSe1ggA==" }, "buffer-layout": { "version": "1.2.2", diff --git a/creator-node/src/components/replicaSet/exportComponentService.js b/creator-node/src/components/replicaSet/exportComponentService.js index 0ba3350b3ec..8a59ea25763 100644 --- a/creator-node/src/components/replicaSet/exportComponentService.js +++ b/creator-node/src/components/replicaSet/exportComponentService.js @@ -104,7 +104,7 @@ const exportComponentService = async ({ // since clockRecords are returned by clock ASC, clock val at last index is largest clock val cnodeUser.clock = requestedClockRangeMax logger.info( - `nodeSync.js#export - cnodeUser clock val ${curCnodeUserClockVal} is higher than requestedClockRangeMax, reset to ${requestedClockRangeMax}` + `exportComponentService() - cnodeUserUUID:${cnodeUser.cnodeUserUUID} - cnodeUser clock val ${curCnodeUserClockVal} is higher than requestedClockRangeMax, reset to ${requestedClockRangeMax}` ) } @@ -114,7 +114,9 @@ const exportComponentService = async ({ ) if (!_.isEmpty(clockRecords) && cnodeUser.clock !== maxClockRecord) { const errorMsg = `Cannot export - exported data is not consistent. Exported max clock val = ${cnodeUser.clock} and exported max ClockRecord val ${maxClockRecord}. Fixing and trying again...` - logger.error(errorMsg) + logger.error( + `exportComponentService() - cnodeUserUUID:${cnodeUser.cnodeUserUUID} - ${errorMsg}` + ) if (!forceExport) { throw new Error(errorMsg) @@ -146,8 +148,20 @@ const exportComponentService = async ({ return cnodeUsersDict } catch (e) { await transaction.rollback() + for (const cnodeUserUUID in cnodeUsersDict) { - await DBManager.fixInconsistentUser(cnodeUserUUID) + try { + const numRowsUpdated = await DBManager.fixInconsistentUser( + cnodeUserUUID + ) + logger.warn( + `exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() executed - numRowsUpdated:${numRowsUpdated}` + ) + } catch (e) { + logger.error( + `exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() error - ${e.message}` + ) + } } throw new Error(e) } diff --git a/creator-node/src/dbManager.js b/creator-node/src/dbManager.js index b88a8eb258b..66553f3e914 100644 --- a/creator-node/src/dbManager.js +++ b/creator-node/src/dbManager.js @@ -313,25 +313,28 @@ class DBManager { * Given a user's UUID, this function will set their clock value equal to the max clock value * found in the ClockRecords table for that same user * - * @param userUUID the UUID for the user whose clock needs to be made consistent + * @param cnodeUserUUID the UUID for the user whose clock needs to be made consistent */ - static async fixInconsistentUser(userUUID) { - sequelize.query( + static async fixInconsistentUser(cnodeUserUUID) { + const [, metadata] = await sequelize.query( ` UPDATE "CNodeUsers" as cnodeusers SET clock = subquery.max_clock FROM ( SELECT "cnodeUserUUID", MAX(clock) AS max_clock FROM "ClockRecords" - WHERE "cnodeUserUUID" = :userUUID + WHERE "cnodeUserUUID" = :cnodeUserUUID GROUP BY "cnodeUserUUID" ) AS subquery WHERE cnodeusers."cnodeUserUUID" = subquery."cnodeUserUUID" `, { - replacements: { userUUID } + replacements: { cnodeUserUUID } } ) + + const numRowsUpdated = metadata?.rowCount || 0 + return numRowsUpdated } } diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index 0995cfa0a48..c13ae64c54d 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -560,7 +560,20 @@ const handleSyncFromPrimary = async ( await transaction.rollback() - await DBManager.fixInconsistentUser(fetchedCNodeUser.cnodeUserUUID) + try { + const numRowsUpdated = await DBManager.fixInconsistentUser( + fetchedCNodeUser.cnodeUserUUID + ) + logger.warn( + logPrefix, + `fixInconsistentUser() executed for ${fetchedCNodeUser.cnodeUserUUID} - numRowsUpdated:${numRowsUpdated}` + ) + } catch (e) { + logger.error( + logPrefix, + `fixInconsistentUser() error for ${fetchedCNodeUser.cnodeUserUUID} - ${e.message}` + ) + } return { error: new Error(e), diff --git a/creator-node/test/dbManager.test.js b/creator-node/test/dbManager.test.js index 7e4c943bf9d..8115b857a86 100644 --- a/creator-node/test/dbManager.test.js +++ b/creator-node/test/dbManager.test.js @@ -12,8 +12,6 @@ const DBManager = require('../src/dbManager') const BlacklistManager = require('../src/blacklistManager') const FileManager = require('../src/fileManager') const DiskManager = require('../src/diskManager') -const { libs } = require('@audius/sdk') -const Utils = libs.Utils const utils = require('../src/utils') const { createStarterCNodeUser, @@ -25,9 +23,38 @@ const { const { getApp } = require('./lib/app') const { getLibsMock } = require('./lib/libsMock') const { saveFileToStorage, computeFilesHash } = require('./lib/helpers') +const { fetchDBStateForWallet, assertTableEquality } = require('./lib/utils') const TestAudioFilePath = path.resolve(__dirname, 'testTrack.mp3') +/** Add state to AudiusUsers table for given userId */ +const uploadAudiusUserState = async function ({ + app, + userId, + sessionToken, + metadataObj, + audiusUserBlockNumber +}) { + const audiusUserMetadataResp = await request(app) + .post('/audius_users/metadata') + .set('X-Session-ID', sessionToken) + .set('User-Id', userId) + .set('Enforce-Write-Quorum', false) + .send({ metadata: metadataObj }) + .expect(200) + + await request(app) + .post('/audius_users') + .set('X-Session-ID', sessionToken) + .set('User-Id', userId) + .send({ + blockchainUserId: userId, + blockNumber: audiusUserBlockNumber, + metadataFileUUID: audiusUserMetadataResp.body.data.metadataFileUUID + }) + .expect(200) +} + describe('Test createNewDataRecord()', async function () { const req = { logger: { @@ -1068,3 +1095,209 @@ describe('Test fetchFilesHashFromDB()', async function () { assert.deepEqual(actualResp, expectedResp) }) }) + +describe('Test fixInconsistentUser()', async function () { + const userId = 1 + + let server, app + + /** Init server to run DB migrations */ + before(async function () { + const appInfo = await getApp(getLibsMock()) + server = appInfo.server + app = appInfo.app + }) + + beforeEach(async function () { + await destroyUsers() + }) + + /** Wipe all CNodeUsers + dependent data */ + after(async function () { + await destroyUsers() + + await server.close() + }) + + it('Confirm no change to healthy users DB state', async function () { + const { cnodeUserUUID, walletPublicKey, sessionToken } = await createStarterCNodeUser(userId) + + // Upload some state for user + const audiusUserBlockNumber = 10 + const audiusUserMetadata = { test: 'field1' } + const metadataCID = 'QmQMHXPMuey2AT6fPTKnzKQCrRjPS7AbaQdDTM8VXbHC8W' + await uploadAudiusUserState({ + app, + userId, + sessionToken, + metadataObj: audiusUserMetadata, + audiusUserBlockNumber + }) + const expectedCNodeUserClock = 2 + + // Confirm expected initial state + const { + cnodeUser: initialCNodeUser, + audiusUsers: initialAudiusUsers, + tracks: initialTracks, + files: initialFiles, + clockRecords: initialClockRecords + } = await fetchDBStateForWallet(walletPublicKey, models) + assertTableEquality( + [initialCNodeUser], + [{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }], + ['createdAt', 'updatedAt', 'lastLogin'] + ) + assertTableEquality( + initialAudiusUsers, + [{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }], + ['createdAt', 'updatedAt', 'metadataFileUUID'] + ) + assertTableEquality(initialTracks, []) + assertTableEquality( + initialFiles, + [{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }], + ['fileUUID', 'createdAt', 'updatedAt'] + ) + assertTableEquality( + initialClockRecords, + [ + { cnodeUserUUID, clock: 1, sourceTable: "File" }, + { cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" } + ], + ['createdAt', 'updatedAt'] + ) + + // Call fixInconsistentUser() + const numRowsUpdated = await DBManager.fixInconsistentUser(cnodeUserUUID) + assert.strictEqual(numRowsUpdated, 1) + + // Confirm final initial state is unchanged + const { + cnodeUser: finalCNodeUser, + audiusUsers: finalAudiusUsers, + tracks: finalTracks, + files: finalFiles, + clockRecords: finalClockRecords + } = await fetchDBStateForWallet(walletPublicKey, models) + assertTableEquality( + [finalCNodeUser], + [{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }], + ['createdAt', 'updatedAt', 'lastLogin'] + ) + assertTableEquality( + finalAudiusUsers, + [{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }], + ['createdAt', 'updatedAt', 'metadataFileUUID'] + ) + assertTableEquality(finalTracks, []) + assertTableEquality( + finalFiles, + [{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }], + ['fileUUID', 'createdAt', 'updatedAt'] + ) + assertTableEquality( + finalClockRecords, + [ + { cnodeUserUUID, clock: 1, sourceTable: "File" }, + { cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" } + ], + ['createdAt', 'updatedAt'] + ) + }) + + it('Confirm inconsistent users state is correctly fixed', async function () { + const { cnodeUserUUID, walletPublicKey, sessionToken } = await createStarterCNodeUser(userId) + + // Upload some state for user + const audiusUserBlockNumber = 10 + const audiusUserMetadata = { test: 'field1' } + const metadataCID = 'QmQMHXPMuey2AT6fPTKnzKQCrRjPS7AbaQdDTM8VXbHC8W' + await uploadAudiusUserState({ + app, + userId, + sessionToken, + metadataObj: audiusUserMetadata, + audiusUserBlockNumber + }) + const expectedCNodeUserClock = 2 + const actualCNodeUserClock = 1 + + // Change cnodeUser.clock to be inconsistent with ClockRecords + await models.CNodeUser.update( + { clock: actualCNodeUserClock }, + { where: { cnodeUserUUID }} + ) + + // Confirm expected initial state + const { + cnodeUser: initialCNodeUser, + audiusUsers: initialAudiusUsers, + tracks: initialTracks, + files: initialFiles, + clockRecords: initialClockRecords + } = await fetchDBStateForWallet(walletPublicKey, models) + assertTableEquality( + [initialCNodeUser], + [{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: actualCNodeUserClock }], + ['createdAt', 'updatedAt', 'lastLogin'] + ) + assertTableEquality( + initialAudiusUsers, + [{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }], + ['createdAt', 'updatedAt', 'metadataFileUUID'] + ) + assertTableEquality(initialTracks, []) + assertTableEquality( + initialFiles, + [{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }], + ['fileUUID', 'createdAt', 'updatedAt'] + ) + assertTableEquality( + initialClockRecords, + [ + { cnodeUserUUID, clock: 1, sourceTable: "File" }, + { cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" } + ], + ['createdAt', 'updatedAt'] + ) + + // Call fixInconsistentUser() + const numRowsUpdated = await DBManager.fixInconsistentUser(cnodeUserUUID) + assert.strictEqual(numRowsUpdated, 1) + + // Confirm final initial state where CNodeUser clock is consistent with ClockRecords + const { + cnodeUser: finalCNodeUser, + audiusUsers: finalAudiusUsers, + tracks: finalTracks, + files: finalFiles, + clockRecords: finalClockRecords + } = await fetchDBStateForWallet(walletPublicKey, models) + assertTableEquality( + [finalCNodeUser], + [{ cnodeUserUUID, walletPublicKey, latestBlockNumber: audiusUserBlockNumber, clock: expectedCNodeUserClock }], + ['createdAt', 'updatedAt', 'lastLogin'] + ) + assertTableEquality( + finalAudiusUsers, + [{ cnodeUserUUID, clock: expectedCNodeUserClock, blockchainId: `${userId}`, metadataJSON: audiusUserMetadata, coverArtFileUUID: null, profilePicFileUUID: null }], + ['createdAt', 'updatedAt', 'metadataFileUUID'] + ) + assertTableEquality(finalTracks, []) + assertTableEquality( + finalFiles, + [{ cnodeUserUUID, trackBlockchainId: null, multihash: metadataCID, sourceFile: null, fileName: null, dirMultihash: null, storagePath: DiskManager.computeFilePath(metadataCID, false), type: "metadata", clock: 1, skipped: false }], + ['fileUUID', 'createdAt', 'updatedAt'] + ) + assertTableEquality( + finalClockRecords, + [ + { cnodeUserUUID, clock: 1, sourceTable: "File" }, + { cnodeUserUUID, clock: 2, sourceTable: "AudiusUser" } + ], + ['createdAt', 'updatedAt'] + ) + }) + +}) \ No newline at end of file diff --git a/creator-node/test/issueSyncRequest.jobProcessor.test.js b/creator-node/test/issueSyncRequest.jobProcessor.test.js index 5b6f4f085a3..cefbe91bff5 100644 --- a/creator-node/test/issueSyncRequest.jobProcessor.test.js +++ b/creator-node/test/issueSyncRequest.jobProcessor.test.js @@ -12,7 +12,6 @@ const models = require('../src/models') const config = require('../src/config') const stateMachineConstants = require('../src/services/stateMachineManager/stateMachineConstants') const { SyncType, QUEUE_NAMES, SYNC_MODES } = stateMachineConstants -const issueSyncRequestJobProcessor = require('../src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor') const { METRIC_NAMES } = require('../src/services/prometheusMonitoring/prometheus.constants') diff --git a/creator-node/test/lib/app.js b/creator-node/test/lib/app.js index 35d65461c1a..ca5bed54c34 100644 --- a/creator-node/test/lib/app.js +++ b/creator-node/test/lib/app.js @@ -13,7 +13,7 @@ async function getApp( libsClient, blacklistManager = BlacklistManager, setMockFn = null, - spId = null + spId = 1 ) { // we need to clear the cache that commonjs require builds, otherwise it uses old values for imports etc // eg if you set a new env var, it doesn't propogate well unless you clear the cache for the config file as well diff --git a/creator-node/test/lib/utils.js b/creator-node/test/lib/utils.js index 92a7926d74e..f57af9fbc0a 100644 --- a/creator-node/test/lib/utils.js +++ b/creator-node/test/lib/utils.js @@ -1,3 +1,6 @@ +const assert = require('assert') +const _ = require('lodash') + const Utils = require('../../src/utils') const stringifiedDateFields = (obj) => { @@ -45,9 +48,87 @@ const generateRandomCID = (numRandomDigits = 5, maxRandomNumber = 10000) => { ) } +const fetchDBStateForWallet = async (walletPublicKey, models) => { + const response = { + cnodeUser: null, + audiusUsers: null, + tracks: null, + files: null, + clockRecords: null + } + + const cnodeUser = stringifiedDateFields( + await models.CNodeUser.findOne({ + where: { + walletPublicKey + }, + raw: true + }) + ) + + if (!cnodeUser || Object.keys(cnodeUser).length === 0) { + return response + } else { + response.cnodeUser = cnodeUser + } + + const cnodeUserUUID = cnodeUser.cnodeUserUUID + + const audiusUsers = ( + await models.AudiusUser.findAll({ + where: { cnodeUserUUID }, + raw: true + }) + ).map(stringifiedDateFields) + response.audiusUsers = audiusUsers + + const tracks = ( + await models.Track.findAll({ + where: { cnodeUserUUID }, + raw: true + }) + ).map(stringifiedDateFields) + response.tracks = tracks + + const files = ( + await models.File.findAll({ + where: { cnodeUserUUID }, + raw: true + }) + ).map(stringifiedDateFields) + response.files = files + + const clockRecords = ( + await models.ClockRecord.findAll({ + where: { cnodeUserUUID }, + raw: true + }) + ).map(stringifiedDateFields) + response.clockRecords = clockRecords + + return response +} + +const assertTableEquality = (tableA, tableB, comparisonOmittedFields = []) => { + assert.deepStrictEqual( + _.orderBy( + tableA.map((entry) => _.omit(entry, comparisonOmittedFields)), + ['clock'], + ['asc'] + ), + _.orderBy( + tableB.map((entry) => _.omit(entry, comparisonOmittedFields)), + ['clock'], + ['asc'] + ) + ) +} + module.exports = { wait, stringifiedDateFields, functionThatThrowsWithMessage, - generateRandomCID + generateRandomCID, + fetchDBStateForWallet, + assertTableEquality }