From c968eca3b4aee1bc05324dfbd954261832a4183c Mon Sep 17 00:00:00 2001 From: emeroad Date: Fri, 11 Oct 2024 15:27:47 +0900 Subject: [PATCH] [#11497] Add Debug log --- .../grpc/GrpcActiveThreadDumpService.java | 6 +++- .../GrpcActiveThreadLightDumpService.java | 6 +++- .../receiver/grpc/GrpcEchoService.java | 6 +++- ...erver.java => ResponseStreamObserver.java} | 29 ++++++++++----- .../stream/ClientCallStateStreamObserver.java | 35 +++++++++++++------ 5 files changed, 59 insertions(+), 23 deletions(-) rename agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/{EmptyStreamObserver.java => ResponseStreamObserver.java} (52%) diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java index 4b7925f80d8a..0e3c10343b98 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java @@ -16,6 +16,7 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.google.protobuf.Empty; import com.navercorp.pinpoint.common.util.JvmUtils; import com.navercorp.pinpoint.grpc.trace.PActiveThreadDump; import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadDump; @@ -34,6 +35,7 @@ import com.navercorp.pinpoint.profiler.receiver.service.ThreadDump; import com.navercorp.pinpoint.profiler.receiver.service.ThreadDumpRequest; import com.navercorp.pinpoint.profiler.util.ThreadDumpUtils; +import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -83,8 +85,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma List activeThreadDumpList = getActiveThreadDumpList(commandActiveThreadDump); builder.addAllThreadDump(activeThreadDumpList); + PCmdActiveThreadDumpRes activeThreadDump = builder.build(); - profilerCommandServiceStub.commandActiveThreadDump(builder.build(), EmptyStreamObserver.create()); + StreamObserver response = ResponseStreamObserver.responseStream("ActiveThreadDumpResponse"); + profilerCommandServiceStub.commandActiveThreadDump(activeThreadDump, response); } private List getActiveThreadDumpList(PCmdActiveThreadDump commandActiveThreadDump) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadLightDumpService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadLightDumpService.java index a40c33d91237..0d9911975d59 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadLightDumpService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadLightDumpService.java @@ -16,6 +16,7 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.google.protobuf.Empty; import com.navercorp.pinpoint.common.util.JvmUtils; import com.navercorp.pinpoint.grpc.trace.PActiveThreadLightDump; import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadLightDump; @@ -31,6 +32,7 @@ import com.navercorp.pinpoint.profiler.receiver.service.ActiveThreadDumpCoreService; import com.navercorp.pinpoint.profiler.receiver.service.ThreadDump; import com.navercorp.pinpoint.profiler.receiver.service.ThreadDumpRequest; +import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,8 +83,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma List activeThreadDumpList = getActiveThreadDumpList(commandActiveThreadLightDump); builder.addAllThreadDump(activeThreadDumpList); + PCmdActiveThreadLightDumpRes activeThreadLightDump = builder.build(); - profilerCommandServiceStub.commandActiveThreadLightDump(builder.build(), EmptyStreamObserver.create()); + StreamObserver response = ResponseStreamObserver.responseStream("ActiveThreadLightDumpResponse"); + profilerCommandServiceStub.commandActiveThreadLightDump(activeThreadLightDump, response); } private List getActiveThreadDumpList(PCmdActiveThreadLightDump commandActiveThreadLightDump) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcEchoService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcEchoService.java index 6dc4cdddfec2..2bed4978e68b 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcEchoService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcEchoService.java @@ -16,12 +16,14 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; +import com.google.protobuf.Empty; import com.navercorp.pinpoint.grpc.trace.PCmdEcho; import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse; import com.navercorp.pinpoint.grpc.trace.PCmdRequest; import com.navercorp.pinpoint.grpc.trace.PCmdResponse; import com.navercorp.pinpoint.grpc.trace.PCommandType; import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc; +import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,8 +45,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma PCmdResponse commonResponse = PCmdResponse.newBuilder().setResponseId(request.getRequestId()).build(); responseBuilder.setCommonResponse(commonResponse); + PCmdEchoResponse echoResponse = responseBuilder.build(); - profilerCommandServiceStub.commandEcho(responseBuilder.build(), EmptyStreamObserver.create()); + StreamObserver response = ResponseStreamObserver.responseStream("EchoResponse"); + profilerCommandServiceStub.commandEcho(echoResponse, response); } @Override diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ResponseStreamObserver.java similarity index 52% rename from agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java rename to agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ResponseStreamObserver.java index ed7b12032c8a..2e38ce8b17d0 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ResponseStreamObserver.java @@ -16,37 +16,48 @@ package com.navercorp.pinpoint.profiler.receiver.grpc; -import com.google.protobuf.Empty; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.stub.StreamObserver; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Objects; + /** * @author Taejin Koo */ -public class EmptyStreamObserver implements StreamObserver { +public class ResponseStreamObserver implements StreamObserver { + + private static final Logger LOGGER = LogManager.getLogger(ResponseStreamObserver.class); - private final Logger logger = LogManager.getLogger(this.getClass()); + private final String responseName; + + public ResponseStreamObserver(String responseName) { + this.responseName = Objects.requireNonNull(responseName, "responseName"); + } @Override - public void onNext(Empty value) { - logger.info("onNext. message:{}", value); + public void onNext(ResT value) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("{} onNext {}", responseName, value.getClass().getSimpleName()); + } } @Override public void onError(Throwable t) { Status status = Status.fromThrowable(t); - logger.info("onError:{}", status); + Metadata metadata = Status.trailersFromThrowable(t); + LOGGER.info("{} onError {} {}", responseName, status, metadata); } @Override public void onCompleted() { - logger.info("onCompleted."); + LOGGER.info("{} onCompleted", responseName); } - static StreamObserver create() { - return new EmptyStreamObserver(); + public static StreamObserver responseStream(String responseName) { + return new ResponseStreamObserver<>(responseName); } } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java index ba14099e6f28..242ec71ace89 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/stream/ClientCallStateStreamObserver.java @@ -2,6 +2,8 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.util.Objects; @@ -13,6 +15,8 @@ public class ClientCallStateStreamObserver extends ClientCallStreamObserve private static final AtomicReferenceFieldUpdater STATE = AtomicReferenceFieldUpdater.newUpdater(ClientCallStateStreamObserver.class, ObserverState.class, "state"); + private final Logger logger = LogManager.getLogger(this.getClass()); + private final ClientCallStreamObserver delegate; private volatile ObserverState state = ObserverState.RUN; @@ -29,58 +33,67 @@ public ClientCallStateStreamObserver(ClientCallStreamObserver delegate) { this.delegate = Objects.requireNonNull(delegate, "delegate"); } + ClientCallStreamObserver delegate() { + return delegate; + } + @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - delegate.cancel(message, cause); + delegate().cancel(message, cause); } @Override public void disableAutoRequestWithInitial(int request) { - delegate.disableAutoRequestWithInitial(request); + delegate().disableAutoRequestWithInitial(request); } @Override public boolean isReady() { - return delegate.isReady(); + return delegate().isReady(); } @Override public void setOnReadyHandler(Runnable onReadyHandler) { - delegate.setOnReadyHandler(onReadyHandler); + delegate().setOnReadyHandler(onReadyHandler); } @Override public void request(int count) { - delegate.request(count); + delegate().request(count); } @Override public void setMessageCompression(boolean enable) { - delegate.setMessageCompression(enable); + delegate().setMessageCompression(enable); } @Override public void disableAutoInboundFlowControl() { - delegate.disableAutoInboundFlowControl(); + delegate().disableAutoInboundFlowControl(); } @Override public void onNext(ReqT value) { - delegate.onNext(value); + delegate().onNext(value); } @Override public void onError(Throwable t) { if (ObserverState.changeError(STATE, this)) { - delegate.onError(t); + delegate().onError(t); + } else { + // for debugging + logger.warn("onError() WARNING. state already changed {}", state); } - } @Override public void onCompleted() { if (ObserverState.changeComplete(STATE, this)) { - delegate.onCompleted(); + delegate().onCompleted(); + } else { + // for debugging + logger.warn("onComplete() WARNING. state already changed {}", state); } }