Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CON-225] Remove manual sync from snapbackSM #3390

Merged
merged 29 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8dbbd06
update labels
jonaylor89 Jun 23, 2022
a4e90b3
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 24, 2022
aec297a
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 25, 2022
6de2b06
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 29, 2022
5e6a443
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 29, 2022
1a2fc9b
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 1, 2022
ab2bdb2
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
00f69dd
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
2cc98ce
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
790f3b0
merge
jonaylor89 Jul 5, 2022
a7ab01c
merge
jonaylor89 Jul 6, 2022
772cacb
merge
jonaylor89 Jul 6, 2022
f3a2e87
merge
jonaylor89 Jul 6, 2022
5ab2576
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 6, 2022
c444a1e
new manual sync queue
jonaylor89 Jul 6, 2022
038218a
clean instead of obliterate
jonaylor89 Jul 6, 2022
97e78dc
fix test
jonaylor89 Jul 7, 2022
8ad589d
update
jonaylor89 Jul 7, 2022
37fe3ac
merge
jonaylor89 Jul 7, 2022
a4fd80b
merge
jonaylor89 Jul 7, 2022
b8b3622
add code
jonaylor89 Jul 7, 2022
2aebc9a
fix comment
jonaylor89 Jul 8, 2022
5d9b235
update new manual sync code
jonaylor89 Jul 8, 2022
df22514
Merge branch 'master' into jn-rm-manual-sync
jonaylor89 Jul 8, 2022
96d4e30
nits
jonaylor89 Jul 8, 2022
1abe3da
Merge branch 'jn-rm-manual-sync' of https://github.com/AudiusProject/…
jonaylor89 Jul 8, 2022
f44d2b0
rm unused constants
jonaylor89 Jul 11, 2022
2b0bb47
merge better
jonaylor89 Jul 11, 2022
2d29298
lint fix
jonaylor89 Jul 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async function ensureStorageMiddleware(req, res, next) {
*/
async function issueAndWaitForSecondarySyncRequests(req) {
const serviceRegistry = req.app.get('serviceRegistry')
const { snapbackSM } = serviceRegistry
const { StateMachineManager, manualSyncQueue } = serviceRegistry

// Parse request headers
const pollingDurationMs =
Expand Down Expand Up @@ -332,11 +332,12 @@ async function issueAndWaitForSecondarySyncRequests(req) {
const replicationStart = Date.now()
try {
const secondaryPromises = secondaries.map((secondary) => {
return snapbackSM.issueSyncRequestsUntilSynced(
return StateMachineManager.issueSyncRequestsUntilSynced(
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
secondary,
wallet,
primaryClockVal,
pollingDurationMs
pollingDurationMs,
manualSyncQueue
)
})

Expand Down
16 changes: 9 additions & 7 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class ServiceRegistry {
app,
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
) {
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
this.logInfo('Setting up Bull queue monitoring...')

Expand Down Expand Up @@ -178,7 +179,7 @@ class ServiceRegistry {
stateReconciliationAdapter,
new BullAdapter(cNodeEndpointToSpIdMapQueue, { readOnlyMode: true }),
new BullAdapter(this.stateMachineQueue, { readOnlyMode: true }),
new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }),
new BullAdapter(manualSyncQueue, { readOnlyMode: true }),
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
new BullAdapter(this.recurringSyncQueue, { readOnlyMode: true }),
new BullAdapter(syncProcessingQueue, { readOnlyMode: true }),
new BullAdapter(asyncProcessingQueue, { readOnlyMode: true }),
Expand Down Expand Up @@ -289,12 +290,14 @@ class ServiceRegistry {
const {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
} = await this.stateMachineManager.init(this.libs, this.prometheusRegistry)

// SyncQueue construction (requires L1 identity)
// Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround
this.syncQueue = new SyncQueue(config, this.redis, this)
this.manualSyncQueue = manualSyncQueue

// L2URSMRegistration (requires L1 identity)
// Retries indefinitely
Expand All @@ -314,7 +317,8 @@ class ServiceRegistry {
app,
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
)
} catch (e) {
this.logError(
Expand Down Expand Up @@ -443,10 +447,8 @@ class ServiceRegistry {
*/
async _initSnapbackSM() {
this.snapbackSM = new SnapbackSM(config, this.libs)
const { stateMachineQueue, manualSyncQueue, recurringSyncQueue } =
this.snapbackSM
const { stateMachineQueue, recurringSyncQueue } = this.snapbackSM
this.stateMachineQueue = stateMachineQueue
this.manualSyncQueue = manualSyncQueue
this.recurringSyncQueue = recurringSyncQueue

let isInitialized = false
Expand Down
107 changes: 102 additions & 5 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
const _ = require('lodash')
const axios = require('axios')

const Utils = require('../utils')
const config = require('../../config')
const { logger: baseLogger } = require('../../logging')
const StateMonitoringManager = require('./stateMonitoring')
const StateReconciliationManager = require('./stateReconciliation')
const NodeToSpIdManager = require('./CNodeToSpIdMapManager')
const { RECONFIG_MODES } = require('./stateMachineConstants')
const { RECONFIG_MODES, SyncType } = require('./stateMachineConstants')
const QueueInterfacer = require('./QueueInterfacer')
const makeOnCompleteCallback = require('./makeOnCompleteCallback')
const {
getNewOrExistingSyncReq
} = require('./stateReconciliation/stateReconciliationUtils')

/**
* Manages the queue for monitoring the state of Content Nodes and
Expand All @@ -29,9 +34,8 @@ class StateMachineManager {
audiusLibs.discoveryProvider.discoveryProviderEndpoint,
prometheusRegistry
)
const stateReconciliationQueue = await stateReconciliationManager.init(
prometheusRegistry
)
const { stateReconciliationQueue, manualSyncQueue } =
await stateReconciliationManager.init(prometheusRegistry)

// Upon completion, make jobs record metrics and enqueue other jobs as necessary
stateMonitoringQueue.on(
Expand Down Expand Up @@ -60,7 +64,8 @@ class StateMachineManager {
return {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
}
}

Expand Down Expand Up @@ -133,6 +138,98 @@ class StateMachineManager {
this.highestEnabledReconfigMode = highestEnabledReconfigMode
this.enabledReconfigModesSet = enabledReconfigModesSet
}

/**
* Issues syncRequest for user against secondary, and polls for replication up to primary
* If secondary fails to sync within specified timeoutMs, will error
*/
async issueSyncRequestsUntilSynced(
secondaryUrl,
wallet,
primaryClockVal,
timeoutMs,
queue
) {
// Issue syncRequest before polling secondary for replication
const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({
userWallet: wallet,
secondaryEndpoint: secondaryUrl,
primaryEndpoint: this.endpoint,
syncType: SyncType.Manual,
immediate: true
})
if (!_.isEmpty(duplicateSyncReq)) {
// Log duplicate and return
baseLogger.warn(`Duplicate sync request: ${duplicateSyncReq}`)
return
} else if (!_.isEmpty(syncReqToEnqueue)) {
const { jobName, jobData } = syncReqToEnqueue
await queue.add(jobName, jobData)
} else {
// Log error that the sync request couldn't be created and return
baseLogger.error(
`Failed to create manual sync request: ${duplicateSyncReq}`
)
return
}

// Poll clock status and issue syncRequests until secondary is caught up or until timeoutMs
const start = Date.now()
while (Date.now() - start < timeoutMs) {
try {
// Retrieve secondary clock status for user
const secondaryClockStatusResp = await axios({
method: 'get',
baseURL: secondaryUrl,
url: `/users/clock_status/${wallet}`,
responseType: 'json',
timeout: 1000 // 1000ms = 1s
})
const { clockValue: secondaryClockVal, syncInProgress } =
secondaryClockStatusResp.data.data

// If secondary is synced, return successfully
if (secondaryClockVal >= primaryClockVal) {
return

// Else, if a sync is not already in progress on the secondary, issue a new SyncRequest
} else if (!syncInProgress) {
const { duplicateSyncReq, syncReqToEnqueue } =
getNewOrExistingSyncReq({
userWallet: wallet,
secondaryEndpoint: secondaryUrl,
primaryEndpoint: this.endpoint,
syncType: SyncType.Manual
})
if (!_.isEmpty(duplicateSyncReq)) {
// Log duplicate and return
baseLogger.warn(`Duplicate sync request: ${duplicateSyncReq}`)
return
} else if (!_.isEmpty(syncReqToEnqueue)) {
const { jobName, jobData } = syncReqToEnqueue
await queue.add(jobName, jobData)
} else {
// Log error that the sync request couldn't be created and return
baseLogger.error(
`Failed to create manual sync request: ${duplicateSyncReq}`
)
return
}
}

// Give secondary some time to process ongoing or newly enqueued sync
// NOTE - we might want to make this timeout longer
await Utils.timeout(500)
} catch (e) {
// do nothing and let while loop continue
}
}

// This condition will only be hit if the secondary has failed to sync within timeoutMs
throw new Error(
`Secondary ${secondaryUrl} did not sync up to primary for user ${wallet} within ${timeoutMs}ms`
)
}
}

module.exports = StateMachineManager
Loading