Skip to content

Commit

Permalink
feat: migrate decoders to protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
kruskall committed Jun 16, 2023
1 parent 5cae1b2 commit 4a4acc4
Show file tree
Hide file tree
Showing 17 changed files with 1,136 additions and 1,504 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,54 +36,64 @@ import (

// Values used for populating the model structs
type Values struct {
Str string
Int int
Float float64
Bool bool
Time time.Time
Duration time.Duration
IP netip.Addr
HTTPHeader http.Header
LabelVal model.LabelValue
NumericLabelVal model.NumericLabelValue
MetricType modelpb.MetricType
Str string
Int int
Float float64
Bool bool
Time time.Time
Duration time.Duration
IP netip.Addr
HTTPHeader http.Header
LabelVal model.LabelValue
NumericLabelVal model.NumericLabelValue
MetricType modelpb.MetricType
CompressionStrategy modelpb.CompressionStrategy
// N controls how many elements are added to a slice or a map
N int
}

var compressionStrategyText = map[modelpb.CompressionStrategy]string{
modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH: "exact_match",
modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND: "same_kind",
}

// DefaultValues returns a Values struct initialized with non-zero values
func DefaultValues() *Values {
initTime, _ := time.Parse(time.RFC3339, "2020-10-10T10:00:00Z")
return &Values{
Str: "init",
Int: 1,
Float: 0.5,
Bool: true,
Time: initTime,
Duration: time.Second,
IP: netip.MustParseAddr("127.0.0.1"),
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"a", "b", "c"}},
LabelVal: model.LabelValue{Value: "init"},
NumericLabelVal: model.NumericLabelValue{Value: 0.5},
N: 3,
Str: "init",
Int: 1,
Float: 0.5,
Bool: true,
Time: initTime,
Duration: time.Second,
IP: netip.MustParseAddr("127.0.0.1"),
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"a", "b", "c"}},
LabelVal: model.LabelValue{Value: "init"},
NumericLabelVal: model.NumericLabelValue{Value: 0.5},
MetricType: modelpb.MetricType_METRIC_TYPE_COUNTER,
CompressionStrategy: modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH,
N: 3,
}
}

// NonDefaultValues returns a Values struct initialized with non-zero values
func NonDefaultValues() *Values {
updatedTime, _ := time.Parse(time.RFC3339, "2020-12-10T10:00:00Z")
return &Values{
Str: "overwritten",
Int: 12,
Float: 3.5,
Bool: false,
Time: updatedTime,
Duration: time.Minute,
IP: netip.MustParseAddr("192.168.0.1"),
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"d", "e"}},
LabelVal: model.LabelValue{Value: "overwritten"},
NumericLabelVal: model.NumericLabelValue{Value: 3.5},
N: 2,
Str: "overwritten",
Int: 12,
Float: 3.5,
Bool: false,
Time: updatedTime,
Duration: time.Minute,
IP: netip.MustParseAddr("192.168.0.1"),
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"d", "e"}},
LabelVal: model.LabelValue{Value: "overwritten"},
NumericLabelVal: model.NumericLabelValue{Value: 3.5},
MetricType: modelpb.MetricType_METRIC_TYPE_GAUGE,
CompressionStrategy: modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND,
N: 2,
}
}

Expand Down Expand Up @@ -185,8 +195,13 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
case reflect.Struct:
switch v := f.Interface().(type) {
case nullable.String:
v.Set(values.Str)
if key == "composite.compression_strategy" {
v.Set(compressionStrategyText[values.CompressionStrategy])
} else {
v.Set(values.Str)
}
fieldVal = reflect.ValueOf(v)

case nullable.Int:
v.Set(values.Int)
fieldVal = reflect.ValueOf(v)
Expand Down Expand Up @@ -324,12 +339,14 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
newVal = &values.Bool
case http.Header:
newVal = values.HTTPHeader
case *timestamppb.Timestamp:
newVal = timestamppb.New(values.Time)
case time.Duration:
newVal = values.Duration
case *timestamppb.Timestamp:
newVal = timestamppb.New(values.Time)
case modelpb.MetricType:
newVal = values
newVal = values.MetricType
case modelpb.CompressionStrategy:
newVal = values.CompressionStrategy
default:
// the populator recursively iterates over struct and structPtr
// calling this function for all fields;
Expand All @@ -346,7 +363,7 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
assert.NotZero(t, fVal, key)
return
}
panic(fmt.Sprintf("unhandled type %s for key %s", f.Type(), key))
panic(fmt.Sprintf("unhandled type %s %s for key %s", f.Kind(), f.Type(), key))
}
assert.Equal(t, newVal, fVal, key)
})
Expand Down
36 changes: 36 additions & 0 deletions input/elasticapm/internal/modeldecoder/modeldecoderutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package modeldecoderutil
import (
"encoding/json"
"net/http"

"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/structpb"
)

// HTTPHeadersToMap converts h to a map[string]any, suitable for
Expand All @@ -39,6 +42,39 @@ func HTTPHeadersToMap(h http.Header) map[string]any {
return m
}

// HTTPHeadersToStructPb converts h to a *structpb.Struct, suitable for
// use in modelpb.HTTP.{Request,Response}.Headers.
func HTTPHeadersToStructPb(h http.Header) *structpb.Struct {
if len(h) == 0 {
return nil
}
m := make(map[string]any, len(h))
for k, v := range h {
arr := make([]any, 0, len(v))
for _, s := range v {
arr = append(arr, s)
}
m[k] = arr
}
if str, err := structpb.NewStruct(m); err == nil {
return str
}
return nil
}

