diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java index eaaa9d190904..15cf1a0df8d8 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java @@ -88,11 +88,13 @@ public EnhancedDataSender get() { final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable, clientRetryEnable); final ChannelFactory channelFactory = channelFactoryBuilder.build(); + + final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize(); if (clientRetryEnable) { - return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, messageConverter, channelFactory); + return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory); } - final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize(); + final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount(); final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis(); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java index 9ab2b623bb15..2fd0d7bdeb0a 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java @@ -60,7 +60,7 @@ public MetadataGrpcDataSender(String host, int port, int executorQueueSize, ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis) { super(host, port, executorQueueSize, messageConverter, channelFactory); - this.maxAttempts = getMaxAttempts(retryMaxCount); + this.maxAttempts = Math.max(retryMaxCount, 0); this.retryDelayMillis = retryDelayMillis; this.metadataStub = MetadataGrpc.newStub(managedChannel); @@ -73,19 +73,12 @@ public boolean isSuccess(PResult response) { } @Override - public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) { - MetadataGrpcDataSender.this.scheduleNextRetry(request, remainingRetryCount); + public void scheduleNextRetry(GeneratedMessageV3 request, int retryCount) { + MetadataGrpcDataSender.this.scheduleNextRetry(request, retryCount); } }; } - private int getMaxAttempts(int retryMaxCount) { - if (retryMaxCount < 0) { - return 0; - } - return retryMaxCount; - } - private Timer newTimer(String name) { ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); return new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512, false, MAX_PENDING_TIMEOUTS); @@ -109,6 +102,10 @@ public boolean send(T data) { @Override public boolean request(final T data) { + if (data == null) { + return true; + } + final Runnable convertAndRun = new Runnable() { @Override public void run() { @@ -118,7 +115,7 @@ public void run() { if (isDebug) { logger.debug("Request metadata={}", MessageFormatUtils.debugLog(message)); } - request0(message, maxAttempts); + request0(message, 0); } catch (Exception ex) { logger.info("Failed to request metadata={}", data, ex); } @@ -127,33 +124,33 @@ public void run() { try { executor.execute(convertAndRun); } catch (RejectedExecutionException reject) { - logger.info("Rejected metadata={}", data); + logger.info("Rejected metadata={}", data.getClass().getSimpleName()); return false; } return true; } // Request - private void request0(final GeneratedMessageV3 message, final int remainingRetryCount) { + private void request0(final GeneratedMessageV3 message, final int retryCount) { if (message instanceof PSqlMetaData) { final PSqlMetaData sqlMetaData = (PSqlMetaData) message; - final StreamObserver responseObserver = newResponseStream(message, remainingRetryCount); + final StreamObserver responseObserver = newResponseStream(message, retryCount); this.metadataStub.requestSqlMetaData(sqlMetaData, responseObserver); } else if (message instanceof PSqlUidMetaData) { final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message; - final StreamObserver responseObserver = newResponseStream(message, remainingRetryCount); + final StreamObserver responseObserver = newResponseStream(message, retryCount); this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, responseObserver); } else if (message instanceof PApiMetaData) { final PApiMetaData apiMetaData = (PApiMetaData) message; - final StreamObserver responseObserver = newResponseStream(message, remainingRetryCount); + final StreamObserver responseObserver = newResponseStream(message, retryCount); this.metadataStub.requestApiMetaData(apiMetaData, responseObserver); } else if (message instanceof PStringMetaData) { final PStringMetaData stringMetaData = (PStringMetaData) message; - final StreamObserver responseObserver = newResponseStream(message, remainingRetryCount); + final StreamObserver responseObserver = newResponseStream(message, retryCount); this.metadataStub.requestStringMetaData(stringMetaData, responseObserver); } else if (message instanceof PExceptionMetaData) { final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message; - final StreamObserver responseObserver = newResponseStream(message, remainingRetryCount); + final StreamObserver responseObserver = newResponseStream(message, retryCount); this.metadataStub.requestExceptionMetaData(exceptionMetaData, responseObserver); } else { logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message)); @@ -165,22 +162,22 @@ private StreamObserver newResponseStream(GeneratedMessageV3 message, in } // Retry - private void scheduleNextRetry(final GeneratedMessageV3 message, final int remainingRetryCount) { + private void scheduleNextRetry(final GeneratedMessageV3 message, final int retryCount) { if (shutdown) { if (isDebug) { - logger.debug("Request drop. Already shutdown request={}", MessageFormatUtils.debugLog(message)); + logger.debug("Request drop. Already shutdown metadata={}", MessageFormatUtils.debugLog(message)); } return; } - if (remainingRetryCount <= 0) { + if (retryCount > maxAttempts) { if (isDebug) { - logger.debug("Request drop. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount); + logger.debug("Request drop. metadata={}, retryCount={}", MessageFormatUtils.debugLog(message), retryCount); } return; } if (isDebug) { - logger.debug("Request retry. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount); + logger.debug("Request retry. metadata={}, retryCount={}", MessageFormatUtils.debugLog(message), retryCount); } final TimerTask timerTask = new TimerTask() { @Override @@ -191,7 +188,7 @@ public void run(Timeout timeout) throws Exception { if (shutdown) { return; } - request0(message, remainingRetryCount); + request0(message, retryCount); } }; diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java index 50e54d676603..24a6fbe4bc33 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java @@ -17,8 +17,11 @@ package com.navercorp.pinpoint.profiler.sender.grpc; import com.google.protobuf.GeneratedMessageV3; +import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory; +import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.grpc.ExecutorUtils; import com.navercorp.pinpoint.grpc.MessageFormatUtils; import com.navercorp.pinpoint.grpc.client.ChannelFactory; import com.navercorp.pinpoint.grpc.trace.MetadataGrpc; @@ -31,6 +34,9 @@ import com.navercorp.pinpoint.io.ResponseMessage; import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -42,14 +48,23 @@ public class MetadataGrpcHedgingDataSender extends AbstractGrpcDataSender private final AtomicLong requestIdGen = new AtomicLong(0); - public MetadataGrpcHedgingDataSender(String host, int port, + private final ExecutorService executor; + + public MetadataGrpcHedgingDataSender(String host, int port, int queueSize, MessageConverter messageConverter, ChannelFactory channelFactory) { super(host, port, messageConverter, channelFactory); this.metadataStub = MetadataGrpc.newStub(managedChannel); + this.executor = newExecutorService(name + "-Executor", queueSize); + } + + protected ExecutorService newExecutorService(String name, int senderExecutorQueueSize) { + ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); + return ExecutorFactory.newFixedThreadPool(1, senderExecutorQueueSize, threadFactory); } + // Unsupported Operation @Override public boolean request(T data, int retry) { @@ -68,6 +83,25 @@ public boolean send(T data) { @Override public boolean request(final T data) { + Runnable sendTask = new Runnable() { + @Override + public void run() { + doRequest(data); + } + }; + + try { + this.executor.execute(sendTask); + } catch (RejectedExecutionException reject) { + if (tLogger.isWarnEnabled()) { + tLogger.warn("Rejected Metadata sendTask {}", tLogger.getCounter()); + } + return false; + } + return true; + } + + private boolean doRequest(T data) { try { final GeneratedMessageV3 message = messageConverter.toMessage(data); @@ -111,6 +145,7 @@ public void stop() { } this.shutdown = true; + ExecutorUtils.shutdownExecutorService(name, executor); super.releaseChannel(); } } \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryResponseStreamObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryResponseStreamObserver.java index 7dc5b4b8c9c3..475211046500 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryResponseStreamObserver.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryResponseStreamObserver.java @@ -32,13 +32,13 @@ public class RetryResponseStreamObserver implements StreamObserver retryScheduler; private final ReqT message; - private final int remainingRetryCount; + private final int retryCount; - public RetryResponseStreamObserver(Logger logger, RetryScheduler retryScheduler, ReqT message, int remainingRetryCount) { + public RetryResponseStreamObserver(Logger logger, RetryScheduler retryScheduler, ReqT message, int retryCount) { this.logger = Objects.requireNonNull(logger, "logger"); this.retryScheduler = Objects.requireNonNull(retryScheduler, "retryScheduler"); this.message = Objects.requireNonNull(message, "message"); - this.remainingRetryCount = remainingRetryCount; + this.retryCount = retryCount; } @Override @@ -51,7 +51,7 @@ public void onNext(ResT response) { } else { // Retry if (logger.isInfoEnabled()) { - logger.info("Request fail. request={}, result={}", logString(message), logString(response)); + logger.info("Request failed. PResult.getSuccess() is false. request={}, result={}", logString(message), logString(response)); } retryScheduler.scheduleNextRetry(message, nextRetryCount()); } @@ -68,8 +68,7 @@ public void onError(Throwable throwable) { } // Retry - final int remainingRetryCount = nextRetryCount(); - retryScheduler.scheduleNextRetry(message, remainingRetryCount); + retryScheduler.scheduleNextRetry(message, nextRetryCount()); } @Override @@ -77,7 +76,7 @@ public void onCompleted() { } private int nextRetryCount() { - return remainingRetryCount - 1; + return retryCount + 1; } private String logString(Object message) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryScheduler.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryScheduler.java index de3d83c0cb58..fd8213ea6d78 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryScheduler.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/RetryScheduler.java @@ -21,6 +21,6 @@ */ public interface RetryScheduler { boolean isSuccess(ResT response); - void scheduleNextRetry(ReqT request, int remainingRetryCount); + void scheduleNextRetry(ReqT request, int retryCount); } diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java index 61dd4e4d260c..e103db16ea81 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSenderTest.java @@ -329,6 +329,6 @@ public void close() { } }; - return new MetadataGrpcHedgingDataSender<>("localhost", 1234, converter, factory); + return new MetadataGrpcHedgingDataSender<>("localhost", 1234, 1, converter, factory); } } \ No newline at end of file diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/MetadataClientMock.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/MetadataClientMock.java index 6bdb7c43ca88..e896fdb5ca4b 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/MetadataClientMock.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/MetadataClientMock.java @@ -78,15 +78,15 @@ public MetadataClientMock(final String host, final int port, final boolean agent this.channel = channelFactory.build(host, port); this.metadataStub = MetadataGrpc.newStub(channel); - this.retryScheduler = new RetryScheduler() { + this.retryScheduler = new RetryScheduler<>() { @Override public boolean isSuccess(PResult response) { return response.getSuccess(); } @Override - public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) { - MetadataClientMock.this.scheduleNextRetry(request, remainingRetryCount); + public void scheduleNextRetry(GeneratedMessageV3 request, int retryCount) { + MetadataClientMock.this.scheduleNextRetry(request, retryCount); } }; }