Skip to content

Commit

Permalink
feat: migrate handler to modelpb and prepare for decoder migration (#62)
Browse files Browse the repository at this point in the history
* feat: migrate handler to modelpb and prepare for decoder migration

* Replace reflection-based struct assertions

* fix: nil check trace id in rumv3 decoder

---------

Co-authored-by: Andrew Wilkins <axw@elastic.co>
  • Loading branch information
kruskall and axw authored Jun 15, 2023
1 parent 1163d40 commit 5cae1b2
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

// Values used for populating the model structs
Expand All @@ -44,6 +46,7 @@ type Values struct {
HTTPHeader http.Header
LabelVal model.LabelValue
NumericLabelVal model.NumericLabelValue
MetricType modelpb.MetricType
// N controls how many elements are added to a slice or a map
N int
}
Expand Down Expand Up @@ -129,7 +132,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
switch fKind := f.Kind(); fKind {
case reflect.String:
fieldVal = reflect.ValueOf(values.Str)
case reflect.Int, reflect.Int64:
case reflect.Int, reflect.Int32, reflect.Int64:
fieldVal = reflect.ValueOf(values.Int).Convert(f.Type())
case reflect.Float64:
fieldVal = reflect.ValueOf(values.Float).Convert(f.Type())
Expand Down Expand Up @@ -301,6 +304,8 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
newVal = &val
case *int:
newVal = &values.Int
case int32:
newVal = int32(values.Int)
case uint32:
newVal = uint32(values.Int)
case *uint32:
Expand All @@ -319,10 +324,12 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
newVal = &values.Bool
case http.Header:
newVal = values.HTTPHeader
case time.Time:
newVal = values.Time
case *timestamppb.Timestamp:
newVal = timestamppb.New(values.Time)
case time.Duration:
newVal = values.Duration
case modelpb.MetricType:
newVal = values
default:
// the populator recursively iterates over struct and structPtr
// calling this function for all fields;
Expand Down Expand Up @@ -356,6 +363,9 @@ func IterateStruct(i interface{}, fn func(reflect.Value, string)) {
}

func iterateStruct(v reflect.Value, key string, fn func(f reflect.Value, fKey string)) {
if v.Type().Kind() == reflect.Ptr {
v = v.Elem()
}
t := v.Type()
if t.Kind() != reflect.Struct {
panic(fmt.Sprintf("iterateStruct: invalid type %s", t.Kind()))
Expand Down
28 changes: 21 additions & 7 deletions input/elasticapm/internal/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecoderutil"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

var (
Expand Down Expand Up @@ -95,7 +96,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *model.APMEvent) error {
// DecodeNestedError decodes an error from d, appending it to batch.
//
// DecodeNestedError should be used when the stream in the decoder contains the `error` key
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *model.Batch) error {
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchErrorRoot()
defer releaseErrorRoot(root)
if err := d.Decode(root); err != nil && err != io.EOF {
Expand All @@ -106,15 +107,17 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode
}
event := input.Base
mapToErrorModel(&root.Error, &event)
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
return nil
}

// DecodeNestedTransaction a transaction and zero or more nested spans and
// metricsets, appending them to batch.
//
// DecodeNestedTransaction should be used when the decoder contains the `transaction` key
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *model.Batch) error {
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchTransactionRoot()
defer releaseTransactionRoot(root)
if err := d.Decode(root); err != nil && err != io.EOF {
Expand All @@ -126,7 +129,9 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch

transaction := input.Base
mapToTransactionModel(&root.Transaction, &transaction)
*batch = append(*batch, transaction)
var out modelpb.APMEvent
transaction.ToModelProtobuf(&out)
*batch = append(*batch, &out)

for _, m := range root.Transaction.Metricsets {
event := input.Base
Expand All @@ -135,7 +140,9 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
Type: transaction.Transaction.Type,
}
if mapToTransactionMetricsetModel(&m, &event) {
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
}
}

Expand All @@ -146,12 +153,19 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
event.Transaction = &model.Transaction{ID: transaction.Transaction.ID}
event.Parent.ID = transaction.Transaction.ID // may be overridden later
event.Trace = transaction.Trace
*batch = append(*batch, event)
var out modelpb.APMEvent
event.ToModelProtobuf(&out)
*batch = append(*batch, &out)
}
spans := (*batch)[offset:]
for i, s := range root.Transaction.Spans {
if s.ParentIndex.IsSet() && s.ParentIndex.Val >= 0 && s.ParentIndex.Val < len(spans) {
spans[i].Parent.ID = spans[s.ParentIndex.Val].Span.ID
if e := spans[s.ParentIndex.Val]; e != nil {
if spans[i].Parent == nil {
spans[i].Parent = &modelpb.Parent{}
}
spans[i].Parent.Id = e.Span.Id
}
}
}
return nil
Expand Down
24 changes: 16 additions & 8 deletions input/elasticapm/internal/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecodertest"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetErrorOnRelease(t *testing.T) {
Expand All @@ -43,27 +46,32 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now()
now := time.Now().UTC()
eventBase := initializedMetadata()
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
defaultValues := modeldecodertest.DefaultValues()
defaultValues.Update(time.Unix(1599996822, 281000000).UTC())
modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), defaultValues)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{
Message: "abc",
LoggerName: "default",
},
}, batch[0].Error, protocmp.Transform()))

// if no timestamp is provided, leave base event timestamp unmodified
input = modeldecoder.Input{Base: eventBase}
str = `{"e":{"id":"a-b-c","log":{"mg":"abc"}}}`
dec = decoder.NewJSONDecoder(strings.NewReader(str))
batch = model.Batch{}
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp)
assert.Equal(t, now, batch[0].Timestamp.AsTime())

