Skip to content

Commit

Permalink
[CON-284] Add tracing decorator and decorate non-queue functions (#3750)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaylor89 authored Aug 30, 2022
1 parent 4c46521 commit ed12c45
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 65 deletions.
12 changes: 11 additions & 1 deletion creator-node/src/components/replicaSet/exportComponentService.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const _ = require('lodash')
const models = require('../../models')
const { Transaction } = require('sequelize')
const DBManager = require('../../dbManager')
const { instrumentTracing, tracing } = require('../../tracer')

/**
* Exports all db data (not files) associated with walletPublicKey[] as JSON.
Expand Down Expand Up @@ -148,6 +149,7 @@ const exportComponentService = async ({

return cnodeUsersDict
} catch (e) {
tracing.recordException(e)
await transaction.rollback()

for (const cnodeUserUUID in cnodeUsersDict) {
Expand All @@ -159,6 +161,7 @@ const exportComponentService = async ({
`exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() executed - numRowsUpdated:${numRowsUpdated}`
)
} catch (e) {
tracing.recordException(e)
logger.error(
`exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() error - ${e.message}`
)
Expand All @@ -168,4 +171,11 @@ const exportComponentService = async ({
}
}

module.exports = exportComponentService
module.exports = instrumentTracing({
fn: exportComponentService,
options: {
attributes: {
[tracing.CODE_FILEPATH]: __filename
}
}
})
12 changes: 11 additions & 1 deletion creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const BlacklistManager = require('./blacklistManager')
const {
issueSyncRequestsUntilSynced
} = require('./services/stateMachineManager/stateReconciliation/stateReconciliationUtils')
const { instrumentTracing, tracing } = require('./tracer')

/**
* Ensure valid cnodeUser and session exist for provided session token
Expand Down Expand Up @@ -259,7 +260,7 @@ async function ensureStorageMiddleware(req, res, next) {
* @dev TODO - move out of middlewares layer because it's not used as middleware -- just as a function some routes call
* @param ignoreWriteQuorum true if write quorum should not be enforced (don't fail the request if write quorum fails)
*/
async function issueAndWaitForSecondarySyncRequests(
async function _issueAndWaitForSecondarySyncRequests(
req,
ignoreWriteQuorum = false
) {
Expand Down Expand Up @@ -449,6 +450,15 @@ async function issueAndWaitForSecondarySyncRequests(
}
}

const issueAndWaitForSecondarySyncRequests = instrumentTracing({
fn: _issueAndWaitForSecondarySyncRequests,
options: {
attributes: {
[tracing.CODE_FILEPATH]: __filename
}
}
})

/**
* Retrieves current FQDN registered on-chain with node's owner wallet
*
Expand Down
113 changes: 63 additions & 50 deletions creator-node/src/routes/nodeSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,62 @@ const {
const config = require('../config')
const retry = require('async-retry')
const exportComponentService = require('../components/replicaSet/exportComponentService')
const { instrumentTracing, tracing } = require('../tracer')

const router = express.Router()

const handleExport = async (req, res) => {
const start = Date.now()

const walletPublicKeys = req.query.wallet_public_key // array
const sourceEndpoint = req.query.source_endpoint // string
const forceExport = !!req.query.force_export // boolean

const maxExportClockValueRange = config.get('maxExportClockValueRange')

// Define clock range (min and max) for export
const requestedClockRangeMin = parseInt(req.query.clock_range_min) || 0
const requestedClockRangeMax =
requestedClockRangeMin + (maxExportClockValueRange - 1)

try {
const cnodeUsersDict = await retry(
async () => {
return await exportComponentService({
walletPublicKeys,
requestedClockRangeMin,
requestedClockRangeMax,
forceExport,
logger: req.logger
})
},
{
retries: 3
}
)

req.logger.info(
`Successful export for wallets ${walletPublicKeys} to source endpoint ${
sourceEndpoint || '(not provided)'
} for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${
Date.now() - start
} ms`
)

return successResponse({ cnodeUsers: cnodeUsersDict })
} catch (e) {
req.logger.error(
`Error in /export for wallets ${walletPublicKeys} to source endpoint ${
sourceEndpoint || '(not provided)'
} for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${
Date.now() - start
} ms ||`,
e
)
return errorResponseServerError(e.message)
}
}

/**
* Exports all db data (not files) associated with walletPublicKey[] as JSON.
*
Expand All @@ -24,57 +77,17 @@ const router = express.Router()
*/
router.get(
'/export',
handleResponse(async (req, res) => {
const start = Date.now()

const walletPublicKeys = req.query.wallet_public_key // array
const sourceEndpoint = req.query.source_endpoint // string
const forceExport = !!req.query.force_export // boolean

const maxExportClockValueRange = config.get('maxExportClockValueRange')

// Define clock range (min and max) for export
const requestedClockRangeMin = parseInt(req.query.clock_range_min) || 0
const requestedClockRangeMax =
requestedClockRangeMin + (maxExportClockValueRange - 1)

try {
const cnodeUsersDict = await retry(
async () => {
return await exportComponentService({
walletPublicKeys,
requestedClockRangeMin,
requestedClockRangeMax,
forceExport,
logger: req.logger
})
},
{
retries: 3
handleResponse(
instrumentTracing({
name: '/export',
fn: handleExport,
options: {
attributes: {
[tracing.CODE_FILEPATH]: __filename
}
)

req.logger.info(
`Successful export for wallets ${walletPublicKeys} to source endpoint ${
sourceEndpoint || '(not provided)'
} for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${
Date.now() - start
} ms`
)

return successResponse({ cnodeUsers: cnodeUsersDict })
} catch (e) {
req.logger.error(
`Error in /export for wallets ${walletPublicKeys} to source endpoint ${
sourceEndpoint || '(not provided)'
} for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${
Date.now() - start
} ms ||`,
e
)
return errorResponseServerError(e.message)
}
})
}
})
)
)

/** Checks if node sync is in progress for wallet. */
Expand Down
4 changes: 4 additions & 0 deletions creator-node/src/routes/tracks.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const DBManager = require('../dbManager')
const { generateListenTimestampAndSignature } = require('../apiSigning')
const BlacklistManager = require('../blacklistManager')
const TranscodingQueue = require('../TranscodingQueue')
const { tracing } = require('../tracer')

const readFile = promisify(fs.readFile)

Expand All @@ -53,6 +54,7 @@ router.post(
ensureStorageMiddleware,
handleTrackContentUpload,
handleResponse(async (req, res) => {
tracing.setSpanAttribute('requestID', req.logContext.requestID)
if (req.fileSizeError || req.fileFilterError) {
removeTrackFolder({ logContext: req.logContext }, req.fileDir)
return errorResponseBadRequest(req.fileSizeError || req.fileFilterError)
Expand All @@ -67,6 +69,7 @@ router.post(
})

if (selfTranscode) {
tracing.info('adding upload track task')
await AsyncProcessingQueue.addTrackContentUploadTask({
logContext: req.logContext,
req: {
Expand All @@ -77,6 +80,7 @@ router.post(
}
})
} else {
tracing.info('adding transcode handoff task')
await AsyncProcessingQueue.addTranscodeHandOffTask({
logContext: req.logContext,
req: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
HEALTHY_SERVICES_TTL_SEC
} = require('../stateMachineConstants')
const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator')
const { instrumentTracing, tracing } = require('../../../tracer')

const HEALTHY_NODES_CACHE_KEY = 'stateMachineHealthyContentNodes'

Expand Down Expand Up @@ -97,7 +98,7 @@ const getNewOrExistingSyncReq = ({
* Issues syncRequest for user against secondary, and polls for replication up to primary
* If secondary fails to sync within specified timeoutMs, will error
*/
const issueSyncRequestsUntilSynced = async (
const _issueSyncRequestsUntilSynced = async (
primaryUrl,
secondaryUrl,
wallet,
Expand Down Expand Up @@ -163,6 +164,15 @@ const issueSyncRequestsUntilSynced = async (
)
}

const issueSyncRequestsUntilSynced = instrumentTracing({
fn: _issueSyncRequestsUntilSynced,
options: {
attributes: {
[tracing.CODE_FILEPATH]: __filename
}
}
})

const getCachedHealthyNodes = async () => {
const healthyNodes = await redisClient.lrange(HEALTHY_NODES_CACHE_KEY, 0, -1)
return healthyNodes
Expand Down
18 changes: 17 additions & 1 deletion creator-node/src/services/sync/primarySyncFromSecondary.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator')
const initAudiusLibs = require('../initAudiusLibs')
const DecisionTree = require('../../utils/decisionTree')
const UserSyncFailureCountService = require('./UserSyncFailureCountService')
const { instrumentTracing, tracing } = require('../../tracer')
const { fetchExportFromNode } = require('./syncUtil')
const {
FILTER_OUT_ALREADY_PRESENT_DB_ENTRIES_CONSTS
Expand All @@ -33,7 +34,7 @@ const {
* Export data for user from secondary and save locally, until complete
* Should never error, instead return errorObj, else null
*/
module.exports = async function primarySyncFromSecondary({
async function _primarySyncFromSecondary({
wallet,
secondary,
logContext = DEFAULT_LOG_CONTEXT
Expand Down Expand Up @@ -61,9 +62,11 @@ module.exports = async function primarySyncFromSecondary({

let libs
try {
tracing.info('init AudiusLibs')
libs = await initAudiusLibs({ logger })
decisionTree.recordStage({ name: 'initAudiusLibs() success', log: true })
} catch (e) {
tracing.recordException(e)
decisionTree.recordStage({
name: 'initAudiusLibs() Error',
data: { errorMsg: e.message }
Expand Down Expand Up @@ -154,6 +157,7 @@ module.exports = async function primarySyncFromSecondary({
log: true
})
} catch (e) {
tracing.recordException(e)
decisionTree.recordStage({
name: 'saveFilesToDisk() Error',
data: { errorMsg: e.message }
Expand Down Expand Up @@ -191,6 +195,7 @@ module.exports = async function primarySyncFromSecondary({

decisionTree.recordStage({ name: 'Complete Success' })
} catch (e) {
tracing.recordException(e)
await SyncHistoryAggregator.recordSyncFail(wallet)
return e
} finally {
Expand All @@ -200,6 +205,15 @@ module.exports = async function primarySyncFromSecondary({
}
}

const primarySyncFromSecondary = instrumentTracing({
fn: _primarySyncFromSecondary,
options: {
attributes: {
[tracing.CODE_FILEPATH]: __filename
}
}
})

/**
* Fetch data for all files & save to disk
*
Expand Down Expand Up @@ -715,3 +729,5 @@ async function filterOutAlreadyPresentDBEntries({
}
}
}

module.exports = primarySyncFromSecondary
Loading

0 comments on commit ed12c45

Please sign in to comment.