Skip to content

Commit

Permalink
[pinpoint-apm#11497] Refactor ActiveThreadCountStreamSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 10, 2024
1 parent 336e81e commit ebdb8ff
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadDumpService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcActiveThreadLightDumpService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcEchoService;
import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcStreamService;
import com.navercorp.pinpoint.profiler.sender.grpc.AgentGrpcDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import io.grpc.ClientInterceptor;
Expand Down Expand Up @@ -166,12 +167,16 @@ private ProfilerCommandServiceLocator createProfilerCommandServiceLocator(Active

profilerCommandLocatorBuilder.addService(new GrpcEchoService());
if (activeTraceRepository != null) {
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadCountService(activeTraceRepository));
GrpcActiveThreadCountService grpcActiveThreadCountService = newActiveThreadCountService(activeTraceRepository);
profilerCommandLocatorBuilder.addService(grpcActiveThreadCountService);
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadDumpService(activeTraceRepository, threadDumpMapper));
profilerCommandLocatorBuilder.addService(new GrpcActiveThreadLightDumpService(activeTraceRepository, threadDumpMapper));
}
return profilerCommandLocatorBuilder.build();
}

final ProfilerCommandServiceLocator commandServiceLocator = profilerCommandLocatorBuilder.build();
return commandServiceLocator;
private GrpcActiveThreadCountService newActiveThreadCountService(ActiveTraceRepository activeTraceRepository) {
GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", GrpcStreamService.DEFAULT_FLUSH_DELAY, activeTraceRepository);
return new GrpcActiveThreadCountService(grpcStreamService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.stream.ClientCallStateStreamObserver;
import com.navercorp.pinpoint.grpc.stream.StreamUtils;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -30,21 +33,29 @@
/**
* @author Taejin Koo
*/
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes, Empty> {
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes, Empty>, StreamObserver<Empty> {

private final Logger logger = LogManager.getLogger(this.getClass());

private final GrpcStreamService grpcStreamService;

private final int socketId;
private final int streamObserverId;

private int sequenceId = 0;

private final PinpointClientResponseObserver<PCmdActiveThreadCountRes, Empty> clientResponseObserver;
private final ClientCallStateStreamObserver<PCmdActiveThreadCountRes> requestStream;
private volatile boolean closed = false;

public ActiveThreadCountStreamSocket(int streamObserverId, GrpcStreamService grpcStreamService) {
public ActiveThreadCountStreamSocket(int socketId, int streamObserverId,
GrpcStreamService grpcStreamService,
StreamFactory<PCmdActiveThreadCountRes, Empty> streamFactory) {
this.socketId = socketId;
this.streamObserverId = streamObserverId;
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
this.clientResponseObserver = new PinpointClientResponseObserver<>(this);

Objects.requireNonNull(streamFactory, "streamFactory");
this.requestStream = ClientCallStateStreamObserver.wrap(streamFactory.newStream(this));
}

public PCmdStreamResponse newHeader() {
Expand All @@ -54,13 +65,20 @@ public PCmdStreamResponse newHeader() {
return headerResponseBuilder.build();
}


@Override
public void send(PCmdActiveThreadCountRes activeThreadCount) {
if (clientResponseObserver.isReady()) {
clientResponseObserver.sendRequest(activeThreadCount);
public boolean send(PCmdActiveThreadCountRes activeThreadCount) {
if (closed) {
return false;
}
final CallStreamObserver<PCmdActiveThreadCountRes> request = this.requestStream;
if (request.isReady()) {
request.onNext(activeThreadCount);
return true;
} else {
logger.info("Send fail. (ActiveThreadCount) client is not ready. streamObserverId:{}", streamObserverId);
logger.info("Send fail. (ActiveThreadCount) client is not ready. socketId:{} streamObserverId:{}", socketId, streamObserverId);
}
return false;
}

private int getSequenceId() {
Expand All @@ -69,45 +87,63 @@ private int getSequenceId() {

@Override
public void close() {
if (closed) {
return;
}
logger.info("close");
close0(null);
dispose();
requestStream.onCompleted();
}

@Override
public void close(Throwable throwable) {
if (closed) {
return;
}
logger.warn("close", throwable);
close0(throwable);
dispose();
requestStream.onCompleted();
}

public boolean isClosed() {
return closed;
}


private void dispose() {
this.closed = true;
grpcStreamService.unregister(this);
}

@Override
public void disconnect() {
logger.info("disconnect");
close0(null);
public void onNext(Empty empty) {
logger.debug("onNext {}", empty);
}

@Override
public void disconnect(Throwable throwable) {
public void onError(Throwable throwable) {
Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
logger.info("disconnect. {} {}", status, metadata);
close0(throwable);
}
logger.info("onError {}. {} {}", this, status, metadata);

private void close0(Throwable throwable) {
clientResponseObserver.close(throwable);
grpcStreamService.unregister(this);
this.dispose();
StreamUtils.onCompleted(requestStream, (th) -> logger.info("onError", th));
}

@Override
public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserver() {
return clientResponseObserver;
public void onCompleted() {
logger.info("onCompleted {}", this);

this.dispose();
StreamUtils.onCompleted(requestStream, (th) -> logger.info("onCompleted", th));
}

@Override
public String toString() {
return "ActiveThreadCountStreamSocket{" +
"streamObserverId=" + streamObserverId +
"socketId=" + socketId +
", streamObserverId=" + streamObserverId +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,29 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import com.navercorp.pinpoint.grpc.trace.PCommandType;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author Taejin Koo
*/
public class GrpcActiveThreadCountService implements ProfilerGrpcCommandService, Closeable {

private static final long DEFAULT_FLUSH_DELAY = 1000;

private final Logger logger = LogManager.getLogger(getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final ActiveTraceRepository activeTraceRepository;
private final AtomicInteger sequence = new AtomicInteger(0);

private final GrpcStreamService grpcStreamService = new GrpcStreamService("ActiveThreadCountService", DEFAULT_FLUSH_DELAY);
private final GrpcStreamService grpcStreamService ;

public GrpcActiveThreadCountService(ActiveTraceRepository activeTraceRepository) {
this.activeTraceRepository = Objects.requireNonNull(activeTraceRepository, "activeTraceRepository");
public GrpcActiveThreadCountService(GrpcStreamService grpcStreamService) {
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
}

@Override
Expand All @@ -59,66 +47,17 @@ public short getCommandServiceCode() {
}

@Override
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub) {
ActiveThreadCountStreamSocket activeThreadCountStreamSocket = new ActiveThreadCountStreamSocket(request.getRequestId(), grpcStreamService);
ClientResponseObserver<PCmdActiveThreadCountRes, Empty> responseObserver = activeThreadCountStreamSocket.getResponseObserver();
profilerCommandServiceStub.commandStreamActiveThreadCount(responseObserver);
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub commandServiceStub) {
ActiveThreadCountStreamSocket socket = new ActiveThreadCountStreamSocket(sequence.getAndIncrement(), request.getRequestId(), grpcStreamService, commandServiceStub::commandStreamActiveThreadCount);

grpcStreamService.register(activeThreadCountStreamSocket, new ActiveThreadCountTimerTask());
grpcStreamService.register(socket);
}

private PCmdActiveThreadCountRes.Builder getActiveThreadCountResponse() {
final long currentTime = System.currentTimeMillis();
final ActiveTraceHistogram histogram = activeTraceRepository.getActiveTraceHistogram(currentTime);

PCmdActiveThreadCountRes.Builder responseBuilder = PCmdActiveThreadCountRes.newBuilder();
responseBuilder.setTimeStamp(currentTime);
responseBuilder.setHistogramSchemaType(histogram.getHistogramSchema().getTypeCode());

final List<Integer> activeTraceCountList = histogram.getCounter();
for (Integer activeTraceCount : activeTraceCountList) {
responseBuilder.addActiveThreadCount(activeTraceCount);
}

return responseBuilder;
}

@Override
public void close() throws IOException {
logger.info("close");
grpcStreamService.close();
}

private class ActiveThreadCountTimerTask extends TimerTask {

@Override
public void run() {
if (isDebug) {
logger.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));
}

PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse();
for (GrpcProfilerStreamSocket<?, ?> streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket instanceof ActiveThreadCountStreamSocket) {
try {
final ActiveThreadCountStreamSocket stream = (ActiveThreadCountStreamSocket) streamSocket;

PCmdStreamResponse header = stream.newHeader();
activeThreadCountResponseBuilder.setCommonStreamResponse(header);
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();

stream.send(activeThreadCount);
if (isDebug) {
logger.debug("ActiveThreadCountStreamSocket. {}", stream);
}
} catch (Throwable e) {
logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
streamSocket.close(e);
}
}
}
}

}

}
Loading

0 comments on commit ebdb8ff

Please sign in to comment.