Skip to content

Commit

Permalink
Test monitoring queue steps (#3155)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored May 26, 2022
1 parent b06486d commit baf8bfa
Show file tree
Hide file tree
Showing 6 changed files with 897 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,10 @@ module.exports = {
key: 'ENTIRE_REPLICA_SET',
value: 4
}
}),
// Describes the type of sync operation
SyncType: Object.freeze({
Recurring: 'RECURRING', // Scheduled background sync to keep secondaries up to date
Manual: 'MANUAL' // Triggered by a user data write to primary
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class StateMonitoringQueue {
config.get('redisHost'),
config.get('redisPort')
)
this.registerQueueEventHandlers(
this.queue,
this.enqueueJobAfterSuccess,
this.enqueueJobAfterFailure
)
this.registerQueueEventHandlers({
queue: this.queue,
jobSuccessCallback: this.enqueueJobAfterSuccess,
jobFailureCallback: this.enqueueJobAfterFailure
})
this.registerQueueJobProcessor(this.queue)

await this.startQueue(
Expand Down Expand Up @@ -80,11 +80,15 @@ class StateMonitoringQueue {

/**
* Registers event handlers for logging and job success/failure.
* @param {Object} queue the queue to register events for
* @param {Function<queue, successfulJob, jobResult>} jobSuccessCallback the function to call when a job succeeds
* @param {Function<queue, failedJob>} jobFailureCallback the function to call when a job fails
* @param {Object} params.queue the queue to register events for
* @param {Function<queue, successfulJob, jobResult>} params.jobSuccessCallback the function to call when a job succeeds
* @param {Function<queue, failedJob>} params.jobFailureCallback the function to call when a job fails
*/
registerQueueEventHandlers(queue, jobSuccessCallback, jobFailureCallback) {
registerQueueEventHandlers({
queue,
jobSuccessCallback,
jobFailureCallback
}) {
// Add handlers for logging
queue.on('global:waiting', (jobId) => {
this.log(`Queue Job Waiting - ID ${jobId}`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@ const MIN_SECONDARY_USER_SYNC_SUCCESS_PERCENT =
* @returns the ID of the newest user on Audius
*/
const getLatestUserIdFromDiscovery = async (discoveryNodeEndpoint) => {
if (!discoveryNodeEndpoint) {
throw new Error('No discovery provider currently selected, exiting')
}

// Will throw error on non-200 response
let latestUserId = 0
try {
// Request all users that have this node as a replica (either primary or secondary)
const resp = await Utils.asyncRetry({
logLabel: 'fetch all users with this node in replica',
logLabel: 'fetch the ID of the newest user on Audius',
asyncFn: async () => {
return axios({
method: 'get',
Expand Down Expand Up @@ -73,11 +69,6 @@ const getNodeUsers = async (
prevUserId = 0,
maxUsers = GET_NODE_USERS_DEFAULT_PAGE_SIZE
) => {
// Fetch discovery node currently connected to libs as this can change
if (!discoveryNodeEndpoint) {
throw new Error('No discovery provider currently selected, exiting')
}

// Will throw error on non-200 response
let nodeUsers
try {
Expand All @@ -86,7 +77,7 @@ const getNodeUsers = async (
setTimeout(
() =>
cancelTokenSource.cancel(
`getNodeUsers took more than ${GET_NODE_USERS_CANCEL_TOKEN_MS}ms and did not time out`
`getNodeUsers() took more than ${GET_NODE_USERS_CANCEL_TOKEN_MS}ms and did not time out`
),
GET_NODE_USERS_CANCEL_TOKEN_MS
)
Expand All @@ -113,13 +104,13 @@ const getNodeUsers = async (
nodeUsers = resp.data.data
} catch (e) {
if (axios.isCancel(e)) {
logger.error(`getNodeUsers request canceled: ${e.message}`)
logger.error(`getNodeUsers() request canceled: ${e.message}`)
}
throw new Error(
`getNodeUsers Error: ${e.toString()} - connected discovery node [${discoveryNodeEndpoint}]`
`getNodeUsers() Error: ${e.toString()} - connected discovery node [${discoveryNodeEndpoint}]`
)
} finally {
logger.info(`getNodeUsers nodeUsers.length: ${nodeUsers?.length}`)
logger.info(`getNodeUsers() nodeUsers.length: ${nodeUsers?.length}`)
}

// Ensure every object in response array contains all required fields
Expand All @@ -140,7 +131,7 @@ const getNodeUsers = async (
)
if (!allRequiredFieldsPresent) {
throw new Error(
'getNodeUsers Error: Unexpected response format during getNodeUsers call'
'getNodeUsers() Error: Unexpected response format during getNodeUsers() call'
)
}
}
Expand Down
6 changes: 3 additions & 3 deletions creator-node/test/StateMonitoringQueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ describe('test StateMonitoringQueue initialization and logging', function () {
expect(stateMonitoringQueue.queue).to.exist.and.to.be.instanceOf(BullQueue)
expect(stateMonitoringQueue.registerQueueEventHandlers).to.have.been
.calledOnce
expect(
stateMonitoringQueue.registerQueueEventHandlers.getCall(0).args[0]
).to.have.deep.property('name', STATE_MONITORING_QUEUE_NAME)
expect(stateMonitoringQueue.registerQueueEventHandlers.getCall(0).args[0])
.to.have.property('queue')
.that.has.deep.property('name', STATE_MONITORING_QUEUE_NAME)
})

it('kicks off an initial job when initting', async function () {
Expand Down
4 changes: 4 additions & 0 deletions creator-node/test/peerSetManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ describe('test peerSetManager -- determinePeerHealth', () => {
})
})

afterEach(() => {
nock.cleanAll()
})

it('should throw error if storage path vars are improper', () => {
let verboseHealthCheckResp = {
...baseVerboseHealthCheckResp
Expand Down
Loading

0 comments on commit baf8bfa

Please sign in to comment.