diff --git a/examples/snap-plugin-collector-rand/rand/rand.go b/examples/snap-plugin-collector-rand/rand/rand.go index 7f5e86e..b8f7c8f 100644 --- a/examples/snap-plugin-collector-rand/rand/rand.go +++ b/examples/snap-plugin-collector-rand/rand/rand.go @@ -64,14 +64,12 @@ func init() { //When the flag is set, an additional policy will be added in GetConfigPolicy(). //This additional policy has a required field. This simulates //the situation when a plugin requires a config to load. - plugin.AddFlag( - cli.BoolFlag{ - Name: "required-config", - Hidden: false, - Usage: "Plugin requires config passed in", - Destination: &req, - }, - ) + plugin.Flags = append(plugin.Flags, cli.BoolFlag{ + Name: "required-config", + Hidden: false, + Usage: "Plugin requires config passed in", + Destination: &req, + }) } // Mock collector implementation used for testing diff --git a/v1/plugin/flags.go b/v1/plugin/flags.go index 64ffce9..510e9f7 100644 --- a/v1/plugin/flags.go +++ b/v1/plugin/flags.go @@ -19,6 +19,13 @@ var ( Name: "port", Usage: "port GRPC will listen on", } + grpcListenAddr = "127.0.0.1" + flAddr = cli.StringFlag{ + Name: "addr", + Usage: "addr GRPC will listen on", + Value: grpcListenAddr, + Destination: &grpcListenAddr, + } flLogLevel = cli.IntFlag{ Name: "log-level", Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug", diff --git a/v1/plugin/plugin.go b/v1/plugin/plugin.go index 61942d1..fa68b39 100644 --- a/v1/plugin/plugin.go +++ b/v1/plugin/plugin.go @@ -52,12 +52,11 @@ var ( version int opts []MetaOpt } -) - -func init() { - app = cli.NewApp() - app.Flags = []cli.Flag{ + // Flags required by the plugin lib flags - plugin authors can provide their + // own flags. + Flags []cli.Flag = []cli.Flag{ flConfig, + flAddr, flPort, flPprof, flTLS, @@ -68,15 +67,14 @@ func init() { flHTTPPort, flLogLevel, } - app.Action = startPlugin -} +) -// AddFlag accepts a cli.Flag to the plugins standard flags. -func AddFlag(flags ...cli.Flag) { - for _, f := range flags { - app.Flags = append(app.Flags, f) - } -} +// // AddFlag accepts a cli.Flag to the plugins standard flags. +// func AddFlag(flags ...cli.Flag) { +// for _, f := range flags { +// app.Flags = append(app.Flags, f) +// } +// } // Plugin is the base plugin type. All plugins must implement GetConfigPolicy. type Plugin interface { @@ -343,6 +341,10 @@ func buildGRPCServer(typeOfPlugin pluginType, name string, version int, arg *Arg // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -363,6 +365,10 @@ func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -381,6 +387,10 @@ func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -399,6 +409,10 @@ func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) // generates a response for the initial stdin / stdout handshake, and starts // the plugin's gRPC server. func StartStreamCollector(plugin StreamCollector, name string, version int, opts ...MetaOpt) int { + app = cli.NewApp() + app.Flags = Flags + app.Action = startPlugin + appArgs.plugin = plugin appArgs.name = name appArgs.version = version @@ -573,7 +587,7 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP } l.Close() - addr := fmt.Sprintf("127.0.0.1:%v", l.Addr().(*net.TCPAddr).Port) + addr := fmt.Sprintf("%s:%v", grpcListenAddr, l.Addr().(*net.TCPAddr).Port) lis, err := net.Listen("tcp", addr) if err != nil { return "", err @@ -591,9 +605,13 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP return "", err } } + advertisedAddr, err := getAddr(grpcListenAddr) + if err != nil { + return "", err + } resp := preamble{ Meta: *m, - ListenAddress: addr, + ListenAddress: fmt.Sprintf("%v:%v", advertisedAddr, l.Addr().(*net.TCPAddr).Port), Type: m.Type, PprofAddress: pprofAddr, State: 0, // Hardcode success since panics on err @@ -606,6 +624,26 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP return string(preambleJSON), nil } +// getAddr if we were provided the addr 0.0.0.0 we need to determine the +// address we will advertise to the framework in the premable. +func getAddr(addr string) (string, error) { + if strings.Compare(addr, "0.0.0.0") == 0 { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "", err + } + for _, address := range addrs { + // check the address type and if it is not a loopback the display it + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String(), nil + } + } + } + } + return addr, nil +} + func showDiagnostics(m meta, p *pluginProxy, c Config) error { defer timeTrack(time.Now(), "showDiagnostics") printRuntimeDetails(m) diff --git a/v1/plugin/stream_collector_proxy.go b/v1/plugin/stream_collector_proxy.go index 897a872..1542643 100644 --- a/v1/plugin/stream_collector_proxy.go +++ b/v1/plugin/stream_collector_proxy.go @@ -67,13 +67,6 @@ func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServ // Metrics out of the plugin into snap. outChan := make(chan []Metric) - go func() { - err := p.plugin.StreamMetrics(inChan, outChan, errChan) - if err != nil { - log.Fatal(err) - } - }() - go p.metricSend(outChan, stream) go p.errorSend(errChan, stream) go p.streamRecv(inChan, stream) @@ -86,12 +79,17 @@ func (p *StreamProxy) SetConfig(context.Context, *rpc.ConfigMap) (*rpc.ErrReply, } func (p *StreamProxy) errorSend(errChan chan string, stream rpc.StreamCollector_StreamMetricsServer) { - for r := range errChan { - reply := &rpc.CollectReply{ - Error: &rpc.ErrReply{Error: r}, - } - if err := stream.Send(reply); err != nil { - fmt.Println(err.Error()) + for { + select { + case <-stream.Context().Done(): + return + case r := <-errChan: + reply := &rpc.CollectReply{ + Error: &rpc.ErrReply{Error: r}, + } + if err := stream.Send(reply); err != nil { + fmt.Println(err.Error()) + } } } } @@ -136,7 +134,8 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St // send metrics if maxCollectDuration is reached sendReply(metrics, stream) metrics = []*rpc.Metric{} - + case <-stream.Context().Done(): + return } } } @@ -148,26 +147,33 @@ func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_St }, ).Debug("receiving metrics") for { - s, err := stream.Recv() - if err != nil { - fmt.Println(err) + select { + case <-stream.Context().Done(): + close(ch) return - } - if s != nil { - if s.MaxMetricsBuffer > 0 { - p.setMaxMetricsBuffer(s.MaxMetricsBuffer) - } - if s.MaxCollectDuration > 0 { - p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration)) + default: + + s, err := stream.Recv() + if err != nil { + log.Debug(err) + break } - if s.Metrics_Arg != nil { - metrics := []Metric{} - for _, mt := range s.Metrics_Arg.Metrics { - metric := fromProtoMetric(mt) - metrics = append(metrics, metric) + if s != nil { + if s.MaxMetricsBuffer > 0 { + p.setMaxMetricsBuffer(s.MaxMetricsBuffer) + } + if s.MaxCollectDuration > 0 { + p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration)) + } + if s.Metrics_Arg != nil { + metrics := []Metric{} + for _, mt := range s.Metrics_Arg.Metrics { + metric := fromProtoMetric(mt) + metrics = append(metrics, metric) + } + // send requested metrics to be collected into the stream plugin + ch <- metrics } - // send requested metrics to be collected into the stream plugin - ch <- metrics } } } diff --git a/v1/plugin/stream_collector_proxy_test.go b/v1/plugin/stream_collector_proxy_test.go index 914630c..0bfc28b 100644 --- a/v1/plugin/stream_collector_proxy_test.go +++ b/v1/plugin/stream_collector_proxy_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "golang.org/x/net/context" "google.golang.org/grpc" "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc" @@ -37,6 +38,10 @@ type mockStreamServer struct { recvChan chan *rpc.CollectArg } +func (m mockStreamServer) Context() context.Context { + return context.TODO() +} + func (m mockStreamServer) Send(arg *rpc.CollectReply) error { m.sendChan <- arg return nil