From 9ab00d4d429881372f27c7a234dc75d0f4fd5b41 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Sat, 15 Jan 2022 10:37:16 -0500 Subject: [PATCH] fix #3587: adding inform support for limit/batch fetching --- CHANGELOG.md | 1 + .../kubernetes/client/dsl/Informable.java | 15 ++++++ .../client/dsl/base/BaseOperation.java | 20 ++++++- .../client/informers/ListerWatcher.java | 4 +- .../informers/cache/ProcessorStore.java | 40 +++++++------- .../client/informers/cache/Reflector.java | 30 ++++++++--- .../client/informers/cache/SyncableStore.java | 22 ++++---- .../informers/cache/ProcessorStoreTest.java | 13 +++-- .../client/informers/cache/ReflectorTest.java | 4 +- .../DefaultSharedIndexInformerResyncTest.java | 1 - .../DefaultSharedIndexInformerIT.java | 24 +++++++++ .../kubernetes/client/mock/InformTest.java | 54 +++++++++++++++++++ 12 files changed, 180 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 553eb3fd9cb..f8e7eb44c5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ #### Improvements * Fix #3674: allows the connect and websocket timeouts to apply to watches instead of a hardcoded timeout * Fix #3651: Introduce SchemaFrom annotation as escape hatch (CRD Generator) +* Fix #3587: adding inform support for limit/batch fetching * Fix #3734: extract static finalizer validation method #### Dependency Upgrade diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java index 92dcb20bb95..bf4ab0f6062 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java @@ -37,6 +37,21 @@ public interface Informable { */ Informable withIndexers(Map>> indexers); + /** + * Set the limit to the number of resources to list at one time. This means that longer + * lists will take multiple requests to fetch. + *

If the list fails to complete it will be re-attempted with this limit, rather than + * falling back to the full list. You should ensure that your handlers are either async or + * fast acting to prevent long delays in list processing that may cause results to expire + * before reaching the end of the list. + *

