Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MLOB-1555] LLM Observability writers #4699

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/llmobs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: LLMObs
rochdev marked this conversation as resolved.
Show resolved Hide resolved

on:
pull_request:
push:
branches: [master]
schedule:
- cron: '0 4 * * *'

concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
cancel-in-progress: true

jobs:
ubuntu:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
rochdev marked this conversation as resolved.
Show resolved Hide resolved
- uses: ./.github/actions/testagent/start
- uses: ./.github/actions/node/setup
- uses: ./.github/actions/install
- uses: ./.github/actions/node/18
- run: yarn test:llmobs:ci
- uses: ./.github/actions/node/20
- run: yarn test:llmobs:ci
- uses: ./.github/actions/node/latest
- run: yarn test:llmobs:ci
- if: always()
uses: ./.github/actions/testagent/logs
- uses: codecov/codecov-action@v3
rochdev marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
"test:core:ci": "npm run test:core -- --coverage --nyc-arg=--include=\"packages/datadog-core/src/**/*.js\"",
"test:lambda": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/lambda/**/*.spec.js\"",
"test:lambda:ci": "nyc --no-clean --include \"packages/dd-trace/src/lambda/**/*.js\" -- npm run test:lambda",
"test:llmobs": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/dd-trace/test/llmobs/**/*.spec.js\"",
"test:llmobs:ci": "nyc --no-clean --include \"packages/dd-trace/src/llmobs/**/*.js\" -- npm run test:llmobs",
"test:plugins": "mocha -r \"packages/dd-trace/test/setup/mocha.js\" \"packages/datadog-instrumentations/test/@($(echo $PLUGINS)).spec.js\" \"packages/datadog-plugin-@($(echo $PLUGINS))/test/**/*.spec.js\"",
"test:plugins:ci": "yarn services && nyc --no-clean --include \"packages/datadog-instrumentations/src/@($(echo $PLUGINS)).js\" --include \"packages/datadog-instrumentations/src/@($(echo $PLUGINS))/**/*.js\" --include \"packages/datadog-plugin-@($(echo $PLUGINS))/src/**/*.js\" -- npm run test:plugins",
"test:plugins:upstream": "node ./packages/dd-trace/test/plugins/suite.js",
Expand Down
16 changes: 16 additions & 0 deletions packages/dd-trace/src/llmobs/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict'

module.exports = {
EVP_PROXY_AGENT_BASE_PATH: 'evp_proxy/v2',
EVP_PROXY_AGENT_ENDPOINT: 'evp_proxy/v2/api/v2/llmobs',
EVP_SUBDOMAIN_HEADER_NAME: 'X-Datadog-EVP-Subdomain',
EVP_SUBDOMAIN_HEADER_VALUE: 'llmobs-intake',
AGENTLESS_SPANS_ENDPOINT: '/api/v2/llmobs',
AGENTLESS_EVALULATIONS_ENDPOINT: '/api/intake/llm-obs/v1/eval-metric',

EVP_PAYLOAD_SIZE_LIMIT: 5 << 20, // 5MB (actual limit is 5.1MB)
EVP_EVENT_SIZE_LIMIT: (1 << 20) - 1024, // 999KB (actual limit is 1MB)

DROPPED_IO_COLLECTION_ERROR: 'dropped_io',
DROPPED_VALUE_TEXT: "[This value has been dropped because this span's size exceeds the 1MB size limit.]"
}
16 changes: 16 additions & 0 deletions packages/dd-trace/src/llmobs/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict'

function encodeUnicode (str) {
if (!str) return str
return str.split('').map(char => {
const code = char.charCodeAt(0)
if (code > 127) {
return `\\u${code.toString(16).padStart(4, '0')}`
}
return char
}).join('')
}

module.exports = {
encodeUnicode
}
108 changes: 108 additions & 0 deletions packages/dd-trace/src/llmobs/writers/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict'

const request = require('../../exporters/common/request')
const { URL, format } = require('url')

const logger = require('../../log')

const { encodeUnicode } = require('../util')

