Skip to content

Commit

Permalink
feat: add skywalking parser to parse servicePort from config (#1650)
Browse files Browse the repository at this point in the history
Co-authored-by: Pavol Loffay <p.loffay@gmail.com>
  • Loading branch information
gorexlv and pavolloffay authored Jun 28, 2023
1 parent 0bfbea9 commit 17e681d
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-skywalking-parser.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Skywalking parser to extract skywalking service port from config

# One or more tracking issues related to the change
issues: [1634]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
128 changes: 128 additions & 0 deletions pkg/collector/parser/receiver_skywalking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parser

import (
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ ReceiverParser = &SkywalkingReceiverParser{}

const (
parserNameSkywalking = "__skywalking"

defaultSkywalkingGRPCPort int32 = 11800
defaultSkywalkingHTTPPort int32 = 12800
)

// SkywalkingReceiverParser parses the configuration for Skywalking receivers.
type SkywalkingReceiverParser struct {
config map[interface{}]interface{}
logger logr.Logger
name string
}

// NewSkywalkingReceiverParser builds a new parser for Skywalking receivers.
func NewSkywalkingReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) ReceiverParser {
if protocols, ok := config["protocols"].(map[interface{}]interface{}); ok {
return &SkywalkingReceiverParser{
logger: logger,
name: name,
config: protocols,
}
}

return &SkywalkingReceiverParser{
name: name,
config: map[interface{}]interface{}{},
}
}

// Ports returns all the service ports for all protocols in this parser.
func (o *SkywalkingReceiverParser) Ports() ([]corev1.ServicePort, error) {
ports := []corev1.ServicePort{}

for _, protocol := range []struct {
name string
defaultPorts []corev1.ServicePort
}{
{
name: grpc,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultSkywalkingGRPCPort),
Port: defaultSkywalkingGRPCPort,
TargetPort: intstr.FromInt(int(defaultSkywalkingGRPCPort)),
AppProtocol: &grpc,
},
},
},
{
name: http,
defaultPorts: []corev1.ServicePort{
{
Name: portName(fmt.Sprintf("%s-http", o.name), defaultSkywalkingHTTPPort),
Port: defaultSkywalkingHTTPPort,
TargetPort: intstr.FromInt(int(defaultSkywalkingHTTPPort)),
AppProtocol: &http,
},
},
},
} {
// do we have the protocol specified at all?
if receiverProtocol, ok := o.config[protocol.name]; ok {
// we have the specified protocol, we definitely need a service port
nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name)
var protocolPort *corev1.ServicePort

// do we have a configuration block for the protocol?
settings, ok := receiverProtocol.(map[interface{}]interface{})
if ok {
protocolPort = singlePortFromConfigEndpoint(o.logger, nameWithProtocol, settings)
}

// have we parsed a port based on the configuration block?
// if not, we use the default port
if protocolPort == nil {
ports = append(ports, protocol.defaultPorts...)
} else {
// infer protocol and appProtocol from protocol.name
if protocol.name == grpc {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &grpc
} else if protocol.name == http {
protocolPort.Protocol = corev1.ProtocolTCP
protocolPort.AppProtocol = &http
}
ports = append(ports, *protocolPort)
}
}
}

return ports, nil
}

// ParserName returns the name of this parser.
func (o *SkywalkingReceiverParser) ParserName() string {
return parserNameSkywalking
}

func init() {
Register("skywalking", NewSkywalkingReceiverParser)
}
108 changes: 108 additions & 0 deletions pkg/collector/parser/receiver_skywalking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parser

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSkywalkingSelfRegisters(t *testing.T) {
// verify
assert.True(t, IsRegistered("skywalking"))
}

func TestSkywalkingIsFoundByName(t *testing.T) {
// test
p := For(logger, "skywalking", map[interface{}]interface{}{})

// verify
assert.Equal(t, "__skywalking", p.ParserName())
}

func TestSkywalkingPortsOverridden(t *testing.T) {
// prepare
builder := NewSkywalkingReceiverParser(logger, "skywalking", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1234",
},
"http": map[interface{}]interface{}{
"endpoint": "0.0.0.0:1235",
},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"skywalking-grpc": {portNumber: 1234},
"skywalking-http": {portNumber: 1235},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}

func TestSkywalkingExposeDefaultPorts(t *testing.T) {
// prepare
builder := NewSkywalkingReceiverParser(logger, "skywalking", map[interface{}]interface{}{
"protocols": map[interface{}]interface{}{
"grpc": map[interface{}]interface{}{},
"http": map[interface{}]interface{}{},
},
})

expectedResults := map[string]struct {
portNumber int32
seen bool
}{
"skywalking-grpc": {portNumber: 11800},
"skywalking-http": {portNumber: 12800},
}

// test
ports, err := builder.Ports()

// verify
assert.NoError(t, err)
assert.Len(t, ports, len(expectedResults))

for _, port := range ports {
r := expectedResults[port.Name]
r.seen = true
expectedResults[port.Name] = r
assert.EqualValues(t, r.portNumber, port.Port)
}
for k, v := range expectedResults {
assert.True(t, v.seen, "the port %s wasn't included in the service ports", k)
}
}

0 comments on commit 17e681d

Please sign in to comment.