diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java index 15c5ee28b2ff..c0afa08e8a48 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java @@ -63,7 +63,7 @@ public GrpcSpanReceiverConfiguration() { @Deprecated @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy") + @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true) public static class LegacySpanInterceptorConfiguration { @Bean public FactoryBean grpcSpanStreamScheduler(@Qualifier("grpcSpanStreamProperties") @@ -89,7 +89,7 @@ public FactoryBean spanStreamExecutorInterceptor(@Qualifier(" } @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true) + @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit") public static class RateLimitServerInterceptorConfiguration { @Bean public Bandwidth spanBandwidth(@Value("${collector.receiver.grpc.span.stream.flow-control.rate-limit.capacity:5000}") long capacity, diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStatReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStatReceiverConfiguration.java index 7a2881cf70be..3c958866aa31 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStatReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStatReceiverConfiguration.java @@ -63,7 +63,7 @@ public GrpcStatReceiverConfiguration() { @Deprecated @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy") + @ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true) public static class LegacySpanInterceptorConfiguration { @Bean @@ -92,7 +92,7 @@ public FactoryBean statStreamExecutorInterceptor(@Qualifier(" @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true) + @ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit") public static class RateLimitServerInterceptorConfiguration { @Bean diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java index 762a22e42b39..7f9a1c8251ff 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java @@ -26,7 +26,7 @@ public class GrpcStreamProperties { @PositiveOrZero private int callInitRequestCount = 1000; @Positive - private int schedulerPeriodMillis = 64; + private int schedulerPeriodMillis = 1000; private int schedulerRecoveryMessageCount = 10; private long idleTimeout = -1; diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java index 75304e6b1be6..38191866d146 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java @@ -51,6 +51,11 @@ public void cancel(Status status, Metadata trailers) { this.serverCall.close(status, trailers); } + @Override + public boolean isCancelled() { + return this.serverCall.isCancelled(); + } + @Override public String toString() { return "DefaultServerCallWrapper{" + diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java index 5baf7a431f14..64e9ffe03de9 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java @@ -2,21 +2,26 @@ import io.grpc.Metadata; import io.grpc.Status; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Objects; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class FlowControlRejectExecutionListener implements RejectedExecutionListener { - private final Logger logger = LogManager.getLogger(this.getClass()); + + private static final AtomicLongFieldUpdater REJECT = + AtomicLongFieldUpdater.newUpdater(FlowControlRejectExecutionListener.class, "rejectedExecutionCounter"); private static final Status STREAM_IDLE_TIMEOUT = Status.DEADLINE_EXCEEDED.withDescription("Stream idle timeout"); + private final Logger logger = LogManager.getLogger(this.getClass()); + private final String name; - private final AtomicLong rejectedExecutionCounter = new AtomicLong(0); + private volatile long rejectedExecutionCounter; + private final ServerCallWrapper serverCall; private final long recoveryMessagesCount; @@ -33,11 +38,19 @@ public FlowControlRejectExecutionListener(String name, ServerCallWrapper serverC @Override public void onRejectedExecution() { - this.rejectedExecutionCounter.incrementAndGet(); + REJECT.incrementAndGet(this); } @Override public void onSchedule() { + if (logger.isTraceEnabled()) { + logger.trace("Stream state check {} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); + } + if (this.serverCall.isCancelled()) { + logger.info("Stream already cancelled:{} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); + this.cancel(); + return; + } if (!expireIdleTimeout()) { reject(); } @@ -55,17 +68,20 @@ private boolean expireIdleTimeout() { private void reject() { - final long currentRejectCount = this.rejectedExecutionCounter.get(); + final long currentRejectCount = getRejectedExecutionCount(); if (currentRejectCount > 0) { final long recovery = Math.min(currentRejectCount, recoveryMessagesCount); - this.rejectedExecutionCounter.addAndGet(-recovery); + REJECT.addAndGet(this, -recovery); + if (logger.isDebugEnabled()) { + logger.debug("flow-control request:{} {}/{}", recovery, serverCall.getApplicationName(), serverCall.getAgentId()); + } serverCall.request((int) recovery); } } @Override public long getRejectedExecutionCount() { - return rejectedExecutionCounter.get(); + return REJECT.get(this); } @Override @@ -102,16 +118,19 @@ public boolean isCancelled() { private void idleTimeout() { - logger.info("stream idle timeout applicationName:{} agentId:{} {}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); - serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata()); + logger.info("Stream idle timeout agent:{}/{} {}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); + try { + serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata()); + } catch (IllegalStateException ex) { + logger.warn("Failed to cancel stream. agent:{}/{} {}", serverCall.getApplicationName(), serverCall.getAgentId(), ex.getMessage()); + } } @Override public String toString() { - final StringBuilder sb = new StringBuilder("RejectedExecutionListener{"); - sb.append("rejectedExecutionCounter=").append(rejectedExecutionCounter); - sb.append(", serverCall=").append(serverCall); - sb.append('}'); - return sb.toString(); + return "RejectedExecutionListener{" + + "rejectedExecutionCounter=" + rejectedExecutionCounter + + ", serverCall=" + serverCall + + '}'; } } \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java index b92dcdd7a4b7..812570f276cf 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java @@ -15,4 +15,6 @@ public interface ServerCallWrapper { SocketAddress getRemoteAddr(); void cancel(Status status, Metadata trailers); + + boolean isCancelled(); } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java index 4789c7115949..973512bfb81c 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java @@ -2,8 +2,8 @@ import io.grpc.Metadata; import io.grpc.Status; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Objects; import java.util.concurrent.Future; diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java index 4e2975ecf532..d8d71ee0d4dc 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java @@ -66,11 +66,10 @@ public RejectedExecutionListener getRejectedExecutionListener() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("StreamExecutorRejectedExecutionRequestScheduler{"); - sb.append("scheduledExecutorService=").append(scheduledExecutor); - sb.append(", rejectedExecutionListenerFactory=").append(rejectedExecutionListenerFactory); - sb.append('}'); - return sb.toString(); + return "StreamExecutorRejectedExecutionRequestScheduler{" + + "scheduledExecutorService=" + scheduledExecutor + + ", rejectedExecutionListenerFactory=" + rejectedExecutionListenerFactory + + '}'; } public static class Listener { @@ -102,10 +101,9 @@ public void onMessage() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("Listener{"); - sb.append("rejectedExecutionListener=").append(rejectedExecutionListener); - sb.append('}'); - return sb.toString(); + return "Listener{" + + "rejectedExecutionListener=" + rejectedExecutionListener + + '}'; } } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java index b7db2a6a897f..90163157f325 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java @@ -30,7 +30,6 @@ import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; /** * @author jaehong.kim @@ -86,10 +85,11 @@ public void run() { } }); // scheduleListener.onMessage(); - } catch (RejectedExecutionException ree) { + } catch (Throwable th) { // Defense code, need log ? scheduleListener.onRejectedExecution(); - throttledLogger.info("Failed to request. Rejected execution, count={}", scheduleListener.getRejectedExecutionCount()); + throttledLogger.info("Failed to request. Rejected execution, count={} {}/{}", + scheduleListener.getRejectedExecutionCount(), serverCall.getApplicationName(), serverCall.getAgentId()); } }