Skip to content

Commit

Permalink
Adopt parsers in Filebeat's journald input (#29070)
Browse files Browse the repository at this point in the history
One test is added that runs Filebeat reading from a journald file, it
only tests one parser, however that should be enough to ensure parsers
are supported on journald input.

Splits from #26130
  • Loading branch information
belimawr authored Nov 30, 2021
1 parent 51463bf commit 8fcad13
Show file tree
Hide file tree
Showing 10 changed files with 464 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800]
- Add support in aws-s3 input for custom script parsing of s3 notifications. {pull}28946[28946]
- Improve error handling in aws-s3 input for malformed s3 notifications. {issue}28828[28828] {pull}28946[28946]
- Add support for parsers on journald input {pull}29070[29070]

*Heartbeat*

Expand Down
19 changes: 19 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,22 @@ filebeat.inputs:
# Configure stream to filter to a specific stream: stdout, stderr or all (default)
#stream: all

#------------------------------ Journald input --------------------------------
# Journald input is experimental.
#- type: journald
#enabled: true
#id: service-foo

# You may wish to have separate inputs for each service. You can use
# include_matches to specify a list of filter expressions that are
# applied as a logical OR. You may specify filter
#include_matches:
#- _SYSTEMD_UNIT=foo.service

# Parsers are also supported, here is an example of the multiline
# parser.
#parsers:
#- multiline:
#type: count
#count_lines: 3

19 changes: 19 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,25 @@ filebeat.inputs:
# Configure stream to filter to a specific stream: stdout, stderr or all (default)
#stream: all

#------------------------------ Journald input --------------------------------
# Journald input is experimental.
#- type: journald
#enabled: true
#id: service-foo

# You may wish to have separate inputs for each service. You can use
# include_matches to specify a list of filter expressions that are
# applied as a logical OR. You may specify filter
#include_matches:
#- _SYSTEMD_UNIT=foo.service

# Parsers are also supported, here is an example of the multiline
# parser.
#parsers:
#- multiline:
#type: count
#count_lines: 3


# =========================== Filebeat autodiscover ============================

Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/journald/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

// Config stores the options of a journald input.
Expand All @@ -51,6 +52,9 @@ type config struct {

// SaveRemoteHostname defines if the original source of the entry needs to be saved.
SaveRemoteHostname bool `config:"save_remote_hostname"`

// Parsers configuration
Parsers parser.Config `config:",inline"`
}

var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback")
Expand Down
286 changes: 286 additions & 0 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux && cgo && withjournald
// +build linux,cgo,withjournald

package journald

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
t *testing.T
workingDir string
stateStore *testInputStore
pipeline *mockPipelineConnector

pluginInitOnce sync.Once
plugin v2.Plugin

wg sync.WaitGroup
grp unison.TaskGroup
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
}
}

func (e *inputTestingEnvironment) getManager() v2.InputManager {
e.pluginInitOnce.Do(func() {
e.plugin = Plugin(logp.L(), e.stateStore)
})
return e.plugin.Manager
}

func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input {
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
if err := manager.Init(&e.grp, v2.ModeRun); err != nil {
e.t.Fatalf("failed to initialise manager: %+v", err)
}

c := common.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
e.t.Fatalf("failed to create input using manager: %+v", err)
}

return inp
}

func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) {
e.wg.Add(1)
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()

inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx}
inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
}

// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
e.t.Helper()
for {
sum := len(e.pipeline.GetAllEvents())
if sum == count {
return
}
if count < sum {
e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum)
}
time.Sleep(10 * time.Millisecond)
}
}

func (e *inputTestingEnvironment) waitUntilInputStops() {
e.wg.Wait()
}

func (e *inputTestingEnvironment) abspath(filename string) string {
return filepath.Join(e.workingDir, filename)
}

func (e *inputTestingEnvironment) mustWriteFile(filename string, lines []byte) {
e.t.Helper()
path := e.abspath(filename)
if err := os.WriteFile(path, lines, 0644); err != nil {
e.t.Fatalf("failed to write file '%s': %+v", path, err)
}
}

type testInputStore struct {
registry *statestore.Registry
}

func openTestStatestore() *testInputStore {
return &testInputStore{
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
}
}

func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

func (s *testInputStore) CleanupInterval() time.Duration {
return 24 * time.Hour
}

type mockClient struct {
publishing []beat.Event
published []beat.Event
ackHandler beat.ACKer
closed bool
mtx sync.Mutex
canceler context.CancelFunc
}

// GetEvents returns the published events
func (c *mockClient) GetEvents() []beat.Event {
c.mtx.Lock()
defer c.mtx.Unlock()

return c.published
}

// Publish mocks the Client Publish method
func (c *mockClient) Publish(e beat.Event) {
c.PublishAll([]beat.Event{e})
}

// PublishAll mocks the Client PublishAll method
func (c *mockClient) PublishAll(events []beat.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.publishing = append(c.publishing, events...)
for _, event := range events {
c.ackHandler.AddEvent(event, true)
}
c.ackHandler.ACKEvents(len(events))

for _, event := range events {
c.published = append(c.published, event)
}
}

func (c *mockClient) waitUntilPublishingHasStarted() {
for len(c.publishing) == 0 {
time.Sleep(10 * time.Millisecond)
}
}

// Close mocks the Client Close method
func (c *mockClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()

if c.closed {
return fmt.Errorf("mock client already closed")
}

c.closed = true
return nil
}

// mockPipelineConnector mocks the PipelineConnector interface
type mockPipelineConnector struct {
blocking bool
clients []*mockClient
mtx sync.Mutex
}

// GetAllEvents returns all events associated with a pipeline
func (pc *mockPipelineConnector) GetAllEvents() []beat.Event {
pc.mtx.Lock()
defer pc.mtx.Unlock()

var evList []beat.Event
for _, clientEvents := range pc.clients {
evList = append(evList, clientEvents.GetEvents()...)
}

return evList
}

// Connect mocks the PipelineConnector Connect method
func (pc *mockPipelineConnector) Connect() (beat.Client, error) {
return pc.ConnectWith(beat.ClientConfig{})
}

// ConnectWith mocks the PipelineConnector ConnectWith method
func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Client, error) {
pc.mtx.Lock()
defer pc.mtx.Unlock()

ctx, cancel := context.WithCancel(context.Background())
c := &mockClient{
canceler: cancel,
ackHandler: newMockACKHandler(ctx, pc.blocking, config),
}

pc.clients = append(pc.clients, c)

return c, nil

}

func (pc *mockPipelineConnector) cancelAllClients() {
pc.mtx.Lock()
defer pc.mtx.Unlock()

for _, client := range pc.clients {
client.canceler()
}
}

func (pc *mockPipelineConnector) cancelClient(i int) {
pc.mtx.Lock()
defer pc.mtx.Unlock()

if len(pc.clients) < i+1 {
return
}

pc.clients[i].canceler()
}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer {
if !blocking {
return config.ACKHandler
}

return acker.Combine(blockingACKer(starter), config.ACKHandler)

}

func blockingACKer(starter context.Context) beat.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
for starter.Err() == nil {
}
})
}
Loading

0 comments on commit 8fcad13

Please sign in to comment.