Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1034: AWS SQS and SNS support #1051

Merged
merged 44 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8b93219
#1034: AWS SQS and SNS support
rypdal Feb 28, 2023
7f75e12
#1034: fixed filenames in headers
rypdal Feb 28, 2023
c0ff714
#1034: removed error CHARSET (fixed file encoding) + comment for used…
rypdal Feb 28, 2023
4cc849a
#1034: refactored in order to support "nothing or all" trace data inj…
rypdal Mar 1, 2023
222f782
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 2, 2023
8bdad4d
#1034: swap the order of parent context check for the SNS in SQS case
rypdal Mar 2, 2023
1147917
#1034: review suggestions and other improvements
rypdal Mar 2, 2023
e20c509
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 3, 2023
ed1ef5a
#1034: simplified adapter's interface and moved most of the processin…
rypdal Mar 3, 2023
2061808
#1034: fixed trailing whitespaces
rypdal Mar 3, 2023
c426560
#1034: further improvements
rypdal Mar 3, 2023
165a0d1
#1034: further code simplification
rypdal Mar 3, 2023
9f8d96a
#1034: further code simplifications
rypdal Mar 3, 2023
d55c66b
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 6, 2023
e51a882
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 8, 2023
4a09e75
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 13, 2023
3fbb6b0
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 16, 2023
6996859
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 17, 2023
c98d9e9
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 21, 2023
933c3be
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 24, 2023
94e489c
#1034: replaced string.StartsWith(...) call by it's overload with Str…
rypdal Mar 24, 2023
bdb0077
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 27, 2023
e88edbe
#1034: added sqs messages contexts as activity links
rypdal Mar 27, 2023
5ce87e1
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 29, 2023
5bed7c6
#1034: small code optimization
rypdal Mar 29, 2023
24ae92d
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 11, 2023
d2613c4
#1034: added option for configuring setting parent from SQS messages …
rypdal Apr 12, 2023
546ef3f
Update src/OpenTelemetry.Instrumentation.AWSLambda/AWSLambdaInstrumen…
rypdal Apr 13, 2023
0460e9b
Update src/OpenTelemetry.Instrumentation.AWSLambda/Implementation/AWS…
rypdal Apr 13, 2023
70acd21
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 13, 2023
63e7445
Merge branch 'issue/AWS-Lambda-SQS-SNS-support' of https://github.com…
rypdal Apr 13, 2023
297effc
#1034: aligned naming + unit tests for SetParentFromMessageBatch option
rypdal Apr 13, 2023
9a9fa26
#1034: renamed var to avoid @-synthax
rypdal Apr 13, 2023
c8bec70
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 21, 2023
1031789
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 25, 2023
0e4d385
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 Apr 26, 2023
89cb763
#1034: CHANGELOG update
rypdal Apr 27, 2023
f6a6f7b
#1034: changelog description update and white space fix
rypdal Apr 27, 2023
8cec2c7
Update src/OpenTelemetry.Contrib.Instrumentation.AWS/CHANGELOG.md
rypdal Apr 27, 2023
d0a3001
#1034: long line split
rypdal Apr 27, 2023
6b06054
Merge remote-tracking branch 'upstream/main' into issue/AWS-Lambda-SQ…
rypdal Apr 28, 2023
564407f
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 May 4, 2023
8e1c8df
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 May 10, 2023
8c8b9a0
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
utpilla May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// <copyright file="AWSMessagingUtils.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal static class AWSMessagingUtils
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
internal static IReadOnlyDictionary<string, string> InjectIntoDictionary(PropagationContext propagationContext)
{
var carrier = new Dictionary<string, string>();
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
return carrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using Amazon.Runtime;

Expand All @@ -24,8 +23,8 @@ internal class AWSServiceHelper
{
internal static IReadOnlyDictionary<string, string> ServiceParameterMap = new Dictionary<string, string>()
{
{ DynamoDbService, "TableName" },
{ SQSService, "QueueUrl" },
{ AWSServiceType.DynamoDbService, "TableName" },
{ AWSServiceType.SQSService, "QueueUrl" },
};

internal static IReadOnlyDictionary<string, string> ParameterAttributeMap = new Dictionary<string, string>()
Expand All @@ -34,9 +33,6 @@ internal class AWSServiceHelper
{ "QueueUrl", AWSSemanticConventions.AttributeAWSSQSQueueUrl },
};

private const string DynamoDbService = "DynamoDBv2";
private const string SQSService = "SQS";

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);

Expand All @@ -47,7 +43,4 @@ internal static string GetAWSOperationName(IRequestContext requestContext)
var operationName = Utils.RemoveSuffix(completeRequestName, suffix);
return operationName;
}

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="AWSServiceType.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class AWSServiceType
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService"; // SNS

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSqsService(string service)
=> SQSService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSnsService(string service)
=> SNSService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,20 @@ private static void AddRequestSpecificInformation(Activity activity, IRequestCon
}
}

if (AWSServiceHelper.IsDynamoDbService(service))
if (AWSServiceType.IsDynamoDbService(service))
{
activity.SetTag(SemanticConventions.AttributeDbSystem, AWSSemanticConventions.AttributeValueDynamoDb);
}
else if (AWSServiceType.IsSqsService(service))
{
SqsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
else if (AWSServiceType.IsSnsService(service))
{
SnsRequestContextHelper.AddAttributes(
requestContext, AWSMessagingUtils.InjectIntoDictionary(new PropagationContext(activity.Context, Baggage.Current)));
}
}

private static void AddStatusCodeToActivity(Activity activity, int status_code)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// <copyright file="SnsRequestContextHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SnsRequestContextHelper
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as PublishRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
{
return;
}

