Skip to content

Commit

Permalink
etcd: update to v3.5.9
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Burgazzoli <lburgazzoli@gmail.com>
  • Loading branch information
lburgazzoli committed May 30, 2023
1 parent 6c59cf6 commit 879bb22
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 183 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ jobs:
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
run: |
export TC_USER="$(id -u):$(id -g)"
echo "tc user -> $TC_USER"
./gradlew spotlessCheck
./gradlew test
./gradlew publishToSonatype -Prelease.forceSnapshot
3 changes: 3 additions & 0 deletions .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@ jobs:
distribution: 'temurin'
- name: Build Project
run: |
export TC_USER="$(id -u):$(id -g)"
echo "tc user -> $TC_USER"
./gradlew check -x test
./gradlew test
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ subprojects {
showStandardStreams false
}

task allDeps(type: DependencyReportTask) {

}
tasks.register('allDeps', DependencyReportTask)
}


2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ vertxGrpc = { module = "io.vertx:vertx-grpc", version.ref = "vertx" }

log4jApi = { module = "org.apache.logging.log4j:log4j-api", version.ref = "log4j" }
log4jCore = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" }
log4jSlf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.ref = "log4j" }
log4jSlf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" }
log4j12 = { module = "org.apache.logging.log4j:log4j-1.2-api", version.ref = "log4j" }

autoServiceAnnotations = { module = "com.google.auto.service:auto-service-annotations", version.ref = "autoService"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,15 @@

package io.etcd.jetcd.impl;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Util;
import io.grpc.CallOptions;
import io.grpc.Channel;
Expand All @@ -48,16 +42,9 @@
import io.vertx.core.VertxOptions;
import io.vertx.grpc.VertxChannelBuilder;

import com.google.common.util.concurrent.ListenableFuture;

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;

final class ClientConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientConnectionManager.class);

private final Object lock;
private final ClientBuilder builder;
private final ExecutorService executorService;
Expand Down Expand Up @@ -237,52 +224,4 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

return channelBuilder;
}

/**
* execute the task and retry it in case of failure.
*
* @param task a function that returns a new SourceFuture.
* @param resultConvert a function that converts Type S to Type T.
* @param doRetry a function that determines the retry condition base on SourceFuture error.
* @param <S> Source type
* @param <T> Converted Type.
* @return a CompletableFuture with type T.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public <S, T> CompletableFuture<T> execute(
Callable<ListenableFuture<S>> task,
Function<S, T> resultConvert,
Predicate<Throwable> doRetry) {

RetryPolicy<CompletableFuture<S>> retryPolicy = new RetryPolicy<CompletableFuture<S>>().handleIf(doRetry)
.onRetriesExceeded(e -> LOGGER.warn("maximum number of auto retries reached"))
.withBackoff(builder.retryDelay(), builder.retryMaxDelay(), builder.retryChronoUnit());

if (builder.retryMaxDuration() != null) {
retryPolicy = retryPolicy.withMaxDuration(builder.retryMaxDuration());
}

return Failsafe.with(retryPolicy).with(executorService)
.getAsyncExecution(execution -> {
CompletableFuture<S> wrappedFuture = new CompletableFuture<>();
ListenableFuture<S> future = task.call();
future.addListener(() -> {
try {
wrappedFuture.complete(future.get());
execution.complete(wrappedFuture);
} catch (Exception error) {
if (Errors.isInvalidTokenError(error)) {
authCredential().refresh();
}
if (Errors.isAuthStoreExpired(error)) {
authCredential().refresh();
}
if (!execution.retryOn(error)) {
// permanent failure
wrappedFuture.completeExceptionally(error);
}
}
}, executorService);
}).thenCompose(f -> f.thenApply(resultConvert));
}
}
4 changes: 3 additions & 1 deletion jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ private void onNext(WatchResponse response) {

// handle a special case when watch has been created and closed at the same time
if (response.getCreated() && response.getCanceled() && response.getCancelReason() != null
&& response.getCancelReason().contains("etcdserver: permission denied")) {
&& (response.getCancelReason().contains("etcdserver: permission denied") ||
response.getCancelReason().contains("etcdserver: invalid auth token"))) {

// potentially access token expired
connectionManager().authCredential().refresh();
Status error = Status.Code.CANCELLED.toStatus().withDescription(response.getCancelReason());
Expand Down
83 changes: 0 additions & 83 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,20 @@

package io.etcd.jetcd.impl;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.Network;

import io.etcd.jetcd.Client;
import io.etcd.jetcd.Cluster;
import io.etcd.jetcd.cluster.Member;
import io.etcd.jetcd.test.EtcdClusterExtension;

import static org.assertj.core.api.Assertions.assertThat;

@Timeout(value = 30, unit = TimeUnit.SECONDS)
public class ClusterClientTest {
private static final Network NETWORK = Network.newNetwork();

@RegisterExtension
public static final EtcdClusterExtension n1 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n1")
.withNetwork(NETWORK)
.build();
@RegisterExtension
public static final EtcdClusterExtension n2 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n2")
.withNetwork(NETWORK)
.build();
@RegisterExtension
public static final EtcdClusterExtension n3 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n3")
.withNetwork(NETWORK)
.build();

@RegisterExtension
public static final EtcdClusterExtension cluster = EtcdClusterExtension.builder()
.withNodes(3)
Expand All @@ -68,61 +42,4 @@ public void testMemberList() throws ExecutionException, InterruptedException {
assertThat(client.getClusterClient().listMember().get().getMembers()).hasSize(3);
}
}

@Test
public void testMemberManagement() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints())
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(clusterClient.listMember().get().getMembers()).hasSize(2);

/*
TODO: check
Member m3 = clusterClient.addMember(n3.peerEndpoints())
.get(5, TimeUnit.SECONDS)
.getMember();
assertThat(m3).isNotNull();
assertThat(clusterClient.listMember().get().getMembers()).hasSize(3);
*/
}

