diff --git a/pkg/collector/adapters/config_to_ports.go b/pkg/collector/adapters/config_to_ports.go index a3fc2822cb..de1dc2ab08 100644 --- a/pkg/collector/adapters/config_to_ports.go +++ b/pkg/collector/adapters/config_to_ports.go @@ -51,7 +51,10 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{ if !ok { return nil, ErrNoReceivers } - + recEnabled := GetEnabledReceivers(logger, config) + if recEnabled == nil { + return nil, ErrReceiversNotAMap + } receivers, ok := receiversProperty.(map[interface{}]interface{}) if !ok { return nil, ErrReceiversNotAMap @@ -59,6 +62,11 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{ ports := []corev1.ServicePort{} for key, val := range receivers { + // This check will pass only the enabled receivers, + // then only the related ports will be opened. + if !recEnabled[key] { + continue + } receiver, ok := val.(map[interface{}]interface{}) if !ok { logger.Info("receiver doesn't seem to be a map of properties", "receiver", key) diff --git a/pkg/collector/adapters/config_to_ports_test.go b/pkg/collector/adapters/config_to_ports_test.go index 65d4d15799..10f8109c54 100644 --- a/pkg/collector/adapters/config_to_ports_test.go +++ b/pkg/collector/adapters/config_to_ports_test.go @@ -62,7 +62,18 @@ func TestExtractPortsFromConfig(t *testing.T) { endpoint: 0.0.0.0:55555 zipkin: zipkin/2: - endpoint: 0.0.0.0:33333 + endpoint: 0.0.0.0:33333 +service: + pipelines: + metrics: + receivers: [examplereceiver, examplereceiver/settings] + exporters: [logging] + metrics/1: + receivers: [jaeger, jaeger/custom] + exporters: [logging] + metrics/2: + receivers: [otlp, otlp/2, zipkin] + exporters: [logging] ` // prepare @@ -73,7 +84,7 @@ func TestExtractPortsFromConfig(t *testing.T) { // test ports, err := adapters.ConfigToReceiverPorts(logger, config) assert.NoError(t, err) - assert.Len(t, ports, 12) + assert.Len(t, ports, 11) // verify expectedPorts := map[int32]bool{} @@ -87,7 +98,6 @@ func TestExtractPortsFromConfig(t *testing.T) { expectedPorts[int32(55681)] = false expectedPorts[int32(55555)] = false expectedPorts[int32(9411)] = false - expectedPorts[int32(33333)] = false expectedNames := map[string]bool{} expectedNames["examplereceiver"] = false @@ -101,7 +111,6 @@ func TestExtractPortsFromConfig(t *testing.T) { expectedNames["otlp-http-legacy"] = false expectedNames["otlp-2-grpc"] = false expectedNames["zipkin"] = false - expectedNames["zipkin-2"] = false expectedAppProtocols := map[string]string{} expectedAppProtocols["otlp-grpc"] = "grpc" @@ -111,7 +120,6 @@ func TestExtractPortsFromConfig(t *testing.T) { expectedAppProtocols["jaeger-grpc"] = "grpc" expectedAppProtocols["otlp-2-grpc"] = "grpc" expectedAppProtocols["zipkin"] = "http" - expectedAppProtocols["zipkin-2"] = "http" // make sure we only have the ports in the set for _, port := range ports { @@ -175,11 +183,11 @@ func TestInvalidReceivers(t *testing.T) { }{ { "receiver isn't a map", - "receivers:\n some-receiver: string", + "receivers:\n some-receiver: string\nservice:\n pipelines:\n metrics:\n receivers: [some-receiver]", }, { "receiver's endpoint isn't string", - "receivers:\n some-receiver:\n endpoint: 123", + "receivers:\n some-receiver:\n endpoint: 123\nservice:\n pipelines:\n metrics:\n receivers: [some-receiver]", }, } { t.Run(tt.desc, func(t *testing.T) { @@ -212,7 +220,14 @@ func TestParserFailed(t *testing.T) { config := map[interface{}]interface{}{ "receivers": map[interface{}]interface{}{ - "mock": map[interface{}]interface{}{}, + "mock": map[string]interface{}{}, + }, + "service": map[interface{}]interface{}{ + "pipelines": map[interface{}]interface{}{ + "metrics": map[interface{}]interface{}{ + "receivers": []interface{}{"mock"}, + }, + }, }, } diff --git a/pkg/collector/adapters/config_validate.go b/pkg/collector/adapters/config_validate.go new file mode 100644 index 0000000000..c7fc0fa87a --- /dev/null +++ b/pkg/collector/adapters/config_validate.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapters + +import ( + "github.com/go-logr/logr" +) + +//Following Otel Doc: Configuring a receiver does not enable it. The receivers are enabled via pipelines within the service section. +//GetEnabledReceivers returns all enabled receivers as a true flag set. If it can't find any receiver, it will return a nil interface. +func GetEnabledReceivers(logger logr.Logger, config map[interface{}]interface{}) map[interface{}]bool { + cfgReceivers, ok := config["receivers"] + if !ok { + return nil + } + receivers, ok := cfgReceivers.(map[interface{}]interface{}) + if !ok { + return nil + } + availableReceivers := map[interface{}]bool{} + + for recvID := range receivers { + + //Safe Cast + receiverID, ok := recvID.(string) + if !ok { + return nil + } + //Getting all receivers present in the receivers section and setting them to false. + availableReceivers[receiverID] = false + } + + cfgService, ok := config["service"].(map[interface{}]interface{}) + if !ok { + return nil + } + + pipeline, ok := cfgService["pipelines"].(map[interface{}]interface{}) + if !ok { + return nil + } + availablePipelines := map[string]bool{} + + for pipID := range pipeline { + //Safe Cast + pipelineID, ok := pipID.(string) + if !ok { + return nil + } + //Getting all the available pipelines. + availablePipelines[pipelineID] = true + } + + if len(pipeline) > 0 { + for pipelineID, pipelineCfg := range pipeline { + //Safe Cast + pipelineV, ok := pipelineID.(string) + if !ok { + continue + } + //Condition will get information if there are multiple configured pipelines. + if len(pipelineV) > 0 { + pipelineDesc, ok := pipelineCfg.(map[interface{}]interface{}) + if !ok { + return nil + } + for pipSpecID, pipSpecCfg := range pipelineDesc { + if pipSpecID.(string) == "receivers" { + receiversList, ok := pipSpecCfg.([]interface{}) + if !ok { + continue + } + // If receiversList is empty means that we haven't any enabled Receiver. + if len(receiversList) == 0 { + availableReceivers = nil + } else { + // All enabled receivers will be set as true + for _, recKey := range receiversList { + //Safe Cast + receiverKey, ok := recKey.(string) + if !ok { + return nil + } + availableReceivers[receiverKey] = true + } + } + //Removing all non-enabled receivers + for recID, recKey := range availableReceivers { + if !(recKey) { + delete(availableReceivers, recID) + } + } + } + } + } + } + } + return availableReceivers +} diff --git a/pkg/collector/adapters/config_validate_test.go b/pkg/collector/adapters/config_validate_test.go new file mode 100644 index 0000000000..27cd8a7d76 --- /dev/null +++ b/pkg/collector/adapters/config_validate_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapters + +import ( + "testing" + + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/stretchr/testify/require" +) + +var logger = logf.Log.WithName("unit-tests") + +func TestConfigValidate(t *testing.T) { + // prepare + + // First Test - Exporters + configStr := ` +receivers: + httpd/mtls: + protocols: + http: + endpoint: mysite.local:55690 + jaeger: + protocols: + grpc: + prometheus: + protocols: + grpc: + +processors: + +exporters: + logging: + +service: + pipelines: + metrics: + receivers: [httpd/mtls, jaeger] + exporters: [logging] + metrics/1: + receivers: [httpd/mtls, jaeger] + exporters: [logging] +` + // // prepare + config, err := ConfigFromString(configStr) + require.NoError(t, err) + require.NotEmpty(t, config) + + // test + check := GetEnabledReceivers(logger, config) + require.NotEmpty(t, check) +} + +func TestEmptyEnabledReceivers(t *testing.T) { + // prepare + + // First Test - Exporters + configStr := ` +receivers: + httpd/mtls: + protocols: + http: + endpoint: mysite.local:55690 + jaeger: + protocols: + grpc: + prometheus: + protocols: + grpc: + +processors: + +exporters: + logging: + +service: + pipelines: + metrics: + receivers: [] + exporters: [] + metrics/1: + receivers: [] + exporters: [] +` + // // prepare + config, err := ConfigFromString(configStr) + require.NoError(t, err) + require.NotEmpty(t, config) + + // test + check := GetEnabledReceivers(logger, config) + require.Empty(t, check) +} diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index 09f0310820..b55c322607 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -64,7 +64,7 @@ exporters: service: pipelines: metrics: - receivers: [prometheus] + receivers: [prometheus, jaeger] processors: [] exporters: [logging]`, } @@ -114,6 +114,7 @@ service: processors: [] receivers: - prometheus + - jaeger `, } diff --git a/pkg/collector/testdata/test.yaml b/pkg/collector/testdata/test.yaml index a14808f3bc..c920cee82d 100644 --- a/pkg/collector/testdata/test.yaml +++ b/pkg/collector/testdata/test.yaml @@ -17,6 +17,6 @@ exporters: service: pipelines: metrics: - receivers: [prometheus] + receivers: [prometheus, jaeger] processors: [] exporters: [logging] \ No newline at end of file