diff --git a/reader/snmp/snmp.go b/reader/snmp/snmp.go index 972a1580c..32fad0d40 100644 --- a/reader/snmp/snmp.go +++ b/reader/snmp/snmp.go @@ -3,6 +3,7 @@ package snmp import ( "bufio" "bytes" + "errors" "fmt" "math" "net" @@ -20,14 +21,36 @@ import ( "github.com/qiniu/logkit/conf" "github.com/qiniu/logkit/reader" + . "github.com/qiniu/logkit/utils/models" +) + +var ( + _ reader.DaemonReader = &Reader{} + _ reader.DataReader = &Reader{} + _ reader.Reader = &Reader{} ) func init() { reader.RegisterConstructor(reader.ModeSnmp, NewReader) } +type readInfo struct { + data Data + bytes int64 +} + // Reader holds the configuration for the plugin. type Reader struct { + meta *reader.Meta + // Note: 原子操作,用于表示 reader 整体的运行状态 + status int32 + // Note: 原子操作,用于表示获取数据的线程运行状态,只可能是 StatusInit 和 StatusRunning + routineStatus int32 + + stopChan chan struct{} + readChan chan readInfo + errChan chan error + SnmpName string Agents []string // agent address [ip:port] Timeout time.Duration // 等待回复的时间 @@ -51,12 +74,6 @@ type Reader struct { Tables []Table Fields []Field ConnectionCache []snmpConnection - - Meta *reader.Meta - Status int32 - StopChan chan struct{} - DataChan chan interface{} - errChan chan error } var execCommand = exec.Command @@ -75,18 +92,19 @@ func execCmd(arg0 string, args ...string) ([]byte, error) { return out, nil } -func NewReader(meta *reader.Meta, c conf.MapConf) (s reader.Reader, err error) { - var timeOut, interval time.Duration - name, _ := c.GetStringOr(reader.KeySnmpReaderName, "logki_default_snmp_name") +func NewReader(meta *reader.Meta, c conf.MapConf) (reader.Reader, error) { + name, _ := c.GetStringOr(reader.KeySnmpReaderName, "logkit_default_snmp_name") agents, _ := c.GetStringListOr(reader.KeySnmpReaderAgents, []string{"127.0.0.1:161"}) tableHost, _ := c.GetStringOr(reader.KeySnmpTableInitHost, "127.0.0.1") timeStr, _ := c.GetStringOr(reader.KeySnmpReaderTimeOut, "5s") - if timeOut, err = time.ParseDuration(timeStr); err != nil { - return + timeOut, err := time.ParseDuration(timeStr) + if err != nil { + return nil, err } intervalStr, _ := c.GetStringOr(reader.KeySnmpReaderInterval, "30s") - if interval, err = time.ParseDuration(intervalStr); err != nil { - return + interval, err := time.ParseDuration(intervalStr) + if err != nil { + return nil, err } retries, _ := c.GetIntOr(reader.KeySnmpReaderRetries, 3) version, _ := c.GetIntOr(reader.KeySnmpReaderVersion, 2) @@ -117,55 +135,153 @@ func NewReader(meta *reader.Meta, c conf.MapConf) (s reader.Reader, err error) { var fields []Field if err = jsoniter.Unmarshal([]byte(tableConf), &tables); err != nil { - return + return nil, err } if err = jsoniter.Unmarshal([]byte(fieldConf), &fields); err != nil { - return + return nil, err } for i := range tables { if subErr := tables[i].init(tableHost); subErr != nil { - err = Errorf(subErr, "initializing table %s", tables[i].Name) - return + return nil, Errorf(subErr, "initializing table %s", tables[i].Name) } } for i := range fields { if subErr := fields[i].init(); subErr != nil { - err = Errorf(subErr, "initializing field %s", fields[i].Name) - return + return nil, Errorf(subErr, "initializing field %s", fields[i].Name) } } return &Reader{ - Meta: meta, - SnmpName: name, - Agents: agents, - Timeout: timeOut, - Interval: interval, - Retries: retries, - Version: uint8(version), - Community: community, - MaxRepetitions: uint8(maxRepetitions), - ContextName: contextName, - SecLevel: secLevel, - SecName: secName, - AuthProtocol: authProtocol, - AuthPassword: authPassword, - PrivProtocol: privProtocol, - PrivPassword: privPassword, - EngineID: engineID, - EngineBoots: uint32(engineBoots), - EngineTime: uint32(engineTime), - Tables: tables, - Fields: fields, - - Status: reader.StatusInit, - StopChan: make(chan struct{}), - DataChan: make(chan interface{}, 1000), + meta: meta, + status: reader.StatusInit, + routineStatus: reader.StatusInit, + stopChan: make(chan struct{}), + readChan: make(chan readInfo, 1000), errChan: make(chan error), + SnmpName: name, + Agents: agents, + Timeout: timeOut, + Interval: interval, + Retries: retries, + Version: uint8(version), + Community: community, + MaxRepetitions: uint8(maxRepetitions), + ContextName: contextName, + SecLevel: secLevel, + SecName: secName, + AuthProtocol: authProtocol, + AuthPassword: authPassword, + PrivProtocol: privProtocol, + PrivPassword: privPassword, + EngineID: engineID, + EngineBoots: uint32(engineBoots), + EngineTime: uint32(engineTime), + Tables: tables, + Fields: fields, ConnectionCache: make([]snmpConnection, len(agents)), }, nil } +func (r *Reader) isStopping() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopping +} + +func (r *Reader) hasStopped() bool { + return atomic.LoadInt32(&r.status) == reader.StatusStopped +} + +func (r *Reader) Name() string { + return r.SnmpName +} + +func (r *Reader) SetMode(mode string, v interface{}) error { + return nil +} + +func (r *Reader) sendError(err error) { + if err == nil { + return + } + defer func() { + if rec := recover(); rec != nil { + log.Errorf("Reader %q was panicked and recovered from %v", r.Name(), rec) + } + }() + r.errChan <- err +} + +func (r *Reader) Start() error { + if r.isStopping() || r.hasStopped() { + return errors.New("reader is stopping or has stopped") + } else if !atomic.CompareAndSwapInt32(&r.status, reader.StatusInit, reader.StatusRunning) { + log.Warnf("Runner[%v] %q daemon has already started and is running", r.meta.RunnerName, r.Name()) + return nil + } + + go func() { + ticker := time.NewTicker(r.Interval) + defer ticker.Stop() + for { + err := r.Gather() + if err != nil { + log.Errorf("Runner[%v] %q gather failed: %v ", r.meta.RunnerName, r.Name(), err) + log.Error(err) + r.sendError(err) + } + + select { + case <-r.stopChan: + atomic.StoreInt32(&r.status, reader.StatusStopped) + log.Infof("Runner[%v] %q daemon has stopped from running", r.meta.RunnerName, r.Name()) + return + case <-ticker.C: + } + } + }() + log.Infof("Runner[%v] %q daemon has started", r.meta.RunnerName, r.Name()) + return nil +} + +func (r *Reader) Source() string { + return r.SnmpName +} + +func (r *Reader) ReadLine() (string, error) { + return "", errors.New("method ReadLine is not supported, please use ReadData") +} + +func (r *Reader) ReadData() (Data, int64, error) { + timer := time.NewTimer(time.Second) + defer timer.Stop() + select { + case info := <-r.readChan: + return info.data, info.bytes, nil + case err := <-r.errChan: + return nil, 0, err + case <-timer.C: + } + + return nil, 0, nil +} + +func (r *Reader) SyncMeta() {} + +func (r *Reader) Close() error { + if !atomic.CompareAndSwapInt32(&r.status, reader.StatusRunning, reader.StatusStopping) { + log.Warnf("Runner[%v] reader %q is not running, close operation ignored", r.meta.RunnerName, r.Name()) + return nil + } + log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name()) + close(r.stopChan) + + // 如果此时没有 routine 正在运行,则在此处关闭数据管道,否则由 routine 在退出时负责关闭 + if atomic.LoadInt32(&r.routineStatus) != reader.StatusRunning { + close(r.readChan) + close(r.errChan) + } + return nil +} + type Table struct { Name string `json:"table_name"` InheritTags []string `json:"table_inherit_tags"` @@ -256,56 +372,72 @@ func Errorf(err error, msg string, format ...interface{}) error { } } -func (s *Reader) StoreData(data []map[string]interface{}) (err error) { - if data == nil || len(data) == 0 { +func (r *Reader) StoreData(datas []Data) (err error) { + if datas == nil || len(datas) == 0 { return } - for _, d := range data { - if d == nil || len(d) == 0 { + for _, data := range datas { + if data == nil || len(data) == 0 { continue } select { - case <-s.StopChan: + case <-r.stopChan: return - case s.DataChan <- d: + case r.readChan <- readInfo{data, int64(len(fmt.Sprintf("%s", data)))}: } } return } -func (s *Reader) Gather() (err error) { +func (r *Reader) Gather() error { + // 当上个任务还未执行完成的时候直接跳过 + if !atomic.CompareAndSwapInt32(&r.routineStatus, reader.StatusInit, reader.StatusRunning) { + log.Errorf("Runner[%v] %q daemon is still working on last task, this task will not be executed and is skipped this time", r.meta.RunnerName, r.Name()) + return nil + } + defer func() { + // 如果 reader 在 routine 运行时关闭,则需要此 routine 负责关闭数据管道 + if r.isStopping() || r.hasStopped() { + close(r.readChan) + close(r.errChan) + } + atomic.StoreInt32(&r.routineStatus, reader.StatusInit) + }() + var wg sync.WaitGroup - data := make([]map[string]interface{}, 0) - errChan := make(chan error, len(s.Agents)) - for i, agent := range s.Agents { + datas := make([]Data, 0) + errChan := make(chan error, len(r.Agents)) + for i, agent := range r.Agents { wg.Add(1) go func(i int, agent string) { + log.Debugf("Runner[%v] %q is reading from agent: %s", r.meta.RunnerName, r.Name(), agent) defer wg.Done() - gs, subErr := s.getConnection(i) - if subErr != nil { - errChan <- subErr + + conn, err := r.getConnection(i) + if err != nil { + errChan <- err return } t := Table{ - Name: s.SnmpName, - Fields: s.Fields, + Name: r.SnmpName, + Fields: r.Fields, } topTags := map[string]string{} - if data, subErr = s.gatherTable(gs, t, topTags, false); subErr != nil { - errChan <- subErr + if datas, err = r.gatherTable(conn, t, topTags, false); err != nil { + errChan <- err return } - if err1 := s.StoreData(data); err1 != nil { - errChan <- err1 + if err = r.StoreData(datas); err != nil { + errChan <- err return } - for _, t := range s.Tables { - if data, subErr = s.gatherTable(gs, t, topTags, true); subErr != nil { - errChan <- subErr + for _, t := range r.Tables { + if datas, err = r.gatherTable(conn, t, topTags, true); err != nil { + errChan <- err return } - if err1 := s.StoreData(data); err1 != nil { - errChan <- err1 + if err = r.StoreData(datas); err != nil { + errChan <- err return } } @@ -313,20 +445,20 @@ func (s *Reader) Gather() (err error) { } wg.Wait() close(errChan) - for subErr := range errChan { - log.Errorf("gather error: ", subErr) - err = subErr + var lastErr error + for err := range errChan { + log.Errorf("gather error: ", err) + lastErr = err } - return err + return lastErr } -func (s *Reader) gatherTable(gs snmpConnection, t Table, topTags map[string]string, walk bool) (data []map[string]interface{}, err error) { - var rt *RTable - rt, err = t.Build(gs, walk) +func (r *Reader) gatherTable(gs snmpConnection, t Table, topTags map[string]string, walk bool) ([]Data, error) { + rt, err := t.Build(gs, walk) if err != nil { - return + return nil, err } - data = make([]map[string]interface{}, 0) + datas := make([]Data, 0, len(rt.Rows)) for _, tr := range rt.Rows { if !walk { for k, v := range tr.Tags { @@ -342,99 +474,19 @@ func (s *Reader) gatherTable(gs snmpConnection, t Table, topTags map[string]stri if _, ok := tr.Tags["agent_host"]; !ok { tr.Tags["agent_host"] = gs.Host() } - d := make(map[string]interface{}) + + data := make(Data) for k, v := range tr.Fields { - d[k] = v + data[k] = v } for k, v := range tr.Tags { - d[k] = v - } - d[reader.KeyTimestamp] = rt.Time - d[reader.KeySnmpTableName] = t.Name - data = append(data, d) - } - return - -} - -//Name reader名称 -func (s *Reader) Name() string { - return s.SnmpName -} - -//Source 读取的数据源 -func (s *Reader) Source() string { - return s.SnmpName -} - -func (s *Reader) Start() error { - if !atomic.CompareAndSwapInt32(&s.Status, reader.StatusInit, reader.StatusRunning) { - return fmt.Errorf("runner[%v] Reader[%v] already started", s.Meta.RunnerName, s.Name()) - } - go func() { - ticker := time.NewTicker(s.Interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - err := s.Gather() - if err != nil { - err = fmt.Errorf("runner[%v] Reader[%v] gather error %v", s.Meta.RunnerName, s.Name(), err) - log.Error(err) - s.sendError(err) - } - case <-s.StopChan: - close(s.DataChan) - close(s.errChan) - return - } + data[k] = v } - }() - return nil -} - -func (s *Reader) sendError(err error) { - if err == nil { - return + data[reader.KeyTimestamp] = rt.Time + data[reader.KeySnmpTableName] = t.Name + datas = append(datas, data) } - defer func() { - if rec := recover(); rec != nil { - log.Errorf("Reader %s panic, recovered from %v", s.Name(), rec) - } - }() - s.errChan <- err -} - -func (s *Reader) ReadLine() (line string, err error) { - if atomic.LoadInt32(&s.Status) == reader.StatusInit { - if err = s.Start(); err != nil { - log.Error(err) - return "", err - } - } - select { - case d := <-s.DataChan: - var db []byte - if db, err = jsoniter.Marshal(d); err != nil { - return - } - line = string(db) - case err = <-s.errChan: - default: - } - return -} - -func (s *Reader) SetMode(mode string, v interface{}) error { - return nil -} - -func (s *Reader) Close() error { - close(s.StopChan) - return nil -} - -func (s *Reader) SyncMeta() { + return datas, nil } func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) { @@ -583,15 +635,15 @@ func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) { return nil, err } -func (s *Reader) getConnection(idx int) (snmpConnection, error) { - if gs := s.ConnectionCache[idx]; gs != nil { +func (r *Reader) getConnection(idx int) (snmpConnection, error) { + if gs := r.ConnectionCache[idx]; gs != nil { return gs, nil } - agent := s.Agents[idx] + agent := r.Agents[idx] gs := gosnmpWrapper{&gosnmp.GoSNMP{}} - s.ConnectionCache[idx] = gs + r.ConnectionCache[idx] = gs host, portStr, err := net.SplitHostPort(agent) if err != nil { @@ -609,11 +661,11 @@ func (s *Reader) getConnection(idx int) (snmpConnection, error) { } gs.Port = uint16(port) - gs.Timeout = s.Timeout + gs.Timeout = r.Timeout - gs.Retries = s.Retries + gs.Retries = r.Retries - switch s.Version { + switch r.Version { case 3: gs.Version = gosnmp.Version3 case 2, 0: @@ -624,24 +676,24 @@ func (s *Reader) getConnection(idx int) (snmpConnection, error) { return nil, fmt.Errorf("invalid version") } - if s.Version < 3 { - if s.Community == "" { + if r.Version < 3 { + if r.Community == "" { gs.Community = "public" } else { - gs.Community = s.Community + gs.Community = r.Community } } - gs.MaxRepetitions = s.MaxRepetitions + gs.MaxRepetitions = r.MaxRepetitions - if s.Version == 3 { - gs.ContextName = s.ContextName + if r.Version == 3 { + gs.ContextName = r.ContextName sp := &gosnmp.UsmSecurityParameters{} gs.SecurityParameters = sp gs.SecurityModel = gosnmp.UserSecurityModel - switch strings.ToLower(s.SecLevel) { + switch strings.ToLower(r.SecLevel) { case "noauthnopriv", "": gs.MsgFlags = gosnmp.NoAuthNoPriv case "authnopriv": @@ -652,9 +704,9 @@ func (s *Reader) getConnection(idx int) (snmpConnection, error) { return nil, fmt.Errorf("invalid secLevel") } - sp.UserName = s.SecName + sp.UserName = r.SecName - switch strings.ToLower(s.AuthProtocol) { + switch strings.ToLower(r.AuthProtocol) { case "md5": sp.AuthenticationProtocol = gosnmp.MD5 case "sha": @@ -665,9 +717,9 @@ func (s *Reader) getConnection(idx int) (snmpConnection, error) { return nil, fmt.Errorf("invalid authProtocol") } - sp.AuthenticationPassphrase = s.AuthPassword + sp.AuthenticationPassphrase = r.AuthPassword - switch strings.ToLower(s.PrivProtocol) { + switch strings.ToLower(r.PrivProtocol) { case "des": sp.PrivacyProtocol = gosnmp.DES case "aes": @@ -678,13 +730,13 @@ func (s *Reader) getConnection(idx int) (snmpConnection, error) { return nil, fmt.Errorf("invalid privProtocol") } - sp.PrivacyPassphrase = s.PrivPassword + sp.PrivacyPassphrase = r.PrivPassword - sp.AuthoritativeEngineID = s.EngineID + sp.AuthoritativeEngineID = r.EngineID - sp.AuthoritativeEngineBoots = s.EngineBoots + sp.AuthoritativeEngineBoots = r.EngineBoots - sp.AuthoritativeEngineTime = s.EngineTime + sp.AuthoritativeEngineTime = r.EngineTime } if err := gs.Connect(); err != nil { diff --git a/reader/snmp/snmp_test.go b/reader/snmp/snmp_test.go index fe4af7257..69f8c434b 100644 --- a/reader/snmp/snmp_test.go +++ b/reader/snmp/snmp_test.go @@ -5,17 +5,16 @@ import ( "fmt" "net" "os/exec" - "strings" "sync" "testing" "time" - "github.com/json-iterator/go" "github.com/soniah/gosnmp" "github.com/stretchr/testify/assert" "github.com/qiniu/logkit/conf" "github.com/qiniu/logkit/reader" + . "github.com/qiniu/logkit/utils/models" ) type testSNMPConnection struct { @@ -138,7 +137,7 @@ func TestSnmpInit(t *testing.T) { "snmp_tables": `[{"table_oid": "TEST::testTable"}]`, "snmp_fields": `[{"field_oid": "TEST::hostname"}]`, } - ss, err := NewReader(nil, c) + ss, err := NewReader(&reader.Meta{RunnerName: "TestSnmpInit"}, c) assert.NoError(t, err) s := ss.(*Reader) @@ -194,7 +193,7 @@ func TestSnmpInit_noTranslate(t *testing.T) { } ]`, } - ss, err := NewReader(nil, c) + ss, err := NewReader(&reader.Meta{RunnerName: "TestSnmpInit_noTranslate"}, c) s := ss.(*Reader) assert.NoError(t, err) @@ -231,7 +230,7 @@ func TestGetSNMPConnection_v2(t *testing.T) { "snmp_version": "2", "snmp_community": "foo", } - ss, err := NewReader(nil, c) + ss, err := NewReader(&reader.Meta{RunnerName: "TestGetSNMPConnection_v2"}, c) s := ss.(*Reader) assert.NoError(t, err) @@ -250,7 +249,6 @@ func TestGetSNMPConnection_v2(t *testing.T) { assert.EqualValues(t, 161, gs.Port) } -// func TestGetSNMPConnection_v3(t *testing.T) { c := conf.MapConf{ reader.KeySnmpReaderAgents: "1.2.3.4", @@ -267,7 +265,7 @@ func TestGetSNMPConnection_v3(t *testing.T) { reader.KeySnmpReaderEngineBoots: "1", reader.KeySnmpReaderEngineTime: "2", } - ss, err := NewReader(nil, c) + ss, err := NewReader(&reader.Meta{RunnerName: "TestGetSNMPConnection_v3"}, c) if err != nil { t.Fatalf("exp no error, but got %v", err) } @@ -295,7 +293,7 @@ func TestGetSNMPConnection_caching(t *testing.T) { c := conf.MapConf{ reader.KeySnmpReaderAgents: "1.2.3.4, 1.2.3.5, 1.2.3.5", } - ss, err := NewReader(nil, c) + ss, err := NewReader(&reader.Meta{RunnerName: "TestGetSNMPConnection_caching"}, c) if err != nil { t.Fatalf("exp no error, but got %v", err) } @@ -531,8 +529,9 @@ func TestTableBuild_noWalk(t *testing.T) { assert.Contains(t, tb.Rows, rtr) } -func TestReadline(t *testing.T) { +func TestReadData(t *testing.T) { s := &Reader{ + meta: &reader.Meta{RunnerName: "TestReadData"}, Interval: 10 * time.Second, Agents: []string{"TestGather"}, SnmpName: "mytable", @@ -566,7 +565,7 @@ func TestReadline(t *testing.T) { ConnectionCache: []snmpConnection{ tsc, }, - DataChan: make(chan interface{}, 100), + readChan: make(chan readInfo, 100), } tstart := time.Now().UnixNano() err := s.Gather() @@ -575,43 +574,35 @@ func TestReadline(t *testing.T) { } tstop := time.Now().UnixNano() - data := make([]map[string]interface{}, 0) + datas := make([]Data, 0) for i := 0; i < 10; i++ { select { - case d := <-s.DataChan: - var m map[string]interface{} - if db, subErr := jsoniter.Marshal(d); subErr != nil { - t.Fatalf("exp no error, but got %v", subErr) - } else { - if subErr = jsoniter.Unmarshal(db, &m); subErr != nil { - t.Fatalf("exp no error, but got %v", subErr) - } - data = append(data, m) - } + case info := <-s.readChan: + datas = append(datas, info.data) default: } } - if !assert.Equal(t, len(data), 2) { - t.Fatalf("exp len 2, but got len %v, data is %v", len(data), data) + if !assert.Equal(t, len(datas), 2) { + t.Fatalf("exp len 2, but got len %v, data is %v", len(datas), datas) } - m := data[0] + m := datas[0] assert.Equal(t, "mytable", m[reader.KeySnmpTableName]) assert.Equal(t, "tsc", m["agent_host"]) assert.Equal(t, "baz", m["myfield1"]) - assert.Equal(t, float64(234), m["myfield2"]) + assert.Equal(t, int(234), m["myfield2"]) assert.Equal(t, "baz", m["myfield3"]) timestamp, subErr := time.Parse(time.RFC3339Nano, m[reader.KeyTimestamp].(string)) assert.NoError(t, subErr) assert.True(t, timestamp.UnixNano() > tstart) assert.True(t, timestamp.UnixNano() < tstop) - m2 := data[1] + m2 := datas[1] assert.Equal(t, "myOtherTable", m2[reader.KeySnmpTableName]) assert.Equal(t, "tsc", m2["agent_host"]) assert.Equal(t, "baz", m2["myfield1"]) - assert.Equal(t, float64(123456), m2["myOtherField"]) + assert.Equal(t, int(123456), m2["myOtherField"]) timestamp, subErr = time.Parse(time.RFC3339Nano, m[reader.KeyTimestamp].(string)) assert.NoError(t, subErr) assert.True(t, timestamp.UnixNano() > tstart) @@ -619,7 +610,8 @@ func TestReadline(t *testing.T) { } func TestGather_host(t *testing.T) { - s := &Reader{ + r := &Reader{ + meta: &reader.Meta{RunnerName: "TestGather_host"}, Agents: []string{"TestGather"}, SnmpName: "mytable", Fields: []Field{ @@ -637,35 +629,20 @@ func TestGather_host(t *testing.T) { ConnectionCache: []snmpConnection{ tsc, }, - DataChan: make(chan interface{}, 100), - } - err := s.Gather() - if err != nil { - t.Fatalf("exp no error, but got %v", err) + readChan: make(chan readInfo, 100), } + assert.NoError(t, r.Gather()) - data := make([]map[string]interface{}, 0) + datas := make([]Data, 0) for i := 0; i < 10; i++ { select { - case d := <-s.DataChan: - var m map[string]interface{} - if db, subErr := jsoniter.Marshal(d); subErr != nil { - t.Fatalf("exp no error, but got %v", subErr) - } else { - if subErr = jsoniter.Unmarshal(db, &m); subErr != nil { - t.Fatalf("exp no error, but got %v", subErr) - } - data = append(data, m) - } + case info := <-r.readChan: + datas = append(datas, info.data) default: } } - - if !assert.Equal(t, len(data), 1) { - t.Fatalf("exp len 1, but got len %v, data is %v", len(data), data) - } - m := data[0] - assert.Equal(t, "baz", m["host"]) + assert.Equal(t, len(datas), 1) + assert.Equal(t, "baz", datas[0]["host"]) } func TestFieldConvert(t *testing.T) { @@ -737,7 +714,7 @@ func TestSnmpTranslateCache_miss(t *testing.T) { func TestSnmpTranslateCache_hit(t *testing.T) { snmpTranslateCaches = map[string]snmpTranslateCache{ - "foo": snmpTranslateCache{ + "foo": { mibName: "a", oidNum: "b", oidText: "c", @@ -770,7 +747,7 @@ func TestSnmpTableCache_miss(t *testing.T) { func TestSnmpTableCache_hit(t *testing.T) { snmpTableCaches = map[string]snmpTableCache{ - "foo": snmpTableCache{ + "foo": { mibName: "a", oidNum: "b", oidText: "c", @@ -800,7 +777,8 @@ func TestError(t *testing.T) { } func TestExpandChannel(t *testing.T) { - s := &Reader{ + r := &Reader{ + meta: &reader.Meta{RunnerName: "TestExpandChannel"}, Interval: 10 * time.Second, Agents: []string{"TestGather"}, SnmpName: "mytable1", @@ -844,29 +822,29 @@ func TestExpandChannel(t *testing.T) { ConnectionCache: []snmpConnection{ tsc, }, - DataChan: make(chan interface{}, 1), + readChan: make(chan readInfo, 1), } + assert.NoError(t, r.Start()) - lines := make([]string, 0) - for i := 0; i < 10000; i++ { - line, err := s.ReadLine() + datas := make([]Data, 0) + for i := 0; i < 100; i++ { + data, _, err := r.ReadData() if err != nil { t.Fatalf("exp no error, but got %v", err) } - if line != "" { - lines = append(lines, line) - } else { - time.Sleep(1 * time.Second) + if len(data) > 0 { + datas = append(datas, data) } - if len(lines) == 3 { + if len(datas) == 3 { break } } - assert.Equal(t, 3, len(lines), strings.Join(lines, "\n")) + assert.Equal(t, 3, len(datas)) } func TestInterval(t *testing.T) { - s := &Reader{ + r := &Reader{ + meta: &reader.Meta{RunnerName: "TestInterval"}, Interval: 3 * time.Second, Agents: []string{"TestGather"}, SnmpName: "mytable1", @@ -910,41 +888,38 @@ func TestInterval(t *testing.T) { ConnectionCache: []snmpConnection{ tsc, }, - DataChan: make(chan interface{}, 1), + readChan: make(chan readInfo, 1), } + assert.NoError(t, r.Start()) - lines := make([]string, 0) - for i := 0; i < 10000; i++ { - line, err := s.ReadLine() + datas := make([]Data, 0) + for i := 0; i < 100; i++ { + data, _, err := r.ReadData() if err != nil { t.Fatalf("exp no error, but got %v", err) } - if line != "" { - lines = append(lines, line) - } else { - time.Sleep(1 * time.Second) + if len(data) > 0 { + datas = append(datas, data) } - if len(lines) == 3 { + if len(datas) == 3 { break } } - assert.Equal(t, 3, len(lines), strings.Join(lines, "\n")) + assert.Equal(t, 3, len(datas)) time.Sleep(3 * time.Second) - lines = make([]string, 0) - for i := 0; i < 10000; i++ { - line, err := s.ReadLine() + datas = make([]Data, 0) + for i := 0; i < 100; i++ { + data, _, err := r.ReadData() if err != nil { t.Fatalf("exp no error, but got %v", err) } - if line != "" { - lines = append(lines, line) - } else { - time.Sleep(1 * time.Second) + if len(data) > 0 { + datas = append(datas, data) } - if len(lines) == 3 { + if len(datas) == 3 { break } } - assert.Equal(t, 3, len(lines), strings.Join(lines, "\n")) + assert.Equal(t, 3, len(datas)) }