Skip to content

Commit

Permalink
[Service Bus] Tracing for send API (#11651)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru authored Oct 8, 2020
1 parent d77196b commit 3448028
Show file tree
Hide file tree
Showing 14 changed files with 793 additions and 92 deletions.
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

## 7.0.0-preview.8 (Unreleased)

- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651)

## 7.0.0-preview.7 (2020-10-07)

- [Bug Fix] `sendMessages` method on the sender would have previously thrown an error for sending a batch or an array of messages upon a network disconnect, the issue has been fixed now.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651/commits/f262e4562eb78828ee816a54f9a9778692e0eff9)

### New features:

- Message locks can be auto-renewed in all receive methods (receiver.receiveMessages, receiver.subcribe
Expand All @@ -17,7 +22,6 @@
- "properties" in the correlation rule filter now supports `Date`.
[PR 11117](https://github.com/Azure/azure-sdk-for-js/pull/11117)


### Breaking changes

- `ServiceBusClient.createSessionReceiver` has been split into two methods:
Expand Down
11 changes: 10 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { PageSettings } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RetryOptions } from '@azure/core-amqp';
import { ServiceClient } from '@azure/core-http';
import { Span } from '@opentelemetry/api';
import { SpanContext } from '@opentelemetry/api';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
import { UserAgentOptions } from '@azure/core-http';
Expand Down Expand Up @@ -385,8 +387,10 @@ export interface ServiceBusMessageBatch {
// @internal
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage): boolean;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -565,6 +569,11 @@ export interface TopicRuntimeProperties {
export interface TopicRuntimePropertiesResponse extends TopicRuntimeProperties, Response {
}

// @public
export interface TryAddOptions {
parentSpan?: Span | SpanContext | null;
}

export { WebSocketImpl }

export { WebSocketOptions }
Expand Down
44 changes: 28 additions & 16 deletions sdk/servicebus/service-bus/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,23 +314,17 @@ export namespace ConnectionContext {
await connectionContext.managementClients[entityPath].close();
}

await refreshConnection(connectionContext);
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was atleast one sender/receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) {
logger.verbose(
"[%s] connection.close() was not called from the sdk and there were some " +
"senders and/or receivers. We should reconnect.",
connectionContext.connection.id
);
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Calling onDetached on sender
if (!state.wasConnectionCloseCalled && state.numSenders) {
// We don't do recovery for the sender:
// Because we don't want to keep the sender active all the time
// and the "next" send call would bear the burden of creating the link.
// Call onDetached() on sender so that it can gracefully shutdown
// by cleaning up the timers and closing the links.
// We don't call onDetached for sender after `refreshConnection()`
// because any new send calls that potentially initialize links would also get affected if called later.
// TODO: do the same for batching receiver
const detachCalls: Promise<void>[] = [];
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
Expand All @@ -352,6 +346,24 @@ export namespace ConnectionContext {
);
}
}
await Promise.all(detachCalls);
}

