Skip to content

Commit

Permalink
[Issue 1240][C++] Support setting ConsumerEventListener in pulsar cpp…
Browse files Browse the repository at this point in the history
… client (#12118)

* [C++] Support setting ConsumerEventListener in pulsar cpp client

* Rename "become" to "became"

* Register consumer before sending subscribe request

* Add unit tests in ConsumerTest
  • Loading branch information
metahys authored Sep 24, 2021
1 parent 7d4d8cc commit 3b76784
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 4 deletions.
19 changes: 19 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <pulsar/CryptoKeyReader.h>
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>
#include <pulsar/ConsumerEventListener.h>

namespace pulsar {

Expand All @@ -43,6 +44,8 @@ typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
/// Callback definition for MessageListener
typedef std::function<void(Consumer consumer, const Message& msg)> MessageListener;

typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;

struct ConsumerConfigurationImpl;

/**
Expand Down Expand Up @@ -127,6 +130,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
bool hasMessageListener() const;

/**
* A event listener enables your application to react the consumer state
* change event (active or inactive).
*/
ConsumerConfiguration& setConsumerEventListener(ConsumerEventListenerPtr eventListener);

/**
* @return the consumer event listener
*/
ConsumerEventListenerPtr getConsumerEventListener() const;

/**
* @return true if the consumer event listener has been set
*/
bool hasConsumerEventListener() const;

/**
* Sets the size of the consumer receive queue.
*
Expand Down
49 changes: 49 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_CONSUMEREVENTLISTENER_H_
#define PULSAR_CONSUMEREVENTLISTENER_H_

#include <pulsar/defines.h>

namespace pulsar {

class Consumer;

class PULSAR_PUBLIC ConsumerEventListener {
public:
virtual ~ConsumerEventListener(){};
/**
* @brief Notified when the consumer group is changed, and the consumer becomes active.
*
* @param consumer the consumer that originated the event
* @param partitionId the id of the partition that beconmes active.
*/
virtual void becameActive(Consumer consumer, int partitionId) = 0;

/**
* @brief Notified when the consumer group is changed, and the consumer is still inactive or becomes
* inactive.
*
* @param consumer the consumer that originated the event
* @param partitionId the id of the partition that is still inactive or becomes inactive.
*/
virtual void becameInactive(Consumer consumer, int partitionId) = 0;
};
} // namespace pulsar
#endif /* PULSAR_CONSUMEREVENTLISTENER_H_ */
29 changes: 26 additions & 3 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,26 @@ bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& r
return isChecksumValid;
}

void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change) {
Lock lock(mutex_);
ConsumersMap::iterator it = consumers_.find(change.consumer_id());
if (it != consumers_.end()) {
ConsumerImplPtr consumer = it->second.lock();

if (consumer) {
lock.unlock();
consumer->activeConsumerChanged(change.is_active());
} else {
consumers_.erase(change.consumer_id());
LOG_DEBUG(cnxString_ << "Ignoring incoming message for already destroyed consumer "
<< change.consumer_id());
}
} else {
LOG_DEBUG(cnxString_ << "Got invalid consumer Id in " << change.consumer_id()
<< " -- isActive: " << change.is_active());
}
}

void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());
Expand Down Expand Up @@ -1128,12 +1148,15 @@ void ClientConnection::handleIncomingCommand() {
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
std::placeholders::_1, buffer));
break;
}

case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
LOG_DEBUG(cnxString_ << "Received notification about active consumer changes");
// ignore this message for now.
// TODO: @link{https://github.com/apache/pulsar/issues/1240}
const CommandActiveConsumerChange& change = incomingCmd_.active_consumer_change();
LOG_DEBUG(cnxString_
<< "Received notification about active consumer change, consumer_id: "
<< change.consumer_id() << " isActive: " << change.is_active());
handleActiveConsumerChange(change);
break;
}

Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
proto::BaseCommand& incomingCmd_);

