Skip to content

Commit

Permalink
[#11256] Fix a bug that incorrectly assigned bucket to interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 2, 2024
1 parent 0e3fd4e commit 3f13445
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public GrpcSpanReceiverConfiguration() {

@Deprecated
@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true)
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy")
public static class LegacySpanInterceptorConfiguration {
@Bean
public FactoryBean<ScheduledExecutorService> grpcSpanStreamScheduler(@Qualifier("grpcSpanStreamProperties")
Expand All @@ -89,7 +89,7 @@ public FactoryBean<ServerInterceptor> spanStreamExecutorInterceptor(@Qualifier("
}

@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit")
@ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true)
public static class RateLimitServerInterceptorConfiguration {
@Bean
public Bandwidth spanBandwidth(@Value("${collector.receiver.grpc.span.stream.flow-control.rate-limit.capacity:5000}") long capacity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public GrpcStatReceiverConfiguration() {

@Deprecated
@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true)
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "legacy")
public static class LegacySpanInterceptorConfiguration {

@Bean
Expand Down Expand Up @@ -92,7 +92,7 @@ public FactoryBean<ServerInterceptor> statStreamExecutorInterceptor(@Qualifier("


@Configuration
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit")
@ConditionalOnProperty(name = "collector.receiver.grpc.stat.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true)
public static class RateLimitServerInterceptorConfiguration {

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class RateLimitClientStreamServerInterceptor implements ServerInterceptor
private final Executor executor;

private final Bandwidth bandwidth;
private final Bucket bucket;


public RateLimitClientStreamServerInterceptor(String name, final Executor executor, Bandwidth bandwidth, final long throttledLoggerRatio) {
Expand All @@ -56,9 +55,6 @@ public RateLimitClientStreamServerInterceptor(String name, final Executor execut
this.executor = Context.currentContextExecutor(executor);

this.bandwidth = Objects.requireNonNull(bandwidth, "bandwidth");
this.bucket = Bucket.builder()
.addLimit(bandwidth)
.build();

this.rejectLogger = ThrottledLogger.getLogger(logger, throttledLoggerRatio);
this.bandwidthLogger = ThrottledLogger.getLogger(logger, throttledLoggerRatio);
Expand All @@ -78,6 +74,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
final ServerCall.Listener<ReqT> listener = next.startCall(call, headers);

return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(listener) {
private final Bucket bucket = Bucket.builder().addLimit(bandwidth).build();

@Override
public void onMessage(final ReqT message) {
Expand Down

0 comments on commit 3f13445

Please sign in to comment.