Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CON-225] Remove manual sync from snapbackSM #3390

Merged
merged 29 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8dbbd06
update labels
jonaylor89 Jun 23, 2022
a4e90b3
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 24, 2022
aec297a
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 25, 2022
6de2b06
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 29, 2022
5e6a443
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jun 29, 2022
1a2fc9b
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 1, 2022
ab2bdb2
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
00f69dd
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
2cc98ce
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 5, 2022
790f3b0
merge
jonaylor89 Jul 5, 2022
a7ab01c
merge
jonaylor89 Jul 6, 2022
772cacb
merge
jonaylor89 Jul 6, 2022
f3a2e87
merge
jonaylor89 Jul 6, 2022
5ab2576
Merge branch 'master' of https://github.com/AudiusProject/audius-prot…
jonaylor89 Jul 6, 2022
c444a1e
new manual sync queue
jonaylor89 Jul 6, 2022
038218a
clean instead of obliterate
jonaylor89 Jul 6, 2022
97e78dc
fix test
jonaylor89 Jul 7, 2022
8ad589d
update
jonaylor89 Jul 7, 2022
37fe3ac
merge
jonaylor89 Jul 7, 2022
a4fd80b
merge
jonaylor89 Jul 7, 2022
b8b3622
add code
jonaylor89 Jul 7, 2022
2aebc9a
fix comment
jonaylor89 Jul 8, 2022
5d9b235
update new manual sync code
jonaylor89 Jul 8, 2022
df22514
Merge branch 'master' into jn-rm-manual-sync
jonaylor89 Jul 8, 2022
96d4e30
nits
jonaylor89 Jul 8, 2022
1abe3da
Merge branch 'jn-rm-manual-sync' of https://github.com/AudiusProject/…
jonaylor89 Jul 8, 2022
f44d2b0
rm unused constants
jonaylor89 Jul 11, 2022
2b0bb47
merge better
jonaylor89 Jul 11, 2022
2d29298
lint fix
jonaylor89 Jul 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const { hasEnoughStorageSpace } = require('./fileManager')
const { getMonitors, MONITORS } = require('./monitors/monitors')
const { verifyRequesterIsValidSP } = require('./apiSigning')
const BlacklistManager = require('./blacklistManager')
const {
issueSyncRequestsUntilSynced
} = require('./services/stateMachineManager/stateReconciliation/stateReconciliationUtils')

