Skip to content

Commit

Permalink
#1034: review suggestions and other improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
rypdal committed Mar 2, 2023
1 parent 8bdad4d commit 1147917
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// </copyright>

using System.Collections.Generic;
using System.Linq;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
Expand All @@ -26,41 +27,38 @@ internal static class AWSMessagingUtils
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
private const int MaxMessageAttributes = 10;

internal static void Inject(IRequestContextAdapter requestAdapter, PropagationContext propagationContext)
internal static void Inject(IRequestContextAdapter request, PropagationContext propagationContext)
{
if (!requestAdapter.HasMessageBody ||
!requestAdapter.HasOriginalRequest)
if (!request.CanInject)
{
return;
}

var carrier = new Dictionary<string, string>();
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, Setter);
Propagators.DefaultTextMapPropagator.Inject(propagationContext, carrier, (c, k, v) => c[k] = v);
if (carrier.Keys.Any(k => request.ContainsAttribute(k)))
{
// If at least one attribute is already present in the request then we skip the injection.
return;
}

int attributesCount = requestAdapter.AttributesCount;
int attributesCount = request.AttributesCount;
if (carrier.Count + attributesCount > MaxMessageAttributes)
{
// TODO: Add logging (event source).
// TODO: add logging (event source).
return;
}

int nextAttributeIndex = attributesCount + 1;
foreach (var param in carrier)
{
if (requestAdapter.ContainsAttribute(param.Key))
if (request.ContainsAttribute(param.Key))
{
continue;
}

requestAdapter.AddAttribute(param.Key, param.Value, nextAttributeIndex);
request.AddAttribute(param.Key, param.Value, nextAttributeIndex);
nextAttributeIndex++;

// Add trace data to message attributes dictionary of the original request.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
requestAdapter.AddAttributeToOriginalRequest(param.Key, param.Value);
}
}

private static void Setter(IDictionary<string, string> carrier, string name, string value) =>
carrier[name] = value;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@
namespace OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
internal interface IRequestContextAdapter
{
bool HasMessageBody { get; }

bool HasOriginalRequest { get; }
bool CanInject { get; }

int AttributesCount { get; }

bool ContainsAttribute(string name);

void AddAttribute(string name, string value, int nextAttributeIndex);

void AddAttributeToOriginalRequest(string name, string value);
void AddAttribute(string name, string value, int attributeIndex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ public SnsRequestContextAdapter(IRequestContext context)
this.originalRequest = context.OriginalRequest as PublishRequest;
}

public bool HasMessageBody =>
this.parameters?.ContainsKey("Message") ?? false;

public bool HasOriginalRequest => this.originalRequest != null;
public bool CanInject => this.originalRequest != null;

public int AttributesCount =>
this.originalRequest?.MessageAttributes.Count ?? 0;
Expand All @@ -45,14 +42,15 @@ public void AddAttribute(string name, string value, int nextAttributeIndex)
return;
}

var attributePrefix = "MessageAttributes.entry." + nextAttributeIndex;
this.parameters.Add(attributePrefix + ".Name", name);
this.parameters.Add(attributePrefix + ".Value.DataType", "String");
this.parameters.Add(attributePrefix + ".Value.StringValue", value);
}
var prefix = "MessageAttributes.entry." + nextAttributeIndex;
this.parameters.Add(prefix + ".Name", name);
this.parameters.Add(prefix + ".Value.DataType", "String");
this.parameters.Add(prefix + ".Value.StringValue", value);

public void AddAttributeToOriginalRequest(string name, string value) =>
// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

public bool ContainsAttribute(string name)
=> this.originalRequest?.MessageAttributes.ContainsKey(name) ?? false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,26 @@ public SqsRequestContextAdapter(IRequestContext context)
this.originalRequest = context.OriginalRequest as SendMessageRequest;
}

public bool HasMessageBody =>
this.parameters?.ContainsKey("MessageBody") ?? false;

public bool HasOriginalRequest => this.originalRequest != null;
public bool CanInject => this.originalRequest != null;

public int AttributesCount =>
this.originalRequest?.MessageAttributes.Count ?? 0;