// test decode
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
Expand All @@ -72,7 +80,7 @@ func TestDecodeNestedError(t *testing.T) {
})

t.Run("validate", func(t *testing.T) {
var batch model.Batch
var batch modelpb.Batch
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &batch)
require.Error(t, err)
assert.Contains(t, err.Error(), "validation")
Expand Down
107 changes: 54 additions & 53 deletions input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/modeldecodertest"
"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetTransactionOnRelease(t *testing.T) {
Expand All @@ -44,13 +47,13 @@ func TestResetTransactionOnRelease(t *testing.T) {

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now()
now := time.Now().UTC()
eventBase := initializedMetadata()
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedTransaction(dec, &input, &batch))
require.Len(t, batch, 3) // 1 transaction, 1 metricset, 1 span
require.NotNil(t, batch[0].Transaction)
Expand All @@ -59,38 +62,30 @@ func TestDecodeNestedTransaction(t *testing.T) {

assert.Equal(t, "request", batch[0].Transaction.Type)
// fall back to request time
assert.Equal(t, now, batch[0].Timestamp)
assert.Equal(t, now, batch[0].Timestamp.AsTime())

// Ensure nested metricsets are decoded. RUMv3 only sends
// breakdown metrics, so the Metricsets will be empty and
// metrics will be recorded on the Transaction and Span
// fields.
assert.Equal(t, &model.Metricset{}, batch[1].Metricset)
assert.Equal(t, &model.Transaction{
assert.Empty(t, cmp.Diff(&modelpb.Metricset{}, batch[1].Metricset, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.Transaction{
Name: "tr-a",
Type: "request",
}, batch[1].Transaction)
assert.Equal(t, &model.Span{
}, batch[1].Transaction, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.Span{
Type: "span_type",
Subtype: "span_subtype",
SelfTime: model.AggregatedDuration{Count: 5},
}, batch[1].Span)
assert.Equal(t, now, batch[1].Timestamp)
SelfTime: &modelpb.AggregatedDuration{Count: 5},
}, batch[1].Span, protocmp.Transform()))
assert.Equal(t, now, batch[1].Timestamp.AsTime())

// ensure nested spans are decoded
start := time.Duration(20 * 1000 * 1000)
assert.Equal(t, now.Add(start), batch[2].Timestamp) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.ID)
assert.Equal(t, "1", batch[2].Trace.ID)
assert.Equal(t, "100", batch[2].Parent.ID)

for _, event := range batch {
modeldecodertest.AssertStructValues(
t, &event,
metadataExceptions("Timestamp"), // timestamp checked above
modeldecodertest.DefaultValues(),
)
}
assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.Id)
assert.Equal(t, "1", batch[2].Trace.Id)
assert.Equal(t, "100", batch[2].Parent.Id)

err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
require.Error(t, err)
Expand All @@ -103,46 +98,52 @@ func TestDecodeNestedTransaction(t *testing.T) {
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch model.Batch
var batch modelpb.Batch
require.NoError(t, DecodeNestedTransaction(dec, &input, &batch))
marks := model.TransactionMarks{
"agent": map[string]float64{
"domComplete": 0.1,
"domInteractive": 0.2,
"domContentLoadedEventStart": 0.3,
"domContentLoadedEventEnd": 0.4,
"timeToFirstByte": 0.5,
"firstContentfulPaint": 0.6,
"largestContentfulPaint": 0.7,
"long": 0.8,
marks := map[string]*modelpb.TransactionMark{
"agent": {
Measurements: map[string]float64{
"domComplete": 0.1,
"domInteractive": 0.2,
"domContentLoadedEventStart": 0.3,
"domContentLoadedEventEnd": 0.4,
"timeToFirstByte": 0.5,
"firstContentfulPaint": 0.6,
"largestContentfulPaint": 0.7,
"long": 0.8,
},
},
"navigationTiming": map[string]float64{
"fetchStart": 0.1,
"domainLookupStart": 0.2,
"domainLookupEnd": 0.3,
"connectStart": 0.4,
"connectEnd": 0.5,
"requestStart": 0.6,
"responseStart": 0.7,
"responseEnd": 0.8,
"domLoading": 0.9,
"domInteractive": 0.11,
"domContentLoadedEventStart": 0.21,
"domContentLoadedEventEnd": 0.31,
"domComplete": 0.41,
"loadEventStart": 0.51,
"loadEventEnd": 6,
"long": 0.99,
"navigationTiming": {
Measurements: map[string]float64{
"fetchStart": 0.1,
"domainLookupStart": 0.2,
"domainLookupEnd": 0.3,
"connectStart": 0.4,
"connectEnd": 0.5,
"requestStart": 0.6,
"responseStart": 0.7,
"responseEnd": 0.8,
"domLoading": 0.9,
"domInteractive": 0.11,
"domContentLoadedEventStart": 0.21,
"domContentLoadedEventEnd": 0.31,
"domComplete": 0.41,
"loadEventStart": 0.51,
"loadEventEnd": 6,
"long": 0.99,
},
},
"long": map[string]float64{
"long": 0.1,
"long": {
Measurements: map[string]float64{
"long": 0.1,
},
},
}
assert.Equal(t, marks, batch[0].Transaction.Marks)
})

t.Run("validate", func(t *testing.T) {
var batch model.Batch
var batch modelpb.Batch
err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &batch)
require.Error(t, err)
assert.Contains(t, err.Error(), "validation")
Expand Down
Loading

0 comments on commit 5cae1b2

Please sign in to comment.