-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reader API for C++ client #717
Conversation
required uint64 ledgerId = 1; | ||
required uint64 entryId = 2; | ||
required int64 ledgerId = 1; | ||
required int64 entryId = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is safe as it doesn't change the encoding on the wire, just the type used in C++. In Java it would anyway be a long
with no difference.
This is required to pass -1
as special values.
181d5fb
to
4117703
Compare
@@ -36,7 +36,7 @@ | |||
PositionImpl startCursorPosition) { | |||
super(bookkeeper, config, ledger, cursorName); | |||
|
|||
if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) { | |||
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any specific reason to not compare the position(ledgerId and entryId)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem comes from C++ client lib. The entry id in MessageId
is 48 bits only (which by itself it's more than enough), and for PositionImpl.latest
we're using Long.max()
for both ledgerId
and entryId
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll be adding comments for these changes
Position startPosition = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()); | ||
long ledgerId = msgId.getLedgerId(); | ||
long entryId = msgId.getEntryId(); | ||
if (msgId instanceof BatchMessageIdImpl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think broker is batch-msg agnostic as it treats it as a normal msg (client manages batch-msg business) and it would be better to avoid introducing BatchMessageIdImpl
at broker-side. Can't we always assign entryId= entryId-1
while creating NonDurableSubscription
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we always assign entryId= entryId-1 while creating NonDurableSubscription?
The problem is that you can have these 2 scenarios:
- Without batching :
startMessageId = (ledgerId, entryId)
. eg.(3, 4)
- With batching :
startMessageId = (ledgerId, entryId, batchIndex)
. eg.(3,4,5)
In the first case, it will mean that (3,4)
was already consumed. So next message to be sent will be (3,5)
.
In the case of batching, (3,4,5)
is the start message. With the old logic, the broker would start delivering at (3,5,0)
while there might still be messages to read at (3,4,6)
..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without batching : startMessageId = (ledgerId, entryId). eg. (3, 4)
yes, I understand that batch-msg might not be consumed fully at client so, we need to go 1 msg back.
If we do the same for non-batch then (3,4)
will be delivered again which will be duplicate. Is there an issue if broker delivers duplicate msg? because anyway for durable cursor, broker can deliver duplicate msg by reseting cursor-position at marDeletePosition
. So, duplicate msg is the only conern to set entryId=entryId-1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, the purpose of reader is to position exactly on the correct message :) That enables to avoid dups in the consuming side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In any case, that is an issue even with the current Java reader. I have another PR to fix that as well.
* @return ResultTimeout if the receive timeout was triggered | ||
* @return ResultInvalidConfiguration if a message listener had been set in the configuration | ||
*/ | ||
Result readNext(Message& msg, int timeoutMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we also add readNextAsync()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have yet merged the Consumer.receiveAsync()
:) Once that's in, adding Reader.readNextAsync()
will be trivial since it's just a wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far the changes LGTM (minor comments) - will give it another round of review tomorrow before approving.
@@ -36,7 +36,7 @@ | |||
PositionImpl startCursorPosition) { | |||
super(bookkeeper, config, ledger, cursorName); | |||
|
|||
if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is not too much work - can we move all java file changes as a part of a separate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'm going to break it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pending
inline bool operator<(const BatchMessageId& mID) const; | ||
inline bool operator<=(const BatchMessageId& mID) const; | ||
protected: | ||
bool operator<(const BatchMessageId& other) const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why we removed the inline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using std::tie to have 3 vars comparators so to not have any c++11 in headers. In general, we've kept all methods impl in .cc files.
@@ -301,14 +315,33 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: | |||
} | |||
|
|||
// Zero Queue size is not supported with Batch Messages | |||
unsigned int ConsumerImpl::receiveIndividualMessagesFromBatch(Message& batchedMessage) { | |||
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At some point, I remember @msb-at-yahoo advising to refrain using uint32_t or any other data types from <stdint.h> unless we are serializing and passing the value over the wire.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either way, I generally prefer explicit sizing to make it clear and also it's shorter than unsigned int
:D
} | ||
|
||
// Regular path, append individual message to incoming messages queue | ||
incomingMessages_.push(msg); | ||
} | ||
return batchSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be returning the batchSize since not all messages made it to incomingMessages_
Just return the number of messages added to the incomingMessages_ queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll fix it.
const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId()); | ||
|
||
// Only acknowledge on the first message in the batch | ||
if (msgId.batchIndex_ <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we acknowledge on the last message of the batch since you clear the Receiver Queue on reconnect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't really makes a lot of difference (and it's easy to ack on the first rather than the last), because for the reader the acknowledge it's just an internal trick that enables us to track the "backlog" in the stats. Since the cursor is non-durable and the reader always specify where to start reading, there is no other meaning for the acks.
consumerConf.setConsumerName(readerConf_.getReaderName()); | ||
} | ||
|
||
if (readerConf_.hasReaderListener()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit confused since the ReaderAPI has no method for acknowledging the message shouldn't we be calling acknowledgeIfNecessaryourselves before sending the message to ReaderListener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, I missed that
// We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message id, | ||
// irrespective of whether they were part of a batch or not. | ||
const MessageId& nextMessageId = static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId()); | ||
BatchMessageId previousMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here you cast the BatchMessageId to MessageId again (i.e ignore the batchIndex) hence stand a risk of redelivering the message once the batch is redelivered
On the other hand if you maintain the batchIndex then you can't clear the unAckedMessageTracter and bathIndexTracker on reconnection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's actually a bug. Need to fix this
const MessageId& nextMessageId = static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId()); | ||
BatchMessageId previousMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1); | ||
return Optional<BatchMessageId>::of(previousMessageId); | ||
} else if (lastDequedMessage_.is_present()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return (lastDequedMessage_.is_present() ? lastDequedMessage_ : startMessageId_)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer longer form with comments :)
pulsar-client-cpp/lib/MessageId.cc
Outdated
@@ -43,12 +52,59 @@ MessageId::MessageId(int64_t ledgerId, int64_t entryId) | |||
// consumer's partition is assigned to this partition | |||
} | |||
|
|||
const MessageId& MessageId::earliest() { | |||
static const BatchMessageId _earliest(-1, -1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious what does _var
convention mean I know that var_
is for private members.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't implying any convention. these are just locally defined static variables.
/** | ||
* Serialize the message id into a binary string for storing | ||
*/ | ||
virtual void serialize(std::string& result) const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would we want to use the serialize and deserialize API??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using reader, you need to be able to store the message id somewhere (since you don't have a durable cursor, like in case of subscriptions/consumers).
Then you can deserialize that message id and use it to create a new Reader starting from a particular message.
@@ -187,6 +191,16 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& | |||
subscribe->set_consumer_id(consumerId); | |||
subscribe->set_request_id(requestId); | |||
subscribe->set_consumer_name(consumerName); | |||
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable); | |||
if (startMessageId.is_present()) { | |||
MessageIdData& messageIdData = *subscribe->mutable_start_message_id(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure, but subscribe
is already pointer so, do we need pointer *subscribe->
again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscribe->mutable_start_message_id()
returns a MessageIdData*
pointer. So the *
applies to that, and not to subscribe
LOG_DEBUG( | ||
getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); | ||
increaseAvailablePermits(cnx); | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batchSize--
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Jai also pointed out, will fix that
&& msgId.batchIndex_ <= startMessageId_.value().batchIndex_) { | ||
LOG_DEBUG( | ||
getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); | ||
increaseAvailablePermits(cnx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call increaseAvailablePermits(cnx, numberOfPermits);
outside the loop as increaseAvailablePermits
sends command to broker if availablePermits > ReceiverQueueSize/2)
so, we can save multiple send command.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you planning to address this comment???
@jai1 @rdhabalia I've pulled out all Java changes + the fix for Java client reader to position within a batch into #720. I'll push the stripped version here with just the C++ changes. |
4117703
to
467e53c
Compare
@jai1 @rdhabalia Removed Java changes, rebased on #720 and addressed comments. |
} | ||
|
||
bool BatchMessageId::operator<(const BatchMessageId& other) const { | ||
return std::tie(ledgerId_, entryId_, batchIndex_) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am getting this error - when I try to compile it on our machines.
BatchMessageId.cc:49: error: cannot bind bitfield ‘((const pulsar::BatchMessageId*)other)->pulsar::BatchMessageId::<anonymous>.pulsar::MessageId::entryId_’ to ‘int64_t&’
Same with MessageId.cc
I think the only way out is to compare field by field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uhm.. ok :/ I hoped tuple would have been supported in gcc-4.4 as well
std::string serialized; | ||
msgId.serialize(serialized); | ||
|
||
std::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be
boost::shared_ptr deserialized = MessageId::deserialize(serialized);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I was initially using std::shared_ptr
and later converted into boost::shared_ptr
return s; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gtest-internal.h:97: undefined reference to pulsar::operator<<(std::basic_ostream<char, std::char_traits<char> >&, pulsar::BatchMessageId const&)
This operator needs to be visible outside if you want to use it in tests - like MessageId
#pragma GCC visibility push(default)
std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << messageId.partition_ << ')';
return s;
}
#pragma GCC visibility pop
c3e3a50
to
d014e25
Compare
@jai1 @rdhabalia Updated with fixes for comments plus some error in MessageId to BatchMessageId casting that was getting a test to be flaky. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two small changes remain - rest is fine
pulsar-client-cpp/lib/MessageId.cc
Outdated
|
||
bool MessageId::operator<(const MessageId& other) const { | ||
return std::tie(ledgerId_, entryId_, partition_) | ||
< std::tie(other.ledgerId_, other.entryId_, other.partition_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageId.cc:110: error: cannot bind bitfield ‘other->pulsar::MessageId::entryId_’ to ‘int64_t&’
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the std::tie()
to support older g++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to remove std::tie()
@@ -36,7 +36,7 @@ | |||
PositionImpl startCursorPosition) { | |||
super(bookkeeper, config, ledger, cursorName); | |||
|
|||
if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pending
d014e25
to
ebf4fb1
Compare
@rdhabalia @jai1 Rebased on last master with required commit merged. All comments should have been addressed already. |
ebf4fb1
to
23bdf8b
Compare
retest this please |
1 similar comment
retest this please |
pulsar-client-cpp/lib/MessageId.cc
Outdated
|
||
bool MessageId::operator<(const MessageId& other) const { | ||
return std::tie(ledgerId_, entryId_, partition_) | ||
< std::tie(other.ledgerId_, other.entryId_, other.partition_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to remove std::tie()
I thought I already did. let me check again |
* subscription. Reader can only work on non-partitioned topics. | ||
* <p> | ||
* The initial reader positioning is done by specifying a message id. The options are: | ||
* <ul> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does CPP support format of comment like java?
} | ||
|
||
std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) { | ||
s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << messageId.batchIndex_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it update value of input param std::ostream& s
. So, should we make return type of this function to void
so, user would know that it updates input value only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retest this please |
You did it from batchMessageId - forgot to do it for MessageId |
1ade68e
to
30b5c65
Compare
30b5c65
to
0ecbbd8
Compare
@jai1 Updated and also fixed a problem in comparison |
@merlimat - Reviewed the PR - LGTM - once you address Rajan's comment about increaseAvailablePermits we can merge it. |
@jai1 @rdhabalia Fixed the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merge it after the tests pass.
@saandrews @msb-at-yahoo @rdhabalia - Do you want to review this PR, in case I missed something.? |
retest this please |
Merging this to rebase dependent changes on top of it. Will address comments if they arise |
…artup (#9499) ### Motivation Standalone pulsar broker and function service can have deadlock which also sometime causes failures in function unit-test case. Pulsar function tries to create a subscription which blocks the zk thread and can be cause of possible deadlock. Below is the thread-dump when the deadlock happens and function service start up fails. So, remove blocking call while creating subscription using admin-api. ``` "pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000 nid=0x3610f waiting on condition [0x00007000156e9000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000792a07020> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139) - locked <0x000000078e53f2d0> (a java.util.HashSet) at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295) at org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) "pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5 os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition [0x000070000f7ce000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000792ecb8b0> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown Source) at org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510) main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on condition [0x0000700004160000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007930efe68> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178) at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297) at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632) at org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown Source) at org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654) at org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648) at org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown Source) at org.glassfish.jersey.internal.Errors.process(Errors.java:292) at org.glassfish.jersey.internal.Errors.process(Errors.java:274) at org.glassfish.jersey.internal.Errors.process(Errors.java:205) at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390) at org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648) at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631) at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434) at org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318) at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143) at org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454) at org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343) at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671) at org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ```
Motivation
Implement the Reader API also for C++ client library