public void AddAttribute(string name, string value, int nextAttributeIndex)
public void AddAttribute(string name, string value, int attributeIndex)
{
if (this.parameters == null)
{
return;
}

var attributePrefix = "MessageAttribute." + nextAttributeIndex;
this.parameters.Add(attributePrefix + ".Name", name);
this.parameters.Add(attributePrefix + ".Value.DataType", "String");
this.parameters.Add(attributePrefix + ".Value.StringValue", value);
var prefix = "MessageAttribute." + attributeIndex;
this.parameters.Add(prefix + ".Name", name);
this.parameters.Add(prefix + ".Value.DataType", "String");
this.parameters.Add(prefix + ".Value.StringValue", value);

// Add injected attributes to the original request as well.
// This dictionary must be in sync with parameters collection to pass through the MD5 hash matching check.
this.originalRequest?.MessageAttributes.Add(name, new MessageAttributeValue { DataType = "String", StringValue = value });
}

public void AddAttributeToOriginalRequest(string name, string value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,14 @@ internal class AWSMessagingUtils

internal static PropagationContext ExtractParentContext(SQSEvent sqsEvent)
{
if (sqsEvent == null)
{
return default;
}

// We assume there can be only one parent that's why we consider only a single (the last) record as the carrier.
var message = sqsEvent.Records.LastOrDefault();
var message = sqsEvent?.Records?.LastOrDefault();
return ExtractParentContext(message);
}

internal static PropagationContext ExtractParentContext(SQSEvent.SQSMessage sqsMessage)
{
if (sqsMessage == null)
if (sqsMessage?.MessageAttributes == null)
{
return default;
}
Expand All @@ -67,52 +62,45 @@ internal static PropagationContext ExtractParentContext(SQSEvent.SQSMessage sqsM

internal static PropagationContext ExtractParentContext(SNSEvent snsEvent)
{
if (snsEvent == null)
{
return default;
}

// We assume there can be only one parent that's why we consider only a single (the last) record as the carrier.
var record = snsEvent.Records.LastOrDefault();
var record = snsEvent?.Records?.LastOrDefault();
return ExtractParentContext(record);
}

internal static PropagationContext ExtractParentContext(SNSEvent.SNSRecord record)
{
return (record?.Sns != null) ?
return (record?.Sns?.MessageAttributes != null) ?
Propagators.DefaultTextMapPropagator.Extract(default, record.Sns.MessageAttributes, SnsMessageAttributeGetter) :
default;
}

internal static PropagationContext ExtractParentContext(SNSEvent.SNSMessage message)
{
return (message != null) ?
return (message?.MessageAttributes != null) ?
Propagators.DefaultTextMapPropagator.Extract(default, message.MessageAttributes, SnsMessageAttributeGetter) :
default;
}

private static IEnumerable<string> SqsMessageAttributeGetter(IDictionary<string, SQSEvent.MessageAttribute> attributes, string attributeName)
{
SQSEvent.MessageAttribute attribute = attributes.GetValueByKeyIgnoringCase(attributeName);
if (attribute == null)
if (!attributes.TryGetValue(attributeName, out var attribute))
{
return null;
}

return attribute.StringValue != null ?
return attribute?.StringValue != null ?
new[] { attribute.StringValue } :
attribute.StringListValues;
attribute?.StringListValues;
}

private static IEnumerable<string> SnsMessageAttributeGetter(IDictionary<string, SNSEvent.MessageAttribute> attributes, string attributeName)
{
SNSEvent.MessageAttribute attribute = attributes.GetValueByKeyIgnoringCase(attributeName);
if (attribute == null)
if (!attributes.TryGetValue(attributeName, out var attribute))
{
return null;
}

switch (attribute.Type)
switch (attribute?.Type)
{
case SnsAttributeTypeString when attribute.Value != null:
return new[] { attribute.Value };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public AWSMessagingUtilsTests()
public void Inject_ParametersCollectionSizeReachesLimit_TraceDataNotInjected(string serviceType)
{
AmazonWebServiceRequest originalRequest = TestsHelper.CreateOriginalRequest(serviceType, 10);
var parameters = TestsHelper.DefaultParameterCollection(serviceType);
var parameters = new ParameterCollection();
parameters.AddStringParameters(serviceType, originalRequest);

var request = new Mock<IRequest>();
Expand All @@ -62,13 +62,13 @@ public void Inject_ParametersCollectionSizeReachesLimit_TraceDataNotInjected(str

AWSMessagingUtils.Inject(adapter, CreatePropagationContext());

Assert.Equal(31, parameters.Count);
Assert.Equal(30, parameters.Count);
}

[Theory]
[InlineData(AWSServiceType.SQSService)]
[InlineData(AWSServiceType.SNSService)]
public void Inject_DefaultParametersCollection_TraceDataInjected(string serviceType)
public void Inject_ParametersCollection_TraceDataInjected(string serviceType)
{
var expectedParameters = new List<KeyValuePair<string, string>>()
{
Expand All @@ -77,7 +77,7 @@ public void Inject_DefaultParametersCollection_TraceDataInjected(string serviceT
};

AmazonWebServiceRequest originalRequest = TestsHelper.CreateOriginalRequest(serviceType, 0);
var parameters = TestsHelper.DefaultParameterCollection(serviceType);
var parameters = new ParameterCollection();

var request = new Mock<IRequest>();
request.Setup(x => x.ParameterCollection)
Expand Down Expand Up @@ -109,7 +109,7 @@ public void Inject_ParametersCollectionWithCustomParameter_TraceDataInjected(str
};

AmazonWebServiceRequest originalRequest = TestsHelper.CreateOriginalRequest(serviceType, 1);
var parameters = TestsHelper.DefaultParameterCollection(serviceType);
var parameters = new ParameterCollection();
parameters.AddStringParameters(serviceType, originalRequest);

var request = new Mock<IRequest>();
Expand All @@ -132,19 +132,21 @@ public void Inject_ParametersCollectionWithCustomParameter_TraceDataInjected(str
[Theory]
[InlineData(AWSServiceType.SQSService)]
[InlineData(AWSServiceType.SNSService)]
public void Inject_ParametersCollectionWithTraceData_TraceDataNotInjected(string serviceType)
public void Inject_ParametersCollectionWithTraceParent_TraceStateNotInjected(string serviceType)
{
// This test just checks the common implementation logic:
// if at least one attribute is already present the whole injection is skipped.
// We just use default trace propagator as an example which injects only traceparent and tracestate.

var expectedParameters = new List<KeyValuePair<string, string>>()
{
new KeyValuePair<string, string>("traceparent", $"00-{TraceId}-{ParentId}-00"),
new KeyValuePair<string, string>("tracestate", "trace-state"),
};

AmazonWebServiceRequest originalRequest = TestsHelper.CreateOriginalRequest(serviceType, 0);
originalRequest.AddAttribute("traceparent", $"00-{TraceId}-{ParentId}-00");
originalRequest.AddAttribute("tracestate", $"trace-state");

var parameters = TestsHelper.DefaultParameterCollection(serviceType);
var parameters = new ParameterCollection();
parameters.AddStringParameters(serviceType, originalRequest);

var request = new Mock<IRequest>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using Amazon.Runtime;
using Amazon.Runtime.Internal;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Contrib.Instrumentation.AWS.Implementation;
using Xunit;
using SNS = Amazon.SimpleNotificationService.Model;
Expand Down Expand Up @@ -113,7 +110,7 @@ internal static void AddStringParameters(this ParameterCollection parameters, st

internal static void AssertStringParameters(string serviceType, List<KeyValuePair<string, string>> expectedParameters, ParameterCollection parameters)
{
Assert.Equal((expectedParameters.Count * 3) + 1, parameters.Count);
Assert.Equal(expectedParameters.Count * 3, parameters.Count);

for (int i = 0; i < expectedParameters.Count; i++)
{
Expand All @@ -131,14 +128,6 @@ internal static void AssertStringParameters(string serviceType, List<KeyValuePai
}
}

internal static ParameterCollection DefaultParameterCollection(string serviceType)
{
return new ParameterCollection
{
{ GetMessageBodyAttributeName(serviceType), string.Empty },
};
}

private static string GetNamePrefix(string serviceType)
{
return serviceType switch
Expand All @@ -148,14 +137,4 @@ private static string GetNamePrefix(string serviceType)
_ => throw new NotSupportedException($"Tests for service type {serviceType} not supported."),
};
}

private static string GetMessageBodyAttributeName(string serviceType)
{
return serviceType switch
{
AWSServiceType.SQSService => "MessageBody",
AWSServiceType.SNSService => "Message",
_ => throw new NotSupportedException($"Tests for service type {serviceType} not supported."),
};
}
}

0 comments on commit 1147917

Please sign in to comment.