Skip to content

Commit

Permalink
feat: First part of wiring up actual tracing (unary only)
Browse files Browse the repository at this point in the history
There's just a single test so far - more required. We should discuss this commit in detail before going further.
Streaming calls are trickier, because we don't want to stop the activity before the call has actually completed. (We may not even know that in some cases.)
  • Loading branch information
jskeet committed Mar 14, 2024
1 parent 9c2aa13 commit 57c7919
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 6 deletions.
46 changes: 46 additions & 0 deletions Google.Api.Gax.Grpc.Tests/ApiCallTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
using Google.Protobuf;
using Google.Protobuf.Reflection;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -217,6 +220,49 @@ public async Task WithLogging_Async()
Assert.All(entries, entry => Assert.Contains("SimpleMethod", entry.Message));
}

[Fact]
public void WithTracing_Sync()
{
using var helper = new ActivityHelper();
var call = new ApiCall<SimpleRequest, SimpleResponse>(
"SimpleMethod",
(req, cs) => Task.FromResult(default(SimpleResponse)),
(req, cs) => null,
null).WithTracing(helper.Source);

call.Sync(new SimpleRequest(), null);

var activity = helper.CapturedActivity;
Assert.NotNull(activity);
Assert.Contains("SimpleMethod", activity.OperationName);
Assert.Equal(ActivityStatusCode.Ok, activity.Status);
}

private class ActivityHelper : IDisposable
{
public ActivitySource Source { get; }
public Activity CapturedActivity { get; private set; }
private readonly ActivityListener _listener;

internal ActivityHelper([CallerMemberName] string name = null)
{
Source = new ActivitySource(name);
_listener = new ActivityListener
{
ShouldListenTo = candidate => candidate == Source,
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = activity => CapturedActivity = activity
};
ActivitySource.AddActivityListener(_listener);
}

public void Dispose()
{
_listener.Dispose();
Source.Dispose();
}
}

internal class ExtractedRequestParamRequest : IMessage<ExtractedRequestParamRequest>
{
public string TableName { get; set; }
Expand Down
99 changes: 93 additions & 6 deletions Google.Api.Gax.Grpc/ApiCallTracingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc;
Expand All @@ -19,15 +17,72 @@ namespace Google.Api.Gax.Grpc;
/// </summary>
internal static class ApiCallTracingExtensions
{
internal const string AttributeExceptionEventName = "exception";
internal const string AttributeExceptionType = "exception.type";
internal const string AttributeExceptionMessage = "exception.message";
internal const string AttributeExceptionStacktrace = "exception.stacktrace";

internal const string GrpcCallTypeTag = "grpc.call.type";
internal const string UnaryCallType = "unary";
internal const string ServerStreamingCallType = "server_streaming";
internal const string ClientStreamingCallType = "client_streaming";
internal const string BidiStreamingCallType = "bidi_streaming";

// Unary async
internal static Func<TRequest, CallSettings, Task<TResponse>> WithTracing<TRequest, TResponse>(
this Func<TRequest, CallSettings, Task<TResponse>> fn, ActivitySource activitySource, string methodName) =>
fn;
this Func<TRequest, CallSettings, Task<TResponse>> fn, ActivitySource activitySource, string methodName)
{
GaxPreconditions.CheckNotNull(activitySource, nameof(activitySource));
var activityName = FormatActivityName(fn, methodName);
return async (request, callSettings) =>
{
using var activity = activitySource.StartActivity(activityName, ActivityKind.Client);
activity?.SetTag(GrpcCallTypeTag, UnaryCallType);
// TODO: Add a tag with the name of the client, in case a custom source has been provided?
try
{
var response = await fn(request, callSettings).ConfigureAwait(false);
activity?.SetStatus(ActivityStatusCode.Ok);
return response;
}
catch (Exception ex) when (SetActivityException(activity, ex))
{
// We'll never actually get here, because SetActivityException always returns false.
// Alternative: catch without an exception filter, make SetActivityException return void,
// and call ExceptionDispatchInfo.Capture(e).Throw();.
throw;
}
};
}

// Unary sync
internal static Func<TRequest, CallSettings, TResponse> WithTracing<TRequest, TResponse>(
this Func<TRequest, CallSettings, TResponse> fn, ActivitySource activitySource, string methodName) =>
fn;
this Func<TRequest, CallSettings, TResponse> fn, ActivitySource activitySource, string methodName)
{
GaxPreconditions.CheckNotNull(activitySource, nameof(activitySource));
var activityName = FormatActivityName(fn, methodName);
return (request, callSettings) =>
{
using var activity = activitySource.StartActivity(activityName, ActivityKind.Client);
activity?.SetTag(GrpcCallTypeTag, UnaryCallType);
// TODO: Add a tag with the name of the client, in case a custom source has been provided?
try
{
var response = fn(request, callSettings);
activity?.SetStatus(ActivityStatusCode.Ok);
return response;
}
catch (Exception ex) when (SetActivityException(activity, ex))
{
// We'll never actually get here, because SetActivityException always returns false.
// Alternative: catch without an exception filter, make SetActivityException return void,
// and call ExceptionDispatchInfo.Capture(e).Throw();.
// (As this is a sync method we may be okay just to use a throw statement. But we need to
// validate that we're not losing any info.)
throw;
}
};
}

// Server-streaming async
internal static Func<TRequest, CallSettings, Task<AsyncServerStreamingCall<TResponse>>> WithTracing<TRequest, TResponse>(
Expand All @@ -48,4 +103,36 @@ internal static Func<CallSettings, AsyncClientStreamingCall<TRequest, TResponse>
internal static Func<CallSettings, AsyncDuplexStreamingCall<TRequest, TResponse>> WithTracing<TRequest, TResponse>(
this Func<CallSettings, AsyncDuplexStreamingCall<TRequest, TResponse>> fn, ActivitySource activitySource, string methodName) =>
fn;

// This is still very much up in the air, and may even require changes to the parameters, so that we get more information
// (e.g. the full RPC name, the client name etc).
private static string FormatActivityName(Delegate fn, string methodName) => $"{fn.Method.Name}/{methodName}";

// TODO: See if there's a standard way of doing this. It seems odd to have to do it ourselves.
/// <summary>
/// Sets an exception within an activity. We may wish to expose this publicly for integration purposes.
/// This always returns false, so that it can be used as an exception filter.
/// </summary>
private static bool SetActivityException(Activity activity, Exception ex)
{
if (ex is null || activity is null)
{
return false;
}

var tagsCollection = new ActivityTagsCollection
{
{ AttributeExceptionType, ex.GetType().FullName },
{ AttributeExceptionStacktrace, ex.ToString() },
};

if (!string.IsNullOrWhiteSpace(ex.Message))
{
tagsCollection[AttributeExceptionMessage] = ex.Message;
}

activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent(AttributeExceptionEventName, default, tagsCollection));
return false;
}
}

0 comments on commit 57c7919

Please sign in to comment.