Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dsm): add datastreams monitoring schema tracking #4687

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/dd-trace/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,12 @@ module.exports = {
APPSEC_PROPAGATION_KEY: '_dd.p.appsec',
PAYLOAD_TAG_REQUEST_PREFIX: 'aws.request.body',
PAYLOAD_TAG_RESPONSE_PREFIX: 'aws.response.body',
PAYLOAD_TAGGING_MAX_TAGS: 758,
SCHEMA_DEFINITION: 'schema.definition',
SCHEMA_WEIGHT: 'schema.weight',
SCHEMA_TYPE: 'schema.type',
SCHEMA_ID: 'schema.id',
SCHEMA_TOPIC: 'schema.topic',
SCHEMA_OPERATION: 'schema.operation',
SCHEMA_NAME: 'schema.name'
}
23 changes: 23 additions & 0 deletions packages/dd-trace/src/datastreams/fnv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// FNV-1 64-bit parameters. This is a non-cryptographic hash; the constants
// must match the other Datadog tracers so schema ids agree across languages.
const FNV_64_PRIME = BigInt('0x100000001B3')
const FNV1_64_INIT = BigInt('0xCBF29CE484222325')

/**
 * Core FNV-1 loop: for each byte, multiply by the prime (reduced modulo
 * fnvSize) and then XOR in the byte.
 *
 * @param {Iterable<number>} data byte sequence to hash
 * @param {bigint} hvalInit offset basis
 * @param {bigint} fnvPrime FNV prime for the chosen width
 * @param {bigint} fnvSize modulus (2^width)
 * @returns {bigint} the FNV-1 hash value
 */
function fnv (data, hvalInit, fnvPrime, fnvSize) {
  let hash = hvalInit
  for (const byte of data) {
    hash = (hash * fnvPrime) % fnvSize
    hash ^= BigInt(byte)
  }
  return hash
}

/**
 * 64-bit FNV-1 hash of a string (UTF-8 encoded) or Buffer.
 *
 * @param {string|Buffer} data input to hash
 * @returns {bigint} 64-bit FNV-1 hash
 */
function fnv64 (data) {
  const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'utf-8')
  return fnv(new Uint8Array(buffer), FNV1_64_INIT, FNV_64_PRIME, BigInt(2) ** BigInt(64))
}

// Only the 64-bit variant is exposed; the generic `fnv` helper stays private.
module.exports = {
fnv64
}
29 changes: 29 additions & 0 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const { DataStreamsWriter } = require('./writer')
const { computePathwayHash } = require('./pathway')
const { types } = require('util')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')

const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

