-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java Client - Support for getting consumer stats from broker #254
Conversation
@@ -85,10 +85,10 @@ class BrokerConsumerStats { | |||
std::string consumerName_; | |||
|
|||
/** Number of available message permits for the consumer */ | |||
int availablePermits_; | |||
unsigned long long availablePermits_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use uint64_t
/// Number of messages in the subscription backlog | ||
long msgBacklog_; | ||
/** Number of messages in the subscription backlog */ | ||
unsigned long long msgBacklog_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uint64_t
@jai1 can you create individual PRs for the unrelated changes? |
48ee9f3
to
33e93cc
Compare
* topic pass only one PartitionIndex in a call. | ||
* @return a Completeable future for BrokerConsumerStats | ||
*/ | ||
CompletableFuture<BrokerConsumerStats> getBrokerConsumerStatsAsync(int... partitionIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole client API is based on abstracting the partitioning layer. This would be mixing the partition across different layers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
} | ||
|
||
Thread.sleep(20 * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you reduce the default cache time, so test does not have to wait 20 secs.
|
||
public class BrokerConsumerStatsTest extends BrokerTestBase { | ||
private static final Logger log = LoggerFactory.getLogger(BrokerConsumerStatsTest.class); | ||
private final int totalMessages = 50; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move it to the test-method variable
public class BrokerConsumerStatsTest extends BrokerTestBase { | ||
private static final Logger log = LoggerFactory.getLogger(BrokerConsumerStatsTest.class); | ||
private final int totalMessages = 50; | ||
private String topicName = "persistent://prop/cluster/ns/topic-"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, test-method variable.
|
||
log.debug(consumer1.getBrokerConsumerStatsAsync().get().toString()); | ||
Assert.assertEquals(consumer1.getBrokerConsumerStatsAsync().get().getMsgBacklog(), totalMessages - count); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here, Consumer
's default availablePermit is 1000. So, broker can dispatch all 50 messages at this point to consumer so, isn't backlog become 0 in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it's shared subscription we send 20 messages per consumer in round robin fashion
* used to check if the stats are still valid. | ||
* | ||
* @return a Completeable future for BrokerConsumerStats | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead can we just say that: stats get refreshed at every X seconds
?
} | ||
} | ||
|
||
public static class UnknownError extends PulsarClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for unknownError
we just throw PulsarClientException
. So, we may not need unknownError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per convention:-
Every error in ServerError is mapped to a unique exception in Java and a unique Result in C++ via clientCnx.getPulsarClientException() (Java) and clientConnection.getResult(serverError)
Since UnknownError is a ServerError type I mapped it to a separate ClientException
if (future != null) { | ||
if (response.hasErrorCode()) { | ||
future.completeExceptionally(getPulsarClientException(response.getErrorCode(), | ||
response.hasErrorMessage() ? response.getErrorMessage() : "")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we pass null as errorMsg if not present?
|
||
|
||
CompletableFuture<BrokerConsumerStats> newConsumerStats(String topicName, String subscriptionName, long consumerId, long requestId) { | ||
CompletableFuture<BrokerConsumerStats> future = new CompletableFuture<BrokerConsumerStats>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new CompletableFuture<>()
@@ -92,6 +92,7 @@ | |||
|
|||
private final ConsumerStats stats; | |||
private final int priorityLevel; | |||
private volatile BrokerConsumerStats brokerConsumerStats = new BrokerConsumerStats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we need volatile
for stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, don't want us to change the brokerStats while user is reading it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I see we update brokerConsumerStats
object.
@Override | ||
public CompletableFuture<BrokerConsumerStats> getBrokerConsumerStatsAsync() { | ||
if (getState() != State.Ready || !isConnected()) { | ||
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PulsarClientException.NotConnectedException(ERROR_MSG)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PulsarClientException.NotConnectedException(ERROR_MSG) no such constructor exists
I can add this constructor but I don't think the ERROR_MSG will be much different from the default message.
@Override | ||
public CompletableFuture<BrokerConsumerStats> getBrokerConsumerStatsAsync() { | ||
BrokerConsumerStats brokerConsumerStats = new BrokerConsumerStats(); | ||
List<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lists.newArrayList()
??
f09d38a
to
7949061
Compare
7949061
to
d7a3131
Compare
@rdhabalia @merlimat - kindly review this. |
@merlimat - Please review this when you get time |
…rStatsJavaClient
if (brokerConsumerStats.isValid()) { | ||
CompletableFuture<BrokerConsumerStats> future = new CompletableFuture<BrokerConsumerStats>(); | ||
future.complete(brokerConsumerStats); | ||
return future; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return CompletableFuture.completedFuture(brokerConsumerStats)
?
BrokerConsumerStats brokerConsumerStats = new BrokerConsumerStats(); | ||
List<CompletableFuture<Void>> futures = Lists.newArrayList(); | ||
for (Consumer c : consumers) { | ||
futures.add(c.getBrokerConsumerStatsAsync().thenAcceptAsync(stats -> {brokerConsumerStats.add(stats);})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if we want to delegate to another thread using thenApplyAsync
has any advantage here over continue with the same thread using thenApply
.
@@ -92,6 +92,7 @@ | |||
|
|||
private final ConsumerStats stats; | |||
private final int priorityLevel; | |||
private volatile BrokerConsumerStats brokerConsumerStats = new BrokerConsumerStats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I see we update brokerConsumerStats
object.
return msgBacklog; | ||
} | ||
|
||
public synchronized void add(BrokerConsumerStats stats) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a thought here for Partitioned consumers: (here we have add(..)
for to aggregate partitioned-consumers result. But only problem is we can't get values for individual consumer as all attributes have aggregate value.
interface BaseBrokerConsumerStats {
//getters
long getAvailablePermits();
boolean isBlockedConsumerOnUnackedMsgs();
}
PartitionedBrokerConsumerStats implements BaseBrokerConsumerStats {
//getters: this will return aggregate result
long getAvailablePermits();
boolean isBlockedConsumerOnUnackedMsgs();
//getters:
long getAvailablePermits(int partition);
boolean isBlockedConsumerOnUnackedMsgs(int partition);
}
BrokerConsumerStats implements BaseBrokerConsumerStats{
//getters
}
and client can cast to PartitionedBrokerConsumerStats
if they are really interested per partitions else consumerStats will be abstract from client.
@rdhabalia - have implemented your suggestion |
@saandrews @rdhabalia - need a plus one on this - please |
@merlimat can you also please review it when you get a chance. |
2951e10
to
f5769e6
Compare
@@ -0,0 +1,84 @@ | |||
/** | |||
* Copyright 2016 Yahoo Inc. | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add relevant documentation as well?
@@ -352,6 +377,7 @@ protected boolean isHandshakeCompleted() { | |||
writeFuture.cause().getMessage()); | |||
getAndRemovePendingLookupRequest(requestId); | |||
future.completeExceptionally(writeFuture.cause()); | |||
getAndRemovePendingLookupRequest(requestId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we remove requestId from pendingLookupRequest twice?
writeFuture.cause().getMessage()); | ||
future.completeExceptionally(writeFuture.cause()); | ||
} else { | ||
pendingConsumerStatsRequests.put(requestId, future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a way to limit pending requests.
* Upgrade dlog dependency * Make pulsar functions work after rebase - bump zk version 3.5 - use the same dlog version as bookkeeper - change standalone to use sorted ledger storage manager (this is a temp fix to address the issues after rebase)
* Fix send async comments Signed-off-by: xiaolong.ran <rxl@apache.org> * fix a little Signed-off-by: xiaolong.ran <rxl@apache.org> * fix a little Signed-off-by: xiaolong.ran <rxl@apache.org> * fix comments Signed-off-by: xiaolong.ran <rxl@apache.org>
Master issue: apache#241 Currently if a Kafka Client tries to list all topics, the topic name in response will contain the namespace prefix like "persistent://public/default". However, for topics in the default namespace, they shouldn't contain the prefix because Kafka Client would like to use the short topic name. So this PR will remove the prefix of topics in the default namespace. * Add a method to remove the default namespace prefix * Use short topic name if the topic is in the default namespace * Change variable name to avoid conflict * Fix simpleProduceAndConsumeWithPulsarAuthed test error
Motivation
Modifications
A new flow added to Java Client
Result
We can get broker side consumer stats using java client.