diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index bb7c0a64c0..0c3a278079 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -8,6 +8,12 @@ All notable changes to experimental packages in this project will be documented ### :boom: Breaking Change * fix(instrumentation)!:remove unused description property from interface [#4847](https://github.com/open-telemetry/opentelemetry-js/pull/4847) @blumamir +* feat(exporter-*-otlp-*)!: use transport interface in node.js exporters [#4743](https://github.com/open-telemetry/opentelemetry-js/pull/4743) @pichlermarc + * (user-facing) `headers` was intended for internal use has been removed from all exporters + * (user-facing) `compression` was intended for internal use and has been removed from all exporters + * (user-facing) `hostname` was intended for use in tests and is not used by any exporters, it will be removed in a future release +* fix(exporter-*-otlp-*)!: ensure `User-Agent` header cannot be overwritten by the user [#4743](https://github.com/open-telemetry/opentelemetry-js/pull/4743) @pichlermarc + * allowing overrides of the `User-Agent` header was not specification compliant. ### :rocket: (Enhancement) @@ -79,7 +85,7 @@ All notable changes to experimental packages in this project will be documented ### :bug: (Bug Fix) -* fix(instrumentation): Update `import-in-the-middle` to fix [numerous bugs](https://github.com/DataDog/import-in-the-middle/releases/tag/v1.8.0) [#4745](https://github.com/open-telemetry/opentelemetry-js/pull/4745) @timfish +* fix(instrumentation): Update `import-in-the-middle` to fix [numerous bugs](https://github.com/DataDog/import-in-the-middle/pull/91) [#4745](https://github.com/open-telemetry/opentelemetry-js/pull/4745) @timfish ### :books: (Refine Doc) diff --git a/experimental/packages/exporter-logs-otlp-http/src/platform/node/OTLPLogExporter.ts b/experimental/packages/exporter-logs-otlp-http/src/platform/node/OTLPLogExporter.ts index 093061424b..1837993e91 100644 --- a/experimental/packages/exporter-logs-otlp-http/src/platform/node/OTLPLogExporter.ts +++ b/experimental/packages/exporter-logs-otlp-http/src/platform/node/OTLPLogExporter.ts @@ -49,16 +49,15 @@ export class OTLPLogExporter ...config, }, JsonLogsSerializer, - 'application/json' + { + ...baggageUtils.parseKeyPairsIntoRecord( + getEnv().OTEL_EXPORTER_OTLP_LOGS_HEADERS + ), + ...parseHeaders(config?.headers), + ...USER_AGENT, + 'Content-Type': 'application/json', + } ); - this.headers = { - ...this.headers, - ...USER_AGENT, - ...baggageUtils.parseKeyPairsIntoRecord( - getEnv().OTEL_EXPORTER_OTLP_LOGS_HEADERS - ), - ...parseHeaders(config?.headers), - }; } getDefaultUrl(config: OTLPExporterNodeConfigBase): string { diff --git a/experimental/packages/exporter-logs-otlp-http/test/node/OTLPLogExporter.test.ts b/experimental/packages/exporter-logs-otlp-http/test/node/OTLPLogExporter.test.ts index e7f1c92e5d..8e6b076ed6 100644 --- a/experimental/packages/exporter-logs-otlp-http/test/node/OTLPLogExporter.test.ts +++ b/experimental/packages/exporter-logs-otlp-http/test/node/OTLPLogExporter.test.ts @@ -43,7 +43,7 @@ class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } @@ -65,6 +65,9 @@ describe('OTLPLogExporter', () => { afterEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.restore(); }); @@ -83,7 +86,9 @@ describe('OTLPLogExporter', () => { it('should include user-agent header by default', () => { const exporter = new OTLPLogExporter(); assert.strictEqual( - exporter.headers['User-Agent'], + exporter['_transport']['_transport']['_parameters']['headers'][ + 'User-Agent' + ], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); @@ -91,7 +96,10 @@ describe('OTLPLogExporter', () => { it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'foo=bar'; const exporter = new OTLPLogExporter(); - assert.strictEqual(exporter.headers.foo, 'bar'); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'bar' + ); delete envSource.OTEL_EXPORTER_OTLP_LOGS_HEADERS; }); @@ -106,13 +114,19 @@ describe('OTLPLogExporter', () => { it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPLogExporter({ + const exporter = new OTLPLogExporter({ headers: { foo: 'constructor', }, }); - assert.strictEqual(collectorExporter.headers.foo, 'constructor'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'constructor' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); }); @@ -152,10 +166,12 @@ describe('OTLPLogExporter', () => { assert.strictEqual(options.method, 'POST'); assert.strictEqual(options.path, '/'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); collectorExporter.export(logs, () => {}); @@ -165,10 +181,12 @@ describe('OTLPLogExporter', () => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { assert.strictEqual(options.headers['foo'], 'bar'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); @@ -180,10 +198,12 @@ describe('OTLPLogExporter', () => { assert.strictEqual(options.agent.keepAlive, true); assert.strictEqual(options.agent.options.keepAliveMsecs, 2000); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); @@ -192,10 +212,13 @@ describe('OTLPLogExporter', () => { it('should successfully send the logs', done => { const fakeRequest = new Stream.PassThrough(); - sinon.stub(http, 'request').returns(fakeRequest as any); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); + sinon.stub(http, 'request').returns(fakeRequest as any); let buff = Buffer.from(''); - fakeRequest.on('end', () => { + fakeRequest.on('finish', () => { const responseBody = buff.toString(); const json = JSON.parse(responseBody) as IExportLogsServiceRequest; const log1 = json.resourceLogs?.[0].scopeLogs?.[0].logRecords?.[0]; @@ -222,9 +245,11 @@ describe('OTLPLogExporter', () => { const spyLoggerError = sinon.stub(diag, 'error'); sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + }); return fakeRequest as any; }); @@ -237,9 +262,11 @@ describe('OTLPLogExporter', () => { it('should log the error message', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockResError = new MockedResponse(400); - cb(mockResError); - mockResError.send('failed'); + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + cb(mockRes); + mockRes.send(Buffer.from('failure')); + }); return fakeRequest as any; }); diff --git a/experimental/packages/exporter-logs-otlp-proto/src/platform/node/OTLPLogExporter.ts b/experimental/packages/exporter-logs-otlp-proto/src/platform/node/OTLPLogExporter.ts index de3f8697b1..893a06b4ab 100644 --- a/experimental/packages/exporter-logs-otlp-proto/src/platform/node/OTLPLogExporter.ts +++ b/experimental/packages/exporter-logs-otlp-proto/src/platform/node/OTLPLogExporter.ts @@ -45,16 +45,14 @@ export class OTLPLogExporter implements LogRecordExporter { constructor(config: OTLPExporterConfigBase = {}) { - super(config, ProtobufLogsSerializer, 'application/x-protobuf'); - const env = getEnv(); - this.headers = { - ...this.headers, - ...USER_AGENT, + super(config, ProtobufLogsSerializer, { ...baggageUtils.parseKeyPairsIntoRecord( - env.OTEL_EXPORTER_OTLP_LOGS_HEADERS + getEnv().OTEL_EXPORTER_OTLP_LOGS_HEADERS ), ...parseHeaders(config?.headers), - }; + ...USER_AGENT, + 'Content-Type': 'application/x-protobuf', + }); } getDefaultUrl(config: OTLPExporterConfigBase): string { diff --git a/experimental/packages/exporter-logs-otlp-proto/test/logHelper.ts b/experimental/packages/exporter-logs-otlp-proto/test/logHelper.ts index ab2f088503..c5a110ca1a 100644 --- a/experimental/packages/exporter-logs-otlp-proto/test/logHelper.ts +++ b/experimental/packages/exporter-logs-otlp-proto/test/logHelper.ts @@ -175,7 +175,7 @@ export class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } diff --git a/experimental/packages/exporter-logs-otlp-proto/test/node/OTLPLogExporter.test.ts b/experimental/packages/exporter-logs-otlp-proto/test/node/OTLPLogExporter.test.ts index 71a88635b9..9cf3961fe5 100644 --- a/experimental/packages/exporter-logs-otlp-proto/test/node/OTLPLogExporter.test.ts +++ b/experimental/packages/exporter-logs-otlp-proto/test/node/OTLPLogExporter.test.ts @@ -61,6 +61,9 @@ describe('OTLPLogExporter - node with proto over http', () => { afterEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.restore(); }); @@ -152,34 +155,51 @@ describe('OTLPLogExporter - node with proto over http', () => { it('should include user-agent header by default', () => { const exporter = new OTLPLogExporter(); assert.strictEqual( - exporter.headers['User-Agent'], + exporter['_transport']['_transport']['_parameters']['headers'][ + 'User-Agent' + ], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'foo=bar'; - const collectorExporter = new OTLPLogExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'bar'); + const exporter = new OTLPLogExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'bar' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override global headers config with signal headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; envSource.OTEL_EXPORTER_OTLP_LOGS_HEADERS = 'foo=boo'; - const collectorExporter = new OTLPLogExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'boo'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + const exporter = new OTLPLogExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'boo' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_LOGS_HEADERS = ''; envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPLogExporter({ + const exporter = new OTLPLogExporter({ headers: { foo: 'constructor', }, }); - assert.strictEqual(collectorExporter.headers.foo, 'constructor'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'constructor' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); }); @@ -209,10 +229,12 @@ describe('OTLPLogExporter - node with proto over http', () => { assert.strictEqual(options.method, 'POST'); assert.strictEqual(options.path, '/'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); collectorExporter.export(logs, () => {}); @@ -222,10 +244,12 @@ describe('OTLPLogExporter - node with proto over http', () => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { assert.strictEqual(options.headers['foo'], 'bar'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); collectorExporter.export(logs, () => {}); @@ -236,10 +260,12 @@ describe('OTLPLogExporter - node with proto over http', () => { assert.strictEqual(options.agent.keepAlive, true); assert.strictEqual(options.agent.options.keepAliveMsecs, 2000); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); collectorExporter.export(logs, () => {}); @@ -247,6 +273,9 @@ describe('OTLPLogExporter - node with proto over http', () => { it('should successfully send the logs', done => { const fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.stub(http, 'request').returns(fakeRequest as any); let buff = Buffer.from(''); @@ -277,9 +306,11 @@ describe('OTLPLogExporter - node with proto over http', () => { const spyLoggerError = sinon.stub(diag, 'error'); sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + }); return fakeRequest as any; }); @@ -292,9 +323,11 @@ describe('OTLPLogExporter - node with proto over http', () => { it('should log the error message', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockResError = new MockedResponse(400); - cb(mockResError); - mockResError.send('failed'); + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + cb(mockRes); + mockRes.send(Buffer.from('failure')); + }); return fakeRequest as any; }); @@ -329,6 +362,9 @@ describe('OTLPLogExporter - node with proto over http', () => { it('should successfully send the logs', done => { const fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.stub(http, 'request').returns(fakeRequest as any); const spySetHeader = sinon.spy(); (fakeRequest as any).setHeader = spySetHeader; @@ -368,7 +404,7 @@ describe('export - real http request destroyed before response received', () => setTimeout(() => { res.statusCode = 200; res.end(); - }, 200); + }, 1000); }); before(done => { server.listen(8082, done); @@ -386,11 +422,15 @@ describe('export - real http request destroyed before response received', () => logs.push(Object.assign({}, mockedReadableLogRecord)); collectorExporter.export(logs, result => { - assert.strictEqual(result.code, ExportResultCode.FAILED); - const error = result.error as OTLPExporterError; - assert.ok(error !== undefined); - assert.strictEqual(error.message, 'Request Timeout'); - done(); + try { + assert.strictEqual(result.code, ExportResultCode.FAILED); + const error = result.error as OTLPExporterError; + assert.ok(error !== undefined); + assert.strictEqual(error.message, 'Request Timeout'); + done(); + } catch (e) { + done(e); + } }); }); it('should log the timeout request error message when timeout is 100', done => { diff --git a/experimental/packages/exporter-trace-otlp-http/src/platform/node/OTLPTraceExporter.ts b/experimental/packages/exporter-trace-otlp-http/src/platform/node/OTLPTraceExporter.ts index 6a978bc897..aa91b8c237 100644 --- a/experimental/packages/exporter-trace-otlp-http/src/platform/node/OTLPTraceExporter.ts +++ b/experimental/packages/exporter-trace-otlp-http/src/platform/node/OTLPTraceExporter.ts @@ -43,16 +43,14 @@ export class OTLPTraceExporter implements SpanExporter { constructor(config: OTLPExporterNodeConfigBase = {}) { - super(config, JsonTraceSerializer, 'application/json'); - const env = getEnv(); - this.headers = { - ...this.headers, - ...USER_AGENT, + super(config, JsonTraceSerializer, { ...baggageUtils.parseKeyPairsIntoRecord( - env.OTEL_EXPORTER_OTLP_TRACES_HEADERS + getEnv().OTEL_EXPORTER_OTLP_TRACES_HEADERS ), ...parseHeaders(config?.headers), - }; + ...USER_AGENT, + 'Content-Type': 'application/json', + }); } getDefaultUrl(config: OTLPExporterNodeConfigBase): string { diff --git a/experimental/packages/exporter-trace-otlp-http/test/node/CollectorTraceExporter.test.ts b/experimental/packages/exporter-trace-otlp-http/test/node/CollectorTraceExporter.test.ts index e9af4ec37b..171a8dcf14 100644 --- a/experimental/packages/exporter-trace-otlp-http/test/node/CollectorTraceExporter.test.ts +++ b/experimental/packages/exporter-trace-otlp-http/test/node/CollectorTraceExporter.test.ts @@ -33,7 +33,6 @@ import { ensureSpanIsCorrect, mockedReadableSpan, } from '../traceHelper'; -import { nextTick } from 'process'; import { MockedResponse } from './nodeHelpers'; import { IExportTraceServiceRequest } from '@opentelemetry/otlp-transformer'; import { VERSION } from '../../src/version'; @@ -51,6 +50,9 @@ describe('OTLPTraceExporter - node with json over http', () => { afterEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.restore(); }); @@ -166,51 +168,54 @@ describe('OTLPTraceExporter - node with json over http', () => { }); it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'bar'); + const exporter = new OTLPTraceExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'bar' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should include user agent in header', () => { - const collectorExporter = new OTLPTraceExporter(); + const exporter = new OTLPTraceExporter(); assert.strictEqual( - collectorExporter.headers['User-Agent'], + exporter['_transport']['_transport']['_parameters']['headers'][ + 'User-Agent' + ], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); it('should override global headers config with signal headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; envSource.OTEL_EXPORTER_OTLP_TRACES_HEADERS = 'foo=boo'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'boo'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + const exporter = new OTLPTraceExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'boo' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_TRACES_HEADERS = ''; envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPTraceExporter({ + const exporter = new OTLPTraceExporter({ headers: { foo: 'constructor', }, }); - assert.strictEqual(collectorExporter.headers.foo, 'constructor'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'constructor' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); - it('should use compression defined via env', () => { - envSource.OTEL_EXPORTER_OTLP_COMPRESSION = 'gzip'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.compression, 'gzip'); - envSource.OTEL_EXPORTER_OTLP_COMPRESSION = ''; - }); - it('should override global compression config with signal compression defined via env', () => { - envSource.OTEL_EXPORTER_OTLP_COMPRESSION = 'foo'; - envSource.OTEL_EXPORTER_OTLP_TRACES_COMPRESSION = 'gzip'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.compression, 'gzip'); - envSource.OTEL_EXPORTER_OTLP_COMPRESSION = ''; - envSource.OTEL_EXPORTER_OTLP_TRACES_COMPRESSION = ''; - }); }); describe('export', () => { @@ -234,11 +239,13 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; assert.strictEqual(options.hostname, 'foo.bar.com'); @@ -252,11 +259,13 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; assert.strictEqual(options.headers['foo'], 'bar'); @@ -268,11 +277,13 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; assert.strictEqual(options.headers['Content-Encoding'], undefined); @@ -284,11 +295,14 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; const agent = options.agent; @@ -302,15 +316,18 @@ describe('OTLPTraceExporter - node with json over http', () => { const clock = sinon.useFakeTimers(); collectorExporter.export(spans, () => {}); - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; + const mockRes = new MockedResponse(200); + + queueMicrotask(() => { + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); - callback(mockRes); - mockRes.send('success'); clock.restore(); - nextTick(() => { + queueMicrotask(() => { const clock = sinon.useFakeTimers(); collectorExporter.export(spans, () => {}); @@ -319,7 +336,7 @@ describe('OTLPTraceExporter - node with json over http', () => { const callback2 = args2[1]; callback2(mockRes); - mockRes2.send('success'); + mockRes2.send(Buffer.from('success')); const [firstExportAgent, secondExportAgent] = stubRequest.args.map( a => a[0].agent @@ -334,7 +351,7 @@ describe('OTLPTraceExporter - node with json over http', () => { it('should successfully send the spans', done => { let buff = Buffer.from(''); - fakeRequest.on('end', () => { + fakeRequest.on('finish', () => { const responseBody = buff.toString(); const json = JSON.parse(responseBody) as IExportTraceServiceRequest; const span1 = json.resourceSpans?.[0].scopeSpans?.[0].spans?.[0]; @@ -357,7 +374,7 @@ describe('OTLPTraceExporter - node with json over http', () => { const callback = args[1]; callback(mockRes); - mockRes.send('success'); + mockRes.send(Buffer.from('success')); }); it('should log the successful message', done => { @@ -367,12 +384,15 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, responseSpy); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); + setTimeout(() => { assert.strictEqual(stubLoggerError.args.length, 0); assert.strictEqual( @@ -389,11 +409,14 @@ describe('OTLPTraceExporter - node with json over http', () => { collectorExporter.export(spans, responseSpy); setTimeout(() => { - const mockResError = new MockedResponse(400); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockResError); - mockResError.send('failed'); + + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + callback(mockRes); + mockRes.send(Buffer.from('failure')); + }); setTimeout(() => { const result = responseSpy.args[0][0] as core.ExportResult; @@ -401,7 +424,6 @@ describe('OTLPTraceExporter - node with json over http', () => { const error = result.error as OTLPExporterError; assert.ok(error !== undefined); assert.strictEqual(error.code, 400); - assert.strictEqual(error.data, 'failed'); done(); }); }); @@ -431,7 +453,7 @@ describe('OTLPTraceExporter - node with json over http', () => { it('should successfully send the spans', done => { let buff = Buffer.from(''); - fakeRequest.on('end', () => { + fakeRequest.on('finish', () => { const responseBody = zlib.gunzipSync(buff).toString(); const json = JSON.parse(responseBody) as IExportTraceServiceRequest; @@ -455,7 +477,7 @@ describe('OTLPTraceExporter - node with json over http', () => { const callback = args[1]; callback(mockRes); - mockRes.send('success'); + mockRes.send(Buffer.from('success')); }); }); @@ -483,6 +505,9 @@ describe('OTLPTraceExporter - node with json over http', () => { describe('export - with timeout', () => { beforeEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); spySetHeader = sinon.spy(); (fakeRequest as any).setHeader = spySetHeader; @@ -501,6 +526,7 @@ describe('OTLPTraceExporter - node with json over http', () => { spans = []; spans.push(Object.assign({}, mockedReadableSpan)); }); + it('should log the timeout request error message', done => { const responseSpy = sinon.spy(); collectorExporter.export(spans, responseSpy); @@ -513,7 +539,7 @@ describe('OTLPTraceExporter - node with json over http', () => { assert.strictEqual(result.code, core.ExportResultCode.FAILED); const error = result.error as OTLPExporterError; assert.ok(error !== undefined); - assert.strictEqual(error.message, 'Request Timeout'); + assert.deepEqual(error, { code: 'ECONNRESET' }); done(); }); diff --git a/experimental/packages/exporter-trace-otlp-http/test/node/nodeHelpers.ts b/experimental/packages/exporter-trace-otlp-http/test/node/nodeHelpers.ts index d2dce6517b..e63d21b17c 100644 --- a/experimental/packages/exporter-trace-otlp-http/test/node/nodeHelpers.ts +++ b/experimental/packages/exporter-trace-otlp-http/test/node/nodeHelpers.ts @@ -24,7 +24,7 @@ export class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } diff --git a/experimental/packages/exporter-trace-otlp-proto/src/platform/node/OTLPTraceExporter.ts b/experimental/packages/exporter-trace-otlp-proto/src/platform/node/OTLPTraceExporter.ts index 75637863ed..79da4ddc28 100644 --- a/experimental/packages/exporter-trace-otlp-proto/src/platform/node/OTLPTraceExporter.ts +++ b/experimental/packages/exporter-trace-otlp-proto/src/platform/node/OTLPTraceExporter.ts @@ -43,16 +43,14 @@ export class OTLPTraceExporter implements SpanExporter { constructor(config: OTLPExporterNodeConfigBase = {}) { - super(config, ProtobufTraceSerializer, 'application/x-protobuf'); - const env = getEnv(); - this.headers = { - ...this.headers, - ...USER_AGENT, + super(config, ProtobufTraceSerializer, { ...baggageUtils.parseKeyPairsIntoRecord( - env.OTEL_EXPORTER_OTLP_TRACES_HEADERS + getEnv().OTEL_EXPORTER_OTLP_TRACES_HEADERS ), ...parseHeaders(config?.headers), - }; + ...USER_AGENT, + 'Content-Type': 'application/x-protobuf', + }); } getDefaultUrl(config: OTLPExporterNodeConfigBase) { diff --git a/experimental/packages/exporter-trace-otlp-proto/test/node/OTLPTraceExporter.test.ts b/experimental/packages/exporter-trace-otlp-proto/test/node/OTLPTraceExporter.test.ts index 90c212e8aa..6adf335c1a 100644 --- a/experimental/packages/exporter-trace-otlp-proto/test/node/OTLPTraceExporter.test.ts +++ b/experimental/packages/exporter-trace-otlp-proto/test/node/OTLPTraceExporter.test.ts @@ -63,14 +63,19 @@ describe('OTLPTraceExporter - node with proto over http', () => { afterEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.restore(); }); describe('default behavior for headers', () => { - const collectorExporter = new OTLPTraceExporter(); + const exporter = new OTLPTraceExporter(); it('should include user agent in header', () => { assert.strictEqual( - collectorExporter.headers['User-Agent'], + exporter['_transport']['_transport']['_parameters']['headers'][ + 'User-Agent' + ], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); @@ -164,28 +169,43 @@ describe('OTLPTraceExporter - node with proto over http', () => { }); it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'bar'); + const exporter = new OTLPTraceExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'bar' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override global headers config with signal headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; envSource.OTEL_EXPORTER_OTLP_TRACES_HEADERS = 'foo=boo'; - const collectorExporter = new OTLPTraceExporter(); - assert.strictEqual(collectorExporter.headers.foo, 'boo'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + const exporter = new OTLPTraceExporter(); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'boo' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_TRACES_HEADERS = ''; envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPTraceExporter({ + const exporter = new OTLPTraceExporter({ headers: { foo: 'constructor', }, }); - assert.strictEqual(collectorExporter.headers.foo, 'constructor'); - assert.strictEqual(collectorExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['foo'], + 'constructor' + ); + assert.strictEqual( + exporter['_transport']['_transport']['_parameters']['headers']['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); }); @@ -211,14 +231,20 @@ describe('OTLPTraceExporter - node with proto over http', () => { it('should open the connection', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - assert.strictEqual(options.hostname, 'foo.bar.com'); - assert.strictEqual(options.method, 'POST'); - assert.strictEqual(options.path, '/'); - - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + try { + assert.strictEqual(options.hostname, 'foo.bar.com'); + assert.strictEqual(options.method, 'POST'); + assert.strictEqual(options.path, '/'); + } catch (e) { + done(e); + } + + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); @@ -229,10 +255,13 @@ describe('OTLPTraceExporter - node with proto over http', () => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { assert.strictEqual(options.headers['foo'], 'bar'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); + return fakeRequest as any; }); @@ -244,10 +273,13 @@ describe('OTLPTraceExporter - node with proto over http', () => { assert.strictEqual(options.agent.keepAlive, true); assert.strictEqual(options.agent.options.keepAliveMsecs, 2000); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); + return fakeRequest as any; }); @@ -256,10 +288,13 @@ describe('OTLPTraceExporter - node with proto over http', () => { it('should successfully send the spans', done => { const fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.stub(http, 'request').returns(fakeRequest as any); let buff = Buffer.from(''); - fakeRequest.on('end', () => { + fakeRequest.on('finish', () => { const data = exportRequestServiceProto.decode(buff); const json = data?.toJSON() as IExportTraceServiceRequest; const span1 = json.resourceSpans?.[0].scopeSpans?.[0].spans?.[0]; @@ -286,24 +321,33 @@ describe('OTLPTraceExporter - node with proto over http', () => { const spyLoggerError = sinon.stub(diag, 'error'); sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + }); + return fakeRequest as any; }); collectorExporter.export(spans, result => { - assert.strictEqual(result.code, ExportResultCode.SUCCESS); - assert.strictEqual(spyLoggerError.args.length, 0); - done(); + try { + assert.strictEqual(result.code, ExportResultCode.SUCCESS); + assert.strictEqual(spyLoggerError.args.length, 0); + done(); + } catch (e) { + done(e); + } }); }); it('should log the error message', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockResError = new MockedResponse(400); - cb(mockResError); - mockResError.send('failed'); + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + cb(mockRes); + mockRes.send(Buffer.from('failure')); + }); return fakeRequest as any; }); @@ -338,12 +382,15 @@ describe('OTLPTraceExporter - node with proto over http', () => { it('should successfully send the spans', done => { const fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.stub(http, 'request').returns(fakeRequest as any); const spySetHeader = sinon.spy(); (fakeRequest as any).setHeader = spySetHeader; let buff = Buffer.from(''); - fakeRequest.on('end', () => { + fakeRequest.on('finish', () => { const unzippedBuff = zlib.gunzipSync(buff); const data = exportRequestServiceProto.decode(unzippedBuff); const json = data?.toJSON() as IExportTraceServiceRequest; @@ -377,7 +424,7 @@ describe('export - real http request destroyed before response received', () => setTimeout(() => { res.statusCode = 200; res.end(); - }, 200); + }, 1000); }); before(done => { server.listen(8080, done); @@ -385,23 +432,6 @@ describe('export - real http request destroyed before response received', () => after(done => { server.close(done); }); - it('should log the timeout request error message when timeout is 1', done => { - collectorExporterConfig = { - url: 'http://localhost:8080', - timeoutMillis: 1, - }; - collectorExporter = new OTLPTraceExporter(collectorExporterConfig); - spans = []; - spans.push(Object.assign({}, mockedReadableSpan)); - - collectorExporter.export(spans, result => { - assert.strictEqual(result.code, ExportResultCode.FAILED); - const error = result.error as OTLPExporterError; - assert.ok(error !== undefined); - assert.strictEqual(error.message, 'Request Timeout'); - done(); - }); - }); it('should log the timeout request error message when timeout is 100', done => { collectorExporterConfig = { url: 'http://localhost:8080', diff --git a/experimental/packages/exporter-trace-otlp-proto/test/traceHelper.ts b/experimental/packages/exporter-trace-otlp-proto/test/traceHelper.ts index 3a21d9b79d..efbdc03283 100644 --- a/experimental/packages/exporter-trace-otlp-proto/test/traceHelper.ts +++ b/experimental/packages/exporter-trace-otlp-proto/test/traceHelper.ts @@ -268,7 +268,7 @@ export class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/src/platform/node/OTLPMetricExporter.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/src/platform/node/OTLPMetricExporter.ts index 1f0dc59b27..bf57b807dc 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/src/platform/node/OTLPMetricExporter.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/src/platform/node/OTLPMetricExporter.ts @@ -42,16 +42,14 @@ class OTLPExporterNodeProxy extends OTLPExporterNodeBase< IExportMetricsServiceResponse > { constructor(config?: OTLPExporterNodeConfigBase & OTLPMetricExporterOptions) { - super(config, JsonMetricsSerializer, 'application/json'); - const env = getEnv(); - this.headers = { - ...this.headers, - ...USER_AGENT, + super(config, JsonMetricsSerializer, { ...baggageUtils.parseKeyPairsIntoRecord( - env.OTEL_EXPORTER_OTLP_METRICS_HEADERS + getEnv().OTEL_EXPORTER_OTLP_METRICS_HEADERS ), ...parseHeaders(config?.headers), - }; + ...USER_AGENT, + 'Content-Type': 'application/json', + }); } getDefaultUrl(config: OTLPExporterNodeConfigBase): string { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts index b72ed5f8cb..7b19b84f02 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts @@ -74,6 +74,9 @@ describe('OTLPMetricExporter - node with json over http', () => { afterEach(async () => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); await shutdown(); sinon.restore(); }); @@ -337,38 +340,62 @@ describe('OTLPMetricExporter - node with json over http', () => { }); it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar'; - const collectorExporter = new OTLPMetricExporter(); - assert.strictEqual(collectorExporter._otlpExporter.headers.foo, 'bar'); + const exporter = new OTLPMetricExporter(); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], + 'bar' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should include user agent in header', () => { - const collectorExporter = new OTLPMetricExporter(); + const exporter = new OTLPMetricExporter(); assert.strictEqual( - collectorExporter._otlpExporter.headers['User-Agent'], + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['User-Agent'], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); it('should override global headers config with signal headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = 'foo=boo'; - const collectorExporter = new OTLPMetricExporter(); - assert.strictEqual(collectorExporter._otlpExporter.headers.foo, 'boo'); - assert.strictEqual(collectorExporter._otlpExporter.headers.bar, 'foo'); + const exporter = new OTLPMetricExporter(); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], + 'boo' + ); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = ''; envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPMetricExporter({ + const exporter = new OTLPMetricExporter({ headers: { foo: 'constructor', }, }); assert.strictEqual( - collectorExporter._otlpExporter.headers.foo, + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], 'constructor' ); - assert.strictEqual(collectorExporter._otlpExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should use delta temporality defined via env', () => { @@ -468,11 +495,13 @@ describe('OTLPMetricExporter - node with json over http', () => { collectorExporter.export(metrics, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; assert.strictEqual(options.hostname, 'foo.bar.com'); @@ -486,11 +515,14 @@ describe('OTLPMetricExporter - node with json over http', () => { collectorExporter.export(metrics, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); + const options = args[0]; assert.strictEqual(options.headers['foo'], 'bar'); done(); @@ -501,11 +533,14 @@ describe('OTLPMetricExporter - node with json over http', () => { collectorExporter.export(metrics, () => {}); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); const options = args[0]; const agent = options.agent; assert.strictEqual(agent.keepAlive, true); @@ -584,7 +619,7 @@ describe('OTLPMetricExporter - node with json over http', () => { const callback = args[1]; callback(mockRes); - mockRes.send('success'); + mockRes.send(Buffer.from('success')); }); it('should log the successful message', done => { @@ -595,11 +630,15 @@ describe('OTLPMetricExporter - node with json over http', () => { collectorExporter.export(metrics, responseSpy); setTimeout(() => { - const mockRes = new MockedResponse(200); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('success'); + + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + callback(mockRes); + mockRes.send(Buffer.from('success')); + }); + setTimeout(() => { assert.strictEqual(stubLoggerError.args.length, 0); assert.strictEqual( @@ -619,18 +658,20 @@ describe('OTLPMetricExporter - node with json over http', () => { collectorExporter.export(metrics, responseSpy); setTimeout(() => { - const mockRes = new MockedResponse(400); const args = stubRequest.args[0]; const callback = args[1]; - callback(mockRes); - mockRes.send('failed'); + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + callback(mockRes); + mockRes.send(Buffer.from('failure')); + }); + setTimeout(() => { const result = responseSpy.args[0][0] as core.ExportResult; assert.strictEqual(result.code, core.ExportResultCode.FAILED); const error = result.error as OTLPExporterError; assert.ok(error !== undefined); assert.strictEqual(error.code, 400); - assert.strictEqual(error.data, 'failed'); done(); }); }); diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/nodeHelpers.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/nodeHelpers.ts index d2dce6517b..e63d21b17c 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/nodeHelpers.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/nodeHelpers.ts @@ -24,7 +24,7 @@ export class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/src/OTLPMetricExporter.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/src/OTLPMetricExporter.ts index 944b3e892b..4834c5a698 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/src/OTLPMetricExporter.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/src/OTLPMetricExporter.ts @@ -42,16 +42,14 @@ class OTLPMetricExporterNodeProxy extends OTLPExporterNodeBase< IExportMetricsServiceResponse > { constructor(config?: OTLPExporterNodeConfigBase & OTLPMetricExporterOptions) { - super(config, ProtobufMetricsSerializer, 'application/x-protobuf'); - const env = getEnv(); - this.headers = { - ...this.headers, - ...USER_AGENT, + super(config, ProtobufMetricsSerializer, { ...baggageUtils.parseKeyPairsIntoRecord( - env.OTEL_EXPORTER_OTLP_METRICS_HEADERS + getEnv().OTEL_EXPORTER_OTLP_METRICS_HEADERS ), ...parseHeaders(config?.headers), - }; + ...USER_AGENT, + 'Content-Type': 'application/x-protobuf', + }); } getDefaultUrl(config: OTLPExporterNodeConfigBase): string { diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts index 969f433458..300fd8eb05 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/OTLPMetricExporter.test.ts @@ -70,14 +70,19 @@ describe('OTLPMetricExporter - node with proto over http', () => { afterEach(() => { fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.restore(); }); describe('default behavior for headers', () => { - const collectorExporter = new OTLPMetricExporter(); + const exporter = new OTLPMetricExporter(); it('should include user agent in header', () => { assert.strictEqual( - collectorExporter._otlpExporter.headers['User-Agent'], + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['User-Agent'], `OTel-OTLP-Exporter-JavaScript/${VERSION}` ); }); @@ -175,31 +180,53 @@ describe('OTLPMetricExporter - node with proto over http', () => { }); it('should use headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar'; - const collectorExporter = new OTLPMetricExporter(); - assert.strictEqual(collectorExporter._otlpExporter.headers.foo, 'bar'); + const exporter = new OTLPMetricExporter(); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], + 'bar' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override global headers config with signal headers defined via env', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = 'foo=boo'; - const collectorExporter = new OTLPMetricExporter(); - assert.strictEqual(collectorExporter._otlpExporter.headers.foo, 'boo'); - assert.strictEqual(collectorExporter._otlpExporter.headers.bar, 'foo'); + const exporter = new OTLPMetricExporter(); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], + 'boo' + ); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = ''; envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); it('should override headers defined via env with headers defined in constructor', () => { envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar,bar=foo'; - const collectorExporter = new OTLPMetricExporter({ + const exporter = new OTLPMetricExporter({ headers: { foo: 'constructor', }, }); assert.strictEqual( - collectorExporter._otlpExporter.headers.foo, + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['foo'], 'constructor' ); - assert.strictEqual(collectorExporter._otlpExporter.headers.bar, 'foo'); + assert.strictEqual( + exporter._otlpExporter['_transport']['_transport']['_parameters'][ + 'headers' + ]['bar'], + 'foo' + ); envSource.OTEL_EXPORTER_OTLP_HEADERS = ''; }); }); @@ -246,10 +273,12 @@ describe('OTLPMetricExporter - node with proto over http', () => { assert.strictEqual(options.method, 'POST'); assert.strictEqual(options.path, '/'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); @@ -260,11 +289,12 @@ describe('OTLPMetricExporter - node with proto over http', () => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { assert.strictEqual(options.headers['foo'], 'bar'); - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - - done(); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); return fakeRequest as any; }); @@ -273,14 +303,19 @@ describe('OTLPMetricExporter - node with proto over http', () => { it('should have keep alive and keepAliveMsecs option set', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - assert.strictEqual(options.agent.keepAlive, true); - assert.strictEqual(options.agent.options.keepAliveMsecs, 2000); - - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); - - done(); + try { + assert.strictEqual(options.agent.keepAlive, true); + assert.strictEqual(options.agent.options.keepAliveMsecs, 2000); + + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + done(); + }); + } catch (e) { + done(e); + } return fakeRequest as any; }); @@ -289,74 +324,84 @@ describe('OTLPMetricExporter - node with proto over http', () => { it('should successfully send metrics', done => { const fakeRequest = new Stream.PassThrough(); + Object.defineProperty(fakeRequest, 'setTimeout', { + value: function (_timeout: number) {}, + }); sinon.stub(http, 'request').returns(fakeRequest as any); let buff = Buffer.from(''); - fakeRequest.on('end', () => { - const data = exportRequestServiceProto.decode(buff); - const json = data?.toJSON() as any; - - // The order of the metrics is not guaranteed. - const counterIndex = metrics.scopeMetrics[0].metrics.findIndex( - it => it.descriptor.name === 'int-counter' - ); - const observableIndex = metrics.scopeMetrics[0].metrics.findIndex( - it => it.descriptor.name === 'double-observable-gauge' - ); - const histogramIndex = metrics.scopeMetrics[0].metrics.findIndex( - it => it.descriptor.name === 'int-histogram' - ); - - const metric1 = - json.resourceMetrics[0].scopeMetrics[0].metrics[counterIndex]; - const metric2 = - json.resourceMetrics[0].scopeMetrics[0].metrics[observableIndex]; - const metric3 = - json.resourceMetrics[0].scopeMetrics[0].metrics[histogramIndex]; - - assert.ok(typeof metric1 !== 'undefined', "counter doesn't exist"); - ensureExportedCounterIsCorrect( - metric1, - metrics.scopeMetrics[0].metrics[counterIndex].dataPoints[0].endTime, - metrics.scopeMetrics[0].metrics[counterIndex].dataPoints[0].startTime - ); - assert.ok( - typeof metric2 !== 'undefined', - "observable gauge doesn't exist" - ); - ensureExportedObservableGaugeIsCorrect( - metric2, - metrics.scopeMetrics[0].metrics[observableIndex].dataPoints[0] - .endTime, - metrics.scopeMetrics[0].metrics[observableIndex].dataPoints[0] - .startTime - ); - assert.ok( - typeof metric3 !== 'undefined', - "value recorder doesn't exist" - ); - ensureExportedHistogramIsCorrect( - metric3, - metrics.scopeMetrics[0].metrics[histogramIndex].dataPoints[0].endTime, - metrics.scopeMetrics[0].metrics[histogramIndex].dataPoints[0] - .startTime, - [0, 100], - ['0', '2', '0'] - ); - - ensureExportMetricsServiceRequestIsSet(json); - done(); + fakeRequest.on('finish', () => { + try { + const data = exportRequestServiceProto.decode(buff); + const json = data?.toJSON() as any; + + // The order of the metrics is not guaranteed. + const counterIndex = metrics.scopeMetrics[0].metrics.findIndex( + it => it.descriptor.name === 'int-counter' + ); + const observableIndex = metrics.scopeMetrics[0].metrics.findIndex( + it => it.descriptor.name === 'double-observable-gauge' + ); + const histogramIndex = metrics.scopeMetrics[0].metrics.findIndex( + it => it.descriptor.name === 'int-histogram' + ); + + const metric1 = + json.resourceMetrics[0].scopeMetrics[0].metrics[counterIndex]; + const metric2 = + json.resourceMetrics[0].scopeMetrics[0].metrics[observableIndex]; + const metric3 = + json.resourceMetrics[0].scopeMetrics[0].metrics[histogramIndex]; + + assert.ok(typeof metric1 !== 'undefined', "counter doesn't exist"); + ensureExportedCounterIsCorrect( + metric1, + metrics.scopeMetrics[0].metrics[counterIndex].dataPoints[0].endTime, + metrics.scopeMetrics[0].metrics[counterIndex].dataPoints[0] + .startTime + ); + assert.ok( + typeof metric2 !== 'undefined', + "observable gauge doesn't exist" + ); + ensureExportedObservableGaugeIsCorrect( + metric2, + metrics.scopeMetrics[0].metrics[observableIndex].dataPoints[0] + .endTime, + metrics.scopeMetrics[0].metrics[observableIndex].dataPoints[0] + .startTime + ); + assert.ok( + typeof metric3 !== 'undefined', + "value recorder doesn't exist" + ); + ensureExportedHistogramIsCorrect( + metric3, + metrics.scopeMetrics[0].metrics[histogramIndex].dataPoints[0] + .endTime, + metrics.scopeMetrics[0].metrics[histogramIndex].dataPoints[0] + .startTime, + [0, 100], + ['0', '2', '0'] + ); + + ensureExportMetricsServiceRequestIsSet(json); + done(); + } catch (e) { + done(e); + } }); fakeRequest.on('data', chunk => { buff = Buffer.concat([buff, chunk]); }); - const clock = sinon.useFakeTimers(); - collectorExporter.export(metrics, () => {}); - clock.tick(200); - clock.restore(); + try { + collectorExporter.export(metrics, () => {}); + } catch (error) { + done(error); + } }); it('should log the successful message', done => { @@ -364,9 +409,12 @@ describe('OTLPMetricExporter - node with proto over http', () => { const spyLoggerError = sinon.stub(diag, 'error'); sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockRes = new MockedResponse(200); - cb(mockRes); - mockRes.send('success'); + queueMicrotask(() => { + const mockRes = new MockedResponse(200); + cb(mockRes); + mockRes.send(Buffer.from('success')); + }); + return fakeRequest as any; }); @@ -377,20 +425,26 @@ describe('OTLPMetricExporter - node with proto over http', () => { }); }); - it('should log the error message', done => { + it('should return the error code message', done => { sinon.stub(http, 'request').callsFake((options: any, cb: any) => { - const mockResError = new MockedResponse(400); - cb(mockResError); - mockResError.send('failed'); + queueMicrotask(() => { + const mockRes = new MockedResponse(400); + cb(mockRes); + mockRes.send(Buffer.from('failure')); + }); return fakeRequest as any; }); collectorExporter.export(metrics, result => { - assert.strictEqual(result.code, ExportResultCode.FAILED); - // @ts-expect-error verify error code - assert.strictEqual(result.error.code, 400); - done(); + try { + assert.strictEqual(result.code, ExportResultCode.FAILED); + // @ts-expect-error verify error code + assert.strictEqual(result.error.code, 400); + done(); + } catch (e) { + done(e); + } }); }); }); diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts index ef70a75c05..effc732b37 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-proto/test/metricsHelper.ts @@ -227,7 +227,7 @@ export class MockedResponse extends Stream { super(); } - send(data: string) { + send(data: Uint8Array) { this.emit('data', data); this.emit('end'); } diff --git a/experimental/packages/otlp-exporter-base/src/OTLPExporterBase.ts b/experimental/packages/otlp-exporter-base/src/OTLPExporterBase.ts index 6581e2cf22..0e8c37f00d 100644 --- a/experimental/packages/otlp-exporter-base/src/OTLPExporterBase.ts +++ b/experimental/packages/otlp-exporter-base/src/OTLPExporterBase.ts @@ -35,6 +35,9 @@ export abstract class OTLPExporterBase< ExportItem, > { public readonly url: string; + /** + * @deprecated scheduled for removal. This is only used in tests. + */ public readonly hostname: string | undefined; public readonly timeoutMillis: number; protected _concurrencyLimit: number; diff --git a/experimental/packages/otlp-grpc-exporter-base/src/export-response.ts b/experimental/packages/otlp-exporter-base/src/export-response.ts similarity index 79% rename from experimental/packages/otlp-grpc-exporter-base/src/export-response.ts rename to experimental/packages/otlp-exporter-base/src/export-response.ts index c13af631e1..4e0eb9703e 100644 --- a/experimental/packages/otlp-grpc-exporter-base/src/export-response.ts +++ b/experimental/packages/otlp-exporter-base/src/export-response.ts @@ -24,4 +24,12 @@ export interface ExportResponseFailure { error: Error; } -export type ExportResponse = ExportResponseSuccess | ExportResponseFailure; +export interface ExportResponseRetryable { + status: 'retryable'; + retryInMillis?: number; +} + +export type ExportResponse = + | ExportResponseSuccess + | ExportResponseFailure + | ExportResponseRetryable; diff --git a/experimental/packages/otlp-grpc-exporter-base/src/exporter-transport.ts b/experimental/packages/otlp-exporter-base/src/exporter-transport.ts similarity index 100% rename from experimental/packages/otlp-grpc-exporter-base/src/exporter-transport.ts rename to experimental/packages/otlp-exporter-base/src/exporter-transport.ts diff --git a/experimental/packages/otlp-exporter-base/src/index.ts b/experimental/packages/otlp-exporter-base/src/index.ts index 9ded103782..a2722f7ef5 100644 --- a/experimental/packages/otlp-exporter-base/src/index.ts +++ b/experimental/packages/otlp-exporter-base/src/index.ts @@ -27,3 +27,12 @@ export { configureExporterTimeout, invalidTimeout, } from './util'; + +export { + ExportResponse, + ExportResponseFailure, + ExportResponseSuccess, + ExportResponseRetryable, +} from './export-response'; + +export { IExporterTransport } from './exporter-transport'; diff --git a/experimental/packages/otlp-exporter-base/src/is-export-retryable.ts b/experimental/packages/otlp-exporter-base/src/is-export-retryable.ts new file mode 100644 index 0000000000..8b4569987b --- /dev/null +++ b/experimental/packages/otlp-exporter-base/src/is-export-retryable.ts @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export function isExportRetryable(statusCode: number): boolean { + const retryCodes = [429, 502, 503, 504]; + return retryCodes.includes(statusCode); +} + +export function parseRetryAfterToMills( + retryAfter?: string | undefined | null +): number | undefined { + if (retryAfter == null) { + return undefined; + } + + const seconds = Number.parseInt(retryAfter, 10); + if (Number.isInteger(seconds)) { + return seconds > 0 ? seconds * 1000 : -1; + } + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#directives + const delay = new Date(retryAfter).getTime() - Date.now(); + + if (delay >= 0) { + return delay; + } + return 0; +} diff --git a/experimental/packages/otlp-exporter-base/src/platform/index.ts b/experimental/packages/otlp-exporter-base/src/platform/index.ts index fc857a5802..08a2b63613 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/index.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/index.ts @@ -16,9 +16,6 @@ export { OTLPExporterNodeBase, - sendWithHttp, - createHttpAgent, - configureCompression, OTLPExporterNodeConfigBase, CompressionAlgorithm, } from './node'; diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts b/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts index 5d39153831..5ab20427f0 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts @@ -14,17 +14,16 @@ * limitations under the License. */ -import type * as http from 'http'; -import type * as https from 'https'; - import { OTLPExporterBase } from '../../OTLPExporterBase'; -import { OTLPExporterNodeConfigBase, CompressionAlgorithm } from './types'; -import * as otlpTypes from '../../types'; -import { parseHeaders } from '../../util'; -import { createHttpAgent, sendWithHttp, configureCompression } from './util'; +import { OTLPExporterNodeConfigBase } from './types'; +import { configureCompression } from './util'; import { diag } from '@opentelemetry/api'; import { getEnv, baggageUtils } from '@opentelemetry/core'; import { ISerializer } from '@opentelemetry/otlp-transformer'; +import { IExporterTransport } from '../../exporter-transport'; +import { createHttpExporterTransport } from './http-exporter-transport'; +import { OTLPExporterError } from '../../types'; +import { createRetryingTransport } from '../../retryable-transport'; /** * Collector Metric Exporter abstract base class @@ -33,32 +32,53 @@ export abstract class OTLPExporterNodeBase< ExportItem, ServiceResponse, > extends OTLPExporterBase { - DEFAULT_HEADERS: Record = {}; - headers: Record; - agent: http.Agent | https.Agent | undefined; - compression: CompressionAlgorithm; private _serializer: ISerializer; - private _contentType: string; + private _transport: IExporterTransport; constructor( config: OTLPExporterNodeConfigBase = {}, serializer: ISerializer, - contentType: string + signalSpecificHeaders: Record ) { super(config); - this._contentType = contentType; // eslint-disable-next-line @typescript-eslint/no-explicit-any if ((config as any).metadata) { diag.warn('Metadata cannot be set when using http'); } - this.headers = Object.assign( - this.DEFAULT_HEADERS, - parseHeaders(config.headers), - baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS) - ); - this.agent = createHttpAgent(config); - this.compression = configureCompression(config.compression); this._serializer = serializer; + + // populate keepAlive for use with new settings + if (config?.keepAlive != null) { + if (config.httpAgentOptions != null) { + if (config.httpAgentOptions.keepAlive == null) { + // specific setting is not set, populate with non-specific setting. + config.httpAgentOptions.keepAlive = config.keepAlive; + } + // do nothing, use specific setting otherwise + } else { + // populate specific option if AgentOptions does not exist. + config.httpAgentOptions = { + keepAlive: config.keepAlive, + }; + } + } + const nonSignalSpecificHeaders = baggageUtils.parseKeyPairsIntoRecord( + getEnv().OTEL_EXPORTER_OTLP_HEADERS + ); + + this._transport = createRetryingTransport({ + transport: createHttpExporterTransport({ + agentOptions: config.httpAgentOptions ?? { keepAlive: true }, + compression: configureCompression(config.compression), + headers: Object.assign( + {}, + nonSignalSpecificHeaders, + signalSpecificHeaders + ), + url: this.url, + timeoutMillis: this.timeoutMillis, + }), + }); } onInit(_config: OTLPExporterNodeConfigBase): void {} @@ -66,22 +86,30 @@ export abstract class OTLPExporterNodeBase< send( objects: ExportItem[], onSuccess: () => void, - onError: (error: otlpTypes.OTLPExporterError) => void + onError: (error: OTLPExporterError) => void ): void { if (this._shutdownOnce.isCalled) { diag.debug('Shutdown already started. Cannot send objects'); return; } - const promise = new Promise((resolve, reject) => { - sendWithHttp( - this, - this._serializer.serializeRequest(objects) ?? new Uint8Array(), - this._contentType, - resolve, - reject - ); - }).then(onSuccess, onError); + const data = this._serializer.serializeRequest(objects); + + if (data == null) { + onError(new Error('Could not serialize message')); + return; + } + + const promise = this._transport.send(data).then(response => { + if (response.status === 'success') { + onSuccess(); + return; + } + if (response.status === 'failure' && response.error) { + onError(response.error); + } + onError(new OTLPExporterError('Export failed with unknown error')); + }, onError); this._sendingPromises.push(promise); const popPromise = () => { diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts new file mode 100644 index 0000000000..79647f1d9b --- /dev/null +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { + HttpRequestParameters, + sendWithHttp, +} from './http-transport-types'; + +// NOTE: do not change these type imports to actual imports. Doing so WILL break `@opentelemetry/instrumentation-http`, +// as they'd be imported before the http/https modules can be wrapped. +import type * as https from 'https'; +import type * as http from 'http'; +import { ExportResponse } from '../../export-response'; +import { IExporterTransport } from '../../exporter-transport'; + +class HttpExporterTransport implements IExporterTransport { + private _send: sendWithHttp | null = null; + private _agent: http.Agent | https.Agent | null = null; + + constructor(private _parameters: HttpRequestParameters) {} + + async send(data: Uint8Array): Promise { + if (this._send == null) { + // Lazy require to ensure that http/https is not required before instrumentations can wrap it. + const { + sendWithHttp, + createHttpAgent, + // eslint-disable-next-line @typescript-eslint/no-var-requires + } = require('./http-transport-utils'); + this._agent = createHttpAgent( + this._parameters.url, + this._parameters.agentOptions + ); + this._send = sendWithHttp; + } + + return new Promise(resolve => { + // this will always be defined + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this._send?.(this._parameters, this._agent!, data, result => { + resolve(result); + }); + }); + } + shutdown() { + // intentionally left empty, nothing to do. + } +} + +export function createHttpExporterTransport( + parameters: HttpRequestParameters +): IExporterTransport { + return new HttpExporterTransport(parameters); +} diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts new file mode 100644 index 0000000000..da33d02cd9 --- /dev/null +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as http from 'http'; +import type * as https from 'https'; +import { ExportResponse } from '../../export-response'; + +export type sendWithHttp = ( + params: HttpRequestParameters, + agent: http.Agent | https.Agent, + data: Uint8Array, + onDone: (response: ExportResponse) => void +) => void; + +export interface HttpRequestParameters { + timeoutMillis: number; + url: string; + headers: Record; + compression: 'gzip' | 'none'; + agentOptions: http.AgentOptions | https.AgentOptions; +} diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts new file mode 100644 index 0000000000..e1c13855a5 --- /dev/null +++ b/experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts @@ -0,0 +1,153 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import * as http from 'http'; +import * as https from 'https'; +import * as zlib from 'zlib'; +import { Readable } from 'stream'; +import { HttpRequestParameters } from './http-transport-types'; +import { ExportResponse } from '../../export-response'; +import { + isExportRetryable, + parseRetryAfterToMills, +} from '../../is-export-retryable'; +import { OTLPExporterError } from '../../types'; + +export const DEFAULT_EXPORT_INITIAL_BACKOFF = 1000; +export const DEFAULT_EXPORT_MAX_BACKOFF = 5000; +export const DEFAULT_EXPORT_BACKOFF_MULTIPLIER = 1.5; + +/** + * Sends data using http + * @param params + * @param agent + * @param data + * @param onDone + */ +export function sendWithHttp( + params: HttpRequestParameters, + agent: http.Agent | https.Agent, + data: Uint8Array, + onDone: (response: ExportResponse) => void +): void { + const parsedUrl = new URL(params.url); + const nodeVersion = Number(process.versions.node.split('.')[0]); + + const options: http.RequestOptions | https.RequestOptions = { + hostname: parsedUrl.hostname, + port: parsedUrl.port, + path: parsedUrl.pathname, + method: 'POST', + headers: { + ...params.headers, + }, + agent: agent, + }; + + const request = parsedUrl.protocol === 'http:' ? http.request : https.request; + + const req = request(options, (res: http.IncomingMessage) => { + const responseData: Buffer[] = []; + res.on('data', chunk => responseData.push(chunk)); + + res.on('end', () => { + if (req.destroyed) { + return; + } + if (res.statusCode && res.statusCode < 299) { + onDone({ + status: 'success', + data: Buffer.concat(responseData), + }); + } else if (res.statusCode && isExportRetryable(res.statusCode)) { + onDone({ + status: 'retryable', + retryInMillis: parseRetryAfterToMills(res.headers['retry-after']), + }); + } else { + const error = new OTLPExporterError(res.statusMessage, res.statusCode); + onDone({ + status: 'failure', + error, + }); + } + }); + }); + + req.setTimeout(params.timeoutMillis, () => { + req.destroy(); + onDone({ + status: 'failure', + error: new Error('Request Timeout'), + }); + }); + req.on('error', (error: Error | any) => { + onDone({ + status: 'failure', + error: error, + }); + }); + + const reportTimeoutErrorEvent = nodeVersion >= 14 ? 'close' : 'abort'; + req.on(reportTimeoutErrorEvent, () => { + onDone({ + status: 'failure', + error: new Error('Request timed out'), + }); + }); + + compressAndSend(req, params.compression, data, (error: Error) => { + onDone({ + status: 'failure', + error, + }); + }); +} + +function compressAndSend( + req: http.ClientRequest, + compression: 'gzip' | 'none', + data: Uint8Array, + onError: (error: Error) => void +) { + let dataStream = readableFromUint8Array(data); + + if (compression === 'gzip') { + req.setHeader('Content-Encoding', 'gzip'); + dataStream = dataStream + .on('error', onError) + .pipe(zlib.createGzip()) + .on('error', onError); + } + + dataStream.pipe(req); +} + +function readableFromUint8Array(buff: string | Uint8Array): Readable { + const readable = new Readable(); + readable.push(buff); + readable.push(null); + + return readable; +} + +export function createHttpAgent( + rawUrl: string, + agentOptions: http.AgentOptions | https.AgentOptions +) { + const parsedUrl = new URL(rawUrl); + const Agent = parsedUrl.protocol === 'http:' ? http.Agent : https.Agent; + return new Agent(agentOptions); +} diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/index.ts b/experimental/packages/otlp-exporter-base/src/platform/node/index.ts index b8b13bda20..fcfca512a8 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/index.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/index.ts @@ -15,5 +15,4 @@ */ export { OTLPExporterNodeBase } from './OTLPExporterNodeBase'; -export { sendWithHttp, createHttpAgent, configureCompression } from './util'; export { OTLPExporterNodeConfigBase, CompressionAlgorithm } from './types'; diff --git a/experimental/packages/otlp-exporter-base/src/platform/node/util.ts b/experimental/packages/otlp-exporter-base/src/platform/node/util.ts index d9bca47088..06a55af9ed 100644 --- a/experimental/packages/otlp-exporter-base/src/platform/node/util.ts +++ b/experimental/packages/otlp-exporter-base/src/platform/node/util.ts @@ -16,179 +16,10 @@ import * as url from 'url'; import * as http from 'http'; import * as https from 'https'; -import * as zlib from 'zlib'; -import { Readable } from 'stream'; -import { OTLPExporterNodeBase } from './OTLPExporterNodeBase'; import { OTLPExporterNodeConfigBase } from '.'; import { diag } from '@opentelemetry/api'; import { CompressionAlgorithm } from './types'; import { getEnv } from '@opentelemetry/core'; -import { OTLPExporterError } from '../../types'; -import { - DEFAULT_EXPORT_MAX_ATTEMPTS, - DEFAULT_EXPORT_INITIAL_BACKOFF, - DEFAULT_EXPORT_BACKOFF_MULTIPLIER, - DEFAULT_EXPORT_MAX_BACKOFF, - isExportRetryable, - parseRetryAfterToMills, -} from '../../util'; - -/** - * Sends data using http - * @param collector - * @param data - * @param contentType - * @param onSuccess - * @param onError - */ -export function sendWithHttp( - collector: OTLPExporterNodeBase, - data: string | Uint8Array, - contentType: string, - onSuccess: () => void, - onError: (error: OTLPExporterError) => void -): void { - const exporterTimeout = collector.timeoutMillis; - const parsedUrl = new url.URL(collector.url); - const nodeVersion = Number(process.versions.node.split('.')[0]); - let retryTimer: ReturnType; - let req: http.ClientRequest; - let reqIsDestroyed = false; - - const exporterTimer = setTimeout(() => { - clearTimeout(retryTimer); - reqIsDestroyed = true; - - if (req.destroyed) { - const err = new OTLPExporterError('Request Timeout'); - onError(err); - } else { - // req.abort() was deprecated since v14 - nodeVersion >= 14 ? req.destroy() : req.abort(); - } - }, exporterTimeout); - - const options: http.RequestOptions | https.RequestOptions = { - hostname: parsedUrl.hostname, - port: parsedUrl.port, - path: parsedUrl.pathname, - method: 'POST', - headers: { - 'Content-Type': contentType, - ...collector.headers, - }, - agent: collector.agent, - }; - - const request = parsedUrl.protocol === 'http:' ? http.request : https.request; - - const sendWithRetry = ( - retries = DEFAULT_EXPORT_MAX_ATTEMPTS, - minDelay = DEFAULT_EXPORT_INITIAL_BACKOFF - ) => { - req = request(options, (res: http.IncomingMessage) => { - let responseData = ''; - res.on('data', chunk => (responseData += chunk)); - - res.on('aborted', () => { - if (reqIsDestroyed) { - const err = new OTLPExporterError('Request Timeout'); - onError(err); - } - }); - - res.on('end', () => { - if (reqIsDestroyed === false) { - if (res.statusCode && res.statusCode < 299) { - diag.debug(`statusCode: ${res.statusCode}`, responseData); - onSuccess(); - // clear all timers since request was completed and promise was resolved - clearTimeout(exporterTimer); - clearTimeout(retryTimer); - } else if ( - res.statusCode && - isExportRetryable(res.statusCode) && - retries > 0 - ) { - let retryTime: number; - minDelay = DEFAULT_EXPORT_BACKOFF_MULTIPLIER * minDelay; - - // retry after interval specified in Retry-After header - if (res.headers['retry-after']) { - retryTime = parseRetryAfterToMills(res.headers['retry-after']!); - } else { - // exponential backoff with jitter - retryTime = Math.round( - Math.random() * (DEFAULT_EXPORT_MAX_BACKOFF - minDelay) + - minDelay - ); - } - - retryTimer = setTimeout(() => { - sendWithRetry(retries - 1, minDelay); - }, retryTime); - } else { - const error = new OTLPExporterError( - res.statusMessage, - res.statusCode, - responseData - ); - onError(error); - // clear all timers since request was completed and promise was resolved - clearTimeout(exporterTimer); - clearTimeout(retryTimer); - } - } - }); - }); - - req.on('error', (error: Error | any) => { - if (reqIsDestroyed) { - const err = new OTLPExporterError('Request Timeout', error.code); - onError(err); - } else { - onError(error); - } - clearTimeout(exporterTimer); - clearTimeout(retryTimer); - }); - - req.on('abort', () => { - if (reqIsDestroyed) { - const err = new OTLPExporterError('Request Timeout'); - onError(err); - } - clearTimeout(exporterTimer); - clearTimeout(retryTimer); - }); - - switch (collector.compression) { - case CompressionAlgorithm.GZIP: { - req.setHeader('Content-Encoding', 'gzip'); - const dataStream = readableFromUnit8Array(data); - dataStream - .on('error', onError) - .pipe(zlib.createGzip()) - .on('error', onError) - .pipe(req); - - break; - } - default: - req.end(Buffer.from(data)); - break; - } - }; - sendWithRetry(); -} - -function readableFromUnit8Array(buff: string | Uint8Array): Readable { - const readable = new Readable(); - readable.push(buff); - readable.push(null); - - return readable; -} export function createHttpAgent( config: OTLPExporterNodeConfigBase diff --git a/experimental/packages/otlp-exporter-base/src/retryable-transport.ts b/experimental/packages/otlp-exporter-base/src/retryable-transport.ts new file mode 100644 index 0000000000..e48192c1fd --- /dev/null +++ b/experimental/packages/otlp-exporter-base/src/retryable-transport.ts @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { IExporterTransport } from './exporter-transport'; +import { ExportResponse } from './export-response'; + +const MAX_ATTEMPTS = 5; +const INITIAL_BACKOFF = 1000; +const MAX_BACKOFF = 5000; +const BACKOFF_MULTIPLIER = 1.5; +const JITTER = 0.2; + +/** + * Get a pseudo-random jitter that falls in the range of [-JITTER, +JITTER] + */ +function getJitter() { + return Math.random() * (2 * JITTER) - JITTER; +} + +class RetryingTransport implements IExporterTransport { + constructor(private _transport: IExporterTransport) {} + + private retry(data: Uint8Array, inMillis: number): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + this._transport.send(data).then(resolve, reject); + }, inMillis); + }); + } + + async send(data: Uint8Array): Promise { + let result = await this._transport.send(data); + let attempts = MAX_ATTEMPTS; + let nextBackoff = INITIAL_BACKOFF; + + while (result.status === 'retryable' && attempts > 0) { + attempts--; + const backoff = Math.min(nextBackoff, MAX_BACKOFF) + getJitter(); + nextBackoff = nextBackoff * BACKOFF_MULTIPLIER; + result = await this.retry(data, result.retryInMillis ?? backoff); + } + + return result; + } + + shutdown() { + return this._transport.shutdown(); + } +} + +/** + * Creates an Exporter Transport that retries on 'retryable' response. + */ +export function createRetryingTransport(options: { + // Underlying transport to wrap. + transport: IExporterTransport; +}): IExporterTransport { + return new RetryingTransport(options.transport); +} diff --git a/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts new file mode 100644 index 0000000000..cd7be6ebbb --- /dev/null +++ b/experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts @@ -0,0 +1,180 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as sinon from 'sinon'; +import * as assert from 'assert'; +import { IExporterTransport } from '../../src'; +import { createRetryingTransport } from '../../src/retryable-transport'; +import { ExportResponse } from '../../src'; +import { assertRejects } from '../testHelper'; + +describe('RetryingTransport', function () { + describe('send', function () { + it('does not retry when underlying transport succeeds', async function () { + // arrange + const expectedResponse: ExportResponse = { + status: 'success', + }; + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + // make transport succeed + send: sinon.stub().returns(Promise.resolve(expectedResponse)), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + const actualResponse = await transport.send(mockData); + + // assert + sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + assert.deepEqual(actualResponse, expectedResponse); + }); + + it('does not retry when underlying transport fails', async function () { + // arrange + const expectedResponse: ExportResponse = { + status: 'failure', + error: new Error(), + }; + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + // make transport fail + send: sinon.stub().returns(Promise.resolve(expectedResponse)), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + const actualResponse = await transport.send(mockData); + + // assert + sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + assert.deepEqual(actualResponse, expectedResponse); + }); + + it('does not retry when underlying transport rejects', async function () { + // arrange + const expectedError = new Error('error'); + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + // make transport reject + send: sinon.stub().rejects(expectedError), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + await assertRejects(() => transport.send(mockData)); + + // assert + sinon.assert.calledOnceWithExactly(transportStubs.send, mockData); + }); + + it('does retry when the underlying transport returns retryable', async function () { + // arrange + const retryResponse: ExportResponse = { + status: 'retryable', + }; + const successResponse: ExportResponse = { + status: 'success', + }; + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + send: sinon + .stub() + .onFirstCall() + .returns(Promise.resolve(retryResponse)) + .onSecondCall() + .returns(Promise.resolve(successResponse)), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + const actualResponse = await transport.send(mockData); + + // assert + sinon.assert.calledTwice(transportStubs.send); + sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + assert.deepEqual(actualResponse, successResponse); + }); + + it('does reject when the underlying transport rejects on retry', async function () { + // arrange + const expectedError = new Error('error'); + const retryResponse: ExportResponse = { + status: 'retryable', + }; + + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + send: sinon + .stub() + .onFirstCall() + .resolves(retryResponse) + .onSecondCall() + .rejects(expectedError), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + await assertRejects(() => transport.send(mockData)); + + // assert + sinon.assert.calledTwice(transportStubs.send); + sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + }); + + it('does retry 5 times, then resolves as retryable', async function () { + // arrange + // make random return a negative value so that what's passed to setTimeout() is negative and therefore gets executed immediately. + Math.random = sinon.stub().returns(-Infinity); + + const retryResponse: ExportResponse = { + status: 'retryable', + }; + + const mockData = Uint8Array.from([1, 2, 3]); + + const transportStubs = { + send: sinon.stub().resolves(retryResponse), + shutdown: sinon.stub(), + }; + const mockTransport = transportStubs; + const transport = createRetryingTransport({ transport: mockTransport }); + + // act + const result = await transport.send(mockData); + + // assert + sinon.assert.callCount(transportStubs.send, 6); // 1 initial try and 5 retries + sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData); + assert.strictEqual(result, retryResponse); + }); + }); +}); diff --git a/experimental/packages/otlp-exporter-base/test/node/util.test.ts b/experimental/packages/otlp-exporter-base/test/node/util.test.ts index 0d3891cae6..19abbee952 100644 --- a/experimental/packages/otlp-exporter-base/test/node/util.test.ts +++ b/experimental/packages/otlp-exporter-base/test/node/util.test.ts @@ -16,7 +16,6 @@ import * as assert from 'assert'; import { configureExporterTimeout, invalidTimeout } from '../../src/util'; -import { sendWithHttp } from '../../src/platform/node/util'; import { CompressionAlgorithm } from '../../src/platform/node/types'; import { configureCompression } from '../../src/platform/node/util'; import { diag } from '@opentelemetry/api'; @@ -24,31 +23,8 @@ import * as sinon from 'sinon'; import { OTLPExporterNodeBase } from '../../src/platform/node/OTLPExporterNodeBase'; import { OTLPExporterNodeConfigBase } from '../../src/platform/node/types'; -import { OTLPExporterError } from '../../src/types'; -import { PassThrough } from 'stream'; -import * as http from 'http'; -import * as zlib from 'zlib'; import { ISerializer } from '@opentelemetry/otlp-transformer'; -// Meant to simulate http.IncomingMessage, at least the parts that sendWithHttp cares about -// but make it a PassThrough so we can inspect it for the test -class HttpResponse extends PassThrough { - statusCode: number; - statusMessage: string; - - constructor(statusCode = 200, statusMessage = 'OK') { - super(); - this.statusCode = statusCode; - this.statusMessage = statusMessage; - } -} - -// Meant to simulate http.ClientRequest, at least the parts that sendWithHttp cares about -// but make it a PassThrough so we can inspect it for the test -class HttpRequest extends PassThrough { - setHeader(name: string, value: string) {} -} - // Barebones exporter for use by sendWithHttp type ExporterConfig = OTLPExporterNodeConfigBase; class Exporter extends OTLPExporterNodeBase { @@ -68,7 +44,7 @@ const noopSerializer: ISerializer = { describe('force flush', () => { it('forceFlush should flush spans and return', async () => { - const exporter = new Exporter({}, noopSerializer, ''); + const exporter = new Exporter({}, noopSerializer, {}); await exporter.forceFlush(); }); }); @@ -188,167 +164,3 @@ describe('configureCompression', () => { ); }); }); - -describe('sendWithHttp', () => { - let exporter: Exporter; - let httpRequestStub: sinon.SinonStub; - let mockRequest: HttpRequest; - let setHeaderSpy: sinon.SinonSpy; - - const spanData: object = { - foo: 'bar', - bar: 'baz', - }; - - beforeEach(() => { - // Create stub of http.request (used by sendWithHttp) - httpRequestStub = sinon.stub(http, 'request'); - - // Mock out a request - mockRequest = new HttpRequest(); - setHeaderSpy = sinon.spy(mockRequest, 'setHeader'); - - // Mock out response - const response = new HttpResponse(); - response.end('OK'); - - // Stub out http.request so it calls our callback with the mocked response - // and also so it returns our mocked request stream so we can observe. We don't - // really care about the response for the purpose of this test, but we do want - // to observe the request compression behavior. - httpRequestStub.returns(mockRequest).callsArgWith(1, response); - }); - - afterEach(function () { - httpRequestStub.restore(); - setHeaderSpy.restore(); - }); - - it('should send with no compression if configured to do so', () => { - exporter = new Exporter( - { - url: 'http://foobar.com', - compression: CompressionAlgorithm.NONE, - }, - noopSerializer, - '' - ); - const data = JSON.stringify(spanData); - - // Show that data is written to the request stream - let requestData = ''; - mockRequest.on('data', chunk => (requestData += chunk)); - mockRequest.on('end', () => { - assert.strictEqual(requestData, data); - }); - - // use fake timers to replace setTimeout in sendWithHttp function - const clock = sinon.useFakeTimers(); - - sendWithHttp( - exporter, - data, - 'application/json', - () => { - // Show that we aren't setting the gzip encoding header - assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').notCalled); - }, - (err: OTLPExporterError) => { - assert.fail(err); - } - ); - - clock.restore(); - }); - - it('should send with gzip compression if configured to do so', () => { - exporter = new Exporter( - { - url: 'http://foobar.com', - compression: CompressionAlgorithm.GZIP, - }, - noopSerializer, - '' - ); - - const data = JSON.stringify(spanData); - const compressedData = zlib.gzipSync(Buffer.from(data)); - - // Show that compressed data is written to the request stream - const buffers: Buffer[] = []; - mockRequest.on('data', chunk => buffers.push(Buffer.from(chunk))); - mockRequest.on('end', () => { - assert(Buffer.concat(buffers).equals(compressedData)); - }); - - // use fake timers to replace setTimeout in sendWithHttp function - const clock = sinon.useFakeTimers(); - - sendWithHttp( - exporter, - data, - 'application/json', - () => { - // Show that we are setting the gzip encoding header - assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').calledOnce); - }, - (err: OTLPExporterError) => { - assert.fail(err); - } - ); - - clock.restore(); - }); - - it('should work with gzip compression enabled even after multiple requests', () => { - exporter = new Exporter( - { - url: 'http://foobar.com', - compression: CompressionAlgorithm.GZIP, - }, - noopSerializer, - '' - ); - - const data = JSON.stringify(spanData); - const compressedData = zlib.gzipSync(Buffer.from(data)); - - for (let i = 0; i < 5; i++) { - mockRequest = new HttpRequest(); - setHeaderSpy.restore(); - setHeaderSpy = sinon.spy(mockRequest, 'setHeader'); - - const response = new HttpResponse(); - response.end('OK'); - - httpRequestStub.restore(); - httpRequestStub = sinon.stub(http, 'request'); - httpRequestStub.returns(mockRequest).callsArgWith(1, response); - - // Show that compressed data is written to the request stream - const buffers: Buffer[] = []; - mockRequest.on('data', chunk => buffers.push(Buffer.from(chunk))); - mockRequest.on('end', () => { - assert(Buffer.concat(buffers).equals(compressedData)); - }); - - // use fake timers to replace setTimeout in sendWithHttp function - const clock = sinon.useFakeTimers(); - - sendWithHttp( - exporter, - data, - 'application/json', - () => { - // Show that we are setting the gzip encoding header - assert(setHeaderSpy.withArgs('Content-Encoding', 'gzip').calledOnce); - }, - (err: OTLPExporterError) => { - assert.fail(err); - } - ); - - clock.restore(); - } - }); -}); diff --git a/experimental/packages/otlp-exporter-base/test/testHelper.ts b/experimental/packages/otlp-exporter-base/test/testHelper.ts index 41b0c95882..c693237511 100644 --- a/experimental/packages/otlp-exporter-base/test/testHelper.ts +++ b/experimental/packages/otlp-exporter-base/test/testHelper.ts @@ -77,3 +77,36 @@ export function ensureHeadersContain( ); }); } + +/** + * Changes to the below code should be applied to opentelemetry-core/test/test-utils.ts too. + */ + +interface ErrorLikeConstructor { + new (): Error; +} + +/** + * Node.js v8.x and browser compatible `assert.rejects`. + */ +export async function assertRejects( + actual: any, + expected?: RegExp | ErrorLikeConstructor +) { + let rejected; + try { + if (typeof actual === 'function') { + await actual(); + } else { + await actual; + } + } catch (err) { + rejected = true; + if (expected != null) { + assert.throws(() => { + throw err; + }, expected); + } + } + assert(rejected, 'Promise not rejected'); +} diff --git a/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts b/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts index 8ffc1458b2..646e674bf5 100644 --- a/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts +++ b/experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts @@ -28,7 +28,7 @@ import { } from './grpc-exporter-transport'; import { configureCompression, configureCredentials } from './util'; import { ISerializer } from '@opentelemetry/otlp-transformer'; -import { IExporterTransport } from './exporter-transport'; +import { IExporterTransport } from '@opentelemetry/otlp-exporter-base'; /** * OTLP Exporter abstract base class diff --git a/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts b/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts index 81e7ff4c4d..342e83d91e 100644 --- a/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts +++ b/experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts @@ -22,8 +22,10 @@ import type { ChannelCredentials, Client, } from '@grpc/grpc-js'; -import { ExportResponse } from './export-response'; -import { IExporterTransport } from './exporter-transport'; +import { + ExportResponse, + IExporterTransport, +} from '@opentelemetry/otlp-exporter-base'; // values taken from '@grpc/grpc-js` so that we don't need to require/import it. const GRPC_COMPRESSION_NONE = 0; diff --git a/experimental/packages/otlp-grpc-exporter-base/test/OTLPGRPCExporterNodeBase.test.ts b/experimental/packages/otlp-grpc-exporter-base/test/OTLPGRPCExporterNodeBase.test.ts index d1fe77f962..99677dd05a 100644 --- a/experimental/packages/otlp-grpc-exporter-base/test/OTLPGRPCExporterNodeBase.test.ts +++ b/experimental/packages/otlp-grpc-exporter-base/test/OTLPGRPCExporterNodeBase.test.ts @@ -19,8 +19,11 @@ import * as assert from 'assert'; import { OTLPGRPCExporterNodeBase } from '../src/OTLPGRPCExporterNodeBase'; import { OTLPGRPCExporterConfigNode } from '../src/types'; import { mockedReadableSpan } from './traceHelper'; -import { ExportResponse, ExportResponseSuccess } from '../src/export-response'; -import { IExporterTransport } from '../src/exporter-transport'; +import { + ExportResponse, + ExportResponseSuccess, + IExporterTransport, +} from '@opentelemetry/otlp-exporter-base'; import { ISerializer } from '@opentelemetry/otlp-transformer'; import sinon = require('sinon'); diff --git a/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts b/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts index 857669a5f4..edd98bcfd6 100644 --- a/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts +++ b/experimental/packages/otlp-grpc-exporter-base/test/grpc-exporter-transport.test.ts @@ -29,7 +29,7 @@ import { types } from 'util'; import { ExportResponseFailure, ExportResponseSuccess, -} from '../src/export-response'; +} from '@opentelemetry/otlp-exporter-base'; const testServiceDefinition = { export: {