if (attributes.Keys.Any(k => originalRequest.MessageAttributes.ContainsKey(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = originalRequest.MessageAttributes.Count;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private static void AddAttribute(ParameterCollection parameters, PublishRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttributes.entry." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// <copyright file="SqsRequestContextHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SqsRequestContextHelper
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void AddAttributes(IRequestContext context, IReadOnlyDictionary<string, string> attributes)
{
var parameters = context.Request?.ParameterCollection;
var originalRequest = context.OriginalRequest as SendMessageRequest;
if (originalRequest?.MessageAttributes == null || parameters == null)
{
return;
}

if (attributes.Keys.Any(k => originalRequest.MessageAttributes.ContainsKey(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = originalRequest.MessageAttributes.Count;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
AddAttribute(parameters, originalRequest, param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private static void AddAttribute(ParameterCollection parameters, SendMessageRequest originalRequest, string name, string value, int attributeIndex)
{
var prefix = "MessageAttribute." + attributeIndex;
parameters.Add(prefix + ".Name", name);
parameters.Add(prefix + ".Value.DataType", "String");
parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
originalRequest.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
<PackageReference Include="AWSSDK.Core" Version="3.5.1.24" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.5.0.27" />
<PackageReference Include="AWSSDK.SQS" Version="3.5.0.26" />
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Internal\Guard.cs" Link="Includes\Guard.cs" />
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs"/>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.AWSLambdaInstrumentationOptions() -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.DisableAwsXRayContextExtraction.get -> bool
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.DisableAwsXRayContextExtraction.set -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.SetParentFromBatch.get -> bool
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaInstrumentationOptions.SetParentFromBatch.set -> void
OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaWrapper
OpenTelemetry.Instrumentation.AWSLambda.TracerProviderBuilderExtensions
static OpenTelemetry.Instrumentation.AWSLambda.AWSLambdaWrapper.Trace<TInput, TResult>(OpenTelemetry.Trace.TracerProvider tracerProvider, System.Func<TInput, Amazon.Lambda.Core.ILambdaContext, TResult> lambdaHandler, TInput input, Amazon.Lambda.Core.ILambdaContext context, System.Diagnostics.ActivityContext parentContext = default(System.Diagnostics.ActivityContext)) -> TResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,13 @@ public class AWSLambdaInstrumentationOptions
/// Gets or sets a value indicating whether AWS X-Ray context extraction should be disabled.
/// </summary>
public bool DisableAwsXRayContextExtraction { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the parent Activity should be set when a potentially batched event is received where multiple parents are potentially available (e.g. SQS).
/// If set to true, the parent is set using the last received record (e.g. last message). Otherwise the parent is not set. In both cases, links will be created for such events.
/// </summary>
/// <remarks>
/// Currently, the only event type to which this applies is SQS.
/// </remarks>
public bool SetParentFromBatch { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand Down Expand Up @@ -170,9 +171,10 @@ public static Task<TResult> TraceAsync<TInput, TResult>(

internal static Activity OnFunctionStart<TInput>(TInput input, ILambdaContext context, ActivityContext parentContext = default)
{
IEnumerable<ActivityLink> links = null;
if (parentContext == default)
{
parentContext = AWSLambdaUtils.ExtractParentContext(input);
(parentContext, links) = AWSLambdaUtils.ExtractParentContext(input);
if (parentContext == default && !DisableAwsXRayContextExtraction)
{
parentContext = AWSLambdaUtils.GetXRayParentContext();
Expand All @@ -184,7 +186,7 @@ internal static Activity OnFunctionStart<TInput>(TInput input, ILambdaContext co

// We assume that functionTags and httpTags have no intersection.
var activityName = AWSLambdaUtils.GetFunctionName(context) ?? "AWS Lambda Invoke";
var activity = AWSLambdaActivitySource.StartActivity(activityName, ActivityKind.Server, parentContext, functionTags.Concat(httpTags));
var activity = AWSLambdaActivitySource.StartActivity(activityName, ActivityKind.Server, parentContext, functionTags.Concat(httpTags), links);

return activity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using System.Linq;
using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Amazon.Lambda.SNSEvents;
using Amazon.Lambda.SQSEvents;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Contrib.Extensions.AWSXRay.Trace;

Expand Down Expand Up @@ -63,20 +65,33 @@ internal static ActivityContext GetXRayParentContext()
return activityContext;
}

internal static ActivityContext ExtractParentContext<TInput>(TInput input)
internal static (ActivityContext ParentContext, IEnumerable<ActivityLink> Links) ExtractParentContext<TInput>(TInput input)
{
PropagationContext propagationContext = default;
PropagationContext parentContext = default;
IEnumerable<ActivityLink> links = null;
switch (input)
{
case APIGatewayProxyRequest apiGatewayProxyRequest:
propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayProxyRequest, GetHeaderValues);
parentContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayProxyRequest, GetHeaderValues);
break;
case APIGatewayHttpApiV2ProxyRequest apiGatewayHttpApiV2ProxyRequest:
propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayHttpApiV2ProxyRequest, GetHeaderValues);
parentContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayHttpApiV2ProxyRequest, GetHeaderValues);
break;
case SQSEvent sqsEvent:
(parentContext, links) = AWSMessagingUtils.ExtractParentContext(sqsEvent);
break;
case SQSEvent.SQSMessage sqsMessage:
parentContext = AWSMessagingUtils.ExtractParentContext(sqsMessage);
break;
case SNSEvent snsEvent:
parentContext = AWSMessagingUtils.ExtractParentContext(snsEvent);
break;
case SNSEvent.SNSRecord snsRecord:
parentContext = AWSMessagingUtils.ExtractParentContext(snsRecord);
break;
}

return propagationContext.ActivityContext;
return (parentContext.ActivityContext, links);
}

internal static string GetCloudProvider()
Expand Down
Loading