Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

[journald_input] Write journald fields as attributes instead of body. #353

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions docs/operators/journald_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The `journald_input` operator reads logs from the systemd journal using the `jou

By default, `journalctl` will read from `/run/journal` or `/var/log/journal`. If either `directory` or `files` are set, `journalctl` will instead read from those.

The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the journald entry as the parsed entry's timestamp. All other fields are added to the entry's body as returned by `journalctl`.
The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the journald entry as the parsed entry's timestamp, the `PRIORITY` field of the journald entry as the parsed entry's severity, the `MESSAGE` field of the journald entry as the parsed entry's body. All other fields are added to the entry's attributes as returned by `journalctl`.

### Configuration Fields

Expand All @@ -16,10 +16,11 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j
| `files` | | A list of journal files to read entries from. |
| `units` | | A list of units to read entries from. |
| `priority` | `info` | Filter output by message priorities or priority ranges. |
| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry. |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `fields` | Semantic Conventions | A map of journald fields to attributes. |
| `resource_fields` | Semantic Conventions | A map of journald fields to resource attributes. |

### Example Configurations
```yaml
Expand All @@ -45,13 +46,14 @@ Output entry sample:
```json
"entry": {
"timestamp": "2020-04-16T11:05:49.516168-04:00",
"body": {
"CODE_FILE": "../src/core/unit.c",
"CODE_FUNC": "unit_log_success",
"CODE_LINE": "5487",
"MESSAGE": "var-lib-docker-overlay2-bff8130ef3f66eeb81ce2102f1ac34cfa7a10fcbd1b8ae27c6c5a1543f64ddb7-merged.mount: Succeeded.",
"severity": 9,
"severity_text": "info",
"body": "var-lib-docker-overlay2-bff8130ef3f66eeb81ce2102f1ac34cfa7a10fcbd1b8ae27c6c5a1543f64ddb7-merged.mount: Succeeded.",
"attributes": {
"code.filepath": "../src/core/unit.c",
"code.function": "unit_log_success",
"code.lineno": "5487",
"MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448",
"PRIORITY": "6",
"SYSLOG_FACILITY": "3",
"SYSLOG_IDENTIFIER": "systemd",
"USER_INVOCATION_ID": "de9283b4fd634213a50f5abe71b4d951",
Expand All @@ -60,13 +62,8 @@ Output entry sample:
"_AUDIT_SESSION": "299",
"_BOOT_ID": "c4fa36de06824d21835c05ff80c54468",
"_CAP_EFFECTIVE": "0",
"_CMDLINE": "/lib/systemd/systemd --user",
"_COMM": "systemd",
"_EXE": "/usr/lib/systemd/systemd",
"_GID": "1000",
"_HOSTNAME": "testhost",
"_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d",
"_PID": "18667",
"_SELINUX_CONTEXT": "unconfined\n",
"_SOURCE_REALTIME_TIMESTAMP": "1587049549515868",
"_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope",
Expand All @@ -80,6 +77,13 @@ Output entry sample:
"_UID": "1000",
"__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1efec9;b=c4fa36de06824d21835c05ff80c54468;m=a001b7ec5a;t=5a369c4a3cd88;x=f9717e0b5608807b",
"__MONOTONIC_TIMESTAMP": "687223598170"
},
"resource": {
"host.name": "testhost",
"process.pid": "18667",
"process.command_line": "/lib/systemd/systemd --user",
"process.command": "systemd",
"process.executable.path": "/usr/lib/systemd/systemd"
}
}
```
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ require (
k8s.io/client-go v0.23.2
)

