Skip to content

Commit

Permalink
[#11256] Fix stream leak of FlowControl
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 30, 2024
1 parent b2f3b26 commit e6a43f6
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public GrpcSpanReceiverConfiguration() {

@Deprecated
@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy")
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true)
public static class LegacySpanInterceptorConfiguration {
@Bean
public FactoryBean<ScheduledExecutorService> grpcSpanStreamScheduler(@Qualifier("grpcSpanStreamProperties")
Expand All @@ -89,7 +89,7 @@ public FactoryBean<ServerInterceptor> spanStreamExecutorInterceptor(@Qualifier("
}

@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true)
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit")
public static class RateLimitServerInterceptorConfiguration {
@Bean
public Bandwidth spanBandwidth(@Value("${collector.receiver.grpc.span.stream.flow-control.rate-limit.capacity:5000}") long capacity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class GrpcStreamProperties {
@PositiveOrZero
private int callInitRequestCount = 1000;
@Positive
private int schedulerPeriodMillis = 64;
private int schedulerPeriodMillis = 1000;
private int schedulerRecoveryMessageCount = 10;

private long idleTimeout = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public void cancel(Status status, Metadata trailers) {
this.serverCall.close(status, trailers);
}

@Override
public boolean isCancelled() {
return this.serverCall.isCancelled();

Check warning on line 56 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java#L56

Added line #L56 was not covered by tests
}

@Override
public String toString() {
return "DefaultServerCallWrapper{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.grpc.Metadata;
import io.grpc.Status;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.Future;
Expand All @@ -24,6 +24,7 @@ public class FlowControlRejectExecutionListener implements RejectedExecutionList

private volatile Future<?> future;

int count ;
public FlowControlRejectExecutionListener(String name, ServerCallWrapper serverCall, long recoveryMessagesCount, IdleTimeout idleTimeout) {
this.name = Objects.requireNonNull(name, "name");
this.serverCall = Objects.requireNonNull(serverCall, "serverCall");
Expand All @@ -38,6 +39,17 @@ public void onRejectedExecution() {

@Override
public void onSchedule() {
if (logger.isTraceEnabled()) {
logger.trace("Stream state check {} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());

Check warning on line 43 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L43

Added line #L43 was not covered by tests
}
if (this.serverCall.isCancelled()) {
logger.info("Stream already cancelled:{} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId());
final Future<?> copy = future;

Check warning on line 47 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L46-L47

Added lines #L46 - L47 were not covered by tests
if (copy != null) {
copy.cancel(false);

Check warning on line 49 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L49

Added line #L49 was not covered by tests
}
return;

Check warning on line 51 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L51

Added line #L51 was not covered by tests
}
if (!expireIdleTimeout()) {
reject();
}
Expand All @@ -59,6 +71,9 @@ private void reject() {
if (currentRejectCount > 0) {
final long recovery = Math.min(currentRejectCount, recoveryMessagesCount);
this.rejectedExecutionCounter.addAndGet(-recovery);
if (logger.isDebugEnabled()) {
logger.debug("flow-control request:{} {}/{}", recovery, serverCall.getApplicationName(), serverCall.getAgentId());

Check warning on line 75 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L75

Added line #L75 was not covered by tests
}
serverCall.request((int) recovery);
}
}
Expand Down Expand Up @@ -108,10 +123,9 @@ private void idleTimeout() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("RejectedExecutionListener{");
sb.append("rejectedExecutionCounter=").append(rejectedExecutionCounter);
sb.append(", serverCall=").append(serverCall);
sb.append('}');
return sb.toString();
return "RejectedExecutionListener{" +

Check warning on line 126 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java#L126

Added line #L126 was not covered by tests
"rejectedExecutionCounter=" + rejectedExecutionCounter +
", serverCall=" + serverCall +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface ServerCallWrapper {
SocketAddress getRemoteAddr();

void cancel(Status status, Metadata trailers);

boolean isCancelled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.grpc.Metadata;
import io.grpc.Status;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ public RejectedExecutionListener getRejectedExecutionListener() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("StreamExecutorRejectedExecutionRequestScheduler{");
sb.append("scheduledExecutorService=").append(scheduledExecutor);
sb.append(", rejectedExecutionListenerFactory=").append(rejectedExecutionListenerFactory);
sb.append('}');
return sb.toString();
return "StreamExecutorRejectedExecutionRequestScheduler{" +

Check warning on line 69 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java#L69

Added line #L69 was not covered by tests
"scheduledExecutorService=" + scheduledExecutor +
", rejectedExecutionListenerFactory=" + rejectedExecutionListenerFactory +
'}';
}

public static class Listener {
Expand Down Expand Up @@ -102,10 +101,9 @@ public void onMessage() {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Listener{");
sb.append("rejectedExecutionListener=").append(rejectedExecutionListener);
sb.append('}');
return sb.toString();
return "Listener{" +

Check warning on line 104 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java#L104

Added line #L104 was not covered by tests
"rejectedExecutionListener=" + rejectedExecutionListener +
'}';
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
* @author jaehong.kim
Expand Down Expand Up @@ -86,10 +85,11 @@ public void run() {
}
});
// scheduleListener.onMessage();
} catch (RejectedExecutionException ree) {
} catch (Throwable th) {

Check warning on line 88 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java#L88

Added line #L88 was not covered by tests
// Defense code, need log ?
scheduleListener.onRejectedExecution();
throttledLogger.info("Failed to request. Rejected execution, count={}", scheduleListener.getRejectedExecutionCount());
throttledLogger.info("Failed to request. Rejected execution, count={} {}/{}",
scheduleListener.getRejectedExecutionCount(), serverCall.getApplicationName(), serverCall.getAgentId());

Check warning on line 92 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java#L91-L92

Added lines #L91 - L92 were not covered by tests
}
}

Expand Down

0 comments on commit e6a43f6

Please sign in to comment.