Skip to content

Commit

Permalink
[pinpoint-apm#11497] Fix DirectByteBuffer leak in active thread count
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Sep 26, 2024
1 parent c540794 commit d9a305e
Show file tree
Hide file tree
Showing 26 changed files with 411 additions and 456 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserv
return clientResponseObserver;
}

@Override
public String toString() {
return "ActiveThreadCountStreamSocket{" +
"streamObserverId=" + streamObserverId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public void run() {
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();

stream.send(activeThreadCount);
if (isDebug) {
LOGGER.debug("ActiveThreadCountStreamSocket. {}", stream);
}
} catch (Throwable e) {
LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
streamSocket.close(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public void run() throws Exception {

MetadataService metadataService = new MetadataService(new MockDispatchHandler(), Executors.newFixedThreadPool(8), serverRequestFactory);
List<ServerServiceDefinition> serviceList = List.of(agentService.bindService(), metadataService.bindService());

grpcReceiver.setBindAddress(builder.build());
grpcReceiver.setAddressFilter(new MockAddressFilter());

grpcReceiver.setBindableServiceList(serviceList);
grpcReceiver.setAddressFilter(new MockAddressFilter());
grpcReceiver.setExecutor(Executors.newFixedThreadPool(8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
*/
package com.navercorp.pinpoint.realtime.collector.receiver;

import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadDumpRes;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadLightDumpRes;
import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse;
import com.navercorp.pinpoint.realtime.collector.receiver.grpc.GrpcAgentConnectionRepository;
import com.navercorp.pinpoint.realtime.collector.receiver.grpc.GrpcCommandService;
import com.navercorp.pinpoint.realtime.collector.sink.ErrorSinkRepository;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadDumpPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadLightDumpPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.EchoPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.RealtimeCollectorSinkConfig;
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.MonoSink;

/**
* @author youngjin.kim2
Expand All @@ -45,15 +42,13 @@ GrpcAgentConnectionRepository grpcAgentConnectionRepository() {
@Bean("commandService")
GrpcCommandService grpcCommandService(
GrpcAgentConnectionRepository agentConnectionRepository,
ErrorSinkRepository errorSinkRepository,
SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepository,
SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepository,
SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepository,
SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepository
SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepository,
SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepository,
SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepository,
SinkRepository<EchoPublisher> echoSinkRepository
) {
return new GrpcCommandService(
agentConnectionRepository,
errorSinkRepository,
activeThreadCountSinkRepository,
activeThreadDumpSinkRepository,
activeThreadLightDumpSinkRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,73 @@
package com.navercorp.pinpoint.realtime.collector.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.FluxSink;

import java.util.Objects;

/**
* @author youngjin.kim2
*/
public abstract class FluxCommandResponseObserver<T> implements StreamObserver<T> {
public abstract class ActiveThreadCountResponseStreamObserver implements StreamObserver<PCmdActiveThreadCountRes> {

private static final Logger logger = LogManager.getLogger(FluxCommandResponseObserver.class);
private static final Logger logger = LogManager.getLogger(ActiveThreadCountResponseStreamObserver.class);

private final ServerCallStreamObserver<Empty> connectionObserver;
private final SinkRepository<FluxSink<T>> sinkRepository;
private final ServerCallStreamObserver<Empty> serverCallStreamObserver;
private final SinkRepository<ActiveThreadCountPublisher> sinkRepository;

private volatile long sinkId = -1;
private volatile FluxSink<T> sink = null;
private volatile ActiveThreadCountPublisher publisher = null;

public FluxCommandResponseObserver(
ServerCallStreamObserver<Empty> connectionObserver,
SinkRepository<FluxSink<T>> sinkRepository
) {
this.connectionObserver = Objects.requireNonNull(connectionObserver, "connectionObserver");
public ActiveThreadCountResponseStreamObserver(ServerCallStreamObserver<Empty> serverCallStreamObserver, SinkRepository<ActiveThreadCountPublisher> sinkRepository) {
this.serverCallStreamObserver = Objects.requireNonNull(serverCallStreamObserver, "serverCallStreamObserver");
this.sinkRepository = Objects.requireNonNull(sinkRepository, "sinkRepository");
}

@Override
public void onNext(T response) {
public void onNext(PCmdActiveThreadCountRes response) {
boolean isHello = extractSequence(response) == 1;

if (isHello) {
connectionObserver.onNext(Empty.getDefaultInstance());
serverCallStreamObserver.onNext(Empty.getDefaultInstance());
}

if (ensureSink(response) == null) {
this.connectionObserver.onError(new StatusException(Status.INTERNAL.withDescription("sink not found")));
final ActiveThreadCountPublisher publisher = ensureSink(response);
if (publisher == null) {
this.serverCallStreamObserver.onError(new StatusException(Status.INTERNAL.withDescription("sink not found")));
return;
}

logger.debug("Realtime flux item received: sinkId = {}", sinkId);
if (!isHello) {
this.sink.next(response);
publisher.publish(response);
}
}

private FluxSink<T> ensureSink(T response) {
if (this.sinkId == -1 || this.sink == null) {
private ActiveThreadCountPublisher ensureSink(PCmdActiveThreadCountRes response) {
if (this.sinkId == -1 || publisher == null) {
return initSink(response);
}
return this.sink;
return this.publisher;
}

private FluxSink<T> initSink(T response) {
private ActiveThreadCountPublisher initSink(PCmdActiveThreadCountRes response) {
this.sinkId = this.extractSinkId(response);
this.sink = this.sinkRepository.get(this.sinkId);
if (this.sink == null) {
this.publisher = this.sinkRepository.get(sinkId);
if (this.publisher == null) {
logger.warn("Failed to handle realtime flux item: sink {} not found", this.sinkId);
return null;
} else {
publisher.setStreamObserver(this.serverCallStreamObserver);
}
return this.sink;
return publisher;
}

@Override
Expand All @@ -93,10 +94,10 @@ public void onError(Throwable t) {
logger.warn("Stream error: sinkId = {}, {}", sinkId, status);
}

this.connectionObserver.onCompleted();
this.serverCallStreamObserver.onCompleted();

if (this.sink != null) {
this.sink.error(t);
if (this.publisher != null) {
this.publisher.error(t);
} else {
logger.warn("Failed to emit error: sink not found. the error may have occurred before the first message");
}
Expand All @@ -106,17 +107,17 @@ public void onError(Throwable t) {
public void onCompleted() {
logger.info("Completed stream: sinkId = {}", this.sinkId);

this.connectionObserver.onCompleted();
this.serverCallStreamObserver.onCompleted();

if (this.sink != null) {
this.sink.complete();
if (this.publisher != null) {
this.publisher.complete();
} else {
logger.warn("Failed to emit complete: sink not found");
}
}

protected abstract long extractSinkId(T response);
protected abstract long extractSinkId(PCmdActiveThreadCountRes response);

protected abstract int extractSequence(T response);
protected abstract int extractSequence(PCmdActiveThreadCountRes response);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.realtime.collector.sink.ErrorSinkRepository;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadDumpPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadLightDumpPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.EchoPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.Publisher;
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -38,8 +42,6 @@
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.MonoSink;

import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -54,22 +56,19 @@ public class GrpcCommandService extends ProfilerCommandServiceGrpc.ProfilerComma
private final Logger logger = LogManager.getLogger(this.getClass());

private final GrpcAgentConnectionRepository agentConnectionRepository;
private final ErrorSinkRepository sinkRepository;
private final SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepo;
private final SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepo;
private final SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepo;
private final SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepo;
private final SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepo;
private final SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepo;
private final SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepo;
private final SinkRepository<EchoPublisher> echoSinkRepo;

public GrpcCommandService(
GrpcAgentConnectionRepository agentConnectionRepository,
ErrorSinkRepository sinkRepository,
SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepo,
SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepo,
SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepo,
SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepo
SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepo,
SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepo,
SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepo,
SinkRepository<EchoPublisher> echoSinkRepo
) {
this.agentConnectionRepository = Objects.requireNonNull(agentConnectionRepository, "clusterPointRepository");
this.sinkRepository = Objects.requireNonNull(sinkRepository, "sinkRepository");
this.activeThreadCountSinkRepo = Objects.requireNonNull(activeThreadCountSinkRepo, "activeThreadCountSinkRepo");
this.activeThreadDumpSinkRepo = Objects.requireNonNull(activeThreadDumpSinkRepo, "activeThreadDumpSinkRepo");
this.activeThreadLightDumpSinkRepo = Objects.requireNonNull(activeThreadLightDumpSinkRepo, "activeThreadLightDumpSinkRepo");
Expand Down Expand Up @@ -127,9 +126,7 @@ public void onNext(PCmdMessage value) {
GrpcCommandService.this.agentConnectionRepository.add(conn);
}
} else if (value.hasFailMessage()) {
PCmdResponse failMessage = value.getFailMessage();
long sinkId = failMessage.getResponseId();
sinkRepository.error(sinkId, new RuntimeException(failMessage.getMessage().getValue()));
handleFail(value);
}
}

Expand Down Expand Up @@ -199,9 +196,7 @@ public StreamObserver<PCmdMessage> handleCommandV2(StreamObserver<PCmdRequest> r
@Override
public void onNext(PCmdMessage value) {
if (value.hasFailMessage()) {
PCmdResponse failMessage = value.getFailMessage();
long sinkId = failMessage.getResponseId();
sinkRepository.error(sinkId, new RuntimeException(failMessage.getMessage().getValue()));
handleFail(value);
}
}

Expand Down Expand Up @@ -232,6 +227,16 @@ private GrpcAgentConnection buildAgentConnection(
);
}

private void handleFail(PCmdMessage value) {
final PCmdResponse failMessage = value.getFailMessage();
final long sinkId = failMessage.getResponseId();
final Exception exception = new RuntimeException(failMessage.getMessage().getValue());
activeThreadCountSinkRepo.error(sinkId, exception);
activeThreadDumpSinkRepo.error(sinkId, exception);
activeThreadLightDumpSinkRepo.error(sinkId, exception);
echoSinkRepo.error(sinkId, exception);
}

private void handleOnError(Throwable t, GrpcAgentConnection conn) {
if (conn == null) {
logger.warn("Command error before establishment");
Expand All @@ -242,7 +247,7 @@ private void handleOnError(Throwable t, GrpcAgentConnection conn) {
Metadata metadata = Status.trailersFromThrowable(t);

logger.info("Failed to command stream, {} => local, {} {}",
conn.getClusterKey(), status, metadata);
conn.getClusterKey(), status, metadata);

}

Expand All @@ -258,33 +263,31 @@ private void handleOnCompleted(GrpcAgentConnection conn) {
@Override
public void commandEcho(PCmdEchoResponse response, StreamObserver<Empty> responseObserver) {
long sinkId = response.getCommonResponse().getResponseId();
emitMono(response, responseObserver, this.echoSinkRepo.get(sinkId));
final EchoPublisher publisher = this.echoSinkRepo.get(sinkId);
emitMono(response, responseObserver, publisher);
this.echoSinkRepo.invalidate(sinkId);
}

@Override
public void commandActiveThreadDump(PCmdActiveThreadDumpRes response, StreamObserver<Empty> responseObserver) {
long sinkId = response.getCommonResponse().getResponseId();
emitMono(response, responseObserver, this.activeThreadDumpSinkRepo.get(sinkId));
final ActiveThreadDumpPublisher publisher = this.activeThreadDumpSinkRepo.get(sinkId);
emitMono(response, responseObserver, publisher);
this.activeThreadDumpSinkRepo.invalidate(sinkId);
}

@Override
public void commandActiveThreadLightDump(
PCmdActiveThreadLightDumpRes response,
StreamObserver<Empty> responseObserver
) {
public void commandActiveThreadLightDump(PCmdActiveThreadLightDumpRes response, StreamObserver<Empty> responseObserver) {
long sinkId = response.getCommonResponse().getResponseId();
emitMono(response, responseObserver, this.activeThreadLightDumpSinkRepo.get(sinkId));
final ActiveThreadLightDumpPublisher publisher = this.activeThreadLightDumpSinkRepo.get(sinkId);
emitMono(response, responseObserver, publisher);
this.activeThreadLightDumpSinkRepo.invalidate(sinkId);
}

@Override
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(
StreamObserver<Empty> responseObserver
) {
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(StreamObserver<Empty> responseObserver) {
ServerCallStreamObserver<Empty> serverResponseObserver = (ServerCallStreamObserver<Empty>) responseObserver;
return new FluxCommandResponseObserver<>(serverResponseObserver, this.activeThreadCountSinkRepo) {
return new ActiveThreadCountResponseStreamObserver(serverResponseObserver, this.activeThreadCountSinkRepo) {
@Override
protected long extractSinkId(PCmdActiveThreadCountRes response) {
return response.getCommonStreamResponse().getResponseId();
Expand All @@ -297,13 +300,13 @@ protected int extractSequence(PCmdActiveThreadCountRes response) {
};
}

private <T> void emitMono(T response, StreamObserver<Empty> responseObserver, MonoSink<T> sink) {
private <T> void emitMono(T response, StreamObserver<Empty> responseObserver, Publisher<T> sink) {
if (sink == null) {
logger.warn("Could not find echo sink: clusterKey = {}", getClusterKeyFromContext());
responseObserver.onError(new StatusException(Status.NOT_FOUND));
return;
}
sink.success(response);
sink.publish(response);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -356,5 +359,4 @@ public void onCompleted() {
}

}

}
Loading

0 comments on commit d9a305e

Please sign in to comment.