Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #42119) [filebeat] Add unifiedlogs input to agent #42135

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions x-pack/agentbeat/agentbeat.spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ inputs:
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- name: unifiedlogs
description: "macOS Unified logs"
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- name: unix
description: "Unix Socket"
platforms: *platforms
Expand Down
18 changes: 9 additions & 9 deletions x-pack/filebeat/input/unifiedlogs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

type config struct {
showConfig
commonConfig
Backfill bool `config:"backfill"`
ShowConfig showConfig `config:",inline"`
CommonConfig commonConfig `config:",inline"`
Backfill bool `config:"backfill"`
}

type showConfig struct {
Expand All @@ -38,17 +38,17 @@ type commonConfig struct {
}

func (c config) Validate() error {
if err := checkDateFormat(c.Start); err != nil {
if err := checkDateFormat(c.ShowConfig.Start); err != nil {
return fmt.Errorf("start date is not valid: %w", err)
}
if err := checkDateFormat(c.End); err != nil {
if err := checkDateFormat(c.ShowConfig.End); err != nil {
return fmt.Errorf("end date is not valid: %w", err)
}
if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") {
return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile)
if c.ShowConfig.ArchiveFile != "" && !strings.HasSuffix(c.ShowConfig.ArchiveFile, ".logarchive") {
return fmt.Errorf("archive_file %v has the wrong extension", c.ShowConfig.ArchiveFile)
}
if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") {
return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile)
if c.ShowConfig.TraceFile != "" && !strings.HasSuffix(c.ShowConfig.TraceFile, ".tracev3") {
return fmt.Errorf("trace_file %v has the wrong extension", c.ShowConfig.TraceFile)
}
return nil
}
Expand Down
62 changes: 62 additions & 0 deletions x-pack/filebeat/input/unifiedlogs/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build darwin

package unifiedlogs

import (
"testing"

"github.com/stretchr/testify/assert"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestConfig(t *testing.T) {
const cfgYaml = `
archive_file: /path/to/file.logarchive
trace_file: /path/to/file.tracev3
start: 2024-12-04 13:46:00+0200
end: 2024-12-04 13:46:00+0200
predicate:
- pid == 1
process:
- sudo
source: true
info: true
debug: true
backtrace: true
signpost: true
unreliable: true
mach_continuous_time: true
backfill: true
`

expected := config{
ShowConfig: showConfig{
ArchiveFile: "/path/to/file.logarchive",
TraceFile: "/path/to/file.tracev3",
Start: "2024-12-04 13:46:00+0200",
End: "2024-12-04 13:46:00+0200",
},
CommonConfig: commonConfig{
Predicate: []string{"pid == 1"},
Process: []string{"sudo"},
Source: true,
Info: true,
Debug: true,
Backtrace: true,
Signpost: true,
Unreliable: true,
MachContinuousTime: true,
},
Backfill: true,
}

c := conf.MustNewConfigFrom(cfgYaml)
cfg := defaultConfig()
assert.NoError(t, c.Unpack(&cfg))
assert.EqualValues(t, expected, cfg)
}
52 changes: 26 additions & 26 deletions x-pack/filebeat/input/unifiedlogs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type source struct {
}

func newSource(config config) source {
if config.ArchiveFile != "" || config.TraceFile != "" {
if config.ShowConfig.ArchiveFile != "" || config.ShowConfig.TraceFile != "" {
return source{name: srcArchiveName}
}
return source{name: srcPollName}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor in
return err
}
if startFrom != "" {
input.Start = startFrom
input.ShowConfig.Start = startFrom
}

return input.runWithMetrics(stdCtx, pub, log)
Expand All @@ -135,7 +135,7 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// backfilling process.
if input.mustStream() {
g.Go(func() error {
logCmd := newLogStreamCmd(ctx, input.commonConfig)
logCmd := newLogStreamCmd(ctx, input.CommonConfig)
return input.runLogCmd(ctx, logCmd, wrappedPub, log)
})
}
Expand All @@ -149,7 +149,7 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// To avoid potentially losing data we move the end forward one second,
// since it is preferable to have some duplicated events.
t = t.Add(time.Second)
input.End = t.Format(cursorDateLayout)
input.ShowConfig.End = t.Format(cursorDateLayout)

// to avoid race conditions updating the cursor, and to be able to
// resume from the oldest point in time, we only update cursor
Expand All @@ -171,14 +171,14 @@ func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publishe
// mustStream returns true in case a stream command is needed.
// This is the default case and the only exceptions are when an archive file or an end date are set.
func (input *input) mustStream() bool {
return !(input.ArchiveFile != "" || input.TraceFile != "" || input.End != "")
return !(input.ShowConfig.ArchiveFile != "" || input.ShowConfig.TraceFile != "" || input.ShowConfig.End != "")
}

// mustBackfill returns true in case a show command is needed.
// This happens when start or end dates are set (for example when resuming filebeat), when an archive file is used,
// or when user forces it via the backfill config.
func (input *input) mustBackfill() bool {
return input.Backfill || input.ArchiveFile != "" || input.TraceFile != "" || input.Start != "" || input.End != ""
return input.Backfill || input.ShowConfig.ArchiveFile != "" || input.ShowConfig.TraceFile != "" || input.ShowConfig.Start != "" || input.ShowConfig.End != ""
}

func (input *input) runLogCmd(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, log *logp.Logger) error {
Expand Down Expand Up @@ -318,53 +318,53 @@ func newLogShowCmd(ctx context.Context, cfg config) *exec.Cmd {
}

func newLogStreamCmd(ctx context.Context, cfg commonConfig) *exec.Cmd {
return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{commonConfig: cfg})...) // #nosec G204
return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{CommonConfig: cfg})...) // #nosec G204
}