void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change);
void handleIncomingCommand();
void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ MessageListener ConsumerConfiguration::getMessageListener() const { return impl_

bool ConsumerConfiguration::hasMessageListener() const { return impl_->hasMessageListener; }

ConsumerConfiguration& ConsumerConfiguration::setConsumerEventListener(
ConsumerEventListenerPtr eventListener) {
impl_->eventListener = eventListener;
impl_->hasConsumerEventListener = true;
return *this;
}

ConsumerEventListenerPtr ConsumerConfiguration::getConsumerEventListener() const {
return impl_->eventListener;
}

bool ConsumerConfiguration::hasConsumerEventListener() const { return impl_->hasConsumerEventListener; }

void ConsumerConfiguration::setReceiverQueueSize(int size) { impl_->receiverQueueSize = size; }

int ConsumerConfiguration::getReceiverQueueSize() const { return impl_->receiverQueueSize; }
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct ConsumerConfigurationImpl {
ConsumerType consumerType{ConsumerExclusive};
MessageListener messageListener;
bool hasMessageListener{false};
ConsumerEventListenerPtr eventListener;
bool hasConsumerEventListener{false};
int receiverQueueSize{1000};
int maxTotalReceiverQueueSizeAcrossPartitions{50000};
std::string consumerName;
Expand Down
25 changes: 24 additions & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
messageListener_(config_.getMessageListener()),
eventListener_(config_.getConsumerEventListener()),
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
Expand Down Expand Up @@ -164,6 +165,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
return;
}

// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
// sending the subscribe request.
cnx->registerConsumer(consumerId_, shared_from_this());

Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
Expand Down Expand Up @@ -217,7 +222,6 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
Lock lock(mutex_);
connection_ = cnx;
incomingMessages_.clear();
cnx->registerConsumer(consumerId_, shared_from_this());
state_ = Ready;
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for mutex
Expand Down Expand Up @@ -389,6 +393,25 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}
}

void ConsumerImpl::activeConsumerChanged(bool isActive) {
if (eventListener_) {
listenerExecutor_->postWork(
std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive));
}
}

void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
try {
if (isActive) {
eventListener_->becameActive(Consumer(shared_from_this()), partitionIndex_);
} else {
eventListener_->becameInactive(Consumer(shared_from_this()), partitionIndex_);
}
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
}
}

void ConsumerImpl::failPendingReceiveCallback() {
Message msg;
Lock lock(pendingReceiveMutex_);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ConsumerImpl : public ConsumerImplBase,
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
void messageProcessed(Message& msg, bool track = true);
void activeConsumerChanged(bool isActive);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
void handleUnsubscribe(Result result, ResultCallback callback);
Expand Down Expand Up @@ -148,6 +149,9 @@ class ConsumerImpl : public ConsumerImplBase,
void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);

void internalListener();

void internalConsumerChangeListener(bool isActive);

void handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer);
ConsumerStatsBasePtr consumerStatsBasePtr_;

Expand Down Expand Up @@ -182,6 +186,7 @@ class ConsumerImpl : public ConsumerImplBase,
const std::string subscription_;
std::string originalSubscriptionName_;
MessageListener messageListener_;
ConsumerEventListenerPtr eventListener_;
ExecutorServicePtr listenerExecutor_;
bool hasParent_;
ConsumerTopicType consumerTopicType_;
Expand Down
11 changes: 11 additions & 0 deletions pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ DECLARE_LOG_OBJECT()

using namespace pulsar;

class DummyEventListener : public ConsumerEventListener {
public:
virtual void becameActive(Consumer consumer, int partitionId) override {}

virtual void becameInactive(Consumer consumer, int partitionId) override {}
};

TEST(ConsumerConfigurationTest, testDefaultConfig) {
ConsumerConfiguration conf;
ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
ASSERT_EQ(conf.hasMessageListener(), false);
ASSERT_EQ(conf.hasConsumerEventListener(), false);
ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
ASSERT_EQ(conf.getConsumerName(), "");
Expand Down Expand Up @@ -73,6 +81,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setMessageListener([](Consumer consumer, const Message& msg) {});
ASSERT_EQ(conf.hasMessageListener(), true);

conf.setConsumerEventListener(std::make_shared<DummyEventListener>());
ASSERT_EQ(conf.hasConsumerEventListener(), true);

conf.setReceiverQueueSize(2000);
ASSERT_EQ(conf.getReceiverQueueSize(), 2000);

Expand Down
Loading

0 comments on commit 3b76784

Please sign in to comment.