Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stopping / tracking harvester implementation #964

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion filebeat/beater/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,13 @@ func (p *syncLogPublisher) Start() {

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
// Only send event with bytes read. 0 Bytes means state update only

// Only publish events with content, meaning more than 0 bytes were read.
// A 0-byte event can be a status report, e.g. a rename reported by a harvester.
if event.Bytes > 0 {
pubEvents = append(pubEvents, event.ToMapStr())
} else {
logp.Debug("publish", "REPORTING STATE: %+v", event)
}
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Crawler) Stop() {

logp.Info("Stopping %v prospectors", len(c.prospectors))
for _, prospector := range c.prospectors {
prospector.Stop()
go prospector.Stop()
}
c.wg.Wait()
logp.Info("Crawler stopped")
Expand Down
69 changes: 52 additions & 17 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
package crawler

import (
"expvar"
"fmt"
"sync"
"time"

"github.com/satori/go.uuid"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
)

// Puts number of running harvesters into expvar
var harvesterCounter = expvar.NewInt("harvesters")

type Prospector struct {
ProspectorConfig cfg.ProspectorConfig
prospectorer Prospectorer
channel chan *input.FileEvent
registrar *Registrar
done chan struct{}
ProspectorConfig cfg.ProspectorConfig
prospectorer Prospectorer
channel chan *input.FileEvent
registrar *Registrar
harvesters map[uuid.UUID]*harvester.Harvester
harvestersWaitGroup *sync.WaitGroup
done chan struct{}
}

type Prospectorer interface {
Expand All @@ -26,10 +34,12 @@ type Prospectorer interface {

func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) {
prospector := &Prospector{
ProspectorConfig: prospectorConfig,
registrar: registrar,
channel: channel,
done: make(chan struct{}),
ProspectorConfig: prospectorConfig,
registrar: registrar,
channel: channel,
harvesters: map[uuid.UUID]*harvester.Harvester{},
harvestersWaitGroup: &sync.WaitGroup{},
done: make(chan struct{}),
}

err := prospector.Init()
Expand Down Expand Up @@ -76,15 +86,10 @@ func (p *Prospector) Init() error {
// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run(wg *sync.WaitGroup) {

// TODO: Defer the wg.Done() call to block shutdown
// Currently there are 2 cases where shutting down the prospector could be blocked:
// 1. reading from file
// 2. forwarding event to spooler
// As this is not implemented yet, no blocking on prospector shutdown is done.
wg.Done()

logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType)

defer wg.Done()

for {
select {
case <-p.done:
Expand All @@ -98,18 +103,48 @@ func (p *Prospector) Run(wg *sync.WaitGroup) {
}

// Stop shuts the prospector down. It signals the Run loop to exit by
// closing p.done, asks every tracked harvester to stop, and then blocks
// until all harvesters started via RunHarvester have finished.
func (p *Prospector) Stop() {
	logp.Info("Stopping Prospector")
	close(p.done)

	// Each harvester's Stop closes its own done channel; run them
	// concurrently so one slow harvester does not delay signaling the rest.
	logp.Debug("prospector", "Stopping %d harvesters.", len(p.harvesters))
	for _, h := range p.harvesters {
		go h.Stop()
	}

	// Wait until every running harvester goroutine has called Done.
	logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters))
	p.harvestersWaitGroup.Wait()
}

func (p *Prospector) AddHarvester(file string, stat *harvester.FileStat) (*harvester.Harvester, error) {
// CreateHarvester creates a harvester based on the given params
// Note: Not every harvester that is created is necessarly started as it can
// a harvester for the same file/input already exists
func (p *Prospector) CreateHarvester(file string, stat *harvester.FileStat) (*harvester.Harvester, error) {

h, err := harvester.NewHarvester(
&p.ProspectorConfig.Harvester, file, stat, p.channel)

p.harvesters[h.Id] = h

return h, err
}

// RunHarvester launches the given harvester in its own goroutine and
// tracks it in the prospector's wait group so Stop can block until it
// finishes. The expvar counter "harvesters" mirrors the number of
// currently running harvesters.
func (p *Prospector) RunHarvester(h *harvester.Harvester) {
	logp.Debug("harvester", "Starting harvester: %v", h.Id)

	harvesterCounter.Add(1)
	p.harvestersWaitGroup.Add(1)

	// h is a per-call parameter, so capturing it directly is safe.
	go func() {
		defer func() {
			p.harvestersWaitGroup.Done()
			harvesterCounter.Add(-1)
		}()
		h.Harvest()
	}()
}

// Setup Prospector Config
func (p *Prospector) setupProspectorConfig() error {
var err error
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *ProspectorLog) scanGlob(glob string) {
newInfo := harvester.NewFileStat(newFile.FileInfo, p.iteration)

// Init harvester with info
h, err := p.Prospector.AddHarvester(file, newInfo)
h, err := p.Prospector.CreateHarvester(file, newInfo)

if err != nil {
logp.Err("Error initializing harvester: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {

var err error

prospectorer.harvester, err = p.AddHarvester("-", nil)
prospectorer.harvester, err = p.CreateHarvester("-", nil)

if err != nil {
return nil, fmt.Errorf("Error initializing stdin harvester: %v", err)
Expand Down
14 changes: 11 additions & 3 deletions filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
package crawler

import (
"sync"
"testing"
"time"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -187,7 +191,9 @@ func TestProspectorInitInputTypeStdin(t *testing.T) {
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
ProspectorConfig: prospectorConfig,
harvestersWaitGroup: &sync.WaitGroup{},
harvesters: map[uuid.UUID]*harvester.Harvester{},
}

err := prospector.Init()
Expand All @@ -204,7 +210,8 @@ func TestProspectorInitInputTypeWrong(t *testing.T) {
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
ProspectorConfig: prospectorConfig,
harvestersWaitGroup: &sync.WaitGroup{},
}

err := prospector.Init()
Expand All @@ -222,7 +229,8 @@ func TestProspectorFileExclude(t *testing.T) {
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
ProspectorConfig: prospectorConfig,
harvestersWaitGroup: &sync.WaitGroup{},
}

prospector.Init()
Expand Down
8 changes: 7 additions & 1 deletion filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Registrar struct {

Channel chan []*FileEvent
done chan struct{}
wg sync.WaitGroup
}

func NewRegistrar(registryFile string) (*Registrar, error) {
Expand Down Expand Up @@ -79,10 +80,14 @@ func (r *Registrar) LoadState() {
}

func (r *Registrar) Run() {
r.wg.Add(1)
logp.Info("Starting Registrar")

// Writes registry on shutdown
defer r.writeRegistry()
defer func() {
r.writeRegistry()
r.wg.Done()
}()

for {
select {
Expand Down Expand Up @@ -123,6 +128,7 @@ func (r *Registrar) processEvents(events []*FileEvent) {
// Stop signals the registrar's Run loop to exit and blocks until it has
// flushed the registry to disk and returned.
func (r *Registrar) Stop() {
	logp.Info("Stopping Registrar")
	// Closing done makes Run's select loop exit; Run's deferred cleanup
	// writes the registry and releases r.wg.
	close(r.done)
	// NOTE(review): this now blocks on the wait group, but an earlier note
	// here said Stop is run by an async signal handler and must not block —
	// confirm blocking here is safe for that caller.
	r.wg.Wait()
}

Expand Down
12 changes: 12 additions & 0 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"regexp"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/input"
)

type Harvester struct {
Id uuid.UUID
Path string /* the file path to harvest */
Config *config.HarvesterConfig
offset int64
Expand All @@ -34,8 +37,10 @@ type Harvester struct {
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
file FileSource /* the file being watched */
fileLock sync.Mutex
ExcludeLinesRegexp []*regexp.Regexp
IncludeLinesRegexp []*regexp.Regexp
done chan struct{}
}

func NewHarvester(
Expand All @@ -52,11 +57,13 @@ func NewHarvester(
}

h := &Harvester{
Id: uuid.NewV4(), // Unique identifier of each harvester
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is a random harvester ID required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not, see #1144 (comment) It is more I quite like UUID.

Path: path,
Config: cfg,
Stat: stat,
SpoolerChan: spooler,
encoding: encoding,
done: make(chan struct{}),
}
h.ExcludeLinesRegexp, err = InitRegexps(cfg.ExcludeLines)
if err != nil {
func (h *Harvester) Start() {
	// Start launches the harvester asynchronously. The original note says
	// the input type is picked here and defaults to "log" when unset —
	// presumably that happens inside Harvest; not visible here, confirm.
	go h.Harvest()
}

// Stop signals the harvester to shut down by closing its done channel;
// the harvesting goroutine observes the close and exits.
// NOTE(review): closing an already-closed channel panics, so Stop must
// not be called more than once per harvester — confirm callers guarantee
// this, or guard with sync.Once.
func (h *Harvester) Stop() {
	close(h.done)
}
4 changes: 3 additions & 1 deletion filebeat/harvester/linereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ func createLineReader(
readerConfig logFileReaderConfig,
jsonConfig *config.JSONConfig,
mlrConfig *config.MultilineConfig,
done chan struct{},
) (processor.LineProcessor, error) {

var p processor.LineProcessor
var err error

fileReader, err := newLogFileReader(in, readerConfig)
fileReader, err := newLogFileReader(in, readerConfig, done)
if err != nil {
return nil, err
}
Expand Down
Loading