diff --git a/CHANGELOG.md b/CHANGELOG.md index 171abde8f49..d20bbee6e97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## CHANGELOG +### 5.12.3 +* Fix #3969: relist will not trigger sync events +* Fix #4222: backport of #4082 - to not process events until the cache is complete + ### 5.12.2 (2022-04-06) * Fix #3582: SSL truststore can be loaded in FIPS enabled environments * Fix #3797: Implement SchemaSwap; generate CRD from model not owned diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java index 60a6978ab6b..669340ade21 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java @@ -17,10 +17,13 @@ package io.fabric8.kubernetes.client.informers.cache; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.Notification; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs @@ -29,6 +32,8 @@ public class ProcessorStore implements SyncableStore { private Cache cache; private SharedProcessor processor; + private AtomicBoolean synced = new AtomicBoolean(); + private List deferredAdd = new ArrayList<>(); public ProcessorStore(Cache cache, SharedProcessor processor) { this.cache = cache; @@ -40,14 +45,26 @@ public void add(T obj) { update(obj); } - @Override - public void update(T obj) { + private Notification updateInternal(T obj) { T oldObj = this.cache.put(obj); + Notification notification = null; if (oldObj != null) { - this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), - Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())); + if (!Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())) { + notification = new ProcessorListener.UpdateNotification<>(oldObj, obj); + } + } else if (synced.get()) { + notification = new ProcessorListener.AddNotification<>(obj); } else { - this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false); + deferredAdd.add(getKey(obj)); + } + return notification; + } + + @Override + public void update(T obj) { + Notification notification = updateInternal(obj); + if (notification != null) { + this.processor.distribute(notification, false); } } @@ -81,6 +98,11 @@ public T getByKey(String key) { @Override public void retainAll(Set nextKeys) { + if (synced.compareAndSet(false, true)) { + deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull) + .forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false)); + deferredAdd.clear(); + } List current = cache.list(); if (nextKeys.isEmpty() && current.isEmpty()) { this.processor.distribute(l -> l.getHandler().onNothing(), false); @@ -90,11 +112,11 @@ public void retainAll(Set nextKeys) { String key = cache.getKey(v); if (!nextKeys.contains(key)) { cache.remove(v); - this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false); + this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false); } }); } - + @Override public String getKey(T obj) { return cache.getKey(obj); @@ -106,4 +128,4 @@ public void resync() { .forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true)); } -} +} \ No newline at end of file diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java index 129cc3b6fb1..70d28ee7917 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java @@ -25,8 +25,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -43,6 +45,9 @@ public void testEvents() { ProcessorStore processorStore = new ProcessorStore<>(podCache, processor); Pod pod = new PodBuilder().withNewMetadata().withName("pod").endMetadata().build(); + // initial sync complete + processorStore.retainAll(Collections.emptySet()); + // add notification processorStore.add(pod); @@ -60,21 +65,19 @@ public void testEvents() { Mockito.when(podCache.remove(pod)).thenReturn(pod); processorStore.delete(pod); - Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture()); + Mockito.verify(processor, Mockito.times(3)).distribute(notificationCaptor.capture(), syncCaptor.capture()); List> notifications = notificationCaptor.getAllValues(); assertThat(notifications.get(0)).isInstanceOf(AddNotification.class); assertThat(notifications.get(1)).isInstanceOf(AddNotification.class); - assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class); - assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class); + assertThat(notifications.get(2)).isInstanceOf(DeleteNotification.class); List syncValues = syncCaptor.getAllValues(); assertThat(syncValues.get(0)).isFalse(); assertThat(syncValues.get(1)).isFalse(); - assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync - assertThat(syncValues.get(3)).isFalse(); + assertThat(syncValues.get(2)).isFalse(); } @Test @@ -86,13 +89,20 @@ public void testSyncEvents() { ProcessorStore processorStore = new ProcessorStore<>(podCache, processor); - Pod pod = new PodBuilder().withNewMetadata().endMetadata().build(); + Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build(); // replace empty store with two values processorStore.add(pod); processorStore.add(pod2); + // add events should not be called until retainAll + Mockito.verify(processor, Mockito.times(0)).distribute(notificationCaptor.capture(), syncCaptor.capture()); + + List pods = Arrays.asList(pod, pod2); + + processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet())); + // resync two values processorStore.resync(); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java index d08a8041c6a..f89549c671d 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java @@ -168,93 +168,98 @@ void testCreateOrReplaceWithDeleteExisting() throws Exception { @Test void testSuccessfulWaitUntilCondition() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("ns1").and().build(); - Pod noReady1 = createReadyFrom(pod1, "False"); - Pod ready1 = createReadyFrom(pod1, "True"); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("ns1").and().build(); + Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2"); + Pod ready1 = ResourceTest.createReadyFrom(pod1, "True", "3"); Pod pod2 = new PodBuilder().withNewMetadata() - .withName("pod2") - .withResourceVersion("1") - .withNamespace("ns1").and().build(); - Pod noReady2 = createReadyFrom(pod2, "False"); - Pod ready2 = createReadyFrom(pod2, "True"); + .withName("pod2") + .withResourceVersion("1") + .withNamespace("ns1").and().build(); + Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "4"); + Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "5"); Predicate isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream() - .anyMatch(c -> "True".equals(c.getStatus())); + .anyMatch(c -> "True".equals(c.getStatus())); // The pods are never ready if you request them directly. ResourceTest.list(server, noReady1); ResourceTest.list(server, noReady2); - server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED")) - .done() - .once(); - - server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED")) - .done() - .once(); + server.expect().get().withPath( + "/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED")) + .done() + .once(); + + server.expect().get().withPath( + "/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED")) + .done() + .once(); KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build(); List results = client.resourceList(list).inNamespace("ns1") - .waitUntilCondition(isReady, 5, SECONDS); + .waitUntilCondition(isReady, 5, SECONDS); assertThat(results) - .containsExactlyInAnyOrder(ready1, ready2); + .containsExactlyInAnyOrder(ready1, ready2); } @Test void testPartialSuccessfulWaitUntilCondition() { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("ns1").and().build(); - Pod noReady1 = createReadyFrom(pod1, "False"); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("ns1").and().build(); + Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2"); Pod pod2 = new PodBuilder().withNewMetadata() - .withName("pod2") - .withResourceVersion("1") - .withNamespace("ns1").and().build(); - Pod noReady2 = createReadyFrom(pod2, "False"); - Pod ready2 = createReadyFrom(pod2, "True"); + .withName("pod2") + .withResourceVersion("1") + .withNamespace("ns1").and().build(); + Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "3"); + Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "4"); Predicate isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream() - .anyMatch(c -> "True".equals(c.getStatus())); + .anyMatch(c -> "True".equals(c.getStatus())); // The pods are never ready if you request them directly. ResourceTest.list(server, noReady1); ResourceTest.list(server, noReady2); Status gone = new StatusBuilder() - .withCode(HTTP_GONE) - .build(); + .withCode(HTTP_GONE) + .build(); // This pod has a non-retryable error. - server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") - .andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(gone, "ERROR")) - .done() - .once(); + server.expect().get().withPath( + "/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500).andEmit(new WatchEvent(gone, "ERROR")) + .done() + .once(); // This pod succeeds. - server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true") - .andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED")) - .done() - .once(); + server.expect().get().withPath( + "/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED")) + .done() + .once(); - KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build(); - final ListVisitFromServerGetDeleteRecreateWaitApplicable ops = client.resourceList(list).inNamespace("ns1"); - KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class, () -> - ops.waitUntilCondition(isReady, 5, SECONDS) - ); - assertThat(ex.getResourcesNotReady()) + KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build(); + final ListVisitFromServerGetDeleteRecreateWaitApplicable ops = client.resourceList(list).inNamespace("ns1"); + KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class, + () -> ops.waitUntilCondition(isReady, 5, SECONDS)); + assertThat(ex.getResourcesNotReady()) .containsExactly(pod1); } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 6e69a2abebc..ce226230f34 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -70,7 +70,11 @@ class ResourceTest { void testCreateOrReplace() { // Given Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); - server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(HttpURLConnection.HTTP_CREATED, pod1).once(); + server.expect() + .post() + .withPath("/api/v1/namespaces/test/pods") + .andReturn(HttpURLConnection.HTTP_CREATED, pod1) + .once(); // When HasMetadata response = client.resource(pod1).createOrReplace(); @@ -78,12 +82,16 @@ void testCreateOrReplace() { // Then assertEquals(pod1, response); } - + @Test void testCreateOrReplaceString() { // Given Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); - server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(HttpURLConnection.HTTP_CREATED, pod1).once(); + server.expect() + .post() + .withPath("/api/v1/namespaces/test/pods") + .andReturn(HttpURLConnection.HTTP_CREATED, pod1) + .once(); // When HasMetadata response = client.resource(Serialization.asYaml(pod1)).createOrReplace(); @@ -94,17 +102,22 @@ void testCreateOrReplaceString() { @Test void testGenericResourceFails() { - assertThrows(KubernetesClientException.class, () -> client.resource(Serialization.unmarshal("apiVersion: example.io/v1\n" - + "kind: GenericThatFails\n" - + "metadata:\n" - + " name: failure\n", GenericKubernetesResource.class))); + assertThrows(KubernetesClientException.class, + () -> client.resource(Serialization.unmarshal("apiVersion: example.io/v1\n" + + "kind: GenericThatFails\n" + + "metadata:\n" + + " name: failure\n", GenericKubernetesResource.class))); } @Test void testCreateOrReplaceWhenCreateFails() { // Given Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); - server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(HttpURLConnection.HTTP_BAD_REQUEST, pod1).once(); + server.expect() + .post() + .withPath("/api/v1/namespaces/test/pods") + .andReturn(HttpURLConnection.HTTP_BAD_REQUEST, pod1) + .once(); NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable podOperation = client.resource(pod1); @@ -112,31 +125,35 @@ void testCreateOrReplaceWhenCreateFails() { assertThrows(KubernetesClientException.class, podOperation::createOrReplace); } - @Test - void testCreateWithExplicitNamespace() { - Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); + @Test + void testCreateWithExplicitNamespace() { + Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); - server.expect().post().withPath("/api/v1/namespaces/ns1/pods").andReturn(HttpURLConnection.HTTP_CREATED, pod1).once(); + server.expect() + .post() + .withPath("/api/v1/namespaces/ns1/pods") + .andReturn(HttpURLConnection.HTTP_CREATED, pod1) + .once(); - HasMetadata response = client.resource(pod1).inNamespace("ns1").createOrReplace(); - assertEquals(pod1, response); - } + HasMetadata response = client.resource(pod1).inNamespace("ns1").createOrReplace(); + assertEquals(pod1, response); + } - @Test - void testCreateOrReplaceWithDeleteExisting() throws Exception { - Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); + @Test + void testCreateOrReplaceWithDeleteExisting() throws Exception { + Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); - server.expect().delete().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HttpURLConnection.HTTP_OK, pod1).once(); - server.expect().post().withPath("/api/v1/namespaces/ns1/pods").andReturn(HttpURLConnection.HTTP_CREATED, pod1).once(); + server.expect().delete().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HttpURLConnection.HTTP_OK, pod1).once(); + server.expect().post().withPath("/api/v1/namespaces/ns1/pods").andReturn(HttpURLConnection.HTTP_CREATED, pod1).once(); - HasMetadata response = client.resource(pod1).inNamespace("ns1").deletingExisting().createOrReplace(); - assertEquals(pod1, response); + HasMetadata response = client.resource(pod1).inNamespace("ns1").deletingExisting().createOrReplace(); + assertEquals(pod1, response); - RecordedRequest request = server.getLastRequest(); - assertEquals(2, server.getRequestCount()); - assertEquals("/api/v1/namespaces/ns1/pods", request.getPath()); - assertEquals("POST", request.getMethod()); - } + RecordedRequest request = server.getLastRequest(); + assertEquals(2, server.getRequestCount()); + assertEquals("/api/v1/namespaces/ns1/pods", request.getPath()); + assertEquals("POST", request.getMethod()); + } @Test void itPassesPropagationPolicyWithDeleteExisting() throws InterruptedException { @@ -174,13 +191,17 @@ void testCreateOrReplaceWithDeleteExistingWithCreateFailed() { assertThrows(KubernetesClientException.class, podOperation::createOrReplace); } - @Test - void testRequire() { - server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HttpURLConnection.HTTP_NOT_FOUND, "").once(); - PodResource podOp = client.pods().inNamespace("ns1").withName("pod1"); + @Test + void testRequire() { + server.expect() + .get() + .withPath("/api/v1/namespaces/ns1/pods/pod1") + .andReturn(HttpURLConnection.HTTP_NOT_FOUND, "") + .once(); + PodResource podOp = client.pods().inNamespace("ns1").withName("pod1"); - Assertions.assertThrows(ResourceNotFoundException.class, podOp::require); - } + Assertions.assertThrows(ResourceNotFoundException.class, podOp::require); + } @Test void testDelete() { @@ -200,22 +221,27 @@ void testDelete() { assertFalse(deleted); } - @Test void testWatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") - .withNamespace("test").and().build(); + .withNamespace("test") + .and() + .build(); - server.expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, pod1).once(); - server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(201, pod1).once(); + server.expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, pod1).once(); + server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(201, pod1).once(); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() + server.expect() + .get() + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() .open() - .waitFor(1000).andEmit(new WatchEvent(pod1, "DELETED")) + .waitFor(1000) + .andEmit(new WatchEvent(pod1, "DELETED")) .done() - .always(); + .always(); final CountDownLatch latch = new CountDownLatch(1); @@ -234,25 +260,30 @@ public void onClose(WatcherException cause) { watch.close(); } - @Test void testWaitUntilReady() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); list(noReady); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .always(); - + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .done() + .always(); Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); @@ -265,7 +296,8 @@ private void list(Pod pod) { static void list(KubernetesMockServer server, Pod pod) { server.expect() .get() - .withPath("/api/v1/namespaces/"+pod.getMetadata().getNamespace()+"/pods?fieldSelector=metadata.name%3D"+pod.getMetadata().getName()) + .withPath("/api/v1/namespaces/" + pod.getMetadata().getNamespace() + "/pods?fieldSelector=metadata.name%3D" + + pod.getMetadata().getName()) .andReturn(200, new PodListBuilder().withItems(pod).withNewMetadata().withResourceVersion("1").endMetadata().build()) .once(); @@ -274,22 +306,32 @@ static void list(KubernetesMockServer server, Pod pod) { @Test void testWaitUntilExistsThenReady() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "1"); + Pod ready = createReadyFrom(pod1, "True", "2"); // and again so that "periodicWatchUntilReady" successfully begins - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1").andReturn(200, noReady).times(2); - - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(100).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .always(); + server.expect() + .get() + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1") + .andReturn(200, noReady) + .times(2); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(100) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .done() + .always(); Pod p = client.pods().withName("pod1").waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); @@ -298,84 +340,108 @@ void testWaitUntilExistsThenReady() throws InterruptedException { @Test void testWaitUntilCondition() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); - - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); - - Pod withConditionBeingFalse = new PodBuilder(pod1).withNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus("True") - .endCondition() - .addNewCondition() - .withType("dummy") - .withStatus("False") - .endCondition() - .endStatus() - .build(); - - Pod withConditionBeingTrue = new PodBuilder(pod1).withNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus("True") - .endCondition() - .addNewCondition() - .withType("Dummy") - .withStatus("True") - .endCondition() - .endStatus() - .build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); + + Pod noReady = createReadyFrom(pod1, "False", "1"); + Pod ready = createReadyFrom(pod1, "True", "3"); + + Pod withConditionBeingFalse = new PodBuilder(pod1).editMetadata() + .withResourceVersion("2") + .endMetadata() + .withNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .addNewCondition() + .withType("dummy") + .withStatus("False") + .endCondition() + .endStatus() + .build(); + + Pod withConditionBeingTrue = new PodBuilder(pod1).editMetadata() + .withResourceVersion("4") + .endMetadata() + .withNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .addNewCondition() + .withType("Dummy") + .withStatus("True") + .endCondition() + .endStatus() + .build(); // at first the pod is non-ready list(noReady); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(1000).andEmit(new WatchEvent(ready, "MODIFIED")) - .waitFor(2000).andEmit(new WatchEvent(withConditionBeingFalse, "MODIFIED")) - .waitFor(2500).andEmit(new WatchEvent(withConditionBeingTrue, "MODIFIED")) - .done() - .always(); - + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(1000) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .waitFor(2000) + .andEmit(new WatchEvent(withConditionBeingFalse, "MODIFIED")) + .waitFor(2500) + .andEmit(new WatchEvent(withConditionBeingTrue, "MODIFIED")) + .done() + .always(); - Pod p = client.pods().withName("pod1").waitUntilCondition( - r -> r.getStatus().getConditions() - .stream() - .anyMatch(c -> "Dummy".equals(c.getType()) && "True".equals(c.getStatus())), - 8, SECONDS - ); + Pod p = client.pods() + .withName("pod1") + .waitUntilCondition( + r -> r.getStatus() + .getConditions() + .stream() + .anyMatch(c -> "Dummy".equals(c.getType()) && "True".equals(c.getStatus())), + 8, SECONDS); assertThat(p.getStatus().getConditions()) - .extracting("type", "status") - .containsExactly(tuple("Ready", "True"), tuple("Dummy", "True")); + .extracting("type", "status") + .containsExactly(tuple("Ready", "True"), tuple("Dummy", "True")); } @Test void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); Status status = new StatusBuilder() - .withCode(HttpURLConnection.HTTP_FORBIDDEN) - .build(); + .withCode(HttpURLConnection.HTTP_FORBIDDEN) + .build(); // once not ready, to begin watch list(noReady); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(status, "ERROR")) - .waitFor(500).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .once(); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500) + .andEmit(new WatchEvent(status, "ERROR")) + .waitFor(500) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .done() + .once(); Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); @@ -384,32 +450,43 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { @Test void testRetryOnErrorEventDuringWait() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); Status status = new StatusBuilder() - .withCode(HttpURLConnection.HTTP_FORBIDDEN) - .build(); + .withCode(HttpURLConnection.HTTP_FORBIDDEN) + .build(); // once not ready, to begin watch list(noReady); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(status, "ERROR")) - .done() - .once(); - - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .once(); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500) + .andEmit(new WatchEvent(status, "ERROR")) + .done() + .once(); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .done() + .once(); Pod p = client.resource(noReady).waitUntilReady(5, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); @@ -418,12 +495,14 @@ void testRetryOnErrorEventDuringWait() throws InterruptedException { @Test void testSkipWatchIfAlreadyMatchingCondition() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); // once not ready, to begin watch list(ready); @@ -437,23 +516,30 @@ void testRetryWatchOnHttpGone() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") - .withNamespace("test").and().build(); + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); Status status = new StatusBuilder() - .withCode(HTTP_GONE) - .build(); + .withCode(HTTP_GONE) + .build(); // once not ready, to begin watch list(noReady); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(500).andEmit(new WatchEvent(status, "ERROR")) - .done() - .once(); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(500) + .andEmit(new WatchEvent(status, "ERROR")) + .done() + .once(); list(ready); @@ -463,48 +549,60 @@ void testRetryWatchOnHttpGone() throws InterruptedException { @Test void testWaitOnConditionDeleted() throws InterruptedException { Pod ready = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().withNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus("True") - .endCondition() - .endStatus() - .build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .withNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .endStatus() + .build(); list(ready); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(1000).andEmit(new WatchEvent(ready, "DELETED")) - .done() - .once(); - + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(1000) + .andEmit(new WatchEvent(ready, "DELETED")) + .done() + .once(); - Pod p = client.pods().withName("pod1").waitUntilCondition(Objects::isNull,8, SECONDS); + Pod p = client.pods().withName("pod1").waitUntilCondition(Objects::isNull, 8, SECONDS); assertNull(p); } @Test void testCreateAndWaitUntilReady() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withNamespace("test").and().build(); + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); - Pod noReady = createReadyFrom(pod1, "False"); - Pod ready = createReadyFrom(pod1, "True"); + Pod noReady = createReadyFrom(pod1, "False", "2"); + Pod ready = createReadyFrom(pod1, "True", "3"); list(noReady); server.expect().post().withPath("/api/v1/namespaces/test/pods").andReturn(201, noReady).once(); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket() - .open() - .waitFor(1000).andEmit(new WatchEvent(ready, "MODIFIED")) - .done() - .always(); - + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(1000) + .andEmit(new WatchEvent(ready, "MODIFIED")) + .done() + .always(); Pod p = client.resource(noReady).createOrReplaceAnd().waitUntilReady(10, SECONDS); Assert.assertTrue(Readiness.isPodReady(p)); @@ -513,15 +611,14 @@ void testCreateAndWaitUntilReady() throws InterruptedException { @Test void testFromServerGet() { Pod pod = new PodBuilder().withNewMetadata() - .withName("pod1") - .withNamespace("test") - .withResourceVersion("1") - .and() - .build(); + .withName("pod1") + .withNamespace("test") + .withResourceVersion("1") + .and() + .build(); server.expect().get().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, pod).once(); - HasMetadata response = client.resource(pod).fromServer().get(); assertEquals(pod, response); } @@ -531,28 +628,36 @@ void testFromServerGet() { void testFromServerWaitUntilConditionAlwaysGetsResourceFromServer() throws Exception { // Given final Pod conditionNotMetPod = new PodBuilder().withNewMetadata() - .withName("pod") - .withNamespace("test") - .addToLabels("CONDITION", "NOT_MET") - .endMetadata().build(); + .withName("pod") + .withNamespace("test") + .withResourceVersion("1") + .addToLabels("CONDITION", "NOT_MET") + .endMetadata() + .build(); final Pod conditionMetPod = new PodBuilder().withNewMetadata() - .withName("pod") - .withNamespace("test") - .withResourceVersion("1") - .addToLabels("CONDITION", "MET") - .endMetadata() - .build(); + .withName("pod") + .withNamespace("test") + .withResourceVersion("2") + .addToLabels("CONDITION", "MET") + .endMetadata() + .build(); list(conditionNotMetPod); - server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod&resourceVersion=1&allowWatchBookmarks=true&watch=true") - .andUpgradeToWebSocket().open() - .immediately().andEmit(new WatchEvent(conditionNotMetPod, "MODIFIED")) - .waitFor(10).andEmit(new WatchEvent(conditionMetPod, "MODIFIED")) - .done() - .once(); + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .immediately() + .andEmit(new WatchEvent(conditionNotMetPod, "MODIFIED")) + .waitFor(10) + .andEmit(new WatchEvent(conditionMetPod, "MODIFIED")) + .done() + .once(); // When HasMetadata response = client - .resource(new PodBuilder(conditionMetPod).build()) - .waitUntilCondition(p -> "MET".equals(p.getMetadata().getLabels().get("CONDITION")), 1, SECONDS); + .resource(new PodBuilder(conditionMetPod).build()) + .waitUntilCondition(p -> "MET".equals(p.getMetadata().getLabels().get("CONDITION")), 1, SECONDS); // Then assertEquals(conditionMetPod, response); assertEquals(2, server.getRequestCount()); @@ -561,25 +666,27 @@ void testFromServerWaitUntilConditionAlwaysGetsResourceFromServer() throws Excep @Test void testWaitNullDoesntExist() throws InterruptedException { server.expect() - .get() - .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1") - .andReturn(200, - new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build()) - .once(); + .get() + .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1") + .andReturn(200, + new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build()) + .once(); Pod p = client.pods().withName("pod1").waitUntilCondition(Objects::isNull, 1, SECONDS); assertNull(p); } - private static Pod createReadyFrom(Pod pod, String status) { + static Pod createReadyFrom(Pod pod, String status, String resourceVersion) { return new PodBuilder(pod) - .withNewStatus() + .editMetadata() + .withResourceVersion(resourceVersion) + .endMetadata() + .withNewStatus() .addNewCondition() - .withType("Ready") - .withStatus(status) + .withType("Ready") + .withStatus(status) .endCondition() - .endStatus() - .build(); + .endStatus() + .build(); } } -