From f2ec1668136c1dc373d76206359235961683a1da Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Mon, 30 Oct 2023 15:21:55 +0100 Subject: [PATCH] [exporter/syslog] send syslog messages in batches (#27799) **Description:** This changes the behavior of the Syslog exporter to send each batch of Syslog messages in a single request (with messages separated by newlines), instead of sending each message in a separate request and closing the connection after each message. The batching only happens when using TCP. For UDP, each syslog message is still sent in a separate request, as defined by [the spec](https://datatracker.ietf.org/doc/html/rfc5426#section-3.1). This also significantly refactors (and hopefully simplifies) the exporter's code, extracting the code that formats the syslog messages from the `sender` type into separate `formatter` types. Hopefully this will make the development of this component easier. **Link to tracking Issue:** - https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21244 **Testing:** The unit tests have been updated to reflect the refactored codebase. The integration tests introduced in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/27464 are unchanged, as the format of the output messages hasn't changed. **Documentation:** No documentation updates. --- .chloggen/syslog-exporter-single-request.yaml | 27 +++ exporter/syslogexporter/exporter.go | 129 ++++++++------- exporter/syslogexporter/formatter.go | 29 ++++ exporter/syslogexporter/rfc3164_formatter.go | 56 +++++++ .../syslogexporter/rfc3164_formatter_test.go | 41 +++++ exporter/syslogexporter/rfc5424_formatter.go | 98 +++++++++++ .../syslogexporter/rfc5424_formatter_test.go | 94 +++++++++++ exporter/syslogexporter/sender.go | 109 +------------ exporter/syslogexporter/sender_test.go | 154 ------------------ exporter/syslogexporter/utils.go | 8 - exporter/syslogexporter/utils_test.go | 24 --- 11 files changed, 410 insertions(+), 359 deletions(-) create mode 100755 .chloggen/syslog-exporter-single-request.yaml create mode 100644 exporter/syslogexporter/formatter.go create mode 100644 exporter/syslogexporter/rfc3164_formatter.go create mode 100644 exporter/syslogexporter/rfc3164_formatter_test.go create mode 100644 exporter/syslogexporter/rfc5424_formatter.go create mode 100644 exporter/syslogexporter/rfc5424_formatter_test.go delete mode 100644 exporter/syslogexporter/sender_test.go diff --git a/.chloggen/syslog-exporter-single-request.yaml b/.chloggen/syslog-exporter-single-request.yaml new file mode 100755 index 000000000000..4feea79ee3ba --- /dev/null +++ b/.chloggen/syslog-exporter-single-request.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/syslog + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: send syslog messages in batches + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21244] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This changes the behavior of the Syslog exporter to send each batch of Syslog messages in a single request (with messages separated by newlines), instead of sending each message in a separate request and closing the connection after each message. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/syslogexporter/exporter.go b/exporter/syslogexporter/exporter.go index 7d897137f544..c96274ee0d62 100644 --- a/exporter/syslogexporter/exporter.go +++ b/exporter/syslogexporter/exporter.go @@ -8,12 +8,10 @@ import ( "crypto/tls" "fmt" "strings" - "time" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" "go.uber.org/zap" @@ -23,6 +21,7 @@ type syslogexporter struct { config *Config logger *zap.Logger tlsConfig *tls.Config + formatter formatter } func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*syslogexporter, error) { @@ -37,11 +36,12 @@ func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*sysloge config: cfg, logger: createSettings.Logger, tlsConfig: tlsConfig, + formatter: createFormatter(cfg.Protocol), } s.logger.Info("Syslog Exporter configured", zap.String("endpoint", cfg.Endpoint), - zap.String("Protocol", cfg.Protocol), + zap.String("protocol", cfg.Protocol), zap.Int("port", cfg.Port), ) @@ -69,78 +69,77 @@ func newLogsExporter( ) } -func (se *syslogexporter) logsToMap(record plog.LogRecord) map[string]any { - attributes := record.Attributes().AsRaw() - return attributes -} - -func (se *syslogexporter) getTimestamp(record plog.LogRecord) time.Time { - timestamp := record.Timestamp().AsTime() - return timestamp +func (se *syslogexporter) pushLogsData(_ context.Context, logs plog.Logs) error { + batchMessages := strings.ToLower(se.config.Network) == "tcp" + var err error + if batchMessages { + err = se.exportBatch(logs) + } else { + err = se.exportNonBatch(logs) + } + return err } -func (se *syslogexporter) pushLogsData(_ context.Context, ld plog.Logs) error { - type droppedResourceRecords struct { - resource pcommon.Resource - records []plog.LogRecord - } - var ( - errs []error - dropped []droppedResourceRecords - ) - rls := ld.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - rl := rls.At(i) - if droppedRecords, err := se.sendSyslogs(rl); err != nil { - dropped = append(dropped, droppedResourceRecords{ - resource: rl.Resource(), - records: droppedRecords, - }) - errs = append(errs, err) +func (se *syslogexporter) exportBatch(logs plog.Logs) error { + var payload strings.Builder + for i := 0; i < logs.ResourceLogs().Len(); i++ { + resourceLogs := logs.ResourceLogs().At(i) + for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ { + scopeLogs := resourceLogs.ScopeLogs().At(j) + for k := 0; k < scopeLogs.LogRecords().Len(); k++ { + logRecord := scopeLogs.LogRecords().At(k) + formatted := se.formatter.format(logRecord) + payload.WriteString(formatted) + } } } - if len(dropped) > 0 { - ld = plog.NewLogs() - for i := range dropped { - rls := ld.ResourceLogs().AppendEmpty() - logRecords := rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - dropped[i].resource.MoveTo(rls.Resource()) - for j := 0; j < len(dropped[i].records); j++ { - dropped[i].records[j].MoveTo(logRecords) - } + + if payload.Len() > 0 { + sender, err := connect(se.logger, se.config, se.tlsConfig) + if err != nil { + return consumererror.NewLogs(err, logs) + } + defer sender.close() + err = sender.Write(payload.String()) + if err != nil { + return consumererror.NewLogs(err, logs) } - errs = deduplicateErrors(errs) - return consumererror.NewLogs(multierr.Combine(errs...), ld) } - se.logger.Info("Connected successfully, exporting logs....") return nil } -func (se *syslogexporter) sendSyslogs(rl plog.ResourceLogs) ([]plog.LogRecord, error) { - var ( - errs []error - droppedRecords []plog.LogRecord - ) - slgs := rl.ScopeLogs() - for i := 0; i < slgs.Len(); i++ { - slg := slgs.At(i) - for j := 0; j < slg.LogRecords().Len(); j++ { - lr := slg.LogRecords().At(j) - formattedLine := se.logsToMap(lr) - timestamp := se.getTimestamp(lr) - s, errConn := connect(se.logger, se.config, se.tlsConfig) - if errConn != nil { - droppedRecords = append(droppedRecords, lr) - errs = append(errs, errConn) - continue - } - defer s.close() - err := s.Write(formattedLine, timestamp) - if err != nil { - droppedRecords = append(droppedRecords, lr) - errs = append(errs, err) +func (se *syslogexporter) exportNonBatch(logs plog.Logs) error { + sender, err := connect(se.logger, se.config, se.tlsConfig) + if err != nil { + return consumererror.NewLogs(err, logs) + } + defer sender.close() + + errs := []error{} + droppedLogs := plog.NewLogs() + for i := 0; i < logs.ResourceLogs().Len(); i++ { + resourceLogs := logs.ResourceLogs().At(i) + droppedResourceLogs := droppedLogs.ResourceLogs().AppendEmpty() + for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ { + scopeLogs := resourceLogs.ScopeLogs().At(j) + droppedScopeLogs := droppedResourceLogs.ScopeLogs().AppendEmpty() + for k := 0; k < scopeLogs.LogRecords().Len(); k++ { + logRecord := scopeLogs.LogRecords().At(k) + formatted := se.formatter.format(logRecord) + err = sender.Write(formatted) + if err != nil { + errs = append(errs, err) + droppedLogRecord := droppedScopeLogs.LogRecords().AppendEmpty() + logRecord.CopyTo(droppedLogRecord) + } } } } - return droppedRecords, multierr.Combine(errs...) + + if len(errs) > 0 { + errs = deduplicateErrors(errs) + return consumererror.NewLogs(multierr.Combine(errs...), droppedLogs) + } + + return nil } diff --git a/exporter/syslogexporter/formatter.go b/exporter/syslogexporter/formatter.go new file mode 100644 index 000000000000..c184ce2813e6 --- /dev/null +++ b/exporter/syslogexporter/formatter.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter" + +import ( + "go.opentelemetry.io/collector/pdata/plog" +) + +func createFormatter(protocol string) formatter { + if protocol == protocolRFC5424Str { + return newRFC5424Formatter() + } + return newRFC3164Formatter() +} + +type formatter interface { + format(plog.LogRecord) string +} + +// getAttributeValueOrDefault returns the value of the requested log record's attribute as a string. +// If the attribute was not found, it returns the provided default value. +func getAttributeValueOrDefault(logRecord plog.LogRecord, attributeName string, defaultValue string) string { + value := defaultValue + if attributeValue, found := logRecord.Attributes().Get(attributeName); found { + value = attributeValue.AsString() + } + return value +} diff --git a/exporter/syslogexporter/rfc3164_formatter.go b/exporter/syslogexporter/rfc3164_formatter.go new file mode 100644 index 000000000000..4298a2cb8406 --- /dev/null +++ b/exporter/syslogexporter/rfc3164_formatter.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter" + +import ( + "fmt" + "strconv" + + "go.opentelemetry.io/collector/pdata/plog" +) + +type rfc3164Formatter struct { +} + +func newRFC3164Formatter() *rfc3164Formatter { + return &rfc3164Formatter{} +} + +func (f *rfc3164Formatter) format(logRecord plog.LogRecord) string { + priorityString := f.formatPriority(logRecord) + timestampString := f.formatTimestamp(logRecord) + hostnameString := f.formatHostname(logRecord) + appnameString := f.formatAppname(logRecord) + messageString := f.formatMessage(logRecord) + appnameMessageDelimiter := "" + if len(appnameString) > 0 && messageString != emptyMessage { + appnameMessageDelimiter = " " + } + formatted := fmt.Sprintf("<%s>%s %s %s%s%s\n", priorityString, timestampString, hostnameString, appnameString, appnameMessageDelimiter, messageString) + return formatted +} + +func (f *rfc3164Formatter) formatPriority(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, priority, strconv.Itoa(defaultPriority)) +} + +func (f *rfc3164Formatter) formatTimestamp(logRecord plog.LogRecord) string { + return logRecord.Timestamp().AsTime().Format("Jan 02 15:04:05") +} + +func (f *rfc3164Formatter) formatHostname(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, hostname, emptyValue) +} + +func (f *rfc3164Formatter) formatAppname(logRecord plog.LogRecord) string { + value := getAttributeValueOrDefault(logRecord, app, "") + if value != "" { + value += ":" + } + return value +} + +func (f *rfc3164Formatter) formatMessage(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, message, emptyMessage) +} diff --git a/exporter/syslogexporter/rfc3164_formatter_test.go b/exporter/syslogexporter/rfc3164_formatter_test.go new file mode 100644 index 000000000000..6c3a48cf0ae3 --- /dev/null +++ b/exporter/syslogexporter/rfc3164_formatter_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package syslogexporter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestRFC3164Formatter(t *testing.T) { + expected := "<34>Aug 24 05:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8\n" + logRecord := plog.NewLogRecord() + logRecord.Attributes().PutStr("appname", "su") + logRecord.Attributes().PutStr("hostname", "mymachine") + logRecord.Attributes().PutStr("message", "'su root' failed for lonvick on /dev/pts/8") + logRecord.Attributes().PutInt("priority", 34) + timestamp, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003Z") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual := newRFC3164Formatter().format(logRecord) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + + expected = "<165>Aug 24 05:14:15 - -\n" + logRecord = plog.NewLogRecord() + logRecord.Attributes().PutStr("message", "-") + timestamp, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003Z") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual = newRFC3164Formatter().format(logRecord) + assert.NoError(t, err) + assert.Equal(t, expected, actual) +} diff --git a/exporter/syslogexporter/rfc5424_formatter.go b/exporter/syslogexporter/rfc5424_formatter.go new file mode 100644 index 000000000000..fa98da4f5d50 --- /dev/null +++ b/exporter/syslogexporter/rfc5424_formatter.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter" + +import ( + "fmt" + "strconv" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +type rfc5424Formatter struct { +} + +func newRFC5424Formatter() *rfc5424Formatter { + return &rfc5424Formatter{} +} + +func (f *rfc5424Formatter) format(logRecord plog.LogRecord) string { + priorityString := f.formatPriority(logRecord) + versionString := f.formatVersion(logRecord) + timestampString := f.formatTimestamp(logRecord) + hostnameString := f.formatHostname(logRecord) + appnameString := f.formatAppname(logRecord) + pidString := f.formatPid(logRecord) + messageIDString := f.formatMessageID(logRecord) + structuredData := f.formatStructuredData(logRecord) + messageString := f.formatMessage(logRecord) + formatted := fmt.Sprintf("<%s>%s %s %s %s %s %s %s%s\n", priorityString, versionString, timestampString, hostnameString, appnameString, pidString, messageIDString, structuredData, messageString) + return formatted +} + +func (f *rfc5424Formatter) formatPriority(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, priority, strconv.Itoa(defaultPriority)) +} + +func (f *rfc5424Formatter) formatVersion(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, version, strconv.Itoa(versionRFC5424)) +} + +func (f *rfc5424Formatter) formatTimestamp(logRecord plog.LogRecord) string { + return logRecord.Timestamp().AsTime().Format(time.RFC3339Nano) +} + +func (f *rfc5424Formatter) formatHostname(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, hostname, emptyValue) +} + +func (f *rfc5424Formatter) formatAppname(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, app, emptyValue) +} + +func (f *rfc5424Formatter) formatPid(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, pid, emptyValue) +} + +func (f *rfc5424Formatter) formatMessageID(logRecord plog.LogRecord) string { + return getAttributeValueOrDefault(logRecord, msgID, emptyValue) +} + +func (f *rfc5424Formatter) formatStructuredData(logRecord plog.LogRecord) string { + structuredDataAttributeValue, found := logRecord.Attributes().Get(structuredData) + if !found { + return emptyValue + } + if structuredDataAttributeValue.Type() != pcommon.ValueTypeMap { + return emptyValue + } + + sdElements := []string{} + for key, val := range structuredDataAttributeValue.Map().AsRaw() { + sdElements = append(sdElements, key) + vval, ok := val.(map[string]interface{}) + if !ok { + continue + } + for k, v := range vval { + vv, ok := v.(string) + if !ok { + continue + } + sdElements = append(sdElements, fmt.Sprintf("%s=\"%s\"", k, vv)) + } + } + return fmt.Sprint(sdElements) + +} + +func (f *rfc5424Formatter) formatMessage(logRecord plog.LogRecord) string { + formatted := getAttributeValueOrDefault(logRecord, message, emptyMessage) + if len(formatted) > 0 { + formatted = " " + formatted + } + return formatted +} diff --git a/exporter/syslogexporter/rfc5424_formatter_test.go b/exporter/syslogexporter/rfc5424_formatter_test.go new file mode 100644 index 000000000000..d88fb9f53a88 --- /dev/null +++ b/exporter/syslogexporter/rfc5424_formatter_test.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package syslogexporter + +import ( + "fmt" + "regexp" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestRFC5424Formatter(t *testing.T) { + expected := "<165>1 2003-08-24T05:14:15.000003Z 192.0.2.1 myproc 8710 - - It's time to make the do-nuts.\n" + logRecord := plog.NewLogRecord() + logRecord.Attributes().PutStr("appname", "myproc") + logRecord.Attributes().PutStr("hostname", "192.0.2.1") + logRecord.Attributes().PutStr("message", "It's time to make the do-nuts.") + logRecord.Attributes().PutInt("priority", 165) + logRecord.Attributes().PutStr("proc_id", "8710") + logRecord.Attributes().PutInt("version", 1) + timestamp, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003Z") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual := newRFC5424Formatter().format(logRecord) + assert.Equal(t, expected, actual) + + expected = "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog 111 ID47 - BOMAn application event log entry...\n" + logRecord = plog.NewLogRecord() + logRecord.Attributes().PutStr("appname", "evntslog") + logRecord.Attributes().PutStr("hostname", "mymachine.example.com") + logRecord.Attributes().PutStr("message", "BOMAn application event log entry...") + logRecord.Attributes().PutStr("msg_id", "ID47") + logRecord.Attributes().PutInt("priority", 165) + logRecord.Attributes().PutStr("proc_id", "111") + logRecord.Attributes().PutInt("version", 1) + timestamp, err = time.Parse(time.RFC3339Nano, "2003-10-11T22:14:15.003Z") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual = newRFC5424Formatter().format(logRecord) + assert.Equal(t, expected, actual) + + // Test structured data + expectedRegex := "\\<165\\>1 2003-08-24T12:14:15.000003Z 192\\.0\\.2\\.1 myproc 8710 - " + + "\\[\\S+ \\S+ \\S+ \\S+ \\S+\\] It's time to make the do-nuts\\.\n" + logRecord = plog.NewLogRecord() + logRecord.Attributes().PutStr("appname", "myproc") + logRecord.Attributes().PutStr("hostname", "192.0.2.1") + logRecord.Attributes().PutStr("message", "It's time to make the do-nuts.") + logRecord.Attributes().PutInt("priority", 165) + logRecord.Attributes().PutStr("proc_id", "8710") + logRecord.Attributes().PutEmptyMap("structured_data") + structuredData, found := logRecord.Attributes().Get("structured_data") + require.True(t, found) + structuredData.Map().PutEmptyMap("SecureAuth@27389") + structuredDataSubmap, found := structuredData.Map().Get("SecureAuth@27389") + require.True(t, found) + structuredDataSubmap.Map().PutStr("PEN", "27389") + structuredDataSubmap.Map().PutStr("Realm", "SecureAuth0") + structuredDataSubmap.Map().PutStr("UserHostAddress", "192.168.2.132") + structuredDataSubmap.Map().PutStr("UserID", "Tester2") + logRecord.Attributes().PutInt("version", 1) + timestamp, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual = newRFC5424Formatter().format(logRecord) + assert.NoError(t, err) + matched, err := regexp.MatchString(expectedRegex, actual) + assert.NoError(t, err) + assert.True(t, matched, fmt.Sprintf("unexpected form of formatted message, formatted message: %s, regexp: %s", actual, expectedRegex)) + assert.True(t, strings.Contains(actual, "Realm=\"SecureAuth0\"")) + assert.True(t, strings.Contains(actual, "UserHostAddress=\"192.168.2.132\"")) + assert.True(t, strings.Contains(actual, "UserID=\"Tester2\"")) + assert.True(t, strings.Contains(actual, "PEN=\"27389\"")) + + // Test defaults + expected = "<165>1 2003-08-24T12:14:15.000003Z - - - - -\n" + logRecord = plog.NewLogRecord() + timestamp, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") + require.NoError(t, err) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + actual = newRFC5424Formatter().format(logRecord) + assert.Equal(t, expected, actual) +} diff --git a/exporter/syslogexporter/sender.go b/exporter/syslogexporter/sender.go index 9396cbcc059c..cd3d5d700963 100644 --- a/exporter/syslogexporter/sender.go +++ b/exporter/syslogexporter/sender.go @@ -9,20 +9,17 @@ import ( "net" "strings" "sync" - "time" "go.uber.org/zap" ) const defaultPriority = 165 -const defaultFacility = 1 const versionRFC5424 = 1 const protocolRFC5424Str = "rfc5424" const protocolRFC3164Str = "rfc3164" const priority = "priority" -const facility = "facility" const version = "version" const hostname = "hostname" const app = "appname" @@ -89,12 +86,10 @@ func (s *sender) dial() error { return err } -func (s *sender) Write(msg map[string]any, timestamp time.Time) error { +func (s *sender) Write(msgStr string) error { s.mu.Lock() defer s.mu.Unlock() - msgStr := s.formatMsg(msg, timestamp) - if s.conn != nil { if err := s.write(msgStr); err == nil { return nil @@ -106,7 +101,6 @@ func (s *sender) Write(msg map[string]any, timestamp time.Time) error { return s.write(msgStr) } - func (s *sender) write(msg string) error { // check if logs contains new line character at the end, if not add it if !strings.HasSuffix(msg, "\n") { @@ -115,104 +109,3 @@ func (s *sender) write(msg string) error { _, err := fmt.Fprint(s.conn, msg) return err } - -func (s *sender) formatMsg(msg map[string]any, timestamp time.Time) string { - switch s.protocol { - case protocolRFC3164Str: - return s.formatRFC3164(msg, timestamp) - case protocolRFC5424Str: - return s.formatRFC5424(msg, timestamp) - default: - panic(fmt.Sprintf("unsupported syslog protocol, protocol: %s", s.protocol)) - } -} - -func (s *sender) addStructuredData(msg map[string]any) { - if s.protocol != protocolRFC5424Str { - return - } - - switch sd := msg[structuredData].(type) { - case map[string]map[string]string: - sdElements := []string{} - for key, val := range sd { - sdElements = append(sdElements, key) - for k, v := range val { - sdElements = append(sdElements, fmt.Sprintf("%s=\"%s\"", k, v)) - } - } - msg[structuredData] = sdElements - case map[string]interface{}: - sdElements := []string{} - for key, val := range sd { - sdElements = append(sdElements, key) - vval, ok := val.(map[string]interface{}) - if !ok { - continue - } - for k, v := range vval { - vv, ok := v.(string) - if !ok { - continue - } - sdElements = append(sdElements, fmt.Sprintf("%s=\"%s\"", k, vv)) - } - } - msg[structuredData] = sdElements - default: - msg[structuredData] = emptyValue - } -} - -func populateDefaults(msg map[string]any, msgProperties []string) { - for _, msgProperty := range msgProperties { - if _, ok := msg[msgProperty]; ok { - continue - } - - switch msgProperty { - case priority: - msg[msgProperty] = defaultPriority - case version: - msg[msgProperty] = versionRFC5424 - case facility: - msg[msgProperty] = defaultFacility - case message: - msg[msgProperty] = emptyMessage - default: - msg[msgProperty] = emptyValue - } - } -} - -func (s *sender) formatRFC3164(msg map[string]any, timestamp time.Time) string { - msgProperties := []string{priority, hostname, message, app} - populateDefaults(msg, msgProperties) - timestampString := timestamp.Format("Jan 02 15:04:05") - appname := "" - if msg[app] != emptyValue { - appname = msg[app].(string) + ":" - } - if appname != "" && message != emptyMessage { - appname += " " - } - return fmt.Sprintf("<%d>%s %s %s%s", msg[priority], timestampString, msg[hostname], appname, msg[message]) -} - -func (s *sender) formatRFC5424(msg map[string]any, timestamp time.Time) string { - msgProperties := []string{priority, version, hostname, app, pid, msgID, message, structuredData} - populateDefaults(msg, msgProperties) - s.addStructuredData(msg) - timestampString := timestamp.Format(time.RFC3339Nano) - - return fmt.Sprintf("<%d>%d %s %s %s %s %s %s%s", msg[priority], msg[version], timestampString, msg[hostname], msg[app], msg[pid], msg[msgID], msg[structuredData], formatMessagePart(msg[message])) -} - -func formatMessagePart(message any) string { - msg := message.(string) - if msg != emptyMessage { - msg = " " + msg - } - - return msg -} diff --git a/exporter/syslogexporter/sender_test.go b/exporter/syslogexporter/sender_test.go deleted file mode 100644 index 68b178220cdd..000000000000 --- a/exporter/syslogexporter/sender_test.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package syslogexporter - -import ( - "fmt" - "regexp" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestFormatRFC5424(t *testing.T) { - - s := sender{protocol: protocolRFC5424Str} - - msg := map[string]any{ - "timestamp": "2003-08-24T05:14:15.000003-07:00", - "appname": "myproc", - "facility": 20, - "hostname": "192.0.2.1", - "log.file.name": "syslog", - "message": "It's time to make the do-nuts.", - "priority": 165, - "proc_id": "8710", - "version": 1, - } - - expected := "<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - It's time to make the do-nuts." - timeObj1, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Equal(t, expected, s.formatMsg(msg, timeObj1)) - assert.Nil(t, err) - - msg2 := map[string]any{ - "timestamp": "2003-10-11T22:14:15.003Z", - "appname": "evntslog", - "facility": 20, - "hostname": "mymachine.example.com", - "log.file.name": "syslog", - "message": "BOMAn application event log entry...", - "msg_id": "ID47", - "priority": 165, - "proc_id": "111", - "version": 1, - } - - expected2 := "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog 111 ID47 - BOMAn application event log entry..." - timeObj2, err := time.Parse(time.RFC3339Nano, "2003-10-11T22:14:15.003Z") - assert.Nil(t, err) - assert.Equal(t, expected2, s.formatMsg(msg2, timeObj2)) - - msg3 := map[string]any{ - "timestamp": "2003-08-24T05:14:15.000003-07:00", - "appname": "myproc", - "facility": 20, - "hostname": "192.0.2.1", - "log.file.name": "syslog", - "message": "It's time to make the do-nuts.", - "priority": 165, - "proc_id": "8710", - "version": 1, - "structured_data": map[string]map[string]string{ - "SecureAuth@27389": { - "PEN": "27389", - "Realm": "SecureAuth0", - "UserHostAddress": "192.168.2.132", - "UserID": "Tester2", - }, - }, - } - - expectedForm := "\\<165\\>1 2003-08-24T05:14:15.000003-07:00 192\\.0\\.2\\.1 myproc 8710 - " + - "\\[\\S+ \\S+ \\S+ \\S+ \\S+\\] It's time to make the do-nuts\\." - timeObj3, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Nil(t, err) - formattedMsg := s.formatMsg(msg3, timeObj3) - matched, err := regexp.MatchString(expectedForm, formattedMsg) - assert.Nil(t, err) - assert.Equal(t, true, matched, fmt.Sprintf("unexpected form of formatted message, formatted message: %s, regexp: %s", formattedMsg, expectedForm)) - assert.Equal(t, true, strings.Contains(formattedMsg, "Realm=\"SecureAuth0\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "UserHostAddress=\"192.168.2.132\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "UserID=\"Tester2\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "PEN=\"27389\"")) - - // Test defaults - msg4 := map[string]any{} - expected = "<165>1 2003-08-24T05:14:15.000003-07:00 - - - - -" - timeObj1, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Equal(t, expected, s.formatMsg(msg4, timeObj1)) - assert.Nil(t, err) - - msg5 := map[string]any{ - "timestamp": "2003-08-24T05:14:15.000003-07:00", - "appname": "myproc", - "facility": 20, - "hostname": "192.0.2.1", - "log.file.name": "syslog", - "message": "It's time to make the do-nuts.", - "priority": 165, - "proc_id": "8710", - "version": 1, - "structured_data": map[string]interface{}{ - "SecureAuth@27389": map[string]interface{}{ - "PEN": "27389", - "Realm": "SecureAuth0", - "UserHostAddress": "192.168.2.132", - "UserID": "Tester2", - }, - }, - } - - expectedForm = "\\<165\\>1 2003-08-24T05:14:15.000003-07:00 192\\.0\\.2\\.1 myproc 8710 - " + - "\\[\\S+ \\S+ \\S+ \\S+ \\S+\\] It's time to make the do-nuts\\." - timeObj5, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Nil(t, err) - formattedMsg = s.formatMsg(msg5, timeObj5) - matched, err = regexp.MatchString(expectedForm, formattedMsg) - assert.Nil(t, err) - assert.Equal(t, true, matched, fmt.Sprintf("unexpected form of formatted message, formatted message: %s, regexp: %s", formattedMsg, expectedForm)) - assert.Equal(t, true, strings.Contains(formattedMsg, "Realm=\"SecureAuth0\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "UserHostAddress=\"192.168.2.132\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "UserID=\"Tester2\"")) - assert.Equal(t, true, strings.Contains(formattedMsg, "PEN=\"27389\"")) -} - -func TestFormatRFC3164(t *testing.T) { - - s := sender{protocol: protocolRFC3164Str} - - msg := map[string]interface{}{ - "message": "'su root' failed for lonvick on /dev/pts/8", - "hostname": "mymachine", - "appname": "su", - "priority": int64(34), - "facility": int64(4), - } - - expected := "<34>Aug 24 05:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8" - timeObj1, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Equal(t, expected, s.formatMsg(msg, timeObj1)) - assert.Nil(t, err) - - // Test defaults - msg4 := map[string]interface{}{ - "message": "-", - } - expected = "<165>Aug 24 05:14:15 - -" - timeObj1, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003-07:00") - assert.Equal(t, expected, s.formatMsg(msg4, timeObj1)) - assert.Nil(t, err) -} diff --git a/exporter/syslogexporter/utils.go b/exporter/syslogexporter/utils.go index a121145587b4..634962bdeacc 100644 --- a/exporter/syslogexporter/utils.go +++ b/exporter/syslogexporter/utils.go @@ -45,11 +45,3 @@ func deduplicateErrors(errs []error) []error { } return uniqueErrors } - -func errorListToStringSlice(errList []error) []string { - errStrList := make([]string, len(errList)) - for i, err := range errList { - errStrList[i] = err.Error() - } - return errStrList -} diff --git a/exporter/syslogexporter/utils_test.go b/exporter/syslogexporter/utils_test.go index 835bd5b4cf77..be9536d1b8b4 100644 --- a/exporter/syslogexporter/utils_test.go +++ b/exporter/syslogexporter/utils_test.go @@ -57,27 +57,3 @@ func TestDeduplicateErrors(t *testing.T) { }) } } - -func TestErrorString(t *testing.T) { - testCases := []struct { - name string - errs []error - expected []string - }{ - { - name: "duplicates are removed", - errs: []error{ - errors.New("failed sending data: 502 Bad Gateway"), - errors.New("dial tcp 127.0.0.1:514: connect: connection refused"), - }, - expected: []string{"failed sending data: 502 Bad Gateway", - "dial tcp 127.0.0.1:514: connect: connection refused"}, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - assert.Equal(t, testCase.expected, errorListToStringSlice(testCase.errs)) - }) - } -}