From a087d7b10d37455dc7039192add1bef6df7de4c8 Mon Sep 17 00:00:00 2001
From: William Conti <58711692+wconti27@users.noreply.github.com>
Date: Thu, 19 Sep 2024 09:34:02 -0400
Subject: [PATCH] add dsm schema tracking (#4687)

---
Review notes (free text between '---' and the first 'diff --git' is not part
of the change itself):

 * schema_builder.js: the SchemaBuilder constructor initialised
   `this.proerties` (typo) while addProperty() reads and increments
   `this.properties`. The counter therefore started as undefined, became NaN
   after the first increment, and the `maxProperties` (1000) cap was never
   enforced. The constructor now initialises `this.properties = 0`.
   The hunk size is unchanged; the `index` blob id shown for that file
   predates this correction.
 * schema_sampler.js: the nested, duplicated `if` in trySample() is redundant
   but harmless and is left as submitted.

 packages/dd-trace/src/constants.js            |   9 +-
 packages/dd-trace/src/datastreams/fnv.js      |  23 ++++
 .../dd-trace/src/datastreams/processor.js     |  29 ++++
 .../src/datastreams/schemas/schema.js         |   8 ++
 .../src/datastreams/schemas/schema_builder.js | 125 ++++++++++++++++++
 .../src/datastreams/schemas/schema_sampler.js |  29 ++++
 .../schemas/schema_builder.spec.js            |  57 ++++++++
 .../schemas/schema_sampler.spec.js            |  39 ++++++
 8 files changed, 318 insertions(+), 1 deletion(-)
 create mode 100644 packages/dd-trace/src/datastreams/fnv.js
 create mode 100644 packages/dd-trace/src/datastreams/schemas/schema.js
 create mode 100644 packages/dd-trace/src/datastreams/schemas/schema_builder.js
 create mode 100644 packages/dd-trace/src/datastreams/schemas/schema_sampler.js
 create mode 100644 packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js
 create mode 100644 packages/dd-trace/test/datastreams/schemas/schema_sampler.spec.js

diff --git a/packages/dd-trace/src/constants.js b/packages/dd-trace/src/constants.js
index 4c38b7916e5..61f5b705ddb 100644
--- a/packages/dd-trace/src/constants.js
+++ b/packages/dd-trace/src/constants.js
@@ -37,5 +37,12 @@ module.exports = {
   APPSEC_PROPAGATION_KEY: '_dd.p.appsec',
   PAYLOAD_TAG_REQUEST_PREFIX: 'aws.request.body',
   PAYLOAD_TAG_RESPONSE_PREFIX: 'aws.response.body',
-  PAYLOAD_TAGGING_MAX_TAGS: 758
+  PAYLOAD_TAGGING_MAX_TAGS: 758,
+  SCHEMA_DEFINITION: 'schema.definition',
+  SCHEMA_WEIGHT: 'schema.weight',
+  SCHEMA_TYPE: 'schema.type',
+  SCHEMA_ID: 'schema.id',
+  SCHEMA_TOPIC: 'schema.topic',
+  SCHEMA_OPERATION: 'schema.operation',
+  SCHEMA_NAME: 'schema.name'
 }
diff --git a/packages/dd-trace/src/datastreams/fnv.js b/packages/dd-trace/src/datastreams/fnv.js
new file mode 100644
index 00000000000..c226ec40cd4
--- /dev/null
+++ b/packages/dd-trace/src/datastreams/fnv.js
@@ -0,0 +1,23 @@
+const FNV_64_PRIME = BigInt('0x100000001B3')
+const FNV1_64_INIT = BigInt('0xCBF29CE484222325')
+
+function fnv (data, hvalInit, fnvPrime, fnvSize) {
+  let hval = hvalInit
+  for (const byte of data) {
+    hval = (hval * fnvPrime) % fnvSize
+    hval = hval ^ BigInt(byte)
+  }
+  return hval
+}
+
+function fnv64 (data) {
+  if (!Buffer.isBuffer(data)) {
+    data = Buffer.from(data, 'utf-8')
+  }
+  const byteArray = new Uint8Array(data)
+  return fnv(byteArray, FNV1_64_INIT, FNV_64_PRIME, BigInt(2) ** BigInt(64))
+}
+
+module.exports = {
+  fnv64
+}
diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js
index cd8220a267e..8670c1571f5 100644
--- a/packages/dd-trace/src/datastreams/processor.js
+++ b/packages/dd-trace/src/datastreams/processor.js
@@ -9,6 +9,8 @@ const { DataStreamsWriter } = require('./writer')
 const { computePathwayHash } = require('./pathway')
 const { types } = require('util')
 const { PATHWAY_HASH } = require('../../../../ext/tags')
+const { SchemaBuilder } = require('./schemas/schema_builder')
+const { SchemaSampler } = require('./schemas/schema_sampler')

 const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

@@ -194,6 +196,7 @@ class DataStreamsProcessor {
     this.version = version || ''
     this.sequence = 0
     this.flushInterval = flushInterval
+    this._schemaSamplers = {}

     if (this.enabled) {
       this.timer = setInterval(this.onInterval.bind(this), flushInterval)
@@ -352,6 +355,32 @@ class DataStreamsProcessor {
   setUrl (url) {
     this.writer.setUrl(url)
   }
+
+  trySampleSchema (topic) {
+    const nowMs = Date.now()
+
+    if (!this._schemaSamplers[topic]) {
+      this._schemaSamplers[topic] = new SchemaSampler()
+    }
+
+    const sampler = this._schemaSamplers[topic]
+    return sampler.trySample(nowMs)
+  }
+
+  canSampleSchema (topic) {
+    const nowMs = Date.now()
+
+    if (!this._schemaSamplers[topic]) {
+      this._schemaSamplers[topic] = new SchemaSampler()
+    }
+
+    const sampler = this._schemaSamplers[topic]
+    return sampler.canSample(nowMs)
+  }
+
+  getSchema (schemaName, iterator) {
+    return SchemaBuilder.getSchema(schemaName, iterator)
+  }
 }

 module.exports = {
diff --git a/packages/dd-trace/src/datastreams/schemas/schema.js b/packages/dd-trace/src/datastreams/schemas/schema.js
new file mode 100644
index 00000000000..4378e37d080
--- /dev/null
+++ b/packages/dd-trace/src/datastreams/schemas/schema.js
@@ -0,0 +1,8 @@
+class Schema {
+  constructor (definition, id) {
+    this.definition = definition
+    this.id = id
+  }
+}
+
+module.exports = { Schema }
diff --git a/packages/dd-trace/src/datastreams/schemas/schema_builder.js b/packages/dd-trace/src/datastreams/schemas/schema_builder.js
new file mode 100644
index 00000000000..a65863d4d87
--- /dev/null
+++ b/packages/dd-trace/src/datastreams/schemas/schema_builder.js
@@ -0,0 +1,125 @@
+const LRUCache = require('lru-cache')
+const { fnv64 } = require('../fnv')
+const { Schema } = require('./schema')
+
+const maxDepth = 10
+const maxProperties = 1000
+const CACHE = new LRUCache({ max: 32 })
+
+class SchemaBuilder {
+  constructor (iterator) {
+    this.schema = new OpenApiSchema()
+    this.iterator = iterator
+    this.properties = 0
+  }
+
+  addProperty (schemaName, fieldName, isArray, type, description, ref, format, enumValues) {
+    if (this.properties >= maxProperties) {
+      return false
+    }
+    this.properties += 1
+    let property = new OpenApiSchema.PROPERTY(type, description, ref, format, enumValues, null)
+    if (isArray) {
+      property = new OpenApiSchema.PROPERTY('array', null, null, null, null, property)
+    }
+    this.schema.components.schemas[schemaName].properties[fieldName] = property
+    return true
+  }
+
+  build () {
+    this.iterator.iterateOverSchema(this)
+    const noNones = convertToJsonCompatible(this.schema)
+    const definition = jsonStringify(noNones)
+    const id = fnv64(Buffer.from(definition, 'utf-8')).toString()
+    return new Schema(definition, id)
+  }
+
+  shouldExtractSchema (schemaName, depth) {
+    if (depth > maxDepth) {
+      return false
+    }
+    if (schemaName in this.schema.components.schemas) {
+      return false
+    }
+    this.schema.components.schemas[schemaName] = new OpenApiSchema.SCHEMA()
+    return true
+  }
+
+  static getSchema (schemaName, iterator) {
+    if (!CACHE.has(schemaName)) {
+      CACHE.set(schemaName, new SchemaBuilder(iterator).build())
+    }
+    return CACHE.get(schemaName)
+  }
+}
+
+class OpenApiSchema {
+  constructor () {
+    this.openapi = '3.0.0'
+    this.components = new OpenApiComponents()
+  }
+}
+
+OpenApiSchema.SCHEMA = class {
+  constructor () {
+    this.type = 'object'
+    this.properties = {}
+  }
+}
+
+OpenApiSchema.PROPERTY = class {
+  constructor (type, description = null, ref = null, format = null, enumValues = null, items = null) {
+    this.type = type
+    this.description = description
+    this.$ref = ref
+    this.format = format
+    this.enum = enumValues
+    this.items = items
+  }
+}
+
+class OpenApiComponents {
+  constructor () {
+    this.schemas = {}
+  }
+}
+
+function convertToJsonCompatible (obj) {
+  if (Array.isArray(obj)) {
+    return obj.filter(item => item !== null).map(item => convertToJsonCompatible(item))
+  } else if (obj && typeof obj === 'object') {
+    const jsonObj = {}
+    for (const [key, value] of Object.entries(obj)) {
+      if (value !== null) {
+        jsonObj[key] = convertToJsonCompatible(value)
+      }
+    }
+    return jsonObj
+  }
+  return obj
+}
+
+function convertKey (key) {
+  if (key === 'enumValues') {
+    return 'enum'
+  }
+  return key
+}
+
+function jsonStringify (obj, indent = 2) {
+  // made to stringify json exactly similar to python / java in order for hashing to be the same
+  const jsonString = JSON.stringify(obj, (_, value) => value, indent)
+  return jsonString.replace(/^ +/gm, ' ') // Replace leading spaces with single space
+    .replace(/\n/g, '') // Remove newlines
+    .replace(/{ /g, '{') // Remove space after '{'
+    .replace(/ }/g, '}') // Remove space before '}'
+    .replace(/\[ /g, '[') // Remove space after '['
+    .replace(/ \]/g, ']') // Remove space before ']'
+}
+
+module.exports = {
+  SchemaBuilder,
+  OpenApiSchema,
+  convertToJsonCompatible,
+  convertKey
+}
diff --git a/packages/dd-trace/src/datastreams/schemas/schema_sampler.js b/packages/dd-trace/src/datastreams/schemas/schema_sampler.js
new file mode 100644
index 00000000000..903a4ea1dec
--- /dev/null
+++ b/packages/dd-trace/src/datastreams/schemas/schema_sampler.js
@@ -0,0 +1,29 @@
+const SAMPLE_INTERVAL_MILLIS = 30 * 1000
+
+class SchemaSampler {
+  constructor () {
+    this.weight = 0
+    this.lastSampleMs = 0
+  }
+
+  trySample (currentTimeMs) {
+    if (currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS) {
+      if (currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS) {
+        this.lastSampleMs = currentTimeMs
+        const weight = this.weight
+        this.weight = 0
+        return weight
+      }
+    }
+    return 0
+  }
+
+  canSample (currentTimeMs) {
+    this.weight += 1
+    return currentTimeMs >= this.lastSampleMs + SAMPLE_INTERVAL_MILLIS
+  }
+}
+
+module.exports = {
+  SchemaSampler
+}
diff --git a/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js b/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js
new file mode 100644
index 00000000000..db602ef83aa
--- /dev/null
+++ b/packages/dd-trace/test/datastreams/schemas/schema_builder.spec.js
@@ -0,0 +1,57 @@
+'use strict'
+
+require('../../setup/tap')
+
+const { SchemaBuilder } = require('../../../src/datastreams/schemas/schema_builder')
+const { expect } = require('chai')
+
+class Iterator {
+  iterateOverSchema (builder) {
+    builder.addProperty('person', 'name', false, 'string', 'name of the person', null, null, null)
+    builder.addProperty('person', 'phone_numbers', true, 'string', null, null, null, null)
+    builder.addProperty('person', 'person_name', false, 'string', null, null, null, null)
+    builder.addProperty('person', 'address', false, 'object', null, '#/components/schemas/address', null, null)
+    builder.addProperty('address', 'zip', false, 'number', null, null, 'int', null)
+    builder.addProperty('address', 'street', false, 'string', null, null, null, null)
+  }
+}
+
+describe('SchemaBuilder', () => {
+  it('should convert schema correctly to JSON', () => {
+    const builder = new SchemaBuilder(new Iterator())
+
+    const shouldExtractPerson = builder.shouldExtractSchema('person', 0)
+    const shouldExtractAddress = builder.shouldExtractSchema('address', 1)
+    const shouldExtractPerson2 = builder.shouldExtractSchema('person', 0)
+    const shouldExtractTooDeep = builder.shouldExtractSchema('city', 11)
+    const schema = builder.build()
+
+    const expectedSchema = {
+      components: {
+        schemas: {
+          person: {
+            properties: {
+              name: { description: 'name of the person', type: 'string' },
+              phone_numbers: { items: { type: 'string' }, type: 'array' },
+              person_name: { type: 'string' },
+              address: { $ref: '#/components/schemas/address', type: 'object' }
+            },
+            type: 'object'
+          },
+          address: {
+            properties: { zip: { format: 'int', type: 'number' }, street: { type: 'string' } },
+            type: 'object'
+          }
+        }
+      },
+      openapi: '3.0.0'
+    }
+
+    expect(JSON.parse(schema.definition)).to.deep.equal(expectedSchema)
+    expect(schema.id).to.equal('9510078321201428652')
+    expect(shouldExtractPerson).to.be.true
+    expect(shouldExtractAddress).to.be.true
+    expect(shouldExtractPerson2).to.be.false
+    expect(shouldExtractTooDeep).to.be.false
+  })
+})
diff --git a/packages/dd-trace/test/datastreams/schemas/schema_sampler.spec.js b/packages/dd-trace/test/datastreams/schemas/schema_sampler.spec.js
new file mode 100644
index 00000000000..80e288a66b6
--- /dev/null
+++ b/packages/dd-trace/test/datastreams/schemas/schema_sampler.spec.js
@@ -0,0 +1,39 @@
+'use strict'
+
+require('../../setup/tap')
+
+const { SchemaSampler } = require('../../../src/datastreams/schemas/schema_sampler')
+const { expect } = require('chai')
+
+describe('SchemaSampler', () => {
+  it('samples with correct weights', () => {
+    const currentTimeMs = 100000
+    const sampler = new SchemaSampler()
+
+    const canSample1 = sampler.canSample(currentTimeMs)
+    const weight1 = sampler.trySample(currentTimeMs)
+
+    const canSample2 = sampler.canSample(currentTimeMs + 1000)
+    const weight2 = sampler.trySample(currentTimeMs + 1000)
+
+    const canSample3 = sampler.canSample(currentTimeMs + 2000)
+    const weight3 = sampler.trySample(currentTimeMs + 2000)
+
+    const canSample4 = sampler.canSample(currentTimeMs + 30000)
+    const weight4 = sampler.trySample(currentTimeMs + 30000)
+
+    const canSample5 = sampler.canSample(currentTimeMs + 30001)
+    const weight5 = sampler.trySample(currentTimeMs + 30001)
+
+    expect(canSample1).to.be.true
+    expect(weight1).to.equal(1)
+    expect(canSample2).to.be.false
+    expect(weight2).to.equal(0)
+    expect(canSample3).to.be.false
+    expect(weight3).to.equal(0)
+    expect(canSample4).to.be.true
+    expect(weight4).to.equal(3)
+    expect(canSample5).to.be.false
+    expect(weight5).to.equal(0)
+  })
+})