From caa82918299ef0f43b8c2a0cc3f51ac7967a6f4e Mon Sep 17 00:00:00 2001 From: Chris Chaffin Date: Mon, 17 Apr 2023 13:52:23 -0500 Subject: [PATCH] add config for rpc.server.duration add tests update change log and split up test remove accidental dependabot change --- .gitignore | 1 + CHANGELOG.md | 1 + .../google.golang.org/grpc/otelgrpc/config.go | 19 +++- .../grpc/otelgrpc/interceptor.go | 12 ++- .../grpc/otelgrpc/test/bufconn_mock.go | 102 ++++++++++++++++++ .../grpc/otelgrpc/test/grpc_test.go | 63 +++++++++-- .../grpc/otelgrpc/test/interceptor_test.go | 16 ++- 7 files changed, 200 insertions(+), 14 deletions(-) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/bufconn_mock.go diff --git a/.gitignore b/.gitignore index 19655fe9a3c..545f57f73e6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ Thumbs.db .tools/ .idea/ .vscode/ +vendor/ *.iml *.so coverage.* diff --git a/CHANGELOG.md b/CHANGELOG.md index acab402d439..c689f8f77a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - Prevent taking from reservoir in AWS XRay Remote Sampler when there is zero capacity in `go.opentelemetry.io/contrib/samplers/aws/xray`. (#3684) +- Improved `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to optionally, off by default, use peer information as an attribute for the meter rpc.server.duration (#3536) ## [1.16.0-rc.2/0.41.0-rc.2/0.10.0-rc.2] - 2023-03-23 diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index 1879a12e237..482da9d6aea 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -43,8 +43,9 @@ type config struct { TracerProvider trace.TracerProvider MeterProvider metric.MeterProvider - meter metric.Meter - rpcServerDuration instrument.Int64Histogram + meter metric.Meter + rpcServerDuration instrument.Int64Histogram + includeRPCServerDurationPeerCtx bool } // Option applies an option value for a config. @@ -127,6 +128,20 @@ func (o meterProviderOption) apply(c *config) { } } +type includePeerAddrAsAttribute struct { + use bool +} + +// WithPeerAddrAsAttribute returns an Option to use peer details when +// recording with the meter rpc.server.duration. +func WithPeerAddrAsAttribute(peerAddrAsAttribute bool) Option { + return includePeerAddrAsAttribute{use: peerAddrAsAttribute} +} + +func (o includePeerAddrAsAttribute) apply(c *config) { + c.includeRPCServerDurationPeerCtx = o.use +} + // WithMeterProvider returns an Option to use the MeterProvider when // creating a Meter. If this option is not provide the global MeterProvider will be used. func WithMeterProvider(mp metric.MeterProvider) Option { diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go index 9601ee8d83f..9a9a1fe058c 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go @@ -321,12 +321,16 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { ctx = extract(ctx, cfg.Propagators) - name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx)) + name, nameAttr := internal.ParseFullMethod(info.FullMethod) + peerAttrs := peerAttr(peerFromCtx(ctx)) + spanAttrs := append(nameAttr, peerAttrs...) + spanAttrs = append(spanAttrs, RPCSystemGRPC) + ctx, span := tracer.Start( trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), name, trace.WithSpanKind(trace.SpanKindServer), - trace.WithAttributes(attr...), + trace.WithAttributes(spanAttrs...), ) defer span.End() @@ -335,6 +339,10 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { var statusCode grpc_codes.Code defer func(t time.Time) { elapsedTime := time.Since(t) / time.Millisecond + attr := append(nameAttr, RPCSystemGRPC) + if cfg.includeRPCServerDurationPeerCtx { + attr = append(attr, peerAttrs...) + } attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode))) cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), attr...) }(time.Now()) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/bufconn_mock.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/bufconn_mock.go new file mode 100644 index 00000000000..a665784036d --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/bufconn_mock.go @@ -0,0 +1,102 @@ +package test + +import ( + "context" + "fmt" + "google.golang.org/grpc/test/bufconn" + "net" + "time" +) + +const ( + mockIp = "1.1.1.1" + mockPort = 1234 +) + +var ( + mockAddr, _ = net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", mockIp, mockPort)) +) + +// bufConnMock wraps a bufconn.Lister for the sake of returning a valid address. This is so we can properly test +// returning net.sock.peer.addr and net.sock.peer.port attributes +type bufConnMock struct { + listener *bufconn.Listener +} + +func NewMockBufConn(size int) *bufConnMock { + return &bufConnMock{ + listener: bufconn.Listen(size), + } +} + +func (b *bufConnMock) Accept() (net.Conn, error) { + conn, err := b.listener.Accept() + if err != nil { + return nil, err + } + + return &bufConn{ + conn: conn, + }, nil +} + +func (b *bufConnMock) Close() error { + return b.listener.Close() +} + +func (b *bufConnMock) Addr() net.Addr { + return mockAddr +} + +func (b *bufConnMock) Dial() (net.Conn, error) { + // bufConnect's listener Dial implementation just calls + // DialContext under the covers so don't wrap the connection in our mock here + return b.listener.DialContext(context.Background()) +} + +func (b *bufConnMock) DialContext(ctx context.Context) (net.Conn, error) { + conn, err := b.listener.DialContext(ctx) + if err != nil { + return nil, err + } + + return &bufConn{ + conn: conn, + }, nil +} + +type bufConn struct { + conn net.Conn +} + +func (b *bufConn) Read(bytes []byte) (n int, err error) { + return b.conn.Read(bytes) +} + +func (b *bufConn) Write(bytes []byte) (n int, err error) { + return b.conn.Write(bytes) +} + +func (b *bufConn) Close() error { + return b.conn.Close() +} + +func (b *bufConn) LocalAddr() net.Addr { + return mockAddr +} + +func (b *bufConn) RemoteAddr() net.Addr { + return mockAddr +} + +func (b *bufConn) SetDeadline(t time.Time) error { + return b.conn.SetDeadline(t) +} + +func (b *bufConn) SetReadDeadline(t time.Time) error { + return b.conn.SetReadDeadline(t) +} + +func (b *bufConn) SetWriteDeadline(t time.Time) error { + return b.conn.SetWriteDeadline(t) +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 6b7fb667476..a9834545165 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/interop" pb "google.golang.org/grpc/interop/grpc_testing" - "google.golang.org/grpc/test/bufconn" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" @@ -40,7 +39,7 @@ import ( const bufSize = 2048 func doCalls(cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { - l := bufconn.Listen(bufSize) + l := NewMockBufConn(bufSize) defer l.Close() s := grpc.NewServer(sOpt...) @@ -56,7 +55,7 @@ func doCalls(cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { dial := func(context.Context, string) (net.Conn, error) { return l.Dial() } conn, err := grpc.DialContext( ctx, - "bufnet", + l.Addr().String(), append([]grpc.DialOption{ grpc.WithContextDialer(dial), grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -113,7 +112,7 @@ func TestInterceptors(t *testing.T) { t.Run("UnaryServerSpans", func(t *testing.T) { checkUnaryServerSpans(t, serverUnarySR.Ended()) - checkUnaryServerRecords(t, serverUnaryMetricReader) + checkUnaryServerRecords(t, serverUnaryMetricReader, false) }) t.Run("StreamServerSpans", func(t *testing.T) { @@ -121,6 +120,25 @@ func TestInterceptors(t *testing.T) { }) } +func TestInterceptorWithPeerAddrAsAttribute(t *testing.T) { + serverUnaryPeerAddrSR := tracetest.NewSpanRecorder() + serverUnaryPeerAddrTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverUnaryPeerAddrSR)) + serverUnaryMetricReaderPeerAddr := metric.NewManualReader() + serverUnaryMPPeerAddr := metric.NewMeterProvider(metric.WithReader(serverUnaryMetricReaderPeerAddr)) + + assert.NoError(t, doCalls(nil, + []grpc.ServerOption{ + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(serverUnaryPeerAddrTP), otelgrpc.WithMeterProvider(serverUnaryMPPeerAddr), otelgrpc.WithPeerAddrAsAttribute(true))), + }, + )) + + t.Run("UnaryServerSpansWithPeerAddr", func(t *testing.T) { + checkUnaryServerSpans(t, serverUnaryPeerAddrSR.Ended()) + checkUnaryServerRecords(t, serverUnaryMetricReaderPeerAddr, true) + }) + +} + func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { require.Len(t, spans, 2) @@ -148,6 +166,8 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, emptySpan.Attributes()) largeSpan := spans[1] @@ -176,6 +196,8 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, largeSpan.Attributes()) } @@ -222,6 +244,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, streamInput.Attributes()) streamOutput := spans[1] @@ -270,6 +294,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, streamOutput.Attributes()) pingPong := spans[2] @@ -338,6 +364,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, pingPong.Attributes()) } @@ -390,6 +418,8 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, streamInput.Attributes()) streamOutput := spans[1] @@ -438,6 +468,8 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, streamOutput.Attributes()) pingPong := spans[2] @@ -506,6 +538,8 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, pingPong.Attributes()) } @@ -536,6 +570,8 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, emptySpan.Attributes()) largeSpan := spans[1] @@ -564,6 +600,8 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(mockIp), + semconv.NetSockPeerPort(mockPort), }, largeSpan.Attributes()) } @@ -585,7 +623,7 @@ func assertEvents(t *testing.T, expected, actual []trace.Event) bool { return !failed } -func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { +func checkUnaryServerRecords(t *testing.T, reader metric.Reader, peerInfoIncluded bool) { rm := metricdata.ResourceMetrics{} err := reader.Collect(context.Background(), &rm) assert.NoError(t, err) @@ -598,12 +636,21 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { attr := dpt.Attributes.ToSlice() method := getRPCMethod(attr) assert.NotEmpty(t, method) - assert.ElementsMatch(t, []attribute.KeyValue{ + + attrs := []attribute.KeyValue{ + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), semconv.RPCMethod(method), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, - otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), - }, attr) + } + + if peerInfoIncluded { + attrs = append(attrs, []attribute.KeyValue{ + semconv.NetSockPeerPort(mockPort), + semconv.NetSockPeerAddr(mockIp), + }...) + } + assert.ElementsMatch(t, attrs, attr) } } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go index 16a12771ed4..956bcf0f203 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go @@ -694,10 +694,22 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr } // TestUnaryServerInterceptor tests the server interceptor for unary RPCs. -func TestUnaryServerInterceptor(t *testing.T) { +func TestUnaryServerInterceptorWithPeerAddrAsAttribute(t *testing.T) { sr := tracetest.NewSpanRecorder() tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr)) - usi := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp)) + usi := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPeerAddrAsAttribute(true)) + runServerChecks(t, usi, sr) +} + +// TestUnaryServerInterceptor tests the server interceptor for unary RPCs. +func TestUnaryServerInterceptorWithoutPeerAddrAsAttribute(t *testing.T) { + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr)) + usi := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp), otelgrpc.WithPeerAddrAsAttribute(false)) + runServerChecks(t, usi, sr) +} + +func runServerChecks(t *testing.T, usi grpc.UnaryServerInterceptor, sr *tracetest.SpanRecorder) { for _, check := range serverChecks { name := check.grpcCode.String() t.Run(name, func(t *testing.T) {