Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1034: AWS SQS and SNS support #1051

Merged
merged 44 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8b93219
#1034: AWS SQS and SNS support
rypdal Feb 28, 2023
7f75e12
#1034: fixed filenames in headers
rypdal Feb 28, 2023
c0ff714
#1034: removed error CHARSET (fixed file encoding) + comment for used…
rypdal Feb 28, 2023
4cc849a
#1034: refactored in order to support "nothing or all" trace data inj…
rypdal Mar 1, 2023
222f782
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 2, 2023
8bdad4d
#1034: swap the order of parent context check for the SNS in SQS case
rypdal Mar 2, 2023
1147917
#1034: review suggestions and other improvements
rypdal Mar 2, 2023
e20c509
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 3, 2023
ed1ef5a
#1034: simplified adapter's interface and moved most of the processin…
rypdal Mar 3, 2023
2061808
#1034: fixed trailing whitespaces
rypdal Mar 3, 2023
c426560
#1034: further improvements
rypdal Mar 3, 2023
165a0d1
#1034: further code simplification
rypdal Mar 3, 2023
9f8d96a
#1034: further code simplifications
rypdal Mar 3, 2023
d55c66b
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 6, 2023
e51a882
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 8, 2023
4a09e75
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 13, 2023
3fbb6b0
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 16, 2023
6996859
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 17, 2023
c98d9e9
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 21, 2023
933c3be
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 24, 2023
94e489c
#1034: replaced string.StartsWith(...) call by it's overload with Str…
rypdal Mar 24, 2023
bdb0077
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 27, 2023
e88edbe
#1034: added sqs messages contexts as activity links
rypdal Mar 27, 2023
5ce87e1
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Mar 29, 2023
5bed7c6
#1034: small code optimization
rypdal Mar 29, 2023
24ae92d
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 11, 2023
d2613c4
#1034: added option for configuring setting parent from SQS messages …
rypdal Apr 12, 2023
546ef3f
Update src/OpenTelemetry.Instrumentation.AWSLambda/AWSLambdaInstrumen…
rypdal Apr 13, 2023
0460e9b
Update src/OpenTelemetry.Instrumentation.AWSLambda/Implementation/AWS…
rypdal Apr 13, 2023
70acd21
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 13, 2023
63e7445
Merge branch 'issue/AWS-Lambda-SQS-SNS-support' of https://github.com…
rypdal Apr 13, 2023
297effc
#1034: aligned naming + unit tests for SetParentFromMessageBatch option
rypdal Apr 13, 2023
9a9fa26
#1034: renamed var to avoid @-synthax
rypdal Apr 13, 2023
c8bec70
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 21, 2023
1031789
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
rypdal Apr 25, 2023
0e4d385
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 Apr 26, 2023
89cb763
#1034: CHANGELOG update
rypdal Apr 27, 2023
f6a6f7b
#1034: changelog description update and white space fix
rypdal Apr 27, 2023
8cec2c7
Update src/OpenTelemetry.Contrib.Instrumentation.AWS/CHANGELOG.md
rypdal Apr 27, 2023
d0a3001
#1034: long line split
rypdal Apr 27, 2023
6b06054
Merge remote-tracking branch 'upstream/main' into issue/AWS-Lambda-SQ…
rypdal Apr 28, 2023
564407f
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 May 4, 2023
8e1c8df
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
Oberon00 May 10, 2023
8c8b9a0
Merge branch 'main' into issue/AWS-Lambda-SQS-SNS-support
utpilla May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="AWSMessagingUtils.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal static class AWSMessagingUtils
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
internal static void Inject(IRequestContextAdapter request, PropagationContext propagationContext)
{
if (!request.CanInject)
{
return;
}

var carrier = new Dictionary<string, string>();
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
request.AddAttributes(carrier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using Amazon.Runtime;

Expand All @@ -24,8 +23,8 @@ internal class AWSServiceHelper
{
internal static IReadOnlyDictionary<string, string> ServiceParameterMap = new Dictionary<string, string>()
{
{ DynamoDbService, "TableName" },
{ SQSService, "QueueUrl" },
{ AWSServiceType.DynamoDbService, "TableName" },
{ AWSServiceType.SQSService, "QueueUrl" },
};

internal static IReadOnlyDictionary<string, string> ParameterAttributeMap = new Dictionary<string, string>()
Expand All @@ -34,9 +33,6 @@ internal class AWSServiceHelper
{ "QueueUrl", AWSSemanticConventions.AttributeAWSSQSQueueUrl },
};

private const string DynamoDbService = "DynamoDBv2";
private const string SQSService = "SQS";

internal static string GetAWSServiceName(IRequestContext requestContext)
=> Utils.RemoveAmazonPrefixFromServiceName(requestContext.Request.ServiceName);

Expand All @@ -47,7 +43,4 @@ internal static string GetAWSOperationName(IRequestContext requestContext)
var operationName = Utils.RemoveSuffix(completeRequestName, suffix);
return operationName;
}

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="AWSServiceType.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;

internal class AWSServiceType
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
internal const string DynamoDbService = "DynamoDBv2";
internal const string SQSService = "SQS";
internal const string SNSService = "SimpleNotificationService"; // SNS

internal static bool IsDynamoDbService(string service)
=> DynamoDbService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSqsService(string service)
=> SQSService.Equals(service, StringComparison.OrdinalIgnoreCase);

internal static bool IsSnsService(string service)
=> SNSService.Equals(service, StringComparison.OrdinalIgnoreCase);
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,18 @@ private void AddRequestSpecificInformation(Activity activity, IRequestContext re
}
}

