buffer: Add queued_chunks_limit_size to control the number of queued chunks #1916

Merged (3 commits) on Apr 3, 2018

repeatedly
Member

@repeatedly repeatedly commented Mar 30, 2018

Related to #1904.
This patch prevents a small flush_interval from generating lots of queued chunks.
Note that in v0.12 the number of queues is 1.

I will check how to control this parameter with the write families.
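
For readers following along, here is a minimal sketch of the idea behind a queued-chunk limit. It is a toy model, not Fluentd's `Buffer` class: `ToyBuffer`, `append`, and the hash-based stage are hypothetical names chosen for illustration, and only `queue_full?`, `enqueue_chunk`, and `queued_chunks_limit_size` echo names from this pull request.

```ruby
# Toy model only: this is NOT Fluentd's Buffer implementation.
# ToyBuffer and append are hypothetical names used for illustration.
class ToyBuffer
  attr_reader :stage, :queue

  def initialize(queued_chunks_limit_size)
    @queued_chunks_limit_size = queued_chunks_limit_size
    @stage = {} # metadata key => chunk that is still being written
    @queue = [] # chunks waiting to be flushed
  end

  # Add a record to the staged chunk for the given key.
  def append(key, record)
    (@stage[key] ||= []) << record
  end

  # True once the queue holds as many chunks as the limit allows.
  def queue_full?
    @queue.size >= @queued_chunks_limit_size
  end

  # Move one staged chunk to the queue unless the queue is at its limit.
  def enqueue_chunk(key)
    return false if queue_full?
    chunk = @stage.delete(key)
    return false unless chunk
    @queue << chunk
    true
  end
end

buffer = ToyBuffer.new(1)
3.times { |i| buffer.append("tag#{i}", "record #{i}") }

# A frequent flush timer would try to enqueue every staged chunk;
# with a limit of 1, only the first one is moved to the queue.
buffer.stage.keys.each { |key| buffer.enqueue_chunk(key) }
```

With a limit of 1, the remaining chunks stay staged and keep accumulating records instead of turning into many small queued chunks.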

@repeatedly
Member Author

@mururu How about this change?

@mururu
Member

mururu commented Apr 3, 2018

This patch addresses only the too-many-buffer-files problem mentioned in #1904, not the high CPU usage, right? If so, it basically looks good.
But I have a concern about flush_at_shutdown. flush_at_shutdown calls #enqueue_all first, so some chunks may not be flushed during the shutdown phase if there are too many chunks.

@repeatedly
Member Author

> high CPU usage

I think this problem has been resolved since 1.1.2: #1901

> But I have a concern about flush_at_shutdown. flush_at_shutdown calls #enqueue_all at first

Ah, I forgot about that. Yes, v0.12 enqueues all unflushed buffers and flushes them. How about the following patch?

diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 72b0dae..9e4c89e 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -428,12 +428,13 @@ module Fluent
         end
       end
 
-      def enqueue_all
+      # At flush_at_shutdown, all staged chunks should be enqueued for buffer flush. Set true to force_enqueue for it.
+      def enqueue_all(force_enqueue = false)
         log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
 
         if block_given?
           synchronize{ @stage.keys }.each do |metadata|
-            return if queue_full?
+            return if !force_enqueue && queue_full?
             # NOTE: The following line might cause data race depending on Ruby implementations except CRuby
             # cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251
             chunk = @stage[metadata]
@@ -443,7 +444,7 @@ module Fluent
           end
         else
           synchronize{ @stage.keys }.each do |metadata|
-            return if queue_full?
+            return if !force_enqueue && queue_full?
             enqueue_chunk(metadata)
           end
         end
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 4a8b42d..61d9afd 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -1197,7 +1197,7 @@ module Fluent
 
       def force_flush
         if @buffering
-          @buffer.enqueue_all
+          @buffer.enqueue_all(true)
           submit_flush_all
         end
       end
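
The effect of the `force_enqueue` flag in the diff above can be sketched with a small toy model. This is an assumption-laden illustration, not the Fluentd code: `ToyShutdownBuffer`, `stage_chunk`, and `build_buffer` are hypothetical names, and the real `enqueue_all` also handles locking and an optional block, which are omitted here.

```ruby
# Toy model of the patched behavior; NOT Fluentd's Buffer implementation.
# ToyShutdownBuffer, stage_chunk and build_buffer are hypothetical names.
class ToyShutdownBuffer
  attr_reader :stage, :queue

  def initialize(limit)
    @limit = limit
    @stage = {} # metadata key => staged chunk
    @queue = [] # chunks waiting to be flushed
  end

  def stage_chunk(key, data)
    @stage[key] = data
  end

  def queue_full?
    @queue.size >= @limit
  end

  # Mirrors the guard in the diff: stop at the limit unless forced.
  def enqueue_all(force_enqueue = false)
    @stage.keys.each do |key|
      return if !force_enqueue && queue_full?
      @queue << @stage.delete(key)
    end
  end
end

# Build a buffer with a queue limit of 1 and three staged chunks.
def build_buffer
  buffer = ToyShutdownBuffer.new(1)
  3.times { |i| buffer.stage_chunk("tag#{i}", "data#{i}") }
  buffer
end

# Regular flush path: the limit is respected, so two chunks stay staged.
normal = build_buffer
normal.enqueue_all

# Shutdown path (like force_flush): every staged chunk is enqueued.
shutdown = build_buffer
shutdown.enqueue_all(true)
```

In this sketch the unforced call leaves two chunks in the stage, which is the shutdown concern raised above, while the forced call empties the stage so that every chunk can be flushed.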

@mururu
Member

mururu commented Apr 3, 2018

Looks good.

@repeatedly repeatedly merged commit fb3178d into master Apr 3, 2018
@repeatedly repeatedly deleted the limit-queued-chunks branch April 3, 2018 20:27
Labels
enhancement Feature request or improve operations v1