[Service Bus] Tracing for send API #11651

Merged
merged 25 commits into from
Oct 8, 2020
Changes from 24 commits
a8c1d5d
tracing for trySend
HarshaNalluru Oct 5, 2020
a73fd31
tracing for send API
HarshaNalluru Oct 5, 2020
e46e805
API Report
HarshaNalluru Oct 5, 2020
4af1738
Pass tracing in tryAdd for array of messages, fx tryAdd, added new Pa…
HarshaNalluru Oct 6, 2020
c7d3a14
extractSpanContextFromServiceBusMessage tests
HarshaNalluru Oct 6, 2020
b5f77a8
test utils setTracerForTest
HarshaNalluru Oct 6, 2020
cd71e42
tracing tests
HarshaNalluru Oct 6, 2020
d631ee5
messageSpan tests
HarshaNalluru Oct 6, 2020
8375ed8
API Report
HarshaNalluru Oct 6, 2020
8038315
inline TryAddOptions
HarshaNalluru Oct 6, 2020
6fa23f7
API Report
HarshaNalluru Oct 6, 2020
a8b34fc
FIX ServiceBusMessage validations tests
HarshaNalluru Oct 6, 2020
d04a38e
changelog
HarshaNalluru Oct 6, 2020
9742d97
Update sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
HarshaNalluru Oct 6, 2020
f262e45
sender link close - cleaning up new link - bug fix
HarshaNalluru Oct 6, 2020
ba4fef7
Merge branch 'harshan/sb/issue/tracing-send' of https://github.com/Ha…
HarshaNalluru Oct 6, 2020
e46deb4
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 6, 2020
fea2177
let -> const
HarshaNalluru Oct 6, 2020
5c5795f
createSendSpan -> models shared with event-hubs
HarshaNalluru Oct 6, 2020
6b2409a
Changelog
HarshaNalluru Oct 7, 2020
7de9b6f
revert f262e4 and move onDetached to before calling refreshConnection
HarshaNalluru Oct 8, 2020
d1af28e
remove linentity - calling close during intialization
HarshaNalluru Oct 8, 2020
060842e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 8, 2020
3197fbc
skip the flaky test
HarshaNalluru Oct 8, 2020
84e48aa
Trigger pipeline(dummy commit)
HarshaNalluru Oct 8, 2020
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
@@ -2,9 +2,14 @@

## 7.0.0-preview.8 (Unreleased)

- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651)

## 7.0.0-preview.7 (2020-10-07)

- [Bug Fix] The `sendMessages` method on the sender previously threw an error when sending a batch or an array of messages upon a network disconnect. This issue has now been fixed.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651/commits/f262e4562eb78828ee816a54f9a9778692e0eff9)

### New features:

- Message locks can be auto-renewed in all receive methods (receiver.receiveMessages, receiver.subscribe
@@ -17,7 +22,6 @@
- "properties" in the correlation rule filter now supports `Date`.
[PR 11117](https://github.com/Azure/azure-sdk-for-js/pull/11117)


### Breaking changes

- `ServiceBusClient.createSessionReceiver` has been split into two methods:
11 changes: 10 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
@@ -15,6 +15,8 @@ import { PageSettings } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RetryOptions } from '@azure/core-amqp';
import { ServiceClient } from '@azure/core-http';
import { Span } from '@opentelemetry/api';
import { SpanContext } from '@opentelemetry/api';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
import { UserAgentOptions } from '@azure/core-http';
@@ -385,8 +387,10 @@ export interface ServiceBusMessageBatch {
// @internal
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage): boolean;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
}

// @public
@@ -565,6 +569,11 @@ export interface TopicRuntimeProperties {
export interface TopicRuntimePropertiesResponse extends TopicRuntimeProperties, Response {
}

// @public
export interface TryAddOptions {
parentSpan?: Span | SpanContext | null;
}

export { WebSocketImpl }

export { WebSocketOptions }
44 changes: 28 additions & 16 deletions sdk/servicebus/service-bus/src/connectionContext.ts
@@ -314,23 +314,17 @@ export namespace ConnectionContext {
await connectionContext.managementClients[entityPath].close();
}

await refreshConnection(connectionContext);
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was at least one sender/receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) {
logger.verbose(
"[%s] connection.close() was not called from the sdk and there were some " +
"senders and/or receivers. We should reconnect.",
connectionContext.connection.id
);
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Calling onDetached on sender
if (!state.wasConnectionCloseCalled && state.numSenders) {
// We don't recover sender links here: we don't want to keep the sender
// active all the time, so the next send call bears the cost of creating the link.
// Call onDetached() on each sender so that it can shut down gracefully
// by cleaning up its timers and closing its links.
// We call onDetached() on senders before `refreshConnection()` rather than after it,
// because calling it later could also close links that new send calls have just initialized.
// TODO: do the same for batching receiver
const detachCalls: Promise<void>[] = [];
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
@@ -352,6 +346,24 @@ export namespace ConnectionContext {
);
}
}
await Promise.all(detachCalls);
}

await refreshConnection(connectionContext);
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was at least one sender/receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
if (!state.wasConnectionCloseCalled && (state.numSenders || state.numReceivers)) {
logger.verbose(
"[%s] connection.close() was not called from the sdk and there were some " +
"senders and/or receivers. We should reconnect.",
connectionContext.connection.id
);
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Call onDetached() on receivers so that batching receivers can gracefully close any ongoing batch operation
// and streaming receivers can decide whether to reconnect or not.
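The reordering in this hunk moves the sender `onDetached()` calls ahead of `refreshConnection()`. The toy model below sketches why the order matters, under the assumption (taken from the code comments above) that a send call lazily creates a fresh link and that `onDetached()` closes whichever link is currently attached. `SenderSketch`, `recover` and the numeric connection ids are hypothetical names for this illustration only; the real senders manage AMQP links asynchronously, which this synchronous sketch does not try to reproduce.

```typescript
// Hypothetical model of a sender that owns at most one link at a time.
class SenderSketch {
  link: { connectionId: number } | undefined;

  constructor(connectionId: number) {
    this.link = { connectionId };
  }

  // Closes whichever link is currently attached, whether stale or fresh.
  onDetached(): void {
    this.link = undefined;
  }

  // Lazily (re)creates the link on the current connection when needed.
  send(currentConnectionId: number): void {
    if (!this.link || this.link.connectionId !== currentConnectionId) {
      this.link = { connectionId: currentConnectionId };
    }
  }
}

// Returns true when the link created by the racing send call survives recovery.
function recover(order: "detachFirst" | "refreshFirst"): boolean {
  let connectionId = 1;
  const sender = new SenderSketch(connectionId);
  const refreshConnection = (): void => {
    connectionId += 1;
  };

  if (order === "detachFirst") {
    sender.onDetached(); // clean up the stale link first
    refreshConnection();
    sender.send(connectionId); // a user send creates a fresh link
  } else {
    refreshConnection();
    sender.send(connectionId); // a user send creates a fresh link
    sender.onDetached(); // the late cleanup closes the fresh link too
  }
  return sender.link !== undefined;
}

console.log(recover("detachFirst"), recover("refreshFirst"));
```

In this model only the `detachFirst` order leaves the new link intact, which matches the reasoning given in the comment about not calling `onDetached()` after `refreshConnection()`.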
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";

/**
* @ignore
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage`
* has already been instrumented.
* @param message The `ServiceBusMessage` to instrument.
* @param span The `Span` containing the context to propagate tracing information.
* @ignore
* @internal
*/
export function instrumentServiceBusMessage(
message: ServiceBusMessage,
span: Span
): ServiceBusMessage {
if (message.properties && message.properties[TRACEPARENT_PROPERTY]) {
return message;
}

// create a copy so the original isn't modified
message = { ...message, properties: { ...message.properties } };

const traceParent = getTraceParentHeader(span.context());
if (traceParent) {
message.properties![TRACEPARENT_PROPERTY] = traceParent;
}

return message;
}

/**
* Extracts the `SpanContext` from a `ServiceBusMessage` if the context exists.
* @param message An individual `ServiceBusMessage` object.
* @internal
* @ignore
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
if (!message.properties || !message.properties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}
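The two helpers above amount to a round trip: the sender writes a trace-parent string into the `Diagnostic-Id` application property, and a consumer parses it back into a span context. A minimal, self-contained sketch of that round trip follows. `SpanContextLike`, `MessageLike`, `toTraceParent`, `fromTraceParent`, `instrument` and `extract` are hypothetical stand-ins written for this illustration; the PR itself relies on `getTraceParentHeader` and `extractSpanContextFromTraceParentHeader` from `@azure/core-tracing`, and the header layout here is assumed to follow the W3C `traceparent` format.

```typescript
// Hypothetical stand-in for SpanContext from @opentelemetry/api.
interface SpanContextLike {
  traceId: string; // 32 lowercase hex characters
  spanId: string; // 16 lowercase hex characters
  traceFlags: number; // 0 = not sampled, 1 = sampled
}

// Hypothetical stand-in for ServiceBusMessage.
interface MessageLike {
  body: unknown;
  properties?: { [key: string]: unknown };
}

const TRACEPARENT_PROPERTY = "Diagnostic-Id";

// Assumption: version "00" of the W3C traceparent layout.
function toTraceParent(context: SpanContextLike): string {
  const flags = ("0" + context.traceFlags.toString(16)).slice(-2);
  return `00-${context.traceId}-${context.spanId}-${flags}`;
}

// Returns undefined when the value does not match the expected layout.
function fromTraceParent(header: string): SpanContextLike | undefined {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header);
  if (!match) {
    return undefined;
  }
  return { traceId: match[1], spanId: match[2], traceFlags: parseInt(match[3], 16) };
}

// Returns the same object if it is already instrumented, otherwise a copy.
function instrument(message: MessageLike, context: SpanContextLike): MessageLike {
  if (message.properties && message.properties[TRACEPARENT_PROPERTY]) {
    return message;
  }
  return {
    ...message,
    properties: { ...message.properties, [TRACEPARENT_PROPERTY]: toTraceParent(context) }
  };
}

function extract(message: MessageLike): SpanContextLike | undefined {
  const value = message.properties && message.properties[TRACEPARENT_PROPERTY];
  return typeof value === "string" ? fromTraceParent(value) : undefined;
}

const sampleContext: SpanContextLike = {
  traceId: "0af7651916cd43dd8448eb211c80319c",
  spanId: "b7ad6b7169203331",
  traceFlags: 1
};
const sampleSent = instrument({ body: "hello" }, sampleContext);
console.log(sampleSent.properties, extract(sampleSent));
```

As in the PR code, the original message object is left untouched and an already instrumented message is returned as-is, so instrumenting twice does not overwrite the first context.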
27 changes: 27 additions & 0 deletions sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ConnectionConfig } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { Span, SpanContext, SpanKind } from "@opentelemetry/api";

/**
* @internal
* @ignore
*/
export function createMessageSpan(
Contributor

Feedback not for this PR, but a note for both Event Hubs & Service Bus, for which we can log a generic issue on improvements and make changes to both packages at once in the future:

  • The name createMessageSpan() is slightly misleading, as it suggests that it can be used any time a message is involved. In reality, this method is tied to the "producer" kind, so it is usable only when sending a message.
  • Also, JSDoc for this method would be helpful.
  • Why is config optional here? I would imagine that we would always have a ConnectionConfig.

Member Author

Yes to the first two!

> Why is config optional here? I would imagine that we would always have a ConnectionConfig

parentSpan is not a required parameter but feels primary; I believe that was the reason Chris made the config optional in event-hubs. In a later PR, I can make config the first param in both service-bus and event-hubs and make it required.

Member Author

Logged #11687

parentSpan?: Span | SpanContext | null,
config?: Pick<ConnectionConfig, "entityPath" | "host">
): Span {
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.message", {
kind: SpanKind.PRODUCER,
parent: parentSpan
});
span.setAttribute("az.namespace", "Microsoft.ServiceBus");
if (config) {
span.setAttribute("message_bus.destination", config.entityPath);
span.setAttribute("peer.address", config.host);
}
return span;
}
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/index.ts
@@ -28,7 +28,7 @@ export {
SubQueue,
SubscribeOptions
} from "./models";
export { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver";
export { ServiceBusSender } from "./sender";
41 changes: 38 additions & 3 deletions sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
@@ -3,8 +3,9 @@

// TODO: this code is a straight-copy from EventHubs. Need to merge.

import { Span, SpanContext } from "@opentelemetry/api";
import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { getTracer, OperationTracingOptions } from "@azure/core-tracing";

/**
* NOTE: This type is intended to mirror the relevant fields and structure from @azure/core-http OperationOptions
@@ -18,7 +19,41 @@ export type OperationOptionsBase = Pick<OperationOptions, "abortSignal" | "traci
* @ignore
*/
export function getParentSpan(
options: Pick<OperationOptionsBase, "tracingOptions">
options?: OperationTracingOptions
): Span | SpanContext | null | undefined {
return options.tracingOptions?.spanOptions?.parent;
return options?.spanOptions?.parent;
}

export function createSendSpan(
parentSpan?: Span | SpanContext | null,
spanContextsToLink: SpanContext[] = [],
entityPath?: string,
host?: string
): Span {
const links: Link[] = spanContextsToLink.map((context) => {
return {
context
};
});
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.send", {
kind: SpanKind.CLIENT,
parent: parentSpan,
links
});

span.setAttribute("az.namespace", "Microsoft.ServiceBus");
span.setAttribute("message_bus.destination", entityPath);
span.setAttribute("peer.address", host);

return span;
}
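`createSendSpan` turns each span context it is given into a link on the send span. The sketch below illustrates where those contexts could come from, assuming (as the `_messageSpanContexts` field in the API report suggests, though the batch implementation is not part of this excerpt) that the batch records the context of each per-message span while messages are added. `SpanContextLike`, `LinkLike`, `BatchSketch` and `toLinks` are hypothetical names, and the batch limit is a message count rather than a size in bytes to keep the example short.

```typescript
// Hypothetical stand-ins for SpanContext and Link from @opentelemetry/api.
interface SpanContextLike {
  traceId: string;
  spanId: string;
}

interface LinkLike {
  context: SpanContextLike;
}

// Hypothetical stand-in for ServiceBusMessageBatch.
class BatchSketch {
  readonly messageSpanContexts: SpanContextLike[] = [];
  private count = 0;

  constructor(private readonly maxMessages: number) {}

  // Returns false when the batch is full, mirroring the boolean result of tryAdd.
  tryAdd(messageSpanContext?: SpanContextLike): boolean {
    if (this.count >= this.maxMessages) {
      return false;
    }
    this.count += 1;
    if (messageSpanContext) {
      this.messageSpanContexts.push(messageSpanContext);
    }
    return true;
  }
}

// Same mapping as in createSendSpan: one link per recorded span context.
function toLinks(contexts: SpanContextLike[] = []): LinkLike[] {
  return contexts.map((context) => ({ context }));
}

const sampleBatch = new BatchSketch(2);
sampleBatch.tryAdd({ traceId: "trace-a", spanId: "span-1" });
sampleBatch.tryAdd({ traceId: "trace-a", spanId: "span-2" });
console.log(toLinks(sampleBatch.messageSpanContexts));
```

Linking instead of parenting lets a single send span refer to many message spans, which may belong to different traces.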
/**
* The set of options to manually propagate `Span` context for distributed tracing.
*/
export interface TryAddOptions {
/**
* The `Span` or `SpanContext` to use as the `parent` of any spans created while calling operations that make a request to the service.
*/
parentSpan?: Span | SpanContext | null;
Contributor

I noticed in Event Hubs that null is not an option for parentSpan. Why do we have this here?

Member Author

There are other places where we allow the span to be "null", which is a valid value that signifies the root span.

Moreover, I needed to allow "null" here since I'm calling getParentSpan() for tryAdd over an array of messages, which may return a null value.

In event-hubs, the tracing for the "array of messages" case is duplicated.
"null" is a valid value, and event-hubs should also allow it.

Member Author

I'll log an issue for fixing it in event-hubs.

Member Author

Logged #11687

}
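The thread above turns on the difference between a parent that is `null` and one that is `undefined`. The sketch below shows why the `parentSpan` type needs to admit both once `getParentSpan()` feeds `tryAdd`. It follows the reply above in treating `null` as "start a root span"; reading `undefined` as "no parent specified" is an assumption of this example. `SpanContextLike`, `ParentLike`, `TracingOptionsLike` and `describeParent` are hypothetical names, and `getParentSpan` is re-declared locally with the same body as in the diff.

```typescript
// Hypothetical stand-in for SpanContext from @opentelemetry/api.
interface SpanContextLike {
  traceId: string;
  spanId: string;
}

type ParentLike = SpanContextLike | null | undefined;

// Hypothetical stand-in for OperationTracingOptions from @azure/core-tracing.
interface TracingOptionsLike {
  spanOptions?: { parent?: ParentLike };
}

// Same body as getParentSpan in the diff, typed against the local stand-ins.
function getParentSpan(options?: TracingOptionsLike): ParentLike {
  return options?.spanOptions?.parent;
}

// Spells out the three cases a caller of tryAdd can end up passing along.
function describeParent(parent: ParentLike): string {
  if (parent === null) {
    return "explicit root span";
  }
  if (parent === undefined) {
    return "no parent specified";
  }
  return `child of span ${parent.spanId}`;
}

console.log(describeParent(getParentSpan({ spanOptions: { parent: null } })));
console.log(describeParent(getParentSpan()));
```

If `TryAddOptions.parentSpan` excluded `null`, the result of `getParentSpan()` could not be passed through to `tryAdd` without an extra conversion.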
49 changes: 39 additions & 10 deletions sdk/servicebus/service-bus/src/sender.ts
@@ -21,7 +21,12 @@ import {
RetryOptions,
retry
} from "@azure/core-amqp";
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import {
createSendSpan,
getParentSpan,
OperationOptionsBase
} from "./modelsToBeSharedWithEventHubs";
import { CanonicalCode, SpanContext } from "@opentelemetry/api";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
@@ -179,14 +184,19 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
const invalidTypeErrMsg =
"Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage.";

// link message span contexts
let spanContextsToLink: SpanContext[] = [];
if (isServiceBusMessage(messages)) {
messages = [messages];
}
let batch: ServiceBusMessageBatch;
if (Array.isArray(messages)) {
const batch = await this.createBatch(options);

batch = await this.createBatch(options);
for (const message of messages) {
if (!isServiceBusMessage(message)) {
throw new TypeError(invalidTypeErrMsg);
}
if (!batch.tryAdd(message)) {
if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
// this is too big - throw an error
const error = new MessagingError(
"Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control."
@@ -195,14 +205,33 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
throw error;
}
}

return this._sender.sendBatch(batch, options);
} else if (isServiceBusMessageBatch(messages)) {
return this._sender.sendBatch(messages, options);
} else if (isServiceBusMessage(messages)) {
return this._sender.send(messages, options);
spanContextsToLink = messages._messageSpanContexts;
batch = messages;
} else {
throw new TypeError(invalidTypeErrMsg);
}

const sendSpan = createSendSpan(
getParentSpan(options?.tracingOptions),
spanContextsToLink,
this.entityPath,
this._context.config.host
);

try {
const result = await this._sender.sendBatch(batch, options);
sendSpan.setStatus({ code: CanonicalCode.OK });
return result;
} catch (error) {
sendSpan.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message
});
throw error;
} finally {
sendSpan.end();
}
throw new TypeError(invalidTypeErrMsg);
}
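The send path above wraps the batch send in a try/catch/finally so that the span always gets a status and is always ended. A small sketch of that pattern in isolation follows. `SpanLike`, `StatusCodeLike`, `RecordingSpan` and `runWithSpan` are hypothetical names; the real code uses `Span` and `CanonicalCode` from `@opentelemetry/api` and awaits an asynchronous send, whereas this version is synchronous to stay short.

```typescript
// Hypothetical stand-ins for CanonicalCode and Span from @opentelemetry/api.
type StatusCodeLike = "OK" | "UNKNOWN";

interface SpanLike {
  setStatus(status: { code: StatusCodeLike; message?: string }): void;
  end(): void;
}

// Records calls in order so the sequence can be inspected afterwards.
class RecordingSpan implements SpanLike {
  readonly events: string[] = [];

  setStatus(status: { code: StatusCodeLike; message?: string }): void {
    this.events.push(
      status.message ? `status:${status.code}:${status.message}` : `status:${status.code}`
    );
  }

  end(): void {
    this.events.push("end");
  }
}

// Runs the operation, records its outcome on the span, and always ends the span.
function runWithSpan<T>(span: SpanLike, operation: () => T): T {
  try {
    const result = operation();
    span.setStatus({ code: "OK" });
    return result;
  } catch (error) {
    span.setStatus({
      code: "UNKNOWN",
      message: error instanceof Error ? error.message : String(error)
    });
    throw error; // the caller still sees the original failure
  } finally {
    span.end();
  }
}

const sampleSpan = new RecordingSpan();
runWithSpan(sampleSpan, () => "sent");
console.log(sampleSpan.events);
```

Ending the span in `finally` covers both outcomes, and rethrowing in `catch` keeps the tracing code from swallowing send errors.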

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {