Skip to content

Commit

Permalink
fix fabric8io#3969 fabric8io#4222: deferring add events until cache i…
Browse files Browse the repository at this point in the history
…s complete

also omitting resync events on relist
  • Loading branch information
shawkins committed Jun 23, 2022
1 parent d686ed0 commit 5c4069e
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 288 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## CHANGELOG

### 5.12.3
* Fix #3969: relist will not trigger sync events
* Fix #4222: backport of #4082 - to not process events until the cache is complete

### 5.12.2 (2022-04-06)
* Fix #3582: SSL truststore can be loaded in FIPS enabled environments
* Fix #3797: Implement SchemaSwap; generate CRD from model not owned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.Notification;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -29,6 +32,8 @@ public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {

private Cache<T> cache;
private SharedProcessor<T> processor;
private AtomicBoolean synced = new AtomicBoolean();
private List<String> deferredAdd = new ArrayList<>();

public ProcessorStore(Cache<T> cache, SharedProcessor<T> processor) {
this.cache = cache;
Expand All @@ -40,14 +45,26 @@ public void add(T obj) {
update(obj);
}

@Override
public void update(T obj) {
private Notification<T> updateInternal(T obj) {
T oldObj = this.cache.put(obj);
Notification<T> notification = null;
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj),
Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion()));
if (!Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())) {
notification = new ProcessorListener.UpdateNotification<>(oldObj, obj);
}
} else if (synced.get()) {
notification = new ProcessorListener.AddNotification<>(obj);
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
deferredAdd.add(getKey(obj));
}
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

Expand Down Expand Up @@ -81,6 +98,11 @@ public T getByKey(String key) {

@Override
public void retainAll(Set<String> nextKeys) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
deferredAdd.clear();
}
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
Expand All @@ -90,11 +112,11 @@ public void retainAll(Set<String> nextKeys) {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
});
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
Expand All @@ -106,4 +128,4 @@ public void resync() {
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -43,6 +45,9 @@ public void testEvents() {
ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);
Pod pod = new PodBuilder().withNewMetadata().withName("pod").endMetadata().build();

// initial sync complete
processorStore.retainAll(Collections.emptySet());

// add notification
processorStore.add(pod);

Expand All @@ -60,21 +65,19 @@ public void testEvents() {
Mockito.when(podCache.remove(pod)).thenReturn(pod);
processorStore.delete(pod);

Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture());
Mockito.verify(processor, Mockito.times(3)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Notification<Pod>> notifications = notificationCaptor.getAllValues();

assertThat(notifications.get(0)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(1)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class);
assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class);
assertThat(notifications.get(2)).isInstanceOf(DeleteNotification.class);

List<Boolean> syncValues = syncCaptor.getAllValues();

assertThat(syncValues.get(0)).isFalse();
assertThat(syncValues.get(1)).isFalse();
assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync
assertThat(syncValues.get(3)).isFalse();
assertThat(syncValues.get(2)).isFalse();
}

@Test
Expand All @@ -86,13 +89,20 @@ public void testSyncEvents() {

ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);

Pod pod = new PodBuilder().withNewMetadata().endMetadata().build();
Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build();

// replace empty store with two values
processorStore.add(pod);
processorStore.add(pod2);

// add events should not be called until retainAll
Mockito.verify(processor, Mockito.times(0)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Pod> pods = Arrays.asList(pod, pod2);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));

// resync two values
processorStore.resync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,93 +168,98 @@ void testCreateOrReplaceWithDeleteExisting() throws Exception {
@Test
void testSuccessfulWaitUntilCondition() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = createReadyFrom(pod1, "False");
Pod ready1 = createReadyFrom(pod1, "True");
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2");
Pod ready1 = ResourceTest.createReadyFrom(pod1, "True", "3");

Pod pod2 = new PodBuilder().withNewMetadata()
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = createReadyFrom(pod2, "False");
Pod ready2 = createReadyFrom(pod2, "True");
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "4");
Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "5");

Predicate<HasMetadata> isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream()
.anyMatch(c -> "True".equals(c.getStatus()));
.anyMatch(c -> "True".equals(c.getStatus()));

// The pods are never ready if you request them directly.
ResourceTest.list(server, noReady1);
ResourceTest.list(server, noReady2);

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED"))
.done()
.once();

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED"))
.done()
.once();

server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();

KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
List<HasMetadata> results = client.resourceList(list).inNamespace("ns1")
.waitUntilCondition(isReady, 5, SECONDS);
.waitUntilCondition(isReady, 5, SECONDS);
assertThat(results)
.containsExactlyInAnyOrder(ready1, ready2);
.containsExactlyInAnyOrder(ready1, ready2);
}

@Test
void testPartialSuccessfulWaitUntilCondition() {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = createReadyFrom(pod1, "False");
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2");

Pod pod2 = new PodBuilder().withNewMetadata()
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = createReadyFrom(pod2, "False");
Pod ready2 = createReadyFrom(pod2, "True");
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "3");
Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "4");

Predicate<HasMetadata> isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream()
.anyMatch(c -> "True".equals(c.getStatus()));
.anyMatch(c -> "True".equals(c.getStatus()));

// The pods are never ready if you request them directly.
ResourceTest.list(server, noReady1);
ResourceTest.list(server, noReady2);

Status gone = new StatusBuilder()
.withCode(HTTP_GONE)
.build();
.withCode(HTTP_GONE)
.build();

// This pod has a non-retryable error.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();

// This pod succeeds.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();

KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
final ListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> ops = client.resourceList(list).inNamespace("ns1");
KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class, () ->
ops.waitUntilCondition(isReady, 5, SECONDS)
);
assertThat(ex.getResourcesNotReady())
KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
final ListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> ops = client.resourceList(list).inNamespace("ns1");
KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class,
() -> ops.waitUntilCondition(isReady, 5, SECONDS));
assertThat(ex.getResourcesNotReady())
.containsExactly(pod1);
}

Expand Down
Loading

0 comments on commit 5c4069e

Please sign in to comment.