From 17e681d1df853be639c6351ad528677d8c33fba0 Mon Sep 17 00:00:00 2001 From: Chao Lv Date: Wed, 28 Jun 2023 23:08:51 +0800 Subject: [PATCH] feat: add skywalking parser to parse servicePort from config (#1650) Co-authored-by: Pavol Loffay --- .chloggen/add-skywalking-parser.yaml | 16 +++ pkg/collector/parser/receiver_skywalking.go | 128 ++++++++++++++++++ .../parser/receiver_skywalking_test.go | 108 +++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 .chloggen/add-skywalking-parser.yaml create mode 100644 pkg/collector/parser/receiver_skywalking.go create mode 100644 pkg/collector/parser/receiver_skywalking_test.go diff --git a/.chloggen/add-skywalking-parser.yaml b/.chloggen/add-skywalking-parser.yaml new file mode 100644 index 0000000000..c5fef81f6f --- /dev/null +++ b/.chloggen/add-skywalking-parser.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action) +component: collector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add Skywalking parser to extract skywalking service port from config + +# One or more tracking issues related to the change +issues: [1634] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/collector/parser/receiver_skywalking.go b/pkg/collector/parser/receiver_skywalking.go new file mode 100644 index 0000000000..bddd921146 --- /dev/null +++ b/pkg/collector/parser/receiver_skywalking.go @@ -0,0 +1,128 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parser + +import ( + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +var _ ReceiverParser = &SkywalkingReceiverParser{} + +const ( + parserNameSkywalking = "__skywalking" + + defaultSkywalkingGRPCPort int32 = 11800 + defaultSkywalkingHTTPPort int32 = 12800 +) + +// SkywalkingReceiverParser parses the configuration for Skywalking receivers. +type SkywalkingReceiverParser struct { + config map[interface{}]interface{} + logger logr.Logger + name string +} + +// NewSkywalkingReceiverParser builds a new parser for Skywalking receivers. +func NewSkywalkingReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) ReceiverParser { + if protocols, ok := config["protocols"].(map[interface{}]interface{}); ok { + return &SkywalkingReceiverParser{ + logger: logger, + name: name, + config: protocols, + } + } + + return &SkywalkingReceiverParser{ + name: name, + config: map[interface{}]interface{}{}, + } +} + +// Ports returns all the service ports for all protocols in this parser. +func (o *SkywalkingReceiverParser) Ports() ([]corev1.ServicePort, error) { + ports := []corev1.ServicePort{} + + for _, protocol := range []struct { + name string + defaultPorts []corev1.ServicePort + }{ + { + name: grpc, + defaultPorts: []corev1.ServicePort{ + { + Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultSkywalkingGRPCPort), + Port: defaultSkywalkingGRPCPort, + TargetPort: intstr.FromInt(int(defaultSkywalkingGRPCPort)), + AppProtocol: &grpc, + }, + }, + }, + { + name: http, + defaultPorts: []corev1.ServicePort{ + { + Name: portName(fmt.Sprintf("%s-http", o.name), defaultSkywalkingHTTPPort), + Port: defaultSkywalkingHTTPPort, + TargetPort: intstr.FromInt(int(defaultSkywalkingHTTPPort)), + AppProtocol: &http, + }, + }, + }, + } { + // do we have the protocol specified at all? + if receiverProtocol, ok := o.config[protocol.name]; ok { + // we have the specified protocol, we definitely need a service port + nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name) + var protocolPort *corev1.ServicePort + + // do we have a configuration block for the protocol? + settings, ok := receiverProtocol.(map[interface{}]interface{}) + if ok { + protocolPort = singlePortFromConfigEndpoint(o.logger, nameWithProtocol, settings) + } + + // have we parsed a port based on the configuration block? + // if not, we use the default port + if protocolPort == nil { + ports = append(ports, protocol.defaultPorts...) + } else { + // infer protocol and appProtocol from protocol.name + if protocol.name == grpc { + protocolPort.Protocol = corev1.ProtocolTCP + protocolPort.AppProtocol = &grpc + } else if protocol.name == http { + protocolPort.Protocol = corev1.ProtocolTCP + protocolPort.AppProtocol = &http + } + ports = append(ports, *protocolPort) + } + } + } + + return ports, nil +} + +// ParserName returns the name of this parser. +func (o *SkywalkingReceiverParser) ParserName() string { + return parserNameSkywalking +} + +func init() { + Register("skywalking", NewSkywalkingReceiverParser) +} diff --git a/pkg/collector/parser/receiver_skywalking_test.go b/pkg/collector/parser/receiver_skywalking_test.go new file mode 100644 index 0000000000..46d60cd846 --- /dev/null +++ b/pkg/collector/parser/receiver_skywalking_test.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSkywalkingSelfRegisters(t *testing.T) { + // verify + assert.True(t, IsRegistered("skywalking")) +} + +func TestSkywalkingIsFoundByName(t *testing.T) { + // test + p := For(logger, "skywalking", map[interface{}]interface{}{}) + + // verify + assert.Equal(t, "__skywalking", p.ParserName()) +} + +func TestSkywalkingPortsOverridden(t *testing.T) { + // prepare + builder := NewSkywalkingReceiverParser(logger, "skywalking", map[interface{}]interface{}{ + "protocols": map[interface{}]interface{}{ + "grpc": map[interface{}]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + "http": map[interface{}]interface{}{ + "endpoint": "0.0.0.0:1235", + }, + }, + }) + + expectedResults := map[string]struct { + portNumber int32 + seen bool + }{ + "skywalking-grpc": {portNumber: 1234}, + "skywalking-http": {portNumber: 1235}, + } + + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + for _, port := range ports { + r := expectedResults[port.Name] + r.seen = true + expectedResults[port.Name] = r + assert.EqualValues(t, r.portNumber, port.Port) + } + for k, v := range expectedResults { + assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) + } +} + +func TestSkywalkingExposeDefaultPorts(t *testing.T) { + // prepare + builder := NewSkywalkingReceiverParser(logger, "skywalking", map[interface{}]interface{}{ + "protocols": map[interface{}]interface{}{ + "grpc": map[interface{}]interface{}{}, + "http": map[interface{}]interface{}{}, + }, + }) + + expectedResults := map[string]struct { + portNumber int32 + seen bool + }{ + "skywalking-grpc": {portNumber: 11800}, + "skywalking-http": {portNumber: 12800}, + } + + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + for _, port := range ports { + r := expectedResults[port.Name] + r.seen = true + expectedResults[port.Name] = r + assert.EqualValues(t, r.portNumber, port.Port) + } + for k, v := range expectedResults { + assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) + } +}