Skip to content

Commit

Permalink
[CON-294] Make manual sync queue less backed up (#3549)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Jul 26, 2022
1 parent 87c4f2c commit 57560fb
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ module.exports = {
// Max number of attempts to select new replica set in reconfig
MAX_SELECT_NEW_REPLICA_SET_ATTEMPTS: 5,

// Max number of attempts to run a job that attempts to issue a sync (manual or recurring)
MAX_ISSUE_SYNC_JOB_ATTEMPTS: 3,
// Max number of attempts to run a job that attempts to issue a manual sync
MAX_ISSUE_MANUAL_SYNC_JOB_ATTEMPTS: 2,

// Max number of attempts to run a job that attempts to issue a recurring sync
MAX_ISSUE_RECURRING_SYNC_JOB_ATTEMPTS: 2,

QUEUE_HISTORY: Object.freeze({
// Max number of completed/failed jobs to keep in redis for the monitor-state queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ const _findReplicaSetUpdatesForUser = async (
)

/**
* For each secondary, enqueue `potentialSyncRequest` if healthy else add to `unhealthyReplicas`
* For each secondary, add to `unhealthyReplicas` if unhealthy
*/
for (const secondaryInfo of secondariesInfo) {
const secondary = secondaryInfo.endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class StateReconciliationManager {

// Register the logic that gets executed to process each new job from the queue
manualSyncQueue.process(
1, // config.get('maxManualRequestSyncJobConcurrency'),
config.get('maxManualRequestSyncJobConcurrency'),
processManualSync
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ const {
retrieveClockValueForUserFromReplica,
makeHistogramToRecord
} = require('../stateMachineUtils')
const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator')
const SecondarySyncHealthTracker = require('./SecondarySyncHealthTracker')
const {
SYNC_MONITORING_RETRY_DELAY_MS,
QUEUE_NAMES,
SYNC_MODES,
SyncType,
MAX_ISSUE_SYNC_JOB_ATTEMPTS
MAX_ISSUE_MANUAL_SYNC_JOB_ATTEMPTS,
MAX_ISSUE_RECURRING_SYNC_JOB_ATTEMPTS
} = require('../stateMachineConstants')
const primarySyncFromSecondary = require('../../sync/primarySyncFromSecondary')

Expand Down Expand Up @@ -72,10 +72,11 @@ module.exports = async function ({
}

// Enqueue a new sync request if one needs to be enqueued and we haven't retried too many times yet
if (
!_.isEmpty(syncReqToEnqueue) &&
attemptNumber < MAX_ISSUE_SYNC_JOB_ATTEMPTS
) {
const maxRetries =
syncReqToEnqueue?.syncType === SyncType.Manual
? MAX_ISSUE_MANUAL_SYNC_JOB_ATTEMPTS
: MAX_ISSUE_RECURRING_SYNC_JOB_ATTEMPTS
if (!_.isEmpty(syncReqToEnqueue) && attemptNumber < maxRetries) {
logger.info(`Retrying issue-sync-request after attempt #${attemptNumber}`)
const queueName =
syncReqToEnqueue?.syncType === SyncType.Manual
Expand All @@ -86,7 +87,7 @@ module.exports = async function ({
}
} else {
logger.info(
`Gave up retrying issue-sync-request after ${attemptNumber} failed attempts`
`Gave up retrying issue-sync-request (type: ${syncReqToEnqueue?.syncType}) after ${attemptNumber} failed attempts`
)
}

Expand Down Expand Up @@ -133,9 +134,13 @@ async function _handleIssueSyncRequest({

/**
* Remove sync from SyncRequestDeDuplicator once it moves to Active status, before processing.
* It is ok for two identical syncs to be present in Active and Waiting, just not two in Waiting
* It is ok for two identical syncs to be present in Active and Waiting, just not two in Waiting.
* We don't dedupe manual syncs.
*/
SyncRequestDeDuplicator.removeSync(syncType, userWallet, secondaryEndpoint)
if (syncType === SyncType.Recurring) {
const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator')
SyncRequestDeDuplicator.removeSync(syncType, userWallet, secondaryEndpoint)
}

/**
* Do not issue syncRequest if SecondaryUserSyncFailureCountForToday already exceeded threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ const getNewOrExistingSyncReq = ({
syncRequestParameters
}

// Record sync in syncDeDuplicator
SyncRequestDeDuplicator.recordSync(
syncType,
userWallet,
secondaryEndpoint,
syncReqToEnqueue
)
// Record sync in syncDeDuplicator for recurring syncs only
if (syncType === SyncType.Recurring) {
SyncRequestDeDuplicator.recordSync(
syncType,
userWallet,
secondaryEndpoint,
syncReqToEnqueue
)
}

return { syncReqToEnqueue }
}
Expand Down

0 comments on commit 57560fb

Please sign in to comment.