await refreshConnection(connectionContext);
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was atleast one sender/receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) {
logger.verbose(
"[%s] connection.close() was not called from the sdk and there were some " +
"senders and/or receivers. We should reconnect.",
connectionContext.connection.id
);
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
// and streaming receivers can decide whether to reconnect or not.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";

/**
* @ignore
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage`
* has already been instrumented.
* @param message The `ServiceBusMessage` to instrument.
* @param span The `Span` containing the context to propagate tracing information.
* @ignore
* @internal
*/
export function instrumentServiceBusMessage(
message: ServiceBusMessage,
span: Span
): ServiceBusMessage {
if (message.properties && message.properties[TRACEPARENT_PROPERTY]) {
return message;
}

// create a copy so the original isn't modified
message = { ...message, properties: { ...message.properties } };

const traceParent = getTraceParentHeader(span.context());
if (traceParent) {
message.properties![TRACEPARENT_PROPERTY] = traceParent;
}

return message;
}

/**
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
* @param message An individual `ServiceBusMessage` object.
* @internal
* @ignore
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
if (!message.properties || !message.properties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}
27 changes: 27 additions & 0 deletions sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ConnectionConfig } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { Span, SpanContext, SpanKind } from "@opentelemetry/api";

/**
* @internal
* @ignore
*/
export function createMessageSpan(
parentSpan?: Span | SpanContext | null,
config?: Pick<ConnectionConfig, "entityPath" | "host">
): Span {
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.message", {
kind: SpanKind.PRODUCER,
parent: parentSpan
});
span.setAttribute("az.namespace", "Microsoft.ServiceBus");
if (config) {
span.setAttribute("message_bus.destination", config.entityPath);
span.setAttribute("peer.address", config.host);
}
return span;
}
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export {
SubQueue,
SubscribeOptions
} from "./models";
export { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver";
export { ServiceBusSender } from "./sender";
Expand Down
41 changes: 38 additions & 3 deletions sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

// TODO: this code is a straight-copy from EventHubs. Need to merge.

import { Span, SpanContext } from "@opentelemetry/api";
import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { getTracer, OperationTracingOptions } from "@azure/core-tracing";

/**
* NOTE: This type is intended to mirror the relevant fields and structure from @azure/core-http OperationOptions
Expand All @@ -18,7 +19,41 @@ export type OperationOptionsBase = Pick<OperationOptions, "abortSignal" | "traci
* @ignore
*/
export function getParentSpan(
options: Pick<OperationOptionsBase, "tracingOptions">
options?: OperationTracingOptions
): Span | SpanContext | null | undefined {
return options.tracingOptions?.spanOptions?.parent;
return options?.spanOptions?.parent;
}

export function createSendSpan(
parentSpan?: Span | SpanContext | null,
spanContextsToLink: SpanContext[] = [],
entityPath?: string,
host?: string
): Span {
const links: Link[] = spanContextsToLink.map((context) => {
return {
context
};
});
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.send", {
kind: SpanKind.CLIENT,
parent: parentSpan,
links
});

span.setAttribute("az.namespace", "Microsoft.ServiceBus");
span.setAttribute("message_bus.destination", entityPath);
span.setAttribute("peer.address", host);

return span;
}
/**
* The set of options to manually propagate `Span` context for distributed tracing.
*/
export interface TryAddOptions {
/**
* The `Span` or `SpanContext` to use as the `parent` of any spans created while calling operations that make a request to the service.
*/
parentSpan?: Span | SpanContext | null;
}
49 changes: 39 additions & 10 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import {
RetryOptions,
retry
} from "@azure/core-amqp";
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import {
createSendSpan,
getParentSpan,
OperationOptionsBase
} from "./modelsToBeSharedWithEventHubs";
import { CanonicalCode, SpanContext } from "@opentelemetry/api";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand Down Expand Up @@ -179,14 +184,19 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
const invalidTypeErrMsg =
"Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage.";

// link message span contexts
let spanContextsToLink: SpanContext[] = [];
if (isServiceBusMessage(messages)) {
messages = [messages];
}
let batch: ServiceBusMessageBatch;
if (Array.isArray(messages)) {
const batch = await this.createBatch(options);

batch = await this.createBatch(options);
for (const message of messages) {
if (!isServiceBusMessage(message)) {
throw new TypeError(invalidTypeErrMsg);
}
if (!batch.tryAdd(message)) {
if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
// this is too big - throw an error
const error = new MessagingError(
"Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control."
Expand All @@ -195,14 +205,33 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
throw error;
}
}

return this._sender.sendBatch(batch, options);
} else if (isServiceBusMessageBatch(messages)) {
return this._sender.sendBatch(messages, options);
} else if (isServiceBusMessage(messages)) {
return this._sender.send(messages, options);
spanContextsToLink = messages._messageSpanContexts;
batch = messages;
} else {
throw new TypeError(invalidTypeErrMsg);
}

const sendSpan = createSendSpan(
getParentSpan(options?.tracingOptions),
spanContextsToLink,
this.entityPath,
this._context.config.host
);

try {
const result = await this._sender.sendBatch(batch, options);
sendSpan.setStatus({ code: CanonicalCode.OK });
return result;
} catch (error) {
sendSpan.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message
});
throw error;
} finally {
sendSpan.end();
}
throw new TypeError(invalidTypeErrMsg);
}

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
Expand Down
Loading

0 comments on commit 3448028

Please sign in to comment.