Skip to content

Commit

Permalink
Feature influxdata#1820: add testing without outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
ingosus committed Feb 20, 2017
1 parent 3b6ffb3 commit 6ad58ce
Showing 1 changed file with 121 additions and 96 deletions.
217 changes: 121 additions & 96 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,94 +109,17 @@ Examples:

var stop chan struct{}

var srvc service.Service

type program struct{}

func reloadLoop(stop chan struct{}, s service.Service) {
defer func() {
if service.Interactive() {
os.Exit(0)
}
return
}()
func reloadLoop(
stop chan struct{},
inputFilters []string,
outputFilters []string,
aggregatorFilters []string,
processorFilters []string,
) {
reload := make(chan bool, 1)
reload <- true
for <-reload {
reload <- false
flag.Parse()
args := flag.Args()

var inputFilters []string
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
var outputFilters []string
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
var aggregatorFilters []string
if *fAggregatorFilters != "" {
aggregatorFilter := strings.TrimSpace(*fAggregatorFilters)
aggregatorFilters = strings.Split(":"+aggregatorFilter+":", ":")
}
var processorFilters []string
if *fProcessorFilters != "" {
processorFilter := strings.TrimSpace(*fProcessorFilters)
processorFilters = strings.Split(":"+processorFilter+":", ":")
}

if len(args) > 0 {
switch args[0] {
case "version":
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
return
case "config":
config.PrintSampleConfig(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
return
}
}

// switch for flags which just do something and exit immediately
switch {
case *fOutputList:
fmt.Println("Available Output Plugins:")
for k, _ := range outputs.Outputs {
fmt.Printf(" %s\n", k)
}
return
case *fInputList:
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
fmt.Printf(" %s\n", k)
}
return
case *fVersion:
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
return
case *fSampleConfig:
config.PrintSampleConfig(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
return
case *fUsage != "":
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("E! %s and %s", err, err2)
}
}
return
}

// If no other options are specified, load the config file and run.
c := config.NewConfig()
Expand All @@ -213,7 +136,7 @@ func reloadLoop(stop chan struct{}, s service.Service) {
log.Fatal("E! " + err.Error())
}
}
if len(c.Outputs) == 0 {
if !*fTest && len(c.Outputs) == 0 {
log.Fatalf("E! Error: no outputs found, did you provide a valid config file?")
}
if len(c.Inputs) == 0 {
Expand All @@ -237,7 +160,7 @@ func reloadLoop(stop chan struct{}, s service.Service) {
if err != nil {
log.Fatal("E! " + err.Error())
}
return
os.Exit(0)
}

err = ag.Connect()
Expand Down Expand Up @@ -271,14 +194,21 @@ func reloadLoop(stop chan struct{}, s service.Service) {
log.Printf("I! Tags enabled: %s", c.ListTags())

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
f, err := os.OpenFile(*fPidfile, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatalf("E! Unable to create pidfile: %s", err)
log.Printf("E! Unable to create pidfile: %s", err)
} else {
fmt.Fprintf(f, "%d\n", os.Getpid())

f.Close()

defer func() {
err := os.Remove(*fPidfile)
if err != nil {
log.Printf("E! Unable to remove pidfile: %s", err)
}
}()
}

fmt.Fprintf(f, "%d\n", os.Getpid())

f.Close()
}

ag.Run(shutdown)
Expand All @@ -290,14 +220,26 @@ func usageExit(rc int) {
os.Exit(rc)
}

type program struct {
inputFilters []string
outputFilters []string
aggregatorFilters []string
processorFilters []string
}

func (p *program) Start(s service.Service) error {
srvc = s
go p.run()
return nil
}
func (p *program) run() {
stop = make(chan struct{})
reloadLoop(stop, srvc)
reloadLoop(
stop,
p.inputFilters,
p.outputFilters,
p.aggregatorFilters,
p.processorFilters,
)
}
func (p *program) Stop(s service.Service) error {
close(stop)
Expand All @@ -307,6 +249,74 @@ func (p *program) Stop(s service.Service) error {
func main() {
flag.Usage = func() { usageExit(0) }
flag.Parse()
args := flag.Args()

inputFilters, outputFilters := []string{}, []string{}
if *fInputFilters != "" {
inputFilters = strings.Split(":"+strings.TrimSpace(*fInputFilters)+":", ":")
}
if *fOutputFilters != "" {
outputFilters = strings.Split(":"+strings.TrimSpace(*fOutputFilters)+":", ":")
}

aggregatorFilters, processorFilters := []string{}, []string{}
if *fAggregatorFilters != "" {
aggregatorFilters = strings.Split(":"+strings.TrimSpace(*fAggregatorFilters)+":", ":")
}
if *fProcessorFilters != "" {
processorFilters = strings.Split(":"+strings.TrimSpace(*fProcessorFilters)+":", ":")
}

if len(args) > 0 {
switch args[0] {
case "version":
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
return
case "config":
config.PrintSampleConfig(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
return
}
}

// switch for flags which just do something and exit immediately
switch {
case *fOutputList:
fmt.Println("Available Output Plugins:")
for k, _ := range outputs.Outputs {
fmt.Printf(" %s\n", k)
}
return
case *fInputList:
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
fmt.Printf(" %s\n", k)
}
return
case *fVersion:
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
return
case *fSampleConfig:
config.PrintSampleConfig(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
return
case *fUsage != "":
err := config.PrintInputConfig(*fUsage)
err2 := config.PrintOutputConfig(*fUsage)
if err != nil && err2 != nil {
log.Fatalf("E! %s and %s", err, err2)
}
return
}

if runtime.GOOS == "windows" {
svcConfig := &service.Config{
Name: "telegraf",
Expand All @@ -316,7 +326,12 @@ func main() {
Arguments: []string{"-config", "C:\\Program Files\\Telegraf\\telegraf.conf"},
}

prg := &program{}
prg := &program{
inputFilters: inputFilters,
outputFilters: outputFilters,
aggregatorFilters: aggregatorFilters,
processorFilters: processorFilters,
}
s, err := service.New(prg, svcConfig)
if err != nil {
log.Fatal("E! " + err.Error())
Expand All @@ -327,10 +342,14 @@ func main() {
if *fConfig != "" {
(*svcConfig).Arguments = []string{"-config", *fConfig}
}
if *fConfigDirectory != "" {
(*svcConfig).Arguments = append((*svcConfig).Arguments, "-config-directory", *fConfigDirectory)
}
err := service.Control(s, *fService)
if err != nil {
log.Fatal("E! " + err.Error())
}
os.Exit(0)
} else {
err = s.Run()
if err != nil {
Expand All @@ -339,6 +358,12 @@ func main() {
}
} else {
stop = make(chan struct{})
reloadLoop(stop, nil)
reloadLoop(
stop,
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
}

0 comments on commit 6ad58ce

Please sign in to comment.