Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader API for C++ client #717

Merged
merged 7 commits into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions pulsar-client-cpp/include/pulsar/BatchMessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,40 @@
#define LIB_BATCHMESSAGEID_H_

#include <pulsar/MessageId.h>
#include <iosfwd>

#pragma GCC visibility push(default)

namespace pulsar {

class PulsarWrapper;

class BatchMessageId : public MessageId {
public:
BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
: MessageId(ledgerId, entryId),
batchIndex_(batchIndex) {
}

BatchMessageId(const MessageId& msgId);

BatchMessageId()
: batchIndex_(-1) {
}

virtual void serialize(std::string& result) const;

// These functions compare the message order as stored in bookkeeper
inline bool operator<(const BatchMessageId& mID) const;
inline bool operator<=(const BatchMessageId& mID) const;
protected:
bool operator<(const BatchMessageId& other) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why we removed the inline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comparators now use std::tie to compare three fields, so their implementation was moved out of the header to avoid having any C++11 in public headers. In general, we've kept all method implementations in .cc files.

bool operator<=(const BatchMessageId& other) const;
bool operator==(const BatchMessageId& other) const;

protected:
virtual int64_t getBatchIndex() const;

friend class Commands;
friend class ConsumerImpl;
friend class ReaderImpl;
friend class Message;
friend class MessageImpl;
friend class PartitionedProducerImpl;
Expand All @@ -46,15 +62,9 @@ class BatchMessageId : public MessageId {
friend class PulsarWrapper;
friend class PulsarFriend;
int64_t batchIndex_;
};

bool BatchMessageId::operator<(const BatchMessageId& mID) const {
return (ledgerId_ < mID.ledgerId_) || (ledgerId_ == mID.ledgerId_ && entryId_ < mID.entryId_);
}

bool BatchMessageId::operator<=(const BatchMessageId& mID) const {
return (ledgerId_ < mID.ledgerId_) || (ledgerId_ == mID.ledgerId_ && entryId_ <= mID.entryId_);
}
friend std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId);
};

}
#pragma GCC visibility pop
Expand Down
32 changes: 32 additions & 0 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <pulsar/Consumer.h>
#include <pulsar/Producer.h>
#include <pulsar/Reader.h>
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
Expand All @@ -32,6 +33,7 @@
namespace pulsar {
typedef boost::function<void(Result, Producer)> CreateProducerCallback;
typedef boost::function<void(Result, Consumer)> SubscribeCallback;
typedef boost::function<void(Result, Reader)> ReaderCallback;
typedef boost::function<void(Result)> CloseCallback;

class ClientImpl;
Expand Down Expand Up @@ -95,6 +97,36 @@ class Client {
void subscribeAsync(const std::string& topic, const std::string& consumerName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
* <p>
* The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
* subscription. Reader can only work on non-partitioned topics.
* <p>
* The initial reader positioning is done by specifying a message id. The options are:
* <ul>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does C++ support Javadoc-style comment formatting like Java does?

* <li><code>MessageId::earliest()</code> : Start reading from the earliest message available in the topic
* <li><code>MessageId::latest()</code> : Start reading from the end of the topic, only getting messages published after the
* reader was created
* <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
* specific position. The first message to be read will be the message next to the specified messageId.
* </ul>
*
* @param topic
* The name of the topic where to read
* @param startMessageId
* The message id where the reader will position itself. The first message returned will be the one after
* the specified startMessageId
* @param conf
* The {@code ReaderConfiguration} object
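* @param reader
* A non-const reference where the created {@code Reader} object will be stored
* @return The {@code Result} of the create operation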
*/
Result createReader(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, Reader& reader);

void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderCallback callback);

/**
*
* @return
Expand Down
39 changes: 30 additions & 9 deletions pulsar-client-cpp/include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <iosfwd>
#include <stdint.h>
#include <boost/shared_ptr.hpp>

#pragma GCC visibility push(default)

Expand All @@ -34,13 +35,40 @@ class MessageId {
public:
MessageId& operator=(const MessageId&);
MessageId();
virtual ~MessageId() {}

/**
* MessageId representing the "earliest" or "oldest available" message stored in the topic
*/
static const MessageId& earliest();

/**
* MessageId representing the "latest" or "last published" message in the topic
*/
static const MessageId& latest();

/**
* Serialize the message id into a binary string for storing
*/
virtual void serialize(std::string& result) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would we want to use the serialize and deserialize API??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using a reader, you need to be able to store the message id somewhere (since you don't have a durable cursor, as you do with subscriptions/consumers).

Then you can deserialize that message id and use it to create a new Reader starting from a particular message.


/**
* Deserialize a message id from a binary string
*/
static boost::shared_ptr<MessageId> deserialize(const std::string& serializedMessageId);

// These functions compare the message order as stored in bookkeeper
inline bool operator<(const MessageId& mID) const;
inline bool operator==(const MessageId& mID) const;
bool operator<(const MessageId& other) const;
bool operator==(const MessageId& other) const;

protected:

virtual int64_t getBatchIndex() const;
friend class ConsumerImpl;
friend class Message;
friend class MessageImpl;
friend class Commands;
friend class BatchMessageId;
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
Expand All @@ -53,13 +81,6 @@ class MessageId {
short partition_ :16;
};

bool MessageId::operator<(const MessageId& mID) const {
return (ledgerId_ < mID.ledgerId_) || (ledgerId_ == mID.ledgerId_ && entryId_ < mID.entryId_);
}

bool MessageId::operator==(const MessageId& mID) const {
return (ledgerId_ == mID.ledgerId_ && entryId_ == mID.entryId_);
}

}

Expand Down
90 changes: 90 additions & 0 deletions pulsar-client-cpp/include/pulsar/Reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_READER_HPP_
#define PULSAR_READER_HPP_

#include <pulsar/Message.h>
#include <pulsar/ReaderConfiguration.h>

#pragma GCC visibility push(default)


namespace pulsar {
class PulsarWrapper;
class PulsarFriend;
class ReaderImpl;

/**
* A Reader can be used to scan through all the messages currently available in a topic.
*/
class Reader {
public:
/**
* Construct an uninitialized reader object
*/
Reader();

/**
* @return the topic this reader is reading from
*/
const std::string& getTopic() const;

/**
* Read a single message.
*
* If a message is not immediately available, this method will block until a new
* message is available.
*
* @param msg a non-const reference where the received message will be copied
* @return ResultOk when a message is received
* @return ResultInvalidConfiguration if a message listener had been set in the configuration
*/
Result readNext(Message& msg);

/**
* Read a single message
*
* @param msg a non-const reference where the received message will be copied
* @param timeoutMs the receive timeout in milliseconds
* @return ResultOk if a message was received
* @return ResultTimeout if the receive timeout was triggered
* @return ResultInvalidConfiguration if a message listener had been set in the configuration
*/
Result readNext(Message& msg, int timeoutMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we also add readNextAsync()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't merged Consumer.receiveAsync() yet :) Once that's in, adding Reader.readNextAsync() will be trivial since it's just a wrapper.


Result close();

void closeAsync(ResultCallback callback);

private:
typedef boost::shared_ptr<ReaderImpl> ReaderImplPtr;
ReaderImplPtr impl_;
explicit Reader(ReaderImplPtr);

friend class PulsarFriend;
friend class PulsarWrapper;
friend class ReaderImpl;
friend class ReaderTest;
};

}

#pragma GCC visibility pop

#endif /* PULSAR_READER_HPP_ */
89 changes: 89 additions & 0 deletions pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_READER_CONFIGURATION_H_
#define PULSAR_READER_CONFIGURATION_H_

#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <pulsar/Result.h>
#include <pulsar/Message.h>

#pragma GCC visibility push(default)
namespace pulsar {

class Reader;
class PulsarWrapper;

/// Callback definition for non-data operation
typedef boost::function<void(Result result)> ResultCallback;

/// Callback definition for ReaderListener
typedef boost::function<void(Reader reader, const Message& msg)> ReaderListener;

class ReaderConfigurationImpl;

/**
* Class specifying the configuration of a reader.
*
* The settings are held through a shared pointer to ReaderConfigurationImpl (impl_).
* NOTE(review): whether copies share or duplicate that implementation object is decided
* by the copy constructor / assignment operator in the .cc file — not visible from this header.
*/
class ReaderConfiguration {
public:
ReaderConfiguration();
~ReaderConfiguration();
ReaderConfiguration(const ReaderConfiguration&);
ReaderConfiguration& operator=(const ReaderConfiguration&);

/**
* A message listener enables your application to configure how to process
* messages. A listener will be called in order for every message received.
*
* @param listener
* callback taking the Reader and the received Message
* (see the ReaderListener typedef)
* @return a ReaderConfiguration reference (presumably *this, to allow chaining — confirm in the .cc)
*/
ReaderConfiguration& setReaderListener(ReaderListener listener);
ReaderListener getReaderListener() const;
bool hasReaderListener() const;

/**
* Sets the size of the reader receive queue.
*
* NOTE(review): the description below is worded in terms of consumers, receive() and shared
* subscriptions, while Reader exposes readNext() and is documented as working without a
* subscription on non-partitioned topics only — confirm which statements apply to a Reader.
*
* The consumer receive queue controls how many messages can be accumulated by the Consumer before the
* application calls receive(). Using a higher value could potentially increase the consumer throughput
* at the expense of bigger memory utilization.
*
* Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of
* messages. This approach improves the message distribution on shared subscription, by pushing messages only to
* the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be
* used if the consumer queue size is zero. The receive() function call should not be interrupted when
* the consumer queue size is zero.
*
* Default value is 1000 messages and should be good for most use cases.
*
* @param size
* the new receiver queue size value
*/
void setReceiverQueueSize(int size);
int getReceiverQueueSize() const;

/**
* Set / get the name given to the reader.
* NOTE(review): how the name is used beyond being stored in the configuration is not visible here.
*/
void setReaderName(const std::string& readerName);
const std::string& getReaderName() const;

private:
// Implementation object (declared elsewhere) reached through a shared pointer.
boost::shared_ptr<ReaderConfigurationImpl> impl_;
};

}
#pragma GCC visibility pop
#endif /* PULSAR_READER_CONFIGURATION_H_ */
9 changes: 7 additions & 2 deletions pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
LOG_DEBUG(name_ << "Constructed BatchAcknowledgementTracker");
}

// Reset the tracker by emptying both containers it maintains
// (the pending send list and the per-batch tracker map).
//
// NOTE(review): unlike isBatchReady(), this does not acquire mutex_ — confirm that
// callers cannot race with other tracker operations.
void BatchAcknowledgementTracker::clear() {
    sendList_.clear();
    trackerMap_.clear();
}

void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
// ignore message if it is not a batch message
if (!message.impl_->metadata.has_num_messages_in_batch()) {
Expand Down Expand Up @@ -99,8 +104,8 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa
bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID, const proto::CommandAck_AckType ackType) {
Lock lock(mutex_);
TrackerMap::iterator pos = trackerMap_.find(msgID);
if (std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()
|| pos == trackerMap_.end()) {
if (pos == trackerMap_.end() ||
std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
LOG_DEBUG(
"Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = " << msgID << "]");
return true;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class BatchAcknowledgementTracker {
void deleteAckedMessage(const BatchMessageId& messageId, proto::CommandAck_AckType ackType);
void receivedMessage(const Message& message);

void clear();

inline friend std::ostream& operator<<(std::ostream& os, const BatchAcknowledgementTracker& batchAcknowledgementTracker);

Expand Down
Loading