diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index eb90f57bef1f..2e6b699dd6ff 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,9 +2,14 @@ ## 7.0.0-preview.8 (Unreleased) +- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing. + [PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651) ## 7.0.0-preview.7 (2020-10-07) +- [Bug Fix] `sendMessages` method on the sender would have previously thrown an error for sending a batch or an array of messages upon a network disconnect, the issue has been fixed now. + [PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651/commits/f262e4562eb78828ee816a54f9a9778692e0eff9) + ### New features: - Message locks can be auto-renewed in all receive methods (receiver.receiveMessages, receiver.subcribe @@ -17,7 +22,6 @@ - "properties" in the correlation rule filter now supports `Date`. [PR 11117](https://github.com/Azure/azure-sdk-for-js/pull/11117) - ### Breaking changes - `ServiceBusClient.createSessionReceiver` has been split into two methods: diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 4ebda1dd091a..9363370e1a61 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -15,6 +15,8 @@ import { PageSettings } from '@azure/core-paging'; import { PipelineOptions } from '@azure/core-http'; import { RetryOptions } from '@azure/core-amqp'; import { ServiceClient } from '@azure/core-http'; +import { Span } from '@opentelemetry/api'; +import { SpanContext } from '@opentelemetry/api'; import { TokenCredential } from '@azure/core-amqp'; import { TokenType } from '@azure/core-amqp'; import { UserAgentOptions } from '@azure/core-http'; @@ -385,8 +387,10 @@ export interface ServiceBusMessageBatch { // @internal _generateMessage(): Buffer; readonly maxSizeInBytes: number; + // @internal + readonly _messageSpanContexts: SpanContext[]; readonly sizeInBytes: number; - tryAdd(message: ServiceBusMessage): boolean; + tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean; } // @public @@ -565,6 +569,11 @@ export interface TopicRuntimeProperties { export interface TopicRuntimePropertiesResponse extends TopicRuntimeProperties, Response { } +// @public +export interface TryAddOptions { + parentSpan?: Span | SpanContext | null; +} + export { WebSocketImpl } export { WebSocketOptions } diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index fd35f8139712..9e102393d5b4 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -314,23 +314,17 @@ export namespace ConnectionContext { await connectionContext.managementClients[entityPath].close(); } - await refreshConnection(connectionContext); - waitForConnectionRefreshResolve(); - waitForConnectionRefreshPromise = undefined; - // The connection should always be brought back up if the sdk did not call connection.close() - // and there was atleast one sender/receiver link on the connection before it went down. - logger.verbose("[%s] state: %O", connectionContext.connectionId, state); - if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) { - logger.verbose( - "[%s] connection.close() was not called from the sdk and there were some " + - "senders and/or receivers. We should reconnect.", - connectionContext.connection.id - ); - await delay(Constants.connectionReconnectDelay); - - const detachCalls: Promise[] = []; - + // Calling onDetached on sender + if (!state.wasConnectionCloseCalled && state.numSenders) { + // We don't do recovery for the sender: + // Because we don't want to keep the sender active all the time + // and the "next" send call would bear the burden of creating the link. // Call onDetached() on sender so that it can gracefully shutdown + // by cleaning up the timers and closing the links. + // We don't call onDetached for sender after `refreshConnection()` + // because any new send calls that potentially initialize links would also get affected if called later. + // TODO: do the same for batching receiver + const detachCalls: Promise[] = []; for (const senderName of Object.keys(connectionContext.senders)) { const sender = connectionContext.senders[senderName]; if (sender) { @@ -352,6 +346,24 @@ export namespace ConnectionContext { ); } } + await Promise.all(detachCalls); + } + + await refreshConnection(connectionContext); + waitForConnectionRefreshResolve(); + waitForConnectionRefreshPromise = undefined; + // The connection should always be brought back up if the sdk did not call connection.close() + // and there was atleast one sender/receiver link on the connection before it went down. + logger.verbose("[%s] state: %O", connectionContext.connectionId, state); + if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) { + logger.verbose( + "[%s] connection.close() was not called from the sdk and there were some " + + "senders and/or receivers. We should reconnect.", + connectionContext.connection.id + ); + await delay(Constants.connectionReconnectDelay); + + const detachCalls: Promise[] = []; // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation // and streaming receivers can decide whether to reconnect or not. diff --git a/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts new file mode 100644 index 000000000000..f927eff1e7a2 --- /dev/null +++ b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing"; +import { Span, SpanContext } from "@opentelemetry/api"; +import { ServiceBusMessage } from "../serviceBusMessage"; + +/** + * @ignore + */ +export const TRACEPARENT_PROPERTY = "Diagnostic-Id"; + +/** + * Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation. + * Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage` + * has already been instrumented. + * @param message The `ServiceBusMessage` to instrument. + * @param span The `Span` containing the context to propagate tracing information. + * @ignore + * @internal + */ +export function instrumentServiceBusMessage( + message: ServiceBusMessage, + span: Span +): ServiceBusMessage { + if (message.properties && message.properties[TRACEPARENT_PROPERTY]) { + return message; + } + + // create a copy so the original isn't modified + message = { ...message, properties: { ...message.properties } }; + + const traceParent = getTraceParentHeader(span.context()); + if (traceParent) { + message.properties![TRACEPARENT_PROPERTY] = traceParent; + } + + return message; +} + +/** + * Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists. + * @param message An individual `ServiceBusMessage` object. + * @internal + * @ignore + */ +export function extractSpanContextFromServiceBusMessage( + message: ServiceBusMessage +): SpanContext | undefined { + if (!message.properties || !message.properties[TRACEPARENT_PROPERTY]) { + return; + } + + const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string; + return extractSpanContextFromTraceParentHeader(diagnosticId); +} diff --git a/sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts b/sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts new file mode 100644 index 000000000000..0202e099abd0 --- /dev/null +++ b/sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { ConnectionConfig } from "@azure/core-amqp"; +import { getTracer } from "@azure/core-tracing"; +import { Span, SpanContext, SpanKind } from "@opentelemetry/api"; + +/** + * @internal + * @ignore + */ +export function createMessageSpan( + parentSpan?: Span | SpanContext | null, + config?: Pick +): Span { + const tracer = getTracer(); + const span = tracer.startSpan("Azure.ServiceBus.message", { + kind: SpanKind.PRODUCER, + parent: parentSpan + }); + span.setAttribute("az.namespace", "Microsoft.ServiceBus"); + if (config) { + span.setAttribute("message_bus.destination", config.entityPath); + span.setAttribute("peer.address", config.host); + } + return span; +} diff --git a/sdk/servicebus/service-bus/src/index.ts b/sdk/servicebus/service-bus/src/index.ts index 6862754f27e3..8389d0c64de1 100644 --- a/sdk/servicebus/service-bus/src/index.ts +++ b/sdk/servicebus/service-bus/src/index.ts @@ -28,7 +28,7 @@ export { SubQueue, SubscribeOptions } from "./models"; -export { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs"; +export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs"; export { ServiceBusReceiver } from "./receivers/receiver"; export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver"; export { ServiceBusSender } from "./sender"; diff --git a/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts b/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts index 89f6763881c5..2216f9910609 100644 --- a/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts +++ b/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts @@ -3,8 +3,9 @@ // TODO: this code is a straight-copy from EventHubs. Need to merge. -import { Span, SpanContext } from "@opentelemetry/api"; +import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; import { OperationOptions } from "@azure/core-http"; +import { getTracer, OperationTracingOptions } from "@azure/core-tracing"; /** * NOTE: This type is intended to mirror the relevant fields and structure from @azure/core-http OperationOptions @@ -18,7 +19,41 @@ export type OperationOptionsBase = Pick + options?: OperationTracingOptions ): Span | SpanContext | null | undefined { - return options.tracingOptions?.spanOptions?.parent; + return options?.spanOptions?.parent; +} + +export function createSendSpan( + parentSpan?: Span | SpanContext | null, + spanContextsToLink: SpanContext[] = [], + entityPath?: string, + host?: string +): Span { + const links: Link[] = spanContextsToLink.map((context) => { + return { + context + }; + }); + const tracer = getTracer(); + const span = tracer.startSpan("Azure.ServiceBus.send", { + kind: SpanKind.CLIENT, + parent: parentSpan, + links + }); + + span.setAttribute("az.namespace", "Microsoft.ServiceBus"); + span.setAttribute("message_bus.destination", entityPath); + span.setAttribute("peer.address", host); + + return span; +} +/** + * The set of options to manually propagate `Span` context for distributed tracing. + */ +export interface TryAddOptions { + /** + * The `Span` or `SpanContext` to use as the `parent` of any spans created while calling operations that make a request to the service. + */ + parentSpan?: Span | SpanContext | null; } diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index 5d035b4c7f80..07940254c5fb 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -21,7 +21,12 @@ import { RetryOptions, retry } from "@azure/core-amqp"; -import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs"; +import { + createSendSpan, + getParentSpan, + OperationOptionsBase +} from "./modelsToBeSharedWithEventHubs"; +import { CanonicalCode, SpanContext } from "@opentelemetry/api"; /** * A Sender can be used to send messages, schedule messages to be sent at a later time @@ -179,14 +184,19 @@ export class ServiceBusSenderImpl implements ServiceBusSender { const invalidTypeErrMsg = "Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage."; + // link message span contexts + let spanContextsToLink: SpanContext[] = []; + if (isServiceBusMessage(messages)) { + messages = [messages]; + } + let batch: ServiceBusMessageBatch; if (Array.isArray(messages)) { - const batch = await this.createBatch(options); - + batch = await this.createBatch(options); for (const message of messages) { if (!isServiceBusMessage(message)) { throw new TypeError(invalidTypeErrMsg); } - if (!batch.tryAdd(message)) { + if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) { // this is too big - throw an error const error = new MessagingError( "Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control." @@ -195,14 +205,33 @@ export class ServiceBusSenderImpl implements ServiceBusSender { throw error; } } - - return this._sender.sendBatch(batch, options); } else if (isServiceBusMessageBatch(messages)) { - return this._sender.sendBatch(messages, options); - } else if (isServiceBusMessage(messages)) { - return this._sender.send(messages, options); + spanContextsToLink = messages._messageSpanContexts; + batch = messages; + } else { + throw new TypeError(invalidTypeErrMsg); + } + + const sendSpan = createSendSpan( + getParentSpan(options?.tracingOptions), + spanContextsToLink, + this.entityPath, + this._context.config.host + ); + + try { + const result = await this._sender.sendBatch(batch, options); + sendSpan.setStatus({ code: CanonicalCode.OK }); + return result; + } catch (error) { + sendSpan.setStatus({ + code: CanonicalCode.UNKNOWN, + message: error.message + }); + throw error; + } finally { + sendSpan.end(); } - throw new TypeError(invalidTypeErrMsg); } async createBatch(options?: CreateBatchOptions): Promise { diff --git a/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts index 354993d73eb7..3a9caf2bf4f0 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts @@ -1,7 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { ServiceBusMessage, toAmqpMessage, isServiceBusMessage } from "./serviceBusMessage"; +import { + ServiceBusMessage, + toAmqpMessage, + isServiceBusMessage, + getMessagePropertyTypeMismatchError +} from "./serviceBusMessage"; import { throwTypeErrorIfParameterMissing } from "./util/errors"; import { ConnectionContext } from "./connectionContext"; import { @@ -10,6 +15,13 @@ import { message as RheaMessageUtil } from "rhea-promise"; import { AmqpMessage } from "@azure/core-amqp"; +import { SpanContext } from "@opentelemetry/api"; +import { + instrumentServiceBusMessage, + TRACEPARENT_PROPERTY +} from "./diagnostics/instrumentServiceBusMessage"; +import { createMessageSpan } from "./diagnostics/messageSpan"; +import { TryAddOptions } from "./modelsToBeSharedWithEventHubs"; /** * @internal @@ -65,7 +77,7 @@ export interface ServiceBusMessageBatch { * @param message An individual service bus message. * @returns A boolean value indicating if the message has been added to the batch or not. */ - tryAdd(message: ServiceBusMessage): boolean; + tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean; /** * The AMQP message containing encoded events that were added to the batch. @@ -77,6 +89,14 @@ export interface ServiceBusMessageBatch { * @ignore */ _generateMessage(): Buffer; + + /** + * Gets the "message" span contexts that were created when adding events to the batch. + * Used internally by the `sendBatch()` method to set up the right spans in traces if tracing is enabled. + * @internal + * @ignore + */ + readonly _messageSpanContexts: SpanContext[]; } /** @@ -95,6 +115,10 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { * @property Encoded amqp messages. */ private _encodedMessages: Buffer[] = []; + /** + * List of 'message' span contexts. + */ + private _spanContexts: SpanContext[] = []; /** * ServiceBusMessageBatch should not be constructed using `new ServiceBusMessageBatch()` * Use the `createBatch()` method on your `Sender` instead. @@ -132,6 +156,15 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { return this._encodedMessages.length; } + /** + * Gets the "message" span contexts that were created when adding messages to the batch. + * @internal + * @ignore + */ + get _messageSpanContexts(): SpanContext[] { + return this._spanContexts; + } + /** * Generates an AMQP message that contains the provided encoded messages and annotations. * @@ -210,16 +243,37 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { * @param message An individual service bus message. * @returns A boolean value indicating if the message has been added to the batch or not. */ - public tryAdd(message: ServiceBusMessage): boolean { + public tryAdd(message: ServiceBusMessage, options: TryAddOptions = {}): boolean { throwTypeErrorIfParameterMissing(this._context.connectionId, "message", message); if (!isServiceBusMessage(message)) { throw new TypeError("Provided value for 'message' must be of type ServiceBusMessage."); } + // check if the event has already been instrumented + const previouslyInstrumented = Boolean( + message.properties && message.properties[TRACEPARENT_PROPERTY] + ); + let spanContext: SpanContext | undefined; + if (!previouslyInstrumented) { + const messageSpan = createMessageSpan(options?.parentSpan, this._context.config); + message = instrumentServiceBusMessage(message, messageSpan); + spanContext = messageSpan.context(); + messageSpan.end(); + } + // Convert ServiceBusMessage to AmqpMessage. const amqpMessage = toAmqpMessage(message); amqpMessage.body = this._context.dataTransformer.encode(message.body); - const encodedMessage = RheaMessageUtil.encode(amqpMessage); + + let encodedMessage: Buffer; + try { + encodedMessage = RheaMessageUtil.encode(amqpMessage); + } catch (error) { + if (error instanceof TypeError || error.name === "TypeError") { + throw getMessagePropertyTypeMismatchError(message) || error; + } + throw error; + } let currentSize = this._sizeInBytes; @@ -261,6 +315,9 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { // The message will fit in the batch, so it is now safe to store it. this._encodedMessages.push(encodedMessage); + if (spanContext) { + this._spanContexts.push(spanContext); + } this._sizeInBytes = currentSize; return true; diff --git a/sdk/servicebus/service-bus/test/internal/messageSpan.spec.ts b/sdk/servicebus/service-bus/test/internal/messageSpan.spec.ts new file mode 100644 index 000000000000..78f23dcbea70 --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/messageSpan.spec.ts @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { createMessageSpan } from "../../src/diagnostics/messageSpan"; +import { TraceFlags, SpanContext } from "@opentelemetry/api"; +import { TestTracer, setTracer, getTracer } from "@azure/core-tracing"; +import { ConnectionConfig } from "@azure/core-amqp"; + +const should = chai.should(); +const assert = chai.assert; + +describe("#createMessageSpan()", () => { + const origTracer = getTracer(); + + before(() => { + setTracer(new TestTracer()); + }); + + after(() => { + setTracer(origTracer); + }); + + const mockSpanContext: SpanContext = { + traceId: "d4cda95b652f4a1592b449d5929fda1b", + spanId: "6e0c63257de34c92", + traceFlags: TraceFlags.SAMPLED + }; + const mockServiceBusConnectionConfig: Pick = { + entityPath: "entity", + host: "foo.example.com" + }; + + it("should create a span without a parent", () => { + const span = createMessageSpan(); + + should.exist(span); + should.exist(span.context().spanId); + should.exist(span.context().traceId); + + should.equal((span as any).name, "Azure.ServiceBus.message"); + assert.deepStrictEqual((span as any).attributes, { + "az.namespace": "Microsoft.ServiceBus" + }); + + span.end(); + }); + + it("should create a span with a parent", () => { + const span = createMessageSpan(mockSpanContext); + + should.exist(span); + should.equal(span.context().traceId, mockSpanContext.traceId); + should.exist(span.context().spanId); + should.not.equal(span.context().spanId, mockSpanContext.spanId); + + should.equal((span as any).name, "Azure.ServiceBus.message"); + assert.deepStrictEqual((span as any).attributes, { + "az.namespace": "Microsoft.ServiceBus" + }); + + span.end(); + }); + + it("should create a span with a serviceBusConfig", () => { + const span = createMessageSpan(mockSpanContext, mockServiceBusConnectionConfig); + + should.exist(span); + should.equal(span.context().traceId, mockSpanContext.traceId); + should.exist(span.context().spanId); + should.not.equal(span.context().spanId, mockSpanContext.spanId); + + should.equal((span as any).name, "Azure.ServiceBus.message"); + assert.deepStrictEqual((span as any).attributes, { + "az.namespace": "Microsoft.ServiceBus", + "message_bus.destination": mockServiceBusConnectionConfig.entityPath, + "peer.address": mockServiceBusConnectionConfig.host + }); + + span.end(); + }); +}); diff --git a/sdk/servicebus/service-bus/test/internal/utils.spec.ts b/sdk/servicebus/service-bus/test/internal/utils.spec.ts index d005aa180404..5f0ea19025fb 100644 --- a/sdk/servicebus/service-bus/test/internal/utils.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/utils.spec.ts @@ -10,6 +10,12 @@ import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-contr import { delay } from "rhea-promise"; import chai from "chai"; import chaiAsPromised from "chai-as-promised"; +import { + extractSpanContextFromServiceBusMessage, + TRACEPARENT_PROPERTY +} from "../../src/diagnostics/instrumentServiceBusMessage"; +import { ServiceBusReceivedMessage } from "../../src"; +import { TraceFlags } from "@opentelemetry/api"; chai.use(chaiAsPromised); const assert = chai.assert; @@ -317,6 +323,69 @@ describe("utils", () => { assert.isFalse(callbackWasCalled); }); }); + + describe("extractSpanContextFromServiceBusMessage", function() { + it("should extract a SpanContext from a properly instrumented ServiceBusMessage", function() { + const traceId = "11111111111111111111111111111111"; + const spanId = "2222222222222222"; + const flags = "00"; + const receivedMessage: ServiceBusReceivedMessage = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + properties: { + [TRACEPARENT_PROPERTY]: `00-${traceId}-${spanId}-${flags}` + }, + _amqpAnnotatedMessage: { body: "This is a test." } + }; + + const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage); + + assert.exists(spanContext, "Extracted spanContext should be defined."); + assert.equal(spanContext!.traceId, traceId, "Extracted traceId does not match expectation."); + assert.equal(spanContext!.spanId, spanId, "Extracted spanId does not match expectation."); + assert.equal( + spanContext!.traceFlags, + TraceFlags.NONE, + "Extracted traceFlags do not match expectations." + ); + }); + + it("should return undefined when ServiceBusMessage is not properly instrumented", function() { + const traceId = "11111111111111111111111111111111"; + const spanId = "2222222222222222"; + const flags = "00"; + const receivedMessage: ServiceBusReceivedMessage = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + properties: { + [TRACEPARENT_PROPERTY]: `99-${traceId}-${spanId}-${flags}` + }, + _amqpAnnotatedMessage: { body: "This is a test." } + }; + + const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage); + + assert.notExists( + spanContext, + "Invalid diagnosticId version should return undefined spanContext." + ); + }); + + it("should return undefined when ServiceBusMessage is not instrumented", function() { + const receivedMessage: ServiceBusReceivedMessage = { + body: "This is a test.", + enqueuedTimeUtc: new Date(), + _amqpAnnotatedMessage: { body: "This is a test." } + }; + + const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage); + + assert.notExists( + spanContext, + `Missing property "${TRACEPARENT_PROPERTY}" should return undefined spanContext.` + ); + }); + }); }); function getAbortSignalWithTracking( diff --git a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts index 3ba444bd9b77..9f531d5ee271 100644 --- a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts +++ b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts @@ -6,7 +6,7 @@ import Long from "long"; const should = chai.should(); import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); -import { ServiceBusMessage, delay, ServiceBusClient } from "../src"; +import { ServiceBusMessage, delay } from "../src"; import { TestClientType, TestMessage } from "./utils/testUtils"; import { ServiceBusReceiver } from "../src/receivers/receiver"; import { @@ -21,6 +21,9 @@ import { import { ServiceBusSender } from "../src/sender"; import { ServiceBusReceivedMessageWithLock } from "../src/serviceBusMessage"; import { AbortController } from "@azure/abort-controller"; +import { SpanGraph, TestSpan } from "@azure/core-tracing"; +import { setTracerForTest } from "./utils/misc"; +import { TRACEPARENT_PROPERTY } from "../src/diagnostics/instrumentServiceBusMessage"; const noSessionTestClientType = getRandomTestClientTypeWithNoSessions(); const withSessionTestClientType = getRandomTestClientTypeWithSessions(); @@ -380,12 +383,17 @@ describe("Sender Tests", () => { }); describe("ServiceBusMessage validations", function(): void { - let sbClient: ServiceBusClient; + let sbClient: ServiceBusClientForTests; let sender: ServiceBusSender; - before(() => { - sbClient = new ServiceBusClient("Endpoint=sb://a;SharedAccessKeyName=b;SharedAccessKey=c;"); - sender = sbClient.createSender("dummyQueue"); + before(async () => { + sbClient = createServiceBusClientForTests(); + const entityName = await sbClient.test.createTestEntities(TestClientType.UnpartitionedQueue); + sender = sbClient.createSender(entityName.queue!); + }); + + after(async () => { + await sbClient.close(); }); const longString = @@ -399,7 +407,7 @@ describe("ServiceBusMessage validations", function(): void { { message: { body: "", contentType: 1 as any }, expectedErrorMessage: "The property 'contentType' on the message must be of type 'string'", - title: "contenType is of invalid type" + title: "contentType is of invalid type" }, { message: { body: "", label: 1 as any }, @@ -521,3 +529,300 @@ describe("ServiceBusMessage validations", function(): void { }); }); }); + +describe("Tracing for send", function(): void { + let sbClient: ServiceBusClientForTests; + let sender: ServiceBusSender; + let entityName: EntityName; + + before(() => { + sbClient = createServiceBusClientForTests(); + }); + + after(() => { + return sbClient.test.after(); + }); + + beforeEach(async () => { + entityName = await sbClient.test.createTestEntities(TestClientType.UnpartitionedQueue); + + sender = sbClient.test.addToCleanup( + sbClient.createSender(entityName.queue ?? entityName.topic!) + ); + }); + + it("add messages with tryAdd - can be manually traced", async function(): Promise { + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + + const list = [{ name: "Albert" }, { name: "Marie" }]; + + const batch = await sender.createBatch(); + + for (let i = 0; i < 2; i++) { + batch.tryAdd({ body: `${list[i].name}` }, { parentSpan: rootSpan }); + } + await sender.sendMessages(batch); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(2, "Should only have two root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); + + it("add messages with tryAdd - will not instrument already instrumented messages", async function(): Promise< + void + > { + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("test"); + + const list = [ + { name: "Albert" }, + { + name: "Marie", + properties: { + [TRACEPARENT_PROPERTY]: "foo" + } + } + ]; + + const batch = await sender.createBatch(); + + for (let i = 0; i < 2; i++) { + batch.tryAdd( + { body: `${list[i].name}`, properties: list[i].properties }, + { parentSpan: rootSpan } + ); + } + await sender.sendMessages(batch); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(2, "Should only have two root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.ServiceBus.message", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); + + it("will support tracing batch and send", async function(): Promise { + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + + const list = [{ name: "Albert" }, { name: "Marie" }]; + + const batch = await sender.createBatch(); + for (let i = 0; i < 2; i++) { + batch.tryAdd({ body: `${list[i].name}` }, { parentSpan: rootSpan }); + } + await sender.sendMessages(batch, { + tracingOptions: { + spanOptions: { + parent: rootSpan.context() + } + } + }); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.send", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); + + it("array of messages - can be manually traced", async function(): Promise { + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + + const messages = []; + for (let i = 0; i < 5; i++) { + messages.push({ body: `multiple messages - manual trace propagation: ${i}` }); + } + await sender.sendMessages(messages, { + tracingOptions: { + spanOptions: { + parent: rootSpan.context() + } + } + }); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.send", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + + const knownSendSpans = tracer + .getKnownSpans() + .filter((span: TestSpan) => span.name === "Azure.ServiceBus.send"); + knownSendSpans.length.should.equal(1, "There should have been one send span."); + knownSendSpans[0].attributes.should.deep.equal({ + "az.namespace": "Microsoft.ServiceBus", + "message_bus.destination": sender.entityPath, + "peer.address": sbClient.fullyQualifiedNamespace + }); + resetTracer(); + }); + + it("array of messages - skips already instrumented messages when manually traced", async function(): Promise< + void + > { + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + + const messages: ServiceBusMessage[] = []; + for (let i = 0; i < 5; i++) { + messages.push({ body: `multiple messages - manual trace propgation: ${i}` }); + } + messages[0].properties = { [TRACEPARENT_PROPERTY]: "foo" }; + await sender.sendMessages(messages, { + tracingOptions: { + spanOptions: { + parent: rootSpan.context() + } + } + }); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root spans."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.message", + children: [] + }, + { + name: "Azure.ServiceBus.send", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.context().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); +}); diff --git a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts index bd54bffad68c..fc276673e19b 100644 --- a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts @@ -196,58 +196,61 @@ describe("sessions tests - requires completely clean entity for each test", () }); }); - describe(testClientType + ": SessionReceiver with empty string as sessionId", function(): void { - afterEach(async () => { - await afterEachTest(); - }); - - // Sending messages with different session id, so that we know for sure we pick the right one - // and that Service Bus is not choosing a random one for us - const testMessagesWithDifferentSessionIds: ServiceBusMessage[] = [ - { - body: "hello1", - messageId: `test message ${Math.random()}`, - sessionId: TestMessage.sessionId - }, - { - body: "hello2", - messageId: `test message ${Math.random()}`, - sessionId: "" - } - ]; - - async function testComplete_batching(): Promise { - await sender.sendMessages(testMessagesWithDifferentSessionIds[0]); - await sender.sendMessages(testMessagesWithDifferentSessionIds[1]); - - const entityNames = serviceBusClient.test.getTestEntities(testClientType); - - // get the next available session ID rather than specifying one - receiver = await serviceBusClient.test.acceptSessionWithPeekLock(entityNames, ""); - - const msgs = await receiver.receiveMessages(2); + describe.skip( + testClientType + ": SessionReceiver with empty string as sessionId", + function(): void { + afterEach(async () => { + await afterEachTest(); + }); + + // Sending messages with different session id, so that we know for sure we pick the right one + // and that Service Bus is not choosing a random one for us + const testMessagesWithDifferentSessionIds: ServiceBusMessage[] = [ + { + body: "hello1", + messageId: `test message ${Math.random()}`, + sessionId: TestMessage.sessionId + }, + { + body: "hello2", + messageId: `test message ${Math.random()}`, + sessionId: "" + } + ]; + + async function testComplete_batching(): Promise { + await sender.sendMessages(testMessagesWithDifferentSessionIds[0]); + await sender.sendMessages(testMessagesWithDifferentSessionIds[1]); + + const entityNames = serviceBusClient.test.getTestEntities(testClientType); + + // get the next available session ID rather than specifying one + receiver = await serviceBusClient.test.acceptSessionWithPeekLock(entityNames, ""); + + const msgs = await receiver.receiveMessages(2); + + should.equal(msgs.length, 1, "Unexpected number of messages received"); + + should.equal(receiver.sessionId, "", "Unexpected sessionId in receiver"); + should.equal( + testMessagesWithDifferentSessionIds[1].body === msgs[0].body && + testMessagesWithDifferentSessionIds[1].messageId === msgs[0].messageId && + testMessagesWithDifferentSessionIds[1].sessionId === msgs[0].sessionId, + true, + "Received Message doesnt match expected test message" + ); + await msgs[0].complete(); - should.equal(msgs.length, 1, "Unexpected number of messages received"); + const peekedMsgsInSession = await receiver.peekMessages(1); + should.equal(peekedMsgsInSession.length, 0, "Unexpected number of messages peeked"); - should.equal(receiver.sessionId, "", "Unexpected sessionId in receiver"); - should.equal( - testMessagesWithDifferentSessionIds[1].body === msgs[0].body && - testMessagesWithDifferentSessionIds[1].messageId === msgs[0].messageId && - testMessagesWithDifferentSessionIds[1].sessionId === msgs[0].sessionId, - true, - "Received Message doesnt match expected test message" - ); - await msgs[0].complete(); - - const peekedMsgsInSession = await receiver.peekMessages(1); - should.equal(peekedMsgsInSession.length, 0, "Unexpected number of messages peeked"); + await receiver.close(); + } - await receiver.close(); + it("complete() removes message from random session", async function(): Promise { + await beforeEachNoSessionTest(); + await testComplete_batching(); + }); } - - it("complete() removes message from random session", async function(): Promise { - await beforeEachNoSessionTest(); - await testComplete_batching(); - }); - }); + ); }); diff --git a/sdk/servicebus/service-bus/test/utils/misc.ts b/sdk/servicebus/service-bus/test/utils/misc.ts index 773edab2713d..5ad19cb27fd4 100644 --- a/sdk/servicebus/service-bus/test/utils/misc.ts +++ b/sdk/servicebus/service-bus/test/utils/misc.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { defaultLock } from "@azure/core-amqp"; +import { NoOpTracer, setTracer, TestTracer } from "@azure/core-tracing"; import { Delivery, ServiceBusReceivedMessage } from "../../src"; import { LinkEntity } from "../../src/core/linkEntity"; import { ServiceBusMessageImpl } from "../../src/serviceBusMessage"; @@ -25,3 +26,15 @@ export function getDeliveryProperty(message: ServiceBusReceivedMessage): Deliver export function isLinkLocked(linkEntity: LinkEntity): boolean { return defaultLock.isBusy(linkEntity["_openLock"]); } + +export function setTracerForTest( + tracer?: T +): { tracer: T; resetTracer: () => void } { + tracer = tracer ?? (new TestTracer() as T); + setTracer(tracer); + + return { + tracer, + resetTracer: () => setTracer(new NoOpTracer()) + }; +}