Skip to content

Commit

Permalink
Add numMessagesInQueue and getAvailablePermits for PartitionedConsume…
Browse files Browse the repository at this point in the history
…rImpl
  • Loading branch information
sschepens committed Oct 18, 2016
1 parent b2805b8 commit 8ec9f88
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ protected SubType getSubType() {

abstract public boolean isConnected();

abstract public int getAvailablePermits();

abstract public int numMessagesInQueue();

public CompletableFuture<Consumer> subscribeFuture() {
return subscribeFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,10 +858,12 @@ int getPartitionIndex() {
return partitionIndex;
}

@Override
public int getAvailablePermits() {
return availablePermits.get();
}

@Override
public int numMessagesInQueue() {
return incomingMessages.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,16 @@ List<ConsumerImpl> getConsumers() {
return consumers;
}

@Override
public int getAvailablePermits() {
return consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
}

@Override
public int numMessagesInQueue() {
return consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
}

@Override
public synchronized ConsumerStats getStats() {
if (stats == null) {
Expand Down

0 comments on commit 8ec9f88

Please sign in to comment.