Skip to content

Commit

Permalink
Use only one channel instead of 6
Browse files Browse the repository at this point in the history
  • Loading branch information
uurien committed Sep 23, 2024
1 parent 36015cc commit c691443
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 69 deletions.
33 changes: 15 additions & 18 deletions packages/datadog-instrumentations/src/mysql2.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, (Conne
const startCh = channel('apm:mysql2:query:start')
const finishCh = channel('apm:mysql2:query:finish')
const errorCh = channel('apm:mysql2:query:error')
const startConnectionQueryCh = channel('datadog:mysql2:connection:query:start')
const startConnectionExecuteCh = channel('datadog:mysql2:connection:execute:start')
const startOuterQueryCh = channel('datadog:mysql2:outerquery:start')
const shouldEmitEndAfterQueryAbort = semver.intersects(version, '>=1.3.3')

shimmer.wrap(Connection.prototype, 'addCommand', addCommand => function (cmd) {
Expand All @@ -33,14 +32,14 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, (Conne
})

shimmer.wrap(Connection.prototype, 'query', query => function (sql, values, cb) {
if (!startConnectionQueryCh.hasSubscribers) return query.apply(this, arguments)
if (!startOuterQueryCh.hasSubscribers) return query.apply(this, arguments)

const sqlIsString = typeof sql === 'string'

const sqlString = sqlIsString ? sql : sql?.sql
if (sqlString) {
const abortController = new AbortController()
startConnectionQueryCh.publish({ sql: sqlString, abortController })
startOuterQueryCh.publish({ sql: sqlString, abortController })

if (abortController.signal.aborted) {
let queryCommand = sql
Expand Down Expand Up @@ -70,12 +69,12 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, (Conne
})

shimmer.wrap(Connection.prototype, 'execute', execute => function (sql, values, cb) {
if (!startConnectionExecuteCh.hasSubscribers) return execute.apply(this, arguments)
if (!startOuterQueryCh.hasSubscribers) return execute.apply(this, arguments)

const sqlString = typeof sql === 'object' ? sql?.sql : sql
if (sqlString) {
const abortController = new AbortController()
startConnectionExecuteCh.publish({ sql, abortController })
startOuterQueryCh.publish({ sql, abortController })

if (abortController.signal.aborted) {
const addCommand = this.addCommand
Expand Down Expand Up @@ -150,17 +149,16 @@ addHook({ name: 'mysql2', file: 'lib/connection.js', versions: ['>=1'] }, (Conne
})

addHook({ name: 'mysql2', file: 'lib/pool.js', versions: ['>=1'] }, (Pool, version) => {
const startPoolQueryCh = channel('datadog:mysql2:pool:query:start')
const startPoolExecuteCh = channel('datadog:mysql2:pool:execute:start')
const startOuterQueryCh = channel('datadog:mysql2:outerquery:start')
const shouldEmitEndAfterQueryAbort = semver.intersects(version, '>=1.3.3')

shimmer.wrap(Pool.prototype, 'query', query => function (sql, values, cb) {
if (!startPoolQueryCh.hasSubscribers) return query.apply(this, arguments)
if (!startOuterQueryCh.hasSubscribers) return query.apply(this, arguments)

const sqlString = typeof sql === 'object' ? sql?.sql : sql
if (sqlString) {
const abortController = new AbortController()
startPoolQueryCh.publish({ sql, abortController })
startOuterQueryCh.publish({ sql, abortController })

if (abortController.signal.aborted) {
const getConnection = this.getConnection
Expand Down Expand Up @@ -193,10 +191,10 @@ addHook({ name: 'mysql2', file: 'lib/pool.js', versions: ['>=1'] }, (Pool, versi
})

shimmer.wrap(Pool.prototype, 'execute', execute => function (sql, values, cb) {
if (!startPoolExecuteCh.hasSubscribers) return execute.apply(this, arguments)
if (!startOuterQueryCh.hasSubscribers) return execute.apply(this, arguments)

const abortController = new AbortController()
startPoolExecuteCh.publish({ sql, abortController })
startOuterQueryCh.publish({ sql, abortController })

if (abortController.signal.aborted) {
if (typeof values === 'function') {
Expand All @@ -217,18 +215,17 @@ addHook({ name: 'mysql2', file: 'lib/pool.js', versions: ['>=1'] }, (Pool, versi

// PoolNamespace.prototype.query does not exist in mysql2<2.3.0
addHook({ name: 'mysql2', file: 'lib/pool_cluster.js', versions: ['>=2.3.0'] }, PoolCluster => {
const startPoolNamespaceQueryCh = channel('datadog:mysql2:poolnamespace:query:start')
const startPoolNamespaceExecuteCh = channel('datadog:mysql2:poolnamespace:execute:start')
const startOuterQueryCh = channel('datadog:mysql2:outerquery:start')

shimmer.wrap(PoolCluster.prototype, 'of', of => function () {
const poolNamespace = of.apply(this, arguments)

if (startPoolNamespaceQueryCh.hasSubscribers) {
if (startOuterQueryCh.hasSubscribers) {
shimmer.wrap(poolNamespace, 'query', query => function (sql, values, cb) {
if (typeof sql === 'object') sql = sql?.sql

const abortController = new AbortController()
startPoolNamespaceQueryCh.publish({ sql, abortController })
startOuterQueryCh.publish({ sql, abortController })

if (abortController.signal.aborted) {
const getConnection = this.getConnection
Expand Down Expand Up @@ -257,12 +254,12 @@ addHook({ name: 'mysql2', file: 'lib/pool_cluster.js', versions: ['>=2.3.0'] },
})
}

if (startPoolNamespaceExecuteCh.hasSubscribers) {
if (startOuterQueryCh.hasSubscribers) {
shimmer.wrap(poolNamespace, 'execute', execute => function (sql, values, cb) {
if (typeof sql === 'object') sql = sql?.sql

const abortController = new AbortController()
startPoolNamespaceExecuteCh.publish({ sql, abortController })
startOuterQueryCh.publish({ sql, abortController })

if (abortController.signal.aborted) {
if (typeof values === 'function') {
Expand Down
25 changes: 1 addition & 24 deletions packages/datadog-instrumentations/test/mysql2.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('mysql2 instrumentation', () => {
let apmQueryStartChannel, apmQueryStart, mysql2Version

before(() => {
startCh = channel('datadog:mysql2:outerquery:start')
return agent.load(['mysql2'])
})

Expand Down Expand Up @@ -65,10 +66,6 @@ describe('mysql2 instrumentation', () => {
})

describe('Connection.prototype.query', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:connection:query:start')
})

describe('with string as query', () => {
describe('with callback', () => {
it('should abort the query on abortController.abort()', (done) => {
Expand Down Expand Up @@ -231,10 +228,6 @@ describe('mysql2 instrumentation', () => {
})

describe('Connection.prototype.execute', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:connection:execute:start')
})

describe('with the query in options', () => {
it('should abort the query on abortController.abort()', (done) => {
startCh.subscribe(abort)
Expand Down Expand Up @@ -327,10 +320,6 @@ describe('mysql2 instrumentation', () => {
})

describe('Pool.prototype.query', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:pool:query:start')
})

describe('with callback', () => {
it('should abort the query on abortController.abort()', (done) => {
startCh.subscribe(abort)
Expand Down Expand Up @@ -402,10 +391,6 @@ describe('mysql2 instrumentation', () => {
})

describe('Pool.prototype.execute', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:pool:execute:start')
})

describe('with callback', () => {
it('should abort the query on abortController.abort()', (done) => {
startCh.subscribe(abort)
Expand Down Expand Up @@ -469,10 +454,6 @@ describe('mysql2 instrumentation', () => {
})

describe('PoolNamespace.prototype.query', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:poolnamespace:query:start')
})

it('should abort the query on abortController.abort()', (done) => {
startCh.subscribe(abort)
const namespace = poolCluster.of()
Expand Down Expand Up @@ -510,10 +491,6 @@ describe('mysql2 instrumentation', () => {
})

describe('PoolNamespace.prototype.execute', () => {
beforeEach(() => {
startCh = channel('datadog:mysql2:poolnamespace:execute:start')
})

it('should abort the query on abortController.abort()', (done) => {
startCh.subscribe(abort)

Expand Down
7 changes: 1 addition & 6 deletions packages/dd-trace/src/appsec/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ module.exports = {
setUncaughtExceptionCaptureCallbackStart: dc.channel('datadog:process:setUncaughtExceptionCaptureCallback:start'),
pgQueryStart: dc.channel('apm:pg:query:start'),
pgPoolQueryStart: dc.channel('datadog:pg:pool:query:start'),
mysql2ConnectionQueryStart: dc.channel('datadog:mysql2:connection:query:start'),
mysql2ConnectionExecuteStart: dc.channel('datadog:mysql2:connection:execute:start'),
mysql2PoolQueryStart: dc.channel('datadog:mysql2:pool:query:start'),
mysql2PoolExecuteStart: dc.channel('datadog:mysql2:pool:execute:start'),
mysql2PoolNamespaceQueryStart: dc.channel('datadog:mysql2:poolnamespace:query:start'),
mysql2PoolNamespaceExecuteStart: dc.channel('datadog:mysql2:poolnamespace:execute:start'),
mysql2OuterQueryStart: dc.channel('datadog:mysql2:outerquery:start'),
wafRunFinished: dc.channel('datadog:waf:run:finish')
}
25 changes: 4 additions & 21 deletions packages/dd-trace/src/appsec/rasp/sql_injection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ const {
pgQueryStart,
pgPoolQueryStart,
wafRunFinished,
mysql2ConnectionQueryStart,
mysql2ConnectionExecuteStart,
mysql2PoolQueryStart,
mysql2PoolExecuteStart,
mysql2PoolNamespaceQueryStart,
mysql2PoolNamespaceExecuteStart
mysql2OuterQueryStart
} = require('../channels')
const { storage } = require('../../../../datadog-core')
const addresses = require('../addresses')
Expand All @@ -29,26 +24,14 @@ function enable (_config) {
pgPoolQueryStart.subscribe(analyzePgSqlInjection)
wafRunFinished.subscribe(clearQuerySet)

mysql2ConnectionQueryStart.subscribe(analyzeMysql2SqlInjection)
mysql2ConnectionExecuteStart.subscribe(analyzeMysql2SqlInjection)
mysql2PoolQueryStart.subscribe(analyzeMysql2SqlInjection)
mysql2PoolExecuteStart.subscribe(analyzeMysql2SqlInjection)
mysql2PoolNamespaceQueryStart.subscribe(analyzeMysql2SqlInjection)
mysql2PoolNamespaceExecuteStart.subscribe(analyzeMysql2SqlInjection)
mysql2OuterQueryStart.subscribe(analyzeMysql2SqlInjection)
}

function disable () {
if (pgQueryStart.hasSubscribers) pgQueryStart.unsubscribe(analyzePgSqlInjection)
if (pgPoolQueryStart.hasSubscribers) pgPoolQueryStart.unsubscribe(analyzePgSqlInjection)
if (wafRunFinished.hasSubscribers) wafRunFinished.unsubscribe(clearQuerySet)
if (mysql2ConnectionQueryStart.hasSubscribers) mysql2ConnectionQueryStart.unsubscribe(analyzeMysql2SqlInjection)
if (mysql2ConnectionExecuteStart.hasSubscribers) mysql2ConnectionExecuteStart.unsubscribe(analyzeMysql2SqlInjection)
if (mysql2PoolQueryStart.hasSubscribers) mysql2PoolQueryStart.unsubscribe(analyzeMysql2SqlInjection)
if (mysql2PoolExecuteStart.hasSubscribers) mysql2PoolExecuteStart.unsubscribe(analyzeMysql2SqlInjection)
if (mysql2PoolNamespaceQueryStart.hasSubscribers) mysql2PoolNamespaceQueryStart.unsubscribe(analyzeMysql2SqlInjection)
if (mysql2PoolNamespaceExecuteStart.hasSubscribers) {
mysql2PoolNamespaceExecuteStart.unsubscribe(analyzeMysql2SqlInjection)
}
if (mysql2OuterQueryStart.hasSubscribers) mysql2OuterQueryStart.unsubscribe(analyzeMysql2SqlInjection)
}

function analyzeMysql2SqlInjection (ctx) {
Expand Down Expand Up @@ -76,7 +59,7 @@ function analyzeSqlInjection (query, dbSystem, abortController) {
let executedQueries = reqQueryMap.get(req)
if (executedQueries?.has(query)) return

// Do not waste time executing same query twice
// Do not waste time checking same query twice
// This also will prevent double calls in pg.Pool internal queries
if (!executedQueries) {
executedQueries = new Set()
Expand Down

0 comments on commit c691443

Please sign in to comment.