Skip to content

Commit

Permalink
[CON-234] Enable write quorum (#3408)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored and dmanjunath committed Jul 15, 2022
1 parent 74e7679 commit 6199add
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 37 deletions.
6 changes: 3 additions & 3 deletions creator-node/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,13 @@ const config = convict({
default: 5000 // 5000ms = 5s (prod default)
},
enforceWriteQuorum: {
doc: 'Boolean flag indicating whether or not primary should reject write on 2/3 replication across replica set',
doc: 'Boolean flag indicating whether or not primary should reject write until 2/3 replication across replica set',
format: Boolean,
env: 'enforceWriteQuorum',
default: false
default: true
},
manualSyncsDisabled: {
doc: 'Disables issuing of manual syncs in order to test SnapbackSM Recurring Sync logic.',
doc: 'Disables issuing of manual syncs in order to test state machine Recurring Sync logic.',
format: 'BooleanCustom',
env: 'manualSyncsDisabled',
default: false
Expand Down
124 changes: 107 additions & 17 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,33 +267,84 @@ async function ensureStorageMiddleware(req, res, next) {
}

/**
* Issue SyncRequests to both secondaries, and wait for at least one to sync before returning
* @dev TODO - move out of middlewares layer
* Issue sync requests to both secondaries, and wait for at least one to sync before returning.
* If write quorum is enforced (determined by header + env var + ignoreWriteQuorum param) and no secondary
* completes a sync in time, then the function throws an error.
*
* Order of precedence for determining if write quorum is enforced:
* - If ignoreWriteQuorum param is passed, don't enforce write quorum regardless of header or env var
* - If client passes Enforce-Write-Quorum header as false, don't enforce write quorum regardless of the enforceWriteQuorum env var
* - If client passes Enforce-Write-Quorum header as true, enforce write quorum regardless of the enforceWriteQuorum env var
* - If client doesn't pass Enforce-Write-Quorum header, enforce write quorum if enforceWriteQuorum env var is true
*
* @dev TODO - move out of middlewares layer because it's not used as middleware -- just as a function some routes call
* @param ignoreWriteQuorum true if write quorum should not be enforced (don't fail the request if write quorum fails)
*/
async function issueAndWaitForSecondarySyncRequests(req) {
async function issueAndWaitForSecondarySyncRequests(
req,
ignoreWriteQuorum = false
) {
const route = req.url.split('?')[0]
const serviceRegistry = req.app.get('serviceRegistry')
const { manualSyncQueue } = serviceRegistry
const { manualSyncQueue, prometheusRegistry } = serviceRegistry

const histogram = prometheusRegistry.getMetric(
prometheusRegistry.metricNames.WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM
)
const endHistogramTimer = histogram.startTimer()

// Parse request headers
const pollingDurationMs =
req.header('Polling-Duration-ms') ||
config.get('issueAndWaitForSecondarySyncRequestsPollingDurationMs')
const enforceWriteQuorum =
req.header('Enforce-Write-Quorum') || config.get('enforceWriteQuorum')
const enforceWriteQuorumHeader = req.header('Enforce-Write-Quorum')
const writeQuorumHeaderTrue =
enforceWriteQuorumHeader === true || enforceWriteQuorumHeader === 'true'
const writeQuorumHeaderFalse =
enforceWriteQuorumHeader === false || enforceWriteQuorumHeader === 'false'
const writeQuorumHeaderEmpty =
!writeQuorumHeaderFalse || enforceWriteQuorumHeader === 'null'
let enforceWriteQuorum = false

if (!ignoreWriteQuorum) {
if (writeQuorumHeaderTrue) enforceWriteQuorum = true
// writeQuorumHeaderEmpty is for undefined/null/empty values where it's not explicitly false
else if (writeQuorumHeaderEmpty && config.get('enforceWriteQuorum')) {
enforceWriteQuorum = true
}
}

// This sync request uses the manual sync queue, so we can't proceed if manual syncs are disabled
if (config.get('manualSyncsDisabled')) {
req.logger.info(
`issueAndWaitForSecondarySyncRequests - Cannot proceed due to manualSyncsDisabled ${config.get(
'manualSyncsDisabled'
)})`
)
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_short_circuit'
})
const errorMsg = `issueAndWaitForSecondarySyncRequests Error - Cannot proceed due to manualSyncsDisabled ${config.get(
'manualSyncsDisabled'
)})`
req.logger.error(errorMsg)
if (enforceWriteQuorum) {
throw new Error(errorMsg)
}
return
}

// Wallet is required and should've been set in auth middleware
if (!req.session || !req.session.wallet) {
req.logger.error(
`issueAndWaitForSecondarySyncRequests Error - req.session.wallet missing`
)
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_short_circuit'
})
const errorMsg = `issueAndWaitForSecondarySyncRequests Error - req.session.wallet missing`
req.logger.error(errorMsg)
if (enforceWriteQuorum) {
throw new Error(errorMsg)
}
return
}
const wallet = req.session.wallet
Expand All @@ -304,9 +355,18 @@ async function issueAndWaitForSecondarySyncRequests(req) {
!req.session.creatorNodeEndpoints ||
!Array.isArray(req.session.creatorNodeEndpoints)
) {
req.logger.info(
'issueAndWaitForSecondarySyncRequests - Cannot process sync op - this node is not primary or invalid creatorNodeEndpoints.'
)
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_short_circuit'
})
const errorMsg =
'issueAndWaitForSecondarySyncRequests Error - Cannot process sync op - this node is not primary or invalid creatorNodeEndpoints'
req.logger.error(errorMsg)
if (enforceWriteQuorum) {
throw new Error(errorMsg)
}
return
}

