diff --git a/vflow/ipfix.go b/vflow/ipfix.go index 920245f9..31d6542b 100644 --- a/vflow/ipfix.go +++ b/vflow/ipfix.go @@ -85,7 +85,6 @@ func NewIPFIX() *IPFIX { port: opts.IPFIXPort, addr: opts.IPFIXAddr, workers: opts.IPFIXWorkers, - pool: make(chan chan struct{}, maxWorkers), } } @@ -96,6 +95,8 @@ func (i *IPFIX) run() { return } + i.pool = make(chan chan struct{}, maxWorkers) + hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port)) udpAddr, _ := net.ResolveUDPAddr("udp", hostPort) @@ -130,11 +131,10 @@ func (i *IPFIX) run() { go func() { if !opts.ProducerEnabled { - logger.Println("Producer message queue disabled") return } - p := producer.NewProducer(opts.MQName) + p := producer.NewProducer(opts.MQName) p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile) p.MQErrorCount = &i.stats.MQErrorCount p.Logger = logger @@ -171,7 +171,6 @@ func (i *IPFIX) run() { func (i *IPFIX) shutdown() { // exit if the ipfix is disabled if !opts.IPFIXEnabled { - logger.Println("ipfix disabled") return } diff --git a/vflow/netflow_v5.go b/vflow/netflow_v5.go index 5826e72a..9e8d68e8 100644 --- a/vflow/netflow_v5.go +++ b/vflow/netflow_v5.go @@ -31,7 +31,7 @@ import ( "sync/atomic" "time" - "github.com/VerizonDigital/vflow/netflow/v5" + netflow5 "github.com/VerizonDigital/vflow/netflow/v5" "github.com/VerizonDigital/vflow/producer" ) @@ -77,18 +77,20 @@ var ( func NewNetflowV5() *NetflowV5 { return &NetflowV5{ port: opts.NetflowV5Port, + addr: opts.NetflowV5Addr, workers: opts.NetflowV5Workers, - pool: make(chan chan struct{}, maxWorkers), } } func (i *NetflowV5) run() { // exit if the netflow v5 is disabled if !opts.NetflowV5Enabled { - logger.Println("netflowv5 has been disabled") + logger.Println("netflow v5 has been disabled") return } + i.pool = make(chan chan struct{}, maxWorkers) + hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port)) udpAddr, _ := net.ResolveUDPAddr("udp", hostPort) @@ -110,11 +112,10 @@ func (i *NetflowV5) run() { go func() { if !opts.ProducerEnabled { - logger.Println("Producer message queue disabled") return } - p := producer.NewProducer(opts.MQName) + p := producer.NewProducer(opts.MQName) p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile) p.MQErrorCount = &i.stats.MQErrorCount p.Logger = logger @@ -151,7 +152,6 @@ func (i *NetflowV5) run() { func (i *NetflowV5) shutdown() { // exit if the netflow v5 is disabled if !opts.NetflowV5Enabled { - logger.Println("netflow v5 disabled") return } diff --git a/vflow/netflow_v9.go b/vflow/netflow_v9.go index 0f8ea62f..2566d532 100644 --- a/vflow/netflow_v9.go +++ b/vflow/netflow_v9.go @@ -31,7 +31,7 @@ import ( "sync/atomic" "time" - "github.com/VerizonDigital/vflow/netflow/v9" + netflow9 "github.com/VerizonDigital/vflow/netflow/v9" "github.com/VerizonDigital/vflow/producer" ) @@ -79,19 +79,20 @@ var ( func NewNetflowV9() *NetflowV9 { return &NetflowV9{ port: opts.NetflowV9Port, + addr: opts.NetflowV9Addr, workers: opts.NetflowV9Workers, - pool: make(chan chan struct{}, maxWorkers), } } func (i *NetflowV9) run() { - //TODO // exit if the netflow v9 is disabled if !opts.NetflowV9Enabled { - logger.Println("netflowv9 has been disabled") + logger.Println("netflow v9 has been disabled") return } + i.pool = make(chan chan struct{}, maxWorkers) + hostPort := net.JoinHostPort(i.addr, strconv.Itoa(i.port)) udpAddr, _ := net.ResolveUDPAddr("udp", hostPort) @@ -115,11 +116,10 @@ func (i *NetflowV9) run() { go func() { if !opts.ProducerEnabled { - logger.Println("Producer message queue disabled") return } - p := producer.NewProducer(opts.MQName) + p := producer.NewProducer(opts.MQName) p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile) p.MQErrorCount = &i.stats.MQErrorCount p.Logger = logger @@ -156,7 +156,6 @@ func (i *NetflowV9) run() { func (i *NetflowV9) shutdown() { // exit if the netflow v9 is disabled if !opts.NetflowV9Enabled { - logger.Println("netflow v9 disabled") return } diff --git a/vflow/options.go b/vflow/options.go index 62849150..1807b01d 100644 --- a/vflow/options.go +++ b/vflow/options.go @@ -65,6 +65,7 @@ type Options struct { // sFlow options SFlowEnabled bool `yaml:"sflow-enabled"` SFlowPort int `yaml:"sflow-port"` + SFlowAddr string `yaml:"sflow-addr"` SFlowUDPSize int `yaml:"sflow-udp-size"` SFlowWorkers int `yaml:"sflow-workers"` SFlowTopic string `yaml:"sflow-topic"` @@ -89,6 +90,7 @@ type Options struct { // Netflow V5 NetflowV5Enabled bool `yaml:"netflow5-enabled"` NetflowV5Port int `yaml:"netflow5-port"` + NetflowV5Addr string `yaml:"netflow5-addr"` NetflowV5UDPSize int `yaml:"netflow5-udp-size"` NetflowV5Workers int `yaml:"netflow5-workers"` NetflowV5Topic string `yaml:"netflow5-topic"` @@ -96,13 +98,14 @@ type Options struct { // Netflow NetflowV9Enabled bool `yaml:"netflow9-enabled"` NetflowV9Port int `yaml:"netflow9-port"` + NetflowV9Addr string `yaml:"netflow9-addr"` NetflowV9UDPSize int `yaml:"netflow9-udp-size"` NetflowV9Workers int `yaml:"netflow9-workers"` NetflowV9Topic string `yaml:"netflow9-topic"` NetflowV9TplCacheFile string `yaml:"netflow9-tpl-cache-file"` // producer - ProducerEnabled bool `yaml:producer-enabled"` + ProducerEnabled bool `yaml:"producer-enabled"` MQName string `yaml:"mq-name"` MQConfigFile string `yaml:"mq-config-file"` @@ -243,11 +246,8 @@ func (opts Options) vFlowIsRunning() bool { cmd := exec.Command("kill", "-0", string(b)) _, err = cmd.Output() - if err != nil { - return false - } - return true + return err == nil } func (opts Options) printVersion() { @@ -324,6 +324,7 @@ func (opts *Options) flagSet() { // sflow options flag.BoolVar(&opts.SFlowEnabled, "sflow-enabled", opts.SFlowEnabled, "enable/disable sflow listener") flag.IntVar(&opts.SFlowPort, "sflow-port", opts.SFlowPort, "sflow port number") + flag.StringVar(&opts.SFlowAddr, "sflow-addr", opts.SFlowAddr, "sflow IP address to bind to") flag.IntVar(&opts.SFlowUDPSize, "sflow-max-udp-size", opts.SFlowUDPSize, "sflow maximum UDP size") flag.IntVar(&opts.SFlowWorkers, "sflow-workers", opts.SFlowWorkers, "sflow workers number") flag.StringVar(&opts.SFlowTopic, "sflow-topic", opts.SFlowTopic, "sflow topic name") @@ -348,6 +349,7 @@ func (opts *Options) flagSet() { // netflow version 5 flag.BoolVar(&opts.NetflowV5Enabled, "netflow5-enabled", opts.NetflowV5Enabled, "enable/disable netflow version 5 listener") flag.IntVar(&opts.NetflowV5Port, "netflow5-port", opts.NetflowV5Port, "Netflow Version 5 port number") + flag.StringVar(&opts.NetflowV5Addr, "netflow5-addr", opts.NetflowV5Addr, "Netflow 5 IP address to bind to") flag.IntVar(&opts.NetflowV5UDPSize, "netflow5-max-udp-size", opts.NetflowV5UDPSize, "Netflow version 5 maximum UDP size") flag.IntVar(&opts.NetflowV5Workers, "netflow5-workers", opts.NetflowV5Workers, "Netflow version 5 workers number") flag.StringVar(&opts.NetflowV5Topic, "netflow5-topic", opts.NetflowV5Topic, "Netflow version 5 topic name") @@ -355,6 +357,7 @@ func (opts *Options) flagSet() { // netflow version 9 flag.BoolVar(&opts.NetflowV9Enabled, "netflow9-enabled", opts.NetflowV9Enabled, "enable/disable netflow version 9 listener") flag.IntVar(&opts.NetflowV9Port, "netflow9-port", opts.NetflowV9Port, "Netflow Version 9 port number") + flag.StringVar(&opts.NetflowV9Addr, "netflow9-addr", opts.NetflowV9Addr, "Netflow 9 IP address to bind to") flag.IntVar(&opts.NetflowV9UDPSize, "netflow9-max-udp-size", opts.NetflowV9UDPSize, "Netflow version 9 maximum UDP size") flag.IntVar(&opts.NetflowV9Workers, "netflow9-workers", opts.NetflowV9Workers, "Netflow version 9 workers number") flag.StringVar(&opts.NetflowV9Topic, "netflow9-topic", opts.NetflowV9Topic, "Netflow version 9 topic name") diff --git a/vflow/sflow.go b/vflow/sflow.go index 978e4651..959ba5ab 100644 --- a/vflow/sflow.go +++ b/vflow/sflow.go @@ -82,8 +82,8 @@ var ( func NewSFlow() *SFlow { return &SFlow{ port: opts.SFlowPort, + addr: opts.SFlowAddr, workers: opts.SFlowWorkers, - pool: make(chan chan struct{}, maxWorkers), } } @@ -95,6 +95,8 @@ func (s *SFlow) run() { return } + s.pool = make(chan chan struct{}, maxWorkers) + hostPort := net.JoinHostPort(s.addr, strconv.Itoa(s.port)) udpAddr, _ := net.ResolveUDPAddr("udp", hostPort) @@ -117,12 +119,11 @@ func (s *SFlow) run() { logger.Printf("sFlow is running (UDP: listening on [::]:%d workers#: %d)", s.port, s.workers) go func() { - p := producer.NewProducer(opts.MQName) if !opts.ProducerEnabled { - logger.Println("Producer message queue disabled") return } + p := producer.NewProducer(opts.MQName) p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile) p.MQErrorCount = &s.stats.MQErrorCount p.Logger = logger @@ -157,11 +158,17 @@ func (s *SFlow) run() { } func (s *SFlow) shutdown() { + // exit if the sFlow v5 is disabled + if !opts.SFlowEnabled { + return + } + + // stop reading from UDP listener s.stop = true logger.Println("stopping sflow service gracefully ...") time.Sleep(1 * time.Second) s.conn.Close() - logger.Println("vFlow has been shutdown") + logger.Println("sFlow has been shutdown") close(sFlowUDPCh) } diff --git a/vflow/vflow.go b/vflow/vflow.go index fbfc7206..fc106e70 100644 --- a/vflow/vflow.go +++ b/vflow/vflow.go @@ -54,12 +54,11 @@ func main() { signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) logger = opts.Logger - sFlow := NewSFlow() - ipfix := NewIPFIX() - netflow5 := NewNetflowV5() - netflow9 := NewNetflowV9() + if !opts.ProducerEnabled { + logger.Println("producer message queue has been disabled") + } - protos := []proto{sFlow, ipfix, netflow5, netflow9} + protos := []proto{NewSFlow(), NewIPFIX(), NewNetflowV5(), NewNetflowV9()} for _, p := range protos { wg.Add(1)