Skip to content

Commit

Permalink
Add appProtocol for otlp and jaeger receiver parsers
Browse files Browse the repository at this point in the history
Signed-off-by: Sergei Semenchuk <pdp.eleven11@gmail.com>
  • Loading branch information
binjip978 committed Feb 12, 2022
1 parent e8d6f8b commit 6026da6
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 30 deletions.
45 changes: 42 additions & 3 deletions pkg/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"
Expand Down Expand Up @@ -53,6 +52,17 @@ func TestExtractPortsFromConfig(t *testing.T) {
protocols:
thrift_http:
endpoint: 0.0.0.0:15268
otlp:
protocols:
grpc:
http:
otlp/2:
protocols:
grpc:
endpoint: 0.0.0.0:55555
zipkin:
zipkin/2:
endpoint: 0.0.0.0:33333
`

// prepare
Expand All @@ -63,7 +73,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
assert.NoError(t, err)
assert.Len(t, ports, 6)
assert.Len(t, ports, 12)

// verify
expectedPorts := map[int32]bool{}
Expand All @@ -73,6 +83,11 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedPorts[int32(6831)] = false
expectedPorts[int32(6833)] = false
expectedPorts[int32(15268)] = false
expectedPorts[int32(4318)] = false
expectedPorts[int32(55681)] = false
expectedPorts[int32(55555)] = false
expectedPorts[int32(9411)] = false
expectedPorts[int32(33333)] = false

expectedNames := map[string]bool{}
expectedNames["examplereceiver"] = false
Expand All @@ -81,20 +96,44 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedNames["jaeger-thrift-compact"] = false
expectedNames["jaeger-thrift-binary"] = false
expectedNames["jaeger-custom-thrift-http"] = false
expectedNames["otlp-grpc"] = false
expectedNames["otlp-http"] = false
expectedNames["otlp-http-legacy"] = false
expectedNames["otlp-2-grpc"] = false
expectedNames["zipkin"] = false
expectedNames["zipkin-2"] = false

expectedAppProtocols := map[string]string{}
expectedAppProtocols["otlp-grpc"] = "grpc"
expectedAppProtocols["otlp-http"] = "http"
expectedAppProtocols["otlp-http-legacy"] = "http"
expectedAppProtocols["jaeger-custom-thrift-http"] = "http"
expectedAppProtocols["jaeger-grpc"] = "grpc"
expectedAppProtocols["otlp-2-grpc"] = "grpc"
expectedAppProtocols["zipkin"] = "http"
expectedAppProtocols["zipkin-2"] = "http"

// make sure we only have the ports in the set
for _, port := range ports {
assert.NotNil(t, expectedPorts[port.Port])
assert.NotNil(t, expectedNames[port.Name])
expectedPorts[port.Port] = true
expectedNames[port.Name] = true

if appProtocol, ok := expectedAppProtocols[port.Name]; ok {
assert.Equal(t, appProtocol, *port.AppProtocol)
}
}

// and make sure all the ports from the set are there
for _, val := range expectedPorts {
assert.True(t, val)
}

// make sure we only have the ports names in the set
for _, val := range expectedNames {
assert.True(t, val)
}
}

func TestNoPortsParsed(t *testing.T) {
Expand Down Expand Up @@ -162,7 +201,7 @@ func TestParserFailed(t *testing.T) {
// prepare
mockParserCalled := false
mockParser := &mockParser{
portsFunc: func() ([]v1.ServicePort, error) {
portsFunc: func() ([]corev1.ServicePort, error) {
mockParserCalled = true
return nil, errors.New("mocked error")
},
Expand Down
20 changes: 13 additions & 7 deletions pkg/collector/parser/receiver_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ var _ ReceiverParser = &GenericReceiver{}

// GenericReceiver is a special parser for generic receivers. It doesn't self-register and should be created/used directly.
type GenericReceiver struct {
logger logr.Logger
name string
config map[interface{}]interface{}
defaultPort int32
parserName string
logger logr.Logger
name string
config map[interface{}]interface{}
defaultPort int32
defaultProtocol corev1.Protocol
defaultAppProtocol *string
parserName string
}

// NOTE: Operator will sync with only receivers that aren't scrapers. Operator sync up receivers
Expand All @@ -49,13 +51,17 @@ func NewGenericReceiverParser(logger logr.Logger, name string, config map[interf
func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) {
port := singlePortFromConfigEndpoint(g.logger, g.name, g.config)
if port != nil {
port.Protocol = g.defaultProtocol
port.AppProtocol = g.defaultAppProtocol
return []corev1.ServicePort{*port}, nil
}

if g.defaultPort > 0 {
return []corev1.ServicePort{{
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Protocol: g.defaultProtocol,
AppProtocol: g.defaultAppProtocol,
}}, nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/collector/parser/receiver_jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
name string
defaultPort int32
transportProtocol corev1.Protocol
appProtocol string
}{
{
name: "grpc",
defaultPort: defaultGRPCPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "grpc",
},
{
name: "thrift_http",
defaultPort: defaultThriftHTTPPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "http",
},
{
name: "thrift_compact",
Expand Down Expand Up @@ -109,6 +112,11 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
// set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol
protocolPort.Protocol = protocol.transportProtocol

if protocol.appProtocol != "" {
c := protocol.appProtocol
protocolPort.AppProtocol = &c
}

// at this point, we *have* a port specified, add it to the list of ports
ports = append(ports, *protocolPort)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/collector/parser/receiver_otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
defaultOTLPHTTPPort int32 = 4318
)

var (
grpc = "grpc"
http = "http"
)

// OTLPReceiverParser parses the configuration for OTLP receivers.
type OTLPReceiverParser struct {
logger logr.Logger
Expand Down Expand Up @@ -64,27 +69,30 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
defaultPorts []corev1.ServicePort
}{
{
name: "grpc",
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: "http",
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
AppProtocol: &http,
},
{
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
AppProtocol: &http,
},
},
},
Expand All @@ -106,6 +114,14 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/collector/parser/receiver_zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@

package parser

import "github.com/go-logr/logr"
import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
)

const parserNameZipkin = "__zipkin"

// NewZipkinReceiverParser builds a new parser for Zipkin receivers.
func NewZipkinReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) ReceiverParser {
http := "http"
return &GenericReceiver{
logger: logger,
name: name,
config: config,
defaultPort: 9411,
parserName: parserNameZipkin,
logger: logger,
name: name,
config: config,
defaultPort: 9411,
defaultProtocol: corev1.ProtocolTCP,
defaultAppProtocol: &http,
parserName: parserNameZipkin,
}
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/collector/reconcile/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,12 @@ func TestDesiredService(t *testing.T) {
})
t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) {

grpc := "grpc"
jaegerPorts := v1.ServicePort{
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
AppProtocol: &grpc,
}
ports := append(params().Instance.Spec.Ports, jaegerPorts)
expected := service("test-collector", ports)
Expand Down
28 changes: 28 additions & 0 deletions tests/e2e/smoke-simplest/00-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,31 @@ metadata:
name: simplest-collector
status:
readyReplicas: 1

---

apiVersion: v1
kind: Service
metadata:
name: simplest-collector-headless
spec:
ports:
- appProtocol: grpc
name: jaeger-grpc
port: 14250
protocol: TCP
targetPort: 14250

---

apiVersion: v1
kind: Service
metadata:
name: simplest-collector
spec:
ports:
- appProtocol: grpc
name: jaeger-grpc
port: 14250
protocol: TCP
targetPort: 14250

0 comments on commit 6026da6

Please sign in to comment.