require go.opentelemetry.io/collector/model v0.42.0

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -49,7 +51,6 @@ require (
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.30.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector v0.42.0 h1:hyOOmPe7CkPeiN8NT/eCQXJwak0pYwjocjDTGw95kvU=
go.opentelemetry.io/collector v0.42.0/go.mod h1:HiryUIokIPVCspJIAXlGdpfPFCepUAFLxTzid2AH7es=
go.opentelemetry.io/collector/model v0.42.0 h1:jQb9oi9NwhTJu6H8cOlK/3yeg+cyWxOrQD8A5TlcqQw=
go.opentelemetry.io/collector/model v0.42.0/go.mod h1:uUgx84gI+G/tE87Oo84305q0MD8tUV9uWxg+ckAE7Ew=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0/go.mod h1:vEhqr0m4eTc+DWxfsXoXue2GBgV2uUwVznkGIHW/e5w=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.28.0/go.mod h1:Ihno+mNBfZlT0Qot3XyRTdZ/9U/Cg2Pfgj75DTdIfq4=
Expand Down
58 changes: 58 additions & 0 deletions operator/builtin/input/journald/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package journald

import (
"go.opentelemetry.io/collector/model/semconv/v1.8.0"
)

var (
// For field definitions, see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
defaultAttributeMapping = map[string]string{
// MESSAGE_ID
"CODE_FILE": semconv.AttributeCodeFilepath,
"CODE_LINE": semconv.AttributeCodeLineNumber,
"CODE_FUNC": semconv.AttributeCodeFunction,
// ERRNO
// SYSLOG_FACILITY
// SYSLOG_IDENTIFIER
// SYSLOG_TIMESTAMP
// SYSLOG_RAW
// DOCUMENTATION
"TID": semconv.AttributeThreadID,
}

defaultResourceMapping = map[string]string{
// INVOCATION_ID
// USER_INVOCATION_ID
// SYSLOG_PID
"_PID": semconv.AttributeProcessPID,
// _UID
// _GID
"_COMM": semconv.AttributeProcessCommand,
"_EXE": semconv.AttributeProcessExecutablePath,
"_CMDLINE": semconv.AttributeProcessCommandLine,
// _CAP_EFFECTIVE
// _AUDIT_SESSION
// _AUDIT_LOGINUID
// _SYSTEMD_CGROUP
// _SYSTEMD_SLICE
// _SYSTEMD_UNIT
// _SYSTEMD_USER_UNIT
// _SYSTEMD_USER_SLICE
// _SYSTEMD_SESSION
// _SYSTEMD_OWNER_UID
// _SELINUX_CONTEXT
// _SOURCE_REALTIME_TIMESTAMP
// _BOOT_ID
// _MACHINE_ID
"_HOSTNAME": semconv.AttributeHostName,
// _TRANSPORT
// _STREAM_ID
// _LINE_BREAK
// _NAMESPACE
}
)

func hasFieldMapping(mapping map[string]string, field string) bool {
_, ok := mapping[field]
return ok
}
117 changes: 108 additions & 9 deletions operator/builtin/input/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package journald
import (
"bufio"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -52,11 +53,13 @@ func NewJournaldInputConfig(operatorID string) *JournaldInputConfig {
type JournaldInputConfig struct {
helper.InputConfig `mapstructure:",squash" yaml:",inline"`

Directory *string `mapstructure:"directory,omitempty" json:"directory,omitempty" yaml:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty" json:"files,omitempty" yaml:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty" json:"units,omitempty" yaml:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"`
Directory *string `mapstructure:"directory,omitempty" json:"directory,omitempty" yaml:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty" json:"files,omitempty" yaml:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty" json:"units,omitempty" yaml:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"`
ResourceFields map[string]string `mapstructure:"resource_fields,omitempty" json:"resource_fields,omitempty" yaml:"resource_fields,omitempty"`
Fields map[string]string `mapstructure:"fields,omitempty" json:"fields,omitempty" yaml:"fields,omitempty"`
}

// Build will build a journald input operator from the supplied configuration
Expand Down Expand Up @@ -100,6 +103,20 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
}
}

fields := defaultAttributeMapping
if c.Fields != nil {
for k, v := range c.Fields {
fields[k] = v
}
}

resource_fields := defaultResourceMapping
if c.ResourceFields != nil {
for k, v := range c.ResourceFields {
resource_fields[k] = v
}
}

journaldInput := &JournaldInput{
InputOperator: inputOperator,
newCmd: func(ctx context.Context, cursor []byte) cmd {
Expand All @@ -109,7 +126,9 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
json: jsoniter.ConfigFastest,
fields: fields,
resource_fields: resource_fields,
}
return []operator.Operator{journaldInput}, nil
}
Expand All @@ -118,7 +137,9 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
type JournaldInput struct {
helper.InputOperator

newCmd func(ctx context.Context, cursor []byte) cmd
newCmd func(ctx context.Context, cursor []byte) cmd
fields map[string]string
resource_fields map[string]string

persister operator.Persister
json jsoniter.API
Expand Down Expand Up @@ -222,12 +243,39 @@ func (operator *JournaldInput) parseJournalEntry(line []byte) (*entry.Entry, str
return nil, "", errors.New("journald field for cursor is not a string")
}

entry, err := operator.NewEntry(body)
msg, ok := body["MESSAGE"]
if !ok {
return nil, "", errors.New("journald body missing MESSAGE field")
}
delete(body, "MESSAGE")

entry, err := operator.NewEntry(msg)
if err != nil {
return nil, "", fmt.Errorf("failed to create entry: %s", err)
}

entry.Timestamp = time.Unix(0, timestampInt*1000) // in microseconds

for k, v := range body {
switch {
case k == "PRIORITY":
if err := addSeverity(entry, v); err != nil {
return nil, "", err
}
case hasFieldMapping(operator.fields, k):
if val := convertField(v); val != "" {
entry.AddAttribute(operator.fields[k], val)
}
case hasFieldMapping(operator.resource_fields, k):
if val := convertField(v); val != "" {
entry.AddResourceKey(operator.resource_fields[k], val)
}
default:
if val := convertField(v); val != "" {
entry.AddAttribute(k, val)
}
}
}

return entry, cursorString, nil
}

Expand All @@ -237,3 +285,54 @@ func (operator *JournaldInput) Stop() error {
operator.wg.Wait()
return nil
}

var severityMapping = [...]entry.Severity{
0: entry.Fatal,
1: entry.Error3,
2: entry.Error2,
3: entry.Error,
4: entry.Warn,
5: entry.Info2,
6: entry.Info,
7: entry.Debug,
}

var severityText = [...]string{
0: "emerg",
1: "alert",
2: "crit",
3: "err",
4: "warning",
5: "notice",
6: "info",
7: "debug",
}

func addSeverity(e *entry.Entry, sev interface{}) error {
sevInt, err := strconv.Atoi(sev.(string))
if err != nil {
return fmt.Errorf("severity field is not an int")
}

if sevInt < 0 || sevInt > 7 {
return fmt.Errorf("invalid severity '%d'", sevInt)
}

e.Severity = severityMapping[sevInt]
e.SeverityText = severityText[sevInt]
return nil
}

func convertField(val interface{}) string {
// attributes only supports strings at the moment
// in future, these should return AttributeValue types
// https://github.com/open-telemetry/opentelemetry-log-collection/issues/190
switch v := val.(type) {
case []byte:
return base64.StdEncoding.EncodeToString(v)
case nil:
return ""
default:
return fmt.Sprintf("%v", val)
}
}
Loading