Skip to content

Commit

Permalink
[exporter/logicmonitorexporter] Support log resource mapping config (#…
Browse files Browse the repository at this point in the history
…29733)

**Description:** 
This PR adds support for resource mapping config for logs. 
To make the resource mapping for logs flexible, LogicMonitor platform
expects some params to be present in the log ingestion request. We have
changed the exporter configuration to accept that configuration.

**Link to tracking Issue:** #29732

**Testing:** Unit test cases were added. Also did the functional testing
for the log ingestion with the newly added config.

**Documentation:** 

As per the LogicMonitor's [Log Ingestion
documentation](https://www.logicmonitor.com/support/lm-logs/sending-logs-to-the-lm-logs-ingestion-api),
if more than one resource property exists, only the first property will
be mapped. In case of OTLP logs, there can be multiple resource
attributes and its order also cannot be guaranteed.

Recently, the LogicMonitor has made the resource mapping for logs more
flexible. With that, any of the resource attributes present in the log
matches can be considered for resource mapping. But, this is not the
default behaviour. In order to make the resource mapping flexible, you
can configure the resource_mapping_op in the LogicMonitor's exporter.

 ```
 exporters:
    logicmonitor:
     ...
      logs:
        resource_mapping_op: "OR"
```
The value for resource_mapping_op can be AND or OR. The values are case-insensitive.
  • Loading branch information
avadhut123pisal authored Dec 13, 2023
1 parent a5b2501 commit 810ab60
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 29 deletions.
27 changes: 27 additions & 0 deletions .chloggen/support-log-resource-mapping-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logicmonitorexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add support for log resource mapping configurations

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 18 additions & 0 deletions exporter/logicmonitorexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ Set the environment variable `LOGICMONITOR_BEARER_TOKEN`
headers:
Authorization: Bearer <bearer token of logicmonitor>
```
## Resource Mapping for Logs

As per the LogicMonitor's [Log Ingestion documentation](https://www.logicmonitor.com/support/lm-logs/sending-logs-to-the-lm-logs-ingestion-api), if more than one resource property exists, only the first property will be mapped. In case of OTLP logs, there can be multiple resource attributes and its order also cannot be guaranteed.

Recently we have made the resource mapping for logs more flexible. With that, any of the resource attributes present in the log matches can be considered for resource mapping.
But, this is not the default behaviour. In order to make the resource mapping flexible, you can configure the `resource_mapping_op` in the LogicMonitor's exporter.

```yaml
exporters:
logicmonitor:
endpoint: https://company.logicmonitor.com/rest
headers:
Authorization: Bearer <token>
logs:
resource_mapping_op: "OR"
```

The value for `resource_mapping_op` can be `AND` or `OR`. The values are case-insensitive.
31 changes: 29 additions & 2 deletions exporter/logicmonitorexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logicmonitorexporter // import "github.com/open-telemetry/opentelemetry-
import (
"fmt"
"net/url"
"strings"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
Expand All @@ -24,21 +25,47 @@ type Config struct {

// ApiToken of Logicmonitor Platform
APIToken APIToken `mapstructure:"api_token"`
// Logs defines the Logs exporter specific configuration
Logs LogsConfig `mapstructure:"logs"`
}

type APIToken struct {
AccessID string `mapstructure:"access_id"`
AccessKey configopaque.String `mapstructure:"access_key"`
}

type MappingOperation string

const (
And MappingOperation = "and"
Or MappingOperation = "or"
)

func (mop *MappingOperation) UnmarshalText(in []byte) error {
switch op := MappingOperation(strings.ToLower(string(in))); op {
case And, Or:
*mop = op
return nil

default:
return fmt.Errorf("unsupported mapping operation %q", op)
}
}

// LogsConfig defines the logs exporter specific configuration options
type LogsConfig struct {
// Operation to be performed for resource mapping. Valid values are `and`, `or`.
ResourceMappingOperation MappingOperation `mapstructure:"resource_mapping_op"`
}

func (c *Config) Validate() error {
if c.Endpoint == "" {
return fmt.Errorf("Endpoint should not be empty")
return fmt.Errorf("endpoint should not be empty")
}

u, err := url.Parse(c.Endpoint)
if err != nil || u.Scheme == "" || u.Host == "" {
return fmt.Errorf("Endpoint must be valid")
return fmt.Errorf("endpoint must be valid")
}
return nil
}
55 changes: 52 additions & 3 deletions exporter/logicmonitorexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"

Expand All @@ -34,7 +35,7 @@ func TestConfigValidation(t *testing.T) {
},
},
wantErr: true,
errorMessage: "Endpoint should not be empty",
errorMessage: "endpoint should not be empty",
},
{
name: "missing http scheme",
Expand All @@ -44,7 +45,7 @@ func TestConfigValidation(t *testing.T) {
},
},
wantErr: true,
errorMessage: "Endpoint must be valid",
errorMessage: "endpoint must be valid",
},
{
name: "invalid endpoint format",
Expand All @@ -54,7 +55,7 @@ func TestConfigValidation(t *testing.T) {
},
},
wantErr: true,
errorMessage: "Endpoint must be valid",
errorMessage: "endpoint must be valid",
},
{
name: "valid config",
Expand Down Expand Up @@ -124,6 +125,22 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, "resource-mapping-op"),
expected: &Config{
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "https://company.logicmonitor.com/rest",
Headers: map[string]configopaque.String{
"Authorization": "Bearer <token>",
},
},
Logs: LogsConfig{
ResourceMappingOperation: "or",
},
},
},
}

for _, tt := range tests {
Expand All @@ -140,3 +157,35 @@ func TestLoadConfig(t *testing.T) {
})
}
}

func TestUnmarshal(t *testing.T) {
tests := []struct {
name string
configMap *confmap.Conf
cfg *Config
err string
}{
{
name: "invalid resource mapping operation",
configMap: confmap.NewFromStringMap(map[string]any{
"logs": map[string]any{
"resource_mapping_op": "invalid_op",
},
}),
err: "1 error(s) decoding:\n\n* error decoding 'logs.resource_mapping_op': unsupported mapping operation \"invalid_op\"",
},
}

f := NewFactory()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := f.CreateDefaultConfig().(*Config)
err := component.UnmarshalConfig(tt.configMap, cfg)
if err != nil || tt.err != "" {
assert.EqualError(t, err, tt.err)
} else {
assert.Equal(t, tt.cfg, cfg)
}
})
}
}
12 changes: 2 additions & 10 deletions exporter/logicmonitorexporter/internal/logs/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

lmsdklogs "github.com/logicmonitor/lm-data-sdk-go/api/logs"
"github.com/logicmonitor/lm-data-sdk-go/model"
"github.com/logicmonitor/lm-data-sdk-go/utils"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
Expand All @@ -23,15 +22,8 @@ type Sender struct {
}

// NewSender creates a new Sender
func NewSender(ctx context.Context, endpoint string, client *http.Client, authParams utils.AuthParams, logger *zap.Logger) (*Sender, error) {
options := []lmsdklogs.Option{
lmsdklogs.WithLogBatchingDisabled(),
lmsdklogs.WithAuthentication(authParams),
lmsdklogs.WithHTTPClient(client),
lmsdklogs.WithEndpoint(endpoint),
}

logIngestClient, err := lmsdklogs.NewLMLogIngest(ctx, options...)
func NewSender(ctx context.Context, logger *zap.Logger, opts ...lmsdklogs.Option) (*Sender, error) {
logIngestClient, err := lmsdklogs.NewLMLogIngest(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create logIngestClient: %w", err)
}
Expand Down
28 changes: 20 additions & 8 deletions exporter/logicmonitorexporter/internal/logs/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ import (
)

func TestSendLogs(t *testing.T) {
authParams := utils.AuthParams{
AccessID: "testId",
AccessKey: "testKey",
BearerToken: "testToken",
}

t.Run("should not return error", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := lmsdklogs.LMLogIngestResponse{
Expand All @@ -39,7 +35,7 @@ func TestSendLogs(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

sender, err := NewSender(ctx, ts.URL, ts.Client(), authParams, zap.NewNop())
sender, err := NewSender(ctx, zap.NewNop(), buildLogIngestTestOpts(ts.URL, ts.Client())...)
assert.NoError(t, err)

logInput := translator.ConvertToLMLogInput("test msg", utils.NewTimestampFromTime(time.Now()).String(), map[string]any{"system.hostname": "test"}, map[string]any{"cloud.provider": "aws"})
Expand All @@ -61,7 +57,7 @@ func TestSendLogs(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

sender, err := NewSender(ctx, ts.URL, ts.Client(), authParams, zap.NewNop())
sender, err := NewSender(ctx, zap.NewNop(), buildLogIngestTestOpts(ts.URL, ts.Client())...)
assert.NoError(t, err)

logInput := translator.ConvertToLMLogInput("test msg", utils.NewTimestampFromTime(time.Now()).String(), map[string]any{"system.hostname": "test"}, map[string]any{"cloud.provider": "aws"})
Expand All @@ -84,7 +80,7 @@ func TestSendLogs(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

sender, err := NewSender(ctx, ts.URL, ts.Client(), authParams, zap.NewNop())
sender, err := NewSender(ctx, zap.NewNop(), buildLogIngestTestOpts(ts.URL, ts.Client())...)
assert.NoError(t, err)

logInput := translator.ConvertToLMLogInput("test msg", utils.NewTimestampFromTime(time.Now()).String(), map[string]any{"system.hostname": "test"}, map[string]any{"cloud.provider": "aws"})
Expand All @@ -94,3 +90,19 @@ func TestSendLogs(t *testing.T) {
assert.Equal(t, false, consumererror.IsPermanent(err))
})
}

func buildLogIngestTestOpts(endpoint string, client *http.Client) []lmsdklogs.Option {
authParams := utils.AuthParams{
AccessID: "testId",
AccessKey: "testKey",
BearerToken: "testToken",
}

opts := []lmsdklogs.Option{
lmsdklogs.WithLogBatchingDisabled(),
lmsdklogs.WithAuthentication(authParams),
lmsdklogs.WithHTTPClient(client),
lmsdklogs.WithEndpoint(endpoint),
}
return opts
}
31 changes: 25 additions & 6 deletions exporter/logicmonitorexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package logicmonitorexporter // import "github.com/open-telemetry/opentelemetry-
import (
"context"
"fmt"
"net/http"
"time"

lmsdklogs "github.com/logicmonitor/lm-data-sdk-go/api/logs"
"github.com/logicmonitor/lm-data-sdk-go/model"
"github.com/logicmonitor/lm-data-sdk-go/utils"
"github.com/logicmonitor/lm-data-sdk-go/utils/translator"
Expand Down Expand Up @@ -48,13 +50,9 @@ func (e *logExporter) start(ctx context.Context, host component.Host) error {
return fmt.Errorf("failed to create http client: %w", err)
}

authParams := utils.AuthParams{
AccessID: e.config.APIToken.AccessID,
AccessKey: string(e.config.APIToken.AccessKey),
BearerToken: string(e.config.Headers["Authorization"]),
}
opts := buildLogIngestOpts(e.config, client)

e.sender, err = logs.NewSender(ctx, e.config.Endpoint, client, authParams, e.settings.Logger)
e.sender, err = logs.NewSender(ctx, e.settings.Logger, opts...)
if err != nil {
return err
}
Expand Down Expand Up @@ -97,6 +95,27 @@ func (e *logExporter) PushLogData(ctx context.Context, lg plog.Logs) error {
return e.sender.SendLogs(ctx, payload)
}

func buildLogIngestOpts(config *Config, client *http.Client) []lmsdklogs.Option {
authParams := utils.AuthParams{
AccessID: config.APIToken.AccessID,
AccessKey: string(config.APIToken.AccessKey),
BearerToken: string(config.Headers["Authorization"]),
}

opts := []lmsdklogs.Option{
lmsdklogs.WithLogBatchingDisabled(),
lmsdklogs.WithAuthentication(authParams),
lmsdklogs.WithHTTPClient(client),
lmsdklogs.WithEndpoint(config.Endpoint),
}

if config.Logs.ResourceMappingOperation != "" {
opts = append(opts, lmsdklogs.WithResourceMappingOperation(string(config.Logs.ResourceMappingOperation)))
}

return opts
}

func timestampFromLogRecord(lr plog.LogRecord) pcommon.Timestamp {
if lr.Timestamp() != 0 {
return lr.Timestamp()
Expand Down
8 changes: 8 additions & 0 deletions exporter/logicmonitorexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ logicmonitor/bearertoken:
endpoint: https://company.logicmonitor.com/rest
headers:
Authorization: Bearer <token>
# The following entry demonstrates how to set resource mapping operation (AND / OR).
# The values are case-insensitive
logicmonitor/resource-mapping-op:
logs:
resource_mapping_op: "OR"
endpoint: https://company.logicmonitor.com/rest
headers:
Authorization: Bearer <token>

0 comments on commit 810ab60

Please sign in to comment.