func newLogCmdArgs(subcmd string, config config) []string {
args := []string{subcmd, "--style", "ndjson"}
if config.ArchiveFile != "" {
args = append(args, "--archive", config.ArchiveFile)
if config.ShowConfig.ArchiveFile != "" {
args = append(args, "--archive", config.ShowConfig.ArchiveFile)
}
if config.TraceFile != "" {
args = append(args, "--file", config.TraceFile)
if config.ShowConfig.TraceFile != "" {
args = append(args, "--file", config.ShowConfig.TraceFile)
}
if len(config.Predicate) > 0 {
for _, p := range config.Predicate {
if len(config.CommonConfig.Predicate) > 0 {
for _, p := range config.CommonConfig.Predicate {
args = append(args, "--predicate", p)
}
}
if len(config.Process) > 0 {
for _, p := range config.Process {
if len(config.CommonConfig.Process) > 0 {
for _, p := range config.CommonConfig.Process {
args = append(args, "--process", p)
}
}
if config.Source {
if config.CommonConfig.Source {
args = append(args, "--source")
}
if config.Info {
if config.CommonConfig.Info {
args = append(args, "--info")
}
if config.Debug {
if config.CommonConfig.Debug {
args = append(args, "--debug")
}
if config.Backtrace {
if config.CommonConfig.Backtrace {
args = append(args, "--backtrace")
}
if config.Signpost {
if config.CommonConfig.Signpost {
args = append(args, "--signpost")
}
if config.Unreliable {
if config.CommonConfig.Unreliable {
args = append(args, "--unreliable")
}
if config.MachContinuousTime {
if config.CommonConfig.MachContinuousTime {
args = append(args, "--mach-continuous-time")
}
if config.Start != "" {
args = append(args, "--start", config.Start)
if config.ShowConfig.Start != "" {
args = append(args, "--start", config.ShowConfig.Start)
}
if config.End != "" {
args = append(args, "--end", config.End)
if config.ShowConfig.End != "" {
args = append(args, "--end", config.ShowConfig.End)
}
return args
}
Expand Down
30 changes: 15 additions & 15 deletions x-pack/filebeat/input/unifiedlogs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestInput(t *testing.T) {
{
name: "Archive not found",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: "notfound.logarchive",
},
},
Expand All @@ -93,7 +93,7 @@ func TestInput(t *testing.T) {
{
name: "Archived file",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
},
Expand All @@ -104,7 +104,7 @@ func TestInput(t *testing.T) {
{
name: "Trace file",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
TraceFile: path.Join(archivePath, "logdata.LiveData.tracev3"),
},
},
Expand All @@ -115,7 +115,7 @@ func TestInput(t *testing.T) {
{
name: "With start date",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
Start: "2024-12-04 13:46:00+0200",
},
Expand All @@ -127,7 +127,7 @@ func TestInput(t *testing.T) {
{
name: "With start and end dates",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
Start: "2024-12-04 13:45:00+0200",
End: "2024-12-04 13:46:00+0200",
Expand All @@ -140,7 +140,7 @@ func TestInput(t *testing.T) {
{
name: "With end date",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
End: "2024-12-04 13:46:00+0200",
},
Expand All @@ -152,10 +152,10 @@ func TestInput(t *testing.T) {
{
name: "With predicate",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Predicate: []string{
`processImagePath == "/kernel"`,
},
Expand All @@ -168,10 +168,10 @@ func TestInput(t *testing.T) {
{
name: "With process",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Process: []string{
"0",
},
Expand All @@ -184,10 +184,10 @@ func TestInput(t *testing.T) {
{
name: "With optional flags",
cfg: config{
showConfig: showConfig{
ShowConfig: showConfig{
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Info: true,
Debug: true,
Backtrace: true,
Expand Down Expand Up @@ -253,10 +253,10 @@ func TestBackfillAndStream(t *testing.T) {

cfg := config{
Backfill: true,
showConfig: showConfig{
ShowConfig: showConfig{
Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"),
},
commonConfig: commonConfig{
CommonConfig: commonConfig{
Info: true,
Debug: true,
Backtrace: true,
Expand Down Expand Up @@ -350,7 +350,7 @@ func filterLogCmdLine(buf []byte, cmd, cmdPrefix string) string {
continue
}

trimmed := strings.TrimPrefix(parts[3], cmdStartPrefix)
trimmed := strings.TrimPrefix(parts[3], cmdPrefix)
if strings.HasPrefix(trimmed, cmd) {
return trimmed
}
Expand Down
Loading