Skip to content

Commit

Permalink
Add unifiedlogs input to agent
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed Dec 19, 2024
1 parent d42aab1 commit 6d629e3
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 50 deletions.
5 changes: 5 additions & 0 deletions x-pack/agentbeat/agentbeat.spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ inputs:
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- name: unifiedlogs
description: "macOS Unified logs"
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- name: unix
description: "Unix Socket"
platforms: *platforms
Expand Down
18 changes: 9 additions & 9 deletions x-pack/filebeat/input/unifiedlogs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

type config struct {
showConfig
commonConfig
Backfill bool `config:"backfill"`
ShowConfig showConfig `config:",inline"`
CommonConfig commonConfig `config:",inline"`
Backfill bool `config:"backfill"`
}

type showConfig struct {
Expand All @@ -38,17 +38,17 @@ type commonConfig struct {
}

func (c config) Validate() error {
if err := checkDateFormat(c.Start); err != nil {
if err := checkDateFormat(c.ShowConfig.Start); err != nil {
return fmt.Errorf("start date is not valid: %w", err)
}
if err := checkDateFormat(c.End); err != nil {
if err := checkDateFormat(c.ShowConfig.End); err != nil {
return fmt.Errorf("end date is not valid: %w", err)
}
if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") {
return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile)
if c.ShowConfig.ArchiveFile != "" && !strings.HasSuffix(c.ShowConfig.ArchiveFile, ".logarchive") {
return fmt.Errorf("archive_file %v has the wrong extension", c.ShowConfig.ArchiveFile)
}
if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") {
return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile)
if c.ShowConfig.TraceFile != "" && !strings.HasSuffix(c.ShowConfig.TraceFile, ".tracev3") {
return fmt.Errorf("trace_file %v has the wrong extension", c.ShowConfig.TraceFile)
}
return nil
}
Expand Down
61 changes: 61 additions & 0 deletions x-pack/filebeat/input/unifiedlogs/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build darwin

package unifiedlogs

import (
"testing"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {
const cfgYaml = `
archive_file: /path/to/file.logarchive
trace_file: /path/to/file.tracev3
start: 2024-12-04 13:46:00+0200
end: 2024-12-04 13:46:00+0200
predicate:
- pid == 1
process:
- sudo
source: true
info: true
debug: true
backtrace: true
signpost: true
unreliable: true
mach_continuous_time: true
backfill: true
`

expected := config{
ShowConfig: showConfig{
ArchiveFile: "/path/to/file.logarchive",
TraceFile: "/path/to/file.tracev3",
Start: "2024-12-04 13:46:00+0200",
End: "2024-12-04 13:46:00+0200",
},
CommonConfig: commonConfig{
Predicate: []string{"pid == 1"},
Process: []string{"sudo"},
Source: true,
Info: true,
Debug: true,
Backtrace: true,
Signpost: true,
Unreliable: true,
MachContinuousTime: true,
},
Backfill: true,
}

c := conf.MustNewConfigFrom(cfgYaml)
cfg := defaultConfig()
assert.NoError(t, c.Unpack(&cfg))
assert.EqualValues(t, expected, cfg)
}
52 changes: 26 additions & 26 deletions x-pack/filebeat/input/unifiedlogs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type source struct {
}

func newSource(config config) source {
if config.ArchiveFile != "" || config.TraceFile != "" {
if config.ShowConfig.ArchiveFile != "" || config.ShowConfig.TraceFile != "" {
return source{name: srcArchiveName}
}
return source{name: srcPollName}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor in
return err
}
if startFrom != "" {
input.Start = startFrom
input.ShowConfig.Start = startFrom
}

return input.runWithMetrics(stdCtx, pub, log)
Expand All @@ -135,7 +135,7 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// backfilling process.
if input.mustStream() {
g.Go(func() error {
logCmd := newLogStreamCmd(ctx, input.commonConfig)
logCmd := newLogStreamCmd(ctx, input.CommonConfig)
return input.runLogCmd(ctx, logCmd, wrappedPub, log)
})
}
Expand All @@ -149,7 +149,7 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// To avoid potentially losing data we move the end forward one second,
// since it is preferable to have some duplicated events.
t = t.Add(time.Second)
input.End = t.Format(cursorDateLayout)
input.ShowConfig.End = t.Format(cursorDateLayout)

// to avoid race conditions updating the cursor, and to be able to
// resume from the oldest point in time, we only update cursor
Expand All @@ -171,14 +171,14 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// mustStream returns true in case a stream command is needed.
// This is the default case and the only exceptions are when an archive file or an end date are set.
func (input *input) mustStream() bool {
return !(input.ArchiveFile != "" || input.TraceFile != "" || input.End != "")
return !(input.ShowConfig.ArchiveFile != "" || input.ShowConfig.TraceFile != "" || input.ShowConfig.End != "")
}

// mustBackfill returns true in case a show command is needed.
// This happens when start or end dates are set (for example when resuming filebeat), when an archive file is used,
// or when user forces it via the backfill config.
func (input *input) mustBackfill() bool {
return input.Backfill || input.ArchiveFile != "" || input.TraceFile != "" || input.Start != "" || input.End != ""
return input.Backfill || input.ShowConfig.ArchiveFile != "" || input.ShowConfig.TraceFile != "" || input.ShowConfig.Start != "" || input.ShowConfig.End != ""
}

func (input *input) runLogCmd(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, log *logp.Logger) error {
Expand Down Expand Up @@ -318,53 +318,53 @@ func newLogShowCmd(ctx context.Context, cfg config) *exec.Cmd {
}

func newLogStreamCmd(ctx context.Context, cfg commonConfig) *exec.Cmd {
return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{commonConfig: cfg})...) // #nosec G204
return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{CommonConfig: cfg})...) // #nosec G204
}

