diff --git a/filebeat/beater/publish.go b/filebeat/beater/publish.go index 7fadc16d24f7..5ead3b174858 100644 --- a/filebeat/beater/publish.go +++ b/filebeat/beater/publish.go @@ -101,9 +101,13 @@ func (p *syncLogPublisher) Start() { pubEvents := make([]common.MapStr, 0, len(events)) for _, event := range events { - // Only send event with bytes read. 0 Bytes means state update only + + // Only publish events with content, means more than 0 bytes read + // 0-byte events can be status reports like renames from harvesters if event.Bytes > 0 { pubEvents = append(pubEvents, event.ToMapStr()) + } else { + logp.Debug("publish", "REPORTING STATE: %+v", event) } } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 15961cc06131..1982c8ec418f 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -66,7 +66,7 @@ func (c *Crawler) Stop() { logp.Info("Stopping %v prospectors", len(c.prospectors)) for _, prospector := range c.prospectors { - prospector.Stop() + go prospector.Stop() } c.wg.Wait() logp.Info("Crawler stopped") diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 762aec7e37ea..015283fed16a 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -1,22 +1,30 @@ package crawler import ( + "expvar" "fmt" "sync" "time" + "github.com/satori/go.uuid" + cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/logp" ) +// Puts number of running harvesters into expvar +var harvesterCounter = expvar.NewInt("harvesters") + type Prospector struct { - ProspectorConfig cfg.ProspectorConfig - prospectorer Prospectorer - channel chan *input.FileEvent - registrar *Registrar - done chan struct{} + ProspectorConfig cfg.ProspectorConfig + prospectorer Prospectorer + channel chan *input.FileEvent + registrar *Registrar + harvesters map[uuid.UUID]*harvester.Harvester + 
harvestersWaitGroup *sync.WaitGroup + done chan struct{} } type Prospectorer interface { @@ -26,10 +34,12 @@ type Prospectorer interface { func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) { prospector := &Prospector{ - ProspectorConfig: prospectorConfig, - registrar: registrar, - channel: channel, - done: make(chan struct{}), + ProspectorConfig: prospectorConfig, + registrar: registrar, + channel: channel, + harvesters: map[uuid.UUID]*harvester.Harvester{}, + harvestersWaitGroup: &sync.WaitGroup{}, + done: make(chan struct{}), } err := prospector.Init() @@ -76,15 +86,10 @@ func (p *Prospector) Init() error { // Starts scanning through all the file paths and fetch the related files. Start a harvester for each file func (p *Prospector) Run(wg *sync.WaitGroup) { - // TODO: Defer the wg.Done() call to block shutdown - // Currently there are 2 cases where shutting down the prospector could be blocked: - // 1. reading from file - // 2. forwarding event to spooler - // As this is not implemented yet, no blocking on prospector shutdown is done. - wg.Done() - logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType) + defer wg.Done() + for { select { case <-p.done: @@ -98,18 +103,48 @@ func (p *Prospector) Run(wg *sync.WaitGroup) { } func (p *Prospector) Stop() { + // : Wait until all prospectors have exited the Run part. 
logp.Info("Stopping Prospector") close(p.done) + + //logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters)) + for _, h := range p.harvesters { + go h.Stop() + } + //logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters)) + p.harvestersWaitGroup.Wait() + } -func (p *Prospector) AddHarvester(file string, stat *harvester.FileStat) (*harvester.Harvester, error) { +// CreateHarvester creates a harvester based on the given params +// Note: Not every harvester that is created is necessarily started, as a +// harvester for the same file/input may already exist +func (p *Prospector) CreateHarvester(file string, stat *harvester.FileStat) (*harvester.Harvester, error) { h, err := harvester.NewHarvester( &p.ProspectorConfig.Harvester, file, stat, p.channel) + p.harvesters[h.Id] = h + return h, err } +func (p *Prospector) RunHarvester(h *harvester.Harvester) { + // Starts harvester and picks the right type. In case type is not set, set it to default (log) + logp.Debug("harvester", "Starting harvester: %v", h.Id) + + harvesterCounter.Add(1) + p.harvestersWaitGroup.Add(1) + + go func(h2 *harvester.Harvester) { + defer func() { + p.harvestersWaitGroup.Done() + harvesterCounter.Add(-1) + }() + h2.Harvest() + }(h) +} + // Setup Prospector Config func (p *Prospector) setupProspectorConfig() error { var err error diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index 6202e271d8eb..467278b7803e 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -120,7 +120,7 @@ func (p *ProspectorLog) scanGlob(glob string) { newInfo := harvester.NewFileStat(newFile.FileInfo, p.iteration) // Init harvester with info - h, err := p.Prospector.AddHarvester(file, newInfo) + h, err := p.Prospector.CreateHarvester(file, newInfo) if err != nil { logp.Err("Error initializing harvester: %v", err) diff --git a/filebeat/crawler/prospector_stdin.go b/filebeat/crawler/prospector_stdin.go index 
c3089fd77148..e939cdc18cdc 100644 --- a/filebeat/crawler/prospector_stdin.go +++ b/filebeat/crawler/prospector_stdin.go @@ -21,7 +21,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) { var err error - prospectorer.harvester, err = p.AddHarvester("-", nil) + prospectorer.harvester, err = p.CreateHarvester("-", nil) if err != nil { return nil, fmt.Errorf("Error initializing stdin harvester: %v", err) diff --git a/filebeat/crawler/prospector_test.go b/filebeat/crawler/prospector_test.go index f93c3cd104ee..e59cc041a0c0 100644 --- a/filebeat/crawler/prospector_test.go +++ b/filebeat/crawler/prospector_test.go @@ -3,10 +3,14 @@ package crawler import ( + "sync" "testing" "time" + "github.com/satori/go.uuid" + "github.com/elastic/beats/filebeat/config" + "github.com/elastic/beats/filebeat/harvester" "github.com/stretchr/testify/assert" ) @@ -187,7 +191,9 @@ func TestProspectorInitInputTypeStdin(t *testing.T) { } prospector := Prospector{ - ProspectorConfig: prospectorConfig, + ProspectorConfig: prospectorConfig, + harvestersWaitGroup: &sync.WaitGroup{}, + harvesters: map[uuid.UUID]*harvester.Harvester{}, } err := prospector.Init() @@ -204,7 +210,8 @@ func TestProspectorInitInputTypeWrong(t *testing.T) { } prospector := Prospector{ - ProspectorConfig: prospectorConfig, + ProspectorConfig: prospectorConfig, + harvestersWaitGroup: &sync.WaitGroup{}, } err := prospector.Init() @@ -222,7 +229,8 @@ func TestProspectorFileExclude(t *testing.T) { } prospector := Prospector{ - ProspectorConfig: prospectorConfig, + ProspectorConfig: prospectorConfig, + harvestersWaitGroup: &sync.WaitGroup{}, } prospector.Init() diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index 04cd2f71c5e0..4a5597ab530f 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -25,6 +25,7 @@ type Registrar struct { Channel chan []*FileEvent done chan struct{} + wg sync.WaitGroup } func NewRegistrar(registryFile string) (*Registrar, error) { 
@@ -79,10 +80,14 @@ func (r *Registrar) LoadState() { } func (r *Registrar) Run() { + r.wg.Add(1) logp.Info("Starting Registrar") // Writes registry on shutdown - defer r.writeRegistry() + defer func() { + r.writeRegistry() + r.wg.Done() + }() for { select { @@ -123,6 +128,7 @@ func (r *Registrar) processEvents(events []*FileEvent) { func (r *Registrar) Stop() { logp.Info("Stopping Registrar") close(r.done) + r.wg.Wait() // Note: don't block using waitGroup, cause this method is run by async signal handler } diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index f97cb5457507..995049993c50 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -19,12 +19,15 @@ import ( "regexp" "sync" + "github.com/satori/go.uuid" + "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/input" ) type Harvester struct { + Id uuid.UUID Path string /* the file path to harvest */ Config *config.HarvesterConfig offset int64 @@ -34,8 +37,10 @@ type Harvester struct { SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory file FileSource /* the file being watched */ + fileLock sync.Mutex ExcludeLinesRegexp []*regexp.Regexp IncludeLinesRegexp []*regexp.Regexp + done chan struct{} } func NewHarvester( @@ -52,11 +57,13 @@ func NewHarvester( } h := &Harvester{ + Id: uuid.NewV4(), // Unique identifier of each harvester Path: path, Config: cfg, Stat: stat, SpoolerChan: spooler, encoding: encoding, + done: make(chan struct{}), } h.ExcludeLinesRegexp, err = InitRegexps(cfg.ExcludeLines) if err != nil { @@ -73,3 +80,8 @@ func (h *Harvester) Start() { // Starts harvester and picks the right type. 
In case type is not set, set it to defeault (log) go h.Harvest() } + +func (h *Harvester) Stop() { + //logp.Debug("harvester", "Stopping harvester: %v", h.Id) + close(h.done) +} diff --git a/filebeat/harvester/linereader.go b/filebeat/harvester/linereader.go index 23b436d54d1b..931ffa65174f 100644 --- a/filebeat/harvester/linereader.go +++ b/filebeat/harvester/linereader.go @@ -15,11 +15,13 @@ func createLineReader( readerConfig logFileReaderConfig, jsonConfig *config.JSONConfig, mlrConfig *config.MultilineConfig, + done chan struct{}, ) (processor.LineProcessor, error) { + var p processor.LineProcessor var err error - fileReader, err := newLogFileReader(in, readerConfig) + fileReader, err := newLogFileReader(in, readerConfig, done) if err != nil { return nil, err } diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 398d2ba7e804..f09b421431f3 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -24,8 +24,8 @@ func (h *Harvester) Harvest() { // Make sure file is closed as soon as harvester exits // If file was never properly opened, it can't be closed - if h.file != nil { - h.file.Close() + if h.getFile() != nil { + h.closeFile() logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path) } else { logp.Debug("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path) @@ -38,7 +38,7 @@ func (h *Harvester) Harvest() { return } - h.fileInfo, err = h.file.Stat() + h.fileInfo, err = h.getFile().Stat() if err != nil { logp.Err("Stop Harvesting. Unexpected file stat rror: %s", err) return @@ -59,19 +59,35 @@ func (h *Harvester) Harvest() { } reader, err := createLineReader( - h.file, enc, config.BufferSize, config.MaxBytes, readerConfig, - config.JSON, config.Multiline) + h.getFile(), enc, config.BufferSize, config.MaxBytes, readerConfig, + config.JSON, config.Multiline, h.done) + if err != nil { logp.Err("Stop Harvesting. 
Unexpected encoding line reader error: %s", err) return } + go func() { + // Closes file so readLine returns error + // TODO: What happens to this if h.done never closed? + <-h.done + h.closeFile() + }() + + // Report status harvester + h.sendEvent(h.createEvent()) + for { + select { + case <-h.done: + return + default: + } // Partial lines return error and are only read on completion ts, text, bytesRead, jsonFields, err := readLine(reader) if err != nil { if err == errFileTruncate { - seeker, ok := h.file.(io.Seeker) + seeker, ok := h.getFile().(io.Seeker) if !ok { logp.Err("can not seek source") return @@ -102,13 +118,25 @@ func (h *Harvester) Harvest() { } // Always send event to update state, also if lines was skipped - h.sendEvent(event) + sent := h.sendEvent(event) + if !sent { + return + } + } +} + +// sendEvent sends event to the spooler channel +func (h *Harvester) sendEvent(event *input.FileEvent) bool { + select { + case <-h.done: + return false + case h.SpoolerChan <- event: // ship the new event downstream } + return true } -// createEvent creates and empty event. -// By default the offset is set to 0, means no bytes read. This can be used to report the status -// of a harvester +// createEvent creates a FileEvent. +// By default this is an "empty" event with 0 bytes read, means this is only used to update the state func (h *Harvester) createEvent() *input.FileEvent { return &input.FileEvent{ EventMetadata: h.Config.EventMetadata, @@ -122,11 +150,6 @@ func (h *Harvester) createEvent() *input.FileEvent { } } -// sendEvent sends event to the spooler channel -func (h *Harvester) sendEvent(event *input.FileEvent) { - h.SpoolerChan <- event // ship the new event downstream -} - // shouldExportLine decides if the line is exported or not based on // the include_lines and exclude_lines options. 
func (h *Harvester) shouldExportLine(line string) bool { @@ -159,8 +182,8 @@ func (h *Harvester) open() (encoding.Encoding, error) { } func (h *Harvester) openStdin() (encoding.Encoding, error) { - h.file = pipeSource{os.Stdin} - return h.encoding(h.file) + h.setFile(pipeSource{os.Stdin}) + return h.encoding(h.getFile()) } // openFile opens a file and checks for the encoding. In case the encoding cannot be detected @@ -202,7 +225,7 @@ func (h *Harvester) openFile() (encoding.Encoding, error) { } // yay, open file - h.file = fileSource{file} + h.setFile(fileSource{file}) return encoding, nil } @@ -258,3 +281,24 @@ func (h *Harvester) GetOffset() int64 { return h.offset } + +func (h *Harvester) getFile() FileSource { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + return h.file +} + +func (h *Harvester) setFile(file FileSource) { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + h.file = file +} + +func (h *Harvester) closeFile() { + h.fileLock.Lock() + defer h.fileLock.Unlock() + + h.file.Close() +} diff --git a/filebeat/harvester/log_test.go b/filebeat/harvester/log_test.go index cd4de1259090..4b32494fe993 100644 --- a/filebeat/harvester/log_test.go +++ b/filebeat/harvester/log_test.go @@ -64,7 +64,9 @@ func TestReadLine(t *testing.T) { maxBackoffDuration: 1 * time.Second, backoffFactor: 2, } - reader, _ := createLineReader(fileSource{readFile}, codec, 100, 1000, readConfig, nil, nil) + + done := make(chan struct{}) + reader, _ := createLineReader(fileSource{readFile}, codec, 100, 1000, readConfig, nil, nil, done) // Read third line _, text, bytesread, _, err := readLine(reader) diff --git a/filebeat/harvester/reader.go b/filebeat/harvester/reader.go index 5a0586e4a09d..0ded7c626d82 100644 --- a/filebeat/harvester/reader.go +++ b/filebeat/harvester/reader.go @@ -18,6 +18,7 @@ type logFileReader struct { lastTimeRead time.Time backoff time.Duration + done chan struct{} } type logFileReaderConfig struct { @@ -37,6 +38,7 @@ var ( func newLogFileReader( fs 
FileSource, config logFileReaderConfig, + done chan struct{}, ) (*logFileReader, error) { var offset int64 if seeker, ok := fs.(io.Seeker); ok { @@ -53,6 +55,7 @@ func newLogFileReader( config: config, lastTimeRead: time.Now(), backoff: config.backoffDuration, + done: done, }, nil } @@ -71,6 +74,11 @@ func (r *logFileReader) Read(buf []byte) (int, error) { } for { + select { + case <-r.done: + return 0, nil + default: + } n, err := r.fs.Read(buf) if n > 0 { r.offset += int64(n) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 3cad178636b9..6a4aa6093326 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -38,8 +38,7 @@ def test_registrar_file_content(self): c = self.log_contains_count("states written") self.wait_until( - lambda: self.log_contains( - "Processing 5 events"), + lambda: self.output_has(lines=5), max_timeout=15) # Make sure states written appears one more time @@ -115,9 +114,9 @@ def test_registrar_files(self): filebeat = self.start_beat() self.wait_until( - lambda: self.log_contains( - "Processing 10 events"), + lambda: self.output_has(lines=10), max_timeout=15) + # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. self.wait_until( @@ -147,8 +146,7 @@ def test_custom_registry_file_location(self): f.write("hello world\n") filebeat = self.start_beat() self.wait_until( - lambda: self.log_contains( - "Processing 1 events"), + lambda: self.output_has(lines=1), max_timeout=15) # wait until the registry file exist. Needed to avoid a race between # the logging and actual writing the file. Seems to happen on Windows. 
diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index 023af42f5f95..e603128a8e71 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -9,7 +9,7 @@ """ class Test(BaseTest): - @unittest.skip("Needs fix from #964") + #@unittest.skip("Needs fix from #964") def test_shutdown(self): """ Test starting and stopping Filebeat under load.