func HTTPHeadersToModelpb(h http.Header) map[string]*modelpb.HTTPHeaderValue {
if len(h) == 0 {
return nil
}
m := make(map[string]*modelpb.HTTPHeaderValue, len(h))
for k, v := range h {
m[k] = &modelpb.HTTPHeaderValue{
Values: v,
}
}
return m
}

// NormalizeHTTPRequestBody recurses through v, replacing any instance of
// a json.Number with float64.
//
Expand Down
64 changes: 57 additions & 7 deletions input/elasticapm/internal/modeldecoder/modeldecoderutil/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,31 @@ import (
"strconv"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
)

// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
// in the metadata object.
func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
func GlobalLabelsFrom(from map[string]any, to *modelpb.APMEvent) {
to.NumericLabels = make(modelpb.NumericLabels)
to.Labels = make(modelpb.Labels)
MergeLabels(from, to)
for k, v := range to.Labels {
v.Global = true
to.Labels[k] = v
}
for k, v := range to.NumericLabels {
v.Global = true
to.NumericLabels[k] = v
}
}

// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
// in the metadata object.
func GlobalLabelsFromOld(from map[string]any, to *model.APMEvent) {
to.NumericLabels = make(model.NumericLabels)
to.Labels = make(model.Labels)
MergeLabels(from, to)
MergeLabelsOld(from, to)
for k, v := range to.Labels {
v.Global = true
to.Labels[k] = v
Expand All @@ -44,7 +61,7 @@ func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
// combining event-specific labels onto (metadata) global labels.
//
// If eventLabels is non-nil, it is first cloned.
func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
func MergeLabelsOld(eventLabels map[string]any, to *model.APMEvent) {
if to.NumericLabels == nil {
to.NumericLabels = make(model.NumericLabels)
}
Expand All @@ -54,14 +71,47 @@ func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
for k, v := range eventLabels {
switch v := v.(type) {
case string:
to.Labels.Set(k, v)
model.Labels(to.Labels).Set(k, v)
case bool:
model.Labels(to.Labels).Set(k, strconv.FormatBool(v))
case float64:
model.NumericLabels(to.NumericLabels).Set(k, v)
case json.Number:
if floatVal, err := v.Float64(); err == nil {
model.NumericLabels(to.NumericLabels).Set(k, floatVal)
}
}
}
if len(to.NumericLabels) == 0 {
to.NumericLabels = nil
}
if len(to.Labels) == 0 {
to.Labels = nil
}
}

// MergeLabels merges eventLabels into the APMEvent. This is used for
// combining event-specific labels onto (metadata) global labels.
//
// If eventLabels is non-nil, it is first cloned.
func MergeLabels(eventLabels map[string]any, to *modelpb.APMEvent) {
if to.NumericLabels == nil {
to.NumericLabels = make(modelpb.NumericLabels)
}
if to.Labels == nil {
to.Labels = make(modelpb.Labels)
}
for k, v := range eventLabels {
switch v := v.(type) {
case string:
modelpb.Labels(to.Labels).Set(k, v)
case bool:
to.Labels.Set(k, strconv.FormatBool(v))
modelpb.Labels(to.Labels).Set(k, strconv.FormatBool(v))
case float64:
to.NumericLabels.Set(k, v)
modelpb.NumericLabels(to.NumericLabels).Set(k, v)
case json.Number:
if floatVal, err := v.Float64(); err == nil {
to.NumericLabels.Set(k, floatVal)
modelpb.NumericLabels(to.NumericLabels).Set(k, floatVal)
}
}
}
Expand Down
15 changes: 11 additions & 4 deletions input/elasticapm/internal/modeldecoder/modeldecoderutil/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package modeldecoderutil
import (
"time"

"github.com/elastic/apm-data/model"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
)

// SetInternalMetrics extracts well-known internal metrics from event.Metricset.Samples,
Expand All @@ -33,7 +34,7 @@ import (
// SetInternalMetrics returns true if any known metric samples were found, and false
// otherwise. If no known metric samples were found, the caller may opt to omit the
// metricset altogether.
func SetInternalMetrics(event *model.APMEvent) bool {
func SetInternalMetrics(event *modelpb.APMEvent) bool {
if event.Transaction == nil {
// Not an internal metricset.
return false
Expand All @@ -43,10 +44,16 @@ func SetInternalMetrics(event *model.APMEvent) bool {
for _, v := range event.Metricset.Samples {
switch v.Name {
case "span.self_time.count":
event.Span.SelfTime.Count = int(v.Value)
if event.Span.SelfTime == nil {
event.Span.SelfTime = &modelpb.AggregatedDuration{}
}
event.Span.SelfTime.Count = int64(v.Value)
haveMetrics = true
case "span.self_time.sum.us":
event.Span.SelfTime.Sum = time.Duration(v.Value * 1000)
if event.Span.SelfTime == nil {
event.Span.SelfTime = &modelpb.AggregatedDuration{}
}
event.Span.SelfTime.Sum = durationpb.New(time.Duration(v.Value * 1000))
haveMetrics = true
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package modeldecoderutil

import (
"google.golang.org/protobuf/types/known/structpb"
)

func ToStruct(m map[string]any) *structpb.Struct {
if str, err := structpb.NewStruct(m); err == nil {
return str
}
return nil
}

func ToValue(a any) *structpb.Value {
if v, err := structpb.NewValue(a); err == nil {
return v
}
return nil
}

func NormalizeMap(m map[string]any) map[string]any {
v := NormalizeHTTPRequestBody(m)
return v.(map[string]any)
}
Loading

0 comments on commit 4a4acc4

Please sign in to comment.