Skip to content

Commit

Permalink
[pinpoint-apm#11497] Add Debug log
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 11, 2024
1 parent b99b8a6 commit dba136b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.common.util.JvmUtils;
import com.navercorp.pinpoint.grpc.trace.PActiveThreadDump;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadDump;
Expand All @@ -34,6 +35,7 @@
import com.navercorp.pinpoint.profiler.receiver.service.ThreadDump;
import com.navercorp.pinpoint.profiler.receiver.service.ThreadDumpRequest;
import com.navercorp.pinpoint.profiler.util.ThreadDumpUtils;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -83,8 +85,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma

List<PActiveThreadDump> activeThreadDumpList = getActiveThreadDumpList(commandActiveThreadDump);
builder.addAllThreadDump(activeThreadDumpList);
PCmdActiveThreadDumpRes activeThreadDump = builder.build();

profilerCommandServiceStub.commandActiveThreadDump(builder.build(), EmptyStreamObserver.create());
StreamObserver<Empty> response = ResponseStreamObserver.responseStream("ActiveThreadDumpResponse");
profilerCommandServiceStub.commandActiveThreadDump(activeThreadDump, response);
}

private List<PActiveThreadDump> getActiveThreadDumpList(PCmdActiveThreadDump commandActiveThreadDump) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.common.util.JvmUtils;
import com.navercorp.pinpoint.grpc.trace.PActiveThreadLightDump;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadLightDump;
Expand All @@ -31,6 +32,7 @@
import com.navercorp.pinpoint.profiler.receiver.service.ActiveThreadDumpCoreService;
import com.navercorp.pinpoint.profiler.receiver.service.ThreadDump;
import com.navercorp.pinpoint.profiler.receiver.service.ThreadDumpRequest;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -81,8 +83,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma

List<PActiveThreadLightDump> activeThreadDumpList = getActiveThreadDumpList(commandActiveThreadLightDump);
builder.addAllThreadDump(activeThreadDumpList);
PCmdActiveThreadLightDumpRes activeThreadLightDump = builder.build();

profilerCommandServiceStub.commandActiveThreadLightDump(builder.build(), EmptyStreamObserver.create());
StreamObserver<Empty> response = ResponseStreamObserver.responseStream("ActiveThreadLightDumpResponse");
profilerCommandServiceStub.commandActiveThreadLightDump(activeThreadLightDump, response);
}

private List<PActiveThreadLightDump> getActiveThreadDumpList(PCmdActiveThreadLightDump commandActiveThreadLightDump) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdEcho;
import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.grpc.trace.PCommandType;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -43,8 +45,10 @@ public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerComma

PCmdResponse commonResponse = PCmdResponse.newBuilder().setResponseId(request.getRequestId()).build();
responseBuilder.setCommonResponse(commonResponse);
PCmdEchoResponse echoResponse = responseBuilder.build();

profilerCommandServiceStub.commandEcho(responseBuilder.build(), EmptyStreamObserver.create());
StreamObserver<Empty> response = ResponseStreamObserver.responseStream("EchoResponse");
profilerCommandServiceStub.commandEcho(echoResponse, response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,48 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

/**
* @author Taejin Koo
*/
public class EmptyStreamObserver implements StreamObserver<Empty> {
public class ResponseStreamObserver<ResT> implements StreamObserver<ResT> {

private static final Logger LOGGER = LogManager.getLogger(ResponseStreamObserver.class);

private final Logger logger = LogManager.getLogger(this.getClass());
private final String responseName;

public ResponseStreamObserver(String responseName) {
this.responseName = Objects.requireNonNull(responseName, "responseName");
}

@Override
public void onNext(Empty value) {
logger.info("onNext. message:{}", value);
public void onNext(ResT value) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("{} onNext {}", responseName, value.getClass().getSimpleName());
}
}

@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
Metadata metadata = Status.trailersFromThrowable(t);
LOGGER.info("{} onError {} {}", responseName, status, metadata);
}

@Override
public void onCompleted() {
logger.info("onCompleted.");
LOGGER.info("{} onCompleted", responseName);
}

static StreamObserver<Empty> create() {
return new EmptyStreamObserver();
public static <ResT> StreamObserver<ResT> responseStream(String responseName) {
return new ResponseStreamObserver<>(responseName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nullable;
import java.util.Objects;
Expand All @@ -13,6 +15,8 @@ public class ClientCallStateStreamObserver<ReqT> extends ClientCallStreamObserve
private static final AtomicReferenceFieldUpdater<ClientCallStateStreamObserver, ObserverState> STATE
= AtomicReferenceFieldUpdater.newUpdater(ClientCallStateStreamObserver.class, ObserverState.class, "state");

private final Logger logger = LogManager.getLogger(this.getClass());

private final ClientCallStreamObserver<ReqT> delegate;

private volatile ObserverState state = ObserverState.RUN;
Expand All @@ -29,58 +33,67 @@ public ClientCallStateStreamObserver(ClientCallStreamObserver<ReqT> delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}

ClientCallStreamObserver<ReqT> delegate() {
return delegate;
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
delegate.cancel(message, cause);
delegate().cancel(message, cause);
}

@Override
public void disableAutoRequestWithInitial(int request) {
delegate.disableAutoRequestWithInitial(request);
delegate().disableAutoRequestWithInitial(request);
}

@Override
public boolean isReady() {
return delegate.isReady();
return delegate().isReady();
}

@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
delegate.setOnReadyHandler(onReadyHandler);
delegate().setOnReadyHandler(onReadyHandler);
}

@Override
public void request(int count) {
delegate.request(count);
delegate().request(count);
}

@Override
public void setMessageCompression(boolean enable) {
delegate.setMessageCompression(enable);
delegate().setMessageCompression(enable);
}

@Override
public void disableAutoInboundFlowControl() {
delegate.disableAutoInboundFlowControl();
delegate().disableAutoInboundFlowControl();
}

@Override
public void onNext(ReqT value) {
delegate.onNext(value);
delegate().onNext(value);
}

@Override
public void onError(Throwable t) {
if (ObserverState.changeError(STATE, this)) {
delegate.onError(t);
delegate().onError(t);
} else {
// for debugging
logger.warn("onError() WARNING. state already changed {}", state);
}

}

@Override
public void onCompleted() {
if (ObserverState.changeComplete(STATE, this)) {
delegate.onCompleted();
delegate().onCompleted();
} else {
// for debugging
logger.warn("onComplete() WARNING. state already changed {}", state);
}
}

Expand Down

0 comments on commit dba136b

Please sign in to comment.