Skip to content

Commit

Permalink
[CON-149] Increase snapbackSM test coverage (#3109)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored May 18, 2022
1 parent 6576c98 commit abf82a7
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 24 deletions.
6 changes: 6 additions & 0 deletions creator-node/src/snapbackSM/StateMachineConstants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module.exports = {
// Max number of attempts to select new replica set in reconfig
MAX_SELECT_NEW_REPLICA_SET_ATTEMPTS: 5,
// Max number of attempts to fetch clock statuses from /users/batch_clock_status
MAX_USER_BATCH_CLOCK_FETCH_RETRIES: 5
}
37 changes: 19 additions & 18 deletions creator-node/src/snapbackSM/snapbackSM.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ const PeerSetManager = require('./peerSetManager')
const { CreatorNode } = require('@audius/libs')
const SecondarySyncHealthTracker = require('./secondarySyncHealthTracker')
const { generateTimestampAndSignature } = require('../apiSigning')
const {
MAX_SELECT_NEW_REPLICA_SET_ATTEMPTS,
MAX_USER_BATCH_CLOCK_FETCH_RETRIES
} = require('./StateMachineConstants')

// Retry delay between requests during monitoring
const SyncMonitoringRetryDelayMs = 15000

// Max number of attempts to select new replica set in reconfig
const MAX_SELECT_NEW_REPLICA_SET_ATTEMPTS = 5

// Timeout for fetching batch clock values
const BATCH_CLOCK_STATUS_REQUEST_TIMEOUT = 10000 // 10s

// Timeout for fetching a clock value for a singular user
const CLOCK_STATUS_REQUEST_TIMEOUT_MS = 2000 // 2s

const MAX_USER_BATCH_CLOCK_FETCH_RETRIES = 5

// Number of users to process in each batch for this._aggregateOps
const AGGREGATE_RECONFIG_AND_POTENTIAL_SYNC_OPS_BATCH_SIZE = 500

Expand Down Expand Up @@ -484,7 +483,9 @@ class SnapbackSM {
replicaSetNodesToUserClockStatusesMap
) {
this.log(
`[issueUpdateReplicaSetOp] userId=${userId} wallet=${wallet} unhealthy replica set=[${unhealthyReplicas}] numHealthyNodes=${healthyNodes.length}`
`[issueUpdateReplicaSetOp] userId=${userId} wallet=${wallet} unhealthy replica set=${JSON.stringify(
unhealthyReplicas
)} numHealthyNodes=${healthyNodes.length}`
)

const response = { errorMsg: null, issuedReconfig: false }
Expand Down Expand Up @@ -639,12 +640,12 @@ class SnapbackSM {
const healthyReplicaSet = new Set(
currentReplicaSet.filter((node) => !unhealthyReplicasSet.has(node))
)
const newReplicaNodes = await this.selectRandomReplicaSetNodes({
const newReplicaNodes = await this.selectRandomReplicaSetNodes(
healthyReplicaSet,
numberOfUnhealthyReplicas: unhealthyReplicasSet.size,
unhealthyReplicasSet.size,
healthyNodes,
wallet
})
)

let newPrimary
if (unhealthyReplicasSet.size === 1) {
Expand Down Expand Up @@ -708,19 +709,18 @@ class SnapbackSM {
* searching for a node that has no state.
*
* If an insufficient amount of new replica set nodes are chosen, this method will throw an error.
* @param {Object} param
* @param {Set<string>} param.healthyReplicasSet a set of the healthy replica set endpoints
* @param {Set<string>} param.numberOfUnhealthyReplicas the number of unhealthy replica set endpoints
* @param {string[]} param.healthyNodes an array of all the healthy nodes available on the network
* @param {string} param.wallet the wallet of the current user
* @returns a string[] of the new replica set nodes
* @param {Set<string>} healthyReplicaSet a set of the healthy replica set endpoints
* @param {number} numberOfUnhealthyReplicas the number of unhealthy replica set endpoints
* @param {string[]} healthyNodes an array of all the healthy nodes available on the network
* @param {string} wallet the wallet of the current user
* @returns {string[]} a string[] of the new replica set nodes
*/
async selectRandomReplicaSetNodes({
async selectRandomReplicaSetNodes(
healthyReplicaSet,
numberOfUnhealthyReplicas,
healthyNodes,
wallet
}) {
) {
const logStr = `[selectRandomReplicaSetNodes] wallet=${wallet} healthyReplicaSet=[${[
...healthyReplicaSet
]}] numberOfUnhealthyReplicas=${numberOfUnhealthyReplicas} numberHealthyNodes=${
Expand Down Expand Up @@ -771,6 +771,7 @@ class SnapbackSM {
return Array.from(newReplicaNodesSet)
}

// TODO: We should not do this pattern of passing a param for the sole purpose of modifying it (will revisit during refactor)
/**
* Given map(replica node => userWallets[]), retrieves clock values for every (node, userWallet) pair
* @param {Object} replicaSetNodesToUserWalletsMap map of <replica set node : wallets>
Expand Down Expand Up @@ -824,7 +825,7 @@ class SnapbackSM {
errorMsg = e
}

// If failed to get response after all attempts, add replica to `unhealthyPeers` list for reconfig
// If failed to get response after all attempts, add replica to `unhealthyPeers` set for reconfig
if (errorMsg) {
this.logError(
`[retrieveClockStatusesForUsersAcrossReplicaSet] Could not fetch clock values for wallets=${walletsOnReplica} on replica=${replica} ${errorMsg.toString()}`
Expand Down
Loading

0 comments on commit abf82a7

Please sign in to comment.