diff --git a/creator-node/src/components/replicaSet/exportComponentService.js b/creator-node/src/components/replicaSet/exportComponentService.js index ac19742ecfc..ef94c8042b7 100644 --- a/creator-node/src/components/replicaSet/exportComponentService.js +++ b/creator-node/src/components/replicaSet/exportComponentService.js @@ -3,6 +3,7 @@ const _ = require('lodash') const models = require('../../models') const { Transaction } = require('sequelize') const DBManager = require('../../dbManager') +const { instrumentTracing, tracing } = require('../../tracer') /** * Exports all db data (not files) associated with walletPublicKey[] as JSON. @@ -148,6 +149,7 @@ const exportComponentService = async ({ return cnodeUsersDict } catch (e) { + tracing.recordException(e) await transaction.rollback() for (const cnodeUserUUID in cnodeUsersDict) { @@ -159,6 +161,7 @@ const exportComponentService = async ({ `exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() executed - numRowsUpdated:${numRowsUpdated}` ) } catch (e) { + tracing.recordException(e) logger.error( `exportComponentService() - cnodeUserUUID:${cnodeUserUUID} - fixInconsistentUser() error - ${e.message}` ) @@ -168,4 +171,11 @@ const exportComponentService = async ({ } } -module.exports = exportComponentService +module.exports = instrumentTracing({ + fn: exportComponentService, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename + } + } +}) diff --git a/creator-node/src/middlewares.js b/creator-node/src/middlewares.js index d923696580c..56c947031ed 100644 --- a/creator-node/src/middlewares.js +++ b/creator-node/src/middlewares.js @@ -18,6 +18,7 @@ const BlacklistManager = require('./blacklistManager') const { issueSyncRequestsUntilSynced } = require('./services/stateMachineManager/stateReconciliation/stateReconciliationUtils') +const { instrumentTracing, tracing } = require('./tracer') /** * Ensure valid cnodeUser and session exist for provided session token @@ -259,7 +260,7 @@ 
async function ensureStorageMiddleware(req, res, next) { * @dev TODO - move out of middlewares layer because it's not used as middleware -- just as a function some routes call * @param ignoreWriteQuorum true if write quorum should not be enforced (don't fail the request if write quorum fails) */ -async function issueAndWaitForSecondarySyncRequests( +async function _issueAndWaitForSecondarySyncRequests( req, ignoreWriteQuorum = false ) { @@ -449,6 +450,15 @@ async function issueAndWaitForSecondarySyncRequests( } } +const issueAndWaitForSecondarySyncRequests = instrumentTracing({ + fn: _issueAndWaitForSecondarySyncRequests, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename + } + } +}) + /** * Retrieves current FQDN registered on-chain with node's owner wallet * diff --git a/creator-node/src/routes/nodeSync.js b/creator-node/src/routes/nodeSync.js index ffb32628667..e4d520a3394 100644 --- a/creator-node/src/routes/nodeSync.js +++ b/creator-node/src/routes/nodeSync.js @@ -10,9 +10,62 @@ const { const config = require('../config') const retry = require('async-retry') const exportComponentService = require('../components/replicaSet/exportComponentService') +const { instrumentTracing, tracing } = require('../tracer') const router = express.Router() +const handleExport = async (req, res) => { + const start = Date.now() + + const walletPublicKeys = req.query.wallet_public_key // array + const sourceEndpoint = req.query.source_endpoint // string + const forceExport = !!req.query.force_export // boolean + + const maxExportClockValueRange = config.get('maxExportClockValueRange') + + // Define clock range (min and max) for export + const requestedClockRangeMin = parseInt(req.query.clock_range_min) || 0 + const requestedClockRangeMax = + requestedClockRangeMin + (maxExportClockValueRange - 1) + + try { + const cnodeUsersDict = await retry( + async () => { + return await exportComponentService({ + walletPublicKeys, + requestedClockRangeMin, + 
requestedClockRangeMax, + forceExport, + logger: req.logger + }) + }, + { + retries: 3 + } + ) + + req.logger.info( + `Successful export for wallets ${walletPublicKeys} to source endpoint ${ + sourceEndpoint || '(not provided)' + } for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${ + Date.now() - start + } ms` + ) + + return successResponse({ cnodeUsers: cnodeUsersDict }) + } catch (e) { + req.logger.error( + `Error in /export for wallets ${walletPublicKeys} to source endpoint ${ + sourceEndpoint || '(not provided)' + } for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${ + Date.now() - start + } ms ||`, + e + ) + return errorResponseServerError(e.message) + } +} + /** * Exports all db data (not files) associated with walletPublicKey[] as JSON. * @@ -24,57 +77,17 @@ const router = express.Router() */ router.get( '/export', - handleResponse(async (req, res) => { - const start = Date.now() - - const walletPublicKeys = req.query.wallet_public_key // array - const sourceEndpoint = req.query.source_endpoint // string - const forceExport = !!req.query.force_export // boolean - - const maxExportClockValueRange = config.get('maxExportClockValueRange') - - // Define clock range (min and max) for export - const requestedClockRangeMin = parseInt(req.query.clock_range_min) || 0 - const requestedClockRangeMax = - requestedClockRangeMin + (maxExportClockValueRange - 1) - - try { - const cnodeUsersDict = await retry( - async () => { - return await exportComponentService({ - walletPublicKeys, - requestedClockRangeMin, - requestedClockRangeMax, - forceExport, - logger: req.logger - }) - }, - { - retries: 3 + handleResponse( + instrumentTracing({ + name: '/export', + fn: handleExport, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename } - ) - - req.logger.info( - `Successful export for wallets ${walletPublicKeys} to source endpoint ${ - sourceEndpoint || '(not provided)' - } for 
clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${ - Date.now() - start - } ms` - ) - - return successResponse({ cnodeUsers: cnodeUsersDict }) - } catch (e) { - req.logger.error( - `Error in /export for wallets ${walletPublicKeys} to source endpoint ${ - sourceEndpoint || '(not provided)' - } for clock value range [${requestedClockRangeMin},${requestedClockRangeMax}] || route duration ${ - Date.now() - start - } ms ||`, - e - ) - return errorResponseServerError(e.message) - } - }) + } + }) + ) ) /** Checks if node sync is in progress for wallet. */ diff --git a/creator-node/src/routes/tracks.js b/creator-node/src/routes/tracks.js index d41dd114e84..8603b44ac62 100644 --- a/creator-node/src/routes/tracks.js +++ b/creator-node/src/routes/tracks.js @@ -37,6 +37,7 @@ const DBManager = require('../dbManager') const { generateListenTimestampAndSignature } = require('../apiSigning') const BlacklistManager = require('../blacklistManager') const TranscodingQueue = require('../TranscodingQueue') +const { tracing } = require('../tracer') const readFile = promisify(fs.readFile) @@ -53,6 +54,7 @@ router.post( ensureStorageMiddleware, handleTrackContentUpload, handleResponse(async (req, res) => { + tracing.setSpanAttribute('requestID', req.logContext.requestID) if (req.fileSizeError || req.fileFilterError) { removeTrackFolder({ logContext: req.logContext }, req.fileDir) return errorResponseBadRequest(req.fileSizeError || req.fileFilterError) @@ -67,6 +69,7 @@ router.post( }) if (selfTranscode) { + tracing.info('adding upload track task') await AsyncProcessingQueue.addTrackContentUploadTask({ logContext: req.logContext, req: { @@ -77,6 +80,7 @@ router.post( } }) } else { + tracing.info('adding transcode handoff task') await AsyncProcessingQueue.addTranscodeHandOffTask({ logContext: req.logContext, req: { diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js 
b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js index d6e68d6a537..0245109d751 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js @@ -10,6 +10,7 @@ const { HEALTHY_SERVICES_TTL_SEC } = require('../stateMachineConstants') const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator') +const { instrumentTracing, tracing } = require('../../../tracer') const HEALTHY_NODES_CACHE_KEY = 'stateMachineHealthyContentNodes' @@ -97,7 +98,7 @@ const getNewOrExistingSyncReq = ({ * Issues syncRequest for user against secondary, and polls for replication up to primary * If secondary fails to sync within specified timeoutMs, will error */ -const issueSyncRequestsUntilSynced = async ( +const _issueSyncRequestsUntilSynced = async ( primaryUrl, secondaryUrl, wallet, @@ -163,6 +164,15 @@ const issueSyncRequestsUntilSynced = async ( ) } +const issueSyncRequestsUntilSynced = instrumentTracing({ + fn: _issueSyncRequestsUntilSynced, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename + } + } +}) + const getCachedHealthyNodes = async () => { const healthyNodes = await redisClient.lrange(HEALTHY_NODES_CACHE_KEY, 0, -1) return healthyNodes diff --git a/creator-node/src/services/sync/primarySyncFromSecondary.js b/creator-node/src/services/sync/primarySyncFromSecondary.js index e3992232e8a..9948d8c4975 100644 --- a/creator-node/src/services/sync/primarySyncFromSecondary.js +++ b/creator-node/src/services/sync/primarySyncFromSecondary.js @@ -12,6 +12,7 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator') const initAudiusLibs = require('../initAudiusLibs') const DecisionTree = require('../../utils/decisionTree') const UserSyncFailureCountService = require('./UserSyncFailureCountService') +const { instrumentTracing, tracing } = 
require('../../tracer') const { fetchExportFromNode } = require('./syncUtil') const { FILTER_OUT_ALREADY_PRESENT_DB_ENTRIES_CONSTS @@ -33,7 +34,7 @@ const { * Export data for user from secondary and save locally, until complete * Should never error, instead return errorObj, else null */ -module.exports = async function primarySyncFromSecondary({ +async function _primarySyncFromSecondary({ wallet, secondary, logContext = DEFAULT_LOG_CONTEXT @@ -61,9 +62,11 @@ module.exports = async function primarySyncFromSecondary({ let libs try { + tracing.info('init AudiusLibs') libs = await initAudiusLibs({ logger }) decisionTree.recordStage({ name: 'initAudiusLibs() success', log: true }) } catch (e) { + tracing.recordException(e) decisionTree.recordStage({ name: 'initAudiusLibs() Error', data: { errorMsg: e.message } @@ -154,6 +157,7 @@ module.exports = async function primarySyncFromSecondary({ log: true }) } catch (e) { + tracing.recordException(e) decisionTree.recordStage({ name: 'saveFilesToDisk() Error', data: { errorMsg: e.message } @@ -191,6 +195,7 @@ module.exports = async function primarySyncFromSecondary({ decisionTree.recordStage({ name: 'Complete Success' }) } catch (e) { + tracing.recordException(e) await SyncHistoryAggregator.recordSyncFail(wallet) return e } finally { @@ -200,6 +205,15 @@ module.exports = async function primarySyncFromSecondary({ } } +const primarySyncFromSecondary = instrumentTracing({ + fn: _primarySyncFromSecondary, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename + } + } +}) + /** * Fetch data for all files & save to disk * @@ -715,3 +729,5 @@ async function filterOutAlreadyPresentDBEntries({ } } } + +module.exports = primarySyncFromSecondary diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index fb4df39b086..2456f0d46d3 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ 
b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -12,6 +12,7 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator') const DBManager = require('../../dbManager') const UserSyncFailureCountService = require('./UserSyncFailureCountService') const { shouldForceResync } = require('./secondarySyncFromPrimaryUtils') +const { instrumentTracing, tracing } = require('../../tracer') const { fetchExportFromNode } = require('./syncUtil') const handleSyncFromPrimary = async ({ @@ -47,6 +48,7 @@ const handleSyncFromPrimary = async ({ redis.WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary ) } catch (e) { + tracing.recordException(e) return { error: new Error( `Cannot change state of wallet ${wallet}. Node sync currently in progress.` @@ -211,6 +213,7 @@ const handleSyncFromPrimary = async ({ userReplicaSet.secondary2 ].filter((url) => url !== myCnodeEndpoint) } catch (e) { + tracing.recordException(e) logger.error( `Couldn't filter out own endpoint from user's replica set to use use as cnode gateways in saveFileForMultihashToFS - ${e.message}` ) @@ -604,6 +607,7 @@ const handleSyncFromPrimary = async ({ : returnValue } } catch (e) { + tracing.recordException(e) await SyncHistoryAggregator.recordSyncFail(wallet) logger.error( `Sync complete for wallet: ${wallet}. Status: Error, message: ${ @@ -623,6 +627,7 @@ const handleSyncFromPrimary = async ({ try { await redis.WalletWriteLock.release(wallet) } catch (e) { + tracing.recordException(e) logger.warn( `Failure to release write lock for ${wallet} with error ${e.message}` ) @@ -642,7 +647,7 @@ const handleSyncFromPrimary = async ({ * Secondaries have no knowledge of the current data state on primary, they simply replicate * what they receive in each export. 
*/ -async function secondarySyncFromPrimary({ +async function _secondarySyncFromPrimary({ serviceRegistry, wallet, creatorNodeEndpoint, @@ -674,6 +679,8 @@ async function secondarySyncFromPrimary({ logContext }) metricEndTimerFn({ result, mode }) + tracing.setSpanAttribute('result', result) + tracing.setSpanAttribute('mode', mode) if (error) { throw new Error(error) @@ -682,4 +689,13 @@ async function secondarySyncFromPrimary({ return { result } } +const secondarySyncFromPrimary = instrumentTracing({ + fn: _secondarySyncFromPrimary, + options: { + attributes: { + [tracing.CODE_FILEPATH]: __filename + } + } +}) + module.exports = secondarySyncFromPrimary diff --git a/creator-node/src/tracer.ts b/creator-node/src/tracer.ts index 88e48039833..83188fa468e 100644 --- a/creator-node/src/tracer.ts +++ b/creator-node/src/tracer.ts @@ -1,6 +1,12 @@ -import type { Span, SpanContext, AttributeValue } from '@opentelemetry/api' -import { trace, context, SpanStatusCode } from '@opentelemetry/api' +import type { + Span, + SpanContext, + SpanOptions, + AttributeValue, + Tracer +} from '@opentelemetry/api' +import { trace, context, SpanStatusCode } from '@opentelemetry/api' import { registerInstrumentations } from '@opentelemetry/instrumentation' import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node' import { Resource } from '@opentelemetry/resources' @@ -12,13 +18,25 @@ import { RedisInstrumentation } from '@opentelemetry/instrumentation-redis' import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express' import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' import { BunyanInstrumentation } from '@opentelemetry/instrumentation-bunyan' -import { PgInstrumentation } from '@opentelemetry/instrumentation-pg' + +import config from './config' const SERVICE_NAME = 'content-node' +const SPID = config.get('spID') +const ENDPOINT = config.get('creatorNodeEndpoint') /** - * Initializes a tracer for content node as well as registers import 
instrumentions + * Initializes a tracer for content node as well as registers instrumentions * for packages that are frequently used + * WARNING: this function should be run before any other imports + * i.e. + * ``` + * import { setupTracing } from './tracer' + * setupTracing() + * // all other imports + * import { foo } from 'bar' + * import { isEven } from 'pg' + * ``` */ export const setupTracing = () => { /** @@ -32,6 +50,15 @@ export const setupTracing = () => { [ResourceAttributesSC.SERVICE_NAME]: SERVICE_NAME }) }) + + /** + * prebuilt tracing instrumentations are registered + * in order to add trace information and context to + * commonly used library + * + * e.g. `ExpressInstrumention()` monkey-patches express routes and middlewares + * with spans + */ registerInstrumentations({ tracerProvider: provider, instrumentations: [ @@ -45,17 +72,15 @@ export const setupTracing = () => { // Adds spans to redis operations new RedisInstrumentation(), - // Adds spans to postgres transactions - new PgInstrumentation(), - // Injects traceid, spanid, and SpanContext into bunyan logs new BunyanInstrumentation({ // Adds a hook to logs that injects more span info // and the service name into logs logHook: (span, record) => { - record['resource.span'] = span record['resource.service.name'] = provider.resource.attributes['service.name'] + record['resource.service.spid'] = SPID + record['resource.service.endpoint'] = ENDPOINT } }) ] @@ -65,6 +90,85 @@ export const setupTracing = () => { provider.register() } +/** + * Higher-order function that adds opentelemetry tracing to a function. 
+ * This wrapper works for both sync and async functions + * + * @param {string?} param.name optional name to give to the span, defaults to the function name + * @param {TFunction} param.fn the generic function to instrument + * @param {SpanOptions} param.options objects to pass into the span + * @returns the instrumented function + * @throws rethrows any errors from the original fn + * + * Usage of this would look like + * ``` + * const someFunction = instrumentTracing({ fn: _someFunction }) + * const result = someFunction(args) + * // or + * const result = await someFunction(args) + * ``` + */ +export const instrumentTracing = any>({ + name, + fn, + options +}: { + name?: string + fn: TFunction + options?: SpanOptions +}) => { + // build a wrapper around `fn` that accepts the same parameters and returns the same return type + const wrapper = function ( + ...args: Parameters + ): ReturnType { + const spanName = name || fn.name + const spanOptions = options || {} + return tracing + .getTracer() + .startActiveSpan(spanName, spanOptions, (span: Span) => { + try { + tracing.setSpanAttribute(tracing.CODE_FUNCTION, fn.name) + tracing.setSpanAttribute('spid', SPID) + tracing.setSpanAttribute('endpoint', ENDPOINT) + + // TODO add skip parameter to instrument testing function to NOT log certain args + // tracing.setSpanAttribute('args', JSON.stringify(args)) + const result = fn.apply(this, args) + + // if `fn` is async, await the result + if (result && result.then) { + /** + * by handling promise like this, the caller to this wrapper + * can still use normal async/await syntax to `await` the result + * of this wrapper + * i.e. 
`const output = await instrumentTracing({ fn: _someFunction })(args)` + * + * based on this package: https://github.com/klny/function-wrapper/blob/master/src/wrapper.js#L25 + */ + return result.then((val: any) => { + span.end() + return val + }) + } + + span.end() + + // re-return result from synchronous function + return result + } catch (e: any) { + tracing.recordException(e) + span.end() + + // rethrow any errors + throw e + } + }) + } + // copy function name + Object.defineProperty(wrapper, 'name', { value: fn.name }) + return wrapper +} + /** * Namespace to store tracing helper function */ @@ -81,22 +185,42 @@ export const tracing = { * This function fetches the tracer from the application context * @returns {Tracer} the tracer for content node */ - getTracer: () => { + getTracer: (): Tracer => { return trace.getTracer(SERVICE_NAME) }, /** - * Helper function that gets the current active span or return `undefined` if there is no active span + * @returns {Span | undefined} the current active span or `undefined` if there is no active span */ getActiveSpan: (): Span | undefined => { return trace.getSpan(context.active()) }, + /** + * Adds a key-value style attribute to the current span + * @param name the attribute key + * @param value the attribute value + */ setSpanAttribute: (name: string, value: AttributeValue) => { const span = tracing.getActiveSpan() span?.setAttribute(name, value) }, + /** + * log a message with severity 'debug' on the current span + * @param {string} msg the message to log + */ + debug: (msg: string) => { + const span = tracing.getActiveSpan() + span?.addEvent(msg, { + 'log.severity': 'debug' + }) + }, + + /** + * log a message with severity 'info' on the current span + * @param {string} msg the message to log + */ info: (msg: string) => { const span = tracing.getActiveSpan() span?.addEvent(msg, { @@ -104,6 +228,10 @@ export const tracing = { }) }, + /** + * log a message with severity 'warn' on the current span + * @param {string} msg 
the message to log + */ warn: (msg: string) => { const span = tracing.getActiveSpan() span?.addEvent(msg, { @@ -111,6 +239,10 @@ export const tracing = { }) }, + /** + * log a message with severity 'error' on the current span + * @param {string} msg the message to log + */ error: (msg: string) => { const span = tracing.getActiveSpan() span?.setStatus({ code: SpanStatusCode.ERROR, message: msg }) @@ -119,6 +251,10 @@ export const tracing = { }) }, + /** + * records errors on the current trace and sets the span status to `ERROR` + * @param {Error} error the error to record on the span + */ recordException: (error: Error) => { const span = tracing.getActiveSpan() span?.recordException(error) @@ -137,5 +273,6 @@ export const tracing = { module.exports = { setupTracing, + instrumentTracing, tracing }