Expand Down Expand Up @@ -194,6 +196,7 @@ class DataStreamsProcessor {
this.version = version || ''
this.sequence = 0
this.flushInterval = flushInterval
this._schemaSamplers = {}

if (this.enabled) {
this.timer = setInterval(this.onInterval.bind(this), flushInterval)
Expand Down Expand Up @@ -352,6 +355,32 @@ class DataStreamsProcessor {
// Forward an agent intake URL change straight to the underlying writer.
setUrl (url) {
this.writer.setUrl(url)
}

trySampleSchema (topic) {
const nowMs = Date.now()

if (!this._schemaSamplers[topic]) {
this._schemaSamplers[topic] = new SchemaSampler()
}

const sampler = this._schemaSamplers[topic]
return sampler.trySample(nowMs)
}

canSampleSchema (topic) {
const nowMs = Date.now()

if (!this._schemaSamplers[topic]) {
this._schemaSamplers[topic] = new SchemaSampler()
}

const sampler = this._schemaSamplers[topic]
return sampler.canSample(nowMs)
}

// Build (or fetch from SchemaBuilder's cache) the schema for schemaName;
// the iterator is only consulted on a cache miss.
getSchema (schemaName, iterator) {
return SchemaBuilder.getSchema(schemaName, iterator)
}
}

module.exports = {
Expand Down
8 changes: 8 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
 * Value object pairing a serialized schema definition with the id derived
 * from it (the stringified fnv64 hash of the definition).
 */
class Schema {
  constructor (definition, id) {
    Object.assign(this, { definition, id })
  }
}

// Named export so consumers can destructure the class directly.
module.exports = { Schema }
125 changes: 125 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema_builder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
const LRUCache = require('lru-cache')
const { fnv64 } = require('../fnv')
const { Schema } = require('./schema')

// Extraction limits: schemas deeper than maxDepth are skipped, and at most
// maxProperties properties are recorded per build.
const maxDepth = 10
const maxProperties = 1000

// LRU of fully built Schema objects keyed by schema name. Created lazily so
// merely loading this module never allocates the cache.
let schemaCache = null

function getCache () {
  if (schemaCache === null) {
    schemaCache = new LRUCache({ max: 32 })
  }
  return schemaCache
}

/**
 * Builds an OpenAPI 3.0 document describing one or more message schemas.
 * The caller supplies an iterator object whose `iterateOverSchema(builder)`
 * registers schemas/properties via `shouldExtractSchema` / `addProperty`.
 */
class SchemaBuilder {
  constructor (iterator) {
    this.schema = new OpenApiSchema()
    this.iterator = iterator
    // Total properties added across every schema in this build.
    // FIX: was misspelled `this.proerties`, which left `this.properties`
    // undefined and silently disabled the maxProperties cap
    // (`undefined >= 1000` is false and `undefined += 1` is NaN).
    this.properties = 0
  }

  /**
   * Register a property on `schemaName`.
   * Returns false (and records nothing) once the global cap is reached.
   */
  addProperty (schemaName, fieldName, isArray, type, description, ref, format, enumValues) {
    if (this.properties >= maxProperties) {
      return false
    }
    this.properties += 1
    let property = new OpenApiSchema.PROPERTY(type, description, ref, format, enumValues, null)
    if (isArray) {
      // Array fields wrap the element description under `items`.
      property = new OpenApiSchema.PROPERTY('array', null, null, null, null, property)
    }
    this.schema.components.schemas[schemaName].properties[fieldName] = property
    return true
  }

  /**
   * Walk the iterator to populate the document, strip nulls, serialize in
   * the cross-language canonical form, and derive the id from the fnv64
   * hash of the definition.
   */
  build () {
    this.iterator.iterateOverSchema(this)
    const noNones = convertToJsonCompatible(this.schema)
    const definition = jsonStringify(noNones)
    const id = fnv64(Buffer.from(definition, 'utf-8')).toString()
    return new Schema(definition, id)
  }

  /**
   * Returns true exactly once per schema name (and never beyond maxDepth),
   * registering an empty object schema as a side effect.
   */
  shouldExtractSchema (schemaName, depth) {
    if (depth > maxDepth) {
      return false
    }
    if (schemaName in this.schema.components.schemas) {
      return false
    }
    this.schema.components.schemas[schemaName] = new OpenApiSchema.SCHEMA()
    return true
  }

  // Build (or fetch from the cache) the schema for `schemaName`; the
  // iterator is only invoked on a cache miss.
  static getSchema (schemaName, iterator) {
    const cache = getCache()
    if (!cache.has(schemaName)) {
      cache.set(schemaName, new SchemaBuilder(iterator).build())
    }
    return cache.get(schemaName)
  }
}

// Root OpenAPI 3.0 document skeleton.
class OpenApiSchema {
  constructor () {
    this.openapi = '3.0.0'
    this.components = new OpenApiComponents()
  }
}

// One named object schema within the document.
OpenApiSchema.SCHEMA = class {
  constructor () {
    this.type = 'object'
    this.properties = {}
  }
}

// One property entry; `items` is only populated for array-typed properties.
OpenApiSchema.PROPERTY = class {
  constructor (type, description = null, ref = null, format = null, enumValues = null, items = null) {
    this.type = type
    this.description = description
    this.$ref = ref
    this.format = format
    this.enum = enumValues
    this.items = items
  }
}

class OpenApiComponents {
  constructor () {
    this.schemas = {}
  }
}

/**
 * Recursively strip null entries so the serialized schema omits unset
 * OpenAPI fields (description, $ref, format, enum, items).
 * Arrays lose null elements; objects lose null-valued keys; every other
 * value is returned unchanged.
 */
function convertToJsonCompatible (obj) {
  if (Array.isArray(obj)) {
    const cleaned = []
    for (const item of obj) {
      if (item !== null) {
        cleaned.push(convertToJsonCompatible(item))
      }
    }
    return cleaned
  }
  if (obj && typeof obj === 'object') {
    return Object.entries(obj).reduce((acc, [key, value]) => {
      if (value !== null) {
        acc[key] = convertToJsonCompatible(value)
      }
      return acc
    }, {})
  }
  return obj
}

// Map the internal constructor-argument name to its OpenAPI spelling;
// every other key passes through unchanged.
function convertKey (key) {
  return key === 'enumValues' ? 'enum' : key
}

/**
 * Stringify `obj` exactly the way the Python / Java tracers do, so the
 * fnv64 hash of the definition is identical across languages.
 * Key order is insertion order, which is fixed for a given schema.
 *
 * @param {object} obj JSON-compatible value (nulls already stripped)
 * @param {number} indent indent width passed to JSON.stringify
 * @returns {string} canonical single-line JSON text
 */
function jsonStringify (obj, indent = 2) {
  const jsonString = JSON.stringify(obj, (_, value) => value, indent)
  return jsonString.replace(/^ +/gm, ' ') // Replace leading spaces with single space
    .replace(/\n/g, '') // Remove newlines
    .replace(/{ /g, '{') // Remove space after '{'
    .replace(/ }/g, '}') // Remove space before '}'
    .replace(/\[ /g, '[') // Remove space after '['
    .replace(/ \]/g, ']') // Remove space before ']'
}

// SchemaBuilder is the main entry point; the helpers are also exported
// so they can be exercised in isolation.
module.exports = {
SchemaBuilder,
OpenApiSchema,
convertToJsonCompatible,
convertKey
}
29 changes: 29 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema_sampler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Minimum spacing between emitted schema samples.
const SAMPLE_INTERVAL_MILLIS = 30 * 1000

/**
 * Rate limiter for schema extraction: at most one sample per interval.
 * `weight` counts canSample() attempts since the last emitted sample so the
 * taken sample can represent the skipped ones.
 */
class SchemaSampler {
  constructor () {
    this.weight = 0
    this.lastSampleMs = 0
  }

  /**
   * Consume a sample if the interval has elapsed.
   * Returns the accumulated weight when a sample is taken, 0 otherwise.
   * FIX: removed a duplicated, byte-identical nested `if` on the same
   * condition — dead code in single-threaded JS.
   */
  trySample (currentTimeMs) {
    if (currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS) {
      this.lastSampleMs = currentTimeMs
      const weight = this.weight
      this.weight = 0
      return weight
    }
    return 0
  }

  // Record an attempt and report whether a sample would currently be taken.
  canSample (currentTimeMs) {
    this.weight += 1
    return currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS
  }
}

// One SchemaSampler instance is expected per topic (see DataStreamsProcessor).
module.exports = {
SchemaSampler
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'

require('../../setup/tap')

const { SchemaBuilder } = require('../../../src/datastreams/schemas/schema_builder')
const { expect } = require('chai')

// Test fixture: feeds a fixed two-schema layout (person -> address) into
// the builder, in a deterministic order.
class Iterator {
  iterateOverSchema (builder) {
    const entries = [
      ['person', 'name', false, 'string', 'name of the person', null, null, null],
      ['person', 'phone_numbers', true, 'string', null, null, null, null],
      ['person', 'person_name', false, 'string', null, null, null, null],
      ['person', 'address', false, 'object', null, '#/components/schemas/address', null, null],
      ['address', 'zip', false, 'number', null, null, 'int', null],
      ['address', 'street', false, 'string', null, null, null, null]
    ]
    for (const args of entries) {
      builder.addProperty(...args)
    }
  }
}

// End-to-end check: iterate the fixture, build, and compare both the JSON
// definition and the id derived from the fnv64 hash of the definition.
describe('SchemaBuilder', () => {
it('should convert schema correctly to JSON', () => {
const builder = new SchemaBuilder(new Iterator())

// shouldExtractSchema returns true only the first time per name, and
// never beyond the maximum depth.
const shouldExtractPerson = builder.shouldExtractSchema('person', 0)
const shouldExtractAddress = builder.shouldExtractSchema('address', 1)
const shouldExtractPerson2 = builder.shouldExtractSchema('person', 0)
const shouldExtractTooDeep = builder.shouldExtractSchema('city', 11)
const schema = builder.build()

const expectedSchema = {
components: {
schemas: {
person: {
properties: {
name: { description: 'name of the person', type: 'string' },
phone_numbers: { items: { type: 'string' }, type: 'array' },
person_name: { type: 'string' },
address: { $ref: '#/components/schemas/address', type: 'object' }
},
type: 'object'
},
address: {
properties: { zip: { format: 'int', type: 'number' }, street: { type: 'string' } },
type: 'object'
}
}
},
openapi: '3.0.0'
}

expect(JSON.parse(schema.definition)).to.deep.equal(expectedSchema)
// Pinned id guards the cross-language canonical serialization + hash.
expect(schema.id).to.equal('9510078321201428652')
expect(shouldExtractPerson).to.be.true
expect(shouldExtractAddress).to.be.true
expect(shouldExtractPerson2).to.be.false
expect(shouldExtractTooDeep).to.be.false
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

require('../../setup/tap')

const { SchemaSampler } = require('../../../src/datastreams/schemas/schema_sampler')
const { expect } = require('chai')

// Exercises weight accumulation: canSample() counts attempts, trySample()
// emits the accumulated weight once per 30s window and then resets it.
describe('SchemaSampler', () => {
it('samples with correct weights', () => {
const currentTimeMs = 100000
const sampler = new SchemaSampler()

const canSample1 = sampler.canSample(currentTimeMs)
const weight1 = sampler.trySample(currentTimeMs)

// Inside the 30s window: attempts are counted but nothing is sampled.
const canSample2 = sampler.canSample(currentTimeMs + 1000)
const weight2 = sampler.trySample(currentTimeMs + 1000)

const canSample3 = sampler.canSample(currentTimeMs + 2000)
const weight3 = sampler.trySample(currentTimeMs + 2000)

// Window elapsed: the sample carries the three accumulated attempts.
const canSample4 = sampler.canSample(currentTimeMs + 30000)
const weight4 = sampler.trySample(currentTimeMs + 30000)

const canSample5 = sampler.canSample(currentTimeMs + 30001)
const weight5 = sampler.trySample(currentTimeMs + 30001)

expect(canSample1).to.be.true
expect(weight1).to.equal(1)
expect(canSample2).to.be.false
expect(weight2).to.equal(0)
expect(canSample3).to.be.false
expect(weight3).to.equal(0)
expect(canSample4).to.be.true
expect(weight4).to.equal(3)
expect(canSample5).to.be.false
expect(weight5).to.equal(0)
})
})
Loading