diff --git a/reader/mongo/mongo.go b/reader/mongo/mongo.go index 806e57c9c..ef6e670ea 100644 --- a/reader/mongo/mongo.go +++ b/reader/mongo/mongo.go @@ -2,6 +2,7 @@ package mongo import ( "errors" + "fmt" "strings" "sync" "sync/atomic" @@ -20,18 +21,37 @@ import ( . "github.com/qiniu/logkit/utils/models" ) -// CollectionFilter is just a typed map of strings of map[string]interface{} -type CollectionFilter map[string]interface{} - -const ( - MongoDefaultOffsetKey = "_id" +var ( + _ reader.DaemonReader = &Reader{} + _ reader.StatsReader = &Reader{} + _ reader.Reader = &Reader{} ) func init() { reader.RegisterConstructor(reader.ModeMongo, NewReader) } +// CollectionFilter is just a typed map of strings of map[string]interface{} +type CollectionFilter map[string]interface{} + +const ( + DefaultOffsetKey = "_id" +) + type Reader struct { + meta *reader.Meta + // Note: 原子操作,用于表示 reader 整体的运行状态 + status int32 + // Note: 原子操作,用于表示获取数据的线程运行状态,只可能是 StatusInit 和 StatusRunning + routineStatus int32 + + stopChan chan struct{} + readChan chan []byte //bson + errChan chan error + + stats StatsInfo + statsLock sync.RWMutex + host string database string collection string @@ -39,21 +59,13 @@ type Reader struct { readBatch int // 每次读取的数据量 collectionFilters map[string]CollectionFilter - Cron *cron.Cron //定时任务 - loop bool + isLoop bool loopDuration time.Duration - readChan chan []byte //bson - errChan chan error - meta *reader.Meta // 记录offset的元数据 + execOnStart bool + Cron *cron.Cron //定时任务 session *mgo.Session offset interface{} //对于默认的offset_key: "_id", 是objectID作为offset,存储的表现形式是string,其他则是int64 - execOnStart bool - status int32 - started bool - mux sync.Mutex - stats StatsInfo - statsLock sync.RWMutex } func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err error) { @@ -67,7 +79,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err erro return nil, err } host, _ := conf.GetStringOr(reader.KeyMongoHost, "localhost:9200") - offsetkey, _ := conf.GetStringOr(reader.KeyMongoOffsetKey, MongoDefaultOffsetKey) + offsetkey, _ := conf.GetStringOr(reader.KeyMongoOffsetKey, DefaultOffsetKey) cronSched, _ := conf.GetStringOr(reader.KeyMongoCron, "") execOnStart, _ := conf.GetBoolOr(reader.KeyMongoExecOnstart, true) filters, _ := conf.GetStringOr(reader.KeyMongoFilters, "") @@ -85,24 +97,22 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err erro //TODO mongo鉴权暂时不支持 } mmr := &Reader{ - meta: meta, - host: host, - database: database, - collection: collection, - offsetkey: offsetkey, - readBatch: readBatch, //这个参数目前没有用 - - collectionFilters: map[string]CollectionFilter{}, - Cron: cron.New(), + meta: meta, status: reader.StatusInit, + routineStatus: reader.StatusInit, + stopChan: make(chan struct{}), readChan: make(chan []byte), errChan: make(chan error), + host: host, + database: database, + collection: collection, + offsetkey: offsetkey, + readBatch: readBatch, //这个参数目前没有用 + collectionFilters: map[string]CollectionFilter{}, execOnStart: execOnStart, - started: false, - mux: sync.Mutex{}, - statsLock: sync.RWMutex{}, + Cron: cron.New(), } - if offsetkey == MongoDefaultOffsetKey { + if offsetkey == DefaultOffsetKey { if bson.IsObjectIdHex(keyOrObj) { mmr.offset = bson.ObjectIdHex(keyOrObj) } else { @@ -121,7 +131,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err erro if len(cronSched) > 0 { cronSched = strings.ToLower(cronSched) if strings.HasPrefix(cronSched, reader.Loop) { - mmr.loop = true + mmr.isLoop = true mmr.loopDuration, err = reader.ParseLoopDuration(cronSched) if err != nil { log.Errorf("Runner[%v] %v %v", mmr.meta.RunnerName, mmr.Name(), err) @@ -139,192 +149,233 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (mr reader.Reader, err erro return mr, nil } -func (mr *Reader) Name() string { - return "MongoReader:" + mr.Source() +func (r *Reader) isStopping() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopping } -func (mr *Reader) Source() string { - return mr.host + "_" + mr.database + "_" + mr.collection +func (r *Reader) hasStopped() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopped } -func (mr *Reader) Status() StatsInfo { - mr.statsLock.RLock() - defer mr.statsLock.RUnlock() - return mr.stats +func (r *Reader) Name() string { + return "MongoReader<" + r.Source() + ">" } -func (mr *Reader) setStatsError(err string) { - mr.statsLock.Lock() - defer mr.statsLock.Unlock() - mr.stats.LastError = err +func (_ *Reader) SetMode(_ string, _ interface{}) error { + return errors.New("MongoDB Reader does not support read mode") } -func (mr *Reader) Close() (err error) { - mr.Cron.Stop() - if mr.session != nil { - mr.session.Close() - } - if atomic.CompareAndSwapInt32(&mr.status, reader.StatusRunning, reader.StatusStopping) { - log.Infof("Runner[%v] %v stopping", mr.meta.RunnerName, mr.Name()) - } else { - atomic.CompareAndSwapInt32(&mr.status, reader.StatusInit, reader.StatusStopped) - close(mr.readChan) - close(mr.errChan) - } - return +func (r *Reader) setStatsError(err string) { + r.statsLock.Lock() + defer r.statsLock.Unlock() + r.stats.LastError = err } -//Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题 -func (mr *Reader) Start() { - mr.mux.Lock() - defer mr.mux.Unlock() - if mr.started { +func (r *Reader) sendError(err error) { + if err == nil { return } - if mr.loop { - go mr.LoopRun() - } else { - if mr.execOnStart { - go mr.run() + defer func() { + if rec := recover(); rec != nil { + log.Errorf("Reader %q was panicked and recovered from %v", r.Name(), rec) } - mr.Cron.Start() - } - mr.started = true - log.Infof("Runner[%v] %v pull data daemon started", mr.meta.RunnerName, mr.Name()) + }() + r.errChan <- err } -func (mr *Reader) LoopRun() { - for { - if atomic.LoadInt32(&mr.status) == reader.StatusStopping { - log.Warnf("Runner[%v] %v stopped from running", mr.meta.RunnerName, mr.Name()) - return +func (r *Reader) Start() error { + if r.isStopping() || r.hasStopped() { + return errors.New("reader is stopping or has stopped") + } else if !atomic.CompareAndSwapInt32(&r.status, reader.StatusInit, reader.StatusRunning) { + log.Warnf("Runner[%v] %q daemon has already started and is running", r.meta.RunnerName, r.Name()) + return nil + } + + if r.isLoop { + go func() { + ticker := time.NewTicker(r.loopDuration) + defer ticker.Stop() + for { + r.run() + + select { + case <-r.stopChan: + atomic.StoreInt32(&r.status, reader.StatusStopped) + log.Infof("Runner[%v] %q daemon has stopped from running", r.meta.RunnerName, r.Name()) + return + case <-ticker.C: + } + } + }() + + } else { + if r.execOnStart { + go r.run() } - mr.run() - time.Sleep(mr.loopDuration) + r.Cron.Start() } + log.Infof("Runner[%v] %q daemon has started", r.meta.RunnerName, r.Name()) + return nil } -func (mr *Reader) ReadLine() (data string, err error) { - if !mr.started { - mr.Start() - } +func (r *Reader) Source() string { + return r.host + "_" + r.database + "_" + r.collection +} + +func (r *Reader) ReadLine() (string, error) { timer := time.NewTimer(time.Second) + defer timer.Stop() select { - case dat := <-mr.readChan: - data = string(dat) - case err = <-mr.errChan: + case data := <-r.readChan: + return string(data), nil + case err := <-r.errChan: + return "", err case <-timer.C: } - timer.Stop() - return + + return "", nil } -func (mr *Reader) run() { - var err error - // 防止并发run - for { - if atomic.LoadInt32(&mr.status) == reader.StatusStopped || atomic.LoadInt32(&mr.status) == reader.StatusStopping { - return +func (r *Reader) Status() StatsInfo { + r.statsLock.RLock() + defer r.statsLock.RUnlock() + return r.stats +} + +// SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复 +func (r *Reader) SyncMeta() { + var key string + var offset int64 + if r.offsetkey == DefaultOffsetKey { + if id, ok := r.offset.(bson.ObjectId); ok { + key = id.Hex() } - if atomic.CompareAndSwapInt32(&mr.status, reader.StatusInit, reader.StatusRunning) { - break + } else { + key = r.offsetkey + if ofs, ok := r.offset.(int64); ok { + offset = ofs + } else if ofs, ok := r.offset.(int); ok { + offset = int64(ofs) } - //节省CPU - time.Sleep(time.Microsecond) } - //double check - if atomic.LoadInt32(&mr.status) == reader.StatusStopped || atomic.LoadInt32(&mr.status) == reader.StatusStopping { + if err := r.meta.WriteOffset(key, offset); err != nil { + log.Errorf("Runner[%v] %v SyncMeta error %v", r.meta.RunnerName, r.Name(), err) + } +} + +func (r *Reader) Close() (err error) { + if !atomic.CompareAndSwapInt32(&r.status, reader.StatusRunning, reader.StatusStopping) { + log.Warnf("Runner[%v] reader %q is not running, close operation ignored", r.meta.RunnerName, r.Name()) + return nil + } + log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name()) + close(r.stopChan) + + r.Cron.Stop() + if r.session != nil { + r.session.Close() + } + + // 如果此时没有 routine 正在运行,则在此处关闭数据管道,否则由 routine 在退出时负责关闭 + if atomic.LoadInt32(&r.routineStatus) != reader.StatusRunning { + close(r.readChan) + close(r.errChan) + } + return +} + +func (r *Reader) run() { + // 当上个任务还未执行完成的时候直接跳过 + if !atomic.CompareAndSwapInt32(&r.routineStatus, reader.StatusInit, reader.StatusRunning) { + errMsg := fmt.Sprintf("Runner[%v] %q daemon is still working on last task, this task will not be executed and is skipped this time", r.meta.RunnerName, r.Name()) + log.Error(errMsg) + if !r.isLoop { + // 通知上层 Cron 执行间隔可能过短或任务执行时间过长 + r.sendError(errors.New(errMsg)) + } return } - // running时退出 状态改为Init,以便 cron 调度下次运行 - // stopping时推出改为 stopped,不再运行 defer func() { - atomic.CompareAndSwapInt32(&mr.status, reader.StatusRunning, reader.StatusInit) - if atomic.CompareAndSwapInt32(&mr.status, reader.StatusStopping, reader.StatusStopped) { - close(mr.readChan) - close(mr.errChan) - } - if err == nil { - log.Infof("Runner[%v] %v successfully finished", mr.meta.RunnerName, mr.Name()) + // 如果 reader 在 routine 运行时关闭,则需要此 routine 负责关闭数据管道 + if r.isStopping() || r.hasStopped() { + close(r.readChan) + close(r.errChan) } + atomic.StoreInt32(&r.routineStatus, reader.StatusInit) }() - // 开始work逻辑 - for { - if atomic.LoadInt32(&mr.status) == reader.StatusStopping { - log.Warnf("Runner[%v] %v stopped from running", mr.meta.RunnerName, mr.Name()) + // 如果执行失败,最多重试 10 次 + for i := 1; i <= 10; i++ { + // 判断上层是否已经关闭,先判断 routineStatus 再判断 status 可以保证同时只有一个 r.run 会运行到此处 + if r.isStopping() || r.hasStopped() { + log.Warnf("Runner[%v] %q daemon has stopped, task is interrupted", r.meta.RunnerName, r.Name()) return } - err = mr.exec() + + err := r.exec() if err == nil { - log.Infof("Runner[%v] %v successfully exec", mr.meta.RunnerName, mr.Name()) + log.Infof("Runner[%v] %q task has been successfully executed", r.meta.RunnerName, r.Name()) return } - log.Error(err) - mr.setStatsError(err.Error()) - mr.sendError(err) - time.Sleep(3 * time.Second) - } -} -func (s *Reader) sendError(err error) { - if err == nil { - return - } - defer func() { - if rec := recover(); rec != nil { - log.Errorf("Reader %s panic, recovered from %v", s.Name(), rec) + log.Errorf("Runner[%v] %q task execution failed: %v ", r.meta.RunnerName, r.Name(), err) + r.setStatsError(err.Error()) + r.sendError(err) + + if r.isLoop { + return // 循环执行的任务上层逻辑已经等同重试 } - }() - s.errChan <- err + time.Sleep(3 * time.Second) + } + log.Warnf("Runner[%v] %q task execution failed and gave up after 10 tries", r.meta.RunnerName, r.Name()) } -func (mr *Reader) catQuery(c string, lastID interface{}, mgoSession *mgo.Session) *mgo.Query { +func (r *Reader) catQuery(c string, lastID interface{}, mgoSession *mgo.Session) *mgo.Query { query := bson.M{} - if f, ok := mr.collectionFilters[c]; ok { + if f, ok := r.collectionFilters[c]; ok { query = bson.M(f) } if lastID != nil { - query[mr.offsetkey] = bson.M{"$gt": lastID} + query[r.offsetkey] = bson.M{"$gt": lastID} } - return mgoSession.DB(mr.database).C(c).Find(query).Sort(mr.offsetkey) + return mgoSession.DB(r.database).C(c).Find(query).Sort(r.offsetkey) } -func (mr *Reader) exec() (err error) { - if mr.session == nil { - mr.session, err = utils.MongoDail(mr.host, "", 0) +func (r *Reader) exec() (err error) { + if r.session == nil { + r.session, err = utils.MongoDail(r.host, "", 0) if err != nil { return } - mr.session.SetSocketTimeout(time.Second * 5) - mr.session.SetSyncTimeout(time.Second * 5) + r.session.SetSocketTimeout(time.Second * 5) + r.session.SetSyncTimeout(time.Second * 5) } else { - err := mr.session.Ping() + err := r.session.Ping() if err != nil { - mr.session.Refresh() - mr.session.SetSocketTimeout(time.Second * 5) - mr.session.SetSyncTimeout(time.Second * 5) + r.session.Refresh() + r.session.SetSocketTimeout(time.Second * 5) + r.session.SetSyncTimeout(time.Second * 5) } else { time.Sleep(time.Second * 5) } } - iter := mr.catQuery(mr.collection, mr.offset, mr.session).Iter() + iter := r.catQuery(r.collection, r.offset, r.session).Iter() var result bson.M for iter.Next(&result) { - if atomic.LoadInt32(&mr.status) == reader.StatusStopping { - log.Warnf("Runner[%v] %v stopped from running", mr.meta.RunnerName, mr.Name()) + if r.isStopping() || r.hasStopped() { + log.Warnf("Runner[%v] %q daemon has stopped, iteration is interrupted", r.meta.RunnerName, r.Name()) return nil } - if id, ok := result[mr.offsetkey]; ok { - mr.offset = id + if id, ok := result[r.offsetkey]; ok { + r.offset = id } bytes, ierr := jsoniter.Marshal(result) if ierr != nil { - log.Errorf("Runner[%v] %v json marshal inner error %v", mr.meta.RunnerName, result, ierr) + log.Errorf("Runner[%v] %v json marshal inner error %v", r.meta.RunnerName, result, ierr) } - mr.readChan <- bytes + r.readChan <- bytes result = bson.M{} } if err := iter.Err(); err != nil { @@ -332,29 +383,3 @@ func (mr *Reader) exec() (err error) { } return nil } - -//SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。 -func (mr *Reader) SyncMeta() { - var key string - var offset int64 - if mr.offsetkey == MongoDefaultOffsetKey { - if id, ok := mr.offset.(bson.ObjectId); ok { - key = id.Hex() - } - } else { - key = mr.offsetkey - if ofs, ok := mr.offset.(int64); ok { - offset = ofs - } else if ofs, ok := mr.offset.(int); ok { - offset = int64(ofs) - } - } - if err := mr.meta.WriteOffset(key, offset); err != nil { - log.Errorf("Runner[%v] %v SyncMeta error %v", mr.meta.RunnerName, mr.Name(), err) - } - return -} - -func (mr *Reader) SetMode(mode string, v interface{}) error { - return errors.New("MongoDB Reader not support read mode") -} diff --git a/reader/mongo/mongo_test.go b/reader/mongo/mongo_test.go index 46c9f381b..2ebac9a97 100644 --- a/reader/mongo/mongo_test.go +++ b/reader/mongo/mongo_test.go @@ -28,14 +28,14 @@ func TestMongoReader(t *testing.T) { host: "127.0.0.1:12701", database: "testdb", collection: "coll", - offsetkey: MongoDefaultOffsetKey, + offsetkey: DefaultOffsetKey, offset: obj, collectionFilters: map[string]CollectionFilter{}, status: reader.StatusInit, readChan: make(chan []byte), } - assert.EqualValues(t, "MongoReader:127.0.0.1:12701_testdb_coll", er.Name()) + assert.EqualValues(t, "MongoReader<127.0.0.1:12701_testdb_coll>", er.Name()) er.SyncMeta() got, gotoffset, err := er.meta.ReadOffset() assert.NoError(t, err)