Skip to content

Commit

Permalink
#1034: AWS SQS and SNS support (#1051)
Browse files Browse the repository at this point in the history
  • Loading branch information
rypdal authored May 15, 2023
1 parent d3f70ef commit ee5749c
Show file tree
Hide file tree
Showing 21 changed files with 825 additions and 21 deletions.
3 changes: 3 additions & 0 deletions src/OpenTelemetry.Contrib.Instrumentation.AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
([#1095](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1095))
* Removes `AddAWSInstrumentation` method with default configure default parameter.
([#1117](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1117))
* Global propagator is now used to inject into sent SQS and SNS message
attributes (in addition to X-Ray propagation).
([#1051](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1051))

## 1.0.2

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// <copyright file="AWSMessagingUtils.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal static class AWSMessagingUtils
{
internal static IReadOnlyDictionary<string, string> InjectIntoDictionary(PropagationContext propagationContext)
{
var carrier = new Dictionary<string, string>();
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
return carrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using Amazon.Runtime;

Expand All @@ -24,8 +23,8 @@ internal class AWSServiceHelper
{
internal static IReadOnlyDictionary<string, string> ServiceParameterMap = new Dictionary<string, string>()
{
{ DynamoDbService, "TableName" },
{ SQSService, "QueueUrl" },
{ AWSServiceType.DynamoDbService, "TableName" },
{ AWSServiceType.SQSService, "QueueUrl" },
};

internal static IReadOnlyDictionary<string, string> ParameterAttributeMap = new Dictionary<string, string>()
Expand All @@ -34,9 +33,6 @@ internal class AWSServiceHelper
{ "QueueUrl", AWSSemanticConventions.AttributeAWSSQSQueueUrl },
};

private const string DynamoDbService = "DynamoDBv2";
private const string SQSService = "SQS";

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);

Expand All @@ -47,7 +43,4 @@ internal static string GetAWSOperationName(IRequestContext requestContext)
var operationName = Utils.RemoveSuffix(completeRequestName, suffix);
return operationName;
}

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="AWSServiceType.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class AWSServiceType
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService"; // SNS

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSqsService(string service)
=> SQSService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSnsService(string service)
=> SNSService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,20 @@ private static void AddRequestSpecificInformation(Activity activity, IRequestCon
}
}

if (AWSServiceHelper.IsDynamoDbService(service))
if (AWSServiceType.IsDynamoDbService(service))
{
activity.SetTag(SemanticConventions.AttributeDbSystem, AWSSemanticConventions.AttributeValueDynamoDb);
}
else if (AWSServiceType.IsSqsService(service))
{
SqsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsSnsService(service))
{
SnsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
}

private static void AddStatusCodeToActivity(Activity activity, int status_code)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// <copyright file="SnsRequestContextHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SnsRequestContextHelper
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as PublishRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
{
return;
}

if (attributes.Keys.Any(k => originalRequest.MessageAttributes.ContainsKey(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = originalRequest.MessageAttributes.Count;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private static void AddAttribute(ParameterCollection parameters, PublishRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttributes.entry." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// <copyright file="SqsRequestContextHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SqsRequestContextHelper
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as SendMessageRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
{
return;
}

if (attributes.Keys.Any(k => originalRequest.MessageAttributes.ContainsKey(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = originalRequest.MessageAttributes.Count;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private static void AddAttribute(ParameterCollection parameters, SendMessageRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttribute." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
<PackageReference Include="AWSSDK.Core" Version="3.5.1.24" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.5.0.27" />
<PackageReference Include="AWSSDK.SQS" Version="3.5.0.26" />
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)\src\Shared\Guard.cs" Link="Includes\Guard.cs" />
<Compile Include="$(RepoRoot)\src\Shared\SemanticConventions.cs" Link="Includes\SemanticConventions.cs"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.AWSLambdaInstrumentationOptions() -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.DisableAwsXRayContextExtraction.get -> bool
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.DisableAwsXRayContextExtraction.set -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.SetParentFromBatch.get -> bool
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.SetParentFromBatch.set -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaWrapper
OpenTelemetry.Instrumentation.AWSLambda.TracerProviderBuilderExtensions
static OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaWrapper.Trace<TInput, TResult>(OpenTelemetry.Trace.TracerProvider tracerProvider, System.Func<TInput, Amazon.Lambda.Core.ILambdaContext, TResult> lambdaHandler, TInput input, Amazon.Lambda.Core.ILambdaContext context, System.Diagnostics.ActivityContext parentContext = default(System.Diagnostics.ActivityContext)) -> TResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,13 @@ public class AWSLambdaInstrumentationOptions
/// Gets or sets a value indicating whether AWS X-Ray context extraction should be disabled.
/// </summary>
public bool DisableAwsXRayContextExtraction { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the parent Activity should be set when a potentially batched event is received where multiple parents are potentially available (e.g. SQS).
/// If set to true, the parent is set using the last received record (e.g. last message). Otherwise the parent is not set. In both cases, links will be created for such events.
/// </summary>
/// <remarks>
/// Currently, the only event type to which this applies is SQS.
/// </remarks>
public bool SetParentFromBatch { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand Down Expand Up @@ -170,9 +171,10 @@ public static Task<TResult> TraceAsync<TInput, TResult>(

internal static Activity OnFunctionStart<TInput>(TInput input, ILambdaContext context, ActivityContext parentContext = default)
{
IEnumerable<ActivityLink> links = null;
if (parentContext == default)
{
parentContext = AWSLambdaUtils.ExtractParentContext(input);
(parentContext, links) = AWSLambdaUtils.ExtractParentContext(input);
if (parentContext == default && !DisableAwsXRayContextExtraction)
{
parentContext = AWSLambdaUtils.GetXRayParentContext();
Expand All @@ -184,7 +186,7 @@ internal static Activity OnFunctionStart<TInput>(TInput input, ILambdaContext co

// We assume that functionTags and httpTags have no intersection.
var activityName = AWSLambdaUtils.GetFunctionName(context) ?? "AWS Lambda Invoke";
var activity = AWSLambdaActivitySource.StartActivity(activityName, ActivityKind.Server, parentContext, functionTags.Concat(httpTags));
var activity = AWSLambdaActivitySource.StartActivity(activityName, ActivityKind.Server, parentContext, functionTags.Concat(httpTags), links);

return activity;
}
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry.Instrumentation.AWSLambda/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
([#943](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/943))
* BREAKING (behavior): `AddAWSLambdaConfigurations` no longer calls `AddService`
([#1080](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1080))
* Added tracing of AWS Lambda handlers receiving SQS and SNS messages.
([#1051](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1051))

## 1.1.0-beta.2

Expand Down
Loading

0 comments on commit ee5749c

Please sign in to comment.