Commit
Merge branch '8.15' into mergify/bp/8.15/pr-40121
pierrehilbert authored Aug 12, 2024
2 parents b001e5e + 9a6b801 commit 0712dda
Showing 7 changed files with 280 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -30,6 +30,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]

*Heartbeat*
31 changes: 21 additions & 10 deletions filebeat/input/input.go
@@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
@@ -48,12 +49,13 @@ type Input interface {

// Runner encapsulate the lifecycle of the input
type Runner struct {
- config   inputConfig
- input    Input
- done     chan struct{}
- wg       *sync.WaitGroup
- Once     bool
- beatDone chan struct{}
+ config         inputConfig
+ input          Input
+ done           chan struct{}
+ wg             *sync.WaitGroup
+ Once           bool
+ beatDone       chan struct{}
+ statusReporter status.StatusReporter
}

// New instantiates a new Runner
@@ -83,10 +85,11 @@ func New(
}

context := Context{
- States:   states,
- Done:     input.done,
- BeatDone: input.beatDone,
- Meta:     nil,
+ States:            states,
+ Done:              input.done,
+ BeatDone:          input.beatDone,
+ Meta:              nil,
+ GetStatusReporter: input.GetStatusReporter,
}
var ipt Input
ipt, err = f(conf, connector, context)
@@ -164,3 +167,11 @@ func (p *Runner) stop() {
func (p *Runner) String() string {
return fmt.Sprintf("input [type=%s]", p.config.Type)
}

func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) {
p.statusReporter = statusReporter
}

func (p *Runner) GetStatusReporter() status.StatusReporter {
return p.statusReporter
}
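
Note on the wiring above: the Runner only stores and exposes the reporter; inputs never hold it directly. The indirection matters because a reporter can be attached via SetStatusReporter at any point after New() has run, while the input built by the factory captures Context.GetStatusReporter at construction time and resolves it lazily. A minimal sketch of that flow, assumed to live in this same package; printReporter and wireRunner are hypothetical and not part of this commit:

// printReporter is a stand-in status.StatusReporter used only for illustration.
type printReporter struct{}

func (printReporter) UpdateStatus(s status.Status, msg string) {
    fmt.Printf("status=%v msg=%q\n", s, msg)
}

func wireRunner(r *Runner) {
    // The management layer may attach a reporter at any time after New().
    r.SetStatusReporter(printReporter{})

    // Context.GetStatusReporter was bound to the Runner's GetStatusReporter in
    // New(), so an input that resolves it now observes the reporter set above.
    if reporter := r.GetStatusReporter(); reporter != nil {
        reporter.UpdateStatus(status.Running, "input is running")
    }
}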
20 changes: 20 additions & 0 deletions filebeat/input/log/input.go
@@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
@@ -78,6 +79,7 @@ type Input struct {
meta map[string]string
stopOnce sync.Once
fileStateIdentifier file.StateIdentifier
getStatusReporter input.GetStatusReporter
}

// NewInput instantiates a new Log
@@ -157,8 +159,11 @@ func NewInput(
done: context.Done,
meta: meta,
fileStateIdentifier: identifier,
getStatusReporter: context.GetStatusReporter,
}

p.updateStatus(status.Starting, "starting the log input")

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
_, err = p.createHarvester(logger, file.State{}, nil)
@@ -224,6 +229,9 @@ func (p *Input) loadStates(states []file.State) error {

// Run runs the input
func (p *Input) Run() {
// Mark it Running for now.
// Any errors encountered in this loop will change state to degraded
p.updateStatus(status.Running, "")
logger := p.logger
logger.Debug("Start next scan")

@@ -558,6 +566,7 @@ func (p *Input) scan() {
continue
}
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err))
logger.Errorf(harvesterErrMsg, newState.Source, err)
}
} else {
@@ -583,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, oldState.Offset)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
@@ -593,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, 0)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

@@ -833,3 +844,12 @@ func (p *Input) stopWhenDone() {

p.Wait()
}

func (p *Input) updateStatus(status status.Status, msg string) {
if p.getStatusReporter == nil {
return
}
if reporter := p.getStatusReporter(); reporter != nil {
reporter.UpdateStatus(status, msg)
}
}
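
updateStatus is deliberately defensive: getStatusReporter comes from the runner context and may be nil, for example when no reporter was ever attached (Filebeat running without Elastic Agent management), in which case updates are silently dropped. A hedged sketch of how a test could observe the transitions this change introduces (Starting in NewInput, Running at the top of Run, Degraded when a harvester fails to start); recordingReporter and the wiring below are illustrative, not part of this commit:

// recordingReporter captures every status update for later assertions.
// Assumes the fmt, sync, and libbeat/management/status imports used above.
type recordingReporter struct {
    mu      sync.Mutex
    updates []string
}

func (r *recordingReporter) UpdateStatus(s status.Status, msg string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.updates = append(r.updates, fmt.Sprintf("%v: %s", s, msg))
}

// In a test the reporter would be threaded through the runner context, e.g.
//
//    rep := &recordingReporter{}
//    ctx := input.Context{GetStatusReporter: func() status.StatusReporter { return rep }}
//
// after which rep.updates would be expected to collect Starting, then Running,
// and Degraded entries whenever scan or harvestExistingFile hits an error.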
12 changes: 8 additions & 4 deletions filebeat/input/registry.go
@@ -22,15 +22,19 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type GetStatusReporter func() status.StatusReporter

type Context struct {
- States   []file.State
- Done     chan struct{}
- BeatDone chan struct{}
- Meta     map[string]string
+ States            []file.State
+ Done              chan struct{}
+ BeatDone          chan struct{}
+ Meta              map[string]string
+ GetStatusReporter GetStatusReporter
}

// Factory is used to register functions creating new Input instances.
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/controller.go
@@ -274,7 +274,7 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
// Queue metrics are reported under the pipeline namespace
var pipelineMetrics *monitoring.Registry
if c.monitors.Metrics != nil {
- pipelineMetrics := c.monitors.Metrics.GetRegistry("pipeline")
+ pipelineMetrics = c.monitors.Metrics.GetRegistry("pipeline")
if pipelineMetrics == nil {
pipelineMetrics = c.monitors.Metrics.NewRegistry("pipeline")
}
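
The one-character change above is a shadowing fix: with :=, the if block declared a brand-new pipelineMetrics that went out of scope at the end of the block, so the outer variable declared just above stayed nil and the queue metrics did not land under the intended pipeline namespace (the new TestQueueMetrics below covers exactly that). A stripped-down illustration of the pitfall in plain Go, not beats code:

package main

import "fmt"

func main() {
    var registry *string // stands in for pipelineMetrics

    if true {
        // Bug: ':=' declares a new 'registry' that shadows the outer one,
        // so the assignment is lost as soon as the block ends.
        registry := new(string)
        *registry = "pipeline"
    }
    fmt.Println(registry == nil) // true: the outer variable was never set

    if true {
        // Fix: '=' assigns to the outer variable instead of shadowing it.
        registry = new(string)
        *registry = "pipeline"
    }
    fmt.Println(registry == nil) // false
}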
24 changes: 24 additions & 0 deletions libbeat/publisher/pipeline/controller_test.go
@@ -33,6 +33,7 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

//"github.com/elastic/beats/v7/libbeat/tests/resources"

@@ -243,3 +244,26 @@ func TestQueueProducerBlocksUntilOutputIsSet(t *testing.T) {
})
assert.True(t, allFinished, "All queueProducer requests should be unblocked once an output is set")
}

func TestQueueMetrics(t *testing.T) {
// More thorough testing of queue metrics are in the queue package,
// here we just want to make sure that they appear under the right
// monitoring namespace.
reg := monitoring.NewRegistry()
controller := outputController{
queueFactory: memqueue.FactoryForSettings(memqueue.Settings{Events: 1000}),
consumer: &eventConsumer{
targetChan: make(chan consumerTarget, 4),
retryObserver: nilObserver,
},
monitors: Monitors{Metrics: reg},
}
controller.Set(outputs.Group{
Clients: []outputs.Client{newMockClient(nil)},
})
entry := reg.Get("pipeline.queue.max_events")
require.NotNil(t, entry, "pipeline.queue.max_events must exist")
value, ok := entry.(*monitoring.Uint)
require.True(t, ok, "pipeline.queue.max_events must be a *monitoring.Uint")
assert.Equal(t, uint64(1000), value.Get(), "pipeline.queue.max_events should match the events configuration key")
}
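
If needed, this check can be run in isolation from the repository root with plain Go tooling, e.g. go test ./libbeat/publisher/pipeline/ -run TestQueueMetrics -v; nothing about the command is specific to this change.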