
Outputs block inputs when batch size is reached #2919

Closed
rconn01 opened this issue Jun 13, 2017 · 13 comments · Fixed by #4938
Labels: area/agent, bug (unexpected problem or unintended behavior)

Comments

rconn01 commented Jun 13, 2017

Bug report

If one of the output plugins becomes slow, it can create back pressure all the way up to the input plugins and block them from running.

The reason for this is that the channels between the input and output plugins only have a buffer of 100. That should be fine, since most of the operations are in memory. The problem ends up being here, where the buffer gets written as soon as the batch size is reached. That write happens on the same thread that reads from the outMetricC channel in the agent.
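
To make the failure mode concrete, here is a minimal, self-contained Go sketch, not Telegraf's actual code (the channel size, batch size, and 8-second sleep are illustrative stand-ins), showing how a 100-slot channel plus an inline flush on the reading goroutine pushes back on producers:

package main

import (
    "fmt"
    "time"
)

func main() {
    metricC := make(chan int, 100) // small buffer, like the agent's internal channels

    // Reader: normally drains quickly, but every 1000 metrics it performs the
    // "flush" inline on the same goroutine, so nothing is read in the meantime.
    go func() {
        batch := 0
        for range metricC {
            batch++
            if batch == 1000 {
                time.Sleep(8 * time.Second) // simulate a slow output write
                batch = 0
            }
        }
    }()

    // Producer: stands in for an input plugin writing to the accumulator.
    for i := 0; ; i++ {
        start := time.Now()
        metricC <- i // blocks once the 100-slot buffer fills up
        if d := time.Since(start); d > time.Second {
            fmt.Printf("input blocked for %s on metric %d\n", d, i)
        }
    }
}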

Relevant telegraf.conf:

# Configuration for telegraf agent
[agent]
  interval = "5s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 250000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "5s"
  precision = ""
  debug = true
  quiet = false
  logfile = ""
  hostname = ""
  omit_hostname = false


###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################

## Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
  urls = ["http://localhost:8086"] # required
  database = "telegraf" # required
  retention_policy = ""
  write_consistency = "any"
  timeout = "5s"


## Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
  urls = ["http://localhost:8080"] # required
  database = "telegraf" # required
  retention_policy = ""
  write_consistency = "any"
  timeout = "10s"



###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################

# Read metrics about cpu usage
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false

# Read metrics about disk usage by mount point
[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs"]

# Read metrics about disk IO by device
[[inputs.diskio]]

# Get kernel statistics from /proc/stat
[[inputs.kernel]]

# Read metrics about memory usage
[[inputs.mem]]

# Read metrics about system load & uptime
[[inputs.system]]

[[inputs.internal]]


###############################################################################
#                            SERVICE INPUT PLUGINS                            #
###############################################################################

 # Influx HTTP write listener
 [[inputs.http_listener]]
   service_address = ":8186"
   read_timeout = "10s"
   write_timeout = "10s"
   max_body_size = 0
   max_line_size = 0

System info:

Telegraf version 1.3.1

Steps to reproduce:

  1. Start simple python httpServer shown below
  2. Start telegraf with the provided config
  3. Start producing metrics to the http listener; the number of metrics produced should be more than metric_batch_size
  4. Start to see log output that looks like:
2017-06-13T22:08:30Z D! Output [influxdb] wrote batch of 1000 metrics in 6.552099ms
2017-06-13T22:08:38Z D! Output [influxdb] wrote batch of 1000 metrics in 8.001152082s
2017-06-13T22:08:38Z D! Output [influxdb] wrote batch of 1000 metrics in 6.412968ms
2017-06-13T22:08:45Z E! Error in plugin [inputs.cpu]: took longer to collect than collection interval (5s)
2017-06-13T22:08:45Z E! Error in plugin [inputs.diskio]: took longer to collect than collection interval (5s)
2017-06-13T22:08:45Z E! Error in plugin [inputs.system]: took longer to collect than collection interval (5s)
2017-06-13T22:08:45Z E! Error in plugin [inputs.internal]: took longer to collect than collection interval (5s)
2017-06-13T22:08:45Z E! Error in plugin [inputs.mem]: took longer to collect than collection interval (5s)
2017-06-13T22:08:45Z E! Error in plugin [inputs.disk]: took longer to collect than collection interval (5s)
2017-06-13T22:08:46Z D! Output [influxdb] wrote batch of 1000 metrics in 8.004440807s
2017-06-13T22:08:46Z D! Output [influxdb] wrote batch of 1000 metrics in 6.244551ms
2017-06-13T22:08:50Z D! Output [influxdb] buffer fullness: 0 / 250000 metrics. 
2017-06-13T22:08:50Z D! Output [influxdb] buffer fullness: 0 / 250000 metrics. 
2017-06-13T22:08:51Z E! Error in plugin [inputs.diskio]: took longer to collect than collection interval (5s)
2017-06-13T22:08:51Z E! Error in plugin [inputs.system]: took longer to collect than collection interval (5s)
2017-06-13T22:08:54Z D! Output [influxdb] wrote batch of 1000 metrics in 8.004885924s
2017-06-13T22:08:54Z D! Output [influxdb] wrote batch of 1000 metrics in 6.013943ms
2017-06-13T22:08:58Z D! Output [influxdb] buffer fullness: 0 / 250000 metrics. 
2017-06-13T22:08:58Z D! Output [influxdb] buffer fullness: 0 / 250000 metrics. 
2017-06-13T22:08:59Z E! Error in plugin [inputs.system]: took longer to collect than collection interval (5s)
2017-06-13T22:09:00Z E! Error in plugin [inputs.disk]: took longer to collect than collection interval (5s)
2017-06-13T22:09:00Z E! Error in plugin [inputs.diskio]: took longer to collect than collection interval (5s)
2017-06-13T22:09:00Z E! Error in plugin [inputs.internal]: took longer to collect than collection interval (5s)
2017-06-13T22:09:00Z E! Error in plugin [inputs.mem]: took longer to collect than collection interval (5s)
2017-06-13T22:09:00Z E! Error in plugin [inputs.cpu]: took longer to collect than collection interval (5s)

Expected behavior:

The expected behavior is for the outputs and the inputs to be independent of each other, meaning all the inputs should write to a buffer and the outputs read from that buffer. This way the outputs can't block the inputs.

Actual behavior:

Most of the time the expected behavior happens, except when the batch size is reached. When the batch size is reached, the output write happens on that same thread, which ends up blocking all the inputs. This causes the inputs to take a long time to collect, as they are blocked waiting to write to the metricC channel.

Additional info:

I believe this is the code path that causes the blocking.

  • an input calls Gather and writes to the accumulator here
  • this results in a write to a channel here
  • which is the channel created here
  • and is the same channel passed to the flusher here
  • The flusher reads from that channel and adds to the outMetricC channel here
  • The outMetricC channel is read from, and the metric is then added to the runningOutput here
  • If the buffer length equals the batch size, the output ends up getting invoked here
  • If the output takes a while to execute, all the channels back up since nothing is reading from them, which ultimately leads to the inputs getting blocked, waiting to write to the accumulator

It seems that the flusher should just write to the buffer and trigger the goroutine here to do the write, instead of doing the write inline. This way the flow can still continue and metrics just get buffered.
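
A rough, self-contained Go sketch of that idea (illustrative names only, not a patch against the agent): the goroutine reading the channel just buffers and nudges the flusher with a non-blocking send, so only the flusher goroutine ever waits on a slow output.

package main

import (
    "sync"
    "time"
)

// runningOutput is a stand-in for Telegraf's RunningOutput; illustrative only.
type runningOutput struct {
    mu  sync.Mutex
    buf []int // ints stand in for metrics
}

func (ro *runningOutput) add(m int) int {
    ro.mu.Lock()
    defer ro.mu.Unlock()
    ro.buf = append(ro.buf, m)
    return len(ro.buf)
}

func (ro *runningOutput) write() {
    ro.mu.Lock()
    batch := ro.buf
    ro.buf = nil
    ro.mu.Unlock()
    if len(batch) > 0 {
        time.Sleep(8 * time.Second) // simulate the slow output; inputs keep flowing
    }
}

func main() {
    const batchSize = 1000
    metricC := make(chan int, 100)
    flushNow := make(chan struct{}, 1)
    ro := &runningOutput{}

    // Channel reader: only buffers and signals, never calls the output itself.
    go func() {
        for m := range metricC {
            if ro.add(m) >= batchSize {
                select {
                case flushNow <- struct{}{}: // wake the flusher if it is idle
                default: // a flush is already pending
                }
            }
        }
    }()

    // Flusher: the only goroutine that talks to the (possibly slow) output.
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
            case <-flushNow:
            }
            ro.write()
        }
    }()

    // The input side no longer stalls when the output is slow; metrics just buffer.
    for i := 0; ; i++ {
        metricC <- i
    }
}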

Sample python server

This is the sample Python HTTP server used for testing; it sleeps on each POST to simulate an output taking too long to write.

#!/usr/bin/env python

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
from optparse import OptionParser
import time

class RequestHandler(BaseHTTPRequestHandler):
    # Python 2 handler: sleep before replying so every write from telegraf
    # takes ~8 seconds, simulating a slow output.
    def do_POST(self):
        stime = 8
        print "sleeping for: " + str(stime)
        time.sleep(stime)
        self.send_response(204)
        self.end_headers()
        self.wfile.write(" ")
        self.wfile.close()

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

def main():
    port = 8080
    print('Listening on localhost:%s' % port)
    server = ThreadedHTTPServer(('', port), RequestHandler)
    server.serve_forever()

        
if __name__ == "__main__":    
    main()
danielnelson (Contributor) commented:

Thanks for the write-up; this is indeed the case. I would like to get this fixed, but of course we need to be very careful to do it correctly. There is one more issue that complicates things: we need to ensure that inputs reading from queues do not drop messages just because the output cannot keep up.

rconn01 (Author) commented Jun 14, 2017

Hmm, yeah, I didn't think of that, and I can see how it complicates things, since currently the input will block and stop reading from queues if it can't write to the outputs. If I submit my proposal, it will definitely break that flow and could cause metrics to be dropped. It's almost like blocking is needed for inputs that are queues, and internal buffering is needed for inputs that do collection.

danielnelson (Contributor) commented:

We need a method for inputs to be notified when their metrics have been emitted. This would allow durable transfer of queue items, since we could delay acking those messages, as well as better flow control. Of course, it needs to be performant too.
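
Purely as a sketch of what such a notification could look like (a hypothetical interface; nothing like this exists in Telegraf at this point), a queue input would hold the broker message until the agent reports delivery:

package tracking

// TrackedMetric pairs a metric with a delivery notification (hypothetical).
type TrackedMetric interface {
    // Done is called by the agent once the metric has been written by every
    // output (or intentionally dropped), so the input can ack or requeue.
    Done(delivered bool)
}

// queueMetric shows how a queue consumer input might implement it.
type queueMetric struct {
    ack     func()
    requeue func()
}

func (m *queueMetric) Done(delivered bool) {
    if delivered {
        m.ack() // safe to remove the message from the broker
        return
    }
    m.requeue() // let the broker redeliver it later
}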

djsly (Contributor) commented Jun 14, 2017

Sorry if I'm hijacking here, but I have a similar problem and was wondering whether it would be beneficial to augment the logs to indicate whether the "took longer" error is caused by the fetching vs. the pushing.

I have this in my telegraf logs: over three days of lines saying input [inputs.prometheus] took longer to collect than collection interval (10s).

2017-06-11T19:50:10Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-11T19:50:10Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-11T19:50:20Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-11T19:50:20Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-11T19:50:30Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-11T19:50:30Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
...
...
2017-06-14T21:05:10Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-14T21:05:20Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-14T21:05:20Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-14T21:05:30Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)
2017-06-14T21:05:30Z E! ERROR: input [inputs.prometheus] took longer to collect than collection interval (10s)

Nothing about outputs.
My config looks like this: a simple prometheus input and an influxdb output.

https://gist.github.com/djsly/6cf0e61ae440772f4c103f0898c8e62a

[[inputs.prometheus]]
  urls = ["http://kube-state-metrics:80/metrics"]
  namepass = ["kube_*"]
  tagexclude = ["url"]

And when I go into the telegraf docker container, I can easily fetch the metrics without issue.

curl http://kube-state-metrics:80/metrics | grep kube_* | tail -10
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  944k  100  944k    0     0  8570k      0 --:--:-- --:--:-- --:--:-- 8587k
kube_service_labels{label_hash="d415b9fa00e47fa9e0638d95fb1694ae",label_kubernetes_io_cluster_service="true",label_kubernetes_io_name="mzk",namespace="mahyad2",service="mzk"} 1
kube_service_labels{label_app="gateway",label_app_group_id="gwrc",label_name="gateway",namespace="le",service="ngw-expose"} 1
kube_service_labels{label_k8s_app="kube-dns",label_kubernetes_io_cluster_service="true",label_kubernetes_io_name="KubeDNS",namespace="kube-system",service="kube-dns"} 1
kube_service_labels{label_app="kube-state-metrics",label_kubernetes_io_cluster_service="true",label_kubernetes_io_name="kube-state-metrics",namespace="kube-system",service="kube-state-metrics"} 1
kube_service_labels{label_k8s_app="kubernetes-dashboard-canary",label_kubernetes_io_cluster_service="true",label_kubernetes_io_name="kubernetes-dashboard",namespace="kube-system",service="kubernetes-dashboard"} 1
kube_service_labels{label_app="media-gateway",label_app_group_id="mediagwrc",label_name="media-gateway",namespace="le",service="mgw-expose"} 1
kube_service_labels{label_app="mzk",label_app_group_id="mzkrc",label_name="mzk",namespace="le",service="mzk-expose"} 1
kube_service_labels{label_app="nmas",label_app_group_id="nmasrc",label_name="nmas",namespace="le",service="nmas-expose"} 1
kube_service_labels{label_app="vocs-pool-1",label_app_group_id="vocs-pool-1",label_name="vocs-pool-1",namespace="le",service="vocs-expose"} 1
kube_service_labels{label_app="zookeeper",label_app_group_id="zkrc",label_name="zookeeper",namespace="le",service="zk-expose"} 1

I'm running telegraf 1.2.1

root@telegraf-prometheus-55757120-qzvmf:/# telegraf version
Telegraf v1.2.1 (git: release-1.2 3b6ffb344e5c03c1595d862282a6823ecb438cff)

NB: if you prefer a new issue let me know :) thanks!

danielnelson (Contributor) commented:

It would be nice to improve this, but I don't think we can be confident that it is the output causing it. In general, any part of the input could cause this error, since the HTTP request is just one part of the collection. Even if the input is definitely blocked on the channel, it is not always the output's fault.

One thing we could maybe do is log a warning if it takes longer than some threshold to add a metric to the channel, or perhaps this is something we should record as an internal metric.
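
As a rough illustration of that kind of instrumentation (a hypothetical helper, not what the accumulator does today): time the channel send and warn when the downstream side is backed up.

package agent

import (
    "log"
    "time"
)

// addWithWarning is a hypothetical helper: it sends a metric to the output
// channel and logs a warning when the send itself was slow, which points at
// back pressure from the outputs rather than slow collection.
func addWithWarning(metricC chan<- interface{}, m interface{}) {
    const slow = 5 * time.Second
    start := time.Now()
    metricC <- m // blocks if the outputs are not keeping up
    if d := time.Since(start); d > slow {
        log.Printf("W! adding a metric to the output channel took %s; outputs may be backed up", d)
    }
}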

rconn01 (Author) commented Jun 15, 2017

we need to ensure that inputs reading from queues do not drop messages just because the output cannot keep up

I was giving more thought to this today and realized that I don't think that contract is kept by the way the agent is written today, specifically in the event that the output is down or slow (takes longer than a timeout) and the write to the output fails.

In that case the agent will push the batch into the fail buffer. This allows the inputs to add metrics until they fill a batch again. That batch then tries to write, but can't, and gets added to the fail buffer. This keeps happening until the fail buffer is full, at which point the agent starts dropping metrics.
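
As a sketch of the drop-oldest behavior described above (illustrative only, not Telegraf's buffer implementation):

package agent

// failBuffer sketches a bounded buffer that silently drops the oldest
// metrics once its limit is exceeded.
type failBuffer struct {
    limit int
    items []int // ints stand in for metrics
}

func (b *failBuffer) addBatch(batch []int) {
    b.items = append(b.items, batch...)
    if over := len(b.items) - b.limit; over > 0 {
        b.items = b.items[over:] // oldest metrics are dropped
    }
}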


As for the comment about different logging, I agree it would be better if the logging were a little different, although I'm not sure of the best way. The way it is now, it looks like something on the system is hanging and preventing the metrics from being collected, which is what was believed in #2183 as well. In reality (in this case) something is just blocked within telegraf. I was focused on that path for a while, until I started looking at the internal metrics for how long the writes were taking. It feels like there should be some indication that the collection time of the metrics was OK.

I think it makes sense to add a debug-level log statement for how long it took to write to the channel; that way it's easier to see that it's not actually the collection taking forever. Although maybe once this gets fixed there won't be a need to log that, since writing to the channel won't block.

danielnelson (Contributor) commented:

You are right about the queue handling when the output is down: metrics are dropped. When the outputs are slow but working, if I remember correctly, it will block the inputs and slow consumption down, but eventually, again, items can be dropped. Even though the queue handling is not completely correct, decoupling the outputs entirely would cause a lot more day-to-day problems for queue users, so that needs to be solved before we decouple things.

buckleypm commented Jun 15, 2017

Just to add a little context from my experimentation this week: I've been seeing

2017-06-13T23:07:00Z E! Error in plugin [inputs.statsd]: took longer to collect than collection interval (1m0s)

I tracked down the same path as the original submitter (which is how I came across this issue report), though at the beginning I suspected an issue with the statsd plugin due to a mutex deadlock between Statsd.Gather() and the statsd line parsing/aggregation.

In this case the statsd plugin eventually fills its incoming metric queue and starts dropping new metrics. After an extensive rewrite to remove the mutex locking, I was able to resolve the queue-full error; however, the deadlock described above in this issue remains (the statsd-internal deadlock is resolved).

Unfortunately, once telegraf gets into this state it never recovers until we restart it.

On a side note, if there is interest in the statsd changes I made, I can start getting approvals on my side to contribute the code.

danielnelson (Contributor) commented:

Always interested in bug fixes; can you start by opening a new issue with a description of the symptoms?

sybrandy commented:

Some additional info, as I saw this recently: if the output, at least the InfluxDB output, cannot send a request, the exact same scenario occurs. This had a severe impact on our current system, as I am using Telegraf as a metrics proxy for our apps. In our case it was a configuration issue that was resolved quickly once I found this ticket, but it's not always going to be something this simple.

Now, this can be mitigated by using something like Kafka to buffer the requests and having queue inputs only ack messages that are successfully committed to a data store. However, not everyone is going to want to do this, so you may want to consider an embedded solution. The best I can come up with is an option to buffer messages to a file when downstream issues are detected. This assumes there will be enough disk space, and with good monitoring it should be noisy enough to trigger an alert. I'm not sure whether we would want this on a per-output basis or have a single buffer that feeds all outputs; there are pros and cons to both. Granted, this idea needs to consider whether we want to drop metrics after a specified count or time period. If the period is short enough, then we wouldn't need the file; the file just allows significantly more metrics to be pushed through before we start losing them, which may be important to some people.
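
A rough sketch of that file-spill idea (my own illustration of how it could look, not an existing Telegraf feature): keep a bounded in-memory buffer and append overflow to a spill file, replaying it once the output recovers.

package spill

import (
    "bufio"
    "os"
)

// Buffer keeps up to memLimit metrics in memory and spills the rest to disk.
// Purely illustrative; a "line" stands in for one serialized metric.
type Buffer struct {
    memLimit int
    mem      []string
    file     *os.File
}

func New(memLimit int, path string) (*Buffer, error) {
    f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }
    return &Buffer{memLimit: memLimit, file: f}, nil
}

// Add buffers in memory while there is room, otherwise appends to the spill file.
func (b *Buffer) Add(line string) error {
    if len(b.mem) < b.memLimit {
        b.mem = append(b.mem, line)
        return nil
    }
    _, err := b.file.WriteString(line + "\n")
    return err
}

// Drain replays spilled metrics once the output has recovered.
func (b *Buffer) Drain(write func(string) error) error {
    if _, err := b.file.Seek(0, 0); err != nil {
        return err
    }
    sc := bufio.NewScanner(b.file)
    for sc.Scan() {
        if err := write(sc.Text()); err != nil {
            return err
        }
    }
    return sc.Err()
}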

danielnelson added the bug (unexpected problem or unintended behavior) label on Aug 25, 2017
rconn01 (Author) commented Dec 12, 2017

@danielnelson any update on this blocking issue, or an idea of how it could be fixed?

danielnelson (Contributor) commented:

I have not yet had time to work on this directly, though it is still on my long-term roadmap as one of the major design issues that need to be addressed.

I think the first step towards fixing this issue is creating a method for throttling the various queue consumer inputs. These inputs should limit the messages they have in flight, perhaps using a callback when a message is output. This is complicated by the fact that metrics can be modified or duplicated depending on the plugin layout.
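
A hedged sketch of that throttling idea (illustrative types and names, not an existing Telegraf API): the consumer holds a semaphore slot per in-flight message and only acks the broker once the delivery callback fires.

package consumer

// Illustrative only: Telegraf does not expose these hooks today.

// message is whatever the broker client hands us.
type message interface {
    Ack()
    Requeue()
}

// consume limits the number of unacknowledged messages to maxInFlight and
// acks the broker only after the agent reports the metric as delivered.
func consume(msgs <-chan message, addMetric func(m message, onDelivered func(ok bool)), maxInFlight int) {
    inFlight := make(chan struct{}, maxInFlight) // counting semaphore

    for msg := range msgs {
        inFlight <- struct{}{} // blocks when too many messages are in flight
        m := msg
        addMetric(m, func(ok bool) {
            if ok {
                m.Ack()
            } else {
                m.Requeue()
            }
            <-inFlight // release the slot once delivery is settled
        })
    }
}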

jgitlin-bt commented:

@buckleypm I believe I might be seeing the same issue as you describe. I am seeing three telegraf servers randomly stop publishing metrics, and my stack traces look similar; searching for the cause led me to this thread (but there are at least four other bugs that also look similar).

Logs and stack traces I am seeing:

2017-12-27T17:03:51Z E! Error in plugin [inputs.phpfpm]: took longer to collect than collection interval (10s)
2017-12-27T17:03:57Z E! Error: statsd message queue full. We have dropped 1 messages so far. You may want to increase allowed_pending_messages in the config
...
2017-12-27T17:05:51Z E! Error in plugin [inputs.statsd]: took longer to collect than collection interval (10s)
2017-12-27T17:05:51Z E! Error: statsd message queue full. We have dropped 10000 messages so far. You may want to increase allowed_pending_messages in the config
...
SIGQUIT: quit
PC=0x4822fa m=12 sigcode=65537

goroutine 15 [syscall]:
syscall.Syscall6(0x16b, 0x7, 0x0, 0x0, 0xc420868e88, 0xa, 0x1d3af70, 0xc420868e70, 0xc420001c80, 0xc420959640)
	/usr/local/go/src/syscall/asm_unix_amd64.s:42 +0x5 fp=0xc420868c70 sp=0xc420868c68 pc=0x4822d5
syscall.kevent(0x7, 0x0, 0x0, 0xc420868e88, 0xa, 0x1d3af70, 0x1, 0x0, 0x0)
	/usr/local/go/src/syscall/zsyscall_freebsd_amd64.go:202 +0x83 fp=0xc420868ce8 sp=0xc420868c70 pc=0x47f7e3
syscall.Kevent(0x7, 0x0, 0x0, 0x0, 0xc420868e88, 0xa, 0xa, 0x1d3af70, 0x0, 0xbc0600, ...)
	/usr/local/go/src/syscall/syscall_bsd.go:447 +0x71 fp=0xc420868d40 sp=0xc420868ce8 pc=0x47c921
..
goroutine 1 [semacquire, 126 minutes]:
sync.runtime_Semacquire(0xc42029e17c)
	/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc42029e170)
	/usr/local/go/src/sync/waitgroup.go:131 +0x72
...
goroutine 28 [semacquire, 4 minutes]:
sync.runtime_SemacquireMutex(0xc4204def74, 0x144a800)
	/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc4204def70)
	/usr/local/go/src/sync/mutex.go:134 +0xee
github.com/influxdata/telegraf/plugins/inputs/statsd.(*Statsd).parseStatsdLine(0xc4204def00, 0xc420aa0c80, 0x47, 0x0, 0x0)
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/plugins/inputs/statsd/statsd.go:436 +0x69
github.com/influxdata/telegraf/plugins/inputs/statsd.(*Statsd).parser(0xc4204def00, 0x0, 0x0)
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/plugins/inputs/statsd/statsd.go:426 +0xe4
created by github.com/influxdata/telegraf/plugins/inputs/statsd.(*Statsd).Start
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/plugins/inputs/statsd/statsd.go:330 +0x75c
...
goroutine 22650 [chan send, 4 minutes]:
github.com/influxdata/telegraf/agent.(*accumulator).AddGauge(0xc420947000, 0x13c9353, 0x3, 0xc420505bf0, 0xc420505bc0, 0xc420e9b940, 0x1, 0x1)
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/agent/accumulator.go:64 +0x124
github.com/influxdata/telegraf/plugins/inputs/system.(*CPUStats).Gather(0xc420182d20, 0x1c232c0, 0xc420947000, 0xc4208f7440, 0x0)
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/plugins/inputs/system/cpu.go:122 +0xe35
github.com/influxdata/telegraf/agent.gatherWithTimeout.func1(0xc42078f0e0, 0xc42034fc80, 0xc420947000)
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/agent/agent.go:153 +0x49
created by github.com/influxdata/telegraf/agent.gatherWithTimeout
	/wrkdirs/usr/ports/net-mgmt/telegraf/work/telegraf-1.4.4/src/github.com/influxdata/telegraf/agent/agent.go:152 +0xc7

rax    0x4
rbx    0x0
rcx    0xc420868e88
rdx    0x0
rdi    0x7
rsi    0x0
rbp    0xc420868cd8
rsp    0xc420868c68
r8     0xa
r9     0x1d3af70
r10    0x2412689d
r11    0x212
r12    0x1
r13    0xc420122600
r14    0x0
r15    0x0
rip    0x4822fa
rflags 0x217
cs     0x43
fs     0x13
gs     0x1b

Full logs

If this looks like a different issue, please let me know.
