Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #3969 #4222: deferring add events until cache is complete #4229

Merged
merged 1 commit into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## CHANGELOG

### 5.12.3
* Fix #3969: relist will not trigger sync events
* Fix #4222: backport of #4082 - to not process events until the cache is complete

### 5.12.2 (2022-04-06)
* Fix #3582: SSL truststore can be loaded in FIPS enabled environments
* Fix #3797: Implement SchemaSwap; generate CRD from model not owned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.Notification;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -29,6 +32,8 @@ public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {

private Cache<T> cache;
private SharedProcessor<T> processor;
private AtomicBoolean synced = new AtomicBoolean();
private List<String> deferredAdd = new ArrayList<>();

public ProcessorStore(Cache<T> cache, SharedProcessor<T> processor) {
this.cache = cache;
Expand All @@ -40,14 +45,26 @@ public void add(T obj) {
update(obj);
}

@Override
public void update(T obj) {
private Notification<T> updateInternal(T obj) {
T oldObj = this.cache.put(obj);
Notification<T> notification = null;
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj),
Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion()));
if (!Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())) {
notification = new ProcessorListener.UpdateNotification<>(oldObj, obj);
}
} else if (synced.get()) {
notification = new ProcessorListener.AddNotification<>(obj);
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
deferredAdd.add(getKey(obj));
}
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

Expand Down Expand Up @@ -81,6 +98,11 @@ public T getByKey(String key) {

@Override
public void retainAll(Set<String> nextKeys) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
deferredAdd.clear();
}
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
Expand All @@ -90,11 +112,11 @@ public void retainAll(Set<String> nextKeys) {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
});
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
Expand All @@ -106,4 +128,4 @@ public void resync() {
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -43,6 +45,9 @@ public void testEvents() {
ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);
Pod pod = new PodBuilder().withNewMetadata().withName("pod").endMetadata().build();

// initial sync complete
processorStore.retainAll(Collections.emptySet());

// add notification
processorStore.add(pod);

Expand All @@ -60,21 +65,19 @@ public void testEvents() {
Mockito.when(podCache.remove(pod)).thenReturn(pod);
processorStore.delete(pod);

Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture());
Mockito.verify(processor, Mockito.times(3)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Notification<Pod>> notifications = notificationCaptor.getAllValues();

assertThat(notifications.get(0)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(1)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class);
assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class);
assertThat(notifications.get(2)).isInstanceOf(DeleteNotification.class);

List<Boolean> syncValues = syncCaptor.getAllValues();

assertThat(syncValues.get(0)).isFalse();
assertThat(syncValues.get(1)).isFalse();
assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync
assertThat(syncValues.get(3)).isFalse();
assertThat(syncValues.get(2)).isFalse();
}

@Test
Expand All @@ -86,13 +89,20 @@ public void testSyncEvents() {

ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);

Pod pod = new PodBuilder().withNewMetadata().endMetadata().build();
Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build();

// replace empty store with two values
processorStore.add(pod);
processorStore.add(pod2);

// add events should not be called until retainAll
Mockito.verify(processor, Mockito.times(0)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Pod> pods = Arrays.asList(pod, pod2);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));

// resync two values
processorStore.resync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,93 +168,98 @@ void testCreateOrReplaceWithDeleteExisting() throws Exception {
@Test
void testSuccessfulWaitUntilCondition() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = createReadyFrom(pod1, "False");
Pod ready1 = createReadyFrom(pod1, "True");
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2");
Pod ready1 = ResourceTest.createReadyFrom(pod1, "True", "3");

Pod pod2 = new PodBuilder().withNewMetadata()
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = createReadyFrom(pod2, "False");
Pod ready2 = createReadyFrom(pod2, "True");
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "4");
Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "5");

Predicate<HasMetadata> isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream()
.anyMatch(c -> "True".equals(c.getStatus()));
.anyMatch(c -> "True".equals(c.getStatus()));

// The pods are never ready if you request them directly.
ResourceTest.list(server, noReady1);
ResourceTest.list(server, noReady2);

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED"))
.done()
.once();

server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true").andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready1, "MODIFIED"))
.done()
.once();

server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();

KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
List<HasMetadata> results = client.resourceList(list).inNamespace("ns1")
.waitUntilCondition(isReady, 5, SECONDS);
.waitUntilCondition(isReady, 5, SECONDS);
assertThat(results)
.containsExactlyInAnyOrder(ready1, ready2);
.containsExactlyInAnyOrder(ready1, ready2);
}

@Test
void testPartialSuccessfulWaitUntilCondition() {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = createReadyFrom(pod1, "False");
.withName("pod1")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady1 = ResourceTest.createReadyFrom(pod1, "False", "2");

Pod pod2 = new PodBuilder().withNewMetadata()
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = createReadyFrom(pod2, "False");
Pod ready2 = createReadyFrom(pod2, "True");
.withName("pod2")
.withResourceVersion("1")
.withNamespace("ns1").and().build();
Pod noReady2 = ResourceTest.createReadyFrom(pod2, "False", "3");
Pod ready2 = ResourceTest.createReadyFrom(pod2, "True", "4");

Predicate<HasMetadata> isReady = p -> "Pod".equals(p.getKind()) && ((Pod) p).getStatus().getConditions().stream()
.anyMatch(c -> "True".equals(c.getStatus()));
.anyMatch(c -> "True".equals(c.getStatus()));

// The pods are never ready if you request them directly.
ResourceTest.list(server, noReady1);
ResourceTest.list(server, noReady2);

Status gone = new StatusBuilder()
.withCode(HTTP_GONE)
.build();
.withCode(HTTP_GONE)
.build();

// This pod has a non-retryable error.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(gone, "ERROR"))
.done()
.once();

// This pod succeeds.
server.expect().get().withPath("/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();
server.expect().get().withPath(
"/api/v1/namespaces/ns1/pods?fieldSelector=metadata.name%3Dpod2&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500).andEmit(new WatchEvent(ready2, "MODIFIED"))
.done()
.once();

KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
final ListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> ops = client.resourceList(list).inNamespace("ns1");
KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class, () ->
ops.waitUntilCondition(isReady, 5, SECONDS)
);
assertThat(ex.getResourcesNotReady())
KubernetesList list = new KubernetesListBuilder().withItems(pod1, pod2).build();
final ListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> ops = client.resourceList(list).inNamespace("ns1");
KubernetesClientTimeoutException ex = assertThrows(KubernetesClientTimeoutException.class,
() -> ops.waitUntilCondition(isReady, 5, SECONDS));
assertThat(ex.getResourcesNotReady())
.containsExactly(pod1);
}

Expand Down
Loading