class BaseLLMObsWriter {
constructor ({ interval, timeout, endpoint, intake, eventType, protocol, port }) {
this._interval = interval || 1000 // 1s
this._timeout = timeout || 5000 // 5s
this._eventType = eventType

this._buffer = []
this._bufferLimit = 1000
this._bufferSize = 0

this._url = new URL(format({
protocol: protocol || 'https:',
hostname: intake,
port: port || 443,
pathname: endpoint
}))

this._headers = {
'Content-Type': 'application/json'
}

this._periodic = setInterval(() => {
this.flush()
}, this._interval).unref()

process.once('beforeExit', () => {
this.destroy()
})

this._destroyed = false

logger.debug(`Started ${this.constructor.name} to ${this._url}`)
}

append (event, byteLength) {
if (this._buffer.length >= this._bufferLimit) {
logger.warn(`${this.constructor.name} event buffer full (limit is ${this._bufferLimit}), dropping event`)
return
}

this._bufferSize += byteLength || Buffer.from(JSON.stringify(event)).byteLength
this._buffer.push(event)
}

flush () {
if (this._buffer.length === 0) {
return
}

const events = this._buffer
this._buffer = []
this._bufferSize = 0
const payload = this._encode(this.makePayload(events))

const options = {
headers: this._headers,
method: 'POST',
url: this._url,
timeout: this._timeout
}

request(payload, options, (err, resp, code) => {
if (err) {
logger.error(
`Error sending ${events.length} LLMObs ${this._eventType} events to ${this._url}: ${err.message}`
)
} else if (code >= 300) {
logger.error(
`Error sending ${events.length} LLMObs ${this._eventType} events to ${this._url}: ${code}`
)
} else {
logger.debug(`Sent ${events.length} LLMObs ${this._eventType} events to ${this._url}`)
}
})
}

makePayload (events) {}

destroy () {
if (!this._destroyed) {
logger.debug(`Stopping ${this.constructor.name}`)
clearInterval(this._periodic)
process.removeListener('beforeExit', this.destroy)
this.flush()
this._destroyed = true
}
}

_encode (payload) {
return JSON.stringify(payload, (key, value) => {
if (typeof value === 'string') {
return encodeUnicode(value) // serialize unicode characters
}
return value
Comment on lines +100 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for clarification, can you explain what exactly's happening here? Does json.stringify() get called first then we run the encodeUnicode() helper on the result afterwards?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it gets run as JSON.stringify is happening. when passing a callback function to JSON.stringify, it'll execute that function over any values in the object. since we need to encode unicode characters (ie \u2013) for our decoder on ingestion, this function will make sure we encode those special characters with the correct unicode value (I think json.dumps does this for us on the Python SDK, but JSON.stringify doesn't do it by default here). There might be a better approach for this, will wait for Node.js folks input on that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}).replace(/\\\\u/g, '\\u') // remove double escaping
}
}

module.exports = BaseLLMObsWriter
29 changes: 29 additions & 0 deletions packages/dd-trace/src/llmobs/writers/evaluations.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict'

const { AGENTLESS_EVALULATIONS_ENDPOINT } = require('../constants')
const BaseWriter = require('./base')

class LLMObsEvalMetricsWriter extends BaseWriter {
constructor (config) {
super({
endpoint: AGENTLESS_EVALULATIONS_ENDPOINT,
intake: `api.${config.site}`,
eventType: 'evaluation_metric'
})

this._headers['DD-API-KEY'] = config.llmobs?.apiKey || config.apiKey
}

makePayload (events) {
return {
data: {
type: this._eventType,
attributes: {
metrics: events
}
}
}
}
}

module.exports = LLMObsEvalMetricsWriter
19 changes: 19 additions & 0 deletions packages/dd-trace/src/llmobs/writers/spans/agentProxy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

const { EVP_SUBDOMAIN_HEADER_NAME, EVP_SUBDOMAIN_HEADER_VALUE, EVP_PROXY_AGENT_ENDPOINT } = require('../../constants')
const LLMObsBaseSpanWriter = require('./base')

class LLMObsAgentProxySpanWriter extends LLMObsBaseSpanWriter {
constructor (config) {
super({
intake: config.hostname || 'localhost',
protocol: 'http:',
endpoint: EVP_PROXY_AGENT_ENDPOINT,
port: config.port
})

this._headers[EVP_SUBDOMAIN_HEADER_NAME] = EVP_SUBDOMAIN_HEADER_VALUE
}
}

module.exports = LLMObsAgentProxySpanWriter
17 changes: 17 additions & 0 deletions packages/dd-trace/src/llmobs/writers/spans/agentless.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

const { AGENTLESS_SPANS_ENDPOINT } = require('../../constants')
const LLMObsBaseSpanWriter = require('./base')

class LLMObsAgentlessSpanWriter extends LLMObsBaseSpanWriter {
constructor (config) {
super({
intake: `llmobs-intake.${config.site}`,
endpoint: AGENTLESS_SPANS_ENDPOINT
})

this._headers['DD-API-KEY'] = config.llmobs?.apiKey || config.apiKey
}
}

module.exports = LLMObsAgentlessSpanWriter
52 changes: 52 additions & 0 deletions packages/dd-trace/src/llmobs/writers/spans/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

const {
EVP_EVENT_SIZE_LIMIT,
EVP_PAYLOAD_SIZE_LIMIT,
DROPPED_VALUE_TEXT,
DROPPED_IO_COLLECTION_ERROR
} = require('../../constants')
const BaseWriter = require('../base')
const logger = require('../../../log')

class LLMObsSpanWriter extends BaseWriter {
constructor (options) {
super({
...options,
eventType: 'span'
})
}

append (event) {
const eventSizeBytes = Buffer.from(JSON.stringify(event)).byteLength
if (eventSizeBytes > EVP_EVENT_SIZE_LIMIT) {
logger.warn(`Dropping event input/output because its size (${eventSizeBytes}) exceeds the 1MB event size limit`)
event = this._truncateSpanEvent(event)
}

if (this._bufferSize + eventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) {
logger.debug('Flusing queue because queing next event will exceed EvP payload limit')
this.flush()
}

super.append(event, eventSizeBytes)
}

makePayload (events) {
return {
'_dd.stage': 'raw',
event_type: this._eventType,
spans: events
}
}

_truncateSpanEvent (event) {
event.meta.input = { value: DROPPED_VALUE_TEXT }
event.meta.output = { value: DROPPED_VALUE_TEXT }

event.collection_errors = [DROPPED_IO_COLLECTION_ERROR]
return event
}
}

module.exports = LLMObsSpanWriter
17 changes: 17 additions & 0 deletions packages/dd-trace/test/llmobs/util.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

const {
encodeUnicode
} = require('../../src/llmobs/util')

describe('util', () => {
describe('encodeUnicode', () => {
it('should encode unicode characters', () => {
expect(encodeUnicode('😀')).to.equal('\\ud83d\\ude00')
})

it('should encode only unicode characters in a string', () => {
expect(encodeUnicode('test 😀')).to.equal('test \\ud83d\\ude00')
})
})
})
Loading
Loading