Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async commit is not working #227

Open
architmmmec opened this issue Jul 6, 2021 · 8 comments
Open

Async commit is not working #227

architmmmec opened this issue Jul 6, 2021 · 8 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter

Comments

@architmmmec
Copy link

architmmmec commented Jul 6, 2021

   /**
     * Consumes data from Kafka and returns a flux of receiver record
     * @return receiver record
     */
    public Flux<ReceiverRecord<String, byte[]>> consumeFromSource() {
        ReceiverOptions<String, byte[]> receiverOptions = getReceiverOptions();
        return KafkaReceiver.create(receiverOptions).receive(Integer.parseInt(prefetch))
                .map(receiverRecord -> {
                    try {
                        if (receiverRecord.receiverOffset().offset() % commitBatchSize == 0) {
                            receiverRecord.receiverOffset().commit();
                            log.debug("Committed offset {},{}", receiverRecord.receiverOffset().topicPartition(),
                                    receiverRecord.receiverOffset().offset());
                        }
                    } catch (Exception exception) {
                        log.error("Exception while committing", exception);
                    }
                  
                    return receiverRecord;
                });
    }

This code is not working, can some please this

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jul 6, 2021
@garyrussell
Copy link
Contributor

"not working" is not enough information. Please elaborate. What behavior are you seeing?

@architmmmec
Copy link
Author

Offsets are not getting committed @garyrussell

@garyrussell garyrussell added for/stackoverflow Questions are best asked on SO or Gitter and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Jul 6, 2021
@garyrussell
Copy link
Contributor

You need to subscribe (e.g. .block()) on the Mono returned by commit - see its javadoc.

@aridechettali
Copy link

But block() is when you wanted a sync commit, As per the doc we can have async commit by just calling receiverOffset().commit();
https://projectreactor.io/docs/kafka/release/reference/#concurrent-ordered

@garyrussell
Copy link
Contributor

That's a bug in the documentation; you always have to subscribe to a Mono to make it run.

You can run the commit on a different thread; add .publishOn(...).

See https://projectreactor.io/docs/kafka/snapshot/reference/#kafka-source

@aridechettali
Copy link

Got it thanks

@garyrussell
Copy link
Contributor

So here's the final answer - you need block() if you set the commit interval to zero - because then, the commit event is never scheduled, subscribing to the Mono is required to schedule the commit task to run.

When there is a commit interval, the commit task is scheduled periodically. Calling commit() simply updates the offsets to be committed and they will be picked up by the scheduled task; there is no need to subscribe to the Mono.

@koldat
Copy link

koldat commented Aug 31, 2021

You can use Mono as it was designed (receiverRecord.receiverOffset().commit().then(Mono.just(receiverRecord))):

        return KafkaReceiver.create(receiverOptions).receive()
                .flatMap(receiverRecord -> {
                    try {
                        if (receiverRecord.receiverOffset().offset() % commitBatchSize == 0) {
                            log.debug("Committed offset {},{}", receiverRecord.receiverOffset().topicPartition(),
                                    receiverRecord.receiverOffset().offset());
                            return receiverRecord.receiverOffset().commit()
                                .then(Mono.just(receiverRecord));
                        }
                    } catch (Exception exception) {
                        log.error("Exception while committing", exception);
                    }
                  
                    return Mono.just(receiverRecord);
                });

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for/stackoverflow Questions are best asked on SO or Gitter
Projects
None yet
Development

No branches or pull requests

5 participants