Skip to content

Commit

Permalink
Add flag to enable/disable sending messages to producer queue
Browse files Browse the repository at this point in the history
This is useful for users that are not interested on sending
flow data to message queue and they simply consume the output
from console with verbose flag set, i.e.:

vflow -producer-enabled=false -verbose

If not provided, it will just work as is now.
  • Loading branch information
rcarrillocruz committed Mar 12, 2021
1 parent 38d7212 commit f3ae150
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 4 deletions.
4 changes: 4 additions & 0 deletions vflow/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (i *IPFIX) run() {
go mirrorIPFIXDispatcher(ipfixMCh)

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
Expand Down
4 changes: 4 additions & 0 deletions vflow/netflow_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (i *NetflowV5) run() {
logger.Printf("netflow v5 is running (UDP: listening on [::]:%d workers#: %d)", i.port, i.workers)

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
Expand Down
4 changes: 4 additions & 0 deletions vflow/netflow_v9.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (i *NetflowV9) run() {
mCacheNF9 = netflow9.GetCache(opts.NetflowV9TplCacheFile)

go func() {
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}
p := producer.NewProducer(opts.MQName)

p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
Expand Down
11 changes: 7 additions & 4 deletions vflow/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ type Options struct {
NetflowV9TplCacheFile string `yaml:"netflow9-tpl-cache-file"`

// producer
MQName string `yaml:"mq-name"`
MQConfigFile string `yaml:"mq-config-file"`
ProducerEnabled bool `yaml:producer-enabled"`
MQName string `yaml:"mq-name"`
MQConfigFile string `yaml:"mq-config-file"`

VFlowConfigPath string
}
Expand Down Expand Up @@ -180,8 +181,9 @@ func NewOptions() *Options {
NetflowV9Topic: "vflow.netflow9",
NetflowV9TplCacheFile: "/tmp/netflowv9.templates",

MQName: "kafka",
MQConfigFile: "mq.conf",
ProducerEnabled: true,
MQName: "kafka",
MQConfigFile: "mq.conf",

VFlowConfigPath: "/etc/vflow",
}
Expand Down Expand Up @@ -359,6 +361,7 @@ func (opts *Options) flagSet() {
flag.StringVar(&opts.NetflowV9TplCacheFile, "netflow9-tpl-cache-file", opts.NetflowV9TplCacheFile, "Netflow version 9 template cache file")

// producer options
flag.BoolVar(&opts.ProducerEnabled, "producer-enabled", opts.ProducerEnabled, "enable/disable producer message queue")
flag.StringVar(&opts.MQName, "mqueue", opts.MQName, "producer message queue name")
flag.StringVar(&opts.MQConfigFile, "mqueue-conf", opts.MQConfigFile, "producer message queue configuration file")

Expand Down
4 changes: 4 additions & 0 deletions vflow/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (s *SFlow) run() {

go func() {
p := producer.NewProducer(opts.MQName)
if !opts.ProducerEnabled {
logger.Println("Producer message queue disabled")
return
}

p.MQConfigFile = path.Join(opts.VFlowConfigPath, opts.MQConfigFile)
p.MQErrorCount = &s.stats.MQErrorCount
Expand Down

0 comments on commit f3ae150

Please sign in to comment.