diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index ede58e25bcf..60b4ff0609e 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -21,10 +21,9 @@ import ( "fmt" "sync" - "github.com/elastic/beats/v7/filebeat/channel" + "github.com/mitchellh/hashstructure" + "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/filebeat/input/file" - "github.com/elastic/beats/v7/filebeat/registrar" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" @@ -32,9 +31,9 @@ import ( ) type crawler struct { - inputs map[uint64]*input.Runner + log *logp.Logger + inputs map[uint64]cfgfile.Runner inputConfigs []*common.Config - out channel.Factory wg sync.WaitGroup inputsFactory cfgfile.RunnerFactory modulesFactory cfgfile.RunnerFactory @@ -46,14 +45,13 @@ type crawler struct { func newCrawler( inputFactory, module cfgfile.RunnerFactory, - out channel.Factory, inputConfigs []*common.Config, beatDone chan struct{}, once bool, ) (*crawler, error) { return &crawler{ - out: out, - inputs: map[uint64]*input.Runner{}, + log: logp.NewLogger("crawler"), + inputs: map[uint64]cfgfile.Runner{}, inputsFactory: inputFactory, modulesFactory: module, inputConfigs: inputConfigs, @@ -65,16 +63,16 @@ func newCrawler( // Start starts the crawler with all inputs func (c *crawler) Start( pipeline beat.Pipeline, - r *registrar.Registrar, configInputs *common.Config, configModules *common.Config, ) error { + log := c.log - logp.Info("Loading Inputs: %v", len(c.inputConfigs)) + log.Infof("Loading Inputs: %v", len(c.inputConfigs)) // Prospect the globs/paths given on the command line and launch harvesters for _, inputConfig := range c.inputConfigs { - err := c.startInput(pipeline, inputConfig, r.GetStates()) + err := c.startInput(pipeline, inputConfig) if err != nil { return fmt.Errorf("starting input failed: %+v", err) } @@ -107,7 +105,7 @@ func (c *crawler) Start( }() } - logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs)) + log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs)) return nil } @@ -115,26 +113,33 @@ func (c *crawler) Start( func (c *crawler) startInput( pipeline beat.Pipeline, config *common.Config, - states []file.State, ) error { if !config.Enabled() { return nil } - connector := c.out(pipeline) - p, err := input.New(config, connector, c.beatDone, states, nil) + var h map[string]interface{} + config.Unpack(&h) + id, err := hashstructure.Hash(h, nil) if err != nil { - return fmt.Errorf("Error while initializing input: %s", err) + return fmt.Errorf("can not compute id from configuration: %v", err) + } + if _, ok := c.inputs[id]; ok { + return fmt.Errorf("input with same ID already exists: %v", id) } - p.Once = c.once - if _, ok := c.inputs[p.ID]; ok { - return fmt.Errorf("Input with same ID already exists: %d", p.ID) + runner, err := c.inputsFactory.Create(pipeline, config, nil) + if err != nil { + return fmt.Errorf("Error while initializing input: %+v", err) + } + if inputRunner, ok := runner.(*input.Runner); ok { + inputRunner.Once = c.once } - c.inputs[p.ID] = p + c.inputs[id] = runner - p.Start() + c.log.Info("Starting input (ID: %d)", id) + runner.Start() return nil } @@ -151,9 +156,13 @@ func (c *crawler) Stop() { } logp.Info("Stopping %v inputs", len(c.inputs)) - for _, p := range c.inputs { - // Stop inputs in parallel - asyncWaitStop(p.Stop) + // Stop inputs in parallel + for id, p := range c.inputs { + id, p := id, p + asyncWaitStop(func() { + c.log.Infof("Stopping input: %d", id) + p.Stop() + }) } if c.inputReloader != nil { diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 226646150e4..c339c0a6cc0 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -249,7 +249,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { inputLoader := input.NewRunnerFactory(pipelineConnector, registrar, fb.done) moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) - crawler, err := newCrawler(inputLoader, moduleLoader, pipelineConnector, config.Inputs, fb.done, *once) + crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) return err @@ -283,7 +283,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules) + err = crawler.Start(b.Publisher, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() return fmt.Errorf("Failed to start crawler: %+v", err) diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 06e62e03555..f488fb1ec87 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -22,8 +22,6 @@ import ( "sync" "time" - "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/libbeat/common" @@ -52,7 +50,6 @@ type Runner struct { input Input done chan struct{} wg *sync.WaitGroup - ID uint64 Once bool beatDone chan struct{} } @@ -78,13 +75,6 @@ func New( return nil, err } - var h map[string]interface{} - conf.Unpack(&h) - input.ID, err = hashstructure.Hash(h, nil) - if err != nil { - return nil, err - } - var f Factory f, err = GetFactory(input.config.Type) if err != nil { @@ -111,7 +101,6 @@ func New( // Start starts the input func (p *Runner) Start() { p.wg.Add(1) - logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID) onceWg := sync.WaitGroup{} if p.Once { @@ -164,8 +153,6 @@ func (p *Runner) Stop() { } func (p *Runner) stop() { - logp.Info("Stopping Input: %d", p.ID) - // In case of once, it will be waited until harvesters close itself if p.Once { p.input.Wait() @@ -175,5 +162,5 @@ func (p *Runner) stop() { } func (p *Runner) String() string { - return fmt.Sprintf("input [type=%s, ID=%d]", p.config.Type, p.ID) + return fmt.Sprintf("input [type=%s]", p.config.Type) } diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index d5e9027fc9c..449f22da937 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -1086,7 +1086,7 @@ def test_restart_state_reset(self): # Wait until inputs are started self.wait_until( lambda: self.log_contains_count( - "Starting input of type: log", logfile="filebeat2.log") >= 1, + "Starting input", logfile="filebeat2.log") >= 1, max_timeout=10) filebeat.check_kill_and_wait()