From 56831412dbdbf9a455fbd1b033f45347e64c86f5 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Mon, 8 May 2017 13:30:53 -0700 Subject: [PATCH] Enables streaming collectors --- v1/plugin/meta.go | 1 + v1/plugin/plugin.go | 2 +- v1/plugin/stream_collector_proxy.go | 32 ++++++++++++++++++------ v1/plugin/stream_collector_proxy_test.go | 14 +++++------ 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/v1/plugin/meta.go b/v1/plugin/meta.go index dc2210f..0b026f9 100644 --- a/v1/plugin/meta.go +++ b/v1/plugin/meta.go @@ -85,6 +85,7 @@ const ( collectorType pluginType = iota processorType publisherType + streamCollectorType ) type metaRPCType int diff --git a/v1/plugin/plugin.go b/v1/plugin/plugin.go index 5ccaf61..61942d1 100644 --- a/v1/plugin/plugin.go +++ b/v1/plugin/plugin.go @@ -489,7 +489,7 @@ func startPlugin(c *cli.Context) error { maxMetricsBuffer: defaultMaxMetricsBuffer, } pluginProxy = &proxy.pluginProxy - server, meta, err = buildGRPCServer(collectorType, appArgs.name, appArgs.version, arg, appArgs.opts...) + server, meta, err = buildGRPCServer(streamCollectorType, appArgs.name, appArgs.version, arg, appArgs.opts...) if err != nil { return cli.NewExitError(err, 2) } diff --git a/v1/plugin/stream_collector_proxy.go b/v1/plugin/stream_collector_proxy.go index f5af9c2..897a872 100644 --- a/v1/plugin/stream_collector_proxy.go +++ b/v1/plugin/stream_collector_proxy.go @@ -7,6 +7,7 @@ import ( "golang.org/x/net/context" + log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc" ) @@ -49,6 +50,11 @@ func (p *StreamProxy) GetMetricTypes(ctx context.Context, arg *rpc.GetMetricType } func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServer) error { + log.WithFields( + log.Fields{ + "_block": "StreamMetrics", + }, + ).Debug("streaming started") if stream == nil { return errors.New("Stream metrics server is nil") } @@ -61,16 +67,18 @@ func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServ // Metrics out of the plugin into snap. outChan := make(chan []Metric) - err := p.plugin.StreamMetrics(inChan, outChan, errChan) - if err != nil { - return err - } + go func() { + err := p.plugin.StreamMetrics(inChan, outChan, errChan) + if err != nil { + log.Fatal(err) + } + }() go p.metricSend(outChan, stream) go p.errorSend(errChan, stream) - p.streamRecv(inChan, stream) + go p.streamRecv(inChan, stream) - return nil + return p.plugin.StreamMetrics(inChan, outChan, errChan) } func (p *StreamProxy) SetConfig(context.Context, *rpc.ConfigMap) (*rpc.ErrReply, error) { @@ -89,6 +97,11 @@ func (p *StreamProxy) errorSend(errChan chan string, stream rpc.StreamCollector_ } func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) { + log.WithFields( + log.Fields{ + "_block": "metricSend", + }, + ).Debug("sending metrics") metrics := []*rpc.Metric{} for { @@ -129,11 +142,16 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St } func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) { + log.WithFields( + log.Fields{ + "_block": "streamRecv", + }, + ).Debug("receiving metrics") for { s, err := stream.Recv() if err != nil { fmt.Println(err) - continue + return } if s != nil { if s.MaxMetricsBuffer > 0 { diff --git a/v1/plugin/stream_collector_proxy_test.go b/v1/plugin/stream_collector_proxy_test.go index 747648f..83c46fd 100644 --- a/v1/plugin/stream_collector_proxy_test.go +++ b/v1/plugin/stream_collector_proxy_test.go @@ -54,7 +54,7 @@ func TestStreamMetrics(t *testing.T) { time.Sleep(t) } } - Convey("TestStreamMetrics", t, func() { + Convey("TestStreamMetrics", t, func(c C) { Convey("Error calling StreamMetrics", func() { sp := StreamProxy{ pluginProxy: *newPluginProxy(newMockErrStreamer()), @@ -72,7 +72,7 @@ func TestStreamMetrics(t *testing.T) { err := sp.StreamMetrics(s) So(err, ShouldNotBeNil) }) - Convey("Successful Call to StreamMetrics", func() { + Convey("Successful Call to StreamMetrics", func(c C) { // Make a successful call to stream metrics pl := newMockStreamer() sp := StreamProxy{ @@ -90,7 +90,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() Convey("Successful call, stream error", func() { // plugin returns an error. @@ -99,7 +99,7 @@ func TestStreamMetrics(t *testing.T) { // plugin returns metrics }) }) - Convey("Successfully stream metrics from plugin immediately", func() { + Convey("Successfully stream metrics from plugin immediately", func(c C) { pl := newMockStreamerStream(mockStreamAction) sp := StreamProxy{ pluginProxy: *newPluginProxy(newMockStreamer()), @@ -115,7 +115,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() // Need to give time for streamMetrics call to propagate time.Sleep(time.Millisecond * 100) @@ -139,7 +139,7 @@ func TestStreamMetrics(t *testing.T) { } }) }) - Convey("Successfully stream metrics from plugin", func() { + Convey("Successfully stream metrics from plugin", func(c C) { pl := newMockStreamerStream(mockStreamAction) // Set maxMetricsBuffer to define buffer capacity @@ -157,7 +157,7 @@ func TestStreamMetrics(t *testing.T) { } go func() { err := sp.StreamMetrics(s) - So(err, ShouldBeNil) + c.So(err, ShouldBeNil) }() // Create mocked metrics metrics := []Metric{}