Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #85 from jcooklin/fb/enables-streaming
Browse files Browse the repository at this point in the history
Enables streaming collectors
  • Loading branch information
jcooklin authored Jun 17, 2017
2 parents 6031e52 + 3a91475 commit 25d0f6c
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ package rand
```

## Ready to Share
You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](https://github.com/intelsdi-x/snap-plugin-lib-go/blob/master/examples/streaming/main.go#L29).
You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](main.go#L29).

Don't forget to announce your plugin release on [slack](https://intelsdi-x.herokuapp.com/) and get your plugin added to the [Plugin Catalog](https://github.com/intelsdi-x/snap/blob/master/docs/PLUGIN_CATALOG.md)!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ limitations under the License.
package main

import (
"github.com/intelsdi-x/snap-plugin-lib-go/examples/streaming/rand"
"github.com/intelsdi-x/snap-plugin-lib-go/examples/snap-plugin-collector-rand-streaming/rand"
"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package rand

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -69,13 +70,13 @@ type RandCollector struct {
// The metrics_out channel is used by the plugin to send the collected metrics
// to Snap.
func (r *RandCollector) StreamMetrics(
ctx context.Context,
metrics_in chan []plugin.Metric,
metrics_out chan []plugin.Metric,
err chan string) error {

go r.streamIt(metrics_out, err)
go r.drainMetrics(metrics_in)

r.drainMetrics(metrics_in)
return nil
}

Expand Down Expand Up @@ -125,7 +126,7 @@ func (r *RandCollector) streamIt(ch chan []plugin.Metric, err chan string) {
}
}
ch <- metrics
time.Sleep(time.Second * time.Duration(rand.Int63n(10)))
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
}
}

Expand Down
14 changes: 6 additions & 8 deletions examples/snap-plugin-collector-rand/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,12 @@ func init() {
//When the flag is set, an additional policy will be added in GetConfigPolicy().
//This additional policy has a required field. This simulates
//the situation when a plugin requires a config to load.
plugin.AddFlag(
cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
},
)
plugin.Flags = append(plugin.Flags, cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
})
}

// Mock collector implementation used for testing
Expand Down
57 changes: 57 additions & 0 deletions util/items.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package util

import (
"errors"
"sync"

"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc"
"golang.org/x/net/context"
)

type streamsMgr struct {
*sync.Mutex
collection map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc
}

func New() *streamsMgr {
return &streamsMgr{
Mutex: &sync.Mutex{},
collection: make(map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc),
}
}

func (s *streamsMgr) Add(stream rpc.StreamCollector_StreamMetricsServer, cancel context.CancelFunc) {
s.Lock()
defer s.Unlock()
s.collection[stream] = cancel
}

func (s *streamsMgr) RemoveAndCancel(stream rpc.StreamCollector_StreamMetricsServer) error {
s.Lock()
defer s.Unlock()
cancel, ok := s.collection[stream]
if !ok {
return errors.New("stream not found")
}
cancel()
delete(s.collection, stream)
return nil
}

func (s *streamsMgr) GetAll() []rpc.StreamCollector_StreamMetricsServer {
s.Lock()
defer s.Unlock()
keys := make([]rpc.StreamCollector_StreamMetricsServer, len(s.collection))
i := 0
for k := range s.collection {
keys[i] = k
i++
}
return keys
}

func (s *streamsMgr) Count() int {
s.Lock()
defer s.Unlock()
return len(s.collection)
}
32 changes: 28 additions & 4 deletions v1/plugin/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ var (
Name: "port",
Usage: "port GRPC will listen on",
}
// ListenAddr the address that GRPC will listen on. Plugin authors can also
// use this address if their plugin binds to a local port as it's sometimes
// needed to bind to a public interface.
ListenAddr = "127.0.0.1"
flAddr = cli.StringFlag{
Name: "addr",
Usage: "addr GRPC will listen on",
Value: ListenAddr,
Destination: &ListenAddr,
}
LogLevel = 2
flLogLevel = cli.IntFlag{
Name: "log-level",
Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug",
Value: 2,
Name: "log-level",
Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug",
Value: LogLevel,
Destination: &LogLevel,
}
flPprof = cli.BoolFlag{
Name: "pprof",
Expand Down Expand Up @@ -51,6 +63,18 @@ var (
flHTTPPort = cli.IntFlag{
Name: "stand-alone-port",
Usage: "specify http port when stand-alone is set",
Value: 8181,
Value: 8182,
}
collectDurationStr = "5s"
flMaxCollectDuration = cli.StringFlag{
Name: "max-collect-duration",
Usage: "sets the maximum duration (always greater than 0s) between collections before metrics are sent. Defaults to 10s what means that after 10 seconds no new metrics are received, the plugin should send whatever data it has in the buffer instead of waiting longer. (e.g. 5s)",
Value: collectDurationStr,
Destination: &collectDurationStr,
}

flMaxMetricsBuffer = cli.Int64Flag{
Name: "max-metrics-buffer",
Usage: "maximum number of metrics the plugin is buffering before sending metrics. Defaults to zero what means send metrics immediately.",
}
)
1 change: 1 addition & 0 deletions v1/plugin/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
collectorType pluginType = iota
processorType
publisherType
streamCollectorType
)

type metaRPCType int
Expand Down
3 changes: 2 additions & 1 deletion v1/plugin/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
package plugin

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (mc *mockStreamer) GetMetricTypes(cfg Config) ([]Metric, error) {
return mts, nil
}

func (mc *mockStreamer) StreamMetrics(i chan []Metric, o chan []Metric, _ chan string) error {
func (mc *mockStreamer) StreamMetrics(ctx context.Context, i chan []Metric, o chan []Metric, _ chan string) error {

if mc.err != nil {
return errors.New("error")
Expand Down
Loading

0 comments on commit 25d0f6c

Please sign in to comment.