Skip to content

Commit

Permalink
Merge branch 'khanayan123/upgrade-azure-functions-package' of github.…
Browse files Browse the repository at this point in the history
…com:DataDog/dd-trace-js into khanayan123/upgrade-azure-functions-package
  • Loading branch information
khanayan123 committed Oct 30, 2024
2 parents 51c5e03 + 811381a commit 7e07f00
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 8 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
"@datadog/native-appsec": "8.2.1",
"@datadog/native-iast-rewriter": "2.5.0",
"@datadog/native-iast-taint-tracking": "3.2.0",
"@datadog/native-metrics": "^2.0.0",
"@datadog/native-metrics": "^3.0.1",
"@datadog/pprof": "5.4.1",
"@datadog/sketches-js": "^2.1.0",
"@opentelemetry/api": ">=1.0.0 <1.9.0",
Expand Down
9 changes: 8 additions & 1 deletion packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
Expand All @@ -11,7 +12,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
const topic = subscription.metadata && subscription.metadata.topic
const childOf = this.tracer.extract('text_map', message.attributes) || null

this.startSpan({
const span = this.startSpan({
childOf,
resource: topic,
type: 'worker',
Expand All @@ -23,6 +24,12 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
'pubsub.ack': 0
}
})
if (this.config.dsmEnabled && message?.attributes) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.attributes)
this.tracer
.setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
}
}

finish (message) {
Expand Down
8 changes: 8 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/src/producer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor')

class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
static get id () { return 'google-cloud-pubsub' }
Expand All @@ -25,6 +27,12 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
msg.attributes = {}
}
this.tracer.inject(span, 'text_map', msg.attributes)
if (this.config.dsmEnabled) {
const payloadSize = getHeadersSize(msg)
const dataStreamsContext = this.tracer
.setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
DsmPathwayCodec.encode(dataStreamsContext, msg.attributes)
}
}
}
}
Expand Down
118 changes: 116 additions & 2 deletions packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ const id = require('../../dd-trace/src/id')
const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')

const { expectedSchema, rawExpectedSchema } = require('./naming')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')

// The roundtrip to the pubsub emulator takes time. Sometimes a *long* time.
const TIMEOUT = 30000
const dsmTopicName = 'dsm-topic'

describe('Plugin', () => {
let tracer
Expand All @@ -18,6 +21,7 @@ describe('Plugin', () => {

before(() => {
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081'
process.env.DD_DATA_STREAMS_ENABLED = true
})

after(() => {
Expand All @@ -34,10 +38,12 @@ describe('Plugin', () => {
let resource
let v1
let gax
let expectedProducerHash
let expectedConsumerHash

describe('without configuration', () => {
beforeEach(() => {
return agent.load('google-cloud-pubsub')
return agent.load('google-cloud-pubsub', { dsmEnabled: false })
})

beforeEach(() => {
Expand Down Expand Up @@ -296,7 +302,8 @@ describe('Plugin', () => {
describe('with configuration', () => {
beforeEach(() => {
return agent.load('google-cloud-pubsub', {
service: 'a_test_service'
service: 'a_test_service',
dsmEnabled: false
})
})

Expand All @@ -322,6 +329,113 @@ describe('Plugin', () => {
})
})

describe('data stream monitoring', () => {
let dsmTopic
let sub
let consume

beforeEach(() => {
return agent.load('google-cloud-pubsub', {
dsmEnabled: true
})
})

before(async () => {
const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get()
project = getProjectId()
resource = `projects/${project}/topics/${dsmTopicName}`
pubsub = new PubSub({ projectId: project })
tracer.use('google-cloud-pubsub', { dsmEnabled: true })

dsmTopic = await pubsub.createTopic(dsmTopicName)
dsmTopic = dsmTopic[0]
sub = await dsmTopic.createSubscription('DSM')
sub = sub[0]
consume = function (cb) {
sub.on('message', cb)
}

const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}`

expectedProducerHash = computePathwayHash(
'test',
'tester',
['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'],
ENTRY_PARENT_HASH
)
expectedConsumerHash = computePathwayHash(
'test',
'tester',
['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'],
expectedProducerHash
)
})

describe('should set a DSM checkpoint', () => {
it('on produce', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') })

agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.be.at.least(1)
expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true)
}, { timeoutMs: TIMEOUT })
})

it('on consume', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') })
await consume(async () => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.be.at.least(2)
expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true)
}, { timeoutMs: TIMEOUT })
})
})
})

describe('it should set a message payload size', () => {
let recordCheckpointSpy

beforeEach(() => {
recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
})

afterEach(() => {
DataStreamsProcessor.prototype.recordCheckpoint.restore()
})

it('when producing a message', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') })
expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize'))
})

it('when consuming a message', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') })

await consume(async () => {
expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize'))
})
})
})
})

function expectSpanWithDefaults (expected) {
const prefixedResource = [expected.meta['pubsub.method'], resource].filter(x => x).join(' ')
const service = expected.meta['pubsub.method'] ? 'test-pubsub' : 'test'
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,10 @@
dependencies:
node-gyp-build "^3.9.0"

"@datadog/native-metrics@^2.0.0":
version "2.0.0"
resolved "https://registry.npmjs.org/@datadog/native-metrics/-/native-metrics-2.0.0.tgz"
integrity sha512-YklGVwUtmKGYqFf1MNZuOHvTYdKuR4+Af1XkWcMD8BwOAjxmd9Z+97328rCOY8TFUJzlGUPaXzB8j2qgG/BMwA==
"@datadog/native-metrics@^3.0.1":
version "3.0.1"
resolved "https://registry.yarnpkg.com/@datadog/native-metrics/-/native-metrics-3.0.1.tgz#dc276c93785c0377a048e316f23b7c8ff3acfa84"
integrity sha512-0GuMyYyXf+Qpb/F+Fcekz58f2mO37lit9U3jMbWY/m8kac44gCPABzL5q3gWbdH+hWgqYfQoEYsdNDGSrKfwoQ==
dependencies:
node-addon-api "^6.1.0"
node-gyp-build "^3.9.0"
Expand Down

0 comments on commit 7e07f00

Please sign in to comment.