diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index ce1da58ad..762ff1566 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -37,7 +37,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect - github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fxamacker/cbor/v2 v2.4.0 // indirect @@ -56,7 +55,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -69,6 +68,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.19 // indirect + github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index ab06c33cf..ca51d3642 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -650,8 +650,6 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f h1:GQeKf74r7kOzLxO53CFRoBj38i+I+5fOgtnc/Kzmv4g= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f/go.mod h1:LlrQzSY6y3cusaBHQ6fTy5NuiGfhBkC/2dJKa2k1w0M= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -840,8 +838,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= @@ -913,6 +911,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 h1:yCQq2YHwISLwlCuRiFC+kvrT3Y0lC4k9KrijSLjlJ10= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60/go.mod h1:yWEudwLj+xkvv3oooZyuzwrpAZhlS00gN24GUDJyDfc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index a55f575e8..bd9693e0d 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -37,7 +37,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect diff --git a/config/configgrpc/go.sum b/config/configgrpc/go.sum index 455396550..53fd24e7b 100644 --- a/config/configgrpc/go.sum +++ b/config/configgrpc/go.sum @@ -242,8 +242,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/exporter/go.mod b/exporter/go.mod index 8c991fe10..2da5eefe5 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -4,17 +4,22 @@ go 1.19 require ( github.com/cenkalti/backoff/v4 v4.2.1 + github.com/klauspost/compress v1.16.7 + github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 github.com/stretchr/testify v1.8.4 go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.80.0 go.opentelemetry.io/collector/component v0.80.0 + go.opentelemetry.io/collector/confmap v0.80.0 go.opentelemetry.io/collector/consumer v0.80.0 go.opentelemetry.io/collector/extension v0.80.0 go.opentelemetry.io/collector/pdata v1.0.0-rcv0013 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) require ( @@ -44,7 +49,6 @@ require ( github.com/prometheus/procfs v0.10.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.80.0 // indirect - go.opentelemetry.io/collector/confmap v0.80.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013 // indirect go.opentelemetry.io/collector/processor v0.80.0 // indirect go.opentelemetry.io/collector/receiver v0.80.0 // indirect @@ -52,7 +56,6 @@ require ( go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.11.0 // indirect golang.org/x/sys v0.9.0 // indirect golang.org/x/text v0.10.0 // indirect diff --git a/exporter/go.sum b/exporter/go.sum index f47287853..e83a913be 100644 --- a/exporter/go.sum +++ b/exporter/go.sum @@ -235,6 +235,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -286,6 +288,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 h1:yCQq2YHwISLwlCuRiFC+kvrT3Y0lC4k9KrijSLjlJ10= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60/go.mod h1:yWEudwLj+xkvv3oooZyuzwrpAZhlS00gN24GUDJyDfc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= @@ -717,6 +721,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index 9c6ef463b..a7f27058f 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -5,6 +5,7 @@ package otlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexpor import ( "fmt" + "time" "google.golang.org/grpc" @@ -33,10 +34,11 @@ type Config struct { // ArrowSettings includes whether Arrow is enabled and the number of // concurrent Arrow streams. type ArrowSettings struct { - Disabled bool `mapstructure:"disabled"` - NumStreams int `mapstructure:"num_streams"` - DisableDowngrade bool `mapstructure:"disable_downgrade"` - EnableMixedSignals bool `mapstructure:"enable_mixed_signals"` + Disabled bool `mapstructure:"disabled"` + NumStreams int `mapstructure:"num_streams"` + DisableDowngrade bool `mapstructure:"disable_downgrade"` + EnableMixedSignals bool `mapstructure:"enable_mixed_signals"` + MaxStreamLifetime time.Duration `mapstructure:"max_stream_lifetime"` } var _ component.Config = (*Config)(nil) @@ -59,5 +61,9 @@ func (cfg *ArrowSettings) Validate() error { return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams) } + if cfg.MaxStreamLifetime.Seconds() < float64(1) { + return fmt.Errorf("max stream life must be > 0: %d", cfg.MaxStreamLifetime) + } + return nil } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index d688885a2..809902e8e 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -79,27 +79,32 @@ func TestUnmarshalConfig(t *testing.T) { Arrow: ArrowSettings{ NumStreams: 2, EnableMixedSignals: true, + MaxStreamLifetime: 2 * time.Hour, }, }, cfg) } func TestArrowSettingsValidate(t *testing.T) { - settings := func(enabled bool, numStreams int) *ArrowSettings { - return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams} + settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration) *ArrowSettings { + return &ArrowSettings{Disabled: !enabled, NumStreams: numStreams, MaxStreamLifetime: maxStreamLifetime} } - require.NoError(t, settings(true, 1).Validate()) - require.NoError(t, settings(false, 1).Validate()) - require.NoError(t, settings(true, 2).Validate()) - require.NoError(t, settings(true, math.MaxInt).Validate()) + require.NoError(t, settings(true, 1, 10*time.Second).Validate()) + require.NoError(t, settings(false, 1, 10*time.Second).Validate()) + require.NoError(t, settings(true, 2, 1*time.Second).Validate()) + require.NoError(t, settings(true, math.MaxInt, 10*time.Second).Validate()) - require.Error(t, settings(true, 0).Validate()) - require.Contains(t, settings(true, 0).Validate().Error(), "stream count must be") - require.Error(t, settings(false, -1).Validate()) - require.Error(t, settings(true, math.MinInt).Validate()) + require.Error(t, settings(true, 0, 10*time.Second).Validate()) + require.Contains(t, settings(true, 0, 10*time.Second).Validate().Error(), "stream count must be") + require.Contains(t, settings(true, 1, -1*time.Second).Validate().Error(), "max stream life must be") + require.Error(t, settings(false, -1, 10*time.Second).Validate()) + require.Error(t, settings(false, 1, -1*time.Second).Validate()) + require.Error(t, settings(true, math.MinInt, 10*time.Second).Validate()) } func TestDefaultSettingsValid(t *testing.T) { cfg := createDefaultConfig() + // this must be set by the user and config + // validation always checks that a value is set. + cfg.(*Config).Arrow.MaxStreamLifetime = 2 * time.Second require.NoError(t, cfg.(*Config).Validate()) - } diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index df4cdd46d..3b91cdfea 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -6,8 +6,9 @@ package otlpexporter // import "go.opentelemetry.io/collector/exporter/otlpexpor import ( "context" "runtime" + "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" @@ -22,7 +23,7 @@ import ( const ( // The value of "type" key in configuration. - typeStr = "otlp" + typeStr = "otelarrow" ) // NewFactory creates a factory for OTLP exporter. @@ -49,7 +50,8 @@ func createDefaultConfig() component.Config { WriteBufferSize: 512 * 1024, }, Arrow: ArrowSettings{ - NumStreams: runtime.NumCPU(), + NumStreams: runtime.NumCPU(), + MaxStreamLifetime: time.Hour, }, } } diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index da701e552..480bb41f6 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -34,7 +34,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueSettings()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutSettings()) assert.Equal(t, ocfg.Compression, configcompression.Gzip) - assert.Equal(t, ocfg.Arrow, ArrowSettings{Disabled: false, NumStreams: runtime.NumCPU()}) + assert.Equal(t, ocfg.Arrow, ArrowSettings{Disabled: false, NumStreams: runtime.NumCPU(), MaxStreamLifetime: time.Hour}) } func TestCreateMetricsExporter(t *testing.T) { diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index ef3ab73e7..9f6f381c8 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -4,8 +4,8 @@ go 1.19 require ( github.com/apache/arrow/go/v12 v12.0.1 - github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f github.com/golang/mock v1.6.0 + github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector v0.80.0 go.opentelemetry.io/collector/component v0.80.0 @@ -49,7 +49,7 @@ require ( github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index a7ff6527d..0461cce81 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -72,8 +72,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f h1:GQeKf74r7kOzLxO53CFRoBj38i+I+5fOgtnc/Kzmv4g= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f/go.mod h1:LlrQzSY6y3cusaBHQ6fTy5NuiGfhBkC/2dJKa2k1w0M= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -206,8 +204,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= @@ -268,6 +266,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 h1:yCQq2YHwISLwlCuRiFC+kvrT3Y0lC4k9KrijSLjlJ10= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60/go.mod h1:yWEudwLj+xkvv3oooZyuzwrpAZhlS00gN24GUDJyDfc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/exporter/otlpexporter/internal/arrow/common_test.go b/exporter/otlpexporter/internal/arrow/common_test.go index 3427aebf1..7f1fe4a2d 100644 --- a/exporter/otlpexporter/internal/arrow/common_test.go +++ b/exporter/otlpexporter/internal/arrow/common_test.go @@ -9,9 +9,9 @@ import ( "io" "testing" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowCollectorMock "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1/mock" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -97,6 +97,7 @@ type commonTestStream struct { ctxCall *gomock.Call sendCall *gomock.Call recvCall *gomock.Call + closeSendCall *gomock.Call } func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream { @@ -108,7 +109,8 @@ func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream sendCall: client.EXPECT().Send( gomock.Any(), // *arrowpb.BatchArrowRecords ).Times(0), - recvCall: client.EXPECT().Recv().Times(0), + recvCall: client.EXPECT().Recv().Times(0), + closeSendCall: client.EXPECT().CloseSend().AnyTimes().Return(nil), } return testStream } diff --git a/exporter/otlpexporter/internal/arrow/exporter.go b/exporter/otlpexporter/internal/arrow/exporter.go index 00ca3b09b..0680b0693 100644 --- a/exporter/otlpexporter/internal/arrow/exporter.go +++ b/exporter/otlpexporter/internal/arrow/exporter.go @@ -7,9 +7,10 @@ import ( "context" "errors" "sync" + "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -23,6 +24,8 @@ type Exporter struct { // numStreams is the number of streams that will be used. numStreams int + maxStreamLifetime time.Duration + // disableDowngrade prevents downgrade from occurring, supports // forcing Arrow transport. disableDowngrade bool @@ -90,6 +93,7 @@ func MakeAnyStreamClient[T AnyStreamClient](clientFunc func(ctx context.Context, // NewExporter configures a new Exporter. func NewExporter( + maxStreamLifetime time.Duration, numStreams int, disableDowngrade bool, telemetry component.TelemetrySettings, @@ -99,6 +103,7 @@ func NewExporter( perRPCCredentials credentials.PerRPCCredentials, ) *Exporter { return &Exporter{ + maxStreamLifetime: maxStreamLifetime, numStreams: numStreams, disableDowngrade: disableDowngrade, telemetry: telemetry, @@ -175,6 +180,7 @@ func (e *Exporter) runArrowStream(ctx context.Context) { producer := e.newProducer() stream := newStream(producer, e.ready, e.telemetry, e.perRPCCredentials) + stream.maxStreamLifetime = e.maxStreamLifetime defer func() { if err := producer.Close(); err != nil { diff --git a/exporter/otlpexporter/internal/arrow/exporter_test.go b/exporter/otlpexporter/internal/arrow/exporter_test.go index fc272fef0..a92a413af 100644 --- a/exporter/otlpexporter/internal/arrow/exporter_test.go +++ b/exporter/otlpexporter/internal/arrow/exporter_test.go @@ -13,11 +13,11 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" - arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" - otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" + otelAssert "github.com/open-telemetry/otel-arrow/pkg/otel/assert" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/http2/hpack" @@ -30,6 +30,8 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) +const defaultMaxStreamLifetime = 11 * time.Second + type compareJSONTraces struct{ ptrace.Traces } type compareJSONMetrics struct{ pmetric.Metrics } type compareJSONLogs struct{ plog.Logs } @@ -123,7 +125,7 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di }) } - exp := NewExporter(numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI { + exp := NewExporter(defaultMaxStreamLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI { // Mock the close function, use a real producer for testing dataflow. mock := arrowRecordMock.NewMockProducerAPI(ctc.ctrl) prod := arrowRecord.NewProducer() @@ -177,6 +179,7 @@ func statusUnrecognizedFor(id int64) *arrowpb.BatchStatus { // TestArrowExporterSuccess tests a single Send through a healthy channel. func TestArrowExporterSuccess(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) for _, inputData := range []interface{}{twoTraces, twoMetrics, twoLogs} { tc := newSingleStreamTestCase(t) channel := newHealthyTestChannel() @@ -207,7 +210,7 @@ func TestArrowExporterSuccess(t *testing.T) { traces, err := testCon.TracesFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(traces)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONTraces{testData}, }, []json.Marshaler{ compareJSONTraces{traces[0]}, @@ -216,7 +219,7 @@ func TestArrowExporterSuccess(t *testing.T) { logs, err := testCon.LogsFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(logs)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONLogs{testData}, }, []json.Marshaler{ compareJSONLogs{logs[0]}, @@ -225,7 +228,7 @@ func TestArrowExporterSuccess(t *testing.T) { metrics, err := testCon.MetricsFrom(outputData) require.NoError(t, err) require.Equal(t, 1, len(metrics)) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{testData}, }, []json.Marshaler{ compareJSONMetrics{metrics[0]}, diff --git a/exporter/otlpexporter/internal/arrow/stream.go b/exporter/otlpexporter/internal/arrow/stream.go index 429605a2f..dd515d1db 100644 --- a/exporter/otlpexporter/internal/arrow/stream.go +++ b/exporter/otlpexporter/internal/arrow/stream.go @@ -11,9 +11,10 @@ import ( "io" "strings" "sync" + "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/net/http2/hpack" @@ -31,6 +32,12 @@ import ( // Stream is 1:1 with gRPC stream. type Stream struct { + // maxStreamLifetime is the max timeout before stream + // should be closed on the client side. This ensures a + // graceful shutdown before max_connection_age is reached + // on the server side. + maxStreamLifetime time.Duration + // producer is exclusive to the holder of the stream. producer arrowRecord.ProducerAPI @@ -149,8 +156,10 @@ func (s *Stream) run(bgctx context.Context, streamClient StreamClientFunc, grpcO ww.Add(1) go func() { defer ww.Done() - defer cancel() writeErr = s.write(ctx) + if writeErr != nil { + cancel() + } }() // the result from read() is processed after cancel and wait, @@ -257,6 +266,8 @@ func (s *Stream) write(ctx context.Context) error { var hdrsBuf bytes.Buffer hdrsEnc := hpack.NewEncoder(&hdrsBuf) + timer := time.NewTimer(s.maxStreamLifetime) + for { // Note: this can't block b/c stream has capacity & // individual streams shut down synchronously. @@ -265,8 +276,16 @@ func (s *Stream) write(ctx context.Context) error { // this can block, and if the context is canceled we // wait for the reader to find this stream. var wri writeItem + var ok bool select { - case wri = <-s.toWrite: + case <-timer.C: + s.prioritizer.removeReady(s) + return s.client.CloseSend() + case wri, ok = <-s.toWrite: + // channel is closed + if !ok { + return nil + } case <-ctx.Done(): // Because we did not <-stream.toWrite, there // is a potential sender race since the stream @@ -325,8 +344,17 @@ func (s *Stream) read(_ context.Context) error { // cancel a call to Recv() but the call to processBatchStatus // is non-blocking. for { + // Note: if the client has called CloseSend() and is waiting for a response from the server. + // And if the server fails for some reason, we will wait until some other condition, such as a context + // timeout. TODO: possibly, improve to wait for no outstanding requests and then stop reading. resp, err := s.client.Recv() if err != nil { + // Once the send direction of stream is closed the server should return + // an error that mentions an EOF. The expected error code is codes.Unknown. + status, ok := status.FromError(err) + if ok && status.Message() == "EOF" && status.Code() == codes.Unknown { + return nil + } // Note: do not wrap, contains a Status. return err } @@ -334,6 +362,7 @@ func (s *Stream) read(_ context.Context) error { if err = s.processBatchStatus(resp); err != nil { return fmt.Errorf("process: %w", err) } + } } diff --git a/exporter/otlpexporter/internal/arrow/stream_test.go b/exporter/otlpexporter/internal/arrow/stream_test.go index e6ee6a7df..03cc74425 100644 --- a/exporter/otlpexporter/internal/arrow/stream_test.go +++ b/exporter/otlpexporter/internal/arrow/stream_test.go @@ -11,9 +11,9 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -53,6 +53,7 @@ func newStreamTestCase(t *testing.T) *streamTestCase { ctc.requestMetadataCall.AnyTimes().Return(nil, nil) stream := newStream(producer, prio, ctc.telset, ctc.perRPCCredentials) + stream.maxStreamLifetime = 10 * time.Second fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0) fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0) @@ -118,6 +119,38 @@ func (tc *streamTestCase) get() *Stream { return <-tc.prioritizer.readyChannel() } +// TestStreamEncodeError verifies that exceeding the +// max_stream_lifetime results in shutdown that +// simply restarts the stream. +func TestStreamGracefulShutdown(t *testing.T) { + tc := newStreamTestCase(t) + maxStreamLifetime := 1 * time.Second + tc.stream.maxStreamLifetime = maxStreamLifetime + + tc.fromTracesCall.Times(1).Return(oneBatch, nil) + tc.closeSendCall.Times(1).Return(nil) + + channel := newHealthyTestChannel() + tc.start(channel) + defer tc.cancelAndWaitForShutdown() + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + go func() { + defer wg.Done() + batch := <-channel.sent + channel.recv <- statusOKFor(batch.BatchId) + }() + + err := tc.get().SendAndWait(tc.bgctx, twoTraces) + require.NoError(t, err) + // let stream get closed and send again. + time.Sleep(maxStreamLifetime) + err = tc.get().SendAndWait(tc.bgctx, twoTraces) + require.Error(t, err) + require.True(t, errors.Is(err, ErrStreamRestarting)) +} + // TestStreamEncodeError verifies that an encoder error in the sender // yields a permanent error. func TestStreamEncodeError(t *testing.T) { diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index d1f99b5aa..f63fcf73d 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -11,7 +11,7 @@ import ( "time" arrowPkg "github.com/apache/arrow/go/v12/arrow" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.uber.org/multierr" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" @@ -131,7 +131,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro } } - e.arrow = arrow.NewExporter(e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI { + e.arrow = arrow.NewExporter(e.config.Arrow.MaxStreamLifetime, e.config.Arrow.NumStreams, e.config.Arrow.DisableDowngrade, e.settings.TelemetrySettings, e.callOptions, func() arrowRecord.ProducerAPI { return arrowRecord.NewProducer() }, e.streamClientFactory(e.config, e.clientConn), perRPCCreds) diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index a763ecafd..db6afc503 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -15,10 +15,10 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowpbMock "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1/mock" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowpbMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -457,6 +457,7 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = test.gRPCClientSettings cfg.GRPCClientSettings.Endpoint = test.scheme + ln.Addr().String() + cfg.Arrow.MaxStreamLifetime = 100 * time.Second if test.useTLS { cfg.GRPCClientSettings.TLSSetting.InsecureSkipVerify = true } @@ -513,6 +514,7 @@ func TestSendMetrics(t *testing.T) { "header": "header-value", }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -611,6 +613,7 @@ func TestSendTraceDataServerDownAndUp(t *testing.T) { // Do not rely on external retry logic here, if that is intended set InitialInterval to 100ms. WaitForReady: true, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -668,6 +671,7 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -721,6 +725,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) @@ -803,6 +808,7 @@ func TestSendLogData(t *testing.T) { Insecure: true, }, } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -920,6 +926,7 @@ func testSendArrowTraces(t *testing.T, mixedSignals, clientWaitForReady, streamS cfg.Arrow = ArrowSettings{ NumStreams: 1, EnableMixedSignals: mixedSignals, + MaxStreamLifetime: 100 * time.Second, } set := exportertest.NewNopCreateSettings() @@ -1092,6 +1099,7 @@ func TestSendArrowFailedTraces(t *testing.T) { cfg.Arrow = ArrowSettings{ NumStreams: 1, EnableMixedSignals: true, + MaxStreamLifetime: 100 * time.Second, } cfg.QueueSettings.Enabled = false diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index 0120d78fc..799cf4a31 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -29,3 +29,4 @@ arrow: num_streams: 2 disabled: false enable_mixed_signals: true + max_stream_lifetime: 2h diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index d5d0c10d7..6678d5ffd 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -31,7 +31,6 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect - github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fxamacker/cbor/v2 v2.4.0 // indirect @@ -44,7 +43,7 @@ require ( github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect @@ -55,6 +54,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.19 // indirect + github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.9.0 // indirect diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index 08ebc0e10..6ba066c31 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -647,8 +647,6 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f h1:GQeKf74r7kOzLxO53CFRoBj38i+I+5fOgtnc/Kzmv4g= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f/go.mod h1:LlrQzSY6y3cusaBHQ6fTy5NuiGfhBkC/2dJKa2k1w0M= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -830,8 +828,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= @@ -900,6 +898,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 h1:yCQq2YHwISLwlCuRiFC+kvrT3Y0lC4k9KrijSLjlJ10= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60/go.mod h1:yWEudwLj+xkvv3oooZyuzwrpAZhlS00gN24GUDJyDfc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/internal/netstats/netstats.go b/internal/netstats/netstats.go index 14a7c8f47..a6433f84f 100644 --- a/internal/netstats/netstats.go +++ b/internal/netstats/netstats.go @@ -40,7 +40,7 @@ const ( // (includes compression) by exporters and receivers. RecvWireBytes = "recv_wire" - scopeName = "github.com/f5/otel-arrow-adapter/collector/netstats" + scopeName = "github.com/open-telemetry/otel-arrow/collector/netstats" ) // NetworkReporter is a helper to add network-level observability to diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 57c5588bb..934066a9b 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -16,7 +16,7 @@ import ( ) const ( - typeStr = "otlp" + typeStr = "otelarrow" defaultGRPCEndpoint = "0.0.0.0:4317" defaultHTTPEndpoint = "0.0.0.0:4318" diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index 32fc2aa2b..15c51afdf 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -3,9 +3,9 @@ module go.opentelemetry.io/collector/receiver/otlpreceiver go 1.19 require ( - github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 + github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector v0.80.0 go.opentelemetry.io/collector/component v0.80.0 @@ -56,7 +56,7 @@ require ( github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect diff --git a/receiver/otlpreceiver/go.sum b/receiver/otlpreceiver/go.sum index ec1020029..bcc98dc55 100644 --- a/receiver/otlpreceiver/go.sum +++ b/receiver/otlpreceiver/go.sum @@ -107,8 +107,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f h1:GQeKf74r7kOzLxO53CFRoBj38i+I+5fOgtnc/Kzmv4g= -github.com/f5/otel-arrow-adapter v0.0.0-20230629002931-9dac16c8ad8f/go.mod h1:LlrQzSY6y3cusaBHQ6fTy5NuiGfhBkC/2dJKa2k1w0M= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -278,8 +276,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= @@ -341,6 +339,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60 h1:yCQq2YHwISLwlCuRiFC+kvrT3Y0lC4k9KrijSLjlJ10= +github.com/open-telemetry/otel-arrow v0.0.0-20230814172509-caeffd6edc60/go.mod h1:yWEudwLj+xkvv3oooZyuzwrpAZhlS00gN24GUDJyDfc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/receiver/otlpreceiver/internal/arrow/arrow.go b/receiver/otlpreceiver/internal/arrow/arrow.go index ad09e5370..bc8fdb8a2 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow.go +++ b/receiver/otlpreceiver/internal/arrow/arrow.go @@ -10,8 +10,8 @@ import ( "io" "strings" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/net/http2/hpack" diff --git a/receiver/otlpreceiver/internal/arrow/arrow_test.go b/receiver/otlpreceiver/internal/arrow/arrow_test.go index b0ec547b6..f6c23dc51 100644 --- a/receiver/otlpreceiver/internal/arrow/arrow_test.go +++ b/receiver/otlpreceiver/internal/arrow/arrow_test.go @@ -14,12 +14,12 @@ import ( "sync" "testing" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowCollectorMock "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1/mock" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" - arrowRecordMock "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record/mock" - otelAssert "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" + arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" + otelAssert "github.com/open-telemetry/otel-arrow/pkg/otel/assert" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -369,6 +369,7 @@ func TestReceiverLogs(t *testing.T) { func TestReceiverMetrics(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) + stdTesting := otelAssert.NewStdUnitTest(t) md := testdata.GenerateMetrics(2) batch, err := ctc.testProducer.BatchArrowRecordsFromMetrics(md) @@ -379,7 +380,7 @@ func TestReceiverMetrics(t *testing.T) { ctc.start(ctc.newRealConsumer) ctc.putBatch(batch, nil) - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{md}, }, []json.Marshaler{ compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, @@ -424,6 +425,8 @@ func TestReceiverSendError(t *testing.T) { } func TestReceiverConsumeError(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) + data := []interface{}{ testdata.GenerateTraces(2), testdata.GenerateMetrics(2), @@ -458,19 +461,19 @@ func TestReceiverConsumeError(t *testing.T) { switch input := item.(type) { case ptrace.Traces: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONTraces{input}, }, []json.Marshaler{ compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, }) case plog.Logs: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONLogs{input}, }, []json.Marshaler{ compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}, }) case pmetric.Metrics: - otelAssert.Equiv(t, []json.Marshaler{ + otelAssert.Equiv(stdTesting, []json.Marshaler{ compareJSONMetrics{input}, }, []json.Marshaler{ compareJSONMetrics{(<-ctc.consume).Data.(pmetric.Metrics)}, diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index edffcb0f7..1cb3eb398 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -11,8 +11,8 @@ import ( "net/http" "sync" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.uber.org/zap" "google.golang.org/grpc" diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index f0c57b48d..49085af75 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -17,9 +17,9 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/experimental/arrow/v1" - arrowRecord "github.com/f5/otel-arrow-adapter/pkg/otel/arrow_record" "github.com/golang/mock/gomock" + arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/http2/hpack"