WARNING As noted in the go client: "paginated lists are always served directly from + * etcd, which is significantly less efficient and may lead to serious performance and + * scalability problems." + * @param limit of a items in a list fetch + * @return the current {@link Informable} + */ + Informable withLimit(Long limit); + /** * Similar to a {@link Watch}, but will attempt to handle failures after successfully started. * and provides a store of all the current resources. diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java index 6e569b90c36..70827e8589a 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java @@ -103,7 +103,9 @@ public class BaseOperation listType; + // informable state private Map>> indexers; + private Long limit; protected BaseOperation(OperationContext ctx) { super(ctx); @@ -404,7 +406,7 @@ public L list() { @Override public L list(Integer limitVal, String continueVal) { - return list(new ListOptionsBuilder().withLimit(Long.parseLong(limitVal.toString())).withContinue(continueVal).build()); + return list(new ListOptionsBuilder().withLimit(limitVal.longValue()).withContinue(continueVal).build()); } @Override @@ -931,8 +933,22 @@ public WritableOperation dryRun(boolean isDryRun) { public Informable withIndexers(Map>> indexers) { BaseOperation result = newInstance(context); result.indexers = indexers; + result.limit = this.limit; return result; } + + @Override + public BaseOperation withLimit(Long limit) { + BaseOperation result = newInstance(context); + result.indexers = this.indexers; + result.limit = limit; + return result; + } + + @Override + public Long getLimit() { + return this.limit; + } @Override public SharedIndexInformer inform(ResourceEventHandler handler, long resync) { @@ -958,7 +974,7 @@ private DefaultSharedIndexInformer createInformer(long resync) { } // use the local context / namespace but without a resourceVersion - DefaultSharedIndexInformer informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null), resync, Runnable::run); // just run the event notification in the websocket thread + DefaultSharedIndexInformer informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null).withLimit(this.limit), resync, Runnable::run); // just run the event notification in the websocket thread if (indexers != null) { informer.addIndexers(indexers); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java index d7dd0ca638a..1d1e062a2fe 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java @@ -29,7 +29,9 @@ public interface ListerWatcher { Watch watch(ListOptions params, Watcher watcher); - L list(); + L list(ListOptions listOptions); + + Long getLimit(); String getNamespace(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java index efeb979cdac..60a6978ab6b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java @@ -17,11 +17,10 @@ package io.fabric8.kubernetes.client.informers.cache; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; import java.util.List; -import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs @@ -45,7 +44,8 @@ public void add(T obj) { public void update(T obj) { T oldObj = this.cache.put(obj); if (oldObj != null) { - this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), false); + this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), + Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())); } else { this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false); } @@ -80,26 +80,24 @@ public T getByKey(String key) { } @Override - public void replace(List list) { - Map oldState = cache.replace(list); - - if (list.isEmpty() && oldState.isEmpty()) { - this.processor.distribute(l -> l.getHandler().onNothing(), false); + public void retainAll(Set nextKeys) { + List current = cache.list(); + if (nextKeys.isEmpty() && current.isEmpty()) { + this.processor.distribute(l -> l.getHandler().onNothing(), false); + return; } - - // now that the store is up-to-date, process the notifications - for (T newValue : list) { - T old = oldState.remove(cache.getKey(newValue)); - if (old == null) { - this.processor.distribute(new ProcessorListener.AddNotification<>(newValue), false); - } else { - boolean same = Objects.equals(KubernetesResourceUtil.getResourceVersion(old), KubernetesResourceUtil.getResourceVersion(newValue)); - this.processor.distribute(new ProcessorListener.UpdateNotification<>(old, newValue), same); + current.forEach(v -> { + String key = cache.getKey(v); + if (!nextKeys.contains(key)) { + cache.remove(v); + this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false); } - } - // deletes are not marked as sync=true in keeping with the old code - oldState.values() - .forEach(old -> this.processor.distribute(new ProcessorListener.DeleteNotification<>(old, true), false)); + }); + } + + @Override + public String getKey(T obj) { + return cache.getKey(obj); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java index c88ff11cb15..d9b84152771 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -23,9 +23,12 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.informers.ListerWatcher; +import io.fabric8.kubernetes.client.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; public class Reflector> { @@ -49,10 +52,6 @@ public Reflector(Class apiTypeClass, ListerWatcher listerWatcher, Synca this.watch = new AtomicReference<>(null); } - protected L getList() { - return listerWatcher.list(); - } - public void stop() { running = false; stopWatcher(); @@ -74,11 +73,26 @@ private synchronized void stopWatcher() { */ public void listSyncAndWatch() { running = true; - final L list = getList(); - final String latestResourceVersion = list.getMetadata().getResourceVersion(); + KubernetesResourceList result = null; + String continueVal = null; + Set nextKeys = new LinkedHashSet<>(); + do { + result = listerWatcher + .list(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build()); + result.getItems().forEach(i -> { + String key = store.getKey(i); + // process the updates immediately so we don't need to hold the item + store.update(i); + nextKeys.add(key); + }); + continueVal = result.getMetadata().getContinue(); + } while (Utils.isNotNullOrEmpty(continueVal)); + + store.retainAll(nextKeys); + + final String latestResourceVersion = result.getMetadata().getResourceVersion(); lastSyncResourceVersion = latestResourceVersion; - log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion); - store.replace(list.getItems()); + log.debug("Listing items ({}) for resource {} v{}", nextKeys.size(), apiTypeClass, latestResourceVersion); startWatcher(latestResourceVersion); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java index e58a169b41f..fad0b13e022 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java @@ -15,7 +15,7 @@ */ package io.fabric8.kubernetes.client.informers.cache; -import java.util.List; +import java.util.Set; public interface SyncableStore extends Store { @@ -41,17 +41,21 @@ public interface SyncableStore extends Store { void delete(T obj); /** - * Deletes the contents of the store, using instead the given list. - * Store takes ownership of the list, you should not reference it - * after calling this function - * - * @param list list of objects + * Sends a resync event for each item. */ - void replace(List list); + void resync(); + + /** + * Get the key for the given object + * @param obj object + * @return the key + */ + String getKey(T obj); /** - * Sends a resync event for each item. + * Retain only the values with keys in the given set + * @param nextKeys to retain */ - void resync(); + void retainAll(Set nextKeys); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java index a79eed8ce5a..129cc3b6fb1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java @@ -25,7 +25,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -70,7 +69,12 @@ public void testEvents() { assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class); assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class); - assertTrue(syncCaptor.getAllValues().stream().allMatch(s->!s.booleanValue())); + List syncValues = syncCaptor.getAllValues(); + + assertThat(syncValues.get(0)).isFalse(); + assertThat(syncValues.get(1)).isFalse(); + assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync + assertThat(syncValues.get(3)).isFalse(); } @Test @@ -86,13 +90,14 @@ public void testSyncEvents() { Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build(); // replace empty store with two values - processorStore.replace(Arrays.asList(pod, pod2)); + processorStore.add(pod); + processorStore.add(pod2); // resync two values processorStore.resync(); // relist with deletes - processorStore.replace(Collections.emptyList()); + processorStore.retainAll(Collections.emptySet()); Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java index 86ccdfd2730..8dfbc21d77c 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java @@ -36,7 +36,7 @@ class ReflectorTest { void testStateFlags() { ListerWatcher mock = Mockito.mock(ListerWatcher.class); PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); - Mockito.when(mock.list()).thenReturn(list); + Mockito.when(mock.list(Mockito.any())).thenReturn(list); Reflector reflector = new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class)); @@ -70,7 +70,7 @@ void testStateFlags() { void testNonHttpGone() { ListerWatcher mock = Mockito.mock(ListerWatcher.class); PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); - Mockito.when(mock.list()).thenReturn(list); + Mockito.when(mock.list(Mockito.any())).thenReturn(list); Reflector reflector = new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class)); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index eb15032afdc..2859500c5d3 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -17,7 +17,6 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.DisplayName; diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java index ca33efd791e..1de6d18123d 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java @@ -87,6 +87,30 @@ public void testSharedIndexInformerGetsSingleUpdates() throws InterruptedExcepti } } + @Test + public void testLimit() throws Exception { + client.configMaps().delete(); + + client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map1").endMetadata().build()); + client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map2").endMetadata().build()); + + CountDownLatch addEvents = new CountDownLatch(2); + + TestResourceEventHandler eventHandler = new TestResourceEventHandler<>(addEvents, new CountDownLatch(1)); + + SharedIndexInformer informer = client.configMaps().withLimit(1L).inform(eventHandler, RESYNC_PERIOD); + + try { + // When + addEvents.await(30000, TimeUnit.MILLISECONDS); + + // Then + assertThat(addEvents.getCount()).isZero(); + } finally { + informer.stop(); + } + } + private static class TestResourceEventHandler implements ResourceEventHandler { private final CountDownLatch addEventRecievedLatch; private final CountDownLatch updateEventRecievedLatch; diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java index cde01b87386..19d2158c31d 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java @@ -275,4 +275,58 @@ public void onUpdate(Pod oldObj, Pod newObj) { informer.stop(); } + @Test + void testListLimit() throws InterruptedException { + // Given + Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") + .withResourceVersion("1").endMetadata().build(); + + Pod pod2 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod2") + .withResourceVersion("2").endMetadata().build(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods?limit=1") + .andReturn(HttpURLConnection.HTTP_OK, + new PodListBuilder().withNewMetadata().withResourceVersion("2").withContinue("x").endMetadata().withItems(pod1).build()) + .once(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods?limit=1&continue=x") + .andReturn(HttpURLConnection.HTTP_OK, + new PodListBuilder().withNewMetadata().withResourceVersion("2").endMetadata().withItems(pod2).build()) + .once(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods?resourceVersion=2&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .done() + .once(); + final CountDownLatch addLatch = new CountDownLatch(2); + final ResourceEventHandler handler = new ResourceEventHandler() { + + @Override + public void onAdd(Pod obj) { + addLatch.countDown(); + } + + @Override + public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { + + } + + @Override + public void onUpdate(Pod oldObj, Pod newObj) { + + } + + }; + // When + SharedIndexInformer informer = client.pods().withLimit(1L).inform(handler); + + assertTrue(addLatch.await(10, TimeUnit.SECONDS)); + + informer.stop(); + } + }