diff --git a/.github/workflows/build-main.yml b/.github/workflows/build-main.yml index 7f91cedba..104ac17e4 100644 --- a/.github/workflows/build-main.yml +++ b/.github/workflows/build-main.yml @@ -42,6 +42,9 @@ jobs: OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} run: | + export TC_USER="$(id -u):$(id -g)" + echo "tc user -> $TC_USER" + ./gradlew spotlessCheck ./gradlew test ./gradlew publishToSonatype -Prelease.forceSnapshot diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index 04ffd5f41..9f7012c7f 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -49,5 +49,8 @@ jobs: distribution: 'temurin' - name: Build Project run: | + export TC_USER="$(id -u):$(id -g)" + echo "tc user -> $TC_USER" + ./gradlew check -x test ./gradlew test diff --git a/build.gradle b/build.gradle index 3a1a604db..45a736898 100644 --- a/build.gradle +++ b/build.gradle @@ -82,9 +82,7 @@ subprojects { showStandardStreams false } - task allDeps(type: DependencyReportTask) { - - } + tasks.register('allDeps', DependencyReportTask) } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 9522b948c..b043bfa6a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -56,7 +56,7 @@ vertxGrpc = { module = "io.vertx:vertx-grpc", version.ref = "vertx" } log4jApi = { module = "org.apache.logging.log4j:log4j-api", version.ref = "log4j" } log4jCore = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" } -log4jSlf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.ref = "log4j" } +log4jSlf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" } log4j12 = { module = "org.apache.logging.log4j:log4j-1.2-api", version.ref = "log4j" } autoServiceAnnotations = { module = "com.google.auto.service:auto-service-annotations", version.ref = "autoService"} diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java index f6b7c0888..9bfcf44ee 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java @@ -16,21 +16,15 @@ package io.etcd.jetcd.impl; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.Predicate; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.ClientBuilder; -import io.etcd.jetcd.support.Errors; import io.etcd.jetcd.support.Util; import io.grpc.CallOptions; import io.grpc.Channel; @@ -48,16 +42,9 @@ import io.vertx.core.VertxOptions; import io.vertx.grpc.VertxChannelBuilder; -import com.google.common.util.concurrent.ListenableFuture; - -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; - import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException; final class ClientConnectionManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ClientConnectionManager.class); - private final Object lock; private final ClientBuilder builder; private final ExecutorService executorService; @@ -237,52 +224,4 @@ public void start(Listener responseListener, Metadata headers) { return channelBuilder; } - - /** - * execute the task and retry it in case of failure. - * - * @param task a function that returns a new SourceFuture. - * @param resultConvert a function that converts Type S to Type T. - * @param doRetry a function that determines the retry condition base on SourceFuture error. - * @param Source type - * @param Converted Type. - * @return a CompletableFuture with type T. - */ - @SuppressWarnings("FutureReturnValueIgnored") - public CompletableFuture execute( - Callable> task, - Function resultConvert, - Predicate doRetry) { - - RetryPolicy> retryPolicy = new RetryPolicy>().handleIf(doRetry) - .onRetriesExceeded(e -> LOGGER.warn("maximum number of auto retries reached")) - .withBackoff(builder.retryDelay(), builder.retryMaxDelay(), builder.retryChronoUnit()); - - if (builder.retryMaxDuration() != null) { - retryPolicy = retryPolicy.withMaxDuration(builder.retryMaxDuration()); - } - - return Failsafe.with(retryPolicy).with(executorService) - .getAsyncExecution(execution -> { - CompletableFuture wrappedFuture = new CompletableFuture<>(); - ListenableFuture future = task.call(); - future.addListener(() -> { - try { - wrappedFuture.complete(future.get()); - execution.complete(wrappedFuture); - } catch (Exception error) { - if (Errors.isInvalidTokenError(error)) { - authCredential().refresh(); - } - if (Errors.isAuthStoreExpired(error)) { - authCredential().refresh(); - } - if (!execution.retryOn(error)) { - // permanent failure - wrappedFuture.completeExceptionally(error); - } - } - }, executorService); - }).thenCompose(f -> f.thenApply(resultConvert)); - } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java index 446d02368..e16f6d733 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java @@ -247,7 +247,9 @@ private void onNext(WatchResponse response) { // handle a special case when watch has been created and closed at the same time if (response.getCreated() && response.getCanceled() && response.getCancelReason() != null - && response.getCancelReason().contains("etcdserver: permission denied")) { + && (response.getCancelReason().contains("etcdserver: permission denied") || + response.getCancelReason().contains("etcdserver: invalid auth token"))) { + // potentially access token expired connectionManager().authCredential().refresh(); Status error = Status.Code.CANCELLED.toStatus().withDescription(response.getCancelReason()); diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterClientTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterClientTest.java index 3db7d5158..f70f1ed1b 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterClientTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterClientTest.java @@ -16,46 +16,20 @@ package io.etcd.jetcd.impl; -import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.containers.Network; import io.etcd.jetcd.Client; -import io.etcd.jetcd.Cluster; -import io.etcd.jetcd.cluster.Member; import io.etcd.jetcd.test.EtcdClusterExtension; import static org.assertj.core.api.Assertions.assertThat; @Timeout(value = 30, unit = TimeUnit.SECONDS) public class ClusterClientTest { - private static final Network NETWORK = Network.newNetwork(); - - @RegisterExtension - public static final EtcdClusterExtension n1 = EtcdClusterExtension.builder() - .withNodes(1) - .withPrefix("n1") - .withNetwork(NETWORK) - .build(); - @RegisterExtension - public static final EtcdClusterExtension n2 = EtcdClusterExtension.builder() - .withNodes(1) - .withPrefix("n2") - .withNetwork(NETWORK) - .build(); - @RegisterExtension - public static final EtcdClusterExtension n3 = EtcdClusterExtension.builder() - .withNodes(1) - .withPrefix("n3") - .withNetwork(NETWORK) - .build(); - @RegisterExtension public static final EtcdClusterExtension cluster = EtcdClusterExtension.builder() .withNodes(3) @@ -68,61 +42,4 @@ public void testMemberList() throws ExecutionException, InterruptedException { assertThat(client.getClusterClient().listMember().get().getMembers()).hasSize(3); } } - - @Test - public void testMemberManagement() throws ExecutionException, InterruptedException, TimeoutException { - final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); - final Cluster clusterClient = client.getClusterClient(); - - Member m2 = clusterClient.addMember(n2.peerEndpoints()) - .get(5, TimeUnit.SECONDS) - .getMember(); - - assertThat(m2).isNotNull(); - assertThat(clusterClient.listMember().get().getMembers()).hasSize(2); - - /* - TODO: check - Member m3 = clusterClient.addMember(n3.peerEndpoints()) - .get(5, TimeUnit.SECONDS) - .getMember(); - - assertThat(m3).isNotNull(); - assertThat(clusterClient.listMember().get().getMembers()).hasSize(3); - */ - } - - @Test - public void testMemberManagementAddNonLearner() throws ExecutionException, InterruptedException, TimeoutException { - final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); - final Cluster clusterClient = client.getClusterClient(); - - Member m2 = clusterClient.addMember(n2.peerEndpoints(), false) - .get(5, TimeUnit.SECONDS) - .getMember(); - - assertThat(m2).isNotNull(); - assertThat(m2.isLearner()).isFalse(); - - List members = clusterClient.listMember().get().getMembers(); - assertThat(members).hasSize(2); - assertThat(members.stream().filter(Member::isLearner).findAny()).isEmpty(); - } - - @Test - public void testMemberManagementAddLearner() throws ExecutionException, InterruptedException, TimeoutException { - final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); - final Cluster clusterClient = client.getClusterClient(); - - Member m2 = clusterClient.addMember(n2.peerEndpoints(), true) - .get(5, TimeUnit.SECONDS) - .getMember(); - - assertThat(m2).isNotNull(); - assertThat(m2.isLearner()).isTrue(); - - List members = clusterClient.listMember().get().getMembers(); - assertThat(members).hasSize(2); - assertThat(members.stream().filter(Member::isLearner).findAny()).isPresent(); - } } diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterMembersTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterMembersTest.java new file mode 100644 index 000000000..2d2e6de35 --- /dev/null +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterMembersTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.etcd.jetcd.impl; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.Network; + +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Cluster; +import io.etcd.jetcd.cluster.Member; +import io.etcd.jetcd.test.EtcdClusterExtension; + +import static org.assertj.core.api.Assertions.assertThat; + +@Timeout(value = 30, unit = TimeUnit.SECONDS) +public class ClusterMembersTest { + private static final Network NETWORK = Network.newNetwork(); + + @RegisterExtension + public final EtcdClusterExtension n1 = EtcdClusterExtension.builder() + .withNodes(1) + .withPrefix("n1") + .withNetwork(NETWORK) + .build(); + @RegisterExtension + public final EtcdClusterExtension n2 = EtcdClusterExtension.builder() + .withNodes(1) + .withPrefix("n2") + .withNetwork(NETWORK) + .build(); + @RegisterExtension + public final EtcdClusterExtension n3 = EtcdClusterExtension.builder() + .withNodes(1) + .withPrefix("n3") + .withNetwork(NETWORK) + .build(); + + @Test + public void testMemberManagement() throws ExecutionException, InterruptedException, TimeoutException { + final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); + final Cluster clusterClient = client.getClusterClient(); + + Member m2 = clusterClient.addMember(n2.peerEndpoints()) + .get(5, TimeUnit.SECONDS) + .getMember(); + + assertThat(m2).isNotNull(); + assertThat(clusterClient.listMember().get().getMembers()).hasSize(2); + + /* + TODO: check + Member m3 = clusterClient.addMember(n3.peerEndpoints()) + .get(5, TimeUnit.SECONDS) + .getMember(); + + assertThat(m3).isNotNull(); + assertThat(clusterClient.listMember().get().getMembers()).hasSize(3); + */ + } + + @Test + public void testMemberManagementAddNonLearner() throws ExecutionException, InterruptedException, TimeoutException { + final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); + final Cluster clusterClient = client.getClusterClient(); + + Member m2 = clusterClient.addMember(n2.peerEndpoints(), false) + .get(5, TimeUnit.SECONDS) + .getMember(); + + assertThat(m2).isNotNull(); + assertThat(m2.isLearner()).isFalse(); + + List members = clusterClient.listMember().get().getMembers(); + assertThat(members).hasSize(2); + assertThat(members.stream().filter(Member::isLearner).findAny()).isEmpty(); + } + + @Test + public void testMemberManagementAddLearner() throws ExecutionException, InterruptedException, TimeoutException { + final Client client = Client.builder().endpoints(n1.clientEndpoints()).build(); + final Cluster clusterClient = client.getClusterClient(); + + Member m2 = clusterClient.addMember(n2.peerEndpoints(), true) + .get(5, TimeUnit.SECONDS) + .getMember(); + + assertThat(m2).isNotNull(); + assertThat(m2.isLearner()).isTrue(); + + List members = clusterClient.listMember().get().getMembers(); + assertThat(members).hasSize(2); + assertThat(members.stream().filter(Member::isLearner).findAny()).isPresent(); + } +} diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchResumeTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchResumeTest.java index 282472ec6..acf081440 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchResumeTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchResumeTest.java @@ -56,7 +56,7 @@ public void testWatchOnPut() throws Exception { try (Watcher watcher = watchClient.watch(key, ref::set)) { cluster.restart(); - kvClient.put(key, value).get(1, TimeUnit.SECONDS); + kvClient.put(key, value).get(); await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull()); diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java index 10a3c3089..7727bd58e 100755 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -280,7 +281,7 @@ public void testWatchAndGet(final Client client) throws Exception { if (event.getEventType() == EventType.PUT) { ByteSequence key1 = event.getKeyValue().getKey(); - client.getKVClient().get(key1).whenComplete((r, t) -> { + Future unused = client.getKVClient().get(key1).whenComplete((r, t) -> { if (!r.getKvs().isEmpty()) { ref.set(r.getKvs().get(0)); } diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTokenExpireTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTokenExpireTest.java index c1f5834f3..bca6b8186 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTokenExpireTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTokenExpireTest.java @@ -18,7 +18,6 @@ import java.io.File; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -30,12 +29,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.LoggerFactory; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KV; import io.etcd.jetcd.Watch; import io.etcd.jetcd.auth.Permission; +import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.test.EtcdClusterExtension; import static org.assertj.core.api.Assertions.assertThat; @@ -52,11 +53,13 @@ public class WatchTokenExpireTest { .withNodes(1) .withSsl(true) .withAdditionalArgs( - Arrays.asList("--auth-token", + List.of( + "--auth-token", "jwt,pub-key=/etc/ssl/etcd/server.pem,priv-key=/etc/ssl/etcd/server-key.pem,sign-method=RS256,ttl=1s")) .build(); - private static final ByteSequence key = TestUtil.randomByteSequence(); + private static final ByteSequence key = TestUtil.bytesOf("key"); + private static final ByteSequence keyEnd = TestUtil.bytesOf("key1"); private static final ByteSequence user = TestUtil.bytesOf("root"); private static final ByteSequence password = TestUtil.randomByteSequence(); @@ -73,7 +76,7 @@ private void setUpEnvironment() throws Exception { client.getAuthClient().roleAdd(role).get(); client.getAuthClient().userAdd(user, password).get(); // grant access only to given key - client.getAuthClient().roleGrantPermission(role, key, key, Permission.Type.READWRITE).get(); + client.getAuthClient().roleGrantPermission(role, key, keyEnd, Permission.Type.READWRITE).get(); client.getAuthClient().userGrantRole(user, role).get(); client.getAuthClient().authEnable().get(); @@ -87,7 +90,8 @@ private Client createAuthClient() throws Exception { .user(user) .password(password) .authority("etcd0") - .sslContext(b -> b.trustManager(caFile)).build(); + .sslContext(b -> b.trustManager(caFile)) + .build(); } @Test @@ -105,9 +109,15 @@ public void testRefreshExpiredToken() throws Exception { // watch should handle token refresh automatically // token is already expired when we attempt to create a watch - Watch.Watcher watcher = authWatchClient.watch(key, response -> { - modifications.incrementAndGet(); - }); + Watch.Watcher watcher = authWatchClient.watch( + key, + WatchOption.newBuilder().withRange(keyEnd).build(), + response -> { + modifications.incrementAndGet(); + }, + error -> { + LoggerFactory.getLogger(getClass()).info(">>> {}", error.toString()); + }); // create single thread pool, so that tasks are executed one after another ExecutorService executor = Executors.newFixedThreadPool(1); diff --git a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/Etcd.java b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/Etcd.java index 136dc5e32..8895b93e3 100644 --- a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/Etcd.java +++ b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/Etcd.java @@ -26,7 +26,7 @@ import org.testcontainers.containers.Network; public final class Etcd { - public static final String CONTAINER_IMAGE = "gcr.io/etcd-development/etcd:v3.5.4"; + public static final String CONTAINER_IMAGE = "gcr.io/etcd-development/etcd:v3.5.9"; public static final int ETCD_CLIENT_PORT = 2379; public static final int ETCD_PEER_PORT = 2380; public static final String ETCD_DATA_DIR = "/data.etcd"; diff --git a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java index 7271c6d2e..39763146f 100644 --- a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java +++ b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java @@ -22,19 +22,27 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.Network; -import org.testcontainers.utility.ResourceReaper; + +import com.github.dockerjava.api.command.SyncDockerCmd; import static java.util.stream.Collectors.toList; public class EtcdClusterImpl implements EtcdCluster { + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClusterImpl.class); + private final List containers; private final String clusterName; private final Network network; private final List endpoints; + private final AtomicBoolean hookIsSet; public EtcdClusterImpl( String image, @@ -47,6 +55,7 @@ public EtcdClusterImpl( this.clusterName = clusterName; this.network = network; + this.hookIsSet = new AtomicBoolean(false); this.endpoints = IntStream.range(0, nodes) .mapToObj(i -> (prefix == null ? "etcd" : prefix + "etcd") + i) .collect(toList()); @@ -61,21 +70,37 @@ public EtcdClusterImpl( .collect(toList()); } + private void execQuietly(SyncDockerCmd cmd) { + try { + cmd.exec(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } + + private void performCleanup() { + if (this.network != null && network.getId() != null) { + execQuietly( + DockerClientFactory.instance().client().removeNetworkCmd(this.network.getId())); + } + } + @Override public void start() { + if (hookIsSet.compareAndSet(false, true)) { + // If the JVM stops without containers being stopped, try and stop the container. + Runtime + .getRuntime() + .addShutdownHook(new Thread(this::performCleanup)); + } + final CountDownLatch latch = new CountDownLatch(containers.size()); final AtomicReference failedToStart = new AtomicReference<>(); - ResourceReaper.instance().registerNetworkIdForCleanup(this.network.getId()); - for (EtcdContainer container : containers) { new Thread(() -> { try { container.start(); - - ResourceReaper.instance().registerContainerForCleanup( - container.getContainerId(), - container.getDockerImageName()); } catch (Exception e) { failedToStart.set(e); } finally { diff --git a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdContainer.java b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdContainer.java index 50598e7fd..00c13a1a1 100644 --- a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdContainer.java +++ b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdContainer.java @@ -109,6 +109,12 @@ protected void configure() { withNetworkAliases(node); withLogConsumer(new Slf4jLogConsumer(LOGGER).withPrefix(node)); withCommand(createCommand()); + withEnv("ETCD_LOG_LEVEL", "debug"); + + String user = System.getenv("TC_USER"); + if (user != null) { + withCreateContainerCmdModifier(c -> c.withUser(user)); + } waitingFor(Wait.forLogMessage(".*ready to serve client requests.*", 1)); } @@ -188,11 +194,15 @@ private String[] createCommand() { private static void deleteDataDirectory(Path dir) { if (dir != null && Files.exists(dir)) { - try (Stream stream = Files.walk(dir)) { - stream.sorted(Comparator.reverseOrder()) - .map(Path::toFile) - .forEach(File::delete); - } catch (IOException e) { + try { + try (Stream stream = Files.walk(dir)) { + stream.sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (IOException e) { + LOGGER.error("Error deleting directory {}", dir, e); + } + } catch (Exception e) { LOGGER.error("Error deleting directory {}", dir, e); } } @@ -221,22 +231,22 @@ public String node() { } public InetSocketAddress getClientAddress() { - return new InetSocketAddress(getContainerIpAddress(), getMappedPort(Etcd.ETCD_CLIENT_PORT)); + return new InetSocketAddress(getHost(), getMappedPort(Etcd.ETCD_CLIENT_PORT)); } public URI clientEndpoint() { return newURI( - getContainerIpAddress(), + getHost(), getMappedPort(Etcd.ETCD_CLIENT_PORT)); } public InetSocketAddress getPeerAddress() { - return new InetSocketAddress(getContainerIpAddress(), getMappedPort(Etcd.ETCD_PEER_PORT)); + return new InetSocketAddress(getHost(), getMappedPort(Etcd.ETCD_PEER_PORT)); } public URI peerEndpoint() { return newURI( - getContainerIpAddress(), + getHost(), getMappedPort(Etcd.ETCD_PEER_PORT)); } diff --git a/jetcd-test/src/main/java/io/etcd/jetcd/test/EtcdClusterNameResolver.java b/jetcd-test/src/main/java/io/etcd/jetcd/test/EtcdClusterNameResolver.java index f356b38a0..f666ac0e6 100644 --- a/jetcd-test/src/main/java/io/etcd/jetcd/test/EtcdClusterNameResolver.java +++ b/jetcd-test/src/main/java/io/etcd/jetcd/test/EtcdClusterNameResolver.java @@ -17,14 +17,15 @@ package io.etcd.jetcd.test; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.etcd.jetcd.launcher.EtcdCluster; +import io.etcd.jetcd.launcher.EtcdContainer; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.NameResolver; @@ -117,17 +118,28 @@ private void doResolve() { throw new RuntimeException("Unable to find cluster " + authority); } - List servers = cluster.containers().stream() - .map(container -> { - return new EquivalentAddressGroup( + List servers = new ArrayList<>(); + + for (EtcdContainer container : cluster.containers()) { + try { + EquivalentAddressGroup ag = new EquivalentAddressGroup( container.getClientAddress(), Attributes.newBuilder() .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, container.node()) .build()); - }) - .collect(Collectors.toList()); - savedListener.onAddresses(servers, Attributes.EMPTY); + servers.add(ag); + } catch (IllegalStateException | IllegalArgumentException e) { + LOGGER.debug( + "Failure computing AddressGroup for cluster {}, {}", + cluster.clusterName(), + e.getMessage()); + } + } + + if (!servers.isEmpty()) { + savedListener.onAddresses(servers, Attributes.EMPTY); + } } catch (Exception e) { LOGGER.warn("Error wile getting list of servers", e);