diff --git a/README.md b/README.md
index 906059d3..884fcf37 100644
--- a/README.md
+++ b/README.md
@@ -8,13 +8,13 @@ as the end-to-end performance between telemetry data producers and receivers.
 
 **This package is still experimental and subject to change.** It is currently used
 by an [experimental OTLP/Arrow gRPC exporter and receiver](https://github.com/open-telemetry/experimental-arrow-collector).
 
-Other important links:
-- [OTEP/Specification](https://github.com/lquerel/oteps/blob/main/text/0156-columnar-encoding.md) describing the
-  motivations, the protocol, the schemas, the benchmark results and the different phases of this project. The OTEP is
-  still [pending, unmerged](https://github.com/open-telemetry/oteps/pull/171).
-- [Donation proposal](https://github.com/open-telemetry/community/issues/1332) - approved, but repo not yet transferred in OTel org.
-- [Arrow schemas](docs/data_model.md) used by this package.
-- [Project Roadmap](https://github.com/f5/otel-arrow-adapter/milestones?direction=asc&sort=due_date&state=open) and [project Board](https://github.com/orgs/f5/projects/1/views/2) describing the current state of the project.
+Important links:
+- [OTEP](https://github.com/lquerel/oteps/blob/main/text/0156-columnar-encoding.md) - protocol specification
+  (status: [pending, unmerged](https://github.com/open-telemetry/oteps/pull/171)).
+- [Donation](https://github.com/open-telemetry/community/issues/1332) - approved, but the repo has not yet been transferred to the OTel org.
+- [Arrow Data Model](docs/data_model.md) - mapping of OTLP entities to Arrow schemas.
+- [Benchmark results](docs/benchmarks.md) - based on synthetic and production data.
+- [Validation process](docs/validation_process.md) - encoding/decoding validation process.
 - [Slides](https://docs.google.com/presentation/d/12uLXmMWNelAyAiKFYMR0i7E7N4dPhzBi2_HLshFOLak/edit?usp=sharing) (01/30/2023 Maintainers meeting).
 
 ## Benchmark summary
diff --git a/docs/validation_process.md b/docs/validation_process.md
new file mode 100644
index 00000000..fef15020
--- /dev/null
+++ b/docs/validation_process.md
@@ -0,0 +1,45 @@
+# Encoding/Decoding Validation Process
+
+This document describes the validation process employed to confirm the correctness
+and resilience of the encoding and decoding of OTLP entities to/from OTel Arrow
+entities. The OTel Arrow protocol is designed to enhance the transport efficiency
+of ALL forms of telemetry data, including metrics, logs, and traces, in a seamless
+manner. The complexity and dynamic nature of OTLP entities make it challenging
+to validate the encoding/decoding process with traditional methods (a classic
+QA process or unit tests). The approach followed by this project offers a more
+systematic and robust alternative.
+
+## Automatic generation of metrics, logs, and traces
+
+In this step we generate a large number of random metrics, logs, and traces with
+different characteristics in terms of shape, size, and content. The goal is to
+test the capacity of the encoding process to handle a wide range of diversity in
+the data.
+
+Mandatory fields are not systematically generated, in order to test the
+robustness of the encoding/decoding process (the original Protobuf encoding
+doesn't enforce mandatory fields).
+
+## Generic comparison of OTLP entities before and after encoding/decoding
+
+OTLP entities are serialized to their JSON representation before encoding and
+after decoding. These two JSON representations must be logically equivalent.
+This comparison is implemented by a dedicated function that recursively traverses
+the two JSON trees and compares the values of the fields, taking into account
+that the fields are not ordered and that the batching process may have changed
+the internal organization of the data.
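To make the idea concrete, here is a minimal sketch of such an order-insensitive comparison over values decoded with `encoding/json`. This is a simplified stand-in for the dedicated function described above: it compares slices positionally and does not model batching-induced reordering.

```go
package validation

import "encoding/json"

// logicallyEqual reports whether two decoded JSON values are equivalent,
// ignoring object key order. Strings, numbers (float64), bools, and nulls
// are compared directly.
func logicallyEqual(a, b interface{}) bool {
	switch av := a.(type) {
	case map[string]interface{}:
		bv, ok := b.(map[string]interface{})
		if !ok || len(av) != len(bv) {
			return false
		}
		for k, v := range av {
			w, found := bv[k]
			if !found || !logicallyEqual(v, w) {
				return false
			}
		}
		return true
	case []interface{}:
		bv, ok := b.([]interface{})
		if !ok || len(av) != len(bv) {
			return false
		}
		for i := range av {
			if !logicallyEqual(av[i], bv[i]) {
				return false
			}
		}
		return true
	default:
		return a == b
	}
}

// equivalentJSON unmarshals the serialized forms captured before encoding
// and after decoding, then compares them logically.
func equivalentJSON(before, after []byte) (bool, error) {
	var a, b interface{}
	if err := json.Unmarshal(before, &a); err != nil {
		return false, err
	}
	if err := json.Unmarshal(after, &b); err != nil {
		return false, err
	}
	return logicallyEqual(a, b), nil
}
```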
+
+## Decoding of invalid data
+
+A generic process has been implemented to inject specific errors and data
+changes into the encoded data. For example, the existing process keeps the
+Arrow records but randomly changes their payload types. The goal is to test
+the resilience of the decoding process to invalid data. The decoding layer
+must be able to handle any invalid data and return appropriate error messages
+without crashing.
+
+## Capturing and replaying production data
+
+A new version of the OTel file exporter has been implemented to capture OTLP
+traffic in a generic JSON format (with ZSTD compression). A set of tools has
+been developed to replay this data, convert it to OTel Arrow, validate the
+encoding/decoding process, and assess the compression and end-to-end performance.
\ No newline at end of file
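To illustrate the kind of mutation the invalid-data step performs, here is a hedged sketch of a payload-type corruption pass. The type names and the number of payload variants are assumptions for the sketch, not the project's actual fault-injection API.

```go
package validation

import "math/rand"

// PayloadType identifies the kind of Arrow payload carried by a record
// (spans, logs, metrics, attributes, ...).
type PayloadType int

// numPayloadTypes is an assumed count of payload variants.
const numPayloadTypes = 12

// EncodedRecord pairs a payload type with its serialized Arrow record.
type EncodedRecord struct {
	Type     PayloadType
	ArrowIPC []byte
}

// corruptPayloadTypes keeps each Arrow record intact but reassigns a random,
// possibly mismatched, payload type. The decoder is then expected to return
// an error for the mismatch rather than crash.
func corruptPayloadTypes(records []EncodedRecord, rng *rand.Rand) {
	for i := range records {
		records[i].Type = PayloadType(rng.Intn(numPayloadTypes))
	}
}
```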
diff --git a/pkg/arrow/errors.go b/pkg/arrow/errors.go
index a0ed7a66..e75770ec 100644
--- a/pkg/arrow/errors.go
+++ b/pkg/arrow/errors.go
@@ -25,7 +25,8 @@ var (
 	// ErrNotStructType is returned when an array is not of type Struct.
 	ErrNotStructType = errors.New("not arrow.StructType")
-	// ErrNotListOfStructsType is returned when an array is not of type List of structs.
+	// ErrNotListOfStructsType is returned when an array is not of type List of
+	// structs.
 	ErrNotListOfStructsType = errors.New("not arrow.ListType of arrow.StructType")
 	// ErrNotListType is returned when an array is not of type list.
 	ErrNotListType = errors.New("not an arrow.ListType")
@@ -34,9 +35,14 @@ var (
 	ErrNotArrayStruct = errors.New("not an arrow array.Struct")
 	// ErrNotArrayList is returned when an array is not an array.List.
 	ErrNotArrayList = errors.New("not an arrow array.List")
-	// ErrNotArrayListOfStructs is returned when an array is not an array.List of array.Struct.
+	// ErrNotArrayListOfStructs is returned when an array is not an array.List
+	// of array.Struct.
 	ErrNotArrayListOfStructs = errors.New("not an Arrow array.List of array.Struct")
-	// ErrDuplicateFieldName is returned when a field name is duplicated in the same struct.
+	// ErrDuplicateFieldName is returned when a field name is duplicated in the
+	// same struct.
 	ErrDuplicateFieldName = errors.New("duplicate field name")
+
+	// ErrMissingFieldName is returned when a field name is missing in a schema
+	// or struct.
+	ErrMissingFieldName = errors.New("missing field name")
 )
diff --git a/pkg/arrow/from_schema.go b/pkg/arrow/from_schema.go
index 278296c5..4e96cfa0 100644
--- a/pkg/arrow/from_schema.go
+++ b/pkg/arrow/from_schema.go
@@ -40,6 +40,19 @@ func FieldIDFromSchema(schema *arrow.Schema, fieldName string) (int, error) {
 	return ids[0], nil
 }
 
+// MandatoryFieldIDFromSchema returns the field id of a field from an Arrow
+// schema, or an error if the field is not present or is duplicated.
+func MandatoryFieldIDFromSchema(schema *arrow.Schema, fieldName string) (int, error) {
+	ids := schema.FieldIndices(fieldName)
+	if len(ids) == 0 {
+		return 0, werror.WrapWithContext(ErrMissingFieldName, map[string]interface{}{"fieldName": fieldName})
+	}
+	if len(ids) > 1 {
+		return 0, werror.WrapWithContext(ErrDuplicateFieldName, map[string]interface{}{"fieldName": fieldName})
+	}
+	return ids[0], nil
+}
+
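A short usage sketch for the new helper; the `parent_id` field name and the wrapping function are illustrative:

```go
import (
	"github.com/apache/arrow/go/v12/arrow"

	arrowutils "github.com/f5/otel-arrow-adapter/pkg/arrow"
	"github.com/f5/otel-arrow-adapter/pkg/werror"
)

// resolveParentID looks up a column that decoding cannot proceed without.
func resolveParentID(schema *arrow.Schema) (int, error) {
	parentID, err := arrowutils.MandatoryFieldIDFromSchema(schema, "parent_id")
	if err != nil {
		// err wraps ErrMissingFieldName or ErrDuplicateFieldName together
		// with the offending field name as context.
		return 0, werror.Wrap(err)
	}
	return parentID, nil
}
```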
 // StructFieldIDFromSchema returns the field id of a struct
 // field from an Arrow schema or AbsentFieldID for an unknown field.
 //
diff --git a/pkg/datagen/default.go b/pkg/datagen/default.go
index 2b36701e..55c14779 100644
--- a/pkg/datagen/default.go
+++ b/pkg/datagen/default.go
@@ -15,6 +15,9 @@
 package datagen
 
 import (
+	"fmt"
+
+	"github.com/brianvoe/gofakeit/v6"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 )
@@ -42,6 +45,42 @@ func (te TestEntropy) shuffleAttrs(fs ...func(Attrs)) pcommon.Map {
 	return attrs
 }
 
+// RandomAttributes returns a random set of attributes. The number of attributes
+// is random in [0,10). The type and value of each attribute are also random.
+func (te TestEntropy) RandomAttributes() pcommon.Map {
+	count := te.rng.Intn(10)
+
+	attrs := pcommon.NewMap()
+
+	for i := 0; i < count; i++ {
+		switch te.rng.Intn(8) { // one of the eight value kinds below (cases 0-7)
+		case 0:
+			attrs.PutStr(fmt.Sprintf("attr_%d", i), gofakeit.LoremIpsumWord())
+		case 1:
+			attrs.PutInt(fmt.Sprintf("attr_%d", i), te.rng.Int63())
+		case 2:
+			attrs.PutDouble(fmt.Sprintf("attr_%d", i), te.rng.Float64())
+		case 3:
+			attrs.PutBool(fmt.Sprintf("attr_%d", i), te.rng.Intn(2) == 0)
+		case 4:
+			attrs.PutEmpty(fmt.Sprintf("attr_%d", i))
+		case 5:
+			attrs.PutEmptyBytes(fmt.Sprintf("attr_%d", i)).FromRaw([]byte(gofakeit.LoremIpsumWord()))
+		case 6:
+			vMap := attrs.PutEmptyMap(fmt.Sprintf("attr_%d", i))
+			vMap.PutInt("int", te.rng.Int63())
+			vMap.PutStr("str", gofakeit.LoremIpsumWord())
+		case 7:
+			vSlice := attrs.PutEmptySlice(fmt.Sprintf("attr_%d", i))
+			vSlice.AppendEmpty().SetBool(te.rng.Intn(2) == 0)
+			vSlice.AppendEmpty().SetDouble(te.rng.Float64())
+			vSlice.AppendEmpty().SetInt(te.rng.Int63())
+		}
+	}
+
+	return attrs
+}
+
 func (te TestEntropy) NewStandardAttributes() pcommon.Map {
 	return te.shuffleAttrs(
 		func(attrs Attrs) { attrs.PutStr("hostname", pick(te, HOSTNAMES)) },
diff --git a/pkg/datagen/logs.go b/pkg/datagen/logs.go
index 19d25766..4d7e20f8 100644
--- a/pkg/datagen/logs.go
+++ b/pkg/datagen/logs.go
@@ -59,6 +59,7 @@ func (lg *LogsGenerator) Generate(batchSize int, collectInterval time.Duration)
 		lg.LogWarnRecord(logRecords.AppendEmpty())
 		lg.LogErrorRecord(logRecords.AppendEmpty())
 		//lg.LogInfoComplexRecord(logRecords.AppendEmpty())
+		lg.RandomLogRecord(logRecords.AppendEmpty())
 	}
 
 	return result
@@ -109,3 +110,82 @@ func (dg *DataGenerator) complexLogRecord(log plog.LogRecord, sev plog.SeverityN
 	log.SetTraceID(dg.Id16Bytes())
 	log.SetSpanID(dg.Id8Bytes())
 }
+
+// RandomLogRecord generates a random log record. The set of fields populated
+// is random, and so are the values of those fields.
+func (dg *DataGenerator) RandomLogRecord(log plog.LogRecord) {
+	if dg.GenBool() {
+		log.SetTimestamp(dg.CurrentTime())
+	}
+	if dg.GenBool() {
+		log.SetObservedTimestamp(dg.CurrentTime())
+	}
+	if dg.GenBool() {
+		log.SetSeverityNumber(plog.SeverityNumber(gofakeit.Number(0, 4)))
+	}
+	if dg.GenBool() {
+		log.SetSeverityText(gofakeit.LetterN(4))
+	}
+	if dg.GenBool() {
+		dg.RandomBody(log.Body())
+	}
+	if dg.GenBool() {
+		dg.RandomAttributes().CopyTo(log.Attributes())
+	}
+	if dg.GenBool() {
+		log.SetTraceID(dg.Id16Bytes())
+	}
+	if dg.GenBool() {
+		log.SetSpanID(dg.Id8Bytes())
+	}
+	if dg.GenBool() {
+		log.SetDroppedAttributesCount(uint32(gofakeit.Number(0, 1000)))
+	}
+	if dg.GenBool() {
+		log.SetFlags(plog.LogRecordFlags(gofakeit.Number(0, 1000)))
+	}
+}
+
+func (dg *DataGenerator) RandomBody(body pcommon.Value) {
+	switch dg.GenI64Range(0, 11) {
+	case 0:
+		// Body with a random string
+		body.SetStr(gofakeit.LoremIpsumSentence(20))
+	case 1:
+		// Body with an empty string
+		body.SetStr("")
+	case 2:
+		// Empty body
+	case 3:
+		// Body with a random int
+		body.SetInt(gofakeit.Int64())
+	case 4:
+		// Body with a random double
+		body.SetDouble(gofakeit.Float64())
+	case 5:
+		// Body with a random bool
+		body.SetBool(gofakeit.Bool())
+	case 6:
+		// Body with a slice of random bytes
+		body.SetEmptyBytes().FromRaw(dg.GenId(10))
+	case 7:
+		// Body with an empty slice of bytes
+		body.SetEmptyBytes()
+	case 8:
+		// Body with a random map
+		bodyMap := body.SetEmptyMap()
+		bodyMap.PutStr("attr1", gofakeit.LoremIpsumSentence(10))
+		bodyMap.PutInt("attr2", 1)
+	case 9:
+		// Body with an empty map
+		body.SetEmptyMap()
+	case 10:
+		// Body with a random slice
+		bodySlice := body.SetEmptySlice()
+		bodySlice.AppendEmpty().SetStr(gofakeit.LoremIpsumSentence(10))
+		bodySlice.AppendEmpty().SetInt(gofakeit.Int64())
+	case 11:
+		// Body with an empty slice
+		body.SetEmptySlice()
+	}
+}
diff --git a/pkg/datagen/trace.go b/pkg/datagen/trace.go
index 2c3ad101..87dfad56 100644
--- a/pkg/datagen/trace.go
+++ b/pkg/datagen/trace.go
@@ -17,6 +17,7 @@ package datagen
 import (
 	"time"
 
+	"github.com/brianvoe/gofakeit/v6"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
@@ -67,6 +68,27 @@ func (tg *TraceGenerator) Generate(batchSize int, collectInterval time.Duration)
 	return result
 }
 
+func (tg *TraceGenerator) GenerateRandomTraces(batchSize int, collectInterval time.Duration) ptrace.Traces {
+	result := ptrace.NewTraces()
+
+	resourceSpans := result.ResourceSpans().AppendEmpty()
+	pick(tg.TestEntropy, tg.resourceAttributes).CopyTo(resourceSpans.Resource().Attributes())
+
+	scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
+	pick(tg.TestEntropy, tg.instrumentationScopes).CopyTo(scopeSpans.Scope())
+
+	resourceSpans.SetSchemaUrl("https://opentelemetry.io/schemas/1.0.0")
+
+	spans := scopeSpans.Spans()
+
+	for i := 0; i < batchSize; i++ {
+		tg.AdvanceTime(collectInterval)
+		tg.AddRandomSpansTo(spans)
+	}
+
+	return result
+}
+
 func (dg *DataGenerator) Spans(spans ptrace.SpanSlice) {
 	dg.NextId16Bytes()
 	traceId := dg.Id16Bytes()
@@ -131,6 +153,68 @@ func (dg *DataGenerator) Spans(spans ptrace.SpanSlice) {
 	)
 }
 
+func (dg *DataGenerator) AddRandomSpansTo(spans ptrace.SpanSlice) {
+	dg.NextId16Bytes()
+	traceId := dg.Id16Bytes()
+
+	dg.NextId8Bytes()
+	rootSpanId := dg.Id8Bytes()
+	rootStartTime := dg.CurrentTime()
+	rootEndTime := dg.CurrentTime() + 1 + pcommon.Timestamp(dg.rng.Intn(6))
+
+	dg.AdvanceTime(time.Duration(dg.rng.Intn(10)))
+
+	span := spans.AppendEmpty()
+	if
dg.GenBool() { + span.SetTraceID(traceId) + } + if dg.GenBool() { + span.SetSpanID(rootSpanId) + } + if dg.GenBool() { + span.SetParentSpanID(rootSpanId) + } + if dg.GenBool() { + span.TraceState().FromRaw(gofakeit.LoremIpsumWord()) + } + if dg.GenBool() { + span.SetName("GET /user-info") + } + if dg.GenBool() { + span.SetStartTimestamp(rootStartTime) + } + if dg.GenBool() { + span.SetEndTimestamp(rootEndTime) + } + if dg.GenBool() { + span.SetKind(ptrace.SpanKindServer) + } + if dg.GenBool() { + dg.RandomAttributes().CopyTo(span.Attributes()) + } + if dg.GenBool() { + span.SetDroppedAttributesCount(uint32(dg.rng.Intn(10))) + } + if dg.GenBool() { + dg.RandomEvents(span.Events()) + } + if dg.GenBool() { + span.SetDroppedEventsCount(uint32(dg.rng.Intn(10))) + } + if dg.GenBool() { + dg.RandomLinks(span.Links(), traceId, rootSpanId) + } + if dg.GenBool() { + span.SetDroppedLinksCount(uint32(dg.rng.Intn(10))) + } + if dg.GenBool() { + span.Status().SetCode(ptrace.StatusCodeOk) + } + if dg.GenBool() { + span.Status().SetMessage("OK") + } +} + // events returns a slice of events for the span. func (dg *DataGenerator) events(ses ptrace.SpanEventSlice) { eventCount := dg.rng.Intn(8) + 2 @@ -148,6 +232,27 @@ func (dg *DataGenerator) events(ses ptrace.SpanEventSlice) { } } +func (dg *DataGenerator) RandomEvents(ses ptrace.SpanEventSlice) { + eventCount := dg.rng.Intn(10) + + for i := 0; i < eventCount; i++ { + event := ses.AppendEmpty() + if dg.GenBool() { + event.SetTimestamp(dg.CurrentTime() + pcommon.Timestamp(dg.rng.Intn(5))) + } + if dg.GenBool() { + event.SetName(gofakeit.LoremIpsumWord()) + } + if dg.GenBool() { + attributes := dg.RandomAttributes() + attributes.CopyTo(event.Attributes()) + } + if dg.GenBool() { + event.SetDroppedAttributesCount(uint32(dg.rng.Intn(10))) + } + } +} + // links returns a slice of links for the span. 
func (dg *DataGenerator) links(sls ptrace.SpanLinkSlice, traceID pcommon.TraceID, spanID pcommon.SpanID) { linkCount := dg.rng.Intn(8) + 2 @@ -160,3 +265,27 @@ func (dg *DataGenerator) links(sls ptrace.SpanLinkSlice, traceID pcommon.TraceID dg.NewStandardSpanLinkAttributes().CopyTo(sl.Attributes()) } } + +func (dg *DataGenerator) RandomLinks(sls ptrace.SpanLinkSlice, traceID pcommon.TraceID, spanID pcommon.SpanID) { + linkCount := dg.rng.Intn(10) + + for i := 0; i < linkCount; i++ { + sl := sls.AppendEmpty() + if dg.GenBool() { + sl.SetTraceID(traceID) + } + if dg.GenBool() { + sl.SetSpanID(spanID) + } + if dg.GenBool() { + sl.TraceState().FromRaw(gofakeit.LoremIpsumWord()) + } + if dg.GenBool() { + attributes := dg.RandomAttributes() + attributes.CopyTo(sl.Attributes()) + } + if dg.GenBool() { + sl.SetDroppedAttributesCount(uint32(dg.rng.Intn(10))) + } + } +} diff --git a/pkg/otel/common/arrow/attributes.go b/pkg/otel/common/arrow/attributes.go index 98d4c43a..bfa395dd 100644 --- a/pkg/otel/common/arrow/attributes.go +++ b/pkg/otel/common/arrow/attributes.go @@ -26,7 +26,6 @@ import ( "github.com/apache/arrow/go/v12/arrow/array" "go.opentelemetry.io/collector/pdata/pcommon" - "github.com/f5/otel-arrow-adapter/pkg/otel/common" "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema" "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder" "github.com/f5/otel-arrow-adapter/pkg/werror" @@ -180,53 +179,6 @@ func (b *AttributesBuilder) Append(attrs pcommon.Map) error { }) } -func (b *AttributesBuilder) AppendUniqueAttributes(attrs pcommon.Map, smattrs *common.SharedAttributes, mattrs *common.SharedAttributes) error { - if b.released { - return werror.Wrap(ErrBuilderAlreadyReleased) - } - - uniqueAttrsCount := attrs.Len() - if smattrs != nil { - uniqueAttrsCount -= smattrs.Len() - } - if mattrs != nil { - uniqueAttrsCount -= mattrs.Len() - } - - return b.builder.Append(uniqueAttrsCount, func() error { - var err error - - attrs.Range(func(key string, v pcommon.Value) bool { - if key == "" { - // Skip entries with empty keys - return true - } - - // Skip the current attribute if it is a scope metric shared attribute - // or a metric shared attribute - smattrsFound := false - mattrsFound := false - if smattrs != nil { - _, smattrsFound = smattrs.Attributes[key] - } - if mattrs != nil { - _, mattrsFound = mattrs.Attributes[key] - } - if smattrsFound || mattrsFound { - return true - } - - b.kb.AppendNonEmpty(key) - err = werror.WrapWithContext(b.ib.Append(&v), map[string]interface{}{"key": key, "value": v}) - - uniqueAttrsCount-- - return err == nil && uniqueAttrsCount > 0 - }) - - return err - }) -} - // Release releases the memory allocated by the builder. func (b *AttributesBuilder) Release() { if !b.released { @@ -303,60 +255,6 @@ func (c *Attributes16Accumulator) AppendWithID(parentID uint16, attrs pcommon.Ma return nil } -// ToDo Remove this method once shared attributes are removed logs and traces. 
- -func (c *Attributes16Accumulator) AppendUniqueAttributesWithID(parentID uint16, attrs pcommon.Map, smattrs *common.SharedAttributes, mattrs *common.SharedAttributes) error { - uniqueAttrsCount := attrs.Len() - if smattrs != nil { - uniqueAttrsCount -= smattrs.Len() - } - if mattrs != nil { - uniqueAttrsCount -= mattrs.Len() - } - - if uniqueAttrsCount == 0 { - return nil - } - - if c.attrsMapCount == math.MaxUint16 { - panic("The maximum number of group of attributes has been reached (max is uint16).") - } - - attrs.Range(func(key string, v pcommon.Value) bool { - if key == "" { - // Skip entries with empty keys - return true - } - - // Skip the current attribute if it is a scope metric shared attribute - // or a metric shared attribute - smattrsFound := false - mattrsFound := false - if smattrs != nil { - _, smattrsFound = smattrs.Attributes[key] - } - if mattrs != nil { - _, mattrsFound = mattrs.Attributes[key] - } - if smattrsFound || mattrsFound { - return true - } - - c.attrs = append(c.attrs, Attr16{ - ParentID: parentID, - Key: key, - Value: &v, - }) - - uniqueAttrsCount-- - return uniqueAttrsCount > 0 - }) - - c.attrsMapCount++ - - return nil -} - // Sort sorts the attributes based on the provided sorter. // The sorter is part of the global configuration and can be different for // different payload types. @@ -410,58 +308,6 @@ func (c *Attributes32Accumulator) Append(ID uint32, attrs pcommon.Map) error { return nil } -func (c *Attributes32Accumulator) AppendUniqueAttributesWithID(ID uint32, attrs pcommon.Map, smattrs *common.SharedAttributes, mattrs *common.SharedAttributes) error { - uniqueAttrsCount := attrs.Len() - if smattrs != nil { - uniqueAttrsCount -= smattrs.Len() - } - if mattrs != nil { - uniqueAttrsCount -= mattrs.Len() - } - - if uniqueAttrsCount == 0 { - return nil - } - - if c.attrsMapCount == math.MaxUint32 { - panic("The maximum number of group of attributes has been reached (max is uint32).") - } - - attrs.Range(func(key string, v pcommon.Value) bool { - if key == "" { - // Skip entries with empty keys - return true - } - - // Skip the current attribute if it is a scope metric shared attribute - // or a metric shared attribute - smattrsFound := false - mattrsFound := false - if smattrs != nil { - _, smattrsFound = smattrs.Attributes[key] - } - if mattrs != nil { - _, mattrsFound = mattrs.Attributes[key] - } - if smattrsFound || mattrsFound { - return true - } - - c.attrs = append(c.attrs, Attr32{ - ParentID: ID, - Key: key, - Value: &v, - }) - - uniqueAttrsCount-- - return uniqueAttrsCount > 0 - }) - - c.attrsMapCount++ - - return nil -} - // Sort sorts the attributes based on the provided sorter. // The sorter is part of the global configuration and can be different for // different payload types. 
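One practical reason for the nil guards added to `Equal`, `IsLess`, and `Compare` in the hunk that follows: the parent-ID decoders introduced later in this change compare each incoming value against a `prevValue` that starts out nil. A hedged sketch of the resulting behavior:

```go
import (
	"go.opentelemetry.io/collector/pdata/pcommon"

	"github.com/f5/otel-arrow-adapter/pkg/otel/common/otlp"
)

func firstDecodeStartsNewGroup() {
	d := otlp.NewAttrs16ParentIdDecoder()

	// On the first tuple, prevValue inside the decoder is still nil, so the
	// nil guard in Equal makes the comparison fail and the decoder treats
	// the tuple as the start of a new (key, value) group: the ID is taken
	// verbatim instead of being added to the previous parent ID.
	v := pcommon.NewValueStr("first")
	_ = d.Decode(42, "some.key", &v) // returns 42
}
```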
@@ -476,6 +322,10 @@ func (c *Attributes32Accumulator) Reset() { } func Equal(a, b *pcommon.Value) bool { + if a == nil || b == nil { + return false + } + switch a.Type() { case pcommon.ValueTypeInt: if b.Type() == pcommon.ValueTypeInt { @@ -519,6 +369,10 @@ func Equal(a, b *pcommon.Value) bool { } func IsLess(a, b *pcommon.Value) bool { + if a == nil || b == nil { + return false + } + switch a.Type() { case pcommon.ValueTypeInt: if b.Type() == pcommon.ValueTypeInt { @@ -558,6 +412,10 @@ func IsLess(a, b *pcommon.Value) bool { } func Compare(a, b *pcommon.Value) int { + if a == nil || b == nil { + return -1 + } + switch a.Type() { case pcommon.ValueTypeInt: aI := a.Int() diff --git a/pkg/otel/common/arrow/attributes_16.go b/pkg/otel/common/arrow/attributes_16.go index cf02ccc7..19f55e0b 100644 --- a/pkg/otel/common/arrow/attributes_16.go +++ b/pkg/otel/common/arrow/attributes_16.go @@ -87,17 +87,6 @@ type ( } ) -func NewAttrs16Builder(rBuilder *builder.RecordBuilderExt, payloadType *PayloadType, sorter Attrs16Sorter) *Attrs16Builder { - b := &Attrs16Builder{ - released: false, - builder: rBuilder, - accumulator: NewAttributes16Accumulator(sorter), - payloadType: payloadType, - } - b.init() - return b -} - func NewAttrs16BuilderWithEncoding(rBuilder *builder.RecordBuilderExt, payloadType *PayloadType, config *Attrs16Config) *Attrs16Builder { b := &Attrs16Builder{ released: false, diff --git a/pkg/otel/common/arrow/resource.go b/pkg/otel/common/arrow/resource.go index 779e781c..cb614967 100644 --- a/pkg/otel/common/arrow/resource.go +++ b/pkg/otel/common/arrow/resource.go @@ -61,11 +61,6 @@ type ResourceBuilder struct { dacb *builder.Uint32Builder // `dropped_attributes_count` field builder } -// NewResourceBuilder creates a new resource builder with a given allocator. -func NewResourceBuilder(builder *builder.StructBuilder) *ResourceBuilder { - return ResourceBuilderFrom(builder) -} - // ResourceBuilderFrom creates a new resource builder from an existing struct builder. func ResourceBuilderFrom(builder *builder.StructBuilder) *ResourceBuilder { aib := builder.Uint16DeltaBuilder(constants.ID) diff --git a/pkg/otel/common/arrow/scope.go b/pkg/otel/common/arrow/scope.go index 5707dea7..f20e98a6 100644 --- a/pkg/otel/common/arrow/scope.go +++ b/pkg/otel/common/arrow/scope.go @@ -47,11 +47,6 @@ type ScopeBuilder struct { dacb *builder.Uint32Builder // Dropped attributes count builder } -// NewScopeBuilder creates a new instrumentation scope array builder with a given allocator. -func NewScopeBuilder(builder *builder.StructBuilder) *ScopeBuilder { - return ScopeBuilderFrom(builder) -} - // ScopeBuilderFrom creates a new instrumentation scope array builder from an existing struct builder. func ScopeBuilderFrom(sb *builder.StructBuilder) *ScopeBuilder { aib := sb.Uint16DeltaBuilder(constants.ID) diff --git a/pkg/otel/common/attributes.go b/pkg/otel/common/attributes.go index 15277a9d..367ea9c2 100644 --- a/pkg/otel/common/attributes.go +++ b/pkg/otel/common/attributes.go @@ -18,89 +18,8 @@ import ( "sort" "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/f5/otel-arrow-adapter/pkg/otel/pdata" ) -// SharedAttributes is a data structure representing the shared attributes of a set of metrics. -type SharedAttributes struct { - Attributes map[string]pcommon.Value -} - -// NewSharedAttributesFrom creates a new SharedAttributes from a [pcommon.Map] of attributes. 
-func NewSharedAttributesFrom(attrs pcommon.Map) *SharedAttributes { - attributes := make(map[string]pcommon.Value) - - attrs.Range(func(k string, v pcommon.Value) bool { - attributes[k] = v - return true - }) - - return &SharedAttributes{ - Attributes: attributes, - } -} - -// CopyTo copies the current SharedAttributes to a [pcommon.Map] of attributes. -// The attributes are sorted by key before being copied to make the tests -// deterministic. -func (sa *SharedAttributes) CopyTo(attrs pcommon.Map) { - for k, v := range sa.Attributes { - v.CopyTo(attrs.PutEmpty(k)) - } -} - -func (sa *SharedAttributes) Clone() *SharedAttributes { - attributes := make(map[string]pcommon.Value) - for k, v := range sa.Attributes { - attributes[k] = v - } - - return &SharedAttributes{ - Attributes: attributes, - } -} - -// IntersectWithMap intersects the current SharedAttributes with a [pcommon.Map] of attributes -// and returns the number of shared attributes after the intersection. -func (sa *SharedAttributes) IntersectWithMap(attrs pcommon.Map) int { - for k, v := range sa.Attributes { - if otherV, ok := attrs.Get(k); ok { - if !pdata.ValuesEqual(v, otherV) { - delete(sa.Attributes, k) - } - } else { - delete(sa.Attributes, k) - } - } - - return len(sa.Attributes) -} - -func (sa *SharedAttributes) IntersectWith(other *SharedAttributes) int { - for k, v := range sa.Attributes { - if otherV, ok := other.Attributes[k]; ok { - if !pdata.ValuesEqual(v, otherV) { - delete(sa.Attributes, k) - } - } else { - delete(sa.Attributes, k) - } - } - return len(sa.Attributes) -} - -// Has returns true if the current SharedAttributes has the given attribute. -func (sa *SharedAttributes) Has(k string) bool { - _, ok := sa.Attributes[k] - return ok -} - -// Len returns the number of attributes in the current SharedAttributes. -func (sa *SharedAttributes) Len() int { - return len(sa.Attributes) -} - type ( AttributeEntry struct { Key string diff --git a/pkg/otel/common/otlp/attributes.go b/pkg/otel/common/otlp/attributes.go index 5ac51771..313edfd9 100644 --- a/pkg/otel/common/otlp/attributes.go +++ b/pkg/otel/common/otlp/attributes.go @@ -1,121 +1,445 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package otlp +// ToDo This file will replace pkg/otel/common/otlp/attributes.go once all OTel entities will be migrated to the hybrid model. 
+ import ( "github.com/apache/arrow/go/v12/arrow" - "github.com/apache/arrow/go/v12/arrow/array" "go.opentelemetry.io/collector/pdata/pcommon" arrowutils "github.com/f5/otel-arrow-adapter/pkg/arrow" "github.com/f5/otel-arrow-adapter/pkg/otel/common" + carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow" + oschema "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema" "github.com/f5/otel-arrow-adapter/pkg/otel/constants" "github.com/f5/otel-arrow-adapter/pkg/werror" ) -type AttributeIds struct { - Id int +type ( + // AttributeIDs is a struct containing the Arrow field IDs of the + // attributes. + AttributeIDs struct { + ParentID int + ParentIDDeltaEncoded bool + Key int + Type int + Str int + Int int + Double int + Bool int + Bytes int + Ser int + } + + // Attributes16Store is a store for attributes. + // The attributes are stored in a map by ID. This ID represents the + // identifier of the main entity (span, event, link, etc.) to which the + // attributes are attached. So the maximum number of attributes per entity + // is not limited. + Attributes16Store struct { + lastID uint16 + attributesByID map[uint16]*pcommon.Map + } + + // Attributes32Store is a store for attributes. + // The attributes are stored in a map by ID. This ID represents the + // identifier of the main entity (span, event, link, etc.) to which the + // attributes are attached. So the maximum number of attributes per entity + // is not limited. + Attributes32Store struct { + lastID uint32 + attributesByID map[uint32]*pcommon.Map + } + + Attrs16ParentIdDecoder struct { + prevParentID uint16 + prevKey string + prevValue *pcommon.Value + encodingType int + } + + Attrs32ParentIdDecoder struct { + prevParentID uint32 + prevKey string + prevValue *pcommon.Value + encodingType int + } +) + +// NewAttributes16Store creates a new Attributes16Store. +func NewAttributes16Store() *Attributes16Store { + return &Attributes16Store{ + attributesByID: make(map[uint16]*pcommon.Map), + } } -// ToDo this file must be removed once all the attributes are migrated to the new format (see logs and traces). +// NewAttributes32Store creates a new Attributes32Store. +func NewAttributes32Store() *Attributes32Store { + return &Attributes32Store{ + attributesByID: make(map[uint32]*pcommon.Map), + } +} -func NewAttributeIds(structDT *arrow.StructType) (*AttributeIds, error) { - id, _ := arrowutils.FieldIDFromStruct(structDT, constants.Attributes) - return &AttributeIds{Id: id}, nil +// AttributesByDeltaID returns the attributes for the given Delta ID. +func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map { + s.lastID += ID + if m, ok := s.attributesByID[s.lastID]; ok { + return m + } + return nil } -func NewSharedAttributeIds(structDT *arrow.StructType) *AttributeIds { - id, found := structDT.FieldIdx(constants.SharedAttributes) - if !found { - return nil +// AttributesByID returns the attributes for the given ID. +func (s *Attributes16Store) AttributesByID(ID uint16) *pcommon.Map { + if m, ok := s.attributesByID[ID]; ok { + return m } - return &AttributeIds{Id: id} + return nil } -func NewSharedEventAttributeIds(structDT *arrow.StructType) *AttributeIds { - id, found := structDT.FieldIdx(constants.SharedEventAttributes) - if !found { - return nil +// AttributesByID returns the attributes for the given ID. 
+func (s *Attributes32Store) AttributesByID(ID uint32) *pcommon.Map { + if m, ok := s.attributesByID[ID]; ok { + return m } - return &AttributeIds{Id: id} + return nil } -func NewSharedLinkAttributeIds(structDT *arrow.StructType) *AttributeIds { - id, found := structDT.FieldIdx(constants.SharedLinkAttributes) - if !found { - return nil +// AttributesByDeltaID returns the attributes for the given Delta ID. +func (s *Attributes32Store) AttributesByDeltaID(ID uint32) *pcommon.Map { + s.lastID += ID + if m, ok := s.attributesByID[s.lastID]; ok { + return m } - return &AttributeIds{Id: id} + return nil } -func AppendAttributesInto(attrs pcommon.Map, parentArr *array.Struct, row int, attributeIds *AttributeIds) error { - marr, err := attributesFromStruct(attributeIds.Id, parentArr, row) +// Attributes16StoreFrom creates an Attribute16Store from an arrow.Record. +// Note: This function consume the record. +func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error { + defer record.Release() + + attrIDS, err := SchemaToAttributeIDs(record.Schema()) if err != nil { return werror.Wrap(err) } - if marr == nil { - return nil + + attrsCount := int(record.NumRows()) + + parentIdDecoder := NewAttrs16ParentIdDecoder() + + // Read all key/value tuples from the record and reconstruct the attributes + // map by ID. + for i := 0; i < attrsCount; i++ { + key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i) + if err != nil { + return werror.Wrap(err) + } + + vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i) + if err != nil { + return werror.Wrap(err) + } + + value := pcommon.NewValueEmpty() + switch pcommon.ValueType(vType) { + case pcommon.ValueTypeStr: + v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i) + if err != nil { + return werror.Wrap(err) + } + value.SetStr(v) + case pcommon.ValueTypeInt: + v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i) + if err != nil { + return werror.Wrap(err) + } + value.SetInt(v) + case pcommon.ValueTypeDouble: + v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i) + if err != nil { + return werror.Wrap(err) + } + value.SetDouble(v) + case pcommon.ValueTypeBool: + v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i) + if err != nil { + return werror.Wrap(err) + } + value.SetBool(v) + case pcommon.ValueTypeBytes: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i) + if err != nil { + return werror.Wrap(err) + } + value.SetEmptyBytes().FromRaw(v) + case pcommon.ValueTypeSlice: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) + if err != nil { + return werror.Wrap(err) + } + if err = common.Deserialize(v, value); err != nil { + return werror.Wrap(err) + } + case pcommon.ValueTypeMap: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) + if err != nil { + return werror.Wrap(err) + } + if err = common.Deserialize(v, value); err != nil { + return werror.Wrap(err) + } + default: + // silently ignore unknown types to avoid DOS attacks + } + + deltaOrParentID, err := arrowutils.U16FromRecord(record, attrIDS.ParentID, i) + if err != nil { + return werror.Wrap(err) + } + parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) + + m, ok := store.attributesByID[parentID] + if !ok { + newMap := pcommon.NewMap() + m = &newMap + store.attributesByID[parentID] = m + } + value.CopyTo(m.PutEmpty(key)) } - return UpdateAttributesFrom(attrs, marr, row) + return nil } -func UpdateAttributesFrom(attrs pcommon.Map, marr *array.Map, row int) error { - if marr.IsNull(row) 
{ - return nil +// Attributes32StoreFrom creates an Attributes32Store from an arrow.Record. +// Note: This function consume the record. +func Attributes32StoreFrom(record arrow.Record, store *Attributes32Store) error { + defer record.Release() + + attrIDS, err := SchemaToAttributeIDs(record.Schema()) + if err != nil { + return werror.Wrap(err) } - start := int(marr.Offsets()[row]) - end := int(marr.Offsets()[row+1]) + attrsCount := int(record.NumRows()) - attrs.EnsureCapacity(end - start) + parentIdDecoder := NewAttrs32ParentIdDecoder() - keys := marr.Keys() - values, ok := marr.Items().(*array.SparseUnion) - if !ok { - return werror.WrapWithContext(common.ErrNotArraySparseUnion, map[string]interface{}{"row": row}) - } + // Read all key/value tuples from the record and reconstruct the attributes + // map by ID. + for i := 0; i < attrsCount; i++ { + key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i) + if err != nil { + return werror.Wrap(err) + } - for i := start; i < end; i++ { - key, err := arrowutils.StringFromArray(keys, i) + vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i) if err != nil { return werror.Wrap(err) } + value := pcommon.NewValueEmpty() + switch pcommon.ValueType(vType) { + case pcommon.ValueTypeStr: + v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i) + if err != nil { + return werror.Wrap(err) + } + value.SetStr(v) + case pcommon.ValueTypeInt: + v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i) + if err != nil { + return werror.Wrap(err) + } + value.SetInt(v) + case pcommon.ValueTypeDouble: + v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i) + if err != nil { + return werror.Wrap(err) + } + value.SetDouble(v) + case pcommon.ValueTypeBool: + v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i) + if err != nil { + return werror.Wrap(err) + } + value.SetBool(v) + case pcommon.ValueTypeBytes: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i) + if err != nil { + return werror.Wrap(err) + } + value.SetEmptyBytes().FromRaw(v) + case pcommon.ValueTypeSlice: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) + if err != nil { + return werror.Wrap(err) + } + if err = common.Deserialize(v, value); err != nil { + return werror.Wrap(err) + } + case pcommon.ValueTypeMap: + v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) + if err != nil { + return werror.Wrap(err) + } + if err = common.Deserialize(v, value); err != nil { + return werror.Wrap(err) + } + default: + // silently ignore unknown types to avoid DOS attacks + } - if err = UpdateValueFrom(attrs.PutEmpty(key), values, i); err != nil { + deltaOrParentID, err := arrowutils.U32FromRecord(record, attrIDS.ParentID, i) + if err != nil { return werror.Wrap(err) } + parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) + + m, ok := store.attributesByID[parentID] + if !ok { + newMap := pcommon.NewMap() + m = &newMap + store.attributesByID[parentID] = m + } + value.CopyTo(m.PutEmpty(key)) } + return nil } -func attributesFromStruct(fieldID int, parentArr *array.Struct, row int) (marr *array.Map, err error) { - if fieldID == -1 { - return nil, nil +// SchemaToAttributeIDs pre-computes the field IDs for the attributes record. 
+func SchemaToAttributeIDs(schema *arrow.Schema) (*AttributeIDs, error) { + parentID, err := arrowutils.MandatoryFieldIDFromSchema(schema, constants.ParentID) + if err != nil { + return nil, werror.Wrap(err) + } + + deltaEncoded := false + v, found := schema.Field(parentID).Metadata.GetValue(oschema.EncodingKey) + if found { + deltaEncoded = v == oschema.DeltaEncodingValue + } + + key, err := arrowutils.MandatoryFieldIDFromSchema(schema, constants.AttributeKey) + if err != nil { + return nil, werror.Wrap(err) + } + + vType, err := arrowutils.MandatoryFieldIDFromSchema(schema, constants.AttributeType) + if err != nil { + return nil, werror.Wrap(err) + } + vStr, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeStr) + if err != nil { + return nil, werror.Wrap(err) + } + vInt, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeInt) + if err != nil { + return nil, werror.Wrap(err) + } + vDouble, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeDouble) + if err != nil { + return nil, werror.Wrap(err) + } + vBool, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeBool) + if err != nil { + return nil, werror.Wrap(err) + } + vBytes, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeBytes) + if err != nil { + return nil, werror.Wrap(err) + } + vSer, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeSer) + if err != nil { + return nil, werror.Wrap(err) + } + + return &AttributeIDs{ + ParentID: parentID, + ParentIDDeltaEncoded: deltaEncoded, + Key: key, + Type: vType, + Str: vStr, + Int: vInt, + Double: vDouble, + Bool: vBool, + Bytes: vBytes, + Ser: vSer, + }, nil +} + +func NewAttrs16ParentIdDecoder() *Attrs16ParentIdDecoder { + return &Attrs16ParentIdDecoder{ + encodingType: carrow.ParentIdDeltaGroupEncoding, } +} - column := parentArr.Field(fieldID) - switch arr := column.(type) { - case *array.Map: - if arr.IsNull(row) { - return +func (d *Attrs16ParentIdDecoder) Decode(deltaOrParentID uint16, key string, value *pcommon.Value) uint16 { + switch d.encodingType { + case carrow.ParentIdNoEncoding: + return deltaOrParentID + case carrow.ParentIdDeltaEncoding: + decodedParentID := d.prevParentID + deltaOrParentID + d.prevParentID = decodedParentID + return decodedParentID + case carrow.ParentIdDeltaGroupEncoding: + if d.prevKey == key && carrow.Equal(d.prevValue, value) { + parentID := d.prevParentID + deltaOrParentID + d.prevParentID = parentID + return parentID + } else { + d.prevKey = key + d.prevValue = value + d.prevParentID = deltaOrParentID + return deltaOrParentID } + default: + panic("unknown attrs16 parent ID encoding type") + } +} - marr = arr +func NewAttrs32ParentIdDecoder() *Attrs32ParentIdDecoder { + return &Attrs32ParentIdDecoder{ + encodingType: carrow.ParentIdDeltaGroupEncoding, + } +} + +func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, value *pcommon.Value) uint32 { + switch d.encodingType { + case carrow.ParentIdNoEncoding: + return deltaOrParentID + case carrow.ParentIdDeltaEncoding: + decodedParentID := d.prevParentID + deltaOrParentID + d.prevParentID = decodedParentID + return decodedParentID + case carrow.ParentIdDeltaGroupEncoding: + if d.prevKey == key && carrow.Equal(d.prevValue, value) { + parentID := d.prevParentID + deltaOrParentID + d.prevParentID = parentID + return parentID + } else { + d.prevKey = key + d.prevValue = value + d.prevParentID = deltaOrParentID + return deltaOrParentID + } default: - err = werror.WrapWithContext(common.ErrNotArrayMap, 
map[string]interface{}{"row": row, "fieldID": fieldID}) + panic("unknown attrs32 parent ID encoding type") } - return } diff --git a/pkg/otel/common/otlp/attributes_new.go b/pkg/otel/common/otlp/attributes_new.go deleted file mode 100644 index 07ab96db..00000000 --- a/pkg/otel/common/otlp/attributes_new.go +++ /dev/null @@ -1,445 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package otlp - -// ToDo This file will replace pkg/otel/common/otlp/attributes.go once all OTel entities will be migrated to the hybrid model. - -import ( - "github.com/apache/arrow/go/v12/arrow" - "go.opentelemetry.io/collector/pdata/pcommon" - - arrowutils "github.com/f5/otel-arrow-adapter/pkg/arrow" - "github.com/f5/otel-arrow-adapter/pkg/otel/common" - carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow" - oschema "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema" - "github.com/f5/otel-arrow-adapter/pkg/otel/constants" - "github.com/f5/otel-arrow-adapter/pkg/werror" -) - -type ( - // AttributeIDs is a struct containing the Arrow field IDs of the - // attributes. - AttributeIDs struct { - ParentID int - ParentIDDeltaEncoded bool - Key int - Type int - Str int - Int int - Double int - Bool int - Bytes int - Ser int - } - - // Attributes16Store is a store for attributes. - // The attributes are stored in a map by ID. This ID represents the - // identifier of the main entity (span, event, link, etc.) to which the - // attributes are attached. So the maximum number of attributes per entity - // is not limited. - Attributes16Store struct { - lastID uint16 - attributesByID map[uint16]*pcommon.Map - } - - // Attributes32Store is a store for attributes. - // The attributes are stored in a map by ID. This ID represents the - // identifier of the main entity (span, event, link, etc.) to which the - // attributes are attached. So the maximum number of attributes per entity - // is not limited. - Attributes32Store struct { - lastID uint32 - attributesByID map[uint32]*pcommon.Map - } - - Attrs16ParentIdDecoder struct { - prevParentID uint16 - prevKey string - prevValue *pcommon.Value - encodingType int - } - - Attrs32ParentIdDecoder struct { - prevParentID uint32 - prevKey string - prevValue *pcommon.Value - encodingType int - } -) - -// NewAttributes16Store creates a new Attributes16Store. -func NewAttributes16Store() *Attributes16Store { - return &Attributes16Store{ - attributesByID: make(map[uint16]*pcommon.Map), - } -} - -// NewAttributes32Store creates a new Attributes32Store. -func NewAttributes32Store() *Attributes32Store { - return &Attributes32Store{ - attributesByID: make(map[uint32]*pcommon.Map), - } -} - -// AttributesByDeltaID returns the attributes for the given Delta ID. -func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map { - s.lastID += ID - if m, ok := s.attributesByID[s.lastID]; ok { - return m - } - return nil -} - -// AttributesByID returns the attributes for the given ID. 
-func (s *Attributes16Store) AttributesByID(ID uint16) *pcommon.Map { - if m, ok := s.attributesByID[ID]; ok { - return m - } - return nil -} - -// AttributesByID returns the attributes for the given ID. -func (s *Attributes32Store) AttributesByID(ID uint32) *pcommon.Map { - if m, ok := s.attributesByID[ID]; ok { - return m - } - return nil -} - -// AttributesByDeltaID returns the attributes for the given Delta ID. -func (s *Attributes32Store) AttributesByDeltaID(ID uint32) *pcommon.Map { - s.lastID += ID - if m, ok := s.attributesByID[s.lastID]; ok { - return m - } - return nil -} - -// Attributes16StoreFrom creates an Attribute16Store from an arrow.Record. -// Note: This function consume the record. -func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error { - defer record.Release() - - attrIDS, err := SchemaToAttributeIDs(record.Schema()) - if err != nil { - return werror.Wrap(err) - } - - attrsCount := int(record.NumRows()) - - parentIdDecoder := NewAttrs16ParentIdDecoder() - - // Read all key/value tuples from the record and reconstruct the attributes - // map by ID. - for i := 0; i < attrsCount; i++ { - key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i) - if err != nil { - return werror.Wrap(err) - } - - vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i) - if err != nil { - return werror.Wrap(err) - } - - value := pcommon.NewValueEmpty() - switch pcommon.ValueType(vType) { - case pcommon.ValueTypeStr: - v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i) - if err != nil { - return werror.Wrap(err) - } - value.SetStr(v) - case pcommon.ValueTypeInt: - v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i) - if err != nil { - return werror.Wrap(err) - } - value.SetInt(v) - case pcommon.ValueTypeDouble: - v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i) - if err != nil { - return werror.Wrap(err) - } - value.SetDouble(v) - case pcommon.ValueTypeBool: - v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i) - if err != nil { - return werror.Wrap(err) - } - value.SetBool(v) - case pcommon.ValueTypeBytes: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i) - if err != nil { - return werror.Wrap(err) - } - value.SetEmptyBytes().FromRaw(v) - case pcommon.ValueTypeSlice: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - case pcommon.ValueTypeMap: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - default: - // silently ignore unknown types to avoid DOS attacks - } - - deltaOrParentID, err := arrowutils.U16FromRecord(record, attrIDS.ParentID, i) - if err != nil { - return werror.Wrap(err) - } - parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) - - m, ok := store.attributesByID[parentID] - if !ok { - newMap := pcommon.NewMap() - m = &newMap - store.attributesByID[parentID] = m - } - value.CopyTo(m.PutEmpty(key)) - } - - return nil -} - -// Attributes32StoreFrom creates an Attributes32Store from an arrow.Record. -// Note: This function consume the record. 
-func Attributes32StoreFrom(record arrow.Record, store *Attributes32Store) error { - defer record.Release() - - attrIDS, err := SchemaToAttributeIDs(record.Schema()) - if err != nil { - return werror.Wrap(err) - } - - attrsCount := int(record.NumRows()) - - parentIdDecoder := NewAttrs32ParentIdDecoder() - - // Read all key/value tuples from the record and reconstruct the attributes - // map by ID. - for i := 0; i < attrsCount; i++ { - key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i) - if err != nil { - return werror.Wrap(err) - } - - vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i) - if err != nil { - return werror.Wrap(err) - } - value := pcommon.NewValueEmpty() - switch pcommon.ValueType(vType) { - case pcommon.ValueTypeStr: - v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i) - if err != nil { - return werror.Wrap(err) - } - value.SetStr(v) - case pcommon.ValueTypeInt: - v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i) - if err != nil { - return werror.Wrap(err) - } - value.SetInt(v) - case pcommon.ValueTypeDouble: - v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i) - if err != nil { - return werror.Wrap(err) - } - value.SetDouble(v) - case pcommon.ValueTypeBool: - v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i) - if err != nil { - return werror.Wrap(err) - } - value.SetBool(v) - case pcommon.ValueTypeBytes: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i) - if err != nil { - return werror.Wrap(err) - } - value.SetEmptyBytes().FromRaw(v) - case pcommon.ValueTypeSlice: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - case pcommon.ValueTypeMap: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - default: - // silently ignore unknown types to avoid DOS attacks - } - - deltaOrParentID, err := arrowutils.U32FromRecord(record, attrIDS.ParentID, i) - if err != nil { - return werror.Wrap(err) - } - parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) - - m, ok := store.attributesByID[parentID] - if !ok { - newMap := pcommon.NewMap() - m = &newMap - store.attributesByID[parentID] = m - } - value.CopyTo(m.PutEmpty(key)) - } - - return nil -} - -// SchemaToAttributeIDs pre-computes the field IDs for the attributes record. 
-func SchemaToAttributeIDs(schema *arrow.Schema) (*AttributeIDs, error) { - parentID, err := arrowutils.FieldIDFromSchema(schema, constants.ParentID) - if err != nil { - return nil, werror.Wrap(err) - } - - deltaEncoded := false - v, found := schema.Field(parentID).Metadata.GetValue(oschema.EncodingKey) - if found { - deltaEncoded = v == oschema.DeltaEncodingValue - } - - key, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeKey) - if err != nil { - return nil, werror.Wrap(err) - } - - vType, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeType) - if err != nil { - return nil, werror.Wrap(err) - } - vStr, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeStr) - if err != nil { - return nil, werror.Wrap(err) - } - vInt, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeInt) - if err != nil { - return nil, werror.Wrap(err) - } - vDouble, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeDouble) - if err != nil { - return nil, werror.Wrap(err) - } - vBool, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeBool) - if err != nil { - return nil, werror.Wrap(err) - } - vBytes, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeBytes) - if err != nil { - return nil, werror.Wrap(err) - } - vSer, err := arrowutils.FieldIDFromSchema(schema, constants.AttributeSer) - if err != nil { - return nil, werror.Wrap(err) - } - - return &AttributeIDs{ - ParentID: parentID, - ParentIDDeltaEncoded: deltaEncoded, - Key: key, - Type: vType, - Str: vStr, - Int: vInt, - Double: vDouble, - Bool: vBool, - Bytes: vBytes, - Ser: vSer, - }, nil -} - -func NewAttrs16ParentIdDecoder() *Attrs16ParentIdDecoder { - return &Attrs16ParentIdDecoder{ - encodingType: carrow.ParentIdDeltaGroupEncoding, - } -} - -func (d *Attrs16ParentIdDecoder) Decode(deltaOrParentID uint16, key string, value *pcommon.Value) uint16 { - switch d.encodingType { - case carrow.ParentIdNoEncoding: - return deltaOrParentID - case carrow.ParentIdDeltaEncoding: - decodedParentID := d.prevParentID + deltaOrParentID - d.prevParentID = decodedParentID - return decodedParentID - case carrow.ParentIdDeltaGroupEncoding: - if d.prevKey == key && carrow.Equal(d.prevValue, value) { - parentID := d.prevParentID + deltaOrParentID - d.prevParentID = parentID - return parentID - } else { - d.prevKey = key - d.prevValue = value - d.prevParentID = deltaOrParentID - return deltaOrParentID - } - default: - panic("unknown attrs16 parent ID encoding type") - } -} - -func NewAttrs32ParentIdDecoder() *Attrs32ParentIdDecoder { - return &Attrs32ParentIdDecoder{ - encodingType: carrow.ParentIdDeltaGroupEncoding, - } -} - -func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, value *pcommon.Value) uint32 { - switch d.encodingType { - case carrow.ParentIdNoEncoding: - return deltaOrParentID - case carrow.ParentIdDeltaEncoding: - decodedParentID := d.prevParentID + deltaOrParentID - d.prevParentID = decodedParentID - return decodedParentID - case carrow.ParentIdDeltaGroupEncoding: - if d.prevKey == key && carrow.Equal(d.prevValue, value) { - parentID := d.prevParentID + deltaOrParentID - d.prevParentID = parentID - return parentID - } else { - d.prevKey = key - d.prevValue = value - d.prevParentID = deltaOrParentID - return deltaOrParentID - } - default: - panic("unknown attrs32 parent ID encoding type") - } -} diff --git a/pkg/otel/common/otlp/attributes_test.go b/pkg/otel/common/otlp/attributes_test.go deleted file mode 100644 index 8d5cc93c..00000000 --- 
a/pkg/otel/common/otlp/attributes_test.go +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package otlp - -import ( - "testing" - - "github.com/apache/arrow/go/v12/arrow" - "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - - carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow" - "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema" - "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder" - "github.com/f5/otel-arrow-adapter/pkg/otel/internal" - "github.com/f5/otel-arrow-adapter/pkg/otel/stats" -) - -func TestAttributes(t *testing.T) { - t.Parallel() - - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - s := arrow.NewSchema([]arrow.Field{ - {Name: "attributes", Type: carrow.AttributesDT, Metadata: schema.Metadata(schema.Optional)}, - }, nil) - - rBuilder := builder.NewRecordBuilderExt(pool, s, DefaultDictConfig, stats.NewProducerStats()) - defer rBuilder.Release() - - var record arrow.Record - var err error - - maxIter := 10 - - // Create Arrow record from OTLP attributes - for { - b := carrow.AttributesBuilderFrom(rBuilder.MapBuilder("attributes")) - for i := 0; i < maxIter; i++ { - err = b.Append(internal.Attrs1()) - require.NoError(t, err) - err = b.Append(internal.Attrs2()) - require.NoError(t, err) - err = b.Append(internal.Attrs3()) - require.NoError(t, err) - err = b.Append(internal.Attrs4()) - require.NoError(t, err) - err = b.Append(internal.Attrs5()) - require.NoError(t, err) - } - - record, err = rBuilder.NewRecord() - if err == nil { - break - } - assert.Error(t, schema.ErrSchemaNotUpToDate) - } - defer record.Release() - - // Retrieve the Arrow Map representing the attributes - arr := record.Columns()[0].(*array.Map) - - // Check the OTLP Arrow encoding and OTLP decoding by - // comparing the original and decoded attributes. 
- row := 0 - for i := 0; i < maxIter; i++ { - - value := pcommon.NewMap() - err = UpdateAttributesFrom(value, arr, row) - require.NoError(t, err) - assert.Equal(t, internal.Attrs1(), value) - row++ - - value = pcommon.NewMap() - err = UpdateAttributesFrom(value, arr, row) - require.NoError(t, err) - assert.Equal(t, internal.Attrs2(), value) - row++ - - value = pcommon.NewMap() - err = UpdateAttributesFrom(value, arr, row) - require.NoError(t, err) - assert.Equal(t, internal.Attrs3(), value) - row++ - - value = pcommon.NewMap() - err = UpdateAttributesFrom(value, arr, row) - require.NoError(t, err) - assert.Equal(t, internal.Attrs4(), value) - row++ - - value = pcommon.NewMap() - err = UpdateAttributesFrom(value, arr, row) - require.NoError(t, err) - assert.Equal(t, internal.Attrs5(), value) - row++ - } -} diff --git a/pkg/otel/common/otlp/errors.go b/pkg/otel/common/otlp/errors.go index 57f5bf70..fef67448 100644 --- a/pkg/otel/common/otlp/errors.go +++ b/pkg/otel/common/otlp/errors.go @@ -20,6 +20,7 @@ package otlp import "errors" var ( + ErrMissingRelatedData = errors.New("missing related data") ErrInvalidTypeCode = errors.New("invalid type code") ErrInvalidFieldId = errors.New("invalid field id") ErrParentIDMissing = errors.New("parent id missing") diff --git a/pkg/otel/common/otlp/resource.go b/pkg/otel/common/otlp/resource.go index c3090721..79d5daf6 100644 --- a/pkg/otel/common/otlp/resource.go +++ b/pkg/otel/common/otlp/resource.go @@ -30,23 +30,6 @@ type ResourceIds struct { SchemaUrl int } -// ToDo remove this function once metrics have been converted to the model v1 -func NewResourceIds(resSpansDT *arrow.StructType) (*ResourceIds, error) { - resId, resDT, err := arrowutils.StructFieldIDFromStruct(resSpansDT, constants.Resource) - if err != nil { - return nil, werror.Wrap(err) - } - - attributeIds, _ := arrowutils.FieldIDFromStruct(resDT, constants.ID) - droppedAttributesCount, _ := arrowutils.FieldIDFromStruct(resDT, constants.DroppedAttributesCount) - - return &ResourceIds{ - Resource: resId, - ID: attributeIds, - DroppedAttributesCount: droppedAttributesCount, - }, nil -} - func NewResourceIdsFromSchema(schema *arrow.Schema) (*ResourceIds, error) { resource, resDT, err := arrowutils.StructFieldIDFromSchema(schema, constants.Resource) if err != nil { @@ -65,37 +48,6 @@ func NewResourceIdsFromSchema(schema *arrow.Schema) (*ResourceIds, error) { }, nil } -// ToDo remove this function once metrics have been converted to the model v1 - -// UpdateResourceWith updates a resource with the content of an Arrow array. 
-func UpdateResourceWith(r pcommon.Resource, resList *arrowutils.ListOfStructs, row int, resIds *ResourceIds, attrsStore *Attributes16Store) error { - _, resArr, err := resList.StructByID(resIds.Resource, row) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - - // Read dropped attributes count - droppedAttributesCount, err := arrowutils.U32FromStruct(resArr, row, resIds.DroppedAttributesCount) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - r.SetDroppedAttributesCount(droppedAttributesCount) - - // Read attributes - attrsId, err := arrowutils.NullableU16FromStruct(resArr, row, resIds.ID) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - if attrsId != nil { - attrs := attrsStore.AttributesByDeltaID(*attrsId) - if attrs != nil { - attrs.CopyTo(r.Attributes()) - } - } - - return err -} - func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *Attributes16Store) (schemaUrl string, err error) { resArr, err := arrowutils.StructFromRecord(record, resIds.Resource, row) if err != nil { diff --git a/pkg/otel/common/otlp/scope.go b/pkg/otel/common/otlp/scope.go index 6841f0ef..6cb58fc9 100644 --- a/pkg/otel/common/otlp/scope.go +++ b/pkg/otel/common/otlp/scope.go @@ -31,26 +31,6 @@ type ScopeIds struct { DroppedAttributesCount int } -// ToDo remove this function once metrics have been converted to the model v1 -func NewScopeIds(resSpansDT *arrow.StructType) (*ScopeIds, error) { - scopeID, scopeDT, err := arrowutils.StructFieldIDFromStruct(resSpansDT, constants.Scope) - if err != nil { - return nil, werror.Wrap(err) - } - - nameID, _ := arrowutils.FieldIDFromStruct(scopeDT, constants.Name) - versionID, _ := arrowutils.FieldIDFromStruct(scopeDT, constants.Version) - droppedAttributesCountID, _ := arrowutils.FieldIDFromStruct(scopeDT, constants.DroppedAttributesCount) - attrsID, _ := arrowutils.FieldIDFromStruct(scopeDT, constants.ID) - return &ScopeIds{ - Scope: scopeID, - Name: nameID, - Version: versionID, - DroppedAttributesCount: droppedAttributesCountID, - ID: attrsID, - }, nil -} - func NewScopeIdsFromSchema(schema *arrow.Schema) (*ScopeIds, error) { scopeID, scopeDT, err := arrowutils.StructFieldIDFromSchema(schema, constants.Scope) if err != nil { @@ -70,49 +50,6 @@ func NewScopeIdsFromSchema(schema *arrow.Schema) (*ScopeIds, error) { }, nil } -// ToDo remove this function once metrics have been converted to the model v1 - -// UpdateScopeWith appends a scope into a given scope spans from an Arrow list of structs. 
-func UpdateScopeWith( - s pcommon.InstrumentationScope, - listOfStructs *arrowutils.ListOfStructs, - row int, - ids *ScopeIds, - attrsStore *Attributes16Store, -) error { - _, scopeArray, err := listOfStructs.StructByID(ids.Scope, row) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - name, err := arrowutils.StringFromStruct(scopeArray, row, ids.Name) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - version, err := arrowutils.StringFromStruct(scopeArray, row, ids.Version) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - droppedAttributesCount, err := arrowutils.U32FromStruct(scopeArray, row, ids.DroppedAttributesCount) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - - attrsID, err := arrowutils.NullableU16FromStruct(scopeArray, row, ids.ID) - if err != nil { - return werror.WrapWithContext(err, map[string]interface{}{"row": row}) - } - if attrsID != nil { - attrs := attrsStore.AttributesByDeltaID(*attrsID) - if attrs != nil { - attrs.CopyTo(s.Attributes()) - } - } - s.SetName(name) - s.SetVersion(version) - s.SetDroppedAttributesCount(droppedAttributesCount) - return nil -} - func UpdateScopeFromRecord( s pcommon.InstrumentationScope, record arrow.Record, diff --git a/pkg/otel/common/test_utils.go b/pkg/otel/common/test_utils.go new file mode 100644 index 00000000..89aa989f --- /dev/null +++ b/pkg/otel/common/test_utils.go @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+package common
+
+import (
+	"math/rand"
+
+	"github.com/apache/arrow/go/v12/arrow"
+
+	"github.com/f5/otel-arrow-adapter/pkg/record_message"
+)
+
+func MixUpArrowRecords(rng *rand.Rand, record arrow.Record, relatedRecords []*record_message.RecordMessage) (bool, arrow.Record, []*record_message.RecordMessage) {
+	mainRecordChanged := false
+
+	if len(relatedRecords) > 0 && rng.Intn(2) == 0 {
+		// exchange one of the related records with the main record
+		relatedRecordPos := rng.Intn(len(relatedRecords))
+		relatedRecord := relatedRecords[relatedRecordPos].Record()
+		relatedRecords[relatedRecordPos].SetRecord(record)
+		record = relatedRecord
+		mainRecordChanged = true
+	}
+
+	// mix up the payload types of the related records
+	payloadTypes := make([]record_message.PayloadType, len(relatedRecords))
+	for i := 0; i < len(relatedRecords); i++ {
+		payloadTypes[i] = relatedRecords[i].PayloadType()
+	}
+	rng.Shuffle(len(payloadTypes), func(i, j int) { payloadTypes[i], payloadTypes[j] = payloadTypes[j], payloadTypes[i] })
+	for i := 0; i < len(relatedRecords); i++ {
+		relatedRecords[i].SetPayloadType(payloadTypes[i])
+	}
+
+	return mainRecordChanged, record, relatedRecords
+}
diff --git a/pkg/otel/logs/arrow/related_data.go b/pkg/otel/logs/arrow/related_data.go
index d003eb4c..f93183b5 100644
--- a/pkg/otel/logs/arrow/related_data.go
+++ b/pkg/otel/logs/arrow/related_data.go
@@ -20,8 +20,6 @@ package arrow
 // Infrastructure to manage related records.
 
 import (
-	"math"
-
 	carrow "github.com/f5/otel-arrow-adapter/pkg/otel/common/arrow"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/stats"
@@ -32,8 +30,6 @@ type (
 	// RelatedData is a collection of related/dependent data to log record
 	// entities.
 	RelatedData struct {
-		logRecordCount uint64
-
 		relatedRecordsManager *carrow.RelatedRecordsManager
 
 		attrsBuilders *AttrsBuilders
@@ -91,25 +87,9 @@ func (r *RelatedData) RecordBuilderExt(payloadType *carrow.PayloadType) *builder
 }
 
 func (r *RelatedData) Reset() {
-	r.logRecordCount = 0
 	r.relatedRecordsManager.Reset()
 }
 
-func (r *RelatedData) LogRecordCount() uint16 {
-	return uint16(r.logRecordCount)
-}
-
-func (r *RelatedData) NextSpanID() uint16 {
-	c := r.logRecordCount
-
-	if c == math.MaxUint16 {
-		panic("maximum number of log records reached per batch, please reduce the batch size to a maximum of 65535 log records")
-	}
-
-	r.logRecordCount++
-	return uint16(c)
-}
-
 func (r *RelatedData) BuildRecordMessages() ([]*record_message.RecordMessage, error) {
 	return r.relatedRecordsManager.BuildRecordMessages()
 }
diff --git a/pkg/otel/logs/otlp/logs.go b/pkg/otel/logs/otlp/logs.go
index 6396720d..32830452 100644
--- a/pkg/otel/logs/otlp/logs.go
+++ b/pkg/otel/logs/otlp/logs.go
@@ -60,6 +60,10 @@ func LogsFrom(record arrow.Record, relatedData *RelatedData) (plog.Logs, error)
 
 	logs := plog.NewLogs()
 
+	if relatedData == nil {
+		return logs, werror.Wrap(otlp.ErrMissingRelatedData)
+	}
+
 	logRecordIDs, err := SchemaToIDs(record.Schema())
 	if err != nil {
 		return logs, werror.Wrap(err)
@@ -158,60 +162,64 @@ func LogsFrom(record arrow.Record, relatedData *RelatedData) (plog.Logs, error)
 		if err != nil {
 			return logs, werror.WrapWithContext(err, map[string]interface{}{"row": row})
 		}
-		bodyType, err := arrowutils.U8FromStruct(bodyStruct, row, logRecordIDs.BodyType)
-		if err != nil {
-			return logs, werror.Wrap(err)
-		}
-		body := logRecord.Body()
-		switch pcommon.ValueType(bodyType) {
-		case pcommon.ValueTypeStr:
-			v, err := arrowutils.StringFromStruct(bodyStruct, row, logRecordIDs.BodyStr)
-			if err
!= nil { - return logs, werror.Wrap(err) - } - body.SetStr(v) - case pcommon.ValueTypeInt: - v, err := arrowutils.I64FromStruct(bodyStruct, row, logRecordIDs.BodyInt) - if err != nil { - return logs, werror.Wrap(err) - } - body.SetInt(v) - case pcommon.ValueTypeDouble: - v, err := arrowutils.F64FromStruct(bodyStruct, row, logRecordIDs.BodyDouble) - if err != nil { - return logs, werror.Wrap(err) - } - body.SetDouble(v) - case pcommon.ValueTypeBool: - v, err := arrowutils.BoolFromStruct(bodyStruct, row, logRecordIDs.BodyBool) - if err != nil { - return logs, werror.Wrap(err) - } - body.SetBool(v) - case pcommon.ValueTypeBytes: - v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodyBytes) - if err != nil { - return logs, werror.Wrap(err) - } - body.SetEmptyBytes().FromRaw(v) - case pcommon.ValueTypeSlice: - v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodySer) - if err != nil { - return logs, werror.Wrap(err) - } - if err = common.Deserialize(v, body); err != nil { - return logs, werror.Wrap(err) - } - case pcommon.ValueTypeMap: - v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodySer) + + if bodyStruct != nil { + // If there is a body struct, read the body type and value + bodyType, err := arrowutils.U8FromStruct(bodyStruct, row, logRecordIDs.BodyType) if err != nil { return logs, werror.Wrap(err) } - if err = common.Deserialize(v, body); err != nil { - return logs, werror.Wrap(err) + body := logRecord.Body() + switch pcommon.ValueType(bodyType) { + case pcommon.ValueTypeStr: + v, err := arrowutils.StringFromStruct(bodyStruct, row, logRecordIDs.BodyStr) + if err != nil { + return logs, werror.Wrap(err) + } + body.SetStr(v) + case pcommon.ValueTypeInt: + v, err := arrowutils.I64FromStruct(bodyStruct, row, logRecordIDs.BodyInt) + if err != nil { + return logs, werror.Wrap(err) + } + body.SetInt(v) + case pcommon.ValueTypeDouble: + v, err := arrowutils.F64FromStruct(bodyStruct, row, logRecordIDs.BodyDouble) + if err != nil { + return logs, werror.Wrap(err) + } + body.SetDouble(v) + case pcommon.ValueTypeBool: + v, err := arrowutils.BoolFromStruct(bodyStruct, row, logRecordIDs.BodyBool) + if err != nil { + return logs, werror.Wrap(err) + } + body.SetBool(v) + case pcommon.ValueTypeBytes: + v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodyBytes) + if err != nil { + return logs, werror.Wrap(err) + } + body.SetEmptyBytes().FromRaw(v) + case pcommon.ValueTypeSlice: + v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodySer) + if err != nil { + return logs, werror.Wrap(err) + } + if err = common.Deserialize(v, body); err != nil { + return logs, werror.Wrap(err) + } + case pcommon.ValueTypeMap: + v, err := arrowutils.BinaryFromStruct(bodyStruct, row, logRecordIDs.BodySer) + if err != nil { + return logs, werror.Wrap(err) + } + if err = common.Deserialize(v, body); err != nil { + return logs, werror.Wrap(err) + } + default: + // silently ignore unknown types to avoid DOS attacks } - default: - // silently ignore unknown types to avoid DOS attacks } logRecordAttrs := logRecord.Attributes() diff --git a/pkg/otel/logs/otlp/related_data.go b/pkg/otel/logs/otlp/related_data.go index a22c1921..b7c72f43 100644 --- a/pkg/otel/logs/otlp/related_data.go +++ b/pkg/otel/logs/otlp/related_data.go @@ -50,6 +50,12 @@ func (r *RelatedData) LogRecordIDFromDelta(delta uint16) uint16 { } func RelatedDataFrom(records []*record_message.RecordMessage) (relatedData *RelatedData, logsRecord *record_message.RecordMessage, 
err error) {
+	defer func() {
+		for _, record := range records {
+			record.Record().Release()
+		}
+	}()
+
 	relatedData = NewRelatedData()
 
 	// Create the attribute map stores for all the attribute records.
diff --git a/pkg/otel/logs/validation_test.go b/pkg/otel/logs/validation_test.go
index 6abd8656..dc7ad56e 100644
--- a/pkg/otel/logs/validation_test.go
+++ b/pkg/otel/logs/validation_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/f5/otel-arrow-adapter/pkg/config"
 	"github.com/f5/otel-arrow-adapter/pkg/datagen"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/assert"
+	"github.com/f5/otel-arrow-adapter/pkg/otel/common"
 	acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder"
 	cfg "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/config"
@@ -42,19 +43,43 @@ var (
 	producerStats = stats.NewProducerStats()
 )
 
-// TestConversionFromSyntheticData tests the conversion of OTLP logs to Arrow and back to OTLP.
-// The initial OTLP logs are generated from a synthetic dataset.
-// This test is based on the JSON serialization of the initial generated OTLP logs compared to the JSON serialization
-// of the OTLP logs generated from the Arrow records.
-func TestConversionFromSyntheticData(t *testing.T) {
+// TestLogsEncodingDecoding tests the conversion of OTLP logs to OTel Arrow logs
+// and back to OTLP. The initial OTLP logs are generated from a synthetic
+// dataset.
+//
+// The validation process is based on a JSON comparison of the generated OTLP
+// logs and the OTLP logs decoded from the OTel Arrow logs. This comparison is
+// strict except that it tolerates differences in field ordering.
+func TestLogsEncodingDecoding(t *testing.T) {
 	t.Parallel()
 
 	entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing
 
 	logsGen := datagen.NewLogsGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
 
-	// Generate a random OTLP logs request.
-	expectedRequest := plogotlp.NewExportRequestFromLogs(logsGen.Generate(1, 100))
+	expectedRequest := plogotlp.NewExportRequestFromLogs(logsGen.Generate(5000, 100))
+
+	CheckEncodeDecode(t, expectedRequest)
+}
+
+// TestInvalidLogsDecoding is similar to TestLogsEncodingDecoding but introduces
+// random modifications of the Arrow records used to represent OTel logs. These
+// modifications should be handled gracefully by the decoding process and
+// generate an error but should never panic.
+func TestInvalidLogsDecoding(t *testing.T) {
+	t.Parallel()
+
+	entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing
+	logsGen := datagen.NewLogsGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
+	expectedRequest := plogotlp.NewExportRequestFromLogs(logsGen.Generate(100, 100))
+
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
+}
+
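For reference, the happy path exercised by CheckEncodeDecode below reduces to this sketch (condensed from the test body; error handling and the schema-update retry loop are elided, and all identifiers are the ones already used in this file):

	lb, _ := logsarrow.NewLogsBuilder(rBuilder, logsarrow.NewConfig(conf), stats.NewProducerStats())
	_ = lb.Append(expectedRequest.Logs())                       // OTLP -> OTel Arrow
	record, _ := rBuilder.NewRecord()                           // main record
	relatedRecords, _ := lb.RelatedData().BuildRecordMessages() // related records (attributes, ...)
	relatedData, _, _ := logsotlp.RelatedDataFrom(relatedRecords)
	logs, _ := logsotlp.LogsFrom(record, relatedData)           // OTel Arrow -> OTLP
	assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)})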
+func CheckEncodeDecode(
+	t *testing.T,
+	expectedRequest plogotlp.ExportRequest,
+) {
 	// Convert the OTLP logs request to Arrow.
 	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer pool.AssertSize(t, 0)
 
@@ -62,11 +87,11 @@ func TestConversionFromSyntheticData(t *testing.T) {
 	rBuilder := builder.NewRecordBuilderExt(pool, logsarrow.LogsSchema, DefaultDictConfig, producerStats)
 	defer rBuilder.Release()
 
+	conf := config.DefaultConfig()
+
 	var record arrow.Record
 	var relatedRecords []*record_message.RecordMessage
 
-	conf := config.DefaultConfig()
-
 	for {
 		lb, err := logsarrow.NewLogsBuilder(rBuilder, logsarrow.NewConfig(conf), stats.NewProducerStats())
 		require.NoError(t, err)
@@ -95,3 +120,65 @@ func TestConversionFromSyntheticData(t *testing.T) {
 
 	assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{plogotlp.NewExportRequestFromLogs(logs)})
 }
+
+func MultiRoundOfCheckEncodeMessUpDecode(
+	t *testing.T,
+	expectedRequest plogotlp.ExportRequest,
+) {
+	rng := rand.New(rand.NewSource(int64(rand.Uint64())))
+
+	for i := 0; i < 100; i++ {
+		CheckEncodeMessUpDecode(t, expectedRequest, rng)
+	}
+}
+
+func CheckEncodeMessUpDecode(
+	t *testing.T,
+	expectedRequest plogotlp.ExportRequest,
+	rng *rand.Rand,
+) {
+	// Convert the OTLP logs request to Arrow.
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	rBuilder := builder.NewRecordBuilderExt(pool, logsarrow.LogsSchema, DefaultDictConfig, producerStats)
+	defer rBuilder.Release()
+
+	conf := config.DefaultConfig()
+
+	var record arrow.Record
+	var relatedRecords []*record_message.RecordMessage
+
+	for {
+		lb, err := logsarrow.NewLogsBuilder(rBuilder, logsarrow.NewConfig(conf), stats.NewProducerStats())
+		require.NoError(t, err)
+		defer lb.Release()
+
+		err = lb.Append(expectedRequest.Logs())
+		require.NoError(t, err)
+
+		record, err = rBuilder.NewRecord()
+		if err == nil {
+			relatedRecords, err = lb.RelatedData().BuildRecordMessages()
+			require.NoError(t, err)
+			break
+		}
+		require.Error(t, acommon.ErrSchemaNotUpToDate)
+	}
+
+	// Mix up the Arrow records so that decoding fails or must cope with shuffled payload types.
+	mainRecordChanged, record, relatedRecords := common.MixUpArrowRecords(rng, record, relatedRecords)
+
+	relatedData, _, err := logsotlp.RelatedDataFrom(relatedRecords)
+
+	// Convert the Arrow records back to OTLP.
+ _, err = logsotlp.LogsFrom(record, relatedData) + + if mainRecordChanged || relatedData == nil { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + record.Release() +} diff --git a/pkg/otel/metrics/otlp/errors.go b/pkg/otel/metrics/otlp/errors.go index 7fe8634d..01333c0c 100644 --- a/pkg/otel/metrics/otlp/errors.go +++ b/pkg/otel/metrics/otlp/errors.go @@ -22,11 +22,8 @@ import ( ) var ( - ErrNotArraySparseUnion = errors.New("not an arrow array.SparseUnion") - ErrNotArrayInt32 = errors.New("not an arrow array.Int32") - ErrNotArrayUint64 = errors.New("not an arrow array.Uint64") - ErrNotArrayFloat64 = errors.New("not an arrow array.Float64") - ErrNotArrayList = errors.New("not an arrow array.List") - ErrNotArrayBoolean = errors.New("not an arrow array.Boolean") - ErrUnknownTypeCode = errors.New("unknown type code") + ErrNotArrayInt32 = errors.New("not an arrow array.Int32") + ErrNotArrayUint64 = errors.New("not an arrow array.Uint64") + ErrNotArrayFloat64 = errors.New("not an arrow array.Float64") + ErrNotArrayList = errors.New("not an arrow array.List") ) diff --git a/pkg/otel/metrics/otlp/metrics.go b/pkg/otel/metrics/otlp/metrics.go index 21597636..ba18a7b0 100644 --- a/pkg/otel/metrics/otlp/metrics.go +++ b/pkg/otel/metrics/otlp/metrics.go @@ -48,6 +48,10 @@ func MetricsFrom(record arrow.Record, relatedData *RelatedData) (pmetric.Metrics metrics := pmetric.NewMetrics() + if relatedData == nil { + return metrics, werror.Wrap(otlp.ErrMissingRelatedData) + } + metricsIDs, err := SchemaToIds(record.Schema()) if err != nil { return metrics, werror.Wrap(err) diff --git a/pkg/otel/metrics/otlp/related_data.go b/pkg/otel/metrics/otlp/related_data.go index 16394935..90bae824 100644 --- a/pkg/otel/metrics/otlp/related_data.go +++ b/pkg/otel/metrics/otlp/related_data.go @@ -82,6 +82,12 @@ func (r *RelatedData) MetricIDFromDelta(delta uint16) uint16 { } func RelatedDataFrom(records []*record_message.RecordMessage) (relatedData *RelatedData, metricsRecord *record_message.RecordMessage, err error) { + defer func() { + for _, record := range records { + record.Record().Release() + } + }() + var numberDPRec *record_message.RecordMessage var summaryDPRec *record_message.RecordMessage var histogramDPRec *record_message.RecordMessage diff --git a/pkg/otel/metrics/validation_test.go b/pkg/otel/metrics/validation_test.go index c89587c3..20e6c29b 100644 --- a/pkg/otel/metrics/validation_test.go +++ b/pkg/otel/metrics/validation_test.go @@ -28,6 +28,7 @@ import ( "github.com/f5/otel-arrow-adapter/pkg/config" "github.com/f5/otel-arrow-adapter/pkg/datagen" "github.com/f5/otel-arrow-adapter/pkg/otel/assert" + "github.com/f5/otel-arrow-adapter/pkg/otel/common" "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema" "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder" cfg "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/config" @@ -39,17 +40,26 @@ import ( var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16) -// TestBackAndForthConversion tests the conversion of OTLP metrics to Arrow and back to OTLP. +// TestMetricsEncodingDecoding tests the conversion of OTLP metrics to Arrow and back to OTLP. // The initial OTLP metrics are generated from a synthetic dataset. // This test is based on the JSON serialization of the initial generated OTLP metrics compared to the JSON serialization // of the OTLP metrics generated from the Arrow records. 
-func TestBackAndForthConversion(t *testing.T) {
+func TestMetricsEncodingDecoding(t *testing.T) {
 	t.Parallel()
 
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateAllKindOfMetrics(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+}
+
+func TestInvalidMetricsDecoding(t *testing.T) {
+	t.Parallel()
+
+	metricsGen := MetricsGenerator()
+	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateAllKindOfMetrics(100, 100))
+
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func TestGauges(t *testing.T) {
@@ -58,7 +68,8 @@ func TestGauges(t *testing.T) {
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateGauges(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func TestSums(t *testing.T) {
@@ -67,7 +78,8 @@ func TestSums(t *testing.T) {
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateSums(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func TestSummaries(t *testing.T) {
@@ -76,7 +88,8 @@ func TestSummaries(t *testing.T) {
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateSummaries(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func TestHistograms(t *testing.T) {
@@ -85,7 +98,8 @@ func TestHistograms(t *testing.T) {
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateHistograms(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func TestExponentialHistograms(t *testing.T) {
@@ -94,7 +108,8 @@ func TestExponentialHistograms(t *testing.T) {
 	metricsGen := MetricsGenerator()
 	expectedRequest := pmetricotlp.NewExportRequestFromMetrics(metricsGen.GenerateExponentialHistograms(100, 100))
 
-	GenericMetricTests(t, expectedRequest)
+	CheckEncodeDecode(t, expectedRequest)
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
 }
 
 func MetricsGenerator() *datagen.MetricsGenerator {
@@ -111,7 +126,7 @@ func MetricsGenerator() *datagen.MetricsGenerator {
 	return datagen.NewMetricsGeneratorWithDataGenerator(dg)
 }
 
-func GenericMetricTests(t *testing.T, expectedRequest pmetricotlp.ExportRequest) {
+func CheckEncodeDecode(t *testing.T, expectedRequest pmetricotlp.ExportRequest) {
 	// Convert the OTLP metrics request to Arrow.
 	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer pool.AssertSize(t, 0)
@@ -152,3 +167,67 @@ func GenericMetricTests(t *testing.T, expectedRequest pmetricotlp.ExportRequest)
 
 	assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)})
 }
+
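The conditional assertion that ends each mess-up round (below, and in the logs and traces equivalents) encodes the fault model of common.MixUpArrowRecords: if the main record was swapped out, or if the shuffled related records could not be assembled into a usable RelatedData, decoding must fail; a shuffle of payload types alone may still decode successfully. A hypothetical helper, not part of this change, states that expectation explicitly:

	// expectDecodeOutcome is illustrative only; the tests below inline this logic.
	func expectDecodeOutcome(t *testing.T, mainRecordChanged bool, relatedData *otlp.RelatedData, err error) {
		if mainRecordChanged || relatedData == nil {
			require.Error(t, err) // corruption must surface as an error, never as a panic
		} else {
			require.NoError(t, err)
		}
	}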
+// MultiRoundOfCheckEncodeMessUpDecode tests the robustness of the conversion
+// of OTel Arrow records to OTLP metrics. In every round, the main record and
+// the related records (if any) are shuffled by MixUpArrowRecords before
+// decoding. The conversion may return errors in this situation, but it must
+// never panic.
+func MultiRoundOfCheckEncodeMessUpDecode(t *testing.T, expectedRequest pmetricotlp.ExportRequest) {
+	rng := rand.New(rand.NewSource(int64(rand.Uint64())))
+
+	for i := 0; i < 100; i++ {
+		OneRoundOfMessUpArrowRecords(t, expectedRequest, rng)
+	}
+}
+
+func OneRoundOfMessUpArrowRecords(t *testing.T, expectedRequest pmetricotlp.ExportRequest, rng *rand.Rand) {
+	// Convert the OTLP metrics request to Arrow.
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer func() {
+		pool.AssertSize(t, 0)
+	}()
+
+	rBuilder := builder.NewRecordBuilderExt(pool, ametrics.MetricsSchema, DefaultDictConfig, stats.NewProducerStats())
+	defer func() {
+		rBuilder.Release()
+	}()
+
+	var record arrow.Record
+	var relatedRecords []*record_message.RecordMessage
+
+	conf := config.DefaultConfig()
+
+	for {
+		lb, err := ametrics.NewMetricsBuilder(rBuilder, ametrics.NewConfig(conf), stats.NewProducerStats())
+		require.NoError(t, err)
+		defer lb.Release()
+
+		err = lb.Append(expectedRequest.Metrics())
+		require.NoError(t, err)
+
+		record, err = rBuilder.NewRecord()
+		if err == nil {
+			relatedRecords, err = lb.RelatedData().BuildRecordMessages()
+			require.NoError(t, err)
+			break
+		}
+		require.Error(t, schema.ErrSchemaNotUpToDate)
+	}
+
+	// Mix up the Arrow records so that decoding fails or must cope with shuffled payload types.
+	mainRecordChanged, record, relatedRecords := common.MixUpArrowRecords(rng, record, relatedRecords)
+
+	relatedData, _, err := otlp.RelatedDataFrom(relatedRecords)
+
+	// Convert the Arrow records back to OTLP.
+	_, err = otlp.MetricsFrom(record, relatedData)
+
+	if mainRecordChanged || relatedData == nil {
+		require.Error(t, err)
+	} else {
+		require.NoError(t, err)
+	}
+
+	record.Release()
+}
diff --git a/pkg/otel/traces/otlp/related_data.go b/pkg/otel/traces/otlp/related_data.go
index 4a9f1d70..22874e31 100644
--- a/pkg/otel/traces/otlp/related_data.go
+++ b/pkg/otel/traces/otlp/related_data.go
@@ -59,6 +59,12 @@ func (r *RelatedData) SpanIDFromDelta(delta uint16) uint16 {
 }
 
 func RelatedDataFrom(records []*record_message.RecordMessage, conf *arrow.Config) (relatedData *RelatedData, tracesRecord *record_message.RecordMessage, err error) {
+	defer func() {
+		for _, record := range records {
+			record.Record().Release()
+		}
+	}()
+
 	var spanEventRecord *record_message.RecordMessage
 	var spanLinkRecord *record_message.RecordMessage
 
diff --git a/pkg/otel/traces/otlp/traces.go b/pkg/otel/traces/otlp/traces.go
index a5f1cae3..3899b4cd 100644
--- a/pkg/otel/traces/otlp/traces.go
+++ b/pkg/otel/traces/otlp/traces.go
@@ -66,6 +66,10 @@ func TracesFrom(record arrow.Record, relatedData *RelatedData) (ptrace.Traces, e
 
 	traces := ptrace.NewTraces()
 
+	if relatedData == nil {
+		return traces, werror.Wrap(otlp.ErrMissingRelatedData)
+	}
+
 	traceIDs, err := SchemaToIds(record.Schema())
 	if err != nil {
 		return traces, err
diff --git a/pkg/otel/traces/validation_test.go b/pkg/otel/traces/validation_test.go
index 71e4915a..468f50d8 100644
--- a/pkg/otel/traces/validation_test.go
+++ b/pkg/otel/traces/validation_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/f5/otel-arrow-adapter/pkg/config"
 	"github.com/f5/otel-arrow-adapter/pkg/datagen"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/assert"
+	"github.com/f5/otel-arrow-adapter/pkg/otel/common"
 	acommon "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema"
 	"github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/builder"
 	cfg "github.com/f5/otel-arrow-adapter/pkg/otel/common/schema/config"
@@ -41,11 +42,32 @@ import (
 
 var DefaultDictConfig = cfg.NewDictionary(math.MaxUint16)
 var ProducerStats = stats.NewProducerStats()
 
-// TestConversionFromSyntheticData tests the conversion of OTLP traces to Arrow and back to OTLP.
-// The initial OTLP traces are generated from a synthetic dataset.
-// This test is based on the JSON serialization of the initial generated OTLP traces compared to the JSON serialization
-// of the OTLP traces generated from the Arrow records.
-func TestConversionFromSyntheticData(t *testing.T) {
+// TestTracesEncodingDecoding tests the conversion of OTLP traces to OTel Arrow
+// traces and back to OTLP. The initial OTLP traces are generated from a
+// synthetic dataset.
+//
+// The validation process is based on a JSON comparison of the generated OTLP
+// traces and the OTLP traces decoded from the OTel Arrow traces. This comparison
+// is strict except that it tolerates differences in field ordering.
+func TestTracesEncodingDecoding(t *testing.T) {
+	t.Parallel()
+
+	entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing
+
+	tracesGen := datagen.NewTracesGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
+
+	expectedRequest := ptraceotlp.NewExportRequestFromTraces(tracesGen.Generate(100, 100))
+	CheckEncodeDecode(t, expectedRequest)
+
+	expectedRequest = ptraceotlp.NewExportRequestFromTraces(tracesGen.GenerateRandomTraces(100, 100))
+	CheckEncodeDecode(t, expectedRequest)
+}
+
+// TestInvalidTracesDecoding is similar to TestTracesEncodingDecoding but
+// introduces random modifications of the Arrow records used to represent OTel
+// traces. These modifications should be handled gracefully by the decoding
+// process and generate an error but should never panic.
+func TestInvalidTracesDecoding(t *testing.T) {
 	t.Parallel()
 
 	entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing
@@ -53,8 +75,15 @@ func TestConversionFromSyntheticData(t *testing.T) {
 	tracesGen := datagen.NewTracesGenerator(entropy, entropy.NewStandardResourceAttributes(), entropy.NewStandardInstrumentationScopes())
 
 	// Generate a random OTLP traces request.
-	expectedRequest := ptraceotlp.NewExportRequestFromTraces(tracesGen.Generate(1, 100))
+	expectedRequest := ptraceotlp.NewExportRequestFromTraces(tracesGen.GenerateRandomTraces(2000, 100))
 
+	MultiRoundOfCheckEncodeMessUpDecode(t, expectedRequest)
+}
+
+func CheckEncodeDecode(
+	t *testing.T,
+	expectedRequest ptraceotlp.ExportRequest,
+) {
 	// Convert the OTLP traces request to Arrow.
 	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
 	defer pool.AssertSize(t, 0)
@@ -96,6 +125,68 @@ func TestConversionFromSyntheticData(t *testing.T) {
 
 	assert.Equiv(t, []json.Marshaler{expectedRequest}, []json.Marshaler{ptraceotlp.NewExportRequestFromTraces(traces)})
 }
 
+func MultiRoundOfCheckEncodeMessUpDecode(
+	t *testing.T,
+	expectedRequest ptraceotlp.ExportRequest,
+) {
+	rng := rand.New(rand.NewSource(int64(rand.Uint64())))
+
+	for i := 0; i < 100; i++ {
+		CheckEncodeMessUpDecode(t, expectedRequest, rng)
+	}
+}
+
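A possible hardening, sketched here as a suggestion rather than as part of this change: the seed that feeds rand.New above is discarded, so a failing randomized round is hard to reproduce. Logging the seed first would make every round replayable:

	seed := int64(rand.Uint64())
	t.Logf("mess-up seed: %d", seed) // re-run a failure by hard-coding this seed
	rng := rand.New(rand.NewSource(seed))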
+func CheckEncodeMessUpDecode(
+	t *testing.T,
+	expectedRequest ptraceotlp.ExportRequest,
+	rng *rand.Rand,
+) {
+	// Convert the OTLP traces request to Arrow.
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	rBuilder := builder.NewRecordBuilderExt(pool, tracesarrow.TracesSchema, DefaultDictConfig, ProducerStats)
+	defer rBuilder.Release()
+
+	var record arrow.Record
+	var relatedRecords []*record_message.RecordMessage
+
+	conf := config.DefaultConfig()
+
+	for {
+		tb, err := tracesarrow.NewTracesBuilder(rBuilder, tracesarrow.NewConfig(conf), stats.NewProducerStats())
+		require.NoError(t, err)
+		defer tb.Release()
+
+		err = tb.Append(expectedRequest.Traces())
+		require.NoError(t, err)
+
+		record, err = rBuilder.NewRecord()
+		if err == nil {
+			relatedRecords, err = tb.RelatedData().BuildRecordMessages()
+			require.NoError(t, err)
+			break
+		}
+		require.Error(t, acommon.ErrSchemaNotUpToDate)
+	}
+
+	// Mix up the Arrow records so that decoding fails or must cope with shuffled payload types.
+	mainRecordChanged, record, relatedRecords := common.MixUpArrowRecords(rng, record, relatedRecords)
+
+	relatedData, _, err := tracesotlp.RelatedDataFrom(relatedRecords, tracesarrow.NewConfig(conf))
+
+	// Convert the Arrow records back to OTLP.
+	_, err = tracesotlp.TracesFrom(record, relatedData)
+
+	if mainRecordChanged || relatedData == nil {
+		require.Error(t, err)
+	} else {
+		require.NoError(t, err)
+	}
+
+	record.Release()
+}
+
 // TestConversionFromRealData tests the conversion of OTLP traces to Arrow and back to OTLP.
 // The initial OTLP traces are generated from a real dataset (anonymized).
 // This test is based on the JSON serialization of the initial generated OTLP traces compared to the JSON serialization
@@ -129,10 +220,10 @@ func checkTracesConversion(t *testing.T, expectedRequest ptraceotlp.ExportReques
 	var record arrow.Record
 	var relatedRecords []*record_message.RecordMessage
 
-	cfg := config.DefaultConfig()
+	conf := config.DefaultConfig()
 
 	for {
-		tb, err := tracesarrow.NewTracesBuilder(rBuilder, tracesarrow.NewConfig(cfg), stats.NewProducerStats())
+		tb, err := tracesarrow.NewTracesBuilder(rBuilder, tracesarrow.NewConfig(conf), stats.NewProducerStats())
 		require.NoError(t, err)
 		err = tb.Append(expectedRequest.Traces())
 		require.NoError(t, err)
@@ -145,7 +236,7 @@ func checkTracesConversion(t *testing.T, expectedRequest ptraceotlp.ExportReques
 		require.Error(t, acommon.ErrSchemaNotUpToDate)
 	}
 
-	relatedData, _, err := tracesotlp.RelatedDataFrom(relatedRecords, tracesarrow.NewConfig(cfg))
+	relatedData, _, err := tracesotlp.RelatedDataFrom(relatedRecords, tracesarrow.NewConfig(conf))
 	require.NoError(t, err)
 
 	// Convert the Arrow records back to OTLP.
diff --git a/pkg/record_message/arrow_record.go b/pkg/record_message/arrow_record.go
index ea288da1..275db0e7 100644
--- a/pkg/record_message/arrow_record.go
+++ b/pkg/record_message/arrow_record.go
@@ -98,11 +98,21 @@ func (rm *RecordMessage) Record() arrow.Record {
 	return rm.record
 }
 
+// SetRecord replaces the Arrow record held by this RecordMessage.
+func (rm *RecordMessage) SetRecord(record arrow.Record) {
+	rm.record = record
+}
+
 // PayloadType returns the type of payload contained in this RecordMessage.
 func (rm *RecordMessage) PayloadType() PayloadType {
 	return rm.payloadType
 }
 
+// SetPayloadType overrides the payload type of this RecordMessage.
+func (rm *RecordMessage) SetPayloadType(payloadType PayloadType) {
+	rm.payloadType = payloadType
+}
+
 func (rm *RecordMessage) ShowStats() {
 	schema := rm.record.Schema()
 	columns := rm.record.Columns()
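The SetRecord and SetPayloadType setters added above exist to let tests such as common.MixUpArrowRecords tamper with a stream of record messages. A minimal sketch, where otherRecord stands for any arrow.Record:

	msg := relatedRecords[0]
	msg.SetPayloadType(relatedRecords[1].PayloadType()) // mislabel the payload type
	msg.SetRecord(otherRecord)                          // or swap out the Arrow record itself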