Skip to content

Commit

Permalink
[#11498] Replace AtomicReference with AtomicReferenceFieldUpdater
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 13, 2024
1 parent 4699b99 commit fc086c7
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import reactor.core.Disposable;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @author youngjin.kim2
Expand All @@ -34,20 +34,26 @@ public class DeferredDisposable implements Disposable {
private static final Disposable COMPLETE = () -> {
};

private final AtomicReference<Disposable> delegateRef = new AtomicReference<>(UNINITIALIZED);
private static final AtomicReferenceFieldUpdater<DeferredDisposable, Disposable> UPDATER
= AtomicReferenceFieldUpdater.newUpdater(DeferredDisposable.class, Disposable.class, "delegate");

private volatile Disposable delegate = UNINITIALIZED;



@Override
public void dispose() {
this.delegateRef.getAndSet(COMPLETE).dispose();
Disposable disposable = UPDATER.getAndSet(this, COMPLETE);
disposable.dispose();
}

@Override
public boolean isDisposed() {
return this.delegateRef.get() == COMPLETE;
return UPDATER.get(this) == COMPLETE;
}

public void setDisposable(Disposable target) {
if (!this.delegateRef.compareAndSet(UNINITIALIZED, target)) {
if (!UPDATER.compareAndSet(this, UNINITIALIZED, target)) {
target.dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.collector.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.rpc.packet.stream.StreamClosePacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
import com.navercorp.pinpoint.rpc.packet.stream.StreamResponsePacket;
Expand All @@ -26,18 +27,16 @@
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateCode;
import com.navercorp.pinpoint.rpc.stream.StreamException;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @author Taejin Koo
Expand All @@ -46,8 +45,11 @@ public class GrpcClientStreamChannel extends AbstractStreamChannel implements Cl

private final Logger logger = LogManager.getLogger(this.getClass());

private final AtomicReference<StreamObserver<Empty>> connectionObserverReference = new AtomicReference<>();
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<GrpcClientStreamChannel, StreamObserver> REF
= AtomicReferenceFieldUpdater.newUpdater(GrpcClientStreamChannel.class, StreamObserver.class, "connectionObserver");

private volatile StreamObserver<Empty> connectionObserver;
private final InetSocketAddress remoteAddress;
private final ClientStreamChannelEventHandler streamChannelEventHandler;

Expand Down Expand Up @@ -78,12 +80,12 @@ public void connect(Runnable connectRunnable, long timeout) throws StreamExcepti

// handle create success
public boolean setConnectionObserver(StreamObserver<Empty> connectionObserver) {
return connectionObserverReference.compareAndSet(null, connectionObserver);
return REF.compareAndSet(this, null, connectionObserver);
}

@Override
public boolean changeStateConnected() {
if (connectionObserverReference.get() == null) {
if (REF.get(this) == null) {
return false;
}
return super.changeStateConnected();
Expand Down Expand Up @@ -119,7 +121,8 @@ public void disconnect(StreamCode code) {
}

private void close0(StreamCode code) {
StreamObserver<Empty> connectionObserver = connectionObserverReference.get();
@SuppressWarnings("unchecked")
StreamObserver<Empty> connectionObserver = REF.get(this);
if (connectionObserver != null) {
if (code == StreamCode.STATE_CLOSED) {
Empty empty = Empty.newBuilder().build();
Expand All @@ -130,7 +133,7 @@ private void close0(StreamCode code) {
connectionObserver.onNext(empty);
connectionObserver.onError(new StatusException(Status.ABORTED.withDescription(code.name())));
}
connectionObserverReference.compareAndSet(connectionObserver, null);
REF.compareAndSet(this, connectionObserver, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* @author Taejin Koo
*/
public class GrpcClientStreamChannelTest {
private final Logger logger = LogManager.getLogger(this.getClass());

private final InetSocketAddress mockRemoteAddress = new InetSocketAddress("127.0.0.1", 61611);

Expand All @@ -49,7 +50,7 @@ public void connectTimeoutTest() throws StreamException {

grpcClientStreamChannel.init();

StreamException streamException = null;
StreamException streamException = new StreamException(StreamCode.STATE_CLOSED);
try {
grpcClientStreamChannel.connect(new Runnable() {
@Override
Expand All @@ -70,7 +71,7 @@ public void connectFailTest() throws StreamException {
GrpcClientStreamChannel grpcClientStreamChannel = new GrpcClientStreamChannel(mockRemoteAddress, 20, new StreamChannelRepository(), ClientStreamChannelEventHandler.DISABLED_INSTANCE);
grpcClientStreamChannel.init();

StreamException streamException = null;
StreamException streamException = new StreamException(StreamCode.STATE_CLOSED);
try {
grpcClientStreamChannel.connect(new Runnable() {
@Override
Expand Down Expand Up @@ -102,7 +103,7 @@ public void simpleTest() throws StreamException, InterruptedException {
CountDownLatch threadCompleteLatch = connect(grpcClientStreamChannel, connectCompleteLatch);

final AtomicInteger callCompletedCount = new AtomicInteger(0);
grpcClientStreamChannel.setConnectionObserver(new StreamObserver<Empty>() {
grpcClientStreamChannel.setConnectionObserver(new StreamObserver<>() {
@Override
public void onNext(Empty value) {

Expand Down Expand Up @@ -159,7 +160,7 @@ public void run() {
}
}, 1000);
} catch (StreamException e) {
e.printStackTrace();
logger.debug("connect error", e);
} finally {
threadCompleteLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,45 @@

package com.navercorp.pinpoint.common.server.cluster.zookeeper.util;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class CommonStateContext {

private final AtomicReference<CommonState> currentState = new AtomicReference<>();
private static final AtomicReferenceFieldUpdater<CommonStateContext, CommonState> REF
= AtomicReferenceFieldUpdater.newUpdater(CommonStateContext.class, CommonState.class, "currentState");

private volatile CommonState currentState = null;

public CommonStateContext() {
currentState.set(CommonState.NEW);
REF.set(this, CommonState.NEW);
}

public CommonState getCurrentState() {
return currentState.get();
return REF.get(this);
}

public boolean changeStateInitializing() {
return currentState.compareAndSet(CommonState.NEW, CommonState.INITIALIZING);
return REF.compareAndSet(this, CommonState.NEW, CommonState.INITIALIZING);
}

public boolean changeStateStarted() {
return currentState.compareAndSet(CommonState.INITIALIZING, CommonState.STARTED);
return REF.compareAndSet(this, CommonState.INITIALIZING, CommonState.STARTED);
}

public boolean changeStateDestroying() {
return currentState.compareAndSet(CommonState.STARTED, CommonState.DESTROYING);
return REF.compareAndSet(this, CommonState.STARTED, CommonState.DESTROYING);
}

public boolean changeStateStopped() {
return currentState.compareAndSet(CommonState.DESTROYING, CommonState.STOPPED);
return REF.compareAndSet(this, CommonState.DESTROYING, CommonState.STOPPED);
}

public boolean changeStateIllegal() {
currentState.set(CommonState.ILLEGAL_STATE);
REF.set(this, CommonState.ILLEGAL_STATE);
return true;
}

public boolean isStarted() {
return currentState.get() == CommonState.STARTED;
return REF.get(this) == CommonState.STARTED;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.navercorp.pinpoint.common.server.cluster.zookeeper.util;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class CommonStateContextTest {

@Test
void getCurrentState() {
CommonStateContext commonStateContext = new CommonStateContext();
assertEquals(CommonState.NEW, commonStateContext.getCurrentState());
}

@Test
void changeStateInitializing() {
CommonStateContext commonStateContext = new CommonStateContext();
assertTrue(commonStateContext.changeStateInitializing());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @author youngjin.kim2
*/
public class IntervalRunner implements InitializingBean, DisposableBean {

private static final AtomicReferenceFieldUpdater<IntervalRunner, Disposable> REF
= AtomicReferenceFieldUpdater.newUpdater(IntervalRunner.class, Disposable.class, "disposable");

private final Runnable runnable;
private final Duration period;
private final Scheduler scheduler;

private final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
private volatile Disposable disposable;

public IntervalRunner(Runnable runnable, Duration period, Scheduler scheduler) {
this.runnable = Objects.requireNonNull(runnable, "runnable");
Expand All @@ -44,7 +47,7 @@ public IntervalRunner(Runnable runnable, Duration period, Scheduler scheduler) {

@Override
public void destroy() {
Disposable disposable = this.disposableRef.get();
Disposable disposable = REF.get(this);
if (disposable != null) {
disposable.dispose();
}
Expand All @@ -53,7 +56,7 @@ public void destroy() {
@Override
public void afterPropertiesSet() throws Exception {
Disposable disposable = Flux.interval(this.period, this.scheduler).subscribe(t -> this.runnable.run());
if (!disposableRef.compareAndSet(null, disposable)) {
if (!REF.compareAndSet(this, null, disposable)) {
disposable.dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public class IntervalRunnerTest {

@Test
public void test() throws Exception {
AtomicInteger counter = new AtomicInteger(0);
IntervalRunner runner = new IntervalRunner(() -> {
counter.getAndIncrement();
}, Duration.ofMillis(10), Schedulers.boundedElastic());
final AtomicInteger counter = new AtomicInteger(0);

IntervalRunner runner = new IntervalRunner(counter::getAndIncrement, Duration.ofMillis(10), Schedulers.boundedElastic());
runner.afterPropertiesSet();
Mono.delay(Duration.ofMillis(100)).block();
runner.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @author youngjin.kim2
Expand Down Expand Up @@ -118,18 +118,21 @@ public void run() {
}

private static class SupplyCollector {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SupplyCollector, List> REF
= AtomicReferenceFieldUpdater.newUpdater(SupplyCollector.class, List.class, "agents");
String applicationName;
long supplyExpiredIn;

long sessionStartedAt = System.currentTimeMillis();
long shouldConnectUntil = sessionStartedAt + MAX_CONNECTION_WAITING_MILLIS;

AtomicReference<List<ClusterKey>> agentsRef;
private volatile List<ClusterKey> agents;
Map<ClusterKey, ATCSupply> supplyMap = new ConcurrentHashMap<>();
Map<ClusterKey, Long> updatedAtMap = new ConcurrentHashMap<>();

SupplyCollector(String applicationName, long supplyExpiredIn) {
this.agentsRef = new AtomicReference<>(null);
this.agents = null;
this.applicationName = applicationName;
this.supplyExpiredIn = supplyExpiredIn;
}
Expand All @@ -150,7 +153,8 @@ public void add(ATCSupply supply) {
}

public ActiveThreadCountResponse compose(Long t) {
List<ClusterKey> agents = this.agentsRef.get();
@SuppressWarnings("unchecked")
List<ClusterKey> agents = REF.get(this);
if (agents == null) {
return null;
}
Expand All @@ -164,7 +168,7 @@ public ActiveThreadCountResponse compose(Long t) {
}

public void updateAgents(List<ClusterKey> agents) {
this.agentsRef.set(agents);
REF.set(this, agents);
}

private void putAgent(ActiveThreadCountResponse response, ClusterKey agent, long now) {
Expand Down

0 comments on commit fc086c7

Please sign in to comment.