Skip to content

Commit

Permalink
feat: add @aws-sdk/client-sqs instrumentation (elastic#3643)
Browse files Browse the repository at this point in the history
Closes: elastic#2957
  • Loading branch information
david-luna authored and fpm-peter committed Aug 20, 2024
1 parent 3ee06e0 commit 6ea131b
Show file tree
Hide file tree
Showing 16 changed files with 2,445 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .ci/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ elif [[ -n "${TAV_MODULE}" ]]; then
memcached)
DOCKER_COMPOSE_FILE=docker-compose-memcached.yml
;;
aws-sdk|@aws-sdk/client-s3|@aws-sdk/client-dynamodb|@aws-sdk/client-sns)
aws-sdk|@aws-sdk/client-s3|@aws-sdk/client-dynamodb|@aws-sdk/client-sns|@aws-sdk/client-sqs)
DOCKER_COMPOSE_FILE=docker-compose-localstack.yml
;;
*)
Expand Down
1 change: 1 addition & 0 deletions .ci/tav.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
{ "name": "@aws-sdk/client-dynamodb", "minVersion": 14 },
{ "name": "@aws-sdk/client-s3", "minVersion": 14 },
{ "name": "@aws-sdk/client-sns", "minVersion": 14 },
{ "name": "@aws-sdk/client-sqs", "minVersion": 14 },
{ "name": "@elastic/elasticsearch", "minVersion": 10 },
{ "name": "@hapi/hapi", "minVersion": 8 },
{ "name": "@opentelemetry/api", "minVersion": 14 },
Expand Down
10 changes: 9 additions & 1 deletion .tav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ aws-sdk:
- node test/instrumentation/modules/@aws-sdk/client-s3.test.js
node: '>=14'


'@aws-sdk/client-sns':
# Maintenance note: This should be updated periodically using:
# node ./dev-utils/tav-versions.js @aws-sdk/client-sns ">=3.15.0 <4"
Expand All @@ -555,6 +554,15 @@ aws-sdk:
- node test/instrumentation/modules/@aws-sdk/client-sns.test.js
node: '>=14'

'@aws-sdk/client-sqs':
# Maintenance note: This should be updated periodically using:
# node ./dev-utils/tav-versions.js @aws-sdk/client-sqs ">=3.15.0 <4"
#
# Test v3.15.0, every N=43 of 220 releases, and current latest.
versions: '3.15.0 || 3.55.0 || 3.165.0 || 3.238.0 || 3.319.0 || 3.409.0 || 3.418.0 || >3.418.0 <4'
commands:
- node test/instrumentation/modules/@aws-sdk/client-sqs.test.js
node: '>=14'

