diff --git a/src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD b/src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD index cca37d8e1350a7..34f7a7863ecf3b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD @@ -16,6 +16,8 @@ java_library( name = "grpc", srcs = glob(["*.java"]), deps = [ + "//src/main/java/com/google/devtools/build/lib/concurrent", + "//third_party:guava", "//third_party:rxjava3", "//third_party/grpc:grpc-jar", ], diff --git a/src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionFactory.java b/src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionFactory.java index 462a77e869f55e..3e8cb67a75c9e2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionFactory.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.remote.grpc; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import io.reactivex.rxjava3.core.Single; /** @@ -25,7 +26,10 @@ * *

Connection creation must be cancellable. Canceling connection creation must release (“close”) * the connection and all associated resources. + * + *

Implementations must be thread-safe. */ +@ThreadSafe public interface ConnectionFactory { /** Creates a new {@link Connection}. */ Single create(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/grpc/TokenBucket.java b/src/main/java/com/google/devtools/build/lib/remote/grpc/TokenBucket.java new file mode 100644 index 00000000000000..604ce66e6a3aef --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/grpc/TokenBucket.java @@ -0,0 +1,109 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.grpc; + +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.subjects.BehaviorSubject; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedDeque; + +/** A container for tokens which is used for rate limiting. */ +@ThreadSafe +public class TokenBucket implements Closeable { + private final ConcurrentLinkedDeque tokens; + private final BehaviorSubject tokenBehaviorSubject; + + public TokenBucket() { + this(ImmutableList.of()); + } + + public TokenBucket(Collection initialTokens) { + tokens = new ConcurrentLinkedDeque<>(initialTokens); + tokenBehaviorSubject = BehaviorSubject.create(); + + if (!tokens.isEmpty()) { + tokenBehaviorSubject.onNext(tokens.getFirst()); + } + } + + /** Add a token to the bucket. */ + public void addToken(T token) { + tokens.addLast(token); + tokenBehaviorSubject.onNext(token); + } + + /** Returns current number of tokens in the bucket. */ + public int size() { + return tokens.size(); + } + + /** + * Returns a cold {@link Single} which will start the token acquisition process upon subscription. + */ + public Single acquireToken() { + return Single.create( + downstream -> + tokenBehaviorSubject.subscribe( + new Observer() { + Disposable upstream; + + @Override + public void onSubscribe(@NonNull Disposable d) { + upstream = d; + downstream.setDisposable(d); + } + + @Override + public void onNext(@NonNull T ignored) { + if (!downstream.isDisposed()) { + T token = tokens.pollFirst(); + if (token != null) { + downstream.onSuccess(token); + } + } + } + + @Override + public void onError(@NonNull Throwable e) { + downstream.onError(new IllegalStateException(e)); + } + + @Override + public void onComplete() { + if (!downstream.isDisposed()) { + downstream.onError(new IllegalStateException("closed")); + } + } + })); + } + + /** + * Closes the bucket and release all the tokens. + * + *

Subscriptions after closed to the Single returned by {@link TokenBucket#acquireToken()} will + * emit error. + */ + @Override + public void close() throws IOException { + tokens.clear(); + tokenBehaviorSubject.onComplete(); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index e4f9c2766696cb..02fa8cd79deb7d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -11,6 +11,7 @@ filegroup( srcs = glob(["**"]) + [ "//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs", "//src/test/java/com/google/devtools/build/lib/remote/http:srcs", + "//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs", "//src/test/java/com/google/devtools/build/lib/remote/logging:srcs", "//src/test/java/com/google/devtools/build/lib/remote/merkletree:srcs", "//src/test/java/com/google/devtools/build/lib/remote/options:srcs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/grpc/BUILD b/src/test/java/com/google/devtools/build/lib/remote/grpc/BUILD new file mode 100644 index 00000000000000..8df04632cb3a8d --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/grpc/BUILD @@ -0,0 +1,31 @@ +load("@rules_java//java:defs.bzl", "java_test") + +package( + default_testonly = 1, + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + testonly = 0, + srcs = glob(["**"]), + visibility = ["//src/test/java/com/google/devtools/build/lib/remote:__pkg__"], +) + +java_test( + name = "grpc", + srcs = glob(["*.java"]), + tags = [ + "requires-network", + ], + test_class = "com.google.devtools.build.lib.AllTests", + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/grpc", + "//src/test/java/com/google/devtools/build/lib:test_runner", + "//third_party:guava", + "//third_party:junit4", + "//third_party:mockito", + "//third_party:rxjava3", + "//third_party:truth", + ], +) diff --git a/src/test/java/com/google/devtools/build/lib/remote/grpc/TokenBucketTest.java b/src/test/java/com/google/devtools/build/lib/remote/grpc/TokenBucketTest.java new file mode 100644 index 00000000000000..1b98c306435815 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/grpc/TokenBucketTest.java @@ -0,0 +1,211 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.grpc; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableList; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.observers.TestObserver; +import java.io.IOException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link TokenBucket} */ +@RunWith(JUnit4.class) +public class TokenBucketTest { + + @Test + public void acquireToken_smoke() { + TokenBucket bucket = new TokenBucket<>(); + assertThat(bucket.size()).isEqualTo(0); + bucket.addToken(0); + assertThat(bucket.size()).isEqualTo(1); + + TestObserver observer = bucket.acquireToken().test(); + + observer.assertValue(0).assertComplete(); + assertThat(bucket.size()).isEqualTo(0); + } + + @Test + public void acquireToken_releaseInitialTokens() { + TokenBucket bucket = new TokenBucket<>(ImmutableList.of(0)); + assertThat(bucket.size()).isEqualTo(1); + + TestObserver observer = bucket.acquireToken().test(); + + observer.assertValue(0).assertComplete(); + assertThat(bucket.size()).isEqualTo(0); + } + + @Test + public void acquireToken_multipleInitialTokens_releaseFirstToken() { + TokenBucket bucket = new TokenBucket<>(ImmutableList.of(0, 1)); + assertThat(bucket.size()).isEqualTo(2); + + TestObserver observer = bucket.acquireToken().test(); + + observer.assertValue(0).assertComplete(); + assertThat(bucket.size()).isEqualTo(1); + } + + @Test + public void acquireToken_multipleInitialTokens_releaseSecondToken() { + TokenBucket bucket = new TokenBucket<>(ImmutableList.of(0, 1)); + assertThat(bucket.size()).isEqualTo(2); + bucket.acquireToken().test().assertValue(0).assertComplete(); + + TestObserver observer = bucket.acquireToken().test(); + + observer.assertValue(1).assertComplete(); + assertThat(bucket.size()).isEqualTo(0); + } + + @Test + public void acquireToken_releaseTokenToPreviousObserver() { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer = bucket.acquireToken().test(); + observer.assertEmpty(); + + bucket.addToken(0); + + observer.assertValue(0).assertComplete(); + assertThat(bucket.size()).isEqualTo(0); + } + + @Test + public void acquireToken_notReleaseTokenToDisposedObserver() { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer = bucket.acquireToken().test(); + + observer.dispose(); + bucket.addToken(0); + + observer.assertEmpty(); + assertThat(bucket.size()).isEqualTo(1); + } + + @Test + public void acquireToken_disposeAfterTokenAcquired() { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer = bucket.acquireToken().test(); + + bucket.addToken(0); + bucket.addToken(1); + + observer.assertValue(0).assertComplete(); + assertThat(bucket.size()).isEqualTo(1); + } + + @Test + public void acquireToken_multipleObservers_onlyOneCanAcquire() { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer1 = bucket.acquireToken().test(); + TestObserver observer2 = bucket.acquireToken().test(); + + bucket.addToken(0); + + if (!observer1.values().isEmpty()) { + observer1.assertValue(0).assertComplete(); + observer2.assertEmpty(); + + bucket.addToken(1); + observer2.assertValue(1).assertComplete(); + } else { + observer1.assertEmpty(); + observer2.assertValue(0).assertComplete(); + + bucket.addToken(1); + observer1.assertValue(1).assertComplete(); + } + } + + @Test + public void acquireToken_reSubscription_waitAvailableToken() { + TokenBucket bucket = new TokenBucket<>(); + bucket.addToken(0); + Single tokenSingle = bucket.acquireToken(); + + TestObserver observer1 = tokenSingle.test(); + TestObserver observer2 = tokenSingle.test(); + + observer1.assertValue(0).assertComplete(); + observer2.assertEmpty(); + } + + @Test + public void acquireToken_reSubscription_acquireNewToken() { + TokenBucket bucket = new TokenBucket<>(); + bucket.addToken(0); + Single tokenSingle = bucket.acquireToken(); + TestObserver observer1 = tokenSingle.test(); + TestObserver observer2 = tokenSingle.test(); + + bucket.addToken(1); + + observer1.assertValue(0).assertComplete(); + observer2.assertValue(1).assertComplete(); + } + + @Test + public void acquireToken_reSubscription_acquireNextToken() { + TokenBucket bucket = new TokenBucket<>(); + bucket.addToken(0); + bucket.addToken(1); + Single tokenSingle = bucket.acquireToken(); + + TestObserver observer1 = tokenSingle.test(); + TestObserver observer2 = tokenSingle.test(); + + observer1.assertValue(0).assertComplete(); + observer2.assertValue(1).assertComplete(); + } + + @Test + public void acquireToken_disposed_tokenRemains() { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer = bucket.acquireToken().test(); + observer.assertEmpty(); + + observer.dispose(); + bucket.addToken(0); + + assertThat(bucket.size()).isEqualTo(1); + } + + @Test + public void close_errorAfterClose() throws IOException { + TokenBucket bucket = new TokenBucket<>(); + bucket.addToken(0); + bucket.close(); + + TestObserver observer = bucket.acquireToken().test(); + + observer.assertError( + e -> e instanceof IllegalStateException && e.getMessage().contains("closed")); + } + + @Test + public void close_errorPreviousObservers() throws IOException { + TokenBucket bucket = new TokenBucket<>(); + TestObserver observer = bucket.acquireToken().test(); + + bucket.close(); + + observer.assertError( + e -> e instanceof IllegalStateException && e.getMessage().contains("closed")); + } +}