/**
* Ensure valid cnodeUser and session exist for provided session token
Expand Down Expand Up @@ -269,7 +272,7 @@ async function ensureStorageMiddleware(req, res, next) {
*/
async function issueAndWaitForSecondarySyncRequests(req) {
const serviceRegistry = req.app.get('serviceRegistry')
const { snapbackSM } = serviceRegistry
const { manualSyncQueue } = serviceRegistry

// Parse request headers
const pollingDurationMs =
Expand Down Expand Up @@ -332,11 +335,13 @@ async function issueAndWaitForSecondarySyncRequests(req) {
const replicationStart = Date.now()
try {
const secondaryPromises = secondaries.map((secondary) => {
return snapbackSM.issueSyncRequestsUntilSynced(
return issueSyncRequestsUntilSynced(
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
primary,
secondary,
wallet,
primaryClockVal,
pollingDurationMs
pollingDurationMs,
manualSyncQueue
)
})

Expand Down
30 changes: 20 additions & 10 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ class ServiceRegistry {
return this.blacklistManager
}

_setupBullMonitoring(app) {
setupBullMonitoring(
app,
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue,
manualSyncQueue
) {
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
this.logInfo('Setting up Bull queue monitoring...')

const { queue: syncProcessingQueue } = this.syncQueue
Expand Down Expand Up @@ -172,11 +178,9 @@ class ServiceRegistry {
queues: [
stateMonitoringAdapter,
stateReconciliationAdapter,
new BullAdapter(this.cNodeEndpointToSpIdMapQueue, {
readOnlyMode: true
}),
new BullAdapter(manualSyncQueue, { readOnlyMode: true }),
new BullAdapter(cNodeEndpointToSpIdMapQueue, { readOnlyMode: true }),
new BullAdapter(this.stateMachineQueue, { readOnlyMode: true }),
new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }),
new BullAdapter(this.recurringSyncQueue, { readOnlyMode: true }),
new BullAdapter(syncProcessingQueue, { readOnlyMode: true }),
new BullAdapter(asyncProcessingQueue, { readOnlyMode: true }),
Expand Down Expand Up @@ -289,7 +293,8 @@ class ServiceRegistry {
const {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
} = await this.stateMachineManager.init(this.libs, this.prometheusRegistry)
this.stateMonitoringQueue = stateMonitoringQueue
this.cNodeEndpointToSpIdMapQueue = cNodeEndpointToSpIdMapQueue
Expand All @@ -298,6 +303,7 @@ class ServiceRegistry {
// SyncQueue construction (requires L1 identity)
// Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround
this.syncQueue = new SyncQueue(config, this.redis, this)
this.manualSyncQueue = manualSyncQueue

// L2URSMRegistration (requires L1 identity)
// Retries indefinitely
Expand All @@ -313,7 +319,13 @@ class ServiceRegistry {
await this.skippedCIDsRetryQueue.init()

try {
this._setupBullMonitoring(app)
this.setupBullMonitoring(
app,
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue,
manualSyncQueue
)
} catch (e) {
this.logError(
`Failed to initialize bull monitoring UI: ${e.message || e}. Skipping..`
Expand Down Expand Up @@ -441,10 +453,8 @@ class ServiceRegistry {
*/
async _initSnapbackSM() {
this.snapbackSM = new SnapbackSM(config, this.libs)
const { stateMachineQueue, manualSyncQueue, recurringSyncQueue } =
this.snapbackSM
const { stateMachineQueue, recurringSyncQueue } = this.snapbackSM
this.stateMachineQueue = stateMachineQueue
this.manualSyncQueue = manualSyncQueue
this.recurringSyncQueue = recurringSyncQueue

let isInitialized = false
Expand Down
8 changes: 4 additions & 4 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ class StateMachineManager {
audiusLibs.discoveryProvider.discoveryProviderEndpoint,
prometheusRegistry
)
const stateReconciliationQueue = await stateReconciliationManager.init(
prometheusRegistry
)
const { stateReconciliationQueue, manualSyncQueue } =
await stateReconciliationManager.init(prometheusRegistry)

// Upon completion, make jobs record metrics and enqueue other jobs as necessary
stateMonitoringQueue.on(
Expand Down Expand Up @@ -60,7 +59,8 @@ class StateMachineManager {
return {
stateMonitoringQueue,
cNodeEndpointToSpIdMapQueue,
stateReconciliationQueue
stateReconciliationQueue,
manualSyncQueue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ module.exports = {
// Max number of completed/failed jobs to keep in redis for the state monitoring queue
RECONCILIATION_QUEUE_HISTORY: 300,

// Max number of completed/failed jobs to keep in redis for the state monitoring queue
MANUAL_SYNC_QUEUE_HISTORY: 300,
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved

// Max millis to run a fetch cNodeEndpoint->spId mapping job for before marking it as stalled (1 minute)
C_NODE_ENDPOINT_TO_SP_ID_MAP_QUEUE_MAX_JOB_RUNTIME_MS: 1000 * 60,

Expand Down Expand Up @@ -53,7 +56,9 @@ module.exports = {
// Max number of completed/failed jobs to keep in redis for the cNodeEndpoint->spId map queue
C_NODE_ENDPOINT_TO_SP_ID_MAP: 100,
// Max number of completed/failed jobs to keep in redis for the state monitoring queue
STATE_RECONCILIATION: 300
STATE_RECONCILIATION: 300,
// Max number of completed/failed jobs to keep in redis for the manual sync queue
MANUAL_SYNC: 300
}),

QUEUE_NAMES: Object.freeze({
Expand All @@ -62,7 +67,9 @@ module.exports = {
// Name of queue that only processes jobs to fetch the cNodeEndpoint->spId mapping,
C_NODE_ENDPOINT_TO_SP_ID_MAP: 'c-node-to-endpoint-sp-id-map-queue',
// Name of StateReconciliationQueue
STATE_RECONCILIATION: 'state-reconciliation-queue'
STATE_RECONCILIATION: 'state-reconciliation-queue',
// Name of ManualSyncQueue
MANUAL_SYNC: 'manual-sync-queue'
jonaylor89 marked this conversation as resolved.
Show resolved Hide resolved
}),

JOB_NAMES: Object.freeze({
Expand Down
Loading