func newLogCmdArgs(subcmd string, config config) []string {
args := []string{subcmd, "--style", "ndjson"}
if config.ArchiveFile != "" {
args = append(args, "--archive", config.ArchiveFile)
if config.ShowConfig.ArchiveFile != "" {
args = append(args, "--archive", config.ShowConfig.ArchiveFile)
}
if config.TraceFile != "" {
args = append(args, "--file", config.TraceFile)
if config.ShowConfig.TraceFile != "" {
args = append(args, "--file", config.ShowConfig.TraceFile)
}
if len(config.Predicate) > 0 {
for _, p := range config.Predicate {
if len(config.CommonConfig.Predicate) > 0 {
for _, p := range config.CommonConfig.Predicate {
args = append(args, "--predicate", p)
}
}
if len(config.Process) > 0 {
for _, p := range config.Process {
if len(config.CommonConfig.Process) > 0 {
for _, p := range config.CommonConfig.Process {
args = append(args, "--process", p)
}
}
if config.Source {
if config.CommonConfig.Source {
args = append(args, "--source")
}
if config.Info {
if config.CommonConfig.Info {
args = append(args, "--info")
}
if config.Debug {
if config.CommonConfig.Debug {
args = append(args, "--debug")
}
if config.Backtrace {
if config.CommonConfig.Backtrace {
args = append(args, "--backtrace")
}
if config.Signpost {
if config.CommonConfig.Signpost {
args = append(args, "--signpost")
}
if config.Unreliable {
if config.CommonConfig.Unreliable {
args = append(args, "--unreliable")
}
if config.MachContinuousTime {
if config.CommonConfig.MachContinuousTime {
args = append(args, "--mach-continuous-time")
}
if config.Start != "" {
args = append(args, "--start", config.Start)
if config.ShowConfig.Start != "" {
args = append(args, "--start", config.ShowConfig.Start)
}
if config.End != "" {
args = append(args, "--end", config.End)
if config.ShowConfig.End != "" {
args = append(args, "--end", config.ShowConfig.End)
}
return args
}
Expand Down
30 changes: 15 additions & 15 deletions x-pack/filebeat/input/unifiedlogs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestInput(t *testing.T) {
{
name: "Archive not found",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: "notfound.logarchive",
},
},
Expand All @@ -93,7 +93,7 @@ func TestInput(t *testing.T) {
{
name: "Archived file",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
},
Expand All @@ -104,7 +104,7 @@ func TestInput(t *testing.T) {
{
name: "Trace file",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
TraceFile: path.Join(archivePath, "logdata.LiveData.tracev3"),
},
},
Expand All @@ -115,7 +115,7 @@ func TestInput(t *testing.T) {
{
name: "With start date",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
Start: "2024-12-04 13:46:00+0200",
},
Expand All @@ -127,7 +127,7 @@ func TestInput(t *testing.T) {
{
name: "With start and end dates",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
Start: "2024-12-04 13:45:00+0200",
End: "2024-12-04 13:46:00+0200",
Expand All @@ -140,7 +140,7 @@ func TestInput(t *testing.T) {
{
name: "With end date",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
End: "2024-12-04 13:46:00+0200",
},
Expand All @@ -152,10 +152,10 @@ func TestInput(t *testing.T) {
{
name: "With predicate",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Predicate: []string{
`processImagePath == "/kernel"`,
},
Expand All @@ -168,10 +168,10 @@ func TestInput(t *testing.T) {
{
name: "With process",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Process: []string{
"0",
},
Expand All @@ -184,10 +184,10 @@ func TestInput(t *testing.T) {
{
name: "With optional flags",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Info: true,
Debug: true,
Backtrace: true,
Expand Down Expand Up @@ -253,10 +253,10 @@ func TestBackfillAndStream(t *testing.T) {

cfg := config{
Backfill: true,
showConfig: showConfig{
ShowConfig: showConfig{
Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"),
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Info: true,
Debug: true,
Backtrace: true,
Expand Down Expand Up @@ -350,7 +350,7 @@ func filterLogCmdLine(buf []byte, cmd, cmdPrefix string) string {
continue
}

trimmed := strings.TrimPrefix(parts[3], cmdStartPrefix)
trimmed := strings.TrimPrefix(parts[3], cmdPrefix)
if strings.HasPrefix(trimmed, cmd) {
return trimmed
}
Expand Down

0 comments on commit 6d629e3

Please sign in to comment.