Skip to content

Commit

Permalink
[CON-225] Remove manual sync from snapbackSM
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaylor89 authored Jul 11, 2022
1 parent 4be90ff commit 553ad84
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 81 deletions.
40 changes: 20 additions & 20 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const { hasEnoughStorageSpace } = require('./fileManager')
const { getMonitors, MONITORS } = require('./monitors/monitors')
const { verifyRequesterIsValidSP } = require('./apiSigning')
const BlacklistManager = require('./blacklistManager')
const {
issueSyncRequestsUntilSynced
} = require('./services/stateMachineManager/stateReconciliation/stateReconciliationUtils')

/**
* Ensure valid cnodeUser and session exist for provided session token
Expand Down Expand Up @@ -269,7 +272,7 @@ async function ensureStorageMiddleware(req, res, next) {
*/
async function issueAndWaitForSecondarySyncRequests(req) {
const serviceRegistry = req.app.get('serviceRegistry')
const { snapbackSM } = serviceRegistry
const { manualSyncQueue } = serviceRegistry

// Parse request headers
const pollingDurationMs =
Expand Down Expand Up @@ -332,11 +335,13 @@ async function issueAndWaitForSecondarySyncRequests(req) {
const replicationStart = Date.now()
try {
const secondaryPromises = secondaries.map((secondary) => {
return snapbackSM.issueSyncRequestsUntilSynced(
return issueSyncRequestsUntilSynced(
primary,
secondary,
wallet,
primaryClockVal,
pollingDurationMs
pollingDurationMs,
manualSyncQueue
)
})

Expand Down
10 changes: 5 additions & 5 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ class ServiceRegistry {
queues: [
stateMonitoringAdapter,
stateReconciliationAdapter,
new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }),
new BullAdapter(this.cNodeEndpointToSpIdMapQueue, {
readOnlyMode: true
}),
new BullAdapter(this.stateMachineQueue, { readOnlyMode: true }),
new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }),
new BullAdapter(this.recurringSyncQueue, { readOnlyMode: true }),
new BullAdapter(syncProcessingQueue, { readOnlyMode: true }),
new BullAdapter(asyncProcessingQueue, { readOnlyMode: true }),
Expand Down Expand Up @@ -289,7 +289,8 @@ class ServiceRegistry {
const {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
} = await this.stateMachineManager.init(this.libs, this.prometheusRegistry)
this.stateMonitoringQueue = stateMonitoringQueue
this.cNodeEndpointToSpIdMapQueue = cNodeEndpointToSpIdMapQueue
Expand All @@ -298,6 +299,7 @@ class ServiceRegistry {
// SyncQueue construction (requires L1 identity)
// Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround
this.syncQueue = new SyncQueue(config, this.redis, this)
this.manualSyncQueue = manualSyncQueue

// L2URSMRegistration (requires L1 identity)
// Retries indefinitely
Expand Down Expand Up @@ -441,10 +443,8 @@ class ServiceRegistry {
*/
async _initSnapbackSM() {
this.snapbackSM = new SnapbackSM(config, this.libs)
const { stateMachineQueue, manualSyncQueue, recurringSyncQueue } =
this.snapbackSM
const { stateMachineQueue, recurringSyncQueue } = this.snapbackSM
this.stateMachineQueue = stateMachineQueue
this.manualSyncQueue = manualSyncQueue
this.recurringSyncQueue = recurringSyncQueue

let isInitialized = false
Expand Down
8 changes: 4 additions & 4 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ class StateMachineManager {
audiusLibs.discoveryProvider.discoveryProviderEndpoint,
prometheusRegistry
)
const stateReconciliationQueue = await stateReconciliationManager.init(
prometheusRegistry
)
const { stateReconciliationQueue, manualSyncQueue } =
await stateReconciliationManager.init(prometheusRegistry)

// Upon completion, make jobs record metrics and enqueue other jobs as necessary
stateMonitoringQueue.on(
Expand Down Expand Up @@ -60,7 +59,8 @@ class StateMachineManager {
return {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ module.exports = {
// Max number of completed/failed jobs to keep in redis for the state monitoring queue
MONITORING_QUEUE_HISTORY: 20,

// Max number of completed/failed jobs to keep in redis for the state monitoring queue
RECONCILIATION_QUEUE_HISTORY: 300,

// Max millis to run a fetch cNodeEndpoint->spId mapping job for before marking it as stalled (1 minute)
C_NODE_ENDPOINT_TO_SP_ID_MAP_QUEUE_MAX_JOB_RUNTIME_MS: 1000 * 60,

Expand Down Expand Up @@ -53,7 +50,9 @@ module.exports = {
// Max number of completed/failed jobs to keep in redis for the cNodeEndpoint->spId map queue
C_NODE_ENDPOINT_TO_SP_ID_MAP: 100,
// Max number of completed/failed jobs to keep in redis for the state monitoring queue
STATE_RECONCILIATION: 300
STATE_RECONCILIATION: 300,
// Max number of completed/failed jobs to keep in redis for the manual sync queue
MANUAL_SYNC: 300
}),

QUEUE_NAMES: Object.freeze({
Expand All @@ -62,7 +61,9 @@ module.exports = {
// Name of queue that only processes jobs to fetch the cNodeEndpoint->spId mapping,
C_NODE_ENDPOINT_TO_SP_ID_MAP: 'c-node-to-endpoint-sp-id-map-queue',
// Name of StateReconciliationQueue
STATE_RECONCILIATION: 'state-reconciliation-queue'
STATE_RECONCILIATION: 'state-reconciliation-queue',
// Name of ManualSyncQueue
MANUAL_SYNC: 'manual-sync-queue'
}),

JOB_NAMES: Object.freeze({
Expand Down
Loading

0 comments on commit 553ad84

Please sign in to comment.