From 6c779c6d4e589247239753c402f1b0bafb7d3b1d Mon Sep 17 00:00:00 2001 From: emeroad Date: Mon, 7 Oct 2024 17:00:35 +0900 Subject: [PATCH] [#11497] Refactor ActiveThreadCountStreamSocket --- .../grpc/AgentGrpcDataSenderProvider.java | 11 +- .../grpc/ActiveThreadCountStreamSocket.java | 89 ++++++++---- .../grpc/GrpcActiveThreadCountService.java | 78 ++-------- .../receiver/grpc/GrpcCommandDispatcher.java | 11 +- .../receiver/grpc/GrpcCommandService.java | 47 ++++--- .../grpc/GrpcProfilerStreamSocket.java | 11 +- .../receiver/grpc/GrpcStreamService.java | 133 ++++++++++++++---- .../grpc/PinpointClientResponseObserver.java | 87 ------------ .../sender/grpc/PingStreamContext.java | 3 +- .../ActiveThreadCountStreamSocketTest.java | 12 +- .../receiver/grpc/GrpcStreamServiceTest.java | 56 ++++++++ .../PinpointClientResponseObserverTest.java | 31 ++-- .../stream/ClientCallStateStreamObserver.java | 106 ++++++++++++++ .../pinpoint/grpc/stream/ObserverState.java | 34 +++++ .../pinpoint/grpc/stream}/StreamUtils.java | 32 ++++- .../ClientCallStateStreamObserverTest.java | 49 +++++++ 16 files changed, 527 insertions(+), 263 deletions(-) delete mode 100644 agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java create mode 100644 agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamServiceTest.java create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ObserverState.java rename {agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc => grpc/src/main/java/com/navercorp/pinpoint/grpc/stream}/StreamUtils.java (50%) create mode 100644 grpc/src/test/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserverTest.java diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java index a387e82b1711..00c36fbad0ae 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java @@ -40,6 +40,7 @@ import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadDumpService; import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadLightDumpService; import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcEchoService; +import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcStreamService; import com.navercorp.pinpoint.profiler.sender.grpc.AgentGrpcDataSender; import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor; import io.grpc.ClientInterceptor; @@ -166,12 +167,16 @@ private ProfilerCommandServiceLocator createProfilerCommandServiceLocator(Active profilerCommandLocatorBuilder.addService(new GrpcEchoService()); if (activeTraceRepository != null) { - profilerCommandLocatorBuilder.addService(new GrpcActiveThreadCountService(activeTraceRepository)); + GrpcActiveThreadCountService grpcActiveThreadCountService = newActiveThreadCountService(activeTraceRepository); + profilerCommandLocatorBuilder.addService(grpcActiveThreadCountService); profilerCommandLocatorBuilder.addService(new GrpcActiveThreadDumpService(activeTraceRepository, threadDumpMapper)); profilerCommandLocatorBuilder.addService(new GrpcActiveThreadLightDumpService(activeTraceRepository, threadDumpMapper)); } + return profilerCommandLocatorBuilder.build(); + } - final ProfilerCommandServiceLocator commandServiceLocator = profilerCommandLocatorBuilder.build(); - return commandServiceLocator; + private GrpcActiveThreadCountService newActiveThreadCountService(ActiveTraceRepository activeTraceRepository) { + GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", GrpcStreamService.DEFAULT_FLUSH_DELAY, activeTraceRepository); + return new GrpcActiveThreadCountService(grpcStreamService); } } \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java index 20ba9a6a27e3..0dfffa727c4b 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java @@ -17,34 +17,49 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; import com.google.protobuf.Empty; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; +import com.navercorp.pinpoint.grpc.stream.StreamUtils; import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes; import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import javax.annotation.Nullable; import java.util.Objects; /** * @author Taejin Koo */ -public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket { +public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket, + ClientResponseObserver { private final Logger logger = LogManager.getLogger(this.getClass()); private final GrpcStreamService grpcStreamService; + private final int socketId; private final int streamObserverId; + private int sequenceId = 0; - private final PinpointClientResponseObserver clientResponseObserver; + private ClientCallStateStreamObserver requestStream; + private volatile boolean closed = false; - public ActiveThreadCountStreamSocket(int streamObserverId, GrpcStreamService grpcStreamService) { + public ActiveThreadCountStreamSocket(int socketId, int streamObserverId, + GrpcStreamService grpcStreamService) { + this.socketId = socketId; this.streamObserverId = streamObserverId; this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService"); - this.clientResponseObserver = new PinpointClientResponseObserver<>(this); + } + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = ClientCallStateStreamObserver.clientCall(requestStream); } public PCmdStreamResponse newHeader() { @@ -54,13 +69,20 @@ public PCmdStreamResponse newHeader() { return headerResponseBuilder.build(); } + @Override - public void send(PCmdActiveThreadCountRes activeThreadCount) { - if (clientResponseObserver.isReady()) { - clientResponseObserver.sendRequest(activeThreadCount); + public boolean send(PCmdActiveThreadCountRes activeThreadCount) { + if (closed) { + return false; + } + final CallStreamObserver request = this.requestStream; + if (request.isReady()) { + request.onNext(activeThreadCount); + return true; } else { - logger.info("Send fail. (ActiveThreadCount) client is not ready. streamObserverId:{}", streamObserverId); + logger.info("Send fail. (ActiveThreadCount) client is not ready. socketId:{} streamObserverId:{}", socketId, streamObserverId); } + return false; } private int getSequenceId() { @@ -69,45 +91,62 @@ private int getSequenceId() { @Override public void close() { - logger.info("close"); - close0(null); + close(null); } @Override - public void close(Throwable throwable) { + public void close(@Nullable Throwable throwable) { + if (closed) { + return; + } logger.warn("close", throwable); - close0(throwable); + dispose(); + + StreamUtils.onCompleted(requestStream, (th) -> logger.info("close", th)); + } + + public boolean isClosed() { + return closed; } + private void dispose() { + this.closed = true; + grpcStreamService.unregister(this); + } + @Override - public void disconnect() { - logger.info("disconnect"); - close0(null); + public void onNext(Empty empty) { + logger.debug("onNext {}", empty); } @Override - public void disconnect(Throwable throwable) { + public void onError(Throwable throwable) { Status status = Status.fromThrowable(throwable); Metadata metadata = Status.trailersFromThrowable(throwable); - logger.info("disconnect. {} {}", status, metadata); - close0(throwable); - } + logger.info("onError {}. {} {}", this, status, metadata); - private void close0(Throwable throwable) { - clientResponseObserver.close(throwable); - grpcStreamService.unregister(this); + this.dispose(); + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("onError", th)); + } } @Override - public ClientResponseObserver getResponseObserver() { - return clientResponseObserver; + public void onCompleted() { + logger.info("onCompleted {}", this); + + this.dispose(); + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("onCompleted", th)); + } } @Override public String toString() { return "ActiveThreadCountStreamSocket{" + - "streamObserverId=" + streamObserverId + + "socketId=" + socketId + + ", streamObserverId=" + streamObserverId + '}'; } } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java index ecb97b6ad64e..88c94280cb15 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java @@ -16,41 +16,29 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; -import com.google.protobuf.Empty; -import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes; import com.navercorp.pinpoint.grpc.trace.PCmdRequest; -import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse; import com.navercorp.pinpoint.grpc.trace.PCommandType; import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc; -import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram; -import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; -import io.grpc.stub.ClientResponseObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Objects; -import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; /** * @author Taejin Koo */ public class GrpcActiveThreadCountService implements ProfilerGrpcCommandService, Closeable { - private static final long DEFAULT_FLUSH_DELAY = 1000; - private final Logger logger = LogManager.getLogger(getClass()); - private final boolean isDebug = logger.isDebugEnabled(); - - private final ActiveTraceRepository activeTraceRepository; + private final AtomicInteger sequence = new AtomicInteger(0); - private final GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", DEFAULT_FLUSH_DELAY); + private final GrpcStreamService grpcStreamService ; - public GrpcActiveThreadCountService(ActiveTraceRepository activeTraceRepository) { - this.activeTraceRepository = Objects.requireNonNull(activeTraceRepository, "activeTraceRepository"); + public GrpcActiveThreadCountService(GrpcStreamService grpcStreamService) { + this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService"); } @Override @@ -59,29 +47,13 @@ public short getCommandServiceCode() { } @Override - public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub) { - ActiveThreadCountStreamSocket activeThreadCountStreamSocket = new ActiveThreadCountStreamSocket(request.getRequestId(), grpcStreamService); - ClientResponseObserver responseObserver = activeThreadCountStreamSocket.getResponseObserver(); - profilerCommandServiceStub.commandStreamActiveThreadCount(responseObserver); + public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandServiceStub) { + ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(sequence.getAndIncrement(), request.getRequestId(), grpcStreamService); + commandServiceStub.commandStreamActiveThreadCount(socket); - grpcStreamService.register(activeThreadCountStreamSocket, new ActiveThreadCountTimerTask()); + grpcStreamService.register(socket); } - private PCmdActiveThreadCountRes.Builder getActiveThreadCountResponse() { - final long currentTime = System.currentTimeMillis(); - final ActiveTraceHistogram histogram = activeTraceRepository.getActiveTraceHistogram(currentTime); - - PCmdActiveThreadCountRes.Builder responseBuilder = PCmdActiveThreadCountRes.newBuilder(); - responseBuilder.setTimeStamp(currentTime); - responseBuilder.setHistogramSchemaType(histogram.getHistogramSchema().getTypeCode()); - - final List activeTraceCountList = histogram.getCounter(); - for (Integer activeTraceCount : activeTraceCountList) { - responseBuilder.addActiveThreadCount(activeTraceCount); - } - - return responseBuilder; - } @Override public void close() throws IOException { @@ -89,36 +61,4 @@ public void close() throws IOException { grpcStreamService.close(); } - private class ActiveThreadCountTimerTask extends TimerTask { - - @Override - public void run() { - if (isDebug) { - logger.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList())); - } - - PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse(); - for (GrpcProfilerStreamSocket streamSocket : grpcStreamService.getStreamSocketList()) { - if (streamSocket instanceof ActiveThreadCountStreamSocket) { - try { - final ActiveThreadCountStreamSocket stream = (ActiveThreadCountStreamSocket) streamSocket; - - PCmdStreamResponse header = stream.newHeader(); - activeThreadCountResponseBuilder.setCommonStreamResponse(header); - PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build(); - - stream.send(activeThreadCount); - if (isDebug) { - logger.debug("ActiveThreadCountStreamSocket. {}", stream); - } - } catch (Throwable e) { - logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e); - streamSocket.close(e); - } - } - } - } - - } - } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandDispatcher.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandDispatcher.java index 7bd57ab56d0b..7299c231469c 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandDispatcher.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandDispatcher.java @@ -60,16 +60,23 @@ public void handle(PCmdRequest commandRequest, StreamObserver strea logger.warn("Failed to handle commandService. message:{}", e.getMessage(), e); PCmdResponse failMessage = createFailMessage(commandRequest, e.getMessage()); if (streamObserver != null) { - streamObserver.onNext(PCmdMessage.newBuilder().setFailMessage(failMessage).build()); + PCmdMessage cmd = failCmdMessage(failMessage); + streamObserver.onNext(cmd); } } } else { PCmdResponse failMessage = createFailMessage(commandRequest, RouteResult.NOT_SUPPORTED_REQUEST.name()); if (streamObserver != null) { - streamObserver.onNext(PCmdMessage.newBuilder().setFailMessage(failMessage).build()); + PCmdMessage failCmd = failCmdMessage(failMessage); + streamObserver.onNext(failCmd); } } + } + private PCmdMessage failCmdMessage(PCmdResponse failMessage) { + return PCmdMessage.newBuilder() + .setFailMessage(failMessage) + .build(); } private PCmdResponse createFailMessage(PCmdRequest commandRequest, String message) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandService.java index b3d29e32f415..5763eb7b717c 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcCommandService.java @@ -16,14 +16,16 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.navercorp.pinpoint.grpc.MessageFormatUtils; import com.navercorp.pinpoint.grpc.client.SupportCommandCodeClientInterceptor; +import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver; +import com.navercorp.pinpoint.grpc.stream.StreamUtils; import com.navercorp.pinpoint.grpc.trace.PCmdMessage; import com.navercorp.pinpoint.grpc.trace.PCmdRequest; import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc; import com.navercorp.pinpoint.profiler.receiver.ProfilerCommandServiceLocator; import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor; import com.navercorp.pinpoint.profiler.sender.grpc.Reconnector; -import com.navercorp.pinpoint.profiler.sender.grpc.StreamUtils; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; @@ -67,10 +69,11 @@ public void run() { } private void connect() { - logger.info("Attempt to connect to CommandServiceStream."); if (shutdown) { + logger.info("Already shutdown"); return; } + logger.info("Attempt to connect to CommandServiceStream"); ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub = newCommandServiceStub(commandServiceStubFactory, profilerCommandServiceLocator); GrpcCommandDispatcher commandDispatcher = new GrpcCommandDispatcher(profilerCommandServiceStub, profilerCommandServiceLocator); @@ -93,34 +96,38 @@ private void reserveReconnect() { public void stop() { logger.info("stop() started"); - if (!shutdown) { - // It's okay to be called multiple times. - this.shutdown = true; + if (shutdown) { + logger.info("Already shutdown"); + } - final CommandServiceMainStreamObserver commandServiceMainStreamObserver = this.commandServiceMainStreamObserver; - if (commandServiceMainStreamObserver != null) { - commandServiceMainStreamObserver.stop(); - } + // It's okay to be called multiple times. + this.shutdown = true; + + final CommandServiceMainStreamObserver observer = this.commandServiceMainStreamObserver; + if (observer != null) { + observer.stop(); } } private class CommandServiceMainStreamObserver implements ClientResponseObserver { + private final Logger logger = LogManager.getLogger(this.getClass()); private final GrpcCommandDispatcher commandDispatcher; - private ClientCallStreamObserver requestStream; + private ClientCallStateStreamObserver requestStream; public CommandServiceMainStreamObserver(GrpcCommandDispatcher commandDispatcher) { this.commandDispatcher = Objects.requireNonNull(commandDispatcher, "commandDispatcher"); } @Override - public void beforeStart(final ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; + public void beforeStart(final ClientCallStreamObserver stream) { + + this.requestStream = ClientCallStateStreamObserver.clientCall(stream); requestStream.setOnReadyHandler(new Runnable() { @Override public void run() { - logger.info("Connect to CommandServiceStream completed."); + logger.info("Connect to CommandServiceStream completed"); reconnector.reset(); } }); @@ -130,7 +137,7 @@ public void run() { @Override public void onNext(PCmdRequest request) { if (isDebug) { - logger.debug("received request:{}", request); + logger.debug("received request:{}", MessageFormatUtils.debugLog(request)); } if (request != null) { @@ -144,8 +151,8 @@ public void onError(Throwable t) { Metadata metadata = Status.trailersFromThrowable(t); logger.info("Failed to command stream, {} {}", status, metadata); - if (requestStream != null) { - requestStream.onError(t); + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("CommandService.onError", th)); } reserveReconnect(); @@ -154,14 +161,16 @@ public void onError(Throwable t) { @Override public void onCompleted() { logger.info("onCompleted"); - StreamUtils.close(requestStream, GrpcCommandService.this.logger); - // TODO : needs to check whether needs new action + + if (requestStream.isRun()) { + StreamUtils.onCompleted(requestStream, (th) -> logger.info("CommandService.onCompleted", th)); + } reserveReconnect(); } private void stop() { logger.info("stop"); - StreamUtils.close(requestStream, GrpcCommandService.this.logger); + StreamUtils.onCompleted(requestStream, (th) -> logger.info("CommandService.stop")); commandDispatcher.close(); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcProfilerStreamSocket.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcProfilerStreamSocket.java index eec6baae8b33..8e4045a23cc1 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcProfilerStreamSocket.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcProfilerStreamSocket.java @@ -16,23 +16,16 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; -import io.grpc.stub.ClientResponseObserver; - /** * @author Taejin Koo */ -public interface GrpcProfilerStreamSocket { +public interface GrpcProfilerStreamSocket { - void send(Req send); + boolean send(Req send); void close(); void close(Throwable throwable); - void disconnect(); - - void disconnect(Throwable throwable); - - ClientResponseObserver getResponseObserver(); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamService.java index 35c58b4eeefb..870d103b8105 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamService.java @@ -16,83 +16,115 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; import com.navercorp.pinpoint.common.util.Assert; +import com.navercorp.pinpoint.grpc.ExecutorUtils; +import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes; +import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse; +import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram; +import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * @author Taejin Koo */ -public class GrpcStreamService { +public class GrpcStreamService implements AutoCloseable { private final Logger logger = LogManager.getLogger(this.getClass()); - private final Timer timer; + public static final long DEFAULT_FLUSH_DELAY = 1000; + + private final ScheduledExecutorService scheduler; private final long flushDelay; private final Object lock = new Object(); - private final AtomicReference currentTaskReference = new AtomicReference<>(); + private ScheduledFuture jobHandle; + + private final List grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>(); - private final List> grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>(); + private final ActiveTraceRepository activeTrace; - public GrpcStreamService(String name, long flushDelay) { + public GrpcStreamService(String name, long flushDelay, ActiveTraceRepository activeTrace) { Objects.requireNonNull(name, "name"); Assert.isTrue(flushDelay > 0, "flushDelay must be >= 0"); - this.timer = new Timer("Pinpoint-Grpc-" + name + "-Timer", true); + + this.scheduler = newScheduledExecutorService(name); + this.flushDelay = flushDelay; + this.activeTrace = Objects.requireNonNull(activeTrace, "ActiveTrace"); } - public GrpcProfilerStreamSocket[] getStreamSocketList() { - return grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]); + private ScheduledExecutorService newScheduledExecutorService(String name) { + PinpointThreadFactory threadFactory = new PinpointThreadFactory("Pinpoint-" + name + "-Timer", true); + return Executors.newScheduledThreadPool(1, threadFactory); } - public boolean register(GrpcProfilerStreamSocket streamSocket, TimerTask timerTask) { + ActiveThreadCountStreamSocket[] getStreamSocketList() { + return grpcProfilerStreamSocketList.toArray(new ActiveThreadCountStreamSocket[0]); + } + + public boolean register(ActiveThreadCountStreamSocket streamSocket) { synchronized (lock) { grpcProfilerStreamSocketList.add(streamSocket); - boolean turnOn = currentTaskReference.compareAndSet(null, timerTask); - if (turnOn) { - logger.info("turn on TimerTask."); - timer.scheduleAtFixedRate(timerTask, 0, flushDelay); + + if (!isStarted()) { + logger.info("turn on ActiveThreadCountTask"); + Runnable job = new ActiveThreadCountTimerTask(); + this.jobHandle = scheduler.scheduleAtFixedRate(job, flushDelay, flushDelay, TimeUnit.MILLISECONDS); return true; } } return false; } - public boolean unregister(GrpcProfilerStreamSocket streamSocket) { + + + + void stopTask() { + if (jobHandle == null) { + return; + } + logger.info("turn off TimerTask"); + jobHandle.cancel(false); + jobHandle = null; + } + + boolean isStarted() { + return jobHandle != null; + } + + public boolean unregister(ActiveThreadCountStreamSocket streamSocket) { synchronized (lock) { grpcProfilerStreamSocketList.remove(streamSocket); // turnoff if (grpcProfilerStreamSocketList.isEmpty()) { - TimerTask activeThreadCountTimerTask = currentTaskReference.get(); - if (activeThreadCountTimerTask != null) { - currentTaskReference.compareAndSet(activeThreadCountTimerTask, null); - activeThreadCountTimerTask.cancel(); - - logger.info("turn off TimerTask."); - } + stopTask(); return true; } } return false; } + @Override public void close() { synchronized (lock) { - if (timer != null) { - timer.cancel(); + if (scheduler != null) { + ExecutorUtils.shutdownExecutorService("GrpcStreamService", scheduler); } - GrpcProfilerStreamSocket[] streamSockets = grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]); - for (GrpcProfilerStreamSocket streamSocket : streamSockets) { + ActiveThreadCountStreamSocket[] streamSockets = getStreamSocketList(); + for (ActiveThreadCountStreamSocket streamSocket : streamSockets) { if (streamSocket != null) { streamSocket.close(); } @@ -102,4 +134,49 @@ public void close() { } } + + private PCmdActiveThreadCountRes.Builder builder() { + final long currentTime = System.currentTimeMillis(); + final ActiveTraceHistogram histogram = activeTrace.getActiveTraceHistogram(currentTime); + + PCmdActiveThreadCountRes.Builder responseBuilder = PCmdActiveThreadCountRes.newBuilder(); + responseBuilder.setTimeStamp(currentTime); + responseBuilder.setHistogramSchemaType(histogram.getHistogramSchema().getTypeCode()); + + final List activeTraceCountList = histogram.getCounter(); + for (Integer activeTraceCount : activeTraceCountList) { + responseBuilder.addActiveThreadCount(activeTraceCount); + } + + return responseBuilder; + } + + private class ActiveThreadCountTimerTask implements Runnable { + + @Override + public void run() { + ActiveThreadCountStreamSocket[] streamSocketList = getStreamSocketList(); + if (logger.isDebugEnabled()) { + logger.debug("ActiveThreadCountTimerTask run. streamSocketList:{}", Arrays.toString(streamSocketList)); + } + + final PCmdActiveThreadCountRes.Builder builder = builder(); + for (ActiveThreadCountStreamSocket streamSocket : streamSocketList) { + try { + PCmdStreamResponse header = streamSocket.newHeader(); + builder.setCommonStreamResponse(header); + PCmdActiveThreadCountRes activeThreadCount = builder.build(); + + streamSocket.send(activeThreadCount); + if (logger.isDebugEnabled()) { + logger.debug("ActiveThreadCountStreamSocket. {}", streamSocket); + } + } catch (Throwable e) { + logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e); + streamSocket.close(e); + } + } + } + + } } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java deleted file mode 100644 index d0f03756b3c6..000000000000 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2019 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.pinpoint.profiler.receiver.grpc; - -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; - -import java.util.Objects; - -/** - * @author Taejin Koo - */ -public class PinpointClientResponseObserver implements ClientResponseObserver { - - private final GrpcProfilerStreamSocket socket; - - private volatile ClientCallStreamObserver requestStream; - - public PinpointClientResponseObserver(GrpcProfilerStreamSocket socket) { - this.socket = Objects.requireNonNull(socket, "socket"); - } - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; - } - - @Override - public void onNext(ResT res) { - // do nothing - } - - @Override - public void onError(Throwable t) { - socket.disconnect(t); - } - - @Override - public void onCompleted() { - socket.disconnect(); - } - - public void sendRequest(ReqT value) { - final ClientCallStreamObserver copy = this.requestStream; - if (copy == null) { - return; - } - copy.onNext(value); - } - - public boolean isReady() { - final ClientCallStreamObserver copy = this.requestStream; - if (copy == null) { - return false; - } - return copy.isReady(); - } - - - public void close(Throwable throwable) { - final ClientCallStreamObserver copy = requestStream; - if (copy == null) { - return; - } - - if (throwable == null) { - copy.onCompleted(); - } else { - copy.onError(throwable); - } - } - -} \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PingStreamContext.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PingStreamContext.java index f5c2f67fb37f..da86083420c5 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PingStreamContext.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PingStreamContext.java @@ -17,6 +17,7 @@ package com.navercorp.pinpoint.profiler.sender.grpc; import com.navercorp.pinpoint.grpc.MessageFormatUtils; +import com.navercorp.pinpoint.grpc.stream.StreamUtils; import com.navercorp.pinpoint.grpc.trace.AgentGrpc; import com.navercorp.pinpoint.grpc.trace.PPing; import io.grpc.Metadata; @@ -139,7 +140,7 @@ private ScheduledFuture schedule(Runnable command) { public void close() { logger.info("{} close()", streamId); - StreamUtils.close(this.requestObserver, this.logger); + StreamUtils.onCompleted(this.requestObserver, (th) -> this.logger.info("PingStreamContext.close", th)); } @Override diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocketTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocketTest.java index d08b6ec6afe3..682940d53c01 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocketTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocketTest.java @@ -1,5 +1,7 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes; +import io.grpc.stub.ClientCallStreamObserver; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -11,14 +13,20 @@ class ActiveThreadCountStreamSocketTest { @Test void close_NPE() { GrpcStreamService grpcStreamService = mock(GrpcStreamService.class); - ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, grpcStreamService); + ClientCallStreamObserver client = mock(ClientCallStreamObserver.class); + + ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, 2, grpcStreamService); + socket.beforeStart(client); socket.close(null); } @Test void close() { GrpcStreamService grpcStreamService = mock(GrpcStreamService.class); - ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, grpcStreamService); + ClientCallStreamObserver client = mock(ClientCallStreamObserver.class); + + ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, 2, grpcStreamService); + socket.beforeStart(client); socket.close(new IOException("test")); } } \ No newline at end of file diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamServiceTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamServiceTest.java new file mode 100644 index 000000000000..7d59378352ff --- /dev/null +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcStreamServiceTest.java @@ -0,0 +1,56 @@ +package com.navercorp.pinpoint.profiler.receiver.grpc; + +import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.mock; + +@ExtendWith(MockitoExtension.class) +class GrpcStreamServiceTest { + private final Logger logger = LogManager.getLogger(this.getClass()); + + @Mock + ActiveTraceRepository repo; + + @Test + void register() { + + try (GrpcStreamService service = new GrpcStreamService("test", 5000, repo)) { + ActiveThreadCountStreamSocket socket = mock(ActiveThreadCountStreamSocket.class); + + Assertions.assertFalse(service.isStarted()); + service.register(socket); + + Assertions.assertEquals(1, service.getStreamSocketList().length); + Assertions.assertTrue(service.isStarted()); + + service.unregister(socket); + Assertions.assertFalse(service.isStarted()); + } + } + + + @Test + void register_duplicate() { + try (GrpcStreamService service = new GrpcStreamService("test", 5000, repo)) { + ActiveThreadCountStreamSocket socket = mock(ActiveThreadCountStreamSocket.class); + + Assertions.assertTrue(service.register(socket)); + Assertions.assertFalse(service.register(socket)); + + Assertions.assertTrue(service.isStarted()); + + Assertions.assertFalse(service.unregister(socket)); + Assertions.assertTrue(service.unregister(socket)); + + Assertions.assertFalse(service.isStarted()); + } + } + +} \ No newline at end of file diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserverTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserverTest.java index ba1fde6fa2ac..bab5fe021f3d 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserverTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserverTest.java @@ -1,36 +1,41 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; -import com.google.protobuf.Empty; +import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes; import io.grpc.stub.ClientCallStreamObserver; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class PinpointClientResponseObserverTest { @Test void isReady_true() { - GrpcProfilerStreamSocket socket = mock(GrpcProfilerStreamSocket.class); - PinpointClientResponseObserver responseObserver = new PinpointClientResponseObserver<>(socket); + GrpcStreamService service = mock(GrpcStreamService.class); - ClientCallStreamObserver requestStream = mock(ClientCallStreamObserver.class); + ClientCallStreamObserver requestStream = mock(ClientCallStreamObserver.class); when(requestStream.isReady()).thenReturn(true); - responseObserver.beforeStart(requestStream); - Assertions.assertTrue(responseObserver.isReady()); + ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, 2, service); + socket.beforeStart(requestStream); + + socket.send(PCmdActiveThreadCountRes.getDefaultInstance()); + verify(requestStream).onNext(PCmdActiveThreadCountRes.getDefaultInstance()); } @Test void isReady_false() { - GrpcProfilerStreamSocket socket = mock(GrpcProfilerStreamSocket.class); - PinpointClientResponseObserver responseObserver = new PinpointClientResponseObserver<>(socket); + GrpcStreamService service = mock(GrpcStreamService.class); + + ClientCallStreamObserver requestStream = mock(ClientCallStreamObserver.class); + when(requestStream.isReady()).thenReturn(false); - Assertions.assertFalse(responseObserver.isReady()); + ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(1, 2, service); + socket.beforeStart(requestStream); - ClientCallStreamObserver requestStream = mock(ClientCallStreamObserver.class); - responseObserver.beforeStart(requestStream); - Assertions.assertFalse(responseObserver.isReady()); + socket.send(PCmdActiveThreadCountRes.getDefaultInstance()); + verify(requestStream, never()).onNext(PCmdActiveThreadCountRes.getDefaultInstance()); } } \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java new file mode 100644 index 000000000000..ba14099e6f28 --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java @@ -0,0 +1,106 @@ +package com.navercorp.pinpoint.grpc.stream; + +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; + +import javax.annotation.Nullable; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +public class ClientCallStateStreamObserver extends ClientCallStreamObserver{ + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater STATE + = AtomicReferenceFieldUpdater.newUpdater(ClientCallStateStreamObserver.class, ObserverState.class, "state"); + + private final ClientCallStreamObserver delegate; + + private volatile ObserverState state = ObserverState.RUN; + + public static ClientCallStateStreamObserver clientCall(StreamObserver delegate) { + if (delegate instanceof ClientCallStreamObserver) { + ClientCallStreamObserver clientCall = (ClientCallStreamObserver) delegate; + return new ClientCallStateStreamObserver<>(clientCall); + } + throw new IllegalArgumentException("delegate is not instance of ClientCallStreamObserver"); + } + + public ClientCallStateStreamObserver(ClientCallStreamObserver delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + delegate.cancel(message, cause); + } + + @Override + public void disableAutoRequestWithInitial(int request) { + delegate.disableAutoRequestWithInitial(request); + } + + @Override + public boolean isReady() { + return delegate.isReady(); + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + delegate.setOnReadyHandler(onReadyHandler); + } + + @Override + public void request(int count) { + delegate.request(count); + } + + @Override + public void setMessageCompression(boolean enable) { + delegate.setMessageCompression(enable); + } + + @Override + public void disableAutoInboundFlowControl() { + delegate.disableAutoInboundFlowControl(); + } + + @Override + public void onNext(ReqT value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable t) { + if (ObserverState.changeError(STATE, this)) { + delegate.onError(t); + } + + } + + @Override + public void onCompleted() { + if (ObserverState.changeComplete(STATE, this)) { + delegate.onCompleted(); + } + } + + public boolean isRun() { + return state.isRun(); + } + + public boolean isClosed() { + return state.isClosed(); + } + + public ObserverState state() { + return state; + } + + @Override + public String toString() { + return "ClientCallStateStreamObserver{" + + "delegate=" + delegate + + ", state=" + state + + '}'; + } +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ObserverState.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ObserverState.java new file mode 100644 index 000000000000..8eac97e9a837 --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ObserverState.java @@ -0,0 +1,34 @@ +package com.navercorp.pinpoint.grpc.stream; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +public enum ObserverState { + RUN, + COMPLETED, + ERROR; + + public boolean isCompleted() { + return this == COMPLETED; + } + + public boolean isError() { + return this == ERROR; + } + + public boolean isRun() { + return this == RUN; + } + + public boolean isClosed() { + return this != RUN; + } + + public static boolean changeComplete(AtomicReferenceFieldUpdater updater, T target) { + return updater.compareAndSet(target, RUN, COMPLETED); + } + + public static boolean changeError(AtomicReferenceFieldUpdater updater, T target) { + return updater.compareAndSet(target, RUN, ERROR); + } + +} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamUtils.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/StreamUtils.java similarity index 50% rename from agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamUtils.java rename to grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/StreamUtils.java index f008579b7f60..f491dc604734 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamUtils.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/StreamUtils.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package com.navercorp.pinpoint.profiler.sender.grpc; +package com.navercorp.pinpoint.grpc.stream; import io.grpc.stub.StreamObserver; -import org.apache.logging.log4j.Logger; + +import java.util.function.Consumer; /** @@ -28,15 +29,36 @@ public final class StreamUtils { private StreamUtils() { } - public static void close(final StreamObserver streamObserver, Logger logger) { + public static void onCompleted(final StreamObserver streamObserver) { + onCompleted(streamObserver, null); + } + + public static void onCompleted(final StreamObserver streamObserver, Consumer consumer) { if (streamObserver != null) { try { streamObserver.onCompleted(); } catch (Throwable th) { - if (logger != null) { - logger.info("StreamObserver.onCompleted error", th); + if (consumer != null) { + consumer.accept(th); + } + } + } + } + + public static void onError(final StreamObserver streamObserver, Throwable t) { + onError(streamObserver, t, null); + } + + public static void onError(final StreamObserver streamObserver, Throwable t, Consumer consumer) { + if (streamObserver != null) { + try { + streamObserver.onError(t); + } catch (Throwable th) { + if (consumer != null) { + consumer.accept(th); } } + } } } diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserverTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserverTest.java new file mode 100644 index 000000000000..12535f15d8c3 --- /dev/null +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserverTest.java @@ -0,0 +1,49 @@ +package com.navercorp.pinpoint.grpc.stream; + +import io.grpc.stub.ClientCallStreamObserver; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +class ClientCallStateStreamObserverTest { + + @Test + void state() { + ClientCallStreamObserver clientCall = mock(ClientCallStreamObserver.class); + ClientCallStateStreamObserver adaptor = ClientCallStateStreamObserver.clientCall(clientCall); + Assertions.assertTrue(adaptor.state().isRun()); + + adaptor.onCompleted(); + Assertions.assertTrue(adaptor.isClosed()); + } + + @Test + void onError() { + ClientCallStreamObserver clientCall = mock(ClientCallStreamObserver.class); + ClientCallStateStreamObserver adaptor = ClientCallStateStreamObserver.clientCall(clientCall); + + adaptor.onError(new RuntimeException("test")); + adaptor.onError(new RuntimeException("test")); + + Assertions.assertTrue(adaptor.state().isError()); + Assertions.assertTrue(adaptor.state().isClosed()); + verify(clientCall).onError(any()); + } + + @Test + void onCompleted() { + ClientCallStreamObserver clientCall = mock(ClientCallStreamObserver.class); + ClientCallStateStreamObserver adaptor = ClientCallStateStreamObserver.clientCall(clientCall); + + adaptor.onCompleted(); + adaptor.onCompleted(); + adaptor.onCompleted(); + + Assertions.assertTrue(adaptor.state().isCompleted()); + Assertions.assertTrue(adaptor.state().isClosed()); + verify(clientCall).onCompleted(); + } +} \ No newline at end of file