Skip to content

Commit

Permalink
[CON-243] - Use prometheus middleware package to track route duration (
Browse files Browse the repository at this point in the history
…#3494)

* wip

* add regex to mw

* move around

* expose trusted notifier data in health check + tests

* why is this not working

* manually works but not tests

* revert mw file

* pull files from head of master

* change namings

* fix tests

* slight refactor

* rename imported package

* remove manual timer

* add comments + remove manual dur tracking

* comments + remove import

* remove default gauge counter

* comments

* fix name change

* capitalize all constants

* update bucket time range

* add autoregister key

* move comment up

* update to only have 6 buckets

Co-authored-by: Sid Sethi <3323835+SidSethi@users.noreply.github.com>
  • Loading branch information
vicky-g and SidSethi authored Jul 20, 2022
1 parent 43e43fe commit 15006a2
Show file tree
Hide file tree
Showing 15 changed files with 418 additions and 209 deletions.
14 changes: 14 additions & 0 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions creator-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"ethers": "5.4.7",
"exif-parser": "^0.1.12",
"express": "^4.16.3",
"express-prom-bundle": "^6.5.0",
"express-rate-limit": "5.3.0",
"ffmpeg-static": "^2.7.0",
"ffprobe-static": "^3.0.0",
Expand Down
103 changes: 76 additions & 27 deletions creator-node/src/app.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const express = require('express')
const bodyParser = require('body-parser')
const cors = require('cors')
const prometheusMiddleware = require('express-prom-bundle')

const DiskManager = require('./diskManager')
const { sendResponse, errorResponseServerError } = require('./apiHelpers')
Expand All @@ -19,42 +20,18 @@ const {
getRateLimiterMiddleware
} = require('./reqLimiter')
const config = require('./config')
const {
exponentialBucketsRange
} = require('./services/prometheusMonitoring/prometheusUtils')
const healthCheckRoutes = require('./components/healthCheck/healthCheckController')
const contentBlacklistRoutes = require('./components/contentBlacklist/contentBlacklistController')
const replicaSetRoutes = require('./components/replicaSet/replicaSetController')

const app = express()

// middleware functions will be run in order they are added to the app below
// - loggingMiddleware must be first to ensure proper error handling
app.use(loggingMiddleware)
app.use(bodyParser.json({ limit: '1mb' }))
app.use(readOnlyMiddleware)
app.use(cors())

// Rate limit routes
app.use('/users/', userReqLimiter)
app.use('/track*', trackReqLimiter)
app.use('/audius_user/', audiusUserReqLimiter)
app.use('/metadata', metadataReqLimiter)
app.use('/image_upload', imageReqLimiter)
app.use('/ursm_request_for_signature', URSMRequestForSignatureReqLimiter)
app.use('/batch_cids_exist', batchCidsExistReqLimiter)
app.use('/batch_image_cids_exist', batchCidsExistReqLimiter)
app.use(getRateLimiterMiddleware())

// import routes
require('./routes')(app)
app.use('/', healthCheckRoutes)
app.use('/', contentBlacklistRoutes)
app.use('/', replicaSetRoutes)

function errorHandler(err, req, res, next) {
req.logger.error('Internal server error')
req.logger.error(err.stack)
sendResponse(req, res, errorResponseServerError('Internal server error'))
}
app.use(errorHandler)

/**
* Configures express app object with required properties and starts express server
Expand All @@ -63,6 +40,78 @@ app.use(errorHandler)
* @param {ServiceRegistry} serviceRegistry object housing all Content Node Services
*/
const initializeApp = (port, serviceRegistry) => {
const app = express()

// middleware functions will be run in order they are added to the app below
// - loggingMiddleware must be first to ensure proper error handling
app.use(loggingMiddleware)
app.use(bodyParser.json({ limit: '1mb' }))
app.use(readOnlyMiddleware)
app.use(cors())

// Rate limit routes
app.use('/users/', userReqLimiter)
app.use('/track*', trackReqLimiter)
app.use('/audius_user/', audiusUserReqLimiter)
app.use('/metadata', metadataReqLimiter)
app.use('/image_upload', imageReqLimiter)
app.use('/ursm_request_for_signature', URSMRequestForSignatureReqLimiter)
app.use('/batch_cids_exist', batchCidsExistReqLimiter)
app.use('/batch_image_cids_exist', batchCidsExistReqLimiter)
app.use(getRateLimiterMiddleware())

const prometheusRegistry = serviceRegistry.prometheusRegistry

// Metric tracking middleware
app.use(
prometheusMiddleware({
// Use existing registry for compatibility with custom metrics. Can see
// the metrics on /prometheus_metrics
promRegistry: prometheusRegistry.registry,
// Override metric name to include namespace prefix
httpDurationMetricName: `${prometheusRegistry.namespacePrefix}_http_request_duration_seconds`,
// Include HTTP method in duration tracking
includeMethod: true,
// Include HTTP status code in duration tracking
includePath: true,
// Disable default gauge counter to indicate if this middleware is running
includeUp: false,
// The buckets in seconds to measure requests
buckets: [0.2, 0.5, ...exponentialBucketsRange(1, 60, 4)],
// Do not register the default /metrics route, since we have the /prometheus_metrics
autoregister: false,
// Normalizes the path to be tracked in this middleware. For routes with route params,
// this fn maps those routes to generic paths. e.g. /ipfs/QmSomeCid -> /ipfs/#CID
normalizePath: function (req, opts) {
const path = prometheusMiddleware.normalizePath(req, opts)
try {
for (const {
regex,
path: normalizedPath
} of prometheusRegistry.regexes) {
const match = path.match(regex)
if (match) {
return normalizedPath
}
}
} catch (e) {
req.logger.warn(
`DurationTracking || Could not match on regex: ${e.message}`
)
}
return path
}
})
)

// import routes
require('./routes')(app)
app.use('/', healthCheckRoutes)
app.use('/', contentBlacklistRoutes)
app.use('/', replicaSetRoutes)

app.use(errorHandler)

const storagePath = DiskManager.getConfigStoragePath()

// TODO: Can remove these when all routes consume serviceRegistry
Expand Down
16 changes: 0 additions & 16 deletions creator-node/src/routes/tracks.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,17 +321,8 @@ module.exports = function (app) {
transcodedTrackUUID
} = req.body

const prometheusRegistry =
req.app.get('serviceRegistry').prometheusRegistry
const routePostTracksDurationSecondsMetric = prometheusRegistry.getMetric(
prometheusRegistry.metricNames
.ROUTE_POST_TRACKS_DURATION_SECONDS_HISTOGRAM
)
const metricEndTimerFn = routePostTracksDurationSecondsMetric.startTimer()

// Input validation
if (!blockchainTrackId || !blockNumber || !metadataFileUUID) {
metricEndTimerFn({ code: 400 })
return errorResponseBadRequest(
'Must include blockchainTrackId, blockNumber, and metadataFileUUID.'
)
Expand All @@ -340,7 +331,6 @@ module.exports = function (app) {
// Error on outdated blocknumber
const cnodeUser = req.session.cnodeUser
if (blockNumber < cnodeUser.latestBlockNumber) {
metricEndTimerFn({ code: 400 })
return errorResponseBadRequest(
`Invalid blockNumber param ${blockNumber}. Must be greater or equal to previously processed blocknumber ${cnodeUser.latestBlockNumber}.`
)
Expand All @@ -352,7 +342,6 @@ module.exports = function (app) {
where: { fileUUID: metadataFileUUID, cnodeUserUUID }
})
if (!file) {
metricEndTimerFn({ code: 400 })
return errorResponseBadRequest(
`No file db record found for provided metadataFileUUID ${metadataFileUUID}.`
)
Expand All @@ -367,13 +356,11 @@ module.exports = function (app) {
!Array.isArray(metadataJSON.track_segments) ||
!metadataJSON.track_segments.length
) {
metricEndTimerFn({ code: 500 })
return errorResponseServerError(
`Malformatted metadataJSON stored for metadataFileUUID ${metadataFileUUID}.`
)
}
} catch (e) {
metricEndTimerFn({ code: 500 })
return errorResponseServerError(
`No file stored on disk for metadataFileUUID ${metadataFileUUID} at storagePath ${file.storagePath}.`
)
Expand All @@ -387,7 +374,6 @@ module.exports = function (app) {
metadataJSON.cover_art_sizes
)
} catch (e) {
metricEndTimerFn({ code: 500 })
return errorResponseServerError(e.message)
}

Expand Down Expand Up @@ -575,12 +561,10 @@ module.exports = function (app) {
// Discovery only indexes metadata and not files, so we eagerly replicate data but don't await it
issueAndWaitForSecondarySyncRequests(req, true)

metricEndTimerFn({ code: 200 })
return successResponse()
} catch (e) {
req.logger.error(e.message)
await transaction.rollback()
metricEndTimerFn({ code: 500 })
return errorResponseServerError(e.message)
}
})
Expand Down
105 changes: 99 additions & 6 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ class ServiceRegistry {
)
}

try {
await this._setupRouteDurationTracking(app)
} catch (e) {
this.logError(
`DurationTracking || Failed to setup general duration tracking for all routes: ${e.message}. Skipping..`
)
}

this.servicesThatRequireServerInitialized = true

logInfoWithDuration(
Expand Down Expand Up @@ -368,6 +376,95 @@ class ServiceRegistry {
)
}

/**
* Gets the regexes for routes with route params. Used for mapping paths in metrics to track route durations
*
* Structure:
* {regex: <some regex>, path: <path that a matched path will route to in the normalize fn in prometheus middleware>}
*
* Example of the regex added to PrometheusRegistry:
*
* /ipfs/:CID and /content/:CID -> map to /ipfs/#CID
*
* {
* regex: /(?:^\/ipfs\/(?:([^/]+?))\/?$|^\/content\/(?:([^/]+?))\/?$)/i,
* path: '/ipfs/#CID'
* }
* @param {Object} app
*/
async _setupRouteDurationTracking(app) {
// Get all the routes initialized in the app
const routes = app._router.stack.filter(
(element) =>
(element.route && element.route.path) ||
(element.handle && element.handle.stack)
)

// Iterate through the paths and add the regexes if the route
// has route params

// Express routing is... unpredictable. Use a set to manage seen paths
const seenPaths = new Set()
const parsedRoutes = []
routes.forEach((element) => {
if (element.name === 'router') {
// Iterate through routes initialized with the express router
element.handle.stack.forEach((e) => {
const path = e.route.path
const method = Object.keys(e.route.methods)[0]
const regex = e.regexp
addToParsedRoutes(path, method, regex)
})
} else {
// Iterate through all the other routes initalized with the app
const path = element.route.path
const method = Object.keys(element.route.methods)[0]
const regex = element.regexp
addToParsedRoutes(path, method, regex)
}
})

// Only keep track of the routes with route params, e.g. the route with ':'
// in the route to indicate a route param
function addToParsedRoutes(path, method, regex) {
// Routes may come in the form of an array (e.g. ['/ipfs/:CID', '/content/:CID'])
if (Array.isArray(path)) {
path.forEach((p) => {
if (!seenPaths.has(p + method)) {
if (p.includes(':')) {
seenPaths.add(p + method)
parsedRoutes.push({
path: p,
method,
regex
})
}
}
})
} else {
if (!seenPaths.has(path + method)) {
if (path.includes(':')) {
seenPaths.add(path + method)
parsedRoutes.push({
path,
method,
regex
})
}
}
}
}

const regexes = parsedRoutes
.filter(({ path }) => path.includes(':'))
.map(({ path, regex }) => ({
regex,
path: path.replace(/:/g, '#')
}))

this.prometheusRegistry.initRouteParamRegexes(regexes)
}

/**
* Wait until URSM contract is deployed, then attempt to register on L2 URSM with infinite retries
* Requires L1 identity
Expand Down Expand Up @@ -476,11 +573,7 @@ class ServiceRegistry {
}
}

/*
* Export a singleton instance of the ServiceRegistry
*/
const serviceRegistry = new ServiceRegistry()

// Export a singleton instance of the ServiceRegistry
module.exports = {
serviceRegistry
serviceRegistry: new ServiceRegistry()
}
Loading

0 comments on commit 15006a2

Please sign in to comment.