Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Enables streaming collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed May 11, 2017
1 parent d1d32a0 commit 5683141
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 15 deletions.
1 change: 1 addition & 0 deletions v1/plugin/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
collectorType pluginType = iota
processorType
publisherType
streamCollectorType
)

type metaRPCType int
Expand Down
2 changes: 1 addition & 1 deletion v1/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func startPlugin(c *cli.Context) error {
maxMetricsBuffer: defaultMaxMetricsBuffer,
}
pluginProxy = &proxy.pluginProxy
server, meta, err = buildGRPCServer(collectorType, appArgs.name, appArgs.version, arg, appArgs.opts...)
server, meta, err = buildGRPCServer(streamCollectorType, appArgs.name, appArgs.version, arg, appArgs.opts...)
if err != nil {
return cli.NewExitError(err, 2)
}
Expand Down
32 changes: 25 additions & 7 deletions v1/plugin/stream_collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"golang.org/x/net/context"

log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc"
)

Expand Down Expand Up @@ -49,6 +50,11 @@ func (p *StreamProxy) GetMetricTypes(ctx context.Context, arg *rpc.GetMetricType
}

func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServer) error {
log.WithFields(
log.Fields{
"_block": "StreamMetrics",
},
).Debug("streaming started")
if stream == nil {
return errors.New("Stream metrics server is nil")
}
Expand All @@ -61,16 +67,18 @@ func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServ
// Metrics out of the plugin into snap.
outChan := make(chan []Metric)

err := p.plugin.StreamMetrics(inChan, outChan, errChan)
if err != nil {
return err
}
go func() {
err := p.plugin.StreamMetrics(inChan, outChan, errChan)
if err != nil {
log.Fatal(err)
}
}()

go p.metricSend(outChan, stream)
go p.errorSend(errChan, stream)
p.streamRecv(inChan, stream)
go p.streamRecv(inChan, stream)

return nil
return p.plugin.StreamMetrics(inChan, outChan, errChan)
}

func (p *StreamProxy) SetConfig(context.Context, *rpc.ConfigMap) (*rpc.ErrReply, error) {
Expand All @@ -89,6 +97,11 @@ func (p *StreamProxy) errorSend(errChan chan string, stream rpc.StreamCollector_
}

func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) {
log.WithFields(
log.Fields{
"_block": "metricSend",
},
).Debug("sending metrics")
metrics := []*rpc.Metric{}

for {
Expand Down Expand Up @@ -129,11 +142,16 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St
}

func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_StreamMetricsServer) {
log.WithFields(
log.Fields{
"_block": "streamRecv",
},
).Debug("receiving metrics")
for {
s, err := stream.Recv()
if err != nil {
fmt.Println(err)
continue
return
}
if s != nil {
if s.MaxMetricsBuffer > 0 {
Expand Down
14 changes: 7 additions & 7 deletions v1/plugin/stream_collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestStreamMetrics(t *testing.T) {
time.Sleep(t)
}
}
Convey("TestStreamMetrics", t, func() {
Convey("TestStreamMetrics", t, func(c C) {
Convey("Error calling StreamMetrics", func() {
sp := StreamProxy{
pluginProxy: *newPluginProxy(newMockErrStreamer()),
Expand All @@ -72,7 +72,7 @@ func TestStreamMetrics(t *testing.T) {
err := sp.StreamMetrics(s)
So(err, ShouldNotBeNil)
})
Convey("Successful Call to StreamMetrics", func() {
Convey("Successful Call to StreamMetrics", func(c C) {
// Make a successful call to stream metrics
pl := newMockStreamer()
sp := StreamProxy{
Expand All @@ -90,7 +90,7 @@ func TestStreamMetrics(t *testing.T) {
}
go func() {
err := sp.StreamMetrics(s)
So(err, ShouldBeNil)
c.So(err, ShouldBeNil)
}()
Convey("Successful call, stream error", func() {
// plugin returns an error.
Expand All @@ -99,7 +99,7 @@ func TestStreamMetrics(t *testing.T) {
// plugin returns metrics
})
})
Convey("Successfully stream metrics from plugin immediately", func() {
Convey("Successfully stream metrics from plugin immediately", func(c C) {
pl := newMockStreamerStream(mockStreamAction)
sp := StreamProxy{
pluginProxy: *newPluginProxy(newMockStreamer()),
Expand All @@ -115,7 +115,7 @@ func TestStreamMetrics(t *testing.T) {
}
go func() {
err := sp.StreamMetrics(s)
So(err, ShouldBeNil)
c.So(err, ShouldBeNil)
}()
// Need to give time for streamMetrics call to propagate
time.Sleep(time.Millisecond * 100)
Expand All @@ -139,7 +139,7 @@ func TestStreamMetrics(t *testing.T) {
}
})
})
Convey("Successfully stream metrics from plugin", func() {
Convey("Successfully stream metrics from plugin", func(c C) {
pl := newMockStreamerStream(mockStreamAction)

// Set maxMetricsBuffer to define buffer capacity
Expand All @@ -157,7 +157,7 @@ func TestStreamMetrics(t *testing.T) {
}
go func() {
err := sp.StreamMetrics(s)
So(err, ShouldBeNil)
c.So(err, ShouldBeNil)
}()
// Create mocked metrics
metrics := []Metric{}
Expand Down

0 comments on commit 5683141

Please sign in to comment.