Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] ETW input - use errgroup #38009

Merged
merged 3 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 40 additions & 55 deletions x-pack/filebeat/input/etw/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"golang.org/x/sync/errgroup"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -66,6 +67,7 @@ type etwInput struct {
log *logp.Logger
config config
etwSession *etw.Session
publisher stateless.Publisher
operator sessionOperator
}

Expand Down Expand Up @@ -105,10 +107,13 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
if err != nil {
return fmt.Errorf("error initializing ETW session: %w", err)
}
e.etwSession.Callback = e.consumeEvent
e.publisher = publisher

// Set up logger with session information
e.log = ctx.Logger.With("session", e.etwSession.Name)
e.log.Info("Starting " + inputName + " input")
defer e.log.Info(inputName + " input stopped")

// Handle realtime session creation or attachment
if e.etwSession.Realtime {
Expand All @@ -125,71 +130,31 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
if err != nil {
return fmt.Errorf("realtime session could not be created: %w", err)
}
e.log.Debug("created session")
e.log.Debug("created new session")
}
}
// Defer the cleanup and closing of resources
var wg sync.WaitGroup
var once sync.Once

// Create an error channel to communicate errors from the goroutine
errChan := make(chan error, 1)
stopConsumer := sync.OnceFunc(e.Close)
defer stopConsumer()

defer func() {
once.Do(e.Close)
e.log.Info(inputName + " input stopped")
// Stop the consumer upon input cancellation (shutdown).
go func() {
<-ctx.Cancelation.Done()
stopConsumer()
}()

// eventReceivedCallback processes each ETW event
eventReceivedCallback := func(record *etw.EventRecord) uintptr {
if record == nil {
e.log.Error("received null event record")
return 1
}

e.log.Debugf("received event %d with length %d", record.EventHeader.EventDescriptor.Id, record.UserDataLength)

data, err := etw.GetEventProperties(record)
if err != nil {
e.log.Errorw("failed to read event properties", "error", err)
return 1
}

evt := buildEvent(data, record.EventHeader, e.etwSession, e.config)
publisher.Publish(evt)

return 0
}

// Set the callback function for the ETW session
e.etwSession.Callback = eventReceivedCallback

// Start a goroutine to consume ETW events
wg.Add(1)
go func() {
defer wg.Done()
e.log.Debug("starting to listen ETW events")
g := new(errgroup.Group)
g.Go(func() error {
e.log.Debug("starting ETW consumer")
defer e.log.Debug("stopped ETW consumer")
if err = e.operator.startConsumer(e.etwSession); err != nil {
errChan <- fmt.Errorf("failed to start consumer: %w", err) // Send error to channel
return
return fmt.Errorf("failed running ETW consumer: %w", err)
}
e.log.Debug("stopped to read ETW events from session")
errChan <- nil
}()
return nil
})

// We ensure resources are closed when receiving a cancellation signal
go func() {
<-ctx.Cancelation.Done()
once.Do(e.Close)
}()

wg.Wait() // Ensure all goroutines have finished before closing
close(errChan)
if err, ok := <-errChan; ok && err != nil {
return err
}

return nil
return g.Wait()
}

var (
Expand Down Expand Up @@ -271,6 +236,26 @@ func convertFileTimeToGoTime(fileTime64 uint64) time.Time {
return time.Unix(0, fileTime.Nanoseconds()).UTC()
}

func (e *etwInput) consumeEvent(record *etw.EventRecord) uintptr {
if record == nil {
e.log.Error("received null event record")
return 1
}

e.log.Debugf("received event with ID %d and user-data length %d", record.EventHeader.EventDescriptor.Id, record.UserDataLength)

data, err := etw.GetEventProperties(record)
if err != nil {
e.log.Errorw("failed to read event properties", "error", err)
return 1
}

evt := buildEvent(data, record.EventHeader, e.etwSession, e.config)
e.publisher.Publish(evt)

return 0
}

// Close stops the ETW session and logs the outcome.
func (e *etwInput) Close() {
if err := e.operator.stopSession(e.etwSession); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions x-pack/filebeat/input/etw/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func Test_RunEtwInput_AttachToExistingSessionError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: false}
NewSession: false,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -146,7 +147,8 @@ func Test_RunEtwInput_CreateRealtimeSessionError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -189,7 +191,8 @@ func Test_RunEtwInput_StartConsumerError(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -232,7 +235,7 @@ func Test_RunEtwInput_StartConsumerError(t *testing.T) {

// Run test
err := etwInput.Run(inputCtx, nil)
assert.EqualError(t, err, "failed to start consumer: mock error")
assert.EqualError(t, err, "failed running ETW consumer: mock error")
}

func Test_RunEtwInput_Success(t *testing.T) {
Expand All @@ -244,7 +247,8 @@ func Test_RunEtwInput_Success(t *testing.T) {
mockSession := &etw.Session{
Name: "MySession",
Realtime: true,
NewSession: true}
NewSession: true,
}
return mockSession, nil
}
// Setup the mock behavior for AttachToExistingSession
Expand Down Expand Up @@ -471,7 +475,6 @@ func Test_buildEvent(t *testing.T) {
assert.Equal(t, tt.expected["event.severity"], mapEv["event.severity"])
assert.Equal(t, tt.expected["log.file.path"], mapEv["log.file.path"])
assert.Equal(t, tt.expected["log.level"], mapEv["log.level"])

})
}
}
Expand All @@ -495,7 +498,7 @@ func Test_convertFileTimeToGoTime(t *testing.T) {
{
name: "TestActualDate",
fileTime: 133515900000000000, // February 05, 2024, 7:00:00 AM
want: time.Date(2024, 02, 05, 7, 0, 0, 0, time.UTC),
want: time.Date(2024, 0o2, 0o5, 7, 0, 0, 0, time.UTC),
},
}

Expand Down
4 changes: 1 addition & 3 deletions x-pack/libbeat/reader/etw/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,8 @@ func (s *Session) StartConsumer() error {
// Open an ETW trace processing handle for consuming events
// from an ETW real-time trace session or an ETW log file.
s.traceHandler, err = s.openTrace(&elf)

switch {
case err == nil:

// Handle specific errors for trace opening.
case errors.Is(err, ERROR_BAD_PATHNAME):
return fmt.Errorf("invalid log source when opening trace: %w", err)
Expand All @@ -241,10 +239,10 @@ func (s *Session) StartConsumer() error {
default:
return fmt.Errorf("failed to open trace: %w", err)
}

// Process the trace. This function blocks until processing ends.
if err := s.processTrace(&s.traceHandler, 1, nil, nil); err != nil {
return fmt.Errorf("failed to process trace: %w", err)
}

return nil
}
Loading