Skip to content

Commit

Permalink
Robustify OTel Arrow decoding (#175)
Browse files Browse the repository at this point in the history
* remove unused code

* Add pseudo "fuzz testing" to mess up with Arrow records in order to robustify OTel Arrow decoding

* Add pseudo "fuzz testing" to mess up with Log Arrow records in order to robustify OTel Arrow decoding

* Add pseudo "fuzz testing" to mess up with Trace Arrow records in order to robustify OTel Arrow decoding

* Improve code documentation

* Fix issue with logs having an empty body

* Fix issue with logs having an empty body

* Test invalid traces decoding

* Add documentation on the validation process

* Update documentation on the validation process

* Remove unused code
  • Loading branch information
lquerel authored Jun 12, 2023
1 parent 445aac7 commit 534d56b
Show file tree
Hide file tree
Showing 31 changed files with 1,155 additions and 1,108 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ as the end-to-end performance between telemetry data producers and receivers.
**This package is still experimental and subject to change.** It is currently used by an [experimental OTLP/Arrow gRPC
exporter and receiver](https://github.com/open-telemetry/experimental-arrow-collector).

Other important links:
- [OTEP/Specification](https://github.com/lquerel/oteps/blob/main/text/0156-columnar-encoding.md) describing the
motivations, the protocol, the schemas, the benchmark results and the different phases of this project. The OTEP is
still [pending, unmerged](https://github.com/open-telemetry/oteps/pull/171).
- [Donation proposal](https://github.com/open-telemetry/community/issues/1332) - approved, but repo not yet transferred in OTel org.
- [Arrow schemas](docs/data_model.md) used by this package.
- [Project Roadmap](https://github.com/f5/otel-arrow-adapter/milestones?direction=asc&sort=due_date&state=open) and [project Board](https://github.com/orgs/f5/projects/1/views/2) describing the current state of the project.
Important links:
- [OTEP](https://github.com/lquerel/oteps/blob/main/text/0156-columnar-encoding.md) - protocol specification
(status: [pending, unmerged](https://github.com/open-telemetry/oteps/pull/171)).
- [Donation](https://github.com/open-telemetry/community/issues/1332) - approved, but repo not yet transferred in OTel org.
- [Arrow Data Model](docs/data_model.md) - Mapping OTLP entities to Arrow Schemas.
- [Benchmark results](docs/benchmarks.md) - Based on synthetic and production data.
- [Validation process](docs/validation_process.md) - Encoding/Decoding validation process.
- [Slides](https://docs.google.com/presentation/d/12uLXmMWNelAyAiKFYMR0i7E7N4dPhzBi2_HLshFOLak/edit?usp=sharing) (01/30/2023 Maintainers meeting).

## Benchmark summary
Expand Down
45 changes: 45 additions & 0 deletions docs/validation_process.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Encoding/Decoding Validation Process

This document describes the validation process employed to confirm the correctness
and resilience of encoding and decoding process of OTLP entities to/from OTel Arrow
entities. The OTel Arrow protocol is designed to enhance the transport efficiency
of ALL forms of telemetry data, including metrics, logs, and traces, in a seamless
manner. The complexity and dynamic nature of the OTLP entities make it challenging
to validate the encoding/decoding process with traditional methods (traditional
QA process, or unit tests). The approach followed by this project offers a more
systematic and robust method.

## Automatic generation of metrics, logs, and traces

In this step we generate a large amount of random metrics, logs, and traces having
different characteristics in terms of shape, size, and content. The goal is to
test the capacity of the encoding process to handle a wide range of diversity in
the data.

Mandatory fields will not be generated systematically in order to test the
robustness of the encoding/decoding process (the original Protobuf encoding
doesn't enforce mandatory fields).

## Generic comparison of OTLP entities before and after encoding/decoding

OTLP entities are serialized in their JSON representation before encoding and
after decoding. These two JSON representations must be logically equivalent.
This comparison is implemented by a dedicated function that recursively traverses
the 2 JSON trees and compares the values of the fields taking into account that
the fields are not ordered and that the batching process may have changed the
internal organization of the data.

## Decoding of invalid data

A generic process has been implemented to inject specific errors and data changes
in the encoded data. For example, the existing process keep the Arrow records
but randomly change their payload types. The goal is to test the resilience of
the decoding process to invalid data. The decoding layer must be able to handle
any invalid data and return appropriate error messages without crashing.

## Capturing and Replaying production data

A new version of the OTel file exporter has been implemented to capture OTLP
traffic in a generic JSON format (with ZSTD compression). A set of tools have
been developed to replay this data, convert it to OTel Arrow, validate the
encoding/decoding process, and assess the compression and end-to-end performance.
12 changes: 9 additions & 3 deletions pkg/arrow/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var (

// ErrNotStructType is returned when an array is not of type Struct.
ErrNotStructType = errors.New("not arrow.StructType")
// ErrNotListOfStructsType is returned when an array is not of type List of structs.
// ErrNotListOfStructsType is returned when an array is not of type List of
// structs.
ErrNotListOfStructsType = errors.New("not arrow.ListType of arrow.StructType")
// ErrNotListType is returned when an array is not of type list.
ErrNotListType = errors.New("not an arrow.ListType")
Expand All @@ -34,9 +35,14 @@ var (
ErrNotArrayStruct = errors.New("not an arrow array.Struct")
// ErrNotArrayList is returned when an array is not an array.List.
ErrNotArrayList = errors.New("not an arrow array.List")
// ErrNotArrayListOfStructs is returned when an array is not an array.List of array.Struct.
// ErrNotArrayListOfStructs is returned when an array is not an array.List
// of array.Struct.
ErrNotArrayListOfStructs = errors.New("not an Arrow array.List of array.Struct")

// ErrDuplicateFieldName is returned when a field name is duplicated in the same struct.
// ErrDuplicateFieldName is returned when a field name is duplicated in the
// same struct.
ErrDuplicateFieldName = errors.New("duplicate field name")

// ErrMissingFieldName is returned when a field name is missing in a struct.
ErrMissingFieldName = errors.New("missing field name")
)
13 changes: 13 additions & 0 deletions pkg/arrow/from_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ func FieldIDFromSchema(schema *arrow.Schema, fieldName string) (int, error) {
return ids[0], nil
}

// MandatoryFieldIDFromSchema returns the field id of a field from an Arrow
// schema or an error if the field is not present or duplicated.
func MandatoryFieldIDFromSchema(schema *arrow.Schema, fieldName string) (int, error) {
ids := schema.FieldIndices(fieldName)
if len(ids) == 0 {
return 0, werror.WrapWithContext(ErrMissingFieldName, map[string]interface{}{"fieldName": fieldName})
}
if len(ids) > 1 {
return 0, werror.WrapWithContext(ErrDuplicateFieldName, map[string]interface{}{"fieldName": fieldName})
}
return ids[0], nil
}

// StructFieldIDFromSchema returns the field id of a struct
// field from an Arrow schema or AbsentFieldID for an unknown field.
//
Expand Down
39 changes: 39 additions & 0 deletions pkg/datagen/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package datagen

import (
"fmt"

"github.com/brianvoe/gofakeit/v6"
"go.opentelemetry.io/collector/pdata/pcommon"
)

Expand Down Expand Up @@ -42,6 +45,42 @@ func (te TestEntropy) shuffleAttrs(fs ...func(Attrs)) pcommon.Map {
return attrs
}

// RandomAttributes returns a random set of attributes. The number of attributes
// is random [0,10). The type and value of each attribute is also random.
func (te TestEntropy) RandomAttributes() pcommon.Map {
count := te.rng.Intn(10)

attrs := pcommon.NewMap()

for i := 0; i < count; i++ {
switch te.rng.Intn(3) {
case 0:
attrs.PutStr(fmt.Sprintf("attr_%d", i), gofakeit.LoremIpsumWord())
case 1:
attrs.PutInt(fmt.Sprintf("attr_%d", i), te.rng.Int63())
case 2:
attrs.PutDouble(fmt.Sprintf("attr_%d", i), te.rng.Float64())
case 3:
attrs.PutBool(fmt.Sprintf("attr_%d", i), te.rng.Intn(2) == 0)
case 4:
attrs.PutEmpty(fmt.Sprintf("attr_%d", i))
case 5:
attrs.PutEmptyBytes(fmt.Sprintf("attr_%d", i)).FromRaw([]byte(gofakeit.LoremIpsumWord()))
case 6:
vMap := attrs.PutEmptyMap(fmt.Sprintf("attr_%d", i))
vMap.PutInt("int", te.rng.Int63())
vMap.PutStr("str", gofakeit.LoremIpsumWord())
case 7:
vSlice := attrs.PutEmptySlice(fmt.Sprintf("attr_%d", i))
vSlice.AppendEmpty().SetBool(te.rng.Intn(2) == 0)
vSlice.AppendEmpty().SetDouble(te.rng.Float64())
vSlice.AppendEmpty().SetInt(te.rng.Int63())
}
}

return attrs
}

func (te TestEntropy) NewStandardAttributes() pcommon.Map {
return te.shuffleAttrs(
func(attrs Attrs) { attrs.PutStr("hostname", pick(te, HOSTNAMES)) },
Expand Down
80 changes: 80 additions & 0 deletions pkg/datagen/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (lg *LogsGenerator) Generate(batchSize int, collectInterval time.Duration)
lg.LogWarnRecord(logRecords.AppendEmpty())
lg.LogErrorRecord(logRecords.AppendEmpty())
//lg.LogInfoComplexRecord(logRecords.AppendEmpty())
lg.RandomLogRecord(logRecords.AppendEmpty())
}

return result
Expand Down Expand Up @@ -109,3 +110,82 @@ func (dg *DataGenerator) complexLogRecord(log plog.LogRecord, sev plog.SeverityN
log.SetTraceID(dg.Id16Bytes())
log.SetSpanID(dg.Id8Bytes())
}

// RandomLogRecord generates a random log record. The list of fields set is random.
// The value of these fields is also random.
func (dg *DataGenerator) RandomLogRecord(log plog.LogRecord) {
if dg.GenBool() {
log.SetTimestamp(dg.CurrentTime())
}
if dg.GenBool() {
log.SetObservedTimestamp(dg.CurrentTime())
}
if dg.GenBool() {
log.SetSeverityNumber(plog.SeverityNumber(gofakeit.Number(0, 4)))
}
if dg.GenBool() {
log.SetSeverityText(gofakeit.LetterN(4))
}
if dg.GenBool() {
dg.RandomBody(log.Body())
}
if dg.GenBool() {
dg.RandomAttributes().CopyTo(log.Attributes())
}
if dg.GenBool() {
log.SetTraceID(dg.Id16Bytes())
}
if dg.GenBool() {
log.SetSpanID(dg.Id8Bytes())
}
if dg.GenBool() {
log.SetDroppedAttributesCount(uint32(gofakeit.Number(0, 1000)))
}
if dg.GenBool() {
log.SetFlags(plog.LogRecordFlags(gofakeit.Number(0, 1000)))
}
}

func (dg *DataGenerator) RandomBody(body pcommon.Value) {
switch dg.GenI64Range(0, 11) {
case 0:
// Body with a random string
body.SetStr(gofakeit.LoremIpsumSentence(20))
case 1:
// Body an empty string
body.SetStr("")
case 2:
// Empty body
case 3:
// Body with a random int
body.SetInt(gofakeit.Int64())
case 4:
// Body with a random double
body.SetDouble(gofakeit.Float64())
case 5:
// Body with a random bool
body.SetBool(gofakeit.Bool())
case 6:
// Body with a slice of random bytes
body.SetEmptyBytes().FromRaw(dg.GenId(10))
case 7:
// Body with an empty slice of bytes
body.SetEmptyBytes()
case 8:
// Body with a random map
bodyMap := body.SetEmptyMap()
bodyMap.PutStr("attr1", gofakeit.LoremIpsumSentence(10))
bodyMap.PutInt("attr2", 1)
case 9:
// Body with an empty map
body.SetEmptyMap()
case 10:
// Body with a random slice
bodySlice := body.SetEmptySlice()
bodySlice.AppendEmpty().SetStr(gofakeit.LoremIpsumSentence(10))
bodySlice.AppendEmpty().SetInt(gofakeit.Int64())
case 11:
// Body with an empty slice
body.SetEmptySlice()
}
}
Loading

0 comments on commit 534d56b

Please sign in to comment.