Copy metrics for each configured output
This is for better thread-safety when running with multiple outputs; sharing a single metric instance between outputs can cause very odd panics under very high load.

Primarily this is to address #1432.

Closes #1432
sparrc committed Jul 14, 2016
1 parent 7e86f49 commit 4ac375e
Showing 6 changed files with 42 additions and 9 deletions.
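For context, the very-high-load panics this commit addresses appear to stem from several output goroutines ending up with the same metric instance, and therefore the same underlying tag and field maps. The sketch below is not Telegraf code; it is a minimal, self-contained Go illustration (the metric struct and copyMetric helper here are made-up stand-ins) of the fan-out-by-copy pattern the commit adopts: each output receives its own copy of the mutable maps, so concurrent use cannot race. Running it under the race detector (go run -race, the same flag the Makefile's removed dev target passed to go build) reports nothing; sharing m directly between the goroutines instead would.

package main

import (
    "fmt"
    "sync"
)

// metric is a stand-in for telegraf.Metric: a name plus mutable maps.
type metric struct {
    name   string
    tags   map[string]string
    fields map[string]interface{}
}

// copyMetric mirrors the idea in agent/agent.go: allocate fresh maps and
// copy every key so that no two consumers share mutable state.
func copyMetric(m metric) metric {
    tags := make(map[string]string, len(m.tags))
    fields := make(map[string]interface{}, len(m.fields))
    for k, v := range m.tags {
        tags[k] = v
    }
    for k, v := range m.fields {
        fields[k] = v
    }
    return metric{name: m.name, tags: tags, fields: fields}
}

func main() {
    m := metric{
        name:   "cpu",
        tags:   map[string]string{"host": "a"},
        fields: map[string]interface{}{"usage": 0.5},
    }

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        // Each "output" receives its own copy; mutating the tags here
        // cannot race with the other goroutines.
        go func(id int, own metric) {
            defer wg.Done()
            own.tags["output"] = fmt.Sprint(id)
            fmt.Println(own.name, own.tags, own.fields)
        }(i, copyMetric(m))
    }
    wg.Wait()
}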
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@ should now look like:
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.

## v1.0 beta 2 [2016-06-21]

4 changes: 0 additions & 4 deletions Makefile
@@ -25,10 +25,6 @@ build-for-docker:
"-s -X main.version=$(VERSION)" \
./cmd/telegraf/telegraf.go

-# Build with race detector
-dev: prepare
-    go build -race -ldflags "-X main.version=$(VERSION)" ./...
-
# run package script
package:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload
23 changes: 22 additions & 1 deletion agent/agent.go
@@ -268,13 +268,34 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
            internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
            a.flush()
        case m := <-metricC:
-           for _, o := range a.Config.Outputs {
+           if len(a.Config.Outputs) > 1 {
+               for _, o := range a.Config.Outputs {
+                   o.AddMetric(copyMetric(m))
+               }
+           } else {
+               // no need to copy if there is only one output
+               o := a.Config.Outputs[0]
                o.AddMetric(m)
            }
        }
    }
}

+func copyMetric(m telegraf.Metric) telegraf.Metric {
+   t := time.Time(m.Time())
+
+   tags := make(map[string]string)
+   fields := make(map[string]interface{})
+   for k, v := range m.Tags() {
+       tags[k] = v
+   }
+   for k, v := range m.Fields() {
+       fields[k] = v
+   }
+
+   out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
+   return out
+}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
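A note on the copyMetric helper above: Go maps are reference types, so handing each output the same telegraf.Metric (or a shallow copy of the struct holding the maps) would still share the underlying tag and field storage between goroutines. The snippet below is a generic, Telegraf-independent demonstration of why the maps themselves have to be rebuilt key by key; all names in it are illustrative.

package main

import "fmt"

func main() {
    original := map[string]string{"host": "a"}

    // Assignment copies only the map header; both variables point at the
    // same underlying buckets, so writes through one are visible via the other.
    alias := original
    alias["host"] = "b"
    fmt.Println(original["host"]) // prints "b"

    // A real copy allocates a new map and copies each key -- the same thing
    // copyMetric does for the tag and field maps.
    clone := make(map[string]string, len(original))
    for k, v := range original {
        clone[k] = v
    }
    clone["host"] = "c"
    fmt.Println(original["host"]) // still prints "b"
}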
11 changes: 9 additions & 2 deletions plugins/inputs/tcp_listener/tcp_listener.go
@@ -31,6 +31,8 @@ type TcpListener struct {
accept chan bool
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int

// track the listener here so we can close it in Stop()
listener *net.TCPListener
@@ -45,6 +47,9 @@ var dropwarn = "ERROR: tcp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" +
" thus far."

const sampleConfig = `
## Address and port to host TCP listener on
service_address = ":8094"
@@ -243,8 +248,10 @@ func (t *TcpListener) tcpParser() error {
            if err == nil {
                t.storeMetrics(metrics)
            } else {
-               log.Printf("Malformed packet: [%s], Error: %s\n",
-                   string(packet), err)
+               t.malformed++
+               if t.malformed == 1 || t.malformed%1000 == 0 {
+                   log.Printf(malformedwarn, t.malformed)
+               }
            }
        }
    }
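The tcp_listener change above (and the matching udp_listener change below) replaces a per-packet log line, which can itself become a bottleneck at high packet rates, with a counter that is reported on the first malformed packet and then on every 1000th. A minimal standalone sketch of the same log-throttling pattern, using hypothetical names rather than the plugin's actual types:

package main

import "log"

// malformedCounter is a made-up stand-in for the listener's counter:
// noisy enough to be noticed, cheap enough not to flood the log under load.
type malformedCounter struct {
    count int
}

func (c *malformedCounter) record() {
    c.count++
    if c.count == 1 || c.count%1000 == 0 {
        log.Printf("WARNING: received %d malformed packets thus far.", c.count)
    }
}

func main() {
    var c malformedCounter
    for i := 0; i < 2500; i++ {
        c.record() // logs at counts 1, 1000 and 2000
    }
}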
10 changes: 9 additions & 1 deletion plugins/inputs/udp_listener/udp_listener.go
@@ -27,6 +27,8 @@ type UdpListener struct {
done chan struct{}
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int

parser parsers.Parser

@@ -44,6 +46,9 @@ var dropwarn = "ERROR: udp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

var malformedwarn = "WARNING: udp_listener has received %d malformed packets" +
" thus far."

const sampleConfig = `
## Address and port to host UDP listener on
service_address = ":8092"
@@ -152,7 +157,10 @@ func (u *UdpListener) udpParser() error {
            if err == nil {
                u.storeMetrics(metrics)
            } else {
-               log.Printf("Malformed packet: [%s], Error: %s\n", packet, err)
+               u.malformed++
+               if u.malformed == 1 || u.malformed%1000 == 0 {
+                   log.Printf(malformedwarn, u.malformed)
+               }
            }
        }
    }
2 changes: 1 addition & 1 deletion plugins/serializers/graphite/graphite.go
@@ -20,7 +20,7 @@ type GraphiteSerializer struct {
Template string
}

-func (s GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
+func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}

// Convert UnixNano to Unix timestamps
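The serializer change swaps Serialize's value receiver for a pointer receiver, presumably so each output keeps reusing one GraphiteSerializer instance rather than copying the struct on every call, and so any state added later (a mutex, cached template machinery) would not be silently duplicated per call. The toy example below is unrelated to Telegraf; it only shows the behavioral difference between the two receiver kinds:

package main

import "fmt"

type counter struct{ n int }

// Value receiver: the method operates on a copy, so the increment is lost
// when the method returns.
func (c counter) incValue() { c.n++ }

// Pointer receiver: the method sees the caller's value, so a single shared
// instance (and any lock it might grow later) is actually shared.
func (c *counter) incPointer() { c.n++ }

func main() {
    var c counter
    c.incValue()
    fmt.Println(c.n) // 0
    c.incPointer()
    fmt.Println(c.n) // 1
}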
