From c36a3f4982b8e67837062825dc1d911d136c269d Mon Sep 17 00:00:00 2001 From: Unknwon Date: Thu, 26 Jul 2018 11:46:12 +0800 Subject: [PATCH] reader/tailx: improve status check and implement DaemonReader --- reader/tailx/tailx.go | 412 +++++++++++++++++++------------------ reader/tailx/tailx_test.go | 20 +- 2 files changed, 224 insertions(+), 208 deletions(-) diff --git a/reader/tailx/tailx.go b/reader/tailx/tailx.go index 9ec12d0d9..31bafd9df 100644 --- a/reader/tailx/tailx.go +++ b/reader/tailx/tailx.go @@ -21,33 +21,42 @@ import ( . "github.com/qiniu/logkit/utils/models" ) +var ( + _ reader.DaemonReader = &Reader{} + _ reader.StatsReader = &Reader{} + _ reader.LagReader = &Reader{} + _ reader.Reader = &Reader{} + _ Resetable = &Reader{} +) + func init() { reader.RegisterConstructor(reader.ModeTailx, NewReader) } type Reader struct { - started bool - status int32 + meta *reader.Meta + // Note: 原子操作,用于表示 reader 整体的运行状态 + status int32 + + stopChan chan struct{} + msgChan chan Result + errChan chan error + + stats StatsInfo + statsLock sync.RWMutex + fileReaders map[string]*ActiveReader armapmux sync.Mutex - startmux sync.Mutex - curFile string + currentFile string headRegexp *regexp.Regexp cacheMap map[string]string - msgChan chan Result - errChan chan error - //以下为传入参数 - meta *reader.Meta logPathPattern string expire time.Duration statInterval time.Duration maxOpenFiles int whence string - - stats StatsInfo - statsLock sync.RWMutex } type ActiveReader struct { @@ -251,10 +260,10 @@ func (ar *ActiveReader) expired(expireDur time.Duration) bool { return false } -func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err error) { +func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) { logPathPattern, err := conf.GetString(reader.KeyLogPath) if err != nil { - return + return nil, err } whence, _ := conf.GetStringOr(reader.KeyWhence, reader.WhenceOldest) @@ -301,309 +310,316 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err erro return &Reader{ meta: meta, + status: reader.StatusInit, + stopChan: make(chan struct{}), + msgChan: make(chan Result), + errChan: make(chan error), logPathPattern: logPathPattern, whence: whence, expire: expire, statInterval: statInterval, maxOpenFiles: maxOpenFiles, - started: false, - startmux: sync.Mutex{}, - status: reader.StatusInit, fileReaders: make(map[string]*ActiveReader), //armapmux cacheMap: cacheMap, //armapmux - armapmux: sync.Mutex{}, - msgChan: make(chan Result), - errChan: make(chan error), - statsLock: sync.RWMutex{}, }, nil +} +func (r *Reader) isStopping() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopping } -//Expire 函数关闭过期的文件,再更新 -func (mr *Reader) Expire() { - var paths []string - if atomic.LoadInt32(&mr.status) == reader.StatusStopped { - return - } - mr.armapmux.Lock() - defer mr.armapmux.Unlock() - if atomic.LoadInt32(&mr.status) == reader.StatusStopped { - return - } - for path, ar := range mr.fileReaders { - if ar.expired(mr.expire) { - ar.Close() - delete(mr.fileReaders, path) - delete(mr.cacheMap, path) - mr.meta.RemoveSubMeta(path) - paths = append(paths, path) - } - } - if len(paths) > 0 { - log.Infof("Runner[%v] expired logpath: %v", mr.meta.RunnerName, strings.Join(paths, ", ")) - } +func (r *Reader) hasStopped() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopped +} + +func (r *Reader) Name() string { + return "TailxReader: " + r.logPathPattern } -func (mr *Reader) SetMode(mode string, value interface{}) (err error) { +func (r *Reader) SetMode(mode string, value interface{}) error { reg, err := reader.HeadPatternMode(mode, value) if err != nil { - return fmt.Errorf("%v setmode error %v", mr.Name(), err) + return fmt.Errorf("get head pattern mode: %v", err) } if reg != nil { - mr.headRegexp = reg + r.headRegexp = reg } - return + return nil +} + +func (r *Reader) setStatsError(err string) { + r.statsLock.Lock() + defer r.statsLock.Unlock() + r.stats.LastError = err } -func (mr *Reader) sendError(err error) { +func (r *Reader) sendError(err error) { if err == nil { return } defer func() { - if r := recover(); r != nil { - log.Errorf("Reader %s Recovered from %v", mr.Name(), r) + if rec := recover(); rec != nil { + log.Errorf("Reader %q was panicked and recovered from %v", r.Name(), rec) } }() - mr.errChan <- err + r.errChan <- err +} + +// checkExpiredFiles 函数关闭过期的文件,再更新 +func (r *Reader) checkExpiredFiles() { + r.armapmux.Lock() + defer r.armapmux.Unlock() + + var paths []string + for path, ar := range r.fileReaders { + if ar.expired(r.expire) { + ar.Close() + delete(r.fileReaders, path) + delete(r.cacheMap, path) + r.meta.RemoveSubMeta(path) + paths = append(paths, path) + } + } + if len(paths) > 0 { + log.Infof("Runner[%v] expired logpath: %v", r.meta.RunnerName, strings.Join(paths, ", ")) + } } -func (mr *Reader) StatLogPath() { +func (r *Reader) statLogPath() { //达到最大打开文件数,不再追踪 - if len(mr.fileReaders) >= mr.maxOpenFiles { - log.Warnf("Runner[%v] %v meet maxOpenFiles limit %v, ignore Stat new log...", mr.meta.RunnerName, mr.Name(), mr.maxOpenFiles) + if len(r.fileReaders) >= r.maxOpenFiles { + log.Warnf("Runner[%v] %v meet maxOpenFiles limit %v, ignore Stat new log...", r.meta.RunnerName, r.Name(), r.maxOpenFiles) return } - matches, err := filepath.Glob(mr.logPathPattern) + matches, err := filepath.Glob(r.logPathPattern) if err != nil { - log.Errorf("Runner[%v] stat logPathPattern error %v", mr.meta.RunnerName, err) - mr.setStatsError("Runner[" + mr.meta.RunnerName + "] stat logPathPattern error " + err.Error()) + log.Errorf("Runner[%v] stat logPathPattern error %v", r.meta.RunnerName, err) + r.setStatsError("Runner[" + r.meta.RunnerName + "] stat logPathPattern error " + err.Error()) return } if len(matches) > 0 { - log.Debugf("Runner[%v] StatLogPath %v find matches: %v", mr.meta.RunnerName, mr.logPathPattern, strings.Join(matches, ", ")) + log.Debugf("Runner[%v] statLogPath %v find matches: %v", r.meta.RunnerName, r.logPathPattern, strings.Join(matches, ", ")) } var newaddsPath []string for _, mc := range matches { rp, fi, err := GetRealPath(mc) if err != nil { - log.Errorf("Runner[%v] file pattern %v match %v stat error %v, ignore this match...", mr.meta.RunnerName, mr.logPathPattern, mc, err) + log.Errorf("Runner[%v] file pattern %v match %v stat error %v, ignore this match...", r.meta.RunnerName, r.logPathPattern, mc, err) continue } if fi.IsDir() { - log.Debugf("Runner[%v] %v is dir, mode[tailx] only support read file, ignore this match...", mr.meta.RunnerName, mc) + log.Debugf("Runner[%v] %v is dir, mode[tailx] only support read file, ignore this match...", r.meta.RunnerName, mc) continue } - mr.armapmux.Lock() - _, ok := mr.fileReaders[rp] - mr.armapmux.Unlock() + r.armapmux.Lock() + _, ok := r.fileReaders[rp] + r.armapmux.Unlock() if ok { - log.Debugf("Runner[%v] <%v> is collecting, ignore...", mr.meta.RunnerName, rp) + log.Debugf("Runner[%v] <%v> is collecting, ignore...", r.meta.RunnerName, rp) continue } - mr.armapmux.Lock() - cacheline := mr.cacheMap[rp] - mr.armapmux.Unlock() + r.armapmux.Lock() + cacheline := r.cacheMap[rp] + r.armapmux.Unlock() //过期的文件不追踪,除非之前追踪的并且有日志没读完 - if cacheline == "" && fi.ModTime().Add(mr.expire).Before(time.Now()) { - log.Debugf("Runner[%v] <%v> is expired, ignore...", mr.meta.RunnerName, mc) + if cacheline == "" && fi.ModTime().Add(r.expire).Before(time.Now()) { + log.Debugf("Runner[%v] <%v> is expired, ignore...", r.meta.RunnerName, mc) continue } - ar, err := NewActiveReader(mc, rp, mr.whence, mr.meta, mr.msgChan, mr.errChan) + ar, err := NewActiveReader(mc, rp, r.whence, r.meta, r.msgChan, r.errChan) if err != nil { - err = fmt.Errorf("runner[%v] NewActiveReader for matches %v error %v", mr.meta.RunnerName, rp, err) - mr.sendError(err) + err = fmt.Errorf("runner[%v] NewActiveReader for matches %v error %v", r.meta.RunnerName, rp, err) + r.sendError(err) log.Error(err, ", ignore this match...") continue } ar.readcache = cacheline - if mr.headRegexp != nil { - err = ar.br.SetMode(reader.ReadModeHeadPatternRegexp, mr.headRegexp) + if r.headRegexp != nil { + err = ar.br.SetMode(reader.ReadModeHeadPatternRegexp, r.headRegexp) if err != nil { - log.Errorf("Runner[%v] NewActiveReader for matches %v SetMode error %v", mr.meta.RunnerName, rp, err) - mr.setStatsError("Runner[" + mr.meta.RunnerName + "] NewActiveReader for matches " + rp + " SetMode error " + err.Error()) + log.Errorf("Runner[%v] NewActiveReader for matches %v SetMode error %v", r.meta.RunnerName, rp, err) + r.setStatsError("Runner[" + r.meta.RunnerName + "] NewActiveReader for matches " + rp + " SetMode error " + err.Error()) } } newaddsPath = append(newaddsPath, rp) - mr.armapmux.Lock() - if atomic.LoadInt32(&mr.status) != reader.StatusStopped { - if err = mr.meta.AddSubMeta(rp, ar.br.Meta); err != nil { - log.Errorf("Runner[%v] %v add submeta for %v err %v, but this reader will still working", mr.meta.RunnerName, mc, rp, err) + r.armapmux.Lock() + if atomic.LoadInt32(&r.status) != reader.StatusStopped { + if err = r.meta.AddSubMeta(rp, ar.br.Meta); err != nil { + log.Errorf("Runner[%v] %v add submeta for %v err %v, but this reader will still working", r.meta.RunnerName, mc, rp, err) } - mr.fileReaders[rp] = ar + r.fileReaders[rp] = ar } else { - log.Warnf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", mr.meta.RunnerName, mc) + log.Warnf("Runner[%v] %v NewActiveReader but reader was stopped, ignore this...", r.meta.RunnerName, mc) } - mr.armapmux.Unlock() - if atomic.LoadInt32(&mr.status) != reader.StatusStopped { + r.armapmux.Unlock() + if atomic.LoadInt32(&r.status) != reader.StatusStopped { go ar.Run() } else { - log.Warnf("Runner[%v] %v NewActiveReader but reader was stopped, will not running...", mr.meta.RunnerName, mc) + log.Warnf("Runner[%v] %v NewActiveReader but reader was stopped, will not running...", r.meta.RunnerName, mc) } } if len(newaddsPath) > 0 { - log.Infof("Runner[%v] StatLogPath find new logpath: %v", mr.meta.RunnerName, strings.Join(newaddsPath, ", ")) + log.Infof("Runner[%v] statLogPath find new logpath: %v", r.meta.RunnerName, strings.Join(newaddsPath, ", ")) } } -func (mr *Reader) getActiveReaders() []*ActiveReader { - mr.armapmux.Lock() - defer mr.armapmux.Unlock() +func (r *Reader) Start() error { + if r.isStopping() || r.hasStopped() { + return errors.New("reader is stopping or has stopped") + } else if !atomic.CompareAndSwapInt32(&r.status, reader.StatusInit, reader.StatusRunning) { + log.Warnf("Runner[%v] %q daemon has already started and is running", r.meta.RunnerName, r.Name()) + return nil + } + + go func() { + ticker := time.NewTicker(r.statInterval) + defer ticker.Stop() + for { + r.checkExpiredFiles() + r.statLogPath() + + select { + case <-r.stopChan: + atomic.StoreInt32(&r.status, reader.StatusStopped) + log.Infof("Runner[%v] %q daemon has stopped from running", r.meta.RunnerName, r.Name()) + return + case <-ticker.C: + } + } + }() + log.Infof("Runner[%v] %q daemon has started", r.meta.RunnerName, r.Name()) + return nil +} + +func (r *Reader) getActiveReaders() []*ActiveReader { + r.armapmux.Lock() + defer r.armapmux.Unlock() var ars []*ActiveReader - for _, ar := range mr.fileReaders { + for _, ar := range r.fileReaders { ars = append(ars, ar) } return ars } -func (mr *Reader) Name() string { - return "MultiReader:" + mr.logPathPattern +func (r *Reader) Source() string { + return r.currentFile } -func (mr *Reader) Source() string { - return mr.curFile -} +// Note: 对 currentFile 的操作非线程安全,需由上层逻辑保证同步调用 ReadLine +func (r *Reader) ReadLine() (string, error) { + timer := time.NewTimer(time.Second) + defer timer.Stop() + select { + case msg := <-r.msgChan: + r.currentFile = msg.logpath + return msg.result, nil + case err := <-r.errChan: + return "", err + case <-timer.C: + } -func (mr *Reader) setStatsError(err string) { - mr.statsLock.Lock() - defer mr.statsLock.Unlock() - mr.stats.LastError = err + return "", nil } -func (mr *Reader) Status() StatsInfo { - mr.statsLock.RLock() - defer mr.statsLock.RUnlock() +func (r *Reader) Status() StatsInfo { + r.statsLock.RLock() + defer r.statsLock.RUnlock() - ars := mr.getActiveReaders() + ars := r.getActiveReaders() for _, ar := range ars { st := ar.Status() if st.LastError != "" { - mr.stats.LastError += "\n<" + ar.originpath + ">: " + st.LastError + r.stats.LastError += "\n<" + ar.originpath + ">: " + st.LastError } } - return mr.stats -} - -func (mr *Reader) Close() (err error) { - atomic.StoreInt32(&mr.status, reader.StatusStopped) - // 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性 - time.Sleep(10 * time.Millisecond) - mr.SyncMeta() - ars := mr.getActiveReaders() - var wg sync.WaitGroup - for _, ar := range ars { - wg.Add(1) - go func(mar *ActiveReader) { - defer wg.Done() - xerr := mar.Close() - if xerr != nil { - log.Errorf("Runner[%v] Close ActiveReader %v error %v", mr.meta.RunnerName, mar.originpath, xerr) - } - }(ar) - } - wg.Wait() - //在所有 active readers都关闭后再close msgChan - close(mr.msgChan) - close(mr.errChan) - return + return r.stats } -/* - Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题 - 处理StatIntervel以及Expire两大循环任务 -*/ -func (mr *Reader) Start() { - mr.startmux.Lock() - defer mr.startmux.Unlock() - if mr.started { - return - } - go mr.run() - mr.started = true - log.Infof("%v MultiReader stat file deamon started", mr.Name()) -} +func (r *Reader) Lag() (*LagInfo, error) { + lagInfo := &LagInfo{SizeUnit: "bytes"} + var errStr string + ars := r.getActiveReaders() -func (mr *Reader) run() { - for { - if atomic.LoadInt32(&mr.status) == reader.StatusStopped { - log.Warnf("%v stopped from running", mr.Name()) - return + for _, ar := range ars { + lg, subErr := ar.Lag() + if subErr != nil { + errStr += subErr.Error() + log.Warn(subErr) + continue } - mr.Expire() - mr.StatLogPath() - time.Sleep(mr.statInterval) + lagInfo.Size += lg.Size } -} -func (mr *Reader) ReadLine() (data string, err error) { - if !mr.started { - mr.Start() - } - timer := time.NewTimer(time.Second) - select { - case result := <-mr.msgChan: - mr.curFile = result.logpath - data = result.result - case err = <-mr.errChan: - case <-timer.C: + var err error + if len(errStr) > 0 { + err = errors.New(errStr) } - timer.Stop() - return + return lagInfo, err } -//SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。 -func (mr *Reader) SyncMeta() { - ars := mr.getActiveReaders() +// SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复 +func (r *Reader) SyncMeta() { + ars := r.getActiveReaders() for _, ar := range ars { readcache := ar.SyncMeta() if readcache == "" { continue } - mr.armapmux.Lock() - mr.cacheMap[ar.realpath] = readcache - mr.armapmux.Unlock() + r.armapmux.Lock() + r.cacheMap[ar.realpath] = readcache + r.armapmux.Unlock() } - mr.armapmux.Lock() - buf, err := jsoniter.Marshal(mr.cacheMap) - mr.armapmux.Unlock() + r.armapmux.Lock() + buf, err := jsoniter.Marshal(r.cacheMap) + r.armapmux.Unlock() if err != nil { - log.Errorf("%v sync meta error %v, cacheMap %v", mr.Name(), err, mr.cacheMap) + log.Errorf("%v sync meta error %v, cacheMap %v", r.Name(), err, r.cacheMap) return } - err = mr.meta.WriteBuf(buf, 0, 0, len(buf)) + err = r.meta.WriteBuf(buf, 0, 0, len(buf)) if err != nil { - log.Errorf("%v sync meta WriteBuf error %v, buf %v", mr.Name(), err, string(buf)) + log.Errorf("%v sync meta WriteBuf error %v, buf %v", r.Name(), err, string(buf)) return } - return } -func (mr *Reader) Lag() (rl *LagInfo, err error) { - rl = &LagInfo{SizeUnit: "bytes"} - var errStr string - ars := mr.getActiveReaders() +func (r *Reader) Close() error { + if !atomic.CompareAndSwapInt32(&r.status, reader.StatusRunning, reader.StatusStopping) { + log.Warnf("Runner[%v] reader %q is not running, close operation ignored", r.meta.RunnerName, r.Name()) + return nil + } + log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name()) + close(r.stopChan) + // 停10ms为了管道中的数据传递完毕,确认reader run函数已经结束不会再读取,保证syncMeta的正确性 + time.Sleep(10 * time.Millisecond) + r.SyncMeta() + ars := r.getActiveReaders() + var wg sync.WaitGroup for _, ar := range ars { - lg, subErr := ar.Lag() - if subErr != nil { - errStr += subErr.Error() - log.Warn(subErr) - continue - } - rl.Size += lg.Size - } - if len(errStr) > 0 { - err = errors.New(errStr) + wg.Add(1) + go func(mar *ActiveReader) { + defer wg.Done() + xerr := mar.Close() + if xerr != nil { + log.Errorf("Runner[%v] Close ActiveReader %v error %v", r.meta.RunnerName, mar.originpath, xerr) + } + }(ar) } + wg.Wait() - return rl, err + // 在所有 active readers 关闭完成后再关闭管道 + close(r.msgChan) + close(r.errChan) + return nil } -func (mr *Reader) Reset() (err error) { +func (r *Reader) Reset() error { errMsg := make([]string, 0) - if err = mr.meta.Reset(); err != nil { + if err := r.meta.Reset(); err != nil { errMsg = append(errMsg, err.Error()) } - ars := mr.getActiveReaders() + ars := r.getActiveReaders() for _, ar := range ars { if ar.br != nil { if subErr := ar.br.Meta.Reset(); subErr != nil { @@ -612,7 +628,7 @@ func (mr *Reader) Reset() (err error) { } } if len(errMsg) != 0 { - err = errors.New(strings.Join(errMsg, "\n")) + return errors.New(strings.Join(errMsg, "\n")) } - return + return nil } diff --git a/reader/tailx/tailx_test.go b/reader/tailx/tailx_test.go index 8d92bdad3..a7df3c946 100644 --- a/reader/tailx/tailx_test.go +++ b/reader/tailx/tailx_test.go @@ -121,7 +121,7 @@ func multiReaderOneLineTest(t *testing.T) { assert.NoError(t, err) mmr, err := NewReader(meta, c) mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) t.Log("mr started") go func() { time.Sleep(15 * time.Second) @@ -216,7 +216,7 @@ func multiReaderMultiLineTest(t *testing.T) { mmr, err := NewReader(meta, c) mmr.SetMode(reader.ReadModeHeadPatternString, "^abc*") mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) t.Log("mr started") go func() { time.Sleep(15 * time.Second) @@ -317,7 +317,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) { assert.NoError(t, err) mmr, err := NewReader(meta, c) mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) t.Log("mr started") go func() { time.Sleep(15 * time.Second) @@ -352,7 +352,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) { time.Sleep(500 * time.Millisecond) mmr, err = NewReader(meta, c) mr = mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) time.Sleep(500 * time.Millisecond) for { data, err := mr.ReadLine() @@ -447,7 +447,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) { mmr, err := NewReader(meta, c) mmr.SetMode(reader.ReadModeHeadPatternString, "^abc*") mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) t.Log("mr started") go func() { time.Sleep(15 * time.Second) @@ -478,7 +478,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) { mmr, err = NewReader(meta, c) mmr.SetMode(reader.ReadModeHeadPatternString, "^abc*") mr = mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) time.Sleep(100 * time.Millisecond) for { data, err := mr.ReadLine() @@ -567,7 +567,7 @@ func TestMultiReaderReset(t *testing.T) { mmr, err := NewReader(meta, c) assert.NoError(t, err) mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) t.Log("mr started") maxNum := 0 @@ -601,7 +601,7 @@ func TestMultiReaderReset(t *testing.T) { assert.NoError(t, err) mmr, err = NewReader(meta, c) mr = mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) time.Sleep(100 * time.Millisecond) resultMap = make(map[string]int) maxNum = 0 @@ -660,7 +660,7 @@ func TestReaderErrBegin(t *testing.T) { mmr, err := NewReader(meta, c) assert.NoError(t, err) mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) maxNum := 0 for { _, err = mr.ReadLine() @@ -725,7 +725,7 @@ func TestReaderErrMiddle(t *testing.T) { mmr, err := NewReader(meta, c) assert.NoError(t, err) mr := mmr.(*Reader) - mr.Start() + assert.NoError(t, mr.Start()) maxNum := 0 for { _, err = mr.ReadLine()