Skip to content

Commit

Permalink
wrap all callbacks in instrumentations with wrapFunction (#4634)
Browse files Browse the repository at this point in the history
These were the remaining areas with non-trivial code that might not be
caught by try/catches when our code throws an exception. Adding
`wrapFunction` adds that try/catch appropriately, but only when SSI is
enabled. Otherwise these are all passthroughs. In the future, we may
enable those try/catches at all times.
  • Loading branch information
bengl committed Aug 28, 2024
1 parent 198ead2 commit 474a735
Show file tree
Hide file tree
Showing 39 changed files with 127 additions and 119 deletions.
8 changes: 4 additions & 4 deletions packages/datadog-instrumentations/src/aws-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ function wrapSmithySend (send) {
})

if (typeof cb === 'function') {
args[args.length - 1] = function (err, result) {
args[args.length - 1] = shimmer.wrapFunction(cb, cb => function (err, result) {
const message = getMessage(request, err, result)

completeChannel.publish(message)
Expand All @@ -89,7 +89,7 @@ function wrapSmithySend (send) {
responseFinishChannel.publish(message.response.error)
}
})
}
})
} else { // always a promise
return send.call(this, command, ...args)
.then(
Expand All @@ -113,7 +113,7 @@ function wrapSmithySend (send) {

function wrapCb (cb, serviceName, request, ar) {
// eslint-disable-next-line n/handle-callback-err
return function wrappedCb (err, response) {
return shimmer.wrapFunction(cb, cb => function wrappedCb (err, response) {
const obj = { request, response }
return ar.runInAsyncScope(() => {
channel(`apm:aws:response:start:${serviceName}`).publish(obj)
Expand Down Expand Up @@ -141,7 +141,7 @@ function wrapCb (cb, serviceName, request, ar) {
throw e
}
})
}
})
}

function getMessage (request, error, result) {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/body-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { channel, addHook, AsyncResource } = require('./helpers/instrument')
const bodyParserReadCh = channel('datadog:body-parser:read:finish')

function publishRequestBodyAndNext (req, res, next) {
return function () {
return shimmer.wrapFunction(next, next => function () {
if (bodyParserReadCh.hasSubscribers && req) {
const abortController = new AbortController()
const body = req.body
Expand All @@ -17,7 +17,7 @@ function publishRequestBodyAndNext (req, res, next) {
}

return next.apply(this, arguments)
}
})
}

addHook({
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/cassandra-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ function finish (finishCh, errorCh, error) {
}

function wrapCallback (finishCh, errorCh, asyncResource, callback) {
return asyncResource.bind(function (err) {
return shimmer.wrapFunction(callback, callback => asyncResource.bind(function (err) {
finish(finishCh, errorCh, err)
if (callback) {
return callback.apply(this, arguments)
}
})
}))
}

function isRequestValid (exec, args, length) {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ function wrapLayerHandle (layer) {
}

function wrapNext (req, next) {
return function (error) {
return shimmer.wrapFunction(next, next => function (error) {
if (error) {
errorChannel.publish({ req, error })
}
Expand All @@ -99,7 +99,7 @@ function wrapNext (req, next) {
finishChannel.publish({ req })

next.apply(this, arguments)
}
})
}

addHook({ name: 'connect', versions: ['>=3'] }, connect => {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/cookie-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { channel, addHook } = require('./helpers/instrument')
const cookieParserReadCh = channel('datadog:cookie-parser:read:finish')

function publishRequestCookieAndNext (req, res, next) {
return function cookieParserWrapper () {
return shimmer.wrapFunction(next, next => function cookieParserWrapper () {
if (cookieParserReadCh.hasSubscribers && req) {
const abortController = new AbortController()

Expand All @@ -18,7 +18,7 @@ function publishRequestCookieAndNext (req, res, next) {
}

return next.apply(this, arguments)
}
})
}

addHook({
Expand Down
8 changes: 4 additions & 4 deletions packages/datadog-instrumentations/src/couchbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ function wrap (prefix, fn) {

startCh.publish({ bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts })

arguments[callbackIndex] = asyncResource.bind(function (error, result) {
arguments[callbackIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
if (error) {
errorCh.publish(error)
}
finishCh.publish(result)
return cb.apply(this, arguments)
})
}))

try {
return fn.apply(this, arguments)
Expand Down Expand Up @@ -118,13 +118,13 @@ function wrapCBandPromise (fn, name, startData, thisArg, args) {
// v3 offers callback or promises event handling
// NOTE: this does not work with v3.2.0-3.2.1 cluster.query, as there is a bug in the couchbase source code
const cb = callbackResource.bind(args[cbIndex])
args[cbIndex] = asyncResource.bind(function (error, result) {
args[cbIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
if (error) {
errorCh.publish(error)
}
finishCh.publish({ result })
return cb.apply(thisArg, arguments)
})
}))
}
const res = fn.apply(thisArg, args)

Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/cucumber.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ function wrapRun (pl, isLatestVersion) {
testStartCh.publish(testStartPayload)
})
try {
this.eventBroadcaster.on('envelope', (testCase) => {
this.eventBroadcaster.on('envelope', shimmer.wrapFunction(null, () => (testCase) => {
// Only supported from >=8.0.0
if (testCase?.testCaseFinished) {
const { testCaseFinished: { willBeRetried } } = testCase
Expand All @@ -206,7 +206,7 @@ function wrapRun (pl, isLatestVersion) {
})
}
}
})
}))
let promise

asyncResource.runInAsyncScope(() => {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/dns.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ function wrap (prefix, fn, expectedArgs, rrtype) {
return asyncResource.runInAsyncScope(() => {
startCh.publish(startArgs)

arguments[arguments.length - 1] = asyncResource.bind(function (error, result) {
arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
if (error) {
errorCh.publish(error)
}
finishCh.publish(result)
cb.apply(this, arguments)
})
}))

try {
return fn.apply(this, arguments)
Expand Down
8 changes: 4 additions & 4 deletions packages/datadog-instrumentations/src/elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ function createWrapSelect () {
return function () {
if (arguments.length === 1) {
const cb = arguments[0]
arguments[0] = function (err, connection) {
arguments[0] = shimmer.wrapFunction(cb, cb => function (err, connection) {
if (connectCh.hasSubscribers && connection && connection.host) {
connectCh.publish({ hostname: connection.host.host, port: connection.host.port })
}
cb(err, connection)
}
})
}
return request.apply(this, arguments)
}
Expand Down Expand Up @@ -86,10 +86,10 @@ function createWrapRequest (name) {
if (typeof cb === 'function') {
cb = parentResource.bind(cb)

arguments[lastIndex] = asyncResource.bind(function (error) {
arguments[lastIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error) {
finish(params, error)
return cb.apply(null, arguments)
})
}))
return request.apply(this, arguments)
} else {
const promise = request.apply(this, arguments)
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/express.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ addHook({ name: 'express', versions: ['>=4'] }, express => {
const queryParserReadCh = channel('datadog:query:read:finish')

function publishQueryParsedAndNext (req, res, next) {
return function () {
return shimmer.wrapFunction(next, next => function () {
if (queryParserReadCh.hasSubscribers && req) {
const abortController = new AbortController()
const query = req.query
Expand All @@ -60,7 +60,7 @@ function publishQueryParsedAndNext (req, res, next) {
}

return next.apply(this, arguments)
}
})
}

addHook({
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/fastify.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function wrapFastify (fastify, hasParsingEvents) {
}

function wrapAddHook (addHook) {
return function addHookWithTrace (name, fn) {
return shimmer.wrapFunction(addHook, addHook => function addHookWithTrace (name, fn) {
fn = arguments[arguments.length - 1]

if (typeof fn !== 'function') return addHook.apply(this, arguments)
Expand Down Expand Up @@ -78,7 +78,7 @@ function wrapAddHook (addHook) {
})

return addHook.apply(this, arguments)
}
})
}

function onRequest (request, reply, done) {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/find-my-way.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ function wrapOn (on) {
return function onWithTrace (method, path, opts) {
const index = typeof opts === 'function' ? 2 : 3
const handler = arguments[index]
const wrapper = function (req) {
const wrapper = shimmer.wrapFunction(handler, handler => function (req) {
routeChannel.publish({ req, route: path })

return handler.apply(this, arguments)
}
})

if (typeof handler === 'function') {
arguments[index] = wrapper
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,15 @@ function createWrapFunction (prefix = '', override = '') {
if (cb) {
const outerResource = new AsyncResource('bound-anonymous-fn')

arguments[lastIndex] = innerResource.bind(function (e) {
arguments[lastIndex] = shimmer.wrapFunction(cb, cb => innerResource.bind(function (e) {
if (e !== null && typeof e === 'object') { // fs.exists receives a boolean
errorChannel.publish(e)
}

finishChannel.publish()

return outerResource.runInAsyncScope(() => cb.apply(this, arguments))
})
}))
}

return innerResource.runInAsyncScope(() => {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/google-cloud-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ function wrapMethod (method) {
if (typeof cb === 'function') {
const outerAsyncResource = new AsyncResource('bound-anonymous-fn')

arguments[arguments.length - 1] = innerAsyncResource.bind(function (error) {
arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => innerAsyncResource.bind(function (error) {
if (error) {
requestErrorCh.publish(error)
}

requestFinishCh.publish()

return outerAsyncResource.runInAsyncScope(() => cb.apply(this, arguments))
})
}))

return method.apply(this, arguments)
} else {
Expand Down
10 changes: 4 additions & 6 deletions packages/datadog-instrumentations/src/grpc/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,18 @@ function wrapMethod (method, path, type, hasPeer) {
return method
}

const wrapped = function () {
const wrapped = shimmer.wrapFunction(method, method => function () {
const args = ensureMetadata(this, arguments, 1)
return callMethod(this, method, args, path, args[1], type, hasPeer)
}

Object.assign(wrapped, method)
})

patched.add(wrapped)

return wrapped
}

function wrapCallback (ctx, callback = () => { }) {
return function (err) {
return shimmer.wrapFunction(callback, callback => function (err) {
if (err) {
ctx.error = err
errorChannel.publish(ctx)
Expand All @@ -111,7 +109,7 @@ function wrapCallback (ctx, callback = () => { }) {
return callback.apply(this, arguments)
// No async end channel needed
})
}
})
}

function createWrapEmit (ctx, hasPeer = false) {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/grpc/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function wrapStream (call, ctx, onCancel) {
}

function wrapCallback (callback = () => {}, call, ctx, onCancel) {
return function (err, value, trailer, flags) {
return shimmer.wrapFunction(callback, callback => function (err, value, trailer, flags) {
if (err) {
ctx.error = err
errorChannel.publish(ctx)
Expand All @@ -136,7 +136,7 @@ function wrapCallback (callback = () => {}, call, ctx, onCancel) {
return callback.apply(this, arguments)
// No async end channel needed
})
}
})
}

function wrapSendStatus (sendStatus, ctx) {
Expand Down
12 changes: 6 additions & 6 deletions packages/datadog-instrumentations/src/hapi.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ function wrapServer (server) {
}

function wrapStart (start) {
return function () {
return shimmer.wrapFunction(start, start => function () {
if (this && typeof this.ext === 'function') {
this.ext('onPreResponse', onPreResponse)
}

return start.apply(this, arguments)
}
})
}

function wrapExt (ext) {
return function (events, method, options) {
return shimmer.wrapFunction(ext, ext => function (events, method, options) {
if (events !== null && typeof events === 'object') {
arguments[0] = wrapEvents(events)
} else {
arguments[1] = wrapExtension(method)
}

return ext.apply(this, arguments)
}
})
}

function wrapDispatch (dispatch) {
Expand Down Expand Up @@ -92,15 +92,15 @@ function wrapEvents (events) {
function wrapHandler (handler) {
if (typeof handler !== 'function') return handler

return function (request, h) {
return shimmer.wrapFunction(handler, handler => function (request, h) {
const req = request && request.raw && request.raw.req

if (!req) return handler.apply(this, arguments)

return hapiTracingChannel.traceSync(() => {
return handler.apply(this, arguments)
})
}
})
}

function onPreResponse (request, h) {
Expand Down
6 changes: 3 additions & 3 deletions packages/datadog-instrumentations/src/http/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ function patch (http, methodName) {
let callback = args.callback

if (callback) {
callback = function () {
callback = shimmer.wrapFunction(args.callback, cb => function () {
return asyncStartChannel.runStores(ctx, () => {
return args.callback.apply(this, arguments)
return cb.apply(this, arguments)
})
}
})
}

const options = args.options
Expand Down
Loading

0 comments on commit 474a735

Please sign in to comment.