maxTickMessages is not working #1456

Open
praveenydv opened this issue Oct 6, 2021 · 2 comments

praveenydv commented Oct 6, 2021

I am trying to limit the number of messages per batch using maxTickMessages, but it has no effect: the option does not control the number of messages per batch at all. Even with maxTickMessages=0 I still receive messages. The number of messages I get is determined by fetchMaxBytes, no matter what value I set for maxTickMessages.

const topicName = 'service-topic'
const groupName = 'test-group'
const moment = require('moment-timezone')
var kafka = require('kafka-node')
var hostName = '192.168.49.2:32003'

var options = {
    kafkaHost: hostName,
    groupId: groupName,
    autoCommit: true,
    maxTickMessages: 10,
    maxNumSegments: 1,
    fetchMaxBytes: 1000,
    autoCommitIntervalMs: 1000,
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest'
}

var consumerGroup = new kafka.ConsumerGroup(options, topicName);
consumerGroup.on('connect', function () {
    console.log('connected!')
})

consumerGroup.on('error', function (message) {
    console.log('error', message)
})

let paused = false
consumerGroup.on('message', function (message) {
    if (!paused) {
        consumerGroup.pause()
        paused = true
        setTimeout(() => {
            consumerGroup.resume()
            paused = false
        }, 5000)
    }
    console.log('Message at', moment().unix(), JSON.stringify(message.value));
});

In the code above I am trying to fetch a fixed number of messages (equal to maxTickMessages) every 5 seconds, but instead I receive messages up to the fetchMaxBytes limit.


mmsqe commented Oct 11, 2021

Same issue here: the consumer delivers more than maxTickMessages (100) messages.

// config:
{
    kafkaHost,
    groupId,
    id,
    retryMinTimeout: 250,
    sessionTimeout: 15000,
    fromOffset: "earliest",
    encoding: "utf8",
    keyEncoding: "utf8",
    autoCommit: false,
    maxTickMessages: 100,
}

// consumer:
consumer.on("done", () => {
    setImmediate(async () => {
        if (messages.length > 0) {
            while (messages.length > 0) {
                messages.shift();
                console.log(`${messages.length} after shift`);
            }
            consumer.commit(true, () => {
                console.log(`committed`);
                consumer.resume();
            });
        } else {
            consumer.resume();
        }
    });
});


deepal commented Oct 21, 2021

@praveenydv @mmsqe I think this is the expected behaviour in recent versions, because maxTickMessages is not used to limit the number of messages in the received batch (this was not the case in old kafka-node versions, as I explain below). The number of messages in a batch is determined by fetchMaxBytes and depends on the size of the messages. maxTickMessages is only used to batch message events using process.nextTick, to avoid blocking the event loop.

For example, if the consumer fetches 1000 messages and maxTickMessages is set to 100, the consumer will emit 100 message events and schedule the next 100 emits using process.nextTick. This repeats until all 1000 message events have been emitted.

If all 1000 messages were emitted sequentially (without batching) and your message handler were completely synchronous and blocking, your app would not be able to perform any other task until all 1000 messages had been processed. But with maxTickMessages set to 100, your app gets a chance to perform other tasks (e.g., handle a request) between each group of 100 messages.
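To make the batching behaviour concrete, here is a minimal standalone sketch of the idea (not kafka-node's actual source): emit up to maxTickMessages events synchronously, then schedule the remainder with process.nextTick so other work can interleave.

```javascript
// Sketch of maxTickMessages-style batched emission. Emits at most
// `maxTickMessages` messages per tick; any remainder is deferred to
// the next tick via process.nextTick.
function emitBatched(messages, maxTickMessages, onMessage) {
  let i = 0;
  function emitChunk() {
    const end = Math.min(i + maxTickMessages, messages.length);
    for (; i < end; i++) onMessage(messages[i]);
    if (i < messages.length) process.nextTick(emitChunk);
  }
  emitChunk();
}

// Example: 10 messages with maxTickMessages = 4 are delivered in
// chunks of 4, 4, and 2 across successive ticks.
const seen = [];
emitBatched([...Array(10).keys()], 4, (m) => seen.push(m));
```

Note that every fetched message is still delivered; the batching only changes *when* within the event loop each chunk is emitted, which is why it cannot be used to cap the batch size.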

However, in earlier kafka-node versions (I don't remember exactly which), maxTickMessages did limit the number of messages in a batch. For example, if a fetch request returned 1000 messages (per fetchMaxBytes) and maxTickMessages was set to 100, kafka-node would "discard" the last 900 messages so that only 100 messages remained in the batch. If I remember correctly, the switch to process.nextTick batching was made to avoid wasting bandwidth on fetched messages that were then discarded.

So, if you are using a recent version of kafka-node (I believe 5.x.x onwards), you can only use fetchMaxBytes to limit the number of bytes received per fetch request, which in turn implicitly limits the number of messages you receive.
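If you need an exact per-window message cap despite this, one workaround is to count messages yourself and pause the consumer once the limit is reached, as the original snippet almost does. A hedged sketch, assuming only kafka-node's documented pause()/resume() methods; makeThrottle is a hypothetical helper, not part of kafka-node:

```javascript
// Hypothetical helper: count handled messages and pause once `limit`
// have arrived in the current window, resuming after `windowMs` ms.
function makeThrottle(limit, pause, resume, windowMs) {
  let count = 0;
  return function tick() {
    count += 1;
    if (count >= limit) {
      count = 0;
      pause();                      // stop fetching further messages
      setTimeout(resume, windowMs); // resume after the window elapses
    }
  };
}

// Wiring it to the ConsumerGroup from the first snippet might look like:
//   const throttle = makeThrottle(
//     10,
//     () => consumerGroup.pause(),
//     () => consumerGroup.resume(),
//     5000
//   );
//   consumerGroup.on('message', (message) => {
//     throttle();
//     handle(message); // hypothetical application handler
//   });
```

One caveat: pause() stops further fetches, but messages already buffered from an in-flight fetch may still be emitted, so the cap is approximate rather than exact.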

Maybe a kafka-node contributor can provide more info (and/or correct me if I'm wrong).
