Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds flag 'addr'
Browse files Browse the repository at this point in the history
- The flag 'addr' provides the address that GRCP will listen on (format: 0.0.0.0)
  • Loading branch information
jcooklin committed May 31, 2017
1 parent 4f3438e commit 06336aa
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 54 deletions.
14 changes: 6 additions & 8 deletions examples/snap-plugin-collector-rand/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,12 @@ func init() {
//When the flag is set, an additional policy will be added in GetConfigPolicy().
//This additional policy has a required field. This simulates
//the situation when a plugin requires a config to load.
plugin.AddFlag(
cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
},
)
plugin.Flags = append(plugin.Flags, cli.BoolFlag{
Name: "required-config",
Hidden: false,
Usage: "Plugin requires config passed in",
Destination: &req,
})
}

// Mock collector implementation used for testing
Expand Down
7 changes: 7 additions & 0 deletions v1/plugin/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ var (
Name: "port",
Usage: "port GRPC will listen on",
}
grpcListenAddr = "127.0.0.1"
flAddr = cli.StringFlag{
Name: "addr",
Usage: "addr GRPC will listen on",
Value: grpcListenAddr,
Destination: &grpcListenAddr,
}
flLogLevel = cli.IntFlag{
Name: "log-level",
Usage: "log level - 0:panic 1:fatal 2:error 3:warn 4:info 5:debug",
Expand Down
68 changes: 53 additions & 15 deletions v1/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ var (
version int
opts []MetaOpt
}
)

