From 6faf246edef7477a5b07fc4db1b208b8e07fdc02 Mon Sep 17 00:00:00 2001 From: binjip978 Date: Mon, 21 Feb 2022 15:14:13 +0200 Subject: [PATCH] Add appProtocol for otlp and jaeger receiver parsers (#704) Signed-off-by: Sergei Semenchuk --- .../adapters/config_to_ports_test.go | 45 +++++++++++++++++-- pkg/collector/parser/receiver_generic.go | 20 ++++++--- pkg/collector/parser/receiver_jaeger.go | 8 ++++ pkg/collector/parser/receiver_otlp.go | 38 +++++++++++----- pkg/collector/parser/receiver_zipkin.go | 18 +++++--- pkg/collector/reconcile/service_test.go | 8 ++-- tests/e2e/smoke-simplest/00-assert.yaml | 28 ++++++++++++ 7 files changed, 135 insertions(+), 30 deletions(-) diff --git a/pkg/collector/adapters/config_to_ports_test.go b/pkg/collector/adapters/config_to_ports_test.go index f13db7b868..65d4d15799 100644 --- a/pkg/collector/adapters/config_to_ports_test.go +++ b/pkg/collector/adapters/config_to_ports_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters" @@ -53,6 +52,17 @@ func TestExtractPortsFromConfig(t *testing.T) { protocols: thrift_http: endpoint: 0.0.0.0:15268 + otlp: + protocols: + grpc: + http: + otlp/2: + protocols: + grpc: + endpoint: 0.0.0.0:55555 + zipkin: + zipkin/2: + endpoint: 0.0.0.0:33333 ` // prepare @@ -63,7 +73,7 @@ func TestExtractPortsFromConfig(t *testing.T) { // test ports, err := adapters.ConfigToReceiverPorts(logger, config) assert.NoError(t, err) - assert.Len(t, ports, 6) + assert.Len(t, ports, 12) // verify expectedPorts := map[int32]bool{} @@ -73,6 +83,11 @@ func TestExtractPortsFromConfig(t *testing.T) { expectedPorts[int32(6831)] = false expectedPorts[int32(6833)] = false expectedPorts[int32(15268)] = false + expectedPorts[int32(4318)] = false + expectedPorts[int32(55681)] = false + expectedPorts[int32(55555)] = false + expectedPorts[int32(9411)] = false + expectedPorts[int32(33333)] = false expectedNames := map[string]bool{} expectedNames["examplereceiver"] = false @@ -81,6 +96,22 @@ func TestExtractPortsFromConfig(t *testing.T) { expectedNames["jaeger-thrift-compact"] = false expectedNames["jaeger-thrift-binary"] = false expectedNames["jaeger-custom-thrift-http"] = false + expectedNames["otlp-grpc"] = false + expectedNames["otlp-http"] = false + expectedNames["otlp-http-legacy"] = false + expectedNames["otlp-2-grpc"] = false + expectedNames["zipkin"] = false + expectedNames["zipkin-2"] = false + + expectedAppProtocols := map[string]string{} + expectedAppProtocols["otlp-grpc"] = "grpc" + expectedAppProtocols["otlp-http"] = "http" + expectedAppProtocols["otlp-http-legacy"] = "http" + expectedAppProtocols["jaeger-custom-thrift-http"] = "http" + expectedAppProtocols["jaeger-grpc"] = "grpc" + expectedAppProtocols["otlp-2-grpc"] = "grpc" + expectedAppProtocols["zipkin"] = "http" + expectedAppProtocols["zipkin-2"] = "http" // make sure we only have the ports in the set for _, port := range ports { @@ -88,6 +119,10 @@ func TestExtractPortsFromConfig(t *testing.T) { assert.NotNil(t, expectedNames[port.Name]) expectedPorts[port.Port] = true expectedNames[port.Name] = true + + if appProtocol, ok := expectedAppProtocols[port.Name]; ok { + assert.Equal(t, appProtocol, *port.AppProtocol) + } } // and make sure all the ports from the set are there @@ -95,6 +130,10 @@ func TestExtractPortsFromConfig(t *testing.T) { assert.True(t, val) } + // make sure we only have the ports names in the set + for _, val := range expectedNames { + assert.True(t, val) + } } func TestNoPortsParsed(t *testing.T) { @@ -162,7 +201,7 @@ func TestParserFailed(t *testing.T) { // prepare mockParserCalled := false mockParser := &mockParser{ - portsFunc: func() ([]v1.ServicePort, error) { + portsFunc: func() ([]corev1.ServicePort, error) { mockParserCalled = true return nil, errors.New("mocked error") }, diff --git a/pkg/collector/parser/receiver_generic.go b/pkg/collector/parser/receiver_generic.go index da6a2e6669..587a08fece 100644 --- a/pkg/collector/parser/receiver_generic.go +++ b/pkg/collector/parser/receiver_generic.go @@ -25,11 +25,13 @@ var _ ReceiverParser = &GenericReceiver{} // GenericReceiver is a special parser for generic receivers. It doesn't self-register and should be created/used directly. type GenericReceiver struct { - logger logr.Logger - name string - config map[interface{}]interface{} - defaultPort int32 - parserName string + logger logr.Logger + name string + config map[interface{}]interface{} + defaultPort int32 + defaultProtocol corev1.Protocol + defaultAppProtocol *string + parserName string } // NOTE: Operator will sync with only receivers that aren't scrapers. Operator sync up receivers @@ -49,13 +51,17 @@ func NewGenericReceiverParser(logger logr.Logger, name string, config map[interf func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) { port := singlePortFromConfigEndpoint(g.logger, g.name, g.config) if port != nil { + port.Protocol = g.defaultProtocol + port.AppProtocol = g.defaultAppProtocol return []corev1.ServicePort{*port}, nil } if g.defaultPort > 0 { return []corev1.ServicePort{{ - Port: g.defaultPort, - Name: portName(g.name, g.defaultPort), + Port: g.defaultPort, + Name: portName(g.name, g.defaultPort), + Protocol: g.defaultProtocol, + AppProtocol: g.defaultAppProtocol, }}, nil } diff --git a/pkg/collector/parser/receiver_jaeger.go b/pkg/collector/parser/receiver_jaeger.go index 460c0b6adc..790782ab78 100644 --- a/pkg/collector/parser/receiver_jaeger.go +++ b/pkg/collector/parser/receiver_jaeger.go @@ -63,16 +63,19 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) { name string defaultPort int32 transportProtocol corev1.Protocol + appProtocol string }{ { name: "grpc", defaultPort: defaultGRPCPort, transportProtocol: corev1.ProtocolTCP, + appProtocol: "grpc", }, { name: "thrift_http", defaultPort: defaultThriftHTTPPort, transportProtocol: corev1.ProtocolTCP, + appProtocol: "http", }, { name: "thrift_compact", @@ -109,6 +112,11 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) { // set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol protocolPort.Protocol = protocol.transportProtocol + if protocol.appProtocol != "" { + c := protocol.appProtocol + protocolPort.AppProtocol = &c + } + // at this point, we *have* a port specified, add it to the list of ports ports = append(ports, *protocolPort) } diff --git a/pkg/collector/parser/receiver_otlp.go b/pkg/collector/parser/receiver_otlp.go index 45eab11ed7..a637bf8109 100644 --- a/pkg/collector/parser/receiver_otlp.go +++ b/pkg/collector/parser/receiver_otlp.go @@ -32,6 +32,11 @@ const ( defaultOTLPHTTPPort int32 = 4318 ) +var ( + grpc = "grpc" + http = "http" +) + // OTLPReceiverParser parses the configuration for OTLP receivers. type OTLPReceiverParser struct { logger logr.Logger @@ -64,27 +69,30 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) { defaultPorts []corev1.ServicePort }{ { - name: "grpc", + name: grpc, defaultPorts: []corev1.ServicePort{ { - Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort), - Port: defaultOTLPGRPCPort, - TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)), + Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort), + Port: defaultOTLPGRPCPort, + TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)), + AppProtocol: &grpc, }, }, }, { - name: "http", + name: http, defaultPorts: []corev1.ServicePort{ { - Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort), - Port: defaultOTLPHTTPPort, - TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), + Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort), + Port: defaultOTLPHTTPPort, + TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), + AppProtocol: &http, }, { - Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort), - Port: defaultOTLPHTTPLegacyPort, - TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy + Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort), + Port: defaultOTLPHTTPLegacyPort, + TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy + AppProtocol: &http, }, }, }, @@ -106,6 +114,14 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) { if protocolPort == nil { ports = append(ports, protocol.defaultPorts...) } else { + // infer protocol and appProtocol from protocol.name + if protocol.name == grpc { + protocolPort.Protocol = corev1.ProtocolTCP + protocolPort.AppProtocol = &grpc + } else if protocol.name == http { + protocolPort.Protocol = corev1.ProtocolTCP + protocolPort.AppProtocol = &http + } ports = append(ports, *protocolPort) } } diff --git a/pkg/collector/parser/receiver_zipkin.go b/pkg/collector/parser/receiver_zipkin.go index 0aa9eb7ed0..d96d951e09 100644 --- a/pkg/collector/parser/receiver_zipkin.go +++ b/pkg/collector/parser/receiver_zipkin.go @@ -14,18 +14,24 @@ package parser -import "github.com/go-logr/logr" +import ( + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" +) const parserNameZipkin = "__zipkin" // NewZipkinReceiverParser builds a new parser for Zipkin receivers. func NewZipkinReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) ReceiverParser { + http := "http" return &GenericReceiver{ - logger: logger, - name: name, - config: config, - defaultPort: 9411, - parserName: parserNameZipkin, + logger: logger, + name: name, + config: config, + defaultPort: 9411, + defaultProtocol: corev1.ProtocolTCP, + defaultAppProtocol: &http, + parserName: parserNameZipkin, } } diff --git a/pkg/collector/reconcile/service_test.go b/pkg/collector/reconcile/service_test.go index 4bfafee6f4..d400560d69 100644 --- a/pkg/collector/reconcile/service_test.go +++ b/pkg/collector/reconcile/service_test.go @@ -115,10 +115,12 @@ func TestDesiredService(t *testing.T) { }) t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) { + grpc := "grpc" jaegerPorts := v1.ServicePort{ - Name: "jaeger-grpc", - Protocol: "TCP", - Port: 14250, + Name: "jaeger-grpc", + Protocol: "TCP", + Port: 14250, + AppProtocol: &grpc, } ports := append(params().Instance.Spec.Ports, jaegerPorts) expected := service("test-collector", ports) diff --git a/tests/e2e/smoke-simplest/00-assert.yaml b/tests/e2e/smoke-simplest/00-assert.yaml index 63f327e5e4..d3dfd05652 100644 --- a/tests/e2e/smoke-simplest/00-assert.yaml +++ b/tests/e2e/smoke-simplest/00-assert.yaml @@ -4,3 +4,31 @@ metadata: name: simplest-collector status: readyReplicas: 1 + +--- + +apiVersion: v1 +kind: Service +metadata: + name: simplest-collector-headless +spec: + ports: + - appProtocol: grpc + name: jaeger-grpc + port: 14250 + protocol: TCP + targetPort: 14250 + +--- + +apiVersion: v1 +kind: Service +metadata: + name: simplest-collector +spec: + ports: + - appProtocol: grpc + name: jaeger-grpc + port: 14250 + protocol: TCP + targetPort: 14250