Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Enable streaming collector #85

Merged
merged 1 commit into from
Jun 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ package rand
```

## Ready to Share
You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](https://github.com/intelsdi-x/snap-plugin-lib-go/blob/master/examples/streaming/main.go#L29).
You've made a plugin! Now it's time to share it. Create a release by following these [steps](https://help.github.com/articles/creating-releases/). We recommend that your release version match your plugin version, see example [here](main.go#L29).

Don't forget to announce your plugin release on [slack](https://intelsdi-x.herokuapp.com/) and get your plugin added to the [Plugin Catalog](https://github.com/intelsdi-x/snap/blob/master/docs/PLUGIN_CATALOG.md)!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ limitations under the License.
package main

import (
"github.com/intelsdi-x/snap-plugin-lib-go/examples/streaming/rand"
"github.com/intelsdi-x/snap-plugin-lib-go/examples/snap-plugin-collector-rand-streaming/rand"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on that, some changes are needed in links in README.md - for example here: https://github.com/intelsdi-x/snap-plugin-lib-go/blob/master/examples/streaming/README.md#ready-to-share:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcooklin, I made PR with fix to your repo: jcooklin#10

"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package rand

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -69,13 +70,13 @@ type RandCollector struct {
// The metrics_out channel is used by the plugin to send the collected metrics
// to Snap.
func (r *RandCollector) StreamMetrics(
ctx context.Context,
metrics_in chan []plugin.Metric,
metrics_out chan []plugin.Metric,
err chan string) error {

go r.streamIt(metrics_out, err)
go r.drainMetrics(metrics_in)

r.drainMetrics(metrics_in)
return nil
}

Expand Down Expand Up @@ -125,7 +126,7 @@ func (r *RandCollector) streamIt(ch chan []plugin.Metric, err chan string) {
}
}
ch <- metrics
time.Sleep(time.Second * time.Duration(rand.Int63n(10)))
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
}
}

Expand Down
14 changes: 6 additions & 8 deletions examples/snap-plugin-collector-rand/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,12 @@ func init() {
//When the flag is set, an additional policy will be added in GetConfigPolicy().
//This additional policy has a required field. This simulates
//the situation when a plugin requires a config to load.
plugin.AddFlag(
cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
},
)
plugin.Flags = append(plugin.Flags, cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
})
}

// Mock collector implementation used for testing
Expand Down
57 changes: 57 additions & 0 deletions util/items.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package util

import (
"errors"
"sync"

"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc"
"golang.org/x/net/context"
)

type streamsMgr struct {
*sync.Mutex
collection map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc
}

func New() *streamsMgr {
return &streamsMgr{
Mutex: &sync.Mutex{},
collection: make(map[rpc.StreamCollector_StreamMetricsServer]context.CancelFunc),
}
}

func (s *streamsMgr) Add(stream rpc.StreamCollector_StreamMetricsServer, cancel context.CancelFunc) {
s.Lock()
defer s.Unlock()
s.collection[stream] = cancel
}

func (s *streamsMgr) RemoveAndCancel(stream rpc.StreamCollector_StreamMetricsServer) error {
s.Lock()
defer s.Unlock()
cancel, ok := s.collection[stream]
if !ok {
return errors.New("stream not found")
}
cancel()
delete(s.collection, stream)
return nil
}

func (s *streamsMgr) GetAll() []rpc.StreamCollector_StreamMetricsServer {
s.Lock()
defer s.Unlock()
keys := make([]rpc.StreamCollector_StreamMetricsServer, len(s.collection))
i := 0
for k := range s.collection {
keys[i] = k
i++
}
return keys
}

func (s *streamsMgr) Count() int {
s.Lock()
defer s.Unlock()
return len(s.collection)
}
32 changes: 28 additions & 4 deletions v1/plugin/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ var (
Name: "port",
Usage: "port GRPC will listen on",
}
// ListenAddr the address that GRPC will listen on. Plugin authors can also
// use this address if their plugin binds to a local port as it's sometimes
// needed to bind to a public interface.
ListenAddr = "127.0.0.1"
flAddr = cli.StringFlag{
Name: "addr",
Usage: "addr GRPC will listen on",
Value: ListenAddr,
Destination: &ListenAddr,
}
LogLevel = 2
flLogLevel = cli.IntFlag{
Name: "log-level",
Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug",
Value: 2,
Name: "log-level",
Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug",
Value: LogLevel,
Destination: &LogLevel,
}
flPprof = cli.BoolFlag{
Name: "pprof",
Expand Down Expand Up @@ -51,6 +63,18 @@ var (
flHTTPPort = cli.IntFlag{
Name: "stand-alone-port",
Usage: "specify http port when stand-alone is set",
Value: 8181,
Value: 8182,
}
collectDurationStr = "5s"
flMaxCollectDuration = cli.StringFlag{
Name: "max-collect-duration",
Usage: "sets the maximum duration (always greater than 0s) between collections before metrics are sent. Defaults to 10s what means that after 10 seconds no new metrics are received, the plugin should send whatever data it has in the buffer instead of waiting longer. (e.g. 5s)",
Value: collectDurationStr,
Destination: &collectDurationStr,
}

flMaxMetricsBuffer = cli.Int64Flag{
Name: "max-metrics-buffer",
Usage: "maximum number of metrics the plugin is buffering before sending metrics. Defaults to zero what means send metrics immediately.",
}
)
1 change: 1 addition & 0 deletions v1/plugin/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
collectorType pluginType = iota
processorType
publisherType
streamCollectorType
)

type metaRPCType int
Expand Down
3 changes: 2 additions & 1 deletion v1/plugin/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
package plugin

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (mc *mockStreamer) GetMetricTypes(cfg Config) ([]Metric, error) {
return mts, nil
}

func (mc *mockStreamer) StreamMetrics(i chan []Metric, o chan []Metric, _ chan string) error {
func (mc *mockStreamer) StreamMetrics(ctx context.Context, i chan []Metric, o chan []Metric, _ chan string) error {

if mc.err != nil {
return errors.New("error")
Expand Down
Loading