Expand All @@ -316,6 +376,12 @@ async function issueAndWaitForSecondarySyncRequests(req) {
)

if (primary !== config.get('creatorNodeEndpoint')) {
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_short_circuit'
})
throw new Error(
`issueAndWaitForSecondarySyncRequests Error - Cannot process sync op since this node is not the primary for user ${wallet}. Instead found ${primary}.`
)
Expand All @@ -326,6 +392,12 @@ async function issueAndWaitForSecondarySyncRequests(req) {
where: { walletPublicKey: wallet }
})
if (!cnodeUser || !cnodeUser.clock) {
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_short_circuit'
})
throw new Error(
`issueAndWaitForSecondarySyncRequests Error - Failed to retrieve current clock value for user ${wallet} on current node.`
)
Expand Down Expand Up @@ -353,7 +425,19 @@ async function issueAndWaitForSecondarySyncRequests(req) {
Date.now() - replicationStart
}ms`
)
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'succeeded'
})
} catch (e) {
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_sync'
})
const errorMsg = `issueAndWaitForSecondarySyncRequests Error - Failed to reach 2/3 write quorum for user ${wallet} in ${
Date.now() - replicationStart
}ms`
Expand All @@ -368,6 +452,12 @@ async function issueAndWaitForSecondarySyncRequests(req) {

// If any error during replication, error if quorum is enforced
} catch (e) {
endHistogramTimer({
enforceWriteQuorum: String(enforceWriteQuorum),
ignoreWriteQuorum: String(ignoreWriteQuorum),
route,
result: 'failed_uncaught_error'
})
req.logger.error(
`issueAndWaitForSecondarySyncRequests Error - wallet ${wallet} ||`,
e.message
Expand Down
7 changes: 4 additions & 3 deletions creator-node/src/routes/audiusUsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ module.exports = function (app) {
return errorResponseServerError(`Could not save to db: ${e}`)
}

// This call is not await-ed to avoid delaying or erroring
issueAndWaitForSecondarySyncRequests(req)
// Await 2/3 write quorum (replicating data to at least 1 secondary)
await issueAndWaitForSecondarySyncRequests(req)

return successResponse({
metadataMultihash: multihash,
Expand Down Expand Up @@ -176,7 +176,8 @@ module.exports = function (app) {

await transaction.commit()

await issueAndWaitForSecondarySyncRequests(req)
// Discovery only indexes metadata and not files, so we eagerly replicate data but don't await it
issueAndWaitForSecondarySyncRequests(req, true)

return successResponse()
} catch (e) {
Expand Down
4 changes: 2 additions & 2 deletions creator-node/src/routes/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ module.exports = function (app) {
return errorResponseServerError(e)
}

// Must be awaitted and cannot be try-catched, ensuring that error from inside this rejects request
await issueAndWaitForSecondarySyncRequests(req)
// Discovery only indexes metadata and not files, so we eagerly replicate data but don't await it
issueAndWaitForSecondarySyncRequests(req, true)

return successResponse({ dirCID })
})
Expand Down
7 changes: 4 additions & 3 deletions creator-node/src/routes/playlists.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ module.exports = function (app) {
)
}

// This call is not await-ed to avoid delaying or erroring
issueAndWaitForSecondarySyncRequests(req)
// Await 2/3 write quorum (replicating data to at least 1 secondary)
await issueAndWaitForSecondarySyncRequests(req)

return successResponse({
metadataMultihash: multihash,
Expand Down Expand Up @@ -171,7 +171,8 @@ module.exports = function (app) {

await transaction.commit()

await issueAndWaitForSecondarySyncRequests(req)
// Discovery only indexes metadata and not files, so we eagerly replicate data but don't await it
issueAndWaitForSecondarySyncRequests(req, true)

return successResponse()
} catch (e) {
Expand Down
7 changes: 4 additions & 3 deletions creator-node/src/routes/tracks.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ module.exports = function (app) {
return errorResponseServerError(`Could not save to db db: ${e}`)
}

// This call is not await-ed to avoid delaying or erroring
issueAndWaitForSecondarySyncRequests(req)
// Await 2/3 write quorum (replicating data to at least 1 secondary)
await issueAndWaitForSecondarySyncRequests(req)

return successResponse({
metadataMultihash: multihash,
Expand Down Expand Up @@ -576,7 +576,8 @@ module.exports = function (app) {

await transaction.commit()

await issueAndWaitForSecondarySyncRequests(req)
// Discovery only indexes metadata and not files, so we eagerly replicate data but don't await it
issueAndWaitForSecondarySyncRequests(req, true)

metricEndTimerFn({ code: 200 })
return successResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ let MetricNames = {
'route_post_tracks_duration_seconds',
ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM:
'issue_sync_request_monitoring_duration_seconds',
FIND_SYNC_REQUEST_COUNTS_GAUGE: 'find_sync_request_counts'
FIND_SYNC_REQUEST_COUNTS_GAUGE: 'find_sync_request_counts',
WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM: 'write_quorum_duration_seconds'
}
// Add a histogram for each job in the state machine queues.
// Some have custom labels below, and all of them use the label: uncaughtError=true/false
Expand Down Expand Up @@ -71,7 +72,7 @@ const MetricLabels = Object.freeze({
reconfigType: [
'one_secondary', // Only one secondary was replaced in the user's replica set
'multiple_secondaries', // Both secondaries were replaced in the user's replica set
'primary_and_or_secondaries', // A secondary gets promoted to new primary and one or both secondaries get replaced with new random nodes,
'primary_and_or_secondaries', // A secondary gets promoted to new primary and one or both secondaries get replaced with new random nodes
'null' // No change was made to the user's replica set because the job short-circuited before selecting or was unable to select new node(s)
]
},
Expand All @@ -88,6 +89,31 @@ const MetricLabels = Object.freeze({
'sync_request_already_enqueued', // Sync was found but a duplicate request has already been enqueued so no need to enqueue another
'new_sync_request_unable_to_enqueue' // Sync was found but something prevented a new request from being created
]
},
[MetricNames.WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM]: {
// Whether or not write quorum is enabled/enforced
enforceWriteQuorum: ['false', 'true'],
// Whether or not write quorum is ignored for this specific route (even if it's enabled in general).
// If it's ignored, it will still attempt replication but not fail the request if replication fails
ignoreWriteQuorum: ['false', 'true'],
// The route that triggered write quorum
route: [
// Routes that use write quorum but don't enforce it (ignoreWriteQuorum should be true):
'/image_upload',
'/audius_users',
'/playlists',
'/tracks',
// Routes that strictly enforce write quorum (ignoreWriteQuorum should be false)
'/audius_users/metadata',
'/playlists/metadata',
'/tracks/metadata'
],
result: [
'succeeded', // Data was replicated to one or more secondaries
'failed_short_circuit', // Failed before attempting to sync because some basic condition wasn't met (node not primary, missing wallet, or manual syncs disabled)
'failed_uncaught_error', // Failed due to some uncaught exception. This should never happen
'failed_sync' // Failed to reach 2/3 quorum because no syncs were successful
]
}
})
const MetricLabelNames = Object.freeze(
Expand Down Expand Up @@ -168,6 +194,22 @@ const Metrics = Object.freeze({
help: "Counts for each find-sync-requests job's result when looking for syncs that should be requested from a primary to a secondary",
labelNames: MetricLabelNames[MetricNames.FIND_SYNC_REQUEST_COUNTS_GAUGE]
}
},
[MetricNames.WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM]: {
metricType: MetricTypes.HISTOGRAM,
metricConfig: {
name: MetricNames.WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM,
help: 'Seconds spent attempting to replicate data to a secondary node for write quorum',
labelNames:
MetricLabelNames[MetricNames.WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM],
// 5 buckets in the range of 1 second to max seconds before timing out write quorum
buckets: exponentialBucketsRange(
1,
config.get('issueAndWaitForSecondarySyncRequestsPollingDurationMs') /
1000,
5
)
}
}
})

Expand Down
6 changes: 6 additions & 0 deletions creator-node/test/audiusUsers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata })
.expect(200)

Expand Down Expand Up @@ -96,6 +97,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata })
.expect(200)

Expand Down Expand Up @@ -133,6 +135,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata })
.expect(200)

Expand Down Expand Up @@ -164,6 +167,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata: metadata1 })
.expect(200)

Expand All @@ -179,6 +183,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata: metadata2 })
.expect(200)

Expand All @@ -203,6 +208,7 @@ describe('Test AudiusUsers', function () {
.post('/audius_users/metadata')
.set('X-Session-ID', session.sessionToken)
.set('User-Id', session.userId)
.set('Enforce-Write-Quorum', false)
.send({ metadata })
.expect(200)

Expand Down
Loading

0 comments on commit 6199add

Please sign in to comment.