Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 1240][C++] Support setting ConsumerEventListener in pulsar cpp client #12118

Merged
merged 4 commits into from
Sep 24, 2021

Conversation

metahys
Copy link
Contributor

@metahys metahys commented Sep 21, 2021

Fixes #1240

Motivation

This PR supports setting ConsumerEventListener in pulsar cpp client.

Modifications

  • add an abstract class ConsumerEventListener for listening consumer events.
  • add setConsumerEventListener and getConsumerEventListener methods in ConsumerConfiguration
  • add handleActiveConsumerChange method in ClientConnection to handle the ACTIVE_CONSUMER_CHANGE event.

Verifying this change

  • Added unit test in ConsumerConfigurationTest
  • Added unit test in ConsumerTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • no-need-doc

@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Sep 22, 2021
@metahys
Copy link
Contributor Author

metahys commented Sep 22, 2021

/pulsarbot run-failure-checks

@@ -28,11 +28,19 @@ DECLARE_LOG_OBJECT()

using namespace pulsar;

class DummyEventListener : public ConsumerEventListener {
public:
virtual void becomeActive(Consumer consumer, int partitionId) override {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a test that verifies that the active/inactive tests are being invoked. We can follow the same line of test that we're doing for java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added unit tests in ConsumerTest. And I renamed "become" to "became" to make it consistent with java client. The call to registerConsumer is moved from handleCreateConsumer to connectionOpened, so that we can handle commands such as ACTIVE_CONSUMER_CHANGE properly.

@merlimat Please take a look.

@merlimat merlimat added this to the 2.9.0 milestone Sep 24, 2021
@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 24, 2021
@merlimat merlimat merged commit 3b76784 into apache:master Sep 24, 2021
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)

Fix the conflicts by:
- Remove ReaderImpl::getLastMessageIdAsync introduced from #11723
- Remove getPriorityLevel() method introduced from #12076
- Revert changes of registerConsumer from #12118
- Remove new fields introduced from #13627
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
… client (apache#12118)

* [C++] Support setting ConsumerEventListener in pulsar cpp client

* Rename "become" to "became"

* Register consumer before sending subscribe request

* Add unit tests in ConsumerTest
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs release/2.9.0 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement ActiveConsumerListener for pulsar cpp client
5 participants