Skip to content

Commit

Permalink
[#11256] Fix stream leak of FlowControl
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 31, 2024
1 parent dbba51f commit 7c598e1
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public GrpcSpanReceiverConfiguration() {

@Deprecated
@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy")
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true)
public static class LegacySpanInterceptorConfiguration {
@Bean
public FactoryBean<ScheduledExecutorService> grpcSpanStreamScheduler(@Qualifier("grpcSpanStreamProperties")
Expand All @@ -89,7 +89,7 @@ public FactoryBean<ServerInterceptor> spanStreamExecutorInterceptor(@Qualifier("
}

@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true)
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit")
public static class RateLimitServerInterceptorConfiguration {
@Bean
public Bandwidth spanBandwidth(@Value("${collector.receiver.grpc.span.stream.flow-control.rate-limit.capacity:5000}") long capacity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public GrpcStatReceiverConfiguration() {

@Deprecated
@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy")
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true)
public static class LegacySpanInterceptorConfiguration {

@Bean
Expand Down Expand Up @@ -92,7 +92,7 @@ public FactoryBean<ServerInterceptor> statStreamExecutorInterceptor(@Qualifier("


@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true)
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit")
public static class RateLimitServerInterceptorConfiguration {

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class GrpcStreamProperties {
@PositiveOrZero
private int callInitRequestCount = 1000;
@Positive
private int schedulerPeriodMillis = 64;
private int schedulerPeriodMillis = 1000;
private int schedulerRecoveryMessageCount = 10;

private long idleTimeout = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public void cancel(Status status, Metadata trailers) {
this.serverCall.close(status, trailers);
}

@Override
public boolean isCancelled() {
return this.serverCall.isCancelled();
}

@Override
public String toString() {
return "DefaultServerCallWrapper{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@

import io.grpc.Metadata;
import io.grpc.Status;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class FlowControlRejectExecutionListener implements RejectedExecutionListener {
private final Logger logger = LogManager.getLogger(this.getClass());

private static final AtomicLongFieldUpdater<FlowControlRejectExecutionListener> REJECT =
AtomicLongFieldUpdater.newUpdater(FlowControlRejectExecutionListener.class, "rejectedExecutionCounter");

private static final Status STREAM_IDLE_TIMEOUT = Status.DEADLINE_EXCEEDED.withDescription("Stream idle timeout");

private final Logger logger = LogManager.getLogger(this.getClass());

private final String name;

private final AtomicLong rejectedExecutionCounter = new AtomicLong(0);
private volatile long rejectedExecutionCounter;

private final ServerCallWrapper serverCall;
private final long recoveryMessagesCount;

Expand All @@ -33,11 +38,19 @@ public FlowControlRejectExecutionListener(String name, ServerCallWrapper serverC

@Override
public void onRejectedExecution() {
this.rejectedExecutionCounter.incrementAndGet();
REJECT.incrementAndGet(this);
}

@Override
public void onSchedule() {
if (logger.isTraceEnabled()) {
logger.trace("Stream state check {} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());
}
if (this.serverCall.isCancelled()) {
logger.info("Stream already cancelled:{} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());
this.cancel();
return;
}
if (!expireIdleTimeout()) {
reject();
}
Expand All @@ -55,17 +68,20 @@ private boolean expireIdleTimeout() {


private void reject() {
final long currentRejectCount = this.rejectedExecutionCounter.get();
final long currentRejectCount = getRejectedExecutionCount();
if (currentRejectCount > 0) {
final long recovery = Math.min(currentRejectCount, recoveryMessagesCount);
this.rejectedExecutionCounter.addAndGet(-recovery);
REJECT.addAndGet(this, -recovery);
if (logger.isDebugEnabled()) {
logger.debug("flow-control request:{} {}/{}", recovery, serverCall.getApplicationName(), serverCall.getAgentId());
}
serverCall.request((int) recovery);
}
}

@Override
public long getRejectedExecutionCount() {
return rejectedExecutionCounter.get();
return REJECT.get(this);
}

@Override
Expand Down Expand Up @@ -102,16 +118,19 @@ public boolean isCancelled() {


private void idleTimeout() {
logger.info("stream idle timeout applicationName:{} agentId:{} {}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());
serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata());
logger.info("Stream idle timeout agent:{}/{} {}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());
try {
serverCall.cancel(STREAM_IDLE_TIMEOUT, new Metadata());
} catch (IllegalStateException ex) {
logger.warn("Failed to cancel stream. agent:{}/{} {}", serverCall.getApplicationName(), serverCall.getAgentId(), ex.getMessage());
}
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("RejectedExecutionListener{");
sb.append("rejectedExecutionCounter=").append(rejectedExecutionCounter);
sb.append(", serverCall=").append(serverCall);
sb.append('}');
return sb.toString();
return "RejectedExecutionListener{" +
"rejectedExecutionCounter=" + rejectedExecutionCounter +
", serverCall=" + serverCall +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface ServerCallWrapper {
SocketAddress getRemoteAddr();

void cancel(Status status, Metadata trailers);

boolean isCancelled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.grpc.Metadata;
import io.grpc.Status;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ public RejectedExecutionListener getRejectedExecutionListener() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("StreamExecutorRejectedExecutionRequestScheduler{");
sb.append("scheduledExecutorService=").append(scheduledExecutor);
sb.append(", rejectedExecutionListenerFactory=").append(rejectedExecutionListenerFactory);
sb.append('}');
return sb.toString();
return "StreamExecutorRejectedExecutionRequestScheduler{" +
"scheduledExecutorService=" + scheduledExecutor +
", rejectedExecutionListenerFactory=" + rejectedExecutionListenerFactory +
'}';
}

public static class Listener {
Expand Down Expand Up @@ -102,10 +101,9 @@ public void onMessage() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Listener{");
sb.append("rejectedExecutionListener=").append(rejectedExecutionListener);
sb.append('}');
return sb.toString();
return "Listener{" +
"rejectedExecutionListener=" + rejectedExecutionListener +
'}';
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
* @author jaehong.kim
Expand Down Expand Up @@ -86,10 +85,11 @@ public void run() {
}
});
// scheduleListener.onMessage();
} catch (RejectedExecutionException ree) {
} catch (Throwable th) {
// Defense code, need log ?
scheduleListener.onRejectedExecution();
throttledLogger.info("Failed to request. Rejected execution, count={}", scheduleListener.getRejectedExecutionCount());
throttledLogger.info("Failed to request. Rejected execution, count={} {}/{}",
scheduleListener.getRejectedExecutionCount(), serverCall.getApplicationName(), serverCall.getAgentId());
}
}

Expand Down

0 comments on commit 7c598e1

Please sign in to comment.