Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Generate function and extension logs via Telemetry API receiver #1347

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
bd5dd05
Added WithLogs and its handling
jerrytfleung May 24, 2024
63558a8
nits
jerrytfleung May 25, 2024
1ae4fdd
Added extensions
jerrytfleung May 25, 2024
0f74cbe
Fixed unit tests
jerrytfleung May 28, 2024
0c54292
Added unit test cases
jerrytfleung May 28, 2024
dd2e317
Added config (#26)
jerrytfleung Jun 20, 2024
13a99af
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jun 20, 2024
4ff5876
Added severityTextToNumber function
jerrytfleung Jun 20, 2024
730cfc9
Corrected README.md
jerrytfleung Jun 27, 2024
999a7ce
Handled empty types array
jerrytfleung Jun 27, 2024
1190cb5
Added CRITICAL & ALL
jerrytfleung Jun 27, 2024
165cdda
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jul 15, 2024
26a3c68
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jul 17, 2024
16f92d1
Removed invalid test case
jerrytfleung Jul 17, 2024
2575871
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Aug 22, 2024
a297035
Fixed code after rebase
jerrytfleung Aug 22, 2024
2d33076
Updated README.md
jerrytfleung Aug 22, 2024
cdea000
Used time.RFC3339 format
jerrytfleung Aug 22, 2024
166628a
Applied review comments
jerrytfleung Aug 27, 2024
2e03ffe
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Aug 28, 2024
3b7cda5
Added WARNING, Updated test cases, Added String.ToUpper
jerrytfleung Aug 28, 2024
1bc05fb
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Sep 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
Expand Down
8 changes: 1 addition & 7 deletions collector/internal/telemetryapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client {
}
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}

func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
Expand Down
26 changes: 20 additions & 6 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |-----------------|
| Stability | [alpha] |
| Supported pipeline types | traces |
| Distributions | [extension] |
| Status | |
| ------------------------ |--------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand All @@ -15,11 +15,25 @@ Supported events:

## Configuration

There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration:
| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
telemetryapi/2:
port: 4327
types:
- platform
- function
telemetryapi/3:
port: 4328
types: ["platform", "function"]
```
[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
Expand Down
11 changes: 11 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@

package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
)

// Config defines the configuration for the various elements of the receiver agent.
type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
}
}
return nil
}
103 changes: 103 additions & 0 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,107 @@
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

// Helper function to create expected Config
createExpectedConfig := func(types []string) *Config {
return &Config{
extensionID: "extensionID",
Port: 12345,
Types: types,
}
}

tests := []struct {
name string
id component.ID
expected component.Config
}{
{
name: "default",
id: component.NewID(component.MustNewType("telemetryapi")),
expected: NewFactory("extensionID").CreateDefaultConfig(),
},
{
name: "all types",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"),
expected: createExpectedConfig([]string{platform, function, extension}),
},
{
name: "platform only",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"),
expected: createExpectedConfig([]string{platform}),
},
{
name: "function only",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"),
expected: createExpectedConfig([]string{function}),
},
{
name: "extension only",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"),
expected: createExpectedConfig([]string{extension}),
},
{
name: "platform and function",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"),
expected: createExpectedConfig([]string{platform, function}),
},
{
name: "platform and extension",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"),
expected: createExpectedConfig([]string{platform, extension}),
},
{
name: "function and extension",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"),
expected: createExpectedConfig([]string{function, extension}),
},
{
name: "empty types",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"),
expected: createExpectedConfig([]string{}),
},
{
name: "function and extension (alternative syntax)",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"),
expected: createExpectedConfig([]string{function, extension}),
},
{
name: "function and extension (another syntax)",
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"),
expected: createExpectedConfig([]string{function, extension}),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

factory := NewFactory("extensionID")
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))
require.NoError(t, component.ValidateConfig(cfg))

require.Equal(t, tt.expected, cfg)
})
}
}

jerrytfleung marked this conversation as resolved.
Show resolved Hide resolved
func TestValidate(t *testing.T) {
testCases := []struct {
desc string
Expand All @@ -31,6 +127,13 @@ func TestValidate(t *testing.T) {
cfg: &Config{},
expectedErr: nil,
},
{
desc: "invalid config",
cfg: &Config{
Types: []string{"invalid"},
},
expectedErr: fmt.Errorf("unknown extension type: invalid"),
},
}

for _, tc := range testCases {
Expand Down
35 changes: 31 additions & 4 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
platform = "platform"
function = "function"
extension = "extension"
)

var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config")
Expand All @@ -37,16 +42,38 @@ func NewFactory(extensionID string) receiver.Factory {
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
}
},
receiver.WithTraces(createTracesReceiver, stability))
receiver.WithTraces(createTracesReceiver, stability),
receiver.WithLogs(createLogsReceiver, stability))
}

func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
t, _ := newTelemetryAPIReceiver(cfg, params)
return t
})
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
return r, nil
}

return newTelemetryAPIReceiver(cfg, next, params)
func createLogsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
t, _ := newTelemetryAPIReceiver(cfg, params)
return t
})
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next)
return r, nil
}

var receivers = sharedcomponent.NewSharedComponents()
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test"}
var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
28 changes: 19 additions & 9 deletions collector/receiver/telemetryapireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ require (
github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.107.0
go.opentelemetry.io/collector/consumer v0.106.1
go.opentelemetry.io/collector/consumer/consumertest v0.106.1
go.opentelemetry.io/collector/confmap v0.107.0
go.opentelemetry.io/collector/consumer v0.107.0
go.opentelemetry.io/collector/consumer/consumertest v0.107.0
go.opentelemetry.io/collector/pdata v1.13.0
go.opentelemetry.io/collector/receiver v0.106.1
go.opentelemetry.io/collector/semconv v0.106.1
go.opentelemetry.io/collector/receiver v0.107.0
go.opentelemetry.io/collector/semconv v0.107.0
go.uber.org/zap v1.27.0
)

Expand All @@ -25,9 +26,16 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand All @@ -37,18 +45,20 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.107.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.106.1 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 // indirect
go.opentelemetry.io/collector/featuregate v1.13.0 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
Expand Down
Loading
Loading