diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go
index 4f885bd417e..98fc3c0a239 100644
--- a/exporters/otlp/otlplog/otlploggrpc/client.go
+++ b/exporters/otlp/otlplog/otlploggrpc/client.go
@@ -4,6 +4,8 @@
 package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
 
 import (
+	"context"
+	"fmt"
 	"time"
 
 	"google.golang.org/genproto/googleapis/rpc/errdetails"
@@ -16,8 +18,10 @@ import (
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
 
+	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
 	collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
+	logpb "go.opentelemetry.io/proto/otlp/logs/v1"
 )
 
 // The methods of this type are not expected to be called concurrently.
@@ -107,6 +111,99 @@ func newGRPCDialOptions(cfg config) []grpc.DialOption {
 	return dialOpts
 }
 
+// UploadLogs sends proto logs to the connected endpoint.
+//
+// Retryable errors from the server will be handled according to any
+// RetryConfig the client was created with.
+//
+// The otlplog.Exporter synchronizes access to client methods, and
+// ensures this is not called after the Exporter is shut down. The only
+// thing to do here is send the data.
+func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error {
+	select {
+	case <-ctx.Done():
+		// Do not upload if the context is already expired.
+		return ctx.Err()
+	default:
+	}
+
+	ctx, cancel := c.exportContext(ctx)
+	defer cancel()
+
+	return c.requestFunc(ctx, func(ctx context.Context) error {
+		resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
+			ResourceLogs: rl,
+		})
+		if resp != nil && resp.PartialSuccess != nil {
+			msg := resp.PartialSuccess.GetErrorMessage()
+			n := resp.PartialSuccess.GetRejectedLogRecords()
+			if n != 0 || msg != "" {
+				err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n)
+				otel.Handle(err)
+			}
+		}
+		// nil is converted to OK.
+		if status.Code(err) == codes.OK {
+			// Success.
+			return nil
+		}
+		return err
+	})
+}
+
+// Shutdown shuts down the client, freeing all resources.
+//
+// Any active connections to a remote endpoint are closed if they were created
+// by the client. Any gRPC connection passed during creation using
+// WithGRPCConn will not be closed. It is the caller's responsibility to
+// handle cleanup of that resource.
+//
+// The otlplog.Exporter synchronizes access to client methods and
+// ensures this is called only once. The only thing that needs to be done
+// here is to release any computational resources the client holds.
+func (c *client) Shutdown(ctx context.Context) error {
+	c.metadata = nil
+	c.requestFunc = nil
+	c.lsc = nil
+
+	// Release the connection if we created it.
+	err := ctx.Err()
+	if c.ourConn {
+		closeErr := c.conn.Close()
+		// A context timeout error takes precedence over this error.
+		if err == nil && closeErr != nil {
+			err = closeErr
+		}
+	}
+	c.conn = nil
+	return err
+}
+
+// exportContext returns a copy of parent with an appropriate deadline and
+// cancellation function based on the client's configured export timeout.
+//
+// It is the caller's responsibility to cancel the returned context once its
+// use is complete, via the parent or directly with the returned CancelFunc, to
+// ensure all resources are correctly released.
+func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+	)
+
+	if c.exportTimeout > 0 {
+		ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
+	} else {
+		ctx, cancel = context.WithCancel(parent)
+	}
+
+	if c.metadata.Len() > 0 {
+		ctx = metadata.NewOutgoingContext(ctx, c.metadata)
+	}
+
+	return ctx, cancel
+}
+
 // retryable returns if err identifies a request that can be retried and a
 // duration to wait for if an explicit throttle time is included in err.
 func retryable(err error) (bool, time.Duration) {
diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go
index f3330dc4fbb..1fa69b94fa4 100644
--- a/exporters/otlp/otlplog/otlploggrpc/client_test.go
+++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go
@@ -4,21 +4,140 @@
 package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
 
 import (
+	"context"
+	"fmt"
+	"net"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/durationpb"
 
+	"go.opentelemetry.io/otel"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
 	collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
+	cpb "go.opentelemetry.io/proto/otlp/common/v1"
+	lpb "go.opentelemetry.io/proto/otlp/logs/v1"
+	rpb "go.opentelemetry.io/proto/otlp/resource/v1"
+)
-
-	"github.com/stretchr/testify/assert"
+
+var (
+	// Sat Jan 01 2000 00:00:00 GMT+0000.
+	ts  = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
+	obs = ts.Add(30 * time.Second)
+
+	kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
+	}}
+	kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
+	}}
+	kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
+	}}
+	kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
+	}}
+
+	pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
+	pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
+
+	pbBodyA = &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{
+			StringValue: "a",
+		},
+	}
+	pbBodyB = &cpb.AnyValue{
+		Value: &cpb.AnyValue_StringValue{
+			StringValue: "b",
+		},
+	}
+
+	spanIDA  = []byte{0, 0, 0, 0, 0, 0, 0, 1}
+	spanIDB  = []byte{0, 0, 0, 0, 0, 0, 0, 2}
+	traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
+	traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
+	flagsA   = byte(1)
+	flagsB   = byte(0)
+
+	logRecords = []*lpb.LogRecord{
+		{
+			TimeUnixNano:         uint64(ts.UnixNano()),
+			ObservedTimeUnixNano: uint64(obs.UnixNano()),
+			SeverityNumber:       pbSevA,
+			SeverityText:         "A",
+			Body:                 pbBodyA,
+			Attributes:           []*cpb.KeyValue{kvAlice},
+			Flags:                uint32(flagsA),
+			TraceId:              traceIDA,
+			SpanId:               spanIDA,
+		},
+		{
+			TimeUnixNano:         uint64(ts.UnixNano()),
+			ObservedTimeUnixNano: uint64(obs.UnixNano()),
+			SeverityNumber:       pbSevA,
+			SeverityText:         "A",
+			Body:                 pbBodyA,
+			Attributes:           []*cpb.KeyValue{kvBob},
+			Flags:                uint32(flagsA),
+			TraceId:              traceIDA,
+			SpanId:               spanIDA,
+		},
+		{
+			TimeUnixNano:         uint64(ts.UnixNano()),
+			ObservedTimeUnixNano: uint64(obs.UnixNano()),
+			SeverityNumber:       pbSevB,
+			SeverityText:         "B",
+			Body:                 pbBodyB,
+			Attributes:           []*cpb.KeyValue{kvAlice},
+			Flags:                uint32(flagsB),
+			TraceId:              traceIDB,
+			SpanId:               spanIDB,
+		},
+		{
+			TimeUnixNano:         uint64(ts.UnixNano()),
+			ObservedTimeUnixNano: uint64(obs.UnixNano()),
+			SeverityNumber:       pbSevB,
+			SeverityText:         "B",
+			Body:                 pbBodyB,
+			Attributes:           []*cpb.KeyValue{kvBob},
+			Flags:                uint32(flagsB),
+			TraceId:              traceIDB,
+			SpanId:               spanIDB,
+		},
+	}
+
+	scope = &cpb.InstrumentationScope{
+		Name:    "test/code/path",
+		Version: "v0.1.0",
+	}
+	scopeLogs = []*lpb.ScopeLogs{
+		{
+			Scope:      scope,
+			LogRecords: logRecords,
+			SchemaUrl:  semconv.SchemaURL,
+		},
+	}
+
+	res = &rpb.Resource{
+		Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer},
+	}
+	resourceLogs = []*lpb.ResourceLogs{{
+		Resource:  res,
+		ScopeLogs: scopeLogs,
+		SchemaUrl: semconv.SchemaURL,
+	}}
 )
 
 func TestThrottleDelay(t *testing.T) {
@@ -225,3 +344,219 @@ func TestNewClient(t *testing.T) {
 		})
 	}
 }
+
+type exportResult struct {
+	Response *collogpb.ExportLogsServiceResponse
+	Err      error
+}
+
+// storage stores uploaded OTLP log data in its proto form.
+type storage struct {
+	dataMu sync.Mutex
+	data   []*lpb.ResourceLogs
+}
+
+// newStorage returns a configured storage ready to store received requests.
+func newStorage() *storage {
+	return &storage{}
+}
+
+// Add adds the request to the storage.
+func (s *storage) Add(request *collogpb.ExportLogsServiceRequest) {
+	s.dataMu.Lock()
+	defer s.dataMu.Unlock()
+	s.data = append(s.data, request.ResourceLogs...)
+}
+
+// Dump returns all added ResourceLogs and clears the storage.
+func (s *storage) Dump() []*lpb.ResourceLogs {
+	s.dataMu.Lock()
+	defer s.dataMu.Unlock()
+
+	var data []*lpb.ResourceLogs
+	data, s.data = s.data, []*lpb.ResourceLogs{}
+	return data
+}
+
+// grpcCollector is an OTLP gRPC server that collects all requests it receives.
+type grpcCollector struct {
+	collogpb.UnimplementedLogsServiceServer
+
+	headersMu sync.Mutex
+	headers   metadata.MD
+	storage   *storage
+
+	resultCh <-chan exportResult
+	listener net.Listener
+	srv      *grpc.Server
+}
+
+var _ collogpb.LogsServiceServer = (*grpcCollector)(nil)
+
+// newGRPCCollector returns a *grpcCollector that is listening at the provided
+// endpoint.
+//
+// If endpoint is an empty string, the returned collector will be listening on
+// the localhost interface at an OS-chosen port.
+//
+// If resultCh is not nil, the collector will respond to Export calls with the
+// results sent on that channel. This means that if resultCh is not nil, Export
+// calls will block until a result is received.
+func newGRPCCollector(endpoint string, resultCh <-chan exportResult) (*grpcCollector, error) {
+	if endpoint == "" {
+		endpoint = "localhost:0"
+	}
+
+	c := &grpcCollector{
+		storage:  newStorage(),
+		resultCh: resultCh,
+	}
+
+	var err error
+	c.listener, err = net.Listen("tcp", endpoint)
+	if err != nil {
+		return nil, err
+	}
+
+	c.srv = grpc.NewServer()
+	collogpb.RegisterLogsServiceServer(c.srv, c)
+	go func() { _ = c.srv.Serve(c.listener) }()
+
+	return c, nil
+}
+
+// Export handles the export request.
+func (c *grpcCollector) Export(ctx context.Context, req *collogpb.ExportLogsServiceRequest) (*collogpb.ExportLogsServiceResponse, error) {
+	c.storage.Add(req)
+
+	if h, ok := metadata.FromIncomingContext(ctx); ok {
+		c.headersMu.Lock()
+		c.headers = metadata.Join(c.headers, h)
+		c.headersMu.Unlock()
+	}
+
+	if c.resultCh != nil {
+		r := <-c.resultCh
+		if r.Response == nil {
+			return &collogpb.ExportLogsServiceResponse{}, r.Err
+		}
+		return r.Response, r.Err
+	}
+	return &collogpb.ExportLogsServiceResponse{}, nil
+}
+
+// Collect returns the storage holding all collected requests.
+func (c *grpcCollector) Collect() *storage {
+	return c.storage
+}
+
+func TestClient(t *testing.T) {
+	factory := func(rCh <-chan exportResult) (*client, *grpcCollector) {
+		coll, err := newGRPCCollector("", rCh)
+		require.NoError(t, err)
+
+		addr := coll.listener.Addr().String()
+		opts := []Option{WithEndpoint(addr), WithInsecure()}
+		cfg := newConfig(opts)
+		client, err := newClient(cfg)
+		require.NoError(t, err)
+		return client, coll
+	}
+
+	t.Run("ClientHonorsContextErrors", func(t *testing.T) {
+		testCtxErrs := func(factory func() func(context.Context) error) func(t *testing.T) {
+			return func(t *testing.T) {
+				t.Helper()
+				ctx, cancel := context.WithCancel(context.Background())
+				t.Cleanup(cancel)
+
+				t.Run("DeadlineExceeded", func(t *testing.T) {
+					innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
+					t.Cleanup(innerCancel)
+					<-innerCtx.Done()
+
+					f := factory()
+					assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
+				})
+
+				t.Run("Canceled", func(t *testing.T) {
+					innerCtx, innerCancel := context.WithCancel(ctx)
+					innerCancel()
+
+					f := factory()
+					assert.ErrorIs(t, f(innerCtx), context.Canceled)
+				})
+			}
+		}
+
+		t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
+			c, _ := factory(nil)
+			return c.Shutdown
+		}))
+
+		t.Run("UploadLog", testCtxErrs(func() func(context.Context) error {
+			c, _ := factory(nil)
+			return func(ctx context.Context) error {
+				return c.UploadLogs(ctx, nil)
+			}
+		}))
+	})
+
+	t.Run("UploadLogs", func(t *testing.T) {
+		ctx := context.Background()
+		client, coll := factory(nil)
+
+		require.NoError(t, client.UploadLogs(ctx, resourceLogs))
+		require.NoError(t, client.Shutdown(ctx))
+		got := coll.Collect().Dump()
+		require.Len(t, got, 1, "upload of one ResourceLogs")
+		diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
+		if diff != "" {
+			t.Fatalf("unexpected ResourceLogs:\n%s", diff)
+		}
+	})
+
+	t.Run("PartialSuccess", func(t *testing.T) {
+		const n, msg = 2, "bad data"
+		rCh := make(chan exportResult, 3)
+		rCh <- exportResult{
+			Response: &collogpb.ExportLogsServiceResponse{
+				PartialSuccess: &collogpb.ExportLogsPartialSuccess{
+					RejectedLogRecords: n,
+					ErrorMessage:       msg,
+				},
+			},
+		}
+		rCh <- exportResult{
+			Response: &collogpb.ExportLogsServiceResponse{
+				PartialSuccess: &collogpb.ExportLogsPartialSuccess{
+					// Should not be logged.
+					RejectedLogRecords: 0,
+					ErrorMessage:       "",
+				},
+			},
+		}
+		rCh <- exportResult{
+			Response: &collogpb.ExportLogsServiceResponse{},
+		}
+
+		ctx := context.Background()
+		client, _ := factory(rCh)
+
+		defer func(orig otel.ErrorHandler) {
+			otel.SetErrorHandler(orig)
+		}(otel.GetErrorHandler())
+
+		var errs []error
+		eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
+		otel.SetErrorHandler(eh)
+
+		require.NoError(t, client.UploadLogs(ctx, resourceLogs))
+		require.NoError(t, client.UploadLogs(ctx, resourceLogs))
+		require.NoError(t, client.UploadLogs(ctx, resourceLogs))
+
+		require.Equal(t, 1, len(errs))
+		want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
+		assert.ErrorContains(t, errs[0], want)
+	})
+}
diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod
index 3995a051b60..b1e7bb9d5ab 100644
--- a/exporters/otlp/otlplog/otlploggrpc/go.mod
+++ b/exporters/otlp/otlplog/otlploggrpc/go.mod
@@ -4,6 +4,7 @@ go 1.21
 
 require (
 	github.com/cenkalti/backoff/v4 v4.3.0
+	github.com/google/go-cmp v0.6.0
 	github.com/stretchr/testify v1.9.0
 	go.opentelemetry.io/otel v1.28.0
 	go.opentelemetry.io/otel/log v0.4.0
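Reviewer note (not part of the diff): a minimal sketch of how the new client
methods are expected to be driven from inside the package, mirroring the
factory used in TestClient above. The helper name uploadOnce and the addr/rl
arguments are hypothetical; newConfig, newClient, WithEndpoint, and
WithInsecure are the helpers already used by the tests in this change.

    package otlploggrpc

    import (
    	"context"

    	logpb "go.opentelemetry.io/proto/otlp/logs/v1"
    )

    // uploadOnce is a hypothetical helper: it builds a client for addr,
    // uploads one batch of ResourceLogs, and then releases the client.
    func uploadOnce(ctx context.Context, addr string, rl []*logpb.ResourceLogs) error {
    	cfg := newConfig([]Option{WithEndpoint(addr), WithInsecure()})
    	c, err := newClient(cfg)
    	if err != nil {
    		return err
    	}
    	// UploadLogs honors ctx cancellation and any configured retry policy.
    	if err := c.UploadLogs(ctx, rl); err != nil {
    		return err
    	}
    	// Shutdown closes the gRPC connection only if the client created it
    	// (i.e. WithGRPCConn was not used).
    	return c.Shutdown(ctx)
    }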