Skip to content

Commit

Permalink
Add appProtocol for otlp and jaeger receiver parsers
Browse files Browse the repository at this point in the history
Signed-off-by: Sergei Semenchuk <pdp.eleven11@gmail.com>
  • Loading branch information
binjip978 committed Feb 11, 2022
1 parent e8d6f8b commit 97d4221
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 17 deletions.
40 changes: 37 additions & 3 deletions pkg/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"
Expand Down Expand Up @@ -53,6 +52,15 @@ func TestExtractPortsFromConfig(t *testing.T) {
protocols:
thrift_http:
endpoint: 0.0.0.0:15268
otlp:
protocols:
grpc:
http:
otlp/2:
protocols:
grpc:
endpoint: 0.0.0.0:55555
zipkin:
`

// prepare
Expand All @@ -63,7 +71,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
assert.NoError(t, err)
assert.Len(t, ports, 6)
assert.Len(t, ports, 11)

// verify
expectedPorts := map[int32]bool{}
Expand All @@ -73,6 +81,10 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedPorts[int32(6831)] = false
expectedPorts[int32(6833)] = false
expectedPorts[int32(15268)] = false
expectedPorts[int32(4318)] = false
expectedPorts[int32(55681)] = false
expectedPorts[int32(55555)] = false
expectedPorts[int32(9411)] = false

expectedNames := map[string]bool{}
expectedNames["examplereceiver"] = false
Expand All @@ -81,20 +93,42 @@ func TestExtractPortsFromConfig(t *testing.T) {
expectedNames["jaeger-thrift-compact"] = false
expectedNames["jaeger-thrift-binary"] = false
expectedNames["jaeger-custom-thrift-http"] = false
expectedNames["otlp-grpc"] = false
expectedNames["otlp-http"] = false
expectedNames["otlp-http-legacy"] = false
expectedNames["otlp-2-grpc"] = false
expectedNames["zipkin"] = false

expectedAppProtocols := map[string]string{}
expectedAppProtocols["otlp-grpc"] = "grpc"
expectedAppProtocols["otlp-http"] = "http"
expectedAppProtocols["otlp-http-legacy"] = "http"
expectedAppProtocols["jaeger-custom-thrift-http"] = "http"
expectedAppProtocols["jaeger-grpc"] = "grpc"
expectedAppProtocols["otlp-2-grpc"] = "grpc"
expectedAppProtocols["zipkin"] = "http"

// make sure we only have the ports in the set
for _, port := range ports {
assert.NotNil(t, expectedPorts[port.Port])
assert.NotNil(t, expectedNames[port.Name])
expectedPorts[port.Port] = true
expectedNames[port.Name] = true

if appProtocol, ok := expectedAppProtocols[port.Name]; ok {
assert.Equal(t, appProtocol, *port.AppProtocol)
}
}

// and make sure all the ports from the set are there
for _, val := range expectedPorts {
assert.True(t, val)
}

// make sure we only have the ports names in the set
for _, val := range expectedNames {
assert.True(t, val)
}
}

func TestNoPortsParsed(t *testing.T) {
Expand Down Expand Up @@ -162,7 +196,7 @@ func TestParserFailed(t *testing.T) {
// prepare
mockParserCalled := false
mockParser := &mockParser{
portsFunc: func() ([]v1.ServicePort, error) {
portsFunc: func() ([]corev1.ServicePort, error) {
mockParserCalled = true
return nil, errors.New("mocked error")
},
Expand Down
10 changes: 10 additions & 0 deletions pkg/collector/parser/receiver_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) {
}

if g.defaultPort > 0 {
if g.name == "zipkin" {
http := "http"
return []corev1.ServicePort{{
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Protocol: corev1.ProtocolTCP,
AppProtocol: &http,
}}, nil
}

return []corev1.ServicePort{{
Port: g.defaultPort,
Name: portName(g.name, g.defaultPort),
Expand Down
8 changes: 8 additions & 0 deletions pkg/collector/parser/receiver_jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
name string
defaultPort int32
transportProtocol corev1.Protocol
appProtocol string
}{
{
name: "grpc",
defaultPort: defaultGRPCPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "grpc",
},
{
name: "thrift_http",
defaultPort: defaultThriftHTTPPort,
transportProtocol: corev1.ProtocolTCP,
appProtocol: "http",
},
{
name: "thrift_compact",
Expand Down Expand Up @@ -109,6 +112,11 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) {
// set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol
protocolPort.Protocol = protocol.transportProtocol

if protocol.appProtocol != "" {
c := protocol.appProtocol
protocolPort.AppProtocol = &c
}

// at this point, we *have* a port specified, add it to the list of ports
ports = append(ports, *protocolPort)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/collector/parser/receiver_otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
defaultOTLPHTTPPort int32 = 4318
)

var (
grpc = "grpc"
http = "http"
)

// OTLPReceiverParser parses the configuration for OTLP receivers.
type OTLPReceiverParser struct {
logger logr.Logger
Expand Down Expand Up @@ -64,27 +69,30 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
defaultPorts []corev1.ServicePort
}{
{
name: "grpc",
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort),
Port: defaultOTLPGRPCPort,
TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: "http",
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort),
Port: defaultOTLPHTTPPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)),
AppProtocol: &http,
},
{
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort),
Port: defaultOTLPHTTPLegacyPort,
TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), // we target the official port, not the legacy
AppProtocol: &http,
},
},
},
Expand All @@ -106,6 +114,14 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) {
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/collector/reconcile/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,12 @@ func TestDesiredService(t *testing.T) {
})
t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) {

grpc := "grpc"
jaegerPorts := v1.ServicePort{
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
Name: "jaeger-grpc",
Protocol: "TCP",
Port: 14250,
AppProtocol: &grpc,
}
ports := append(params().Instance.Spec.Ports, jaegerPorts)
expected := service("test-collector", ports)
Expand Down

0 comments on commit 97d4221

Please sign in to comment.