Skip to content

Commit

Permalink
Replace usages of Atomic* for Atomic*FieldUpdater (#195)
Browse files Browse the repository at this point in the history
Replaced some usages of AtomicInteger, AtomicLong and AtomicReference
for their Atomic*FieldUpdater counterpart on classes which get
instantiated a lot, this should reduce memory usage for the broker
and clients which use lots of topics and consumers.
  • Loading branch information
sschepens authored and merlimat committed Feb 10, 2017
1 parent 68338c7 commit fc9b34c
Show file tree
Hide file tree
Showing 22 changed files with 466 additions and 347 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public void safeRun() {
OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
checkArgument(this == firstInQueue);

ml.numberOfEntries.incrementAndGet();
ml.totalSize.addAndGet(dataLength);
ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
if (ml.hasActiveCursors()) {
// Avoid caching entries if no cursor has been created
ml.entryCache.insert(new EntryImpl(ledger.getId(), entryId, data));
Expand All @@ -148,7 +148,7 @@ public void safeRun() {
data.release();

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
ml.entriesAddedCounter.incrementAndGet();
ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
ml.lastConfirmedEntry = lastEntry;

if (closeWhenDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.yahoo.pulsar.broker.namespace;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
Expand All @@ -35,7 +35,11 @@ public class OwnedBundle {
* based on {@link #active} flag
*/
private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();
private final AtomicBoolean isActive = new AtomicBoolean(true);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<OwnedBundle> IS_ACTIVE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OwnedBundle.class, "isActive");
private volatile int isActive = TRUE;

/**
* constructor
Expand All @@ -44,6 +48,7 @@ public class OwnedBundle {
*/
public OwnedBundle(NamespaceBundle suName) {
this.bundle = suName;
IS_ACTIVE_UPDATER.set(this, TRUE);
};

/**
Expand All @@ -55,7 +60,7 @@ public OwnedBundle(NamespaceBundle suName) {
*/
public OwnedBundle(NamespaceBundle suName, boolean active) {
this.bundle = suName;
this.isActive.set(active);
IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE);
}

/**
Expand Down Expand Up @@ -90,11 +95,11 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {

try {
// set the flag locally s.t. no more producer/consumer to this namespace is allowed
if (!this.isActive.compareAndSet(true, false)) {
if (!IS_ACTIVE_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// An exception is thrown when the namespace is not in active state (i.e. another thread is
// removing/have removed it)
throw new IllegalStateException(
"Namespace is not active. ns:" + this.bundle + "; state:" + this.isActive.get());
"Namespace is not active. ns:" + this.bundle + "; state:" + IS_ACTIVE_UPDATER.get(this));
}
} finally {
// no matter success or not, unlock
Expand Down Expand Up @@ -137,10 +142,10 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {
* @return boolean value indicate that the namespace is active or not.
*/
public boolean isActive() {
return this.isActive.get();
return IS_ACTIVE_UPDATER.get(this) == TRUE;
}

public void setActive(boolean active) {
isActive.set(active);
IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -67,18 +67,24 @@ public class Consumer {
// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
// increase its availability
private final AtomicInteger messagePermits = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
private volatile int messagePermits = 0;
// It starts keep tracking of messagePermits once consumer gets blocked, as consumer needs two separate counts:
// messagePermits (1) before and (2) after being blocked: to dispatch only blockedPermit number of messages at the
// time of redelivery
private final AtomicInteger permitsReceivedWhileConsumerBlocked = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
private volatile int permitsReceivedWhileConsumerBlocked = 0;

private final ConcurrentOpenHashMap<PositionImpl, Integer> pendingAcks;

private final ConsumerStats stats;

private final int maxUnackedMessages;
private AtomicInteger unackedMessages = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
Expand All @@ -93,6 +99,9 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);

stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
Expand Down Expand Up @@ -181,7 +190,7 @@ public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
}

private void incrementUnackedMessages(int ackedMessages) {
if (unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
if (UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
blockedConsumerOnUnackedMsgs = true;
}
}
Expand Down Expand Up @@ -226,7 +235,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) {
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = messagePermits.addAndGet(-permitsToReduce);
int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (permits < 0) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -305,15 +314,15 @@ void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && UNACKED_MESSAGES_UPDATER.get(this) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages);
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
oldPermits = permitsReceivedWhileConsumerBlocked.getAndAdd(additionalNumberOfMessages);
oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
}

if (log.isDebugEnabled()) {
Expand All @@ -331,15 +340,15 @@ void flowPermits(int additionalNumberOfMessages) {
* Consumer whose blockedPermits needs to be dispatched
*/
void flowConsumerBlockedPermits(Consumer consumer) {
int additionalNumberOfPermits = consumer.permitsReceivedWhileConsumerBlocked.getAndSet(0);
int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
// add newly flow permits to actual consumer.messagePermits
consumer.messagePermits.getAndAdd(additionalNumberOfPermits);
MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
// dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer
subscription.consumerFlow(consumer, additionalNumberOfPermits);
}

public int getAvailablePermits() {
return messagePermits.get();
return MESSAGE_PERMITS_UPDATER.get(this);
}

public boolean isBlocked() {
Expand Down Expand Up @@ -367,7 +376,7 @@ public void updateRates() {

public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get();
stats.unackedMessages = UNACKED_MESSAGES_UPDATER.get(this);
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
return stats;
}
Expand Down Expand Up @@ -433,7 +442,7 @@ private void removePendingAcks(PositionImpl position) {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) <= (maxUnackedMessages / 2))
if (((UNACKED_MESSAGES_UPDATER.addAndGet(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
Expand All @@ -450,7 +459,7 @@ public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {

public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
blockedConsumerOnUnackedMsgs = false;
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
Expand Down Expand Up @@ -479,21 +488,20 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
}
}

unackedMessages.addAndGet(-totalRedeliveryMessages);
UNACKED_MESSAGES_UPDATER.addAndGet(this, -totalRedeliveryMessages);
blockedConsumerOnUnackedMsgs = false;

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
permitsReceivedWhileConsumerBlocked.get());
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this));

// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
permitsReceivedWhileConsumerBlocked.getAndAdd(-numberOfBlockedPermits);
messagePermits.getAndAdd(numberOfBlockedPermits);
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits);
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
subscription.consumerFlow(this, numberOfBlockedPermits);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -46,7 +46,9 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche

private final PersistentTopic topic;
private final ManagedCursor cursor;
private final AtomicReference<Consumer> activeConsumer = new AtomicReference<Consumer>();
private static final AtomicReferenceFieldUpdater<PersistentDispatcherSingleActiveConsumer, Consumer> ACTIVE_CONSUMER_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
private final CopyOnWriteArrayList<Consumer> consumers;
private boolean havePendingRead = false;
private CompletableFuture<Void> closeFuture = null;
Expand All @@ -67,6 +69,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
this.partitionIndex = partitionIndex;
this.subscriptionType = subscriptionType;
this.readBatchSize = MaxReadBatchSize;
ACTIVE_CONSUMER_UPDATER.set(this, null);
}

private void pickAndScheduleActiveConsumer() {
Expand All @@ -75,9 +78,9 @@ private void pickAndScheduleActiveConsumer() {
consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName()));

int index = partitionIndex % consumers.size();
Consumer prevConsumer = activeConsumer.getAndSet(consumers.get(index));
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

if (prevConsumer == activeConsumer.get()) {
if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
// Active consumer did not change. Do nothing at this point
return;
}
Expand All @@ -90,7 +93,7 @@ private void pickAndScheduleActiveConsumer() {
// let it finish and then rewind
if (!havePendingRead) {
cursor.rewind();
readMoreEntries(activeConsumer.get());
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
}
}

Expand All @@ -115,7 +118,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
}

if (consumers.isEmpty()) {
activeConsumer.set(null);
ACTIVE_CONSUMER_UPDATER.set(this, null);
}

if (closeFuture == null && !consumers.isEmpty()) {
Expand Down Expand Up @@ -143,7 +146,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
*/
@Override
public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, activeConsumer.get());
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}

/**
Expand Down Expand Up @@ -189,7 +192,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o

readFailureBackoff.reduceToHalf();

Consumer currentConsumer = activeConsumer.get();
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
Expand All @@ -203,7 +206,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
if (future.isSuccess()) {
// Schedule a new read batch operation only after the previous batch has been written to the socket
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = activeConsumer.get();
Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
Expand All @@ -222,7 +225,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
@Override
public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
if (!havePendingRead) {
if (activeConsumer.get() == consumer) {
if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
}
Expand All @@ -242,7 +245,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
if (consumer != activeConsumer.get()) {
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
consumer);
return;
Expand Down Expand Up @@ -320,7 +323,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer = activeConsumer.get();
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
Expand All @@ -340,7 +343,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

@Override
public boolean isConsumerConnected() {
return activeConsumer.get() != null;
return ACTIVE_CONSUMER_UPDATER.get(this) != null;
}

@Override
Expand All @@ -354,7 +357,7 @@ public SubType getType() {
}

public Consumer getActiveConsumer() {
return activeConsumer.get();
return ACTIVE_CONSUMER_UPDATER.get(this);
}

}
Loading

0 comments on commit fc9b34c

Please sign in to comment.