diff --git a/examples/streaming/README.md b/examples/snap-plugin-collector-rand-streaming/README.md similarity index 97% rename from examples/streaming/README.md rename to examples/snap-plugin-collector-rand-streaming/README.md index 477b9e1..ff9ca57 100644 --- a/examples/streaming/README.md +++ b/examples/snap-plugin-collector-rand-streaming/README.md @@ -156,7 +156,7 @@ package rand ``` ## Ready to Share -You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](https://github.com/intelsdi-x/snap-plugin-lib-go/blob/master/examples/streaming/main.go#L29). +You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](main.go#L29). Don't forget to announce your plugin release on [slack](https://intelsdi-x.herokuapp.com/) and get your plugin added to the [Plugin Catalog](https://github.com/intelsdi-x/snap/blob/master/docs/PLUGIN_CATALOG.md)! diff --git a/examples/streaming/main.go b/examples/snap-plugin-collector-rand-streaming/main.go similarity index 90% rename from examples/streaming/main.go rename to examples/snap-plugin-collector-rand-streaming/main.go index 26fbe50..7c16493 100644 --- a/examples/streaming/main.go +++ b/examples/snap-plugin-collector-rand-streaming/main.go @@ -20,7 +20,7 @@ limitations under the License. package main import ( - "github.com/intelsdi-x/snap-plugin-lib-go/examples/streaming/rand" + "github.com/intelsdi-x/snap-plugin-lib-go/examples/snap-plugin-collector-rand-streaming/rand" "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" ) diff --git a/examples/streaming/rand/rand.go b/examples/snap-plugin-collector-rand-streaming/rand/rand.go similarity index 97% rename from examples/streaming/rand/rand.go rename to examples/snap-plugin-collector-rand-streaming/rand/rand.go index 7e90c69..b22b8ad 100644 --- a/examples/streaming/rand/rand.go +++ b/examples/snap-plugin-collector-rand-streaming/rand/rand.go @@ -20,6 +20,7 @@ limitations under the License. package rand import ( + "context" "fmt" "math/rand" "time" @@ -69,13 +70,13 @@ type RandCollector struct { // The metrics_out channel is used by the plugin to send the collected metrics // to Snap. func (r *RandCollector) StreamMetrics( + ctx context.Context, metrics_in chan []plugin.Metric, metrics_out chan []plugin.Metric, err chan string) error { go r.streamIt(metrics_out, err) - go r.drainMetrics(metrics_in) - + r.drainMetrics(metrics_in) return nil } @@ -125,7 +126,7 @@ func (r *RandCollector) streamIt(ch chan []plugin.Metric, err chan string) { } } ch <- metrics - time.Sleep(time.Second * time.Duration(rand.Int63n(10))) + time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000))) } } diff --git a/examples/snap-plugin-collector-rand/rand/rand.go b/examples/snap-plugin-collector-rand/rand/rand.go index 7f5e86e..b8f7c8f 100644 --- a/examples/snap-plugin-collector-rand/rand/rand.go +++ b/examples/snap-plugin-collector-rand/rand/rand.go @@ -64,14 +64,12 @@ func init() { //When the flag is set, an additional policy will be added in GetConfigPolicy(). //This additional policy has a required field. This simulates //the situation when a plugin requires a config to load. - plugin.AddFlag( - cli.BoolFlag{ - Name: "required-config", - Hidden: false, - Usage: "Plugin requires config passed in", - Destination: &req, - }, - ) + plugin.Flags = append(plugin.Flags, cli.BoolFlag{ + Name: "required-config", + Hidden: false, + Usage: "Plugin requires config passed in", + Destination: &req, + }) } // Mock collector implementation used for testing diff --git a/util/items.go b/util/items.go new file mode 100644 index 0000000..30fd800 --- /dev/null +++ b/util/items.go @@ -0,0 +1,57 @@ +package util + +import ( + "errors" + "sync" + + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc" + "golang.org/x/net/context" +) + +type streamsMgr struct { + *sync.Mutex + collection map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc +} + +func New() *streamsMgr { + return &streamsMgr{ + Mutex: &sync.Mutex{}, + collection: make(map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc), + } +} + +func (s *streamsMgr) Add(stream rpc.StreamCollector_StreamMetricsServer, cancel context.CancelFunc) { + s.Lock() + defer s.Unlock() + s.collection[stream] = cancel +} + +func (s *streamsMgr) RemoveAndCancel(stream rpc.StreamCollector_StreamMetricsServer) error { + s.Lock() + defer s.Unlock() + cancel, ok := s.collection[stream] + if !ok { + return errors.New("stream not found") + } + cancel() + delete(s.collection, stream) + return nil +} + +func (s *streamsMgr) GetAll() []rpc.StreamCollector_StreamMetricsServer { + s.Lock() + defer s.Unlock() + keys := make([]rpc.StreamCollector_StreamMetricsServer, len(s.collection)) + i := 0 + for k := range s.collection { + keys[i] = k + i++ + } + return keys +} + +func (s *streamsMgr) Count() int { + s.Lock() + defer s.Unlock() + return len(s.collection) +} diff --git a/v1/plugin/flags.go b/v1/plugin/flags.go index 64ffce9..46b73e6 100644 --- a/v1/plugin/flags.go +++ b/v1/plugin/flags.go @@ -19,10 +19,22 @@ var ( Name: "port", Usage: "port GRPC will listen on", } + // ListenAddr the address that GRPC will listen on. Plugin authors can also + // use this address if their plugin binds to a local port as it's sometimes + // needed to bind to a public interface. + ListenAddr = "127.0.0.1" + flAddr = cli.StringFlag{ + Name: "addr", + Usage: "addr GRPC will listen on", + Value: ListenAddr, + Destination: &ListenAddr, + } + LogLevel = 2 flLogLevel = cli.IntFlag{ - Name: "log-level", - Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug", - Value: 2, + Name: "log-level", + Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug", + Value: LogLevel, + Destination: &LogLevel, } flPprof = cli.BoolFlag{ Name: "pprof", @@ -51,6 +63,18 @@ var ( flHTTPPort = cli.IntFlag{ Name: "stand-alone-port", Usage: "specify http port when stand-alone is set", - Value: 8181, + Value: 8182, + } + collectDurationStr = "5s" + flMaxCollectDuration = cli.StringFlag{ + Name: "max-collect-duration", + Usage: "sets the maximum duration (always greater than 0s) between collections before metrics are sent. Defaults to 10s what means that after 10 seconds no new metrics are received, the plugin should send whatever data it has in the buffer instead of waiting longer. (e.g. 5s)", + Value: collectDurationStr, + Destination: &collectDurationStr, + } + + flMaxMetricsBuffer = cli.Int64Flag{ + Name: "max-metrics-buffer", + Usage: "maximum number of metrics the plugin is buffering before sending metrics. Defaults to zero what means send metrics immediately.", } ) diff --git a/v1/plugin/meta.go b/v1/plugin/meta.go index dc2210f..0b026f9 100644 --- a/v1/plugin/meta.go +++ b/v1/plugin/meta.go @@ -85,6 +85,7 @@ const ( collectorType pluginType = iota processorType publisherType + streamCollectorType ) type metaRPCType int diff --git a/v1/plugin/mocks_test.go b/v1/plugin/mocks_test.go index 0492920..0be62f1 100644 --- a/v1/plugin/mocks_test.go +++ b/v1/plugin/mocks_test.go @@ -22,6 +22,7 @@ limitations under the License. package plugin import ( + "context" "errors" "fmt" "time" @@ -123,7 +124,7 @@ func (mc *mockStreamer) GetMetricTypes(cfg Config) ([]Metric, error) { return mts, nil } -func (mc *mockStreamer) StreamMetrics(i chan []Metric, o chan []Metric, _ chan string) error { +func (mc *mockStreamer) StreamMetrics(ctx context.Context, i chan []Metric, o chan []Metric, _ chan string) error { if mc.err != nil { return errors.New("error") diff --git a/v1/plugin/plugin.go b/v1/plugin/plugin.go index 5ccaf61..3a5e7fa 100644 --- a/v1/plugin/plugin.go +++ b/v1/plugin/plugin.go @@ -20,6 +20,7 @@ limitations under the License. package plugin import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -52,12 +53,12 @@ var ( version int opts []MetaOpt } -) - -func init() { - app = cli.NewApp() - app.Flags = []cli.Flag{ + // Flags required by the plugin lib flags - plugin authors can provide their + // own flags. Checkout https://github.com/intelsdi-x/snap-plugin-lib-go/blob/master/examples/snap-plugin-collector-rand/rand/rand.go + // for an example of a plugin adding a custom flag. + Flags []cli.Flag = []cli.Flag{ flConfig, + flAddr, flPort, flPprof, flTLS, @@ -67,16 +68,10 @@ func init() { flStandAlone, flHTTPPort, flLogLevel, + flMaxCollectDuration, + flMaxMetricsBuffer, } - app.Action = startPlugin -} - -// AddFlag accepts a cli.Flag to the plugins standard flags. -func AddFlag(flags ...cli.Flag) { - for _, f := range flags { - app.Flags = append(app.Flags, f) - } -} +) // Plugin is the base plugin type. All plugins must implement GetConfigPolicy. type Plugin interface { @@ -130,7 +125,7 @@ type StreamCollector interface { // // A channel for error strings that the library will report to snap // as task errors. - StreamMetrics(chan []Metric, chan []Metric, chan string) error + StreamMetrics(context.Context, chan []Metric, chan []Metric, chan string) error GetMetricTypes(Config) ([]Metric, error) } @@ -343,6 +338,10 @@ func buildGRPCServer(typeOfPlugin pluginType, name string, version int, arg *Arg // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -363,6 +362,10 @@ func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -371,7 +374,9 @@ func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) app.Usage = "a Snap processor" err := app.Run(getOSArgs()) if err != nil { - log.Error(err) + log.WithFields(log.Fields{ + "_block": "StartProcessor", + }).Error(err) return 1 } return 0 @@ -381,6 +386,10 @@ func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -389,7 +398,9 @@ func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) app.Usage = "a Snap publisher" err := app.Run(getOSArgs()) if err != nil { - log.Error(err) + log.WithFields(log.Fields{ + "_block": "StartPublisher", + }).Error(err) return 1 } return 0 @@ -399,6 +410,10 @@ func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartStreamCollector(plugin StreamCollector, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -409,7 +424,9 @@ func StartStreamCollector(plugin StreamCollector, name string, version int, opts app.Usage = "a Snap collector" err := app.Run(getOSArgs()) if err != nil { - log.Error(err) + log.WithFields(log.Fields{ + "_block": "StartStreamCollector", + }).Error(err) return 1 } return 0 @@ -430,9 +447,10 @@ type preamble struct { func startPlugin(c *cli.Context) error { var ( - server *grpc.Server - meta *meta - pluginProxy *pluginProxy + server *grpc.Server + meta *meta + pluginProxy *pluginProxy + MaxMetricsBuffer int64 ) libInputOutput.setContext(c) arg, err := processInput(c) @@ -444,9 +462,10 @@ func startPlugin(c *cli.Context) error { } else { log.SetLevel(log.Level(arg.LogLevel)) } - logger := log.WithFields(log.Fields{ - "_block": "startPlugin", - }) + logger := log.WithFields( + log.Fields{ + "_block": "startPlugin", + }) switch plugin := appArgs.plugin.(type) { case Collector: proxy := &collectorProxy{ @@ -482,14 +501,37 @@ func startPlugin(c *cli.Context) error { } rpc.RegisterPublisherServer(server, proxy) case StreamCollector: + if c.IsSet("max-metrics-buffer") { + MaxMetricsBuffer = c.Int64("max-metrics-buffer") + } else { + MaxMetricsBuffer = defaultMaxMetricsBuffer + } + + logger.WithFields(log.Fields{ + "option": "max-metrics-buffer", + "value": MaxMetricsBuffer, + }).Debug("setting max metrics buffer") + + maxCollectDuration, err := time.ParseDuration(collectDurationStr) + if err != nil { + return err + } + + logger.WithFields(log.Fields{ + "option": "max-collect-duration", + "value": maxCollectDuration, + }).Debug("setting max collect duration") + proxy := &StreamProxy{ plugin: plugin, + ctx: context.Background(), pluginProxy: *newPluginProxy(plugin), - maxCollectDuration: defaultMaxCollectDuration, - maxMetricsBuffer: defaultMaxMetricsBuffer, + maxCollectDuration: maxCollectDuration, + maxMetricsBuffer: MaxMetricsBuffer, } + pluginProxy = &proxy.pluginProxy - server, meta, err = buildGRPCServer(collectorType, appArgs.name, appArgs.version, arg, appArgs.opts...) + server, meta, err = buildGRPCServer(streamCollectorType, appArgs.name, appArgs.version, arg, appArgs.opts...) if err != nil { return cli.NewExitError(err, 2) } @@ -502,7 +544,6 @@ func startPlugin(c *cli.Context) error { httpPort := c.Int("stand-alone-port") preamble, err := printPreambleAndServe(server, meta, pluginProxy, arg.ListenPort, arg.Pprof) if err != nil { - log.Error(err) return err } @@ -567,13 +608,13 @@ func startPlugin(c *cli.Context) error { } func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isPprof bool) (string, error) { - l, err := net.Listen("tcp", fmt.Sprintf(":%v", port)) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%v", ListenAddr, port)) if err != nil { return "", err } l.Close() - addr := fmt.Sprintf("127.0.0.1:%v", l.Addr().(*net.TCPAddr).Port) + addr := fmt.Sprintf("%s:%v", ListenAddr, l.Addr().(*net.TCPAddr).Port) lis, err := net.Listen("tcp", addr) if err != nil { return "", err @@ -591,9 +632,13 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP return "", err } } + advertisedAddr, err := getAddr(ListenAddr) + if err != nil { + return "", err + } resp := preamble{ Meta: *m, - ListenAddress: addr, + ListenAddress: fmt.Sprintf("%v:%v", advertisedAddr, l.Addr().(*net.TCPAddr).Port), Type: m.Type, PprofAddress: pprofAddr, State: 0, // Hardcode success since panics on err @@ -606,6 +651,26 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP return string(preambleJSON), nil } +// getAddr if we were provided the addr 0.0.0.0 we need to determine the +// address we will advertise to the framework in the preamble. +func getAddr(addr string) (string, error) { + if strings.Compare(addr, "0.0.0.0") == 0 { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "", err + } + for _, address := range addrs { + // check the address type and if it is not a loopback the display it + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String(), nil + } + } + } + } + return addr, nil +} + func showDiagnostics(m meta, p *pluginProxy, c Config) error { defer timeTrack(time.Now(), "showDiagnostics") printRuntimeDetails(m) @@ -907,5 +972,14 @@ func processInput(c *cli.Context) (*Arg, error) { if c.IsSet("tls") { arg.TLSEnabled = true } + + if c.IsSet("max-collect-duration") { + arg.MaxCollectDuration = c.String("max-collect-duration") + } + + if c.IsSet("max-metrics-buffer") { + arg.MaxMetricsBuffer = c.Int64("max-metrics-buffer") + } + return processArg(arg) } diff --git a/v1/plugin/session.go b/v1/plugin/session.go index 95c7175..a38bc56 100644 --- a/v1/plugin/session.go +++ b/v1/plugin/session.go @@ -37,6 +37,9 @@ type Arg struct { // Flag requesting server to establish TLS channel TLSEnabled bool + + MaxCollectDuration string + MaxMetricsBuffer int64 } // processArg is provided *Arg and returns *Arg after unmarshaling the first command line argument which is expected to be valid JSON. diff --git a/v1/plugin/stream_collector_proxy.go b/v1/plugin/stream_collector_proxy.go index f5af9c2..e9555d1 100644 --- a/v1/plugin/stream_collector_proxy.go +++ b/v1/plugin/stream_collector_proxy.go @@ -7,6 +7,7 @@ import ( "golang.org/x/net/context" + log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc" ) @@ -18,6 +19,7 @@ const ( type StreamProxy struct { pluginProxy plugin StreamCollector + ctx context.Context // maxMetricsBuffer is the maximum number of metrics the plugin is buffering before sending metrics. // Defaults to zero what means send metrics immediately. @@ -49,6 +51,11 @@ func (p *StreamProxy) GetMetricTypes(ctx context.Context, arg *rpc.GetMetricType } func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServer) error { + log.WithFields( + log.Fields{ + "_block": "StreamMetrics", + }, + ).Debug("streaming started") if stream == nil { return errors.New("Stream metrics server is nil") } @@ -60,37 +67,41 @@ func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServ inChan := make(chan []Metric) // Metrics out of the plugin into snap. outChan := make(chan []Metric) - - err := p.plugin.StreamMetrics(inChan, outChan, errChan) - if err != nil { - return err - } + // context for communicating that the stream has been closed to the plugin author go p.metricSend(outChan, stream) go p.errorSend(errChan, stream) - p.streamRecv(inChan, stream) + go p.streamRecv(inChan, stream) - return nil -} + return p.plugin.StreamMetrics(stream.Context(), inChan, outChan, errChan) -func (p *StreamProxy) SetConfig(context.Context, *rpc.ConfigMap) (*rpc.ErrReply, error) { - return nil, nil } func (p *StreamProxy) errorSend(errChan chan string, stream rpc.StreamCollector_StreamMetricsServer) { - for r := range errChan { - reply := &rpc.CollectReply{ - Error: &rpc.ErrReply{Error: r}, - } - if err := stream.Send(reply); err != nil { - fmt.Println(err.Error()) + for { + select { + case <-stream.Context().Done(): + return + case r := <-errChan: + reply := &rpc.CollectReply{ + Error: &rpc.ErrReply{Error: r}, + } + if err := stream.Send(reply); err != nil { + fmt.Println(err.Error()) + } } } } func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) { + log.WithFields( + log.Fields{ + "_block": "metricSend", + }, + ).Debug("sending metrics") metrics := []*rpc.Metric{} + afterCollectDuration := time.After(p.maxCollectDuration) for { select { case mts := <-ch: @@ -118,38 +129,63 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St if p.maxMetricsBuffer == 0 { sendReply(metrics, stream) metrics = []*rpc.Metric{} + afterCollectDuration = time.After(p.maxCollectDuration) } - case <-time.After(p.maxCollectDuration): + + case <-afterCollectDuration: // send metrics if maxCollectDuration is reached sendReply(metrics, stream) metrics = []*rpc.Metric{} - + afterCollectDuration = time.After(p.maxCollectDuration) + case <-stream.Context().Done(): + return } } } func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) { + logger := log.WithFields( + log.Fields{ + "_block": "streamRecv", + }, + ) + logger.Debug("receiving metrics") for { - s, err := stream.Recv() - if err != nil { - fmt.Println(err) - continue - } - if s != nil { - if s.MaxMetricsBuffer > 0 { - p.setMaxMetricsBuffer(s.MaxMetricsBuffer) - } - if s.MaxCollectDuration > 0 { - p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration)) + select { + case <-stream.Context().Done(): + close(ch) + return + default: + + s, err := stream.Recv() + if err != nil { + logger.Error(err) + break } - if s.Metrics_Arg != nil { - metrics := []Metric{} - for _, mt := range s.Metrics_Arg.Metrics { - metric := fromProtoMetric(mt) - metrics = append(metrics, metric) + if s != nil { + if s.MaxMetricsBuffer > 0 { + logger.WithFields(log.Fields{ + "option": "max-metrics-buffer", + "value": s.MaxMetricsBuffer, + }).Debug("setting max metrics buffer option") + p.setMaxMetricsBuffer(s.MaxMetricsBuffer) + } + if s.MaxCollectDuration > 0 { + logger.WithFields(log.Fields{ + "option": "max-collect-duration", + "value": fmt.Sprintf("%v seconds", time.Duration(s.MaxCollectDuration).Seconds()), + }).Debug("setting max collect duration option") + p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration)) + } + if s.Metrics_Arg != nil { + metrics := []Metric{} + for _, mt := range s.Metrics_Arg.Metrics { + metric := fromProtoMetric(mt) + metrics = append(metrics, metric) + } + // send requested metrics to be collected into the stream plugin + ch <- metrics } - // send requested metrics to be collected into the stream plugin - ch <- metrics } } } @@ -174,6 +210,10 @@ func sendReply(metrics []*rpc.Metric, stream rpc.StreamCollector_StreamMetricsSe } if err := stream.Send(reply); err != nil { - fmt.Println(err.Error()) + log.WithFields( + log.Fields{ + "_block": "streamRecv", + }, + ).Error(err) } } diff --git a/v1/plugin/stream_collector_proxy_test.go b/v1/plugin/stream_collector_proxy_test.go index 747648f..369c7e0 100644 --- a/v1/plugin/stream_collector_proxy_test.go +++ b/v1/plugin/stream_collector_proxy_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "golang.org/x/net/context" "google.golang.org/grpc" "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc" @@ -37,6 +38,10 @@ type mockStreamServer struct { recvChan chan *rpc.CollectArg } +func (m mockStreamServer) Context() context.Context { + return context.TODO() +} + func (m mockStreamServer) Send(arg *rpc.CollectReply) error { m.sendChan <- arg return nil @@ -54,7 +59,7 @@ func TestStreamMetrics(t *testing.T) { time.Sleep(t) } } - Convey("TestStreamMetrics", t, func() { + Convey("TestStreamMetrics", t, func(c C) { Convey("Error calling StreamMetrics", func() { sp := StreamProxy{ pluginProxy: *newPluginProxy(newMockErrStreamer()), @@ -62,17 +67,13 @@ func TestStreamMetrics(t *testing.T) { maxMetricsBuffer: defaultMaxMetricsBuffer, maxCollectDuration: defaultMaxCollectDuration, } - sendChan := make(chan *rpc.CollectReply) - recvChan := make(chan *rpc.CollectArg) - s := mockStreamServer{ - sendChan: sendChan, - recvChan: recvChan, - } - - err := sp.StreamMetrics(s) + errChan := make(chan string) + sendChan := make(chan []Metric) + recvChan := make(chan []Metric) + err := sp.plugin.StreamMetrics(context.Background(), recvChan, sendChan, errChan) So(err, ShouldNotBeNil) }) - Convey("Successful Call to StreamMetrics", func() { + Convey("Successful Call to StreamMetrics", func(c C) { // Make a successful call to stream metrics pl := newMockStreamer() sp := StreamProxy{ @@ -90,7 +91,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() Convey("Successful call, stream error", func() { // plugin returns an error. @@ -99,7 +100,7 @@ func TestStreamMetrics(t *testing.T) { // plugin returns metrics }) }) - Convey("Successfully stream metrics from plugin immediately", func() { + Convey("Successfully stream metrics from plugin immediately", func(c C) { pl := newMockStreamerStream(mockStreamAction) sp := StreamProxy{ pluginProxy: *newPluginProxy(newMockStreamer()), @@ -115,7 +116,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() // Need to give time for streamMetrics call to propagate time.Sleep(time.Millisecond * 100) @@ -139,7 +140,7 @@ func TestStreamMetrics(t *testing.T) { } }) }) - Convey("Successfully stream metrics from plugin", func() { + Convey("Successfully stream metrics from plugin", func(c C) { pl := newMockStreamerStream(mockStreamAction) // Set maxMetricsBuffer to define buffer capacity @@ -157,7 +158,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() // Create mocked metrics metrics := []Metric{} @@ -169,8 +170,8 @@ func TestStreamMetrics(t *testing.T) { Convey("get buffered metrics through stream proxy when maxMetricsBuffer is reached", func() { Convey("when maxMetricsBuffer is reached", func() { So(pl.outMetric, ShouldNotBeNil) - // Send metrics down to channel every 100 ms - pl.doAction(time.Millisecond*100, metrics) + // Send metrics down to channel every 20 ms + pl.doAction(time.Millisecond*20, metrics) select { case mts := <-sendChan: // Success! we got something....