@Test
public void testMemberManagementAddNonLearner() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints(), false)
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(m2.isLearner()).isFalse();

List<Member> members = clusterClient.listMember().get().getMembers();
assertThat(members).hasSize(2);
assertThat(members.stream().filter(Member::isLearner).findAny()).isEmpty();
}

@Test
public void testMemberManagementAddLearner() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints(), true)
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(m2.isLearner()).isTrue();

List<Member> members = clusterClient.listMember().get().getMembers();
assertThat(members).hasSize(2);
assertThat(members.stream().filter(Member::isLearner).findAny()).isPresent();
}
}
115 changes: 115 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/ClusterMembersTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2016-2021 The jetcd authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.etcd.jetcd.impl;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.Network;

import io.etcd.jetcd.Client;
import io.etcd.jetcd.Cluster;
import io.etcd.jetcd.cluster.Member;
import io.etcd.jetcd.test.EtcdClusterExtension;

import static org.assertj.core.api.Assertions.assertThat;

@Timeout(value = 30, unit = TimeUnit.SECONDS)
public class ClusterMembersTest {
private static final Network NETWORK = Network.newNetwork();

@RegisterExtension
public final EtcdClusterExtension n1 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n1")
.withNetwork(NETWORK)
.build();
@RegisterExtension
public final EtcdClusterExtension n2 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n2")
.withNetwork(NETWORK)
.build();
@RegisterExtension
public final EtcdClusterExtension n3 = EtcdClusterExtension.builder()
.withNodes(1)
.withPrefix("n3")
.withNetwork(NETWORK)
.build();

@Test
public void testMemberManagement() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints())
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(clusterClient.listMember().get().getMembers()).hasSize(2);

/*
TODO: check
Member m3 = clusterClient.addMember(n3.peerEndpoints())
.get(5, TimeUnit.SECONDS)
.getMember();
assertThat(m3).isNotNull();
assertThat(clusterClient.listMember().get().getMembers()).hasSize(3);
*/
}

@Test
public void testMemberManagementAddNonLearner() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints(), false)
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(m2.isLearner()).isFalse();

List<Member> members = clusterClient.listMember().get().getMembers();
assertThat(members).hasSize(2);
assertThat(members.stream().filter(Member::isLearner).findAny()).isEmpty();
}

@Test
public void testMemberManagementAddLearner() throws ExecutionException, InterruptedException, TimeoutException {
final Client client = Client.builder().endpoints(n1.clientEndpoints()).build();
final Cluster clusterClient = client.getClusterClient();

Member m2 = clusterClient.addMember(n2.peerEndpoints(), true)
.get(5, TimeUnit.SECONDS)
.getMember();

assertThat(m2).isNotNull();
assertThat(m2.isLearner()).isTrue();

List<Member> members = clusterClient.listMember().get().getMembers();
assertThat(members).hasSize(2);
assertThat(members.stream().filter(Member::isLearner).findAny()).isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testWatchOnPut() throws Exception {
try (Watcher watcher = watchClient.watch(key, ref::set)) {
cluster.restart();

kvClient.put(key, value).get(1, TimeUnit.SECONDS);
kvClient.put(key, value).get();

await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull());

Expand Down
3 changes: 2 additions & 1 deletion jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -280,7 +281,7 @@ public void testWatchAndGet(final Client client) throws Exception {
if (event.getEventType() == EventType.PUT) {
ByteSequence key1 = event.getKeyValue().getKey();

client.getKVClient().get(key1).whenComplete((r, t) -> {
Future<?> unused = client.getKVClient().get(key1).whenComplete((r, t) -> {
if (!r.getKvs().isEmpty()) {
ref.set(r.getKvs().get(0));
}
Expand Down
Loading

0 comments on commit 879bb22

Please sign in to comment.