Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add appProtocol for otlp and jaeger receiver parsers #704

Merged
merged 1 commit into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions pkg/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"
Expand Down Expand Up @@ -53,6 +52,17 @@ func TestExtractPortsFromConfig(t *testing.T) {
protocols:
thrift_http:
endpoint: 0.0.0.0:15268
otlp:
protocols:
grpc:
http:
otlp/2:
protocols:
grpc:
endpoint: 0.0.0.0:55555
zipkin:
zipkin/2:
endpoint: 0.0.0.0:33333
`

// prepare
Expand All @@ -63,7 +73,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
assert.NoError(t, err)
assert.Len(t, ports, 6)
assert.Len(t, ports, 12)

// verify
expectedPorts := map[int32]bool{}
Expand All @@ -73,6 +83,11 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedPorts[int32(6831)] = false
expectedPorts[int32(6833)] = false
expectedPorts[int32(15268)] = false
expectedPorts[int32(4318)] = false
expectedPorts[int32(55681)] = false
expectedPorts[int32(55555)] = false
expectedPorts[int32(9411)] = false
expectedPorts[int32(33333)] = false

expectedNames := map[string]bool{}
expectedNames["examplereceiver"] = false
Expand All @@ -81,20 +96,44 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedNames["jaeger-thrift-compact"] = false
expectedNames["jaeger-thrift-binary"] = false
expectedNames["jaeger-custom-thrift-http"] = false
expectedNames["otlp-grpc"] = false
expectedNames["otlp-http"] = false
expectedNames["otlp-http-legacy"] = false
expectedNames["otlp-2-grpc"] = false
expectedNames["zipkin"] = false
expectedNames["zipkin-2"] = false

expectedAppProtocols := map[string]string{}
expectedAppProtocols["otlp-grpc"] = "grpc"
expectedAppProtocols["otlp-http"] = "http"
expectedAppProtocols["otlp-http-legacy"] = "http"
expectedAppProtocols["jaeger-custom-thrift-http"] = "http"
expectedAppProtocols["jaeger-grpc"] = "grpc"
expectedAppProtocols["otlp-2-grpc"] = "grpc"
expectedAppProtocols["zipkin"] = "http"
expectedAppProtocols["zipkin-2"] = "http"

// make sure we only have the ports in the set
for _, port := range ports {
assert.NotNil(t, expectedPorts[port.Port])
assert.NotNil(t, expectedNames[port.Name])
expectedPorts[port.Port] = true
expectedNames[port.Name] = true

if appProtocol, ok := expectedAppProtocols[port.Name]; ok {
assert.Equal(t, appProtocol, *port.AppProtocol)
}
}

// and make sure all the ports from the set are there
for _, val := range expectedPorts {
assert.True(t, val)
}

// make sure we only have the ports names in the set
for _, val := range expectedNames {
assert.True(t, val)
}
}

func TestNoPortsParsed(t *testing.T) {
Expand Down Expand Up @@ -162,7 +201,7 @@ func TestParserFailed(t *testing.T) {
// prepare
mockParserCalled := false
mockParser := &mockParser{
portsFunc: func() ([]v1.ServicePort, error) {
portsFunc: func() ([]corev1.ServicePort, error) {
mockParserCalled = true
return nil, errors.New("mocked error")
},
Expand Down
20 changes: 13 additions & 7 deletions pkg/collector/parser/receiver_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ var _ ReceiverParser = &GenericReceiver{}

// GenericReceiver is a special parser for generic receivers. It doesn't self-register and should be created/used directly.
type GenericReceiver struct {
logger logr.Logger
name string
config map[interface{}]interface{}
defaultPort int32
parserName string
logger logr.Logger
name string
config map[interface{}]interface{}
defaultPort int32
defaultProtocol corev1.Protocol
defaultAppProtocol *string
parserName string
}

// NOTE: Operator will sync with only receivers that aren't scrapers. Operator sync up receivers
Expand All @@ -49,13 +51,17 @@ func NewGenericReceiverParser(logger logr.Logger, name string, config map[interf
func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) {
port := singlePortFromConfigEndpoint(g.logger, g.name, g.config)
if port != nil {
port.Protocol = g.defaultProtocol
port.AppProtocol = g.defaultAppProtocol
return []corev1.ServicePort{*port}, nil
}

if g.defaultPort > 0 {
return []corev1.ServicePort{{
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Protocol: g.defaultProtocol,
AppProtocol: g.defaultAppProtocol,
}}, nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/collector/parser/receiver_jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
name string
defaultPort int32
transportProtocol corev1.Protocol
appProtocol string
}{
{
name: "grpc",
defaultPort: defaultGRPCPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "grpc",
},
{
name: "thrift_http",
defaultPort: defaultThriftHTTPPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "http",
},
{
name: "thrift_compact",
Expand Down Expand Up @@ -109,6 +112,11 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
// set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol
protocolPort.Protocol = protocol.transportProtocol

if protocol.appProtocol != "" {
c := protocol.appProtocol
protocolPort.AppProtocol = &c
}

// at this point, we *have* a port specified, add it to the list of ports
ports = append(ports, *protocolPort)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/collector/parser/receiver_otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
defaultOTLPHTTPPort int32 = 4318
)

var (
grpc = "grpc"
http = "http"
)

// OTLPReceiverParser parses the configuration for OTLP receivers.
type OTLPReceiverParser struct {
logger logr.Logger
Expand Down Expand Up @@ -64,27 +69,30 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
defaultPorts []corev1.ServicePort
}{
{
name: "grpc",
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: "http",
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
AppProtocol: &http,
},
{
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
AppProtocol: &http,
},
},
},
Expand All @@ -106,6 +114,14 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/collector/parser/receiver_zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@

package parser

import "github.com/go-logr/logr"
import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
)

const parserNameZipkin = "__zipkin"

// NewZipkinReceiverParser builds a new parser for Zipkin receivers.
func NewZipkinReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) ReceiverParser {
http := "http"
return &GenericReceiver{
logger: logger,
name: name,
config: config,
defaultPort: 9411,
parserName: parserNameZipkin,
logger: logger,
name: name,
config: config,
defaultPort: 9411,
defaultProtocol: corev1.ProtocolTCP,
defaultAppProtocol: &http,
parserName: parserNameZipkin,
}
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/collector/reconcile/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,12 @@ func TestDesiredService(t *testing.T) {
})
t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) {

grpc := "grpc"
jaegerPorts := v1.ServicePort{
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
AppProtocol: &grpc,
}
ports := append(params().Instance.Spec.Ports, jaegerPorts)
expected := service("test-collector", ports)
Expand Down
28 changes: 28 additions & 0 deletions tests/e2e/smoke-simplest/00-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,31 @@ metadata:
name: simplest-collector
status:
readyReplicas: 1

---

apiVersion: v1
kind: Service
metadata:
name: simplest-collector-headless
spec:
ports:
- appProtocol: grpc
name: jaeger-grpc
port: 14250
protocol: TCP
targetPort: 14250

---

apiVersion: v1
kind: Service
metadata:
name: simplest-collector
spec:
ports:
- appProtocol: grpc
name: jaeger-grpc
port: 14250
protocol: TCP
targetPort: 14250