# - undici@4.7.0 added its diagnostics_channel support.
# - In undici@4.7.1 the `request.origin` property was added, which we need
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ See the <<upgrade-to-v4>> guide.
* Add support for `@aws-sdk/client-sns`, one of the AWS SDK v3 clients.
({issues}2956[#2956])
* Add support for `@aws-sdk/client-sqs`, one of the AWS SDK v3 clients.
({issues}2957[#2957])
[float]
===== Bug fixes
Expand Down
1 change: 1 addition & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The Node.js agent will automatically instrument the following modules to give yo
|https://www.npmjs.com/package/aws-sdk[aws-sdk] |>=2.858.0 <3 |Will instrument SQS send/receive/delete messages, all S3 methods, all DynamoDB methods, and the SNS publish method
|https://www.npmjs.com/package/@aws-sdk/client-s3[@aws-sdk/client-s3] |>=3.15.0 <4 |Will instrument all S3 methods
|https://www.npmjs.com/package/@aws-sdk/client-sns[@aws-sdk/client-sns] |>=3.15.0 <4 |Will instrument the SNS publish method
|https://www.npmjs.com/package/@aws-sdk/client-sqs[@aws-sdk/client-sqs] |>=3.15.0 <4 |Will instrument SQS send/receive/delete messages
|https://www.npmjs.com/package/@aws-sdk/client-dynamodb[@aws-sdk/client-dynamodb] |>=3.15.0 <4 |Will instrument all DynamoDB methods
|https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 <5 |Will instrument all queries
|https://www.npmjs.com/package/elasticsearch[elasticsearch] |>=8.0.0 |Will instrument all queries
Expand Down
332 changes: 332 additions & 0 deletions lib/instrumentation/modules/@aws-sdk/client-sqs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/

'use strict';

const {
MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT,
OUTCOME_FAILURE,
} = require('../../../constants');
// Exported below as SQS_NAME / SQS_TYPE / SQS_SUBTYPE. TYPE and SUBTYPE are
// also compared against the current span's `type`/`subtype` in the middleware.
const NAME = 'SQS';
const TYPE = 'messaging';
const SUBTYPE = 'sqs';
// Key used on the middleware `context` object to hand the request's hostname
// and port from the `finalizeRequest` middleware to the span middleware.
const elasticAPMStash = Symbol('elasticAPMStash');

// TODO: smithy-client already resolves span.action from the command name, but
// it does not produce these values:
// DeleteMessageCommand => span.action = 'DeleteMessage'
// Do we need to keep the same actions (these key/value pairs), or is it okay
// to use the ones from smithy-client?
//
// Maps an SQS operation name (command name without the `Command` suffix) to
// the value assigned to `span.action`.
const OPERATIONS_TO_ACTIONS = {
DeleteMessage: 'delete',
DeleteMessageBatch: 'delete_batch',
ReceiveMessage: 'poll',
SendMessageBatch: 'send_batch',
SendMessage: 'send',
unknown: 'unknown',
};
// Operation names that `sqsShouldIgnoreCommand` does not ignore outright.
const OPERATIONS = Object.keys(OPERATIONS_TO_ACTIONS);
// Upper bound on the number of message attributes (existing ones plus the two
// slots reserved for trace-context) checked before propagating trace-context.
const MAX_SQS_MESSAGE_ATTRIBUTES = 10;
// Cache of queue metrics collectors, keyed by queue name (see `recordMetrics`).
const queueMetrics = new Map();

/**
 * Returns the middlewares used to instrument an SQSClient instance.
 *
 * Two entries are returned:
 * - `elasticAPMSpan` (step `initialize`): propagates W3C trace-context into
 *   the message attributes of outgoing messages and, when the current span is
 *   the SQS span, fills in its name, action, outcome, destination context,
 *   message context, span links and queue metrics, then ends it.
 * - `elasticAPMHTTPInfo` (step `finalizeRequest`): stashes the hostname and
 *   port of the outgoing request on the handler context so that the first
 *   middleware can add them to the span's destination context.
 *
 * @param {import('@aws-sdk/client-sqs').SQSClient} client
 * @param {any} agent
 * @returns {import('../@smithy/smithy-client').AWSMiddlewareEntry[]}
 */
function sqsMiddlewareFactory(client, agent) {
  return [
    {
      middleware: (next, context) => async (args) => {
        const ins = agent._instrumentation;
        const log = agent.logger;
        const span = ins.currSpan();
        const input = args.input;

        // W3C trace-context propagation.
        // Strip only a trailing `Command`, same as `sqsShouldIgnoreCommand`.
        const commandName = context.commandName.replace(/Command$/, '');
        const runContext = ins.currRunContext();
        const parentSpan =
          span || runContext.currSpan() || runContext.currTransaction();

        if (parentSpan) {
          // Objects that carry a `MessageAttributes` map: the input itself for
          // a single send, each entry for a batch send.
          const targets = [];
          if (commandName === 'SendMessage') {
            targets.push(input);
          } else if (
            commandName === 'SendMessageBatch' &&
            Array.isArray(input.Entries)
          ) {
            for (const entry of input.Entries) {
              if (entry) {
                targets.push(entry);
              }
            }
          }

          for (const target of targets) {
            // A message sent without any attributes still gets trace-context:
            // propagate into a fresh map and attach it only if something was
            // actually added, so no empty `MessageAttributes` is ever sent.
            const hadAttrs = Boolean(target.MessageAttributes);
            const msgAttrs = hadAttrs ? target.MessageAttributes : {};

            // Two attribute slots are reserved for trace-context (presumably
            // `traceparent` and `tracestate`); skip propagation if that would
            // exceed the message attribute limit.
            const attrsCount = Object.keys(msgAttrs).length + 2;
            if (attrsCount > MAX_SQS_MESSAGE_ATTRIBUTES) {
              log.warn(
                { QueueUrl: input.QueueUrl },
                'cannot propagate trace-context with SQS message, too many MessageAttributes',
              );
              continue;
            }

            parentSpan.propagateTraceContextHeaders(
              msgAttrs,
              function (carrier, name, value) {
                // Headers whose name starts with `elastic-` are not added.
                if (name.startsWith('elastic-')) {
                  return;
                }
                carrier[name] = { DataType: 'String', StringValue: value };
              },
            );

            if (!hadAttrs && Object.keys(msgAttrs).length > 0) {
              target.MessageAttributes = msgAttrs;
            }
          }
        }

        // Ensure there is a span from the wrapped `client.send()`.
        if (!span || !(span.type === TYPE && span.subtype === SUBTYPE)) {
          return await next(args);
        }

        // Action is not equal to command/operation name, we have to map it
        span.action = OPERATIONS_TO_ACTIONS[commandName] || 'unknown';

        const queueName = getQueueNameFromCommand(args);
        let toFrom = 'from';
        if (span.action === 'send' || span.action === 'send_batch') {
          toFrom = 'to';
        }
        span.name = `SQS ${span.action.toUpperCase()} ${toFrom} ${queueName}`;

        let err;
        let result;
        let response;
        let statusCode;
        try {
          result = await next(args);
          response = result && result.response;
          statusCode = response && response.statusCode;
        } catch (ex) {
          // Save the error for use in `finally` below, but re-throw it to
          // not impact code flow.
          err = ex;

          // A failed request may still carry the HTTP status code of the
          // response in the error's `$metadata`.
          statusCode = err && err.$metadata && err.$metadata.httpStatusCode;
          throw ex;
        } finally {
          if (statusCode) {
            span._setOutcomeFromHttpStatusCode(statusCode);
          } else {
            span._setOutcomeFromErrorCapture(OUTCOME_FAILURE);
          }
          if (err && (!statusCode || statusCode >= 400)) {
            agent.captureError(err, { skipOutcome: true });
          }

          // Destination context.
          const region = await client.config.region();
          const service = { type: SUBTYPE };
          const destCtx = { service };

          // Set by the `elasticAPMHTTPInfo` middleware below, if it ran.
          if (context[elasticAPMStash]) {
            destCtx.address = context[elasticAPMStash].hostname;
            destCtx.port = context[elasticAPMStash].port;
          }

          if (region) {
            destCtx.cloud = { region };
          }

          span._setDestinationContext(destCtx);

          // Message context
          span.setMessageContext({ queue: { name: queueName } });

          // Only a successful poll (ReceiveMessage) has output to inspect.
          const receiveMsgData =
            span.action === 'poll' && result && result.output;
          if (receiveMsgData) {
            // Links
            const links = getSpanLinksFromResponseData(receiveMsgData);
            if (links) {
              span._addLinks(links);
            }

            // Metrics
            recordMetrics(queueName, receiveMsgData, agent);
          }

          span.end();
        }

        return result;
      },
      options: { step: 'initialize', priority: 'high', name: 'elasticAPMSpan' },
    },
    {
      middleware: (next, context) => async (args) => {
        const req = args.request;
        let port = req.port;

        // Resolve port for HTTP(S) protocols
        if (port === undefined) {
          if (req.protocol === 'https:') {
            port = 443;
          } else if (req.protocol === 'http:') {
            port = 80;
          }
        }

        context[elasticAPMStash] = {
          hostname: req.hostname,
          port,
        };
        return next(args);
      },
      options: { step: 'finalizeRequest', name: 'elasticAPMHTTPInfo' },
    },
  ];
}

/**
 * Derive the queue name from the `QueueUrl` of a command's input.
 *
 * The name is the last `/`-separated segment of the URL's path. When the
 * command has no `QueueUrl`, or the value cannot be parsed as a URL, the
 * string 'unknown' is returned instead.
 *
 * @param {import('@aws-sdk/types').Command} command the command sent by the SQS client
 * @returns {string} the queue name
 */
function getQueueNameFromCommand(command) {
  const queueUrl = command && command.input && command.input.QueueUrl;
  if (!queueUrl) {
    return 'unknown';
  }

  let pathname;
  try {
    ({ pathname } = new URL(queueUrl));
  } catch {
    // Not a parseable URL: fall back to the placeholder name.
    return 'unknown';
  }

  const segments = pathname.split('/');
  return segments[segments.length - 1];
}

/**
 * @typedef {import('@aws-sdk/client-sqs').Message} Message
 */
/**
 * Build span links from the messages of a receive response, looking at no
 * more than `MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT` messages.
 * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-messaging.md#receiving-trace-context
 *
 * A link is created for each inspected message that has a message attribute
 * named `traceparent` (compared case-insensitively) with `DataType` 'String'
 * and a non-empty `StringValue`. `msg.MessageAttributes` is of the form:
 *    { <attribute-name>: { DataType: <attr-type>, StringValue: <attr-value>, ... } }
 * For example:
 *    { traceparent: { DataType: 'String', StringValue: 'test-traceparent' } }
 *
 * @param { {Messages?: Message[]} } data
 * @returns { Array<{ context: string }> | null } the links (possibly empty),
 *    or null when `data` holds no messages at all
 */
function getSpanLinksFromResponseData(data) {
  const messages = data && data.Messages;
  if (!messages || messages.length === 0) {
    return null;
  }

  const limit = Math.min(
    messages.length,
    MAX_MESSAGES_PROCESSED_FOR_TRACE_CONTEXT,
  );
  const links = [];

  for (const message of messages.slice(0, limit)) {
    const attrs = message.MessageAttributes;
    if (!attrs) {
      continue;
    }

    // First String-typed attribute whose name is `traceparent` in any casing.
    const traceparentName = Object.keys(attrs).find(
      (attrName) =>
        attrs[attrName].DataType === 'String' &&
        attrName.toLowerCase() === 'traceparent',
    );
    const traceparent =
      traceparentName === undefined
        ? undefined
        : attrs[traceparentName].StringValue;

    if (traceparent) {
      links.push({ context: traceparent });
    }
  }

  return links;
}

/**
 * Record queue related metrics
 *
 * Creates the metrics collector for a queue on first use (collectors are
 * cached per queue name in `queueMetrics`) and feeds it the delay, in
 * milliseconds, between each message's `SentTimestamp` and now.
 *
 * Messages without a usable `Attributes.SentTimestamp` are skipped: SQS only
 * returns that system attribute when the receive request asked for it, and
 * subtracting a missing value would feed `NaN` into the collector.
 *
 * @param {string} queueName
 * @param { {Messages?: Message[]} } data
 * @param {any} agent
 */
function recordMetrics(queueName, data, agent) {
  const messages = data && data.Messages;
  if (!messages || messages.length < 1) {
    return;
  }

  let metrics = queueMetrics.get(queueName);
  if (!metrics) {
    metrics = agent._metrics.createQueueMetricsCollector(queueName);
    if (!metrics) {
      // No collector available for this queue: nothing to record into.
      return;
    }
    queueMetrics.set(queueName, metrics);
  }

  const now = Date.now();
  for (const message of messages) {
    const rawSentTimestamp =
      message.Attributes && message.Attributes.SentTimestamp;
    // `SentTimestamp` is epoch milliseconds, delivered as a string.
    const sentTimestamp = Number.parseInt(rawSentTimestamp, 10);
    if (Number.isNaN(sentTimestamp)) {
      continue;
    }
    metrics.updateStats(now - sentTimestamp);
  }
}

/**
 * Decide whether a command should be left uninstrumented.
 *
 * A command is ignored when its operation (the constructor name without the
 * trailing `Command`) is not one of `OPERATIONS`, or when its queue name
 * matches any of the `ignoreMessageQueuesRegExp` rules of the configuration.
 *
 * @param {import('@aws-sdk/types').Command} command the command sent by the SQS client
 * @param {any} config the agent configuration
 * @returns {boolean} false if the command should create a span
 */
function sqsShouldIgnoreCommand(command, config) {
  const operation = command.constructor.name.replace(/Command$/, '');
  if (!OPERATIONS.includes(operation)) {
    return true;
  }

  const ignoreRules = config.ignoreMessageQueuesRegExp;
  if (!ignoreRules) {
    return false;
  }

  const queueName = getQueueNameFromCommand(command);
  if (!queueName) {
    return false;
  }

  for (const rule of ignoreRules) {
    if (rule.test(queueName)) {
      return true;
    }
  }
  return false;
}

// Public surface of this module: the span name/type/subtype constants plus
// the middleware factory and the command filter defined above.
module.exports = {
SQS_NAME: NAME,
SQS_TYPE: TYPE,
SQS_SUBTYPE: SUBTYPE,
sqsMiddlewareFactory,
sqsShouldIgnoreCommand,
};
Loading

0 comments on commit 6ea131b

Please sign in to comment.