From fd66284ddd60665ee3b517daf45b3ec26e3ad2bb Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 13 Jul 2016 08:14:48 -0600 Subject: [PATCH] Copy metrics for each configured output This is for better thread-safety when running with multiple outputs, which can cause very odd panics at very high loads primarily this is to address #1432 --- Makefile | 4 ---- agent/agent.go | 22 ++++++++++++++++++++- plugins/inputs/tcp_listener/tcp_listener.go | 11 +++++++++-- plugins/inputs/udp_listener/udp_listener.go | 10 +++++++++- plugins/serializers/graphite/graphite.go | 2 +- 5 files changed, 40 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 816c93cf18f40..ce7cf7880b227 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,6 @@ build-for-docker: "-s -X main.version=$(VERSION)" \ ./cmd/telegraf/telegraf.go -# Build with race detector -dev: prepare - go build -race -ldflags "-X main.version=$(VERSION)" ./... - # run package script package: ./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload diff --git a/agent/agent.go b/agent/agent.go index d1d36186ea968..21a31a8513b8b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -268,13 +268,33 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case m := <-metricC: - for _, o := range a.Config.Outputs { + if len(a.Config.Outputs) > 1 { + for _, o := range a.Config.Outputs { + o.AddMetric(copyMetric(m)) + } + } else { o.AddMetric(m) } } } } +func copyMetric(m telegraf.Metric) telegraf.Metric { + t := time.Time(m.Time()) + + tags := make(map[string]string) + fields := make(map[string]interface{}) + for k, v := range m.Tags() { + tags[k] = v + } + for k, v := range m.Fields() { + fields[k] = v + } + + out, _ := telegraf.NewMetric(m.Name(), tags, fields, t) + return out +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 053fc927e8d10..4688e008bb39c 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -31,6 +31,8 @@ type TcpListener struct { accept chan bool // drops tracks the number of dropped metrics. drops int + // malformed tracks the number of malformed packets + malformed int // track the listener here so we can close it in Stop() listener *net.TCPListener @@ -45,6 +47,9 @@ var dropwarn = "ERROR: tcp_listener message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" +var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" + + " thus far." + const sampleConfig = ` ## Address and port to host TCP listener on service_address = ":8094" @@ -243,8 +248,10 @@ func (t *TcpListener) tcpParser() error { if err == nil { t.storeMetrics(metrics) } else { - log.Printf("Malformed packet: [%s], Error: %s\n", - string(packet), err) + t.malformed++ + if t.malformed == 1 || t.malformed%1000 == 0 { + log.Printf(malformedwarn, t.malformed) + } } } } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index a20a5583f5d62..120ee50e5a7bd 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -27,6 +27,8 @@ type UdpListener struct { done chan struct{} // drops tracks the number of dropped metrics. drops int + // malformed tracks the number of malformed packets + malformed int parser parsers.Parser @@ -44,6 +46,9 @@ var dropwarn = "ERROR: udp_listener message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" +var malformedwarn = "WARNING: udp_listener has received %d malformed packets" + + " thus far." + const sampleConfig = ` ## Address and port to host UDP listener on service_address = ":8092" @@ -152,7 +157,10 @@ func (u *UdpListener) udpParser() error { if err == nil { u.storeMetrics(metrics) } else { - log.Printf("Malformed packet: [%s], Error: %s\n", packet, err) + u.malformed++ + if u.malformed == 1 || u.malformed%1000 == 0 { + log.Printf(malformedwarn, u.malformed) + } } } } diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 6e5c4e879e4c3..2cc4add56ea55 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -20,7 +20,7 @@ type GraphiteSerializer struct { Template string } -func (s GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { +func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { out := []string{} // Convert UnixNano to Unix timestamps