
The load on each worker is significantly different with the multi process workers feature #3346

Closed
chikinchoi opened this issue Apr 22, 2021 · 17 comments
Labels: waiting-for-user (similar to "moreinfo", but especially need feedback from user)


@chikinchoi

Describe the bug
The "multi process workers" feature is not working. I have defined 2 workers in the system directive of the fluentd config. However, when I use the Grafana to check the performance of the fluentd, the fluentd_output_status_buffer_available_space_ratio metrics of each worker are slightly different. For example, worker0 is 98% and worker1 is 0%.

To Reproduce
To reproduce, please use the Fluentd config below:

<system>
  @log_level debug
  workers 2
  root_dir /var/log/fluent/
</system>

<source>
  @type  forward
  @id    input1
  @log_level debug
  port  24224
  bind 0.0.0.0
</source>

# Used for docker health check
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health**>
  @type null
</match>

<filter **>
  @type string_scrub
  replace_char ?
</filter>

<filter **firelens**>
  @type concat
  key log
  multiline_start_regexp '^\{\\"@timestamp'
  multiline_end_regexp '/\}/'
  separator ""
  flush_interval 1
  timeout_label @NORMAL
</filter>

<filter **firelens**>
  @type parser
  key_name log
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter **firelens**>
  @type record_modifier
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter kube**>
  @type record_modifier
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter lambdaNode**>
  @type record_modifier
  <record>
    functionName ${record["context"]["functionName"]}
  </record>
</filter>

<filter lambdaPython**>
  @type record_modifier
  <record>
    functionName ${record["function_name"]}
  </record>
</filter>

<filter lambdaNode**>
  @type grep
  <exclude>
    key functionName
    pattern /^(?:null|)$/
  </exclude>
</filter>

<filter lambdaPython**>
  @type grep
  <exclude>
    key functionName
    pattern  /^(?:null|)$/
  </exclude>
</filter>

# Prometheus Configuration
# count number of incoming records per tag

<filter **firelens**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_firelens
    type counter
    desc The total number of incoming records for firelens
    <labels>
      taskDef ${taskDef}
    </labels>
  </metric>
</filter>

<filter kube**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_kube
    type counter
    desc The total number of incoming records for kubernetes
    <labels>
      taskDef ${taskDef}
    </labels>
  </metric>
</filter>

<filter lambda**>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total_lambda
    type counter
    desc The total number of incoming records for lambda
    <labels>
      functionName ${functionName}
    </labels>
  </metric>
</filter>

<filter lambdaNode**>
  @type parser
  key_name data
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter lambdaPython**>
  @type parser
  key_name message
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

# count number of outgoing records per tag
<match **firelens**>
  @type copy
  @id firelens
  <store>
    @type elasticsearch
    @id firelens_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${taskDef}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer taskDef>
      @type file
      flush_mode interval
      flush_interval 5s
      flush_thread_count 16
      total_limit_size 8GB
      chunk_limit_size 80MB
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
      chunk_limit_records 1000
    </buffer>
    <metadata>
     include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id firelens_pro
    <metric>
      name fluentd_output_status_num_records_total_firelens
      type counter
      desc The total number of outgoing records firelens
      <labels>
        taskDef ${taskDef}
      </labels>
    </metric>
  </store>
</match>

<match kube**>
  @type copy
  @id kube
  <store>
    @type elasticsearch
    @id kube_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${taskDef}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer taskDef>
      @type file
      flush_mode interval
      flush_interval 5s
      flush_thread_count 16
      total_limit_size 512MB
      chunk_limit_size 80MB
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
      chunk_limit_records 1000
    </buffer>
    <metadata>
     include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id kube_pro
    <metric>
      name fluentd_output_status_num_records_total_kube
      type counter
      desc The total number of outgoing records kubernetes
      <labels>
        taskDef ${taskDef}
      </labels>
    </metric>
  </store>
</match>

<match lambdaNode**>
  @type copy
  @id lambdaNode
  <store>
    @type elasticsearch
    @id lambdaNode_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${$.context.functionName}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer $.context.functionName>
      flush_mode interval
      flush_interval 5s
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 256MB
      retry_max_interval 16s
      disable_chunk_backup true
      chunk_limit_records 1000
    </buffer>
    <metadata>
      include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id lambdaNode_pro
    <metric>
      name fluentd_output_status_num_records_total_lambda
      type counter
      desc The total number of outgoing records lambda
      <labels>
        functionName ${functionName}
      </labels>
    </metric>
  </store>
</match>

<match lambdaPython**>
  @type copy
  @id lambdaPython
  <store>
    @type elasticsearch
    @id lambdaPython_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${function_name}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    <buffer function_name>
      flush_mode interval
      flush_interval 5s
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 256MB
      retry_max_interval 16s
      disable_chunk_backup true
      chunk_limit_records 1000
    </buffer>
    <metadata>
      include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id lambdaPython_pro
    <metric>
      name fluentd_output_status_num_records_total_lambda
      type counter
      desc The total number of outgoing records lambda
      <labels>
        functionName ${functionName}
      </labels>
    </metric>
  </store>
</match>

<label @FLUENT_LOG>
  <match fluent.*>
    @type null
  </match>
</label>

<label @NORMAL>
  <match **>
    @type null
  </match>
</label>

# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>

Expected behavior
I expect the fluentd_output_status_buffer_available_space_ratio to be roughly equal across workers, since the load should be distributed evenly among them.

Your Environment

  • fluentd:v1.11.1-1.0
@fujimotos
Member

@chikinchoi Can you set the following environment variable and
see if the load balancing improves?

$ export SERVERENGINE_USE_SOCKET_REUSEPORT=1
$ fluentd -c your-config.conf

Here is some background:

The "multi process workers" feature is not working.
...
For example, worker0 is 98% and worker1 is 0%.

This is actually a common issue among server products on Linux.
Nginx has the exact same issue:

https://blog.cloudflare.com/the-sad-state-of-linux-socket-balancing/

The core problem is that Fluentd itself has no load-balancing mechanism.
It just prepares a bunch of worker processes, each listening on a shared socket.
When a request arrives, every worker wakes up, rushes to accept(), and
whoever gets there first "wins" (and gets the task as a treat).

This model works poorly on Linux, because Linux often wakes the busiest
process first. So there is no real load balancing; a single worker keeps
winning the race again and again, leaving the other workers slacking off.

The SERVERENGINE_USE_SOCKET_REUSEPORT option mentioned above was
introduced in treasure-data/serverengine#103 specifically to resolve this issue.

This is experimental and not well documented, but it's worth a try if the above
issue is bugging you.
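
If you want to see what the option changes at the socket level, here is a hedged sketch (it assumes the forward input from the config above listening on port 24224, and that iproute2's ss is available in the container; busybox netstat -lntp prints similar output):

# Compare the listening sockets before and after setting
# SERVERENGINE_USE_SOCKET_REUSEPORT=1.
#
# Without SO_REUSEPORT, all workers share a single listening socket, so a
# single LISTEN row should show up for port 24224. With the variable set,
# each worker binds its own socket, so one LISTEN row per worker should
# appear and the kernel distributes incoming connections across them.
ss -lntp | grep 24224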

fujimotos self-assigned this Apr 22, 2021
fujimotos added the waiting-for-user label Apr 22, 2021
@chikinchoi
Author

Hi @fujimotos ,

Thank you for your quick reply!
Does this mean this uneven behavior is expected for the multi-worker feature?
For the SERVERENGINE_USE_SOCKET_REUSEPORT parameter, is it OK to add it to the Dockerfile?
Below is my Dockerfile:

FROM fluent/fluentd:v1.11.1-1.0
# Use root account to use apk
USER root
# below RUN includes plugin as examples elasticsearch is not required
# you may customize including plugins as you wish
RUN apk add --no-cache --update --virtual .build-deps \
        sudo build-base ruby-dev \
&& sudo gem install fluent-plugin-elasticsearch -v 4.2.2 \
&& sudo gem install fluent-plugin-prometheus \
&& sudo gem sources --clear-all \
&& sudo gem install elasticsearch-xpack \
&& sudo gem install fluent-plugin-record-modifier \
&& sudo gem install fluent-plugin-concat \
&& sudo gem install typhoeus \
&& sudo gem install fluent-plugin-string-scrub \
&& apk add curl \
&& apk del .build-deps \
&& rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem
COPY fluent.conf /fluentd/etc/
RUN mkdir /var/log/fluent
RUN chmod -R 777 /var/log/fluent
RUN chown -R fluent /var/log/fluent
RUN sniffer=$(gem contents fluent-plugin-elasticsearch|grep elasticsearch_simple_sniffer.rb ); \
echo $sniffer
# fluentd -c /fluentd/etc/fluent.conf -r $sniffer;
COPY entrypoint.sh /bin/
RUN chmod +x /bin/entrypoint.sh

# USER fluent
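
(Side note: a container-wide environment variable can also be baked into the image with a Dockerfile ENV instruction; a minimal sketch, assuming the rest of the Dockerfile above stays unchanged. Setting it in entrypoint.sh, as suggested below, achieves the same thing.)

# Make the variable visible to every process in the container,
# including fluentd started from entrypoint.sh.
ENV SERVERENGINE_USE_SOCKET_REUSEPORT=1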

@fujimotos
Member

Does this mean this uneven behavior is expected for the multi-worker feature?

@chikinchoi Right. The uneven worker load is an open issue on Linux.

One proposed solution is SERVERENGINE_USE_SOCKET_REUSEPORT.
It's promising, but still in the experimental stage, so we haven't
enabled the feature by default yet.

For the SERVERENGINE_USE_SOCKET_REUSEPORT parameter, is it OK to add it to the Dockerfile?

In your use case, I think the best point to set the env is /bin/entrypoint.sh.
Add the export line just before the main program invocation.

Here is an example:

#!/bin/bash
export SERVERENGINE_USE_SOCKET_REUSEPORT=1
fluentd -c /fluentd/etc/fluent.conf

@chikinchoi
Author

Hi @fujimotos ,

Thank you for your reply.
I am testing adding "SERVERENGINE_USE_SOCKET_REUSEPORT" to entrypoint.sh and will let you know the result once it's done.

Right. The uneven worker load is an open issue on Linux.

Regarding the uneven worker load issue, I read the Fluentd documentation and saw that there is a "<worker N-M>" directive. May I ask what the purpose of "<worker N-M>" is, if the uneven worker load is expected behavior?
Thank you very much!
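
(For context on <worker N-M>: it pins specific plugins to particular workers, for example plugins that cannot run in multi-process mode; it does not balance the load of a single shared input. A minimal sketch of the syntax, with illustrative plugin settings only:)

<system>
  workers 2
</system>

# Runs only on worker 0.
<worker 0>
  <source>
    @type http
    port 8888
    bind 0.0.0.0
  </source>
</worker>

# Runs on workers 0 through 1, i.e. on both workers.
<worker 0-1>
  <source>
    @type forward
    port 24224
    bind 0.0.0.0
  </source>
</worker>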

@chikinchoi
Author

@fujimotos ,
I have added "SERVERENGINE_USE_SOCKET_REUSEPORT" in entrypoint.sh as the below script but found that the loading to each worker still has different. The fluentd_output_status_buffer_available_space_ratio of worker0 is 92.7% and worker1 is 99.2%. Is this difference expected? Also, how can I verify if the "SERVERENGINE_USE_SOCKET_REUSEPORT" variable is working?

#!/bin/sh

#source vars if file exists
DEFAULT=/etc/default/fluentd

export SERVERENGINE_USE_SOCKET_REUSEPORT=1

if [ -r $DEFAULT ]; then
    set -o allexport
    . $DEFAULT
    set +o allexport
fi

# If the user has supplied only arguments append them to `fluentd` command
if [ "${1#-}" != "$1" ]; then
    set -- fluentd "$@"
fi

# If user does not supply config file or plugins, use the default
if [ "$1" = "fluentd" ]; then
    if ! echo $@ | grep ' \-c' ; then
       set -- "$@" -c /fluentd/etc/${FLUENTD_CONF}
    fi

    if ! echo $@ | grep ' \-p' ; then
       set -- "$@" -p /fluentd/plugins
    fi

    set -- "$@" -r /usr/lib/ruby/gems/2.5.0/gems/fluent-plugin-elasticsearch-4.2.2/lib/fluent/plugin/elasticsearch_simple_sniffer.rb
fi

df -h
echo $@
echo $SERVERENGINE_USE_SOCKET_REUSEPORT
exec "$@"

@fujimotos
Member

fujimotos commented Apr 30, 2021

The fluentd_output_status_buffer_available_space_ratio of worker0 is 92.7% and worker1 is 99.2%. Is this difference expected?

@chikinchoi I think a small difference is expected.

You originally reported that the space usage
(fluentd_output_status_buffer_available_space_ratio) was:

worker0 is 98% and worker1 is 0%.

So worker1 was obviously overworking. On the other hand,
the current status is:

worker0 is 92.7% and worker1 is 99.2%.

so I consider this progress; it is much better than the original 98% vs 0% split.

@chikinchoi
Author

@fujimotos I found that worker0 is at 71% and worker1 is at 0% today. It still seems like progress, but do you think there is any way to make it better?

@fujimotos
Member

fujimotos commented May 4, 2021

do you think there is any way to make it better?

@chikinchoi As far as I know, there is no other option that can improve
the task distribution.

Edit: there is a fix being proposed at the Linux kernel level,
but the kernel maintainers are not convinced by that patch.

So I believe SERVERENGINE_USE_SOCKET_REUSEPORT is currently
the best Fluentd can do to distribute the task load evenly.

@ankit21491

Thanks for the resolution. I tried it, but after adding "export SERVERENGINE_USE_SOCKET_REUSEPORT=1", the other workers (I am using 6 workers in my configuration) started utilizing CPU only for a very short period of time, ~2 minutes, and after that everything reverted to how it was before.

I am also sending logs to New Relic using Fluentd. For most of the servers/clusters it works fine, but for a few of them the lag starts at around 2 hours and can go beyond 48 hours.

Surprisingly, the logs for one of the namespaces in my K8s cluster stream live into New Relic, while for another namespace I am facing this issue. I have tried using the directive as well as the solution provided above, which reduced the latency from hours to roughly 10-15 minutes, but I am still not getting the logs without lag.

Any troubleshooting steps would be appreciated.

@jvs87

jvs87 commented Jun 13, 2024

I'm facing the same problem. Is there any other solution in addition to SERVERENGINE_USE_SOCKET_REUSEPORT?

@daipom
Contributor

daipom commented Jun 13, 2024

So, the load is unbalanced even with SERVERENGINE_USE_SOCKET_REUSEPORT set?
How much difference does it make?

@jvs87

jvs87 commented Jun 13, 2024

Quite a big difference. This is a picture of the buffers from yesterday:

[image: per-worker buffer size graph]

As you can see, worker 1's buffer keeps increasing while the others are "empty".

Thanks.

@daipom
Contributor

daipom commented Jun 14, 2024

@jvs87 Thanks!
Does this occur even with SERVERENGINE_USE_SOCKET_REUSEPORT set?

@jvs87

jvs87 commented Jun 14, 2024

Yes, it is declared in the env:

[image: environment variable declaration]

@daipom
Contributor

daipom commented Jun 14, 2024

Thanks. I see...
I am surprised to see so much imbalance, even with reuseport.
When I applied reuseport to nginx, the load was distributed more evenly.
We may need to investigate the cause.

Note: uken/fluent-plugin-elasticsearch#1047

@jvs87

jvs87 commented Jun 14, 2024

Yes, I'm a bit in the dark and don't know whether the problem is related to the multi-process feature or, rather, to a bad use of the buffer.

@jvs87

jvs87 commented Jun 25, 2024

Hi. Do you need any other tests?
