Skip to content

Commit

Permalink
add dsm schema tracking (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Sep 19, 2024
1 parent d871335 commit a087d7b
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 1 deletion.
9 changes: 8 additions & 1 deletion packages/dd-trace/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,12 @@ module.exports = {
APPSEC_PROPAGATION_KEY: '_dd.p.appsec',
PAYLOAD_TAG_REQUEST_PREFIX: 'aws.request.body',
PAYLOAD_TAG_RESPONSE_PREFIX: 'aws.response.body',
PAYLOAD_TAGGING_MAX_TAGS: 758
PAYLOAD_TAGGING_MAX_TAGS: 758,
SCHEMA_DEFINITION: 'schema.definition',
SCHEMA_WEIGHT: 'schema.weight',
SCHEMA_TYPE: 'schema.type',
SCHEMA_ID: 'schema.id',
SCHEMA_TOPIC: 'schema.topic',
SCHEMA_OPERATION: 'schema.operation',
SCHEMA_NAME: 'schema.name'
}
23 changes: 23 additions & 0 deletions packages/dd-trace/src/datastreams/fnv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const FNV_64_PRIME = BigInt('0x100000001B3')
const FNV1_64_INIT = BigInt('0xCBF29CE484222325')

function fnv (data, hvalInit, fnvPrime, fnvSize) {
let hval = hvalInit
for (const byte of data) {
hval = (hval * fnvPrime) % fnvSize
hval = hval ^ BigInt(byte)
}
return hval
}

function fnv64 (data) {
if (!Buffer.isBuffer(data)) {
data = Buffer.from(data, 'utf-8')
}
const byteArray = new Uint8Array(data)
return fnv(byteArray, FNV1_64_INIT, FNV_64_PRIME, BigInt(2) ** BigInt(64))
}

module.exports = {
fnv64
}
29 changes: 29 additions & 0 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const { DataStreamsWriter } = require('./writer')
const { computePathwayHash } = require('./pathway')
const { types } = require('util')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')

const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

Expand Down Expand Up @@ -194,6 +196,7 @@ class DataStreamsProcessor {
this.version = version || ''
this.sequence = 0
this.flushInterval = flushInterval
this._schemaSamplers = {}

if (this.enabled) {
this.timer = setInterval(this.onInterval.bind(this), flushInterval)
Expand Down Expand Up @@ -352,6 +355,32 @@ class DataStreamsProcessor {
setUrl (url) {
this.writer.setUrl(url)
}

trySampleSchema (topic) {
const nowMs = Date.now()

if (!this._schemaSamplers[topic]) {
this._schemaSamplers[topic] = new SchemaSampler()
}

const sampler = this._schemaSamplers[topic]
return sampler.trySample(nowMs)
}

canSampleSchema (topic) {
const nowMs = Date.now()

if (!this._schemaSamplers[topic]) {
this._schemaSamplers[topic] = new SchemaSampler()
}

const sampler = this._schemaSamplers[topic]
return sampler.canSample(nowMs)
}

getSchema (schemaName, iterator) {
return SchemaBuilder.getSchema(schemaName, iterator)
}
}

module.exports = {
Expand Down
8 changes: 8 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class Schema {
constructor (definition, id) {
this.definition = definition
this.id = id
}
}

module.exports = { Schema }
125 changes: 125 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema_builder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
const LRUCache = require('lru-cache')
const { fnv64 } = require('../fnv')
const { Schema } = require('./schema')

const maxDepth = 10
const maxProperties = 1000
const CACHE = new LRUCache({ max: 32 })

class SchemaBuilder {
constructor (iterator) {
this.schema = new OpenApiSchema()
this.iterator = iterator
this.proerties = 0
}

addProperty (schemaName, fieldName, isArray, type, description, ref, format, enumValues) {
if (this.properties >= maxProperties) {
return false
}
this.properties += 1
let property = new OpenApiSchema.PROPERTY(type, description, ref, format, enumValues, null)
if (isArray) {
property = new OpenApiSchema.PROPERTY('array', null, null, null, null, property)
}
this.schema.components.schemas[schemaName].properties[fieldName] = property
return true
}

build () {
this.iterator.iterateOverSchema(this)
const noNones = convertToJsonCompatible(this.schema)
const definition = jsonStringify(noNones)
const id = fnv64(Buffer.from(definition, 'utf-8')).toString()
return new Schema(definition, id)
}

shouldExtractSchema (schemaName, depth) {
if (depth > maxDepth) {
return false
}
if (schemaName in this.schema.components.schemas) {
return false
}
this.schema.components.schemas[schemaName] = new OpenApiSchema.SCHEMA()
return true
}

static getSchema (schemaName, iterator) {
if (!CACHE.has(schemaName)) {
CACHE.set(schemaName, new SchemaBuilder(iterator).build())
}
return CACHE.get(schemaName)
}
}

class OpenApiSchema {
constructor () {
this.openapi = '3.0.0'
this.components = new OpenApiComponents()
}
}

OpenApiSchema.SCHEMA = class {
constructor () {
this.type = 'object'
this.properties = {}
}
}

OpenApiSchema.PROPERTY = class {
constructor (type, description = null, ref = null, format = null, enumValues = null, items = null) {
this.type = type
this.description = description
this.$ref = ref
this.format = format
this.enum = enumValues
this.items = items
}
}

class OpenApiComponents {
constructor () {
this.schemas = {}
}
}

function convertToJsonCompatible (obj) {
if (Array.isArray(obj)) {
return obj.filter(item => item !== null).map(item => convertToJsonCompatible(item))
} else if (obj && typeof obj === 'object') {
const jsonObj = {}
for (const [key, value] of Object.entries(obj)) {
if (value !== null) {
jsonObj[key] = convertToJsonCompatible(value)
}
}
return jsonObj
}
return obj
}

function convertKey (key) {
if (key === 'enumValues') {
return 'enum'
}
return key
}

function jsonStringify (obj, indent = 2) {
// made to stringify json exactly similar to python / java in order for hashing to be the same
const jsonString = JSON.stringify(obj, (_, value) => value, indent)
return jsonString.replace(/^ +/gm, ' ') // Replace leading spaces with single space
.replace(/\n/g, '') // Remove newlines
.replace(/{ /g, '{') // Remove space after '{'
.replace(/ }/g, '}') // Remove space before '}'
.replace(/\[ /g, '[') // Remove space after '['
.replace(/ \]/g, ']') // Remove space before ']'
}

module.exports = {
SchemaBuilder,
OpenApiSchema,
convertToJsonCompatible,
convertKey
}
29 changes: 29 additions & 0 deletions packages/dd-trace/src/datastreams/schemas/schema_sampler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const SAMPLE_INTERVAL_MILLIS = 30 * 1000

class SchemaSampler {
constructor () {
this.weight = 0
this.lastSampleMs = 0
}

trySample (currentTimeMs) {
if (currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS) {
if (currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS) {
this.lastSampleMs = currentTimeMs
const weight = this.weight
this.weight = 0
return weight
}
}
return 0
}

canSample (currentTimeMs) {
this.weight += 1
return currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS
}
}

module.exports = {
SchemaSampler
}
57 changes: 57 additions & 0 deletions packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'

require('../../setup/tap')

const { SchemaBuilder } = require('../../../src/datastreams/schemas/schema_builder')
const { expect } = require('chai')

class Iterator {
iterateOverSchema (builder) {
builder.addProperty('person', 'name', false, 'string', 'name of the person', null, null, null)
builder.addProperty('person', 'phone_numbers', true, 'string', null, null, null, null)
builder.addProperty('person', 'person_name', false, 'string', null, null, null, null)
builder.addProperty('person', 'address', false, 'object', null, '#/components/schemas/address', null, null)
builder.addProperty('address', 'zip', false, 'number', null, null, 'int', null)
builder.addProperty('address', 'street', false, 'string', null, null, null, null)
}
}

describe('SchemaBuilder', () => {
it('should convert schema correctly to JSON', () => {
const builder = new SchemaBuilder(new Iterator())

const shouldExtractPerson = builder.shouldExtractSchema('person', 0)
const shouldExtractAddress = builder.shouldExtractSchema('address', 1)
const shouldExtractPerson2 = builder.shouldExtractSchema('person', 0)
const shouldExtractTooDeep = builder.shouldExtractSchema('city', 11)
const schema = builder.build()

const expectedSchema = {
components: {
schemas: {
person: {
properties: {
name: { description: 'name of the person', type: 'string' },
phone_numbers: { items: { type: 'string' }, type: 'array' },
person_name: { type: 'string' },
address: { $ref: '#/components/schemas/address', type: 'object' }
},
type: 'object'
},
address: {
properties: { zip: { format: 'int', type: 'number' }, street: { type: 'string' } },
type: 'object'
}
}
},
openapi: '3.0.0'
}

expect(JSON.parse(schema.definition)).to.deep.equal(expectedSchema)
expect(schema.id).to.equal('9510078321201428652')
expect(shouldExtractPerson).to.be.true
expect(shouldExtractAddress).to.be.true
expect(shouldExtractPerson2).to.be.false
expect(shouldExtractTooDeep).to.be.false
})
})
39 changes: 39 additions & 0 deletions packages/dd-trace/test/datastreams/schemas/schema_sampler.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

require('../../setup/tap')

const { SchemaSampler } = require('../../../src/datastreams/schemas/schema_sampler')
const { expect } = require('chai')

describe('SchemaSampler', () => {
it('samples with correct weights', () => {
const currentTimeMs = 100000
const sampler = new SchemaSampler()

const canSample1 = sampler.canSample(currentTimeMs)
const weight1 = sampler.trySample(currentTimeMs)

const canSample2 = sampler.canSample(currentTimeMs + 1000)
const weight2 = sampler.trySample(currentTimeMs + 1000)

const canSample3 = sampler.canSample(currentTimeMs + 2000)
const weight3 = sampler.trySample(currentTimeMs + 2000)

const canSample4 = sampler.canSample(currentTimeMs + 30000)
const weight4 = sampler.trySample(currentTimeMs + 30000)

const canSample5 = sampler.canSample(currentTimeMs + 30001)
const weight5 = sampler.trySample(currentTimeMs + 30001)

expect(canSample1).to.be.true
expect(weight1).to.equal(1)
expect(canSample2).to.be.false
expect(weight2).to.equal(0)
expect(canSample3).to.be.false
expect(weight3).to.equal(0)
expect(canSample4).to.be.true
expect(weight4).to.equal(3)
expect(canSample5).to.be.false
expect(weight5).to.equal(0)
})
})

0 comments on commit a087d7b

Please sign in to comment.