if (AWSServiceHelper.IsDynamoDbService(service))
if (AWSServiceType.IsDynamoDbService(service))
{
activity.SetTag(SemanticConventions.AttributeDbSystem, AWSSemanticConventions.AttributeValueDynamoDb);
}
else if (AWSServiceType.IsSqsService(service))
{
AWSMessagingUtils.Inject(new SqsRequestContextAdapter(requestContext), new PropagationContext(activity.Context, Baggage.Current));
}
else if (AWSServiceType.IsSnsService(service))
{
AWSMessagingUtils.Inject(new SnsRequestContextAdapter(requestContext), new PropagationContext(activity.Context, Baggage.Current));
}
}

private void AddStatusCodeToActivity(Activity activity, int status_code)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// <copyright file="IRequestContextAdapter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal interface IRequestContextAdapter
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
bool CanInject { get; }

void AddAttributes(IReadOnlyDictionary<string, string> attributes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// <copyright file="SnsRequestContextAdapter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SnsRequestContextAdapter : IRequestContextAdapter
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

private readonly ParameterCollection? parameters;
private readonly PublishRequest? originalRequest;

public SnsRequestContextAdapter(IRequestContext context)
{
this.parameters = context.Request?.ParameterCollection;
this.originalRequest = context.OriginalRequest as PublishRequest;
}

public bool CanInject => this.originalRequest?.MessageAttributes != null && this.parameters != null;

public void AddAttributes(IReadOnlyDictionary<string, string> attributes)
{
if (!this.CanInject)
rypdal marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}

if (attributes.Keys.Any(k => this.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = this.originalRequest?.MessageAttributes.Count ?? 0;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
this.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private void AddAttribute(string name, string value, int nextAttributeIndex)
{
if (!this.CanInject)
{
return;
}

var prefix = "MessageAttributes.entry." + nextAttributeIndex;
this.parameters?.Add(prefix + ".Name", name);
this.parameters?.Add(prefix + ".Value.DataType", "String");
this.parameters?.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

private bool ContainsAttribute(string name)
=> this.originalRequest?.MessageAttributes.ContainsKey(name) ?? false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// <copyright file="SqsRequestContextAdapter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System.Collections.Generic;
using System.Linq;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using Amazon.SQS.Model;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal class SqsRequestContextAdapter : IRequestContextAdapter
{
// SQS/SNS message attributes collection size limit according to
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html and
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

private readonly ParameterCollection? parameters;
private readonly SendMessageRequest? originalRequest;

public SqsRequestContextAdapter(IRequestContext context)
{
this.parameters = context.Request?.ParameterCollection;
this.originalRequest = context.OriginalRequest as SendMessageRequest;
}

public bool CanInject => this.originalRequest?.MessageAttributes != null && this.parameters != null;

public void AddAttributes(IReadOnlyDictionary<string, string> attributes)
{
if (!this.CanInject)
{
return;
}

if (attributes.Keys.Any(k => this.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = this.originalRequest?.MessageAttributes.Count ?? 0;
if (attributes.Count + attributesCount > MaxMessageAttributes)
{
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in attributes)
{
this.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;
}
}

private void AddAttribute(string name, string value, int attributeIndex)
{
if (!this.CanInject)
{
return;
}

var prefix = "MessageAttribute." + attributeIndex;
this.parameters?.Add(prefix + ".Name", name);
this.parameters?.Add(prefix + ".Value.DataType", "String");
this.parameters?.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

private bool ContainsAttribute(string name) =>
this.originalRequest?.MessageAttributes.ContainsKey(name) ?? false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.101.15" />
rypdal marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="AWSSDK.SQS" Version="3.7.100.79" />
<PackageReference Include="OpenTelemetry.Contrib.Extensions.AWSXRay" Version="1.0.1" />
<PackageReference Include="AWSSDK.Core" Version="3.5.1.24" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Internal\Guard.cs" Link="Includes\Guard.cs" />
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs"/>
<Compile Include="$(RepoRoot)\src\OpenTelemetry.Contrib.Shared\Api\SemanticConventions.cs" Link="Includes\SemanticConventions.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using System.Linq;
using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Amazon.Lambda.SNSEvents;
using Amazon.Lambda.SQSEvents;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Contrib.Extensions.AWSXRay.Trace;

Expand Down Expand Up @@ -74,6 +76,18 @@ internal static ActivityContext ExtractParentContext<TInput>(TInput input)
case APIGatewayHttpApiV2ProxyRequest apiGatewayHttpApiV2ProxyRequest:
propagationContext = Propagators.DefaultTextMapPropagator.Extract(default, apiGatewayHttpApiV2ProxyRequest, GetHeaderValues);
break;
case SQSEvent sqsEvent:
propagationContext = AWSMessagingUtils.ExtractParentContext(sqsEvent);
break;
case SQSEvent.SQSMessage sqsMessage:
propagationContext = AWSMessagingUtils.ExtractParentContext(sqsMessage);
break;
case SNSEvent snsEvent:
propagationContext = AWSMessagingUtils.ExtractParentContext(snsEvent);
break;
case SNSEvent.SNSRecord snsRecord:
propagationContext = AWSMessagingUtils.ExtractParentContext(snsRecord);
break;
}

return propagationContext.ActivityContext;
Expand Down
Loading