Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CON-201] Remove QueueInterfacer + increase RS update concurrency #3554

Merged
merged 3 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions creator-node/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,12 @@ const config = convict({
env: 'maxRecurringRequestSyncJobConcurrency',
default: 5
},
maxUpdateReplicaSetJobConcurrency: {
doc: 'Max bull queue concurrency for update replica set jobs',
format: 'nat',
env: 'maxUpdateReplicaSetJobConcurrency',
default: 15
},
peerHealthCheckRequestTimeout: {
doc: 'Timeout [ms] for checking health check route',
format: 'nat',
Expand Down
12 changes: 7 additions & 5 deletions creator-node/src/services/initAudiusLibs.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { logger: genericLogger } = require('../logging')
*
* Configures dataWeb3 to be internal to libs, logged in with delegatePrivateKey in order to write chain TX
*/
module.exports = async (logger = genericLogger) => {
module.exports = async (excludeDiscovery = false, logger = genericLogger) => {
/**
* Define all config variables
*/
Expand Down Expand Up @@ -64,10 +64,12 @@ module.exports = async (logger = genericLogger) => {
// TODO - formatting this private key here is not ideal
delegatePrivateKey.replace('0x', '')
),
discoveryProviderConfig: {
whitelist: discoveryProviderWhitelist,
unhealthyBlockDiff: discoveryNodeUnhealthyBlockDiff
},
discoveryProviderConfig: excludeDiscovery
? null
: {
whitelist: discoveryProviderWhitelist,
unhealthyBlockDiff: discoveryNodeUnhealthyBlockDiff
},
// If an identity service config is present, set up libs with the connection, otherwise do nothing
identityServiceConfig: identityService
? AudiusLibs.configIdentityService(identityService)
Expand Down
17 changes: 0 additions & 17 deletions creator-node/src/services/stateMachineManager/QueueInterfacer.js

This file was deleted.

5 changes: 0 additions & 5 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ const { logger: baseLogger } = require('../../logging')
const StateMonitoringManager = require('./stateMonitoring')
const StateReconciliationManager = require('./stateReconciliation')
const { RECONFIG_MODES, QUEUE_NAMES } = require('./stateMachineConstants')
const QueueInterfacer = require('./QueueInterfacer')
const makeOnCompleteCallback = require('./makeOnCompleteCallback')
const CNodeToSpIdMapManager = require('./CNodeToSpIdMapManager')

/**
* Manages the queue for monitoring the state of Content Nodes and
* the queue for reconciling anomalies in the state (syncs and replica set updates).
* Use QueueInterfacer for interfacing with the queues.
*/
class StateMachineManager {
async init(audiusLibs, prometheusRegistry) {
this.updateEnabledReconfigModesSet()

// TODO: Remove this and libs another way -- maybe init a new instance for each updateReplicaSet job
QueueInterfacer.init(audiusLibs)

// Initialize class immediately since bull jobs are run on cadence even on deploy
await CNodeToSpIdMapManager.updateCnodeEndpointToSpIdMap(
audiusLibs.ethContracts
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const QueueInterfacer = require('../QueueInterfacer')
const initAudiusLibs = require('../../initAudiusLibs')
const NodeToSpIdManager = require('../CNodeToSpIdMapManager')

/**
Expand All @@ -11,8 +11,9 @@ const NodeToSpIdManager = require('../CNodeToSpIdMapManager')
module.exports = async function ({ logger }) {
let errorMsg = ''
try {
const audiusLibs = await initAudiusLibs(true)
await NodeToSpIdManager.updateCnodeEndpointToSpIdMap(
QueueInterfacer.getAudiusLibs().ethContracts
audiusLibs.ethContracts
)
} catch (e) {
errorMsg = e.message || e.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,19 @@ class StateReconciliationManager {
logger.error(`Job failed to complete. ID=${job?.id}. Error=${err}`)
})

// Register the logic that gets executed to process each new job from the queue
// Register the logic that gets executed to process each new job from the queues
manualSyncQueue.process(
config.get('maxManualRequestSyncJobConcurrency'),
processManualSync
)

recurringSyncQueue.process(
config.get('maxRecurringRequestSyncJobConcurrency'),
processRecurringSync
)
updateReplicaSetQueue.process(1 /** concurrency */, processUpdateReplicaSet)
updateReplicaSetQueue.process(
config.get('maxUpdateReplicaSetJobConcurrency'),
processUpdateReplicaSet
)
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const {
} = require('../stateMachineConstants')
const { retrieveClockValueForUserFromReplica } = require('../stateMachineUtils')
const CNodeToSpIdMapManager = require('../CNodeToSpIdMapManager')
const QueueInterfacer = require('../QueueInterfacer')
const { getNewOrExistingSyncReq } = require('./stateReconciliationUtils')
const initAudiusLibs = require('../../initAudiusLibs')

const reconfigNodeWhitelist = config.get('reconfigNodeWhitelist')
? new Set(config.get('reconfigNodeWhitelist').split(','))
Expand Down Expand Up @@ -48,14 +48,13 @@ module.exports = async function ({
* on a new replica set. Also, the sync check logic is coupled with a user state on the userStateManager.
* There will be an explicit clock value check on the newly selected replica set nodes instead.
*/
const audiusLibs = await initAudiusLibs(true)
const { services: healthyServicesMap } =
await QueueInterfacer.getAudiusLibs().ServiceProvider.autoSelectCreatorNodes(
{
performSyncCheck: false,
whitelist: reconfigNodeWhitelist,
log: true
}
)
await audiusLibs.ServiceProvider.autoSelectCreatorNodes({
performSyncCheck: false,
whitelist: reconfigNodeWhitelist,
log: true
})

const healthyNodes = Object.keys(healthyServicesMap || {})
if (healthyNodes.length === 0)
Expand Down Expand Up @@ -99,6 +98,7 @@ module.exports = async function ({
secondary1,
secondary2,
newReplicaSet,
audiusLibs,
logger
))
} catch (e) {
Expand Down Expand Up @@ -461,6 +461,7 @@ const _issueUpdateReplicaSetOp = async (
secondary1,
secondary2,
newReplicaSet,
audiusLibs,
logger
) => {
const response = {
Expand Down Expand Up @@ -513,7 +514,7 @@ const _issueUpdateReplicaSetOp = async (
// Submit chain tx to update replica set
const startTimeMs = Date.now()
try {
await QueueInterfacer.getAudiusLibs().contracts.UserReplicaSetManagerClient.updateReplicaSet(
await audiusLibs.contracts.UserReplicaSetManagerClient.updateReplicaSet(
userId,
newReplicaSetSPIds[0], // primary
newReplicaSetSPIds.slice(1) // [secondary1, secondary2]
Expand Down
4 changes: 1 addition & 3 deletions creator-node/test/updateReplicaSet.jobProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ describe('test updateReplicaSet job processor', function () {
'../src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.js',
{
'../../../config': config,
'../QueueInterfacer': {
getAudiusLibs: () => audiusLibsStub
},
'../../initAudiusLibs': sandbox.stub().resolves(audiusLibsStub),
'./stateReconciliationUtils': {
getNewOrExistingSyncReq: getNewOrExistingSyncReqStub
},
Expand Down