Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add newClient method for otlploggrpc gRPC client #5523

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 135 additions & 3 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,144 @@

package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"time"

"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)

// The methods of this type are not expected to be called concurrently.
type client struct {
// TODO: implement.
metadata metadata.MD
exportTimeout time.Duration
requestFunc retry.RequestFunc

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as conn should only be closed if we created it. Otherwise,
// it is up to the processes that passed conn to close it.
ourConn bool
conn *grpc.ClientConn
lsc collogpb.LogsServiceClient
}

// Used for testing.
var newGRPCClientFn = grpc.NewClient

// newClient creates a new gRPC log client.
func newClient(cfg config) (*client, error) {
// TODO: implement.
return &client{}, nil
c := &client{
exportTimeout: cfg.timeout.Value,
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
conn: cfg.gRPCConn.Value,
}

if len(cfg.headers.Value) > 0 {
c.metadata = metadata.New(cfg.headers.Value)
}

if c.conn == nil {
// If the caller did not provide a ClientConn when the client was
// created, create one using the configuration they did provide.
dialOpts := newGRPCDialOptions(cfg)

conn, err := newGRPCClientFn(cfg.endpoint.Value, dialOpts...)
if err != nil {
return nil, err

Check warning on line 60 in exporters/otlp/otlplog/otlploggrpc/client.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlplog/otlploggrpc/client.go#L60

Added line #L60 was not covered by tests
}
// Keep track that we own the lifecycle of this conn and need to close
// it on Shutdown.
c.ourConn = true
c.conn = conn
}

c.lsc = collogpb.NewLogsServiceClient(c.conn)

return c, nil
}

func newGRPCDialOptions(cfg config) []grpc.DialOption {
userAgent := "OTel Go OTLP over gRPC logs exporter/" + Version()
dialOpts := []grpc.DialOption{grpc.WithUserAgent(userAgent)}
dialOpts = append(dialOpts, cfg.dialOptions.Value...)

// Convert other grpc configs to the dial options.
// Service config
if cfg.serviceConfig.Value != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(cfg.serviceConfig.Value))
}
// Prioritize GRPCCredentials over Insecure (passing both is an error).
if cfg.gRPCCredentials.Value != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.gRPCCredentials.Value))
} else if cfg.insecure.Value {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))

Check warning on line 87 in exporters/otlp/otlplog/otlploggrpc/client.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlplog/otlploggrpc/client.go#L87

Added line #L87 was not covered by tests
} else {
// Default to using the host's root CA.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(
credentials.NewTLS(nil),
))
}
// Compression
if cfg.compression.Value == GzipCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
// Reconnection period
if cfg.reconnectionPeriod.Value != 0 {
p := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: cfg.reconnectionPeriod.Value,
}
dialOpts = append(dialOpts, grpc.WithConnectParams(p))
}

return dialOpts
}

// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns if the status is RetryInfo
// and the duration to wait for if an explicit throttle time is included.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return true, t.RetryDelay.AsDuration()
}
}
return false, 0
}
227 changes: 227 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"

"github.com/stretchr/testify/assert"
)

func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}

func TestRetryable(t *testing.T) {
retryableCodes := map[codes.Code]bool{
codes.OK: false,
codes.Canceled: true,
codes.Unknown: false,
codes.InvalidArgument: false,
codes.DeadlineExceeded: true,
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
codes.Unimplemented: false,
codes.Internal: false,
codes.Unavailable: true,
codes.DataLoss: true,
codes.Unauthenticated: false,
}

for c, want := range retryableCodes {
got, _ := retryable(status.Error(c, ""))
assert.Equalf(t, want, got, "evaluate(%s)", c)
}
}

func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

func TestNewClient(t *testing.T) {
newGRPCClientFnSwap := newGRPCClientFn
t.Cleanup(func() {
newGRPCClientFn = newGRPCClientFnSwap
})

// The gRPC connection created by newClient.
conn, err := grpc.NewClient("test", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
newGRPCClientFn = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return conn, nil
}

// The gRPC connection created by users.
userConn, err := grpc.NewClient("test 2", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

testCases := []struct {
name string
cfg config
cli *client
}{
{
name: "empty config",
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
{
name: "with headers",
cfg: config{
headers: newSetting(map[string]string{
"key": "value",
}),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
metadata: map[string][]string{"key": {"value"}},
},
},
{
name: "with gRPC connection",
cfg: config{
gRPCConn: newSetting(userConn),
},
cli: &client{
ourConn: false,
conn: userConn,
lsc: collogpb.NewLogsServiceClient(userConn),
},
},
{
// It is not possible to compare grpc dial options directly, so we just check that the client is created
// and no panic occurs.
name: "with dial options",
cfg: config{
serviceConfig: newSetting("service config"),
gRPCCredentials: newSetting(credentials.NewTLS(nil)),
compression: newSetting(GzipCompression),
reconnectionPeriod: newSetting(10 * time.Second),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cli, err := newClient(tc.cfg)
require.NoError(t, err)

assert.Equal(t, tc.cli.metadata, cli.metadata)
assert.Equal(t, tc.cli.exportTimeout, cli.exportTimeout)
assert.Equal(t, tc.cli.ourConn, cli.ourConn)
assert.Equal(t, tc.cli.conn, cli.conn)
assert.Equal(t, tc.cli.lsc, cli.lsc)
})
}
}
Loading