feat(kafkajs): add kafka cluster id to spans and dsm metrics (#4808)
Adds Kafka cluster ID to KafkaJS spans and DSM metrics
wconti27 authored Oct 25, 2024
1 parent c007354 commit a081659
Showing 8 changed files with 194 additions and 110 deletions.
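For orientation, here is a minimal sketch of the behavior this commit adds (not part of the diff): with the tracer initialized and Data Streams Monitoring enabled, KafkaJS producer and consumer spans should now carry a `kafka.cluster_id` meta tag, and DSM checkpoints a `kafka_cluster_id:<id>` edge tag. The broker address, client id, and topic below are illustrative.

// Sketch only, assuming a local broker and a dd-trace build containing this commit.
// dd-trace must be initialized before kafkajs is loaded.
require('dd-trace').init({ dsmEnabled: true })
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'demo-app', brokers: ['127.0.0.1:9092'] })

async function main () {
  const producer = kafka.producer()
  await producer.connect()
  // The instrumented send() resolves the cluster id once (via the admin API,
  // cached afterwards) and attaches it to the produce span and to the DSM
  // pathway checkpoint for this message.
  await producer.send({ topic: 'demo-topic', messages: [{ value: 'hello' }] })
  await producer.disconnect()
}

main().catch(console.error)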
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -129,7 +129,7 @@ services:
       - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
       - KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
       - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
-      - KAFKA_CLUSTER_ID=r4zt_wrqTRuT7W2NJsB_GA
+      - CLUSTER_ID=5L6g3nShT-eMCtK--X86sw
       - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
       - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
186 changes: 123 additions & 63 deletions packages/datadog-instrumentations/src/kafkajs.js
@@ -52,45 +52,59 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     const send = producer.send
     const bootstrapServers = this._brokers
 
-    producer.send = function () {
-      const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
+    const kafkaClusterIdPromise = getKafkaClusterId(this)
 
-      return innerAsyncResource.runInAsyncScope(() => {
-        if (!producerStartCh.hasSubscribers) {
-          return send.apply(this, arguments)
-        }
+    producer.send = function () {
+      const wrappedSend = (clusterId) => {
+        const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
 
-        try {
-          const { topic, messages = [] } = arguments[0]
-          for (const message of messages) {
-            if (message !== null && typeof message === 'object') {
-              message.headers = message.headers || {}
-            }
-          }
-          producerStartCh.publish({ topic, messages, bootstrapServers })
-
-          const result = send.apply(this, arguments)
-
-          result.then(
-            innerAsyncResource.bind(res => {
-              producerFinishCh.publish(undefined)
-              producerCommitCh.publish(res)
-            }),
-            innerAsyncResource.bind(err => {
-              if (err) {
-                producerErrorCh.publish(err)
-              }
-              producerFinishCh.publish(undefined)
-            })
-          )
-
-          return result
-        } catch (e) {
-          producerErrorCh.publish(e)
-          producerFinishCh.publish(undefined)
-          throw e
-        }
-      })
-    }
+        return innerAsyncResource.runInAsyncScope(() => {
+          if (!producerStartCh.hasSubscribers) {
+            return send.apply(this, arguments)
+          }
+
+          try {
+            const { topic, messages = [] } = arguments[0]
+            for (const message of messages) {
+              if (message !== null && typeof message === 'object') {
+                message.headers = message.headers || {}
+              }
+            }
+            producerStartCh.publish({ topic, messages, bootstrapServers, clusterId })
+
+            const result = send.apply(this, arguments)
+
+            result.then(
+              innerAsyncResource.bind(res => {
+                producerFinishCh.publish(undefined)
+                producerCommitCh.publish(res)
+              }),
+              innerAsyncResource.bind(err => {
+                if (err) {
+                  producerErrorCh.publish(err)
+                }
+                producerFinishCh.publish(undefined)
+              })
+            )
+
+            return result
+          } catch (e) {
+            producerErrorCh.publish(e)
+            producerFinishCh.publish(undefined)
+            throw e
+          }
+        })
+      }
+
+      if (!isPromise(kafkaClusterIdPromise)) {
+        // promise is already resolved
+        return wrappedSend(kafkaClusterIdPromise)
+      } else {
+        // promise is not resolved
+        return kafkaClusterIdPromise.then((clusterId) => {
+          return wrappedSend(clusterId)
+        })
+      }
+    }
     return producer
   })
@@ -100,59 +114,71 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
       return createConsumer.apply(this, arguments)
     }
 
-    const eachMessageExtractor = (args) => {
+    const kafkaClusterIdPromise = getKafkaClusterId(this)
+
+    const eachMessageExtractor = (args, clusterId) => {
       const { topic, partition, message } = args[0]
-      return { topic, partition, message, groupId }
+      return { topic, partition, message, groupId, clusterId }
     }
 
-    const eachBatchExtractor = (args) => {
+    const eachBatchExtractor = (args, clusterId) => {
       const { batch } = args[0]
       const { topic, partition, messages } = batch
-      return { topic, partition, messages, groupId }
+      return { topic, partition, messages, groupId, clusterId }
     }
 
     const consumer = createConsumer.apply(this, arguments)
 
     consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
 
     const run = consumer.run
 
     const groupId = arguments[0].groupId
 
     consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
-      eachMessage = wrapFunction(
-        eachMessage,
-        consumerStartCh,
-        consumerFinishCh,
-        consumerErrorCh,
-        eachMessageExtractor
-      )
-
-      eachBatch = wrapFunction(
-        eachBatch,
-        batchConsumerStartCh,
-        batchConsumerFinishCh,
-        batchConsumerErrorCh,
-        eachBatchExtractor
-      )
-
-      return run({
-        eachMessage,
-        eachBatch,
-        ...runArgs
-      })
+      const wrapConsume = (clusterId) => {
+        return run({
+          eachMessage: wrappedCallback(
+            eachMessage,
+            consumerStartCh,
+            consumerFinishCh,
+            consumerErrorCh,
+            eachMessageExtractor,
+            clusterId
+          ),
+          eachBatch: wrappedCallback(
+            eachBatch,
+            batchConsumerStartCh,
+            batchConsumerFinishCh,
+            batchConsumerErrorCh,
+            eachBatchExtractor,
+            clusterId
+          ),
+          ...runArgs
+        })
+      }
+
+      if (!isPromise(kafkaClusterIdPromise)) {
+        // promise is already resolved
+        return wrapConsume(kafkaClusterIdPromise)
+      } else {
+        // promise is not resolved
+        return kafkaClusterIdPromise.then((clusterId) => {
+          return wrapConsume(clusterId)
+        })
+      }
     }
 
     return consumer
   })
   return Kafka
 })
 
-const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
+const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => {
   return typeof fn === 'function'
     ? function (...args) {
       const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
       return innerAsyncResource.runInAsyncScope(() => {
-        const extractedArgs = extractArgs(args)
+        const extractedArgs = extractArgs(args, clusterId)
 
         startCh.publish(extractedArgs)
         try {
           const result = fn.apply(this, args)
@@ -179,3 +205,37 @@ const wrapFunction = (fn, startCh, finishCh, errorCh, extractArgs) => {
     }
     : fn
 }
+
+const getKafkaClusterId = (kafka) => {
+  if (kafka._ddKafkaClusterId) {
+    return kafka._ddKafkaClusterId
+  }
+
+  if (!kafka.admin) {
+    return null
+  }
+
+  const admin = kafka.admin()
+
+  if (!admin.describeCluster) {
+    return null
+  }
+
+  return admin.connect()
+    .then(() => {
+      return admin.describeCluster()
+    })
+    .then((clusterInfo) => {
+      const clusterId = clusterInfo?.clusterId
+      kafka._ddKafkaClusterId = clusterId
+      admin.disconnect()
+      return clusterId
+    })
+    .catch((error) => {
+      throw error
+    })
+}
+
+function isPromise (obj) {
+  return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'
+}
9 changes: 6 additions & 3 deletions packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -5,14 +5,17 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static get id () { return 'kafkajs' }
   static get operation () { return 'consume-batch' }
 
-  start ({ topic, partition, messages, groupId }) {
+  start ({ topic, partition, messages, groupId, clusterId }) {
     if (!this.config.dsmEnabled) return
     for (const message of messages) {
       if (!message || !message.headers) continue
       const payloadSize = getMessageSize(message)
       this.tracer.decodeDataStreamsContext(message.headers)
-      this.tracer
-        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], null, payloadSize)
+      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+      if (clusterId) {
+        edgeTags.push(`kafka_cluster_id:${clusterId}`)
+      }
+      this.tracer.setCheckpoint(edgeTags, null, payloadSize)
     }
   }
 }
12 changes: 8 additions & 4 deletions packages/datadog-plugin-kafkajs/src/consumer.js
@@ -62,7 +62,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
     }
   }
 
-  start ({ topic, partition, message, groupId }) {
+  start ({ topic, partition, message, groupId, clusterId }) {
     const childOf = extract(this.tracer, message.headers)
     const span = this.startSpan({
       childOf,
@@ -71,7 +71,8 @@
       meta: {
         component: 'kafkajs',
         'kafka.topic': topic,
-        'kafka.message.offset': message.offset
+        'kafka.message.offset': message.offset,
+        'kafka.cluster_id': clusterId
       },
       metrics: {
         'kafka.partition': partition
@@ -80,8 +81,11 @@
     if (this.config.dsmEnabled && message?.headers) {
       const payloadSize = getMessageSize(message)
       this.tracer.decodeDataStreamsContext(message.headers)
-      this.tracer
-        .setCheckpoint(['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka'], span, payloadSize)
+      const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
+      if (clusterId) {
+        edgeTags.push(`kafka_cluster_id:${clusterId}`)
+      }
+      this.tracer.setCheckpoint(edgeTags, span, payloadSize)
     }
 
     if (afterStartCh.hasSubscribers) {
14 changes: 10 additions & 4 deletions packages/datadog-plugin-kafkajs/src/producer.js
@@ -66,12 +66,13 @@ class KafkajsProducerPlugin extends ProducerPlugin {
     }
   }
 
-  start ({ topic, messages, bootstrapServers }) {
+  start ({ topic, messages, bootstrapServers, clusterId }) {
     const span = this.startSpan({
       resource: topic,
       meta: {
         component: 'kafkajs',
-        'kafka.topic': topic
+        'kafka.topic': topic,
+        'kafka.cluster_id': clusterId
       },
       metrics: {
         'kafka.batch_size': messages.length
@@ -85,8 +86,13 @@
         this.tracer.inject(span, 'text_map', message.headers)
         if (this.config.dsmEnabled) {
          const payloadSize = getMessageSize(message)
-          const dataStreamsContext = this.tracer
-            .setCheckpoint(['direction:out', `topic:${topic}`, 'type:kafka'], span, payloadSize)
+          const edgeTags = ['direction:out', `topic:${topic}`, 'type:kafka']
+
+          if (clusterId) {
+            edgeTags.push(`kafka_cluster_id:${clusterId}`)
+          }
+
+          const dataStreamsContext = this.tracer.setCheckpoint(edgeTags, span, payloadSize)
           DsmPathwayCodec.encode(dataStreamsContext, message.headers)
         }
       }
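All three plugin changes above follow the same rule: append the cluster-id edge tag only when an id was actually resolved, presumably so DSM pathway tags are unchanged when the lookup is unavailable (older kafkajs, or a client without the admin API). A reduced sketch of that shared rule; buildEdgeTags is illustrative, not a helper in the diff.

// Append the cluster id edge tag only when present.
function buildEdgeTags (baseTags, clusterId) {
  const edgeTags = [...baseTags]
  if (clusterId) {
    edgeTags.push(`kafka_cluster_id:${clusterId}`)
  }
  return edgeTags
}

// Example:
// buildEdgeTags(['direction:out', 'topic:demo-topic', 'type:kafka'], '5L6g3nShT-eMCtK--X86sw')
// -> ['direction:out', 'topic:demo-topic', 'type:kafka',
//     'kafka_cluster_id:5L6g3nShT-eMCtK--X86sw']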