func init() {
app = cli.NewApp()
app.Flags = []cli.Flag{
// Flags required by the plugin lib flags - plugin authors can provide their
// own flags.
Flags []cli.Flag = []cli.Flag{
flConfig,
flAddr,
flPort,
flPprof,
flTLS,
Expand All @@ -68,15 +67,14 @@ func init() {
flHTTPPort,
flLogLevel,
}
app.Action = startPlugin
}
)

// AddFlag accepts a cli.Flag to the plugins standard flags.
func AddFlag(flags ...cli.Flag) {
for _, f := range flags {
app.Flags = append(app.Flags, f)
}
}
// // AddFlag accepts a cli.Flag to the plugins standard flags.
// func AddFlag(flags ...cli.Flag) {
// for _, f := range flags {
// app.Flags = append(app.Flags, f)
// }
// }

// Plugin is the base plugin type. All plugins must implement GetConfigPolicy.
type Plugin interface {
Expand Down Expand Up @@ -343,6 +341,10 @@ func buildGRPCServer(typeOfPlugin pluginType, name string, version int, arg *Arg
// generates a response for the initial stdin / stdout handshake, and starts
// the plugin's gRPC server.
func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt) int {
app = cli.NewApp()
app.Flags = Flags
app.Action = startPlugin

appArgs.plugin = plugin
appArgs.name = name
appArgs.version = version
Expand All @@ -363,6 +365,10 @@ func StartCollector(plugin Collector, name string, version int, opts ...MetaOpt)
// generates a response for the initial stdin / stdout handshake, and starts
// the plugin's gRPC server.
func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt) int {
app = cli.NewApp()
app.Flags = Flags
app.Action = startPlugin

appArgs.plugin = plugin
appArgs.name = name
appArgs.version = version
Expand All @@ -381,6 +387,10 @@ func StartProcessor(plugin Processor, name string, version int, opts ...MetaOpt)
// generates a response for the initial stdin / stdout handshake, and starts
// the plugin's gRPC server.
func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt) int {
app = cli.NewApp()
app.Flags = Flags
app.Action = startPlugin

appArgs.plugin = plugin
appArgs.name = name
appArgs.version = version
Expand All @@ -399,6 +409,10 @@ func StartPublisher(plugin Publisher, name string, version int, opts ...MetaOpt)
// generates a response for the initial stdin / stdout handshake, and starts
// the plugin's gRPC server.
func StartStreamCollector(plugin StreamCollector, name string, version int, opts ...MetaOpt) int {
app = cli.NewApp()
app.Flags = Flags
app.Action = startPlugin

appArgs.plugin = plugin
appArgs.name = name
appArgs.version = version
Expand Down Expand Up @@ -573,7 +587,7 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP
}
l.Close()

addr := fmt.Sprintf("127.0.0.1:%v", l.Addr().(*net.TCPAddr).Port)
addr := fmt.Sprintf("%s:%v", grpcListenAddr, l.Addr().(*net.TCPAddr).Port)
lis, err := net.Listen("tcp", addr)
if err != nil {
return "", err
Expand All @@ -591,9 +605,13 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP
return "", err
}
}
advertisedAddr, err := getAddr(grpcListenAddr)
if err != nil {
return "", err
}
resp := preamble{
Meta: *m,
ListenAddress: addr,
ListenAddress: fmt.Sprintf("%v:%v", advertisedAddr, l.Addr().(*net.TCPAddr).Port),
Type: m.Type,
PprofAddress: pprofAddr,
State: 0, // Hardcode success since panics on err
Expand All @@ -606,6 +624,26 @@ func printPreambleAndServe(srv server, m *meta, p *pluginProxy, port string, isP
return string(preambleJSON), nil
}

// getAddr if we were provided the addr 0.0.0.0 we need to determine the
// address we will advertise to the framework in the premable.
func getAddr(addr string) (string, error) {
if strings.Compare(addr, "0.0.0.0") == 0 {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, address := range addrs {
// check the address type and if it is not a loopback the display it
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String(), nil
}
}
}
}
return addr, nil
}

func showDiagnostics(m meta, p *pluginProxy, c Config) error {
defer timeTrack(time.Now(), "showDiagnostics")
printRuntimeDetails(m)
Expand Down
68 changes: 37 additions & 31 deletions v1/plugin/stream_collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,6 @@ func (p *StreamProxy) StreamMetrics(stream rpc.StreamCollector_StreamMetricsServ
// Metrics out of the plugin into snap.
outChan := make(chan []Metric)

go func() {
err := p.plugin.StreamMetrics(inChan, outChan, errChan)
if err != nil {
log.Fatal(err)
}
}()

go p.metricSend(outChan, stream)
go p.errorSend(errChan, stream)
go p.streamRecv(inChan, stream)
Expand All @@ -86,12 +79,17 @@ func (p *StreamProxy) SetConfig(context.Context, *rpc.ConfigMap) (*rpc.ErrReply,
}

func (p *StreamProxy) errorSend(errChan chan string, stream rpc.StreamCollector_StreamMetricsServer) {
for r := range errChan {
reply := &rpc.CollectReply{
Error: &rpc.ErrReply{Error: r},
}
if err := stream.Send(reply); err != nil {
fmt.Println(err.Error())
for {
select {
case <-stream.Context().Done():
return
case r := <-errChan:
reply := &rpc.CollectReply{
Error: &rpc.ErrReply{Error: r},
}
if err := stream.Send(reply); err != nil {
fmt.Println(err.Error())
}
}
}
}
Expand Down Expand Up @@ -136,7 +134,8 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St
// send metrics if maxCollectDuration is reached
sendReply(metrics, stream)
metrics = []*rpc.Metric{}

case <-stream.Context().Done():
return
}
}
}
Expand All @@ -148,26 +147,33 @@ func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_St
},
).Debug("receiving metrics")
for {
s, err := stream.Recv()
if err != nil {
fmt.Println(err)
select {
case <-stream.Context().Done():
close(ch)
return
}
if s != nil {
if s.MaxMetricsBuffer > 0 {
p.setMaxMetricsBuffer(s.MaxMetricsBuffer)
}
if s.MaxCollectDuration > 0 {
p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration))
default:

s, err := stream.Recv()
if err != nil {
log.Debug(err)
break
}
if s.Metrics_Arg != nil {
metrics := []Metric{}
for _, mt := range s.Metrics_Arg.Metrics {
metric := fromProtoMetric(mt)
metrics = append(metrics, metric)
if s != nil {
if s.MaxMetricsBuffer > 0 {
p.setMaxMetricsBuffer(s.MaxMetricsBuffer)
}
if s.MaxCollectDuration > 0 {
p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration))
}
if s.Metrics_Arg != nil {
metrics := []Metric{}
for _, mt := range s.Metrics_Arg.Metrics {
metric := fromProtoMetric(mt)
metrics = append(metrics, metric)
}
// send requested metrics to be collected into the stream plugin
ch <- metrics
}
// send requested metrics to be collected into the stream plugin
ch <- metrics
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions v1/plugin/stream_collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin/rpc"
Expand All @@ -37,6 +38,10 @@ type mockStreamServer struct {
recvChan chan *rpc.CollectArg
}

func (m mockStreamServer) Context() context.Context {
return context.TODO()
}

func (m mockStreamServer) Send(arg *rpc.CollectReply) error {
m.sendChan <- arg
return nil
Expand Down

0 comments on commit 06336aa

Please sign in to comment.