Skip to content

Commit

Permalink
fix #3587: adding inform support for limit/batch fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Jan 20, 2022
1 parent c50615a commit 36d6731
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#### Improvements
* Fix #3674: allows the connect and websocket timeouts to apply to watches instead of a hardcoded timeout
* Fix #3651: Introduce SchemaFrom annotation as escape hatch (CRD Generator)
* Fix #3587: adding inform support for limit/batch fetching
* Fix #3734: extract static finalizer validation method

#### Dependency Upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ public interface Informable<T> {
*/
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Set the limit to the number of resources to list at one time. This means that longer
* lists will take multiple requests to fetch.
* <p>If the list fails to complete it will be re-attempted with this limit, rather than
* falling back to the full list. You should ensure that your handlers are either async or
* fast acting to prevent long delays in list processing that may cause results to expire
* before reaching the end of the list.
* <p>WARNING As noted in the go client: "paginated lists are always served directly from
* etcd, which is significantly less efficient and may lead to serious performance and
* scalability problems."
* @param limit of a items in a list fetch
* @return the current {@link Informable}
*/
Informable<T> withLimit(Long limit);

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
protected String apiVersion;

protected Class<L> listType;
// informable state
private Map<String, Function<T, List<String>>> indexers;
private Long limit;

protected BaseOperation(OperationContext ctx) {
super(ctx);
Expand Down Expand Up @@ -404,7 +406,7 @@ public L list() {

@Override
public L list(Integer limitVal, String continueVal) {
return list(new ListOptionsBuilder().withLimit(Long.parseLong(limitVal.toString())).withContinue(continueVal).build());
return list(new ListOptionsBuilder().withLimit(limitVal.longValue()).withContinue(continueVal).build());
}

@Override
Expand Down Expand Up @@ -931,8 +933,22 @@ public WritableOperation<T> dryRun(boolean isDryRun) {
public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = indexers;
result.limit = this.limit;
return result;
}

@Override
public BaseOperation<T, L, R> withLimit(Long limit) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = this.indexers;
result.limit = limit;
return result;
}

@Override
public Long getLimit() {
return this.limit;
}

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler, long resync) {
Expand All @@ -958,7 +974,7 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
}

// use the local context / namespace but without a resourceVersion
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null), resync, Runnable::run); // just run the event notification in the websocket thread
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null).withLimit(this.limit), resync, Runnable::run); // just run the event notification in the websocket thread
if (indexers != null) {
informer.addIndexers(indexers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
public interface ListerWatcher<T, L> {
Watch watch(ListOptions params, Watcher<T> watcher);

L list();
L list(ListOptions listOptions);

Long getLimit();

String getNamespace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -45,7 +44,8 @@ public void add(T obj) {
public void update(T obj) {
T oldObj = this.cache.put(obj);
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), false);
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj),
Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion()));
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
}
Expand Down Expand Up @@ -80,26 +80,24 @@ public T getByKey(String key) {
}

@Override
public void replace(List<T> list) {
Map<String, T> oldState = cache.replace(list);

if (list.isEmpty() && oldState.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
public void retainAll(Set<String> nextKeys) {
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
return;
}

// now that the store is up-to-date, process the notifications
for (T newValue : list) {
T old = oldState.remove(cache.getKey(newValue));
if (old == null) {
this.processor.distribute(new ProcessorListener.AddNotification<>(newValue), false);
} else {
boolean same = Objects.equals(KubernetesResourceUtil.getResourceVersion(old), KubernetesResourceUtil.getResourceVersion(newValue));
this.processor.distribute(new ProcessorListener.UpdateNotification<>(old, newValue), same);
current.forEach(v -> {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
}
// deletes are not marked as sync=true in keeping with the old code
oldState.values()
.forEach(old -> this.processor.distribute(new ProcessorListener.DeleteNotification<>(old, true), false));
});
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {
Expand All @@ -49,10 +52,6 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Synca
this.watch = new AtomicReference<>(null);
}

protected L getList() {
return listerWatcher.list();
}

public void stop() {
running = false;
stopWatcher();
Expand All @@ -74,11 +73,26 @@ private synchronized void stopWatcher() {
*/
public void listSyncAndWatch() {
running = true;
final L list = getList();
final String latestResourceVersion = list.getMetadata().getResourceVersion();
KubernetesResourceList<T> result = null;
String continueVal = null;
Set<String> nextKeys = new LinkedHashSet<>();
do {
result = listerWatcher
.list(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build());
result.getItems().forEach(i -> {
String key = store.getKey(i);
// process the updates immediately so we don't need to hold the item
store.update(i);
nextKeys.add(key);
});
continueVal = result.getMetadata().getContinue();
} while (Utils.isNotNullOrEmpty(continueVal));

store.retainAll(nextKeys);

final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion);
store.replace(list.getItems());
log.debug("Listing items ({}) for resource {} v{}", nextKeys.size(), apiTypeClass, latestResourceVersion);
startWatcher(latestResourceVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import java.util.List;
import java.util.Set;

public interface SyncableStore<T> extends Store<T> {

Expand All @@ -41,17 +41,21 @@ public interface SyncableStore<T> extends Store<T> {
void delete(T obj);

/**
* Deletes the contents of the store, using instead the given list.
* Store takes ownership of the list, you should not reference it
* after calling this function
*
* @param list list of objects
* Sends a resync event for each item.
*/
void replace(List<T> list);
void resync();

/**
* Get the key for the given object
* @param obj object
* @return the key
*/
String getKey(T obj);

/**
* Sends a resync event for each item.
* Retain only the values with keys in the given set
* @param nextKeys to retain
*/
void resync();
void retainAll(Set<String> nextKeys);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -70,7 +69,12 @@ public void testEvents() {
assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class);
assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class);

assertTrue(syncCaptor.getAllValues().stream().allMatch(s->!s.booleanValue()));
List<Boolean> syncValues = syncCaptor.getAllValues();

assertThat(syncValues.get(0)).isFalse();
assertThat(syncValues.get(1)).isFalse();
assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync
assertThat(syncValues.get(3)).isFalse();
}

@Test
Expand All @@ -86,13 +90,14 @@ public void testSyncEvents() {
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build();

// replace empty store with two values
processorStore.replace(Arrays.asList(pod, pod2));
processorStore.add(pod);
processorStore.add(pod2);

// resync two values
processorStore.resync();

// relist with deletes
processorStore.replace(Collections.emptyList());
processorStore.retainAll(Collections.emptySet());

Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ReflectorTest {
void testStateFlags() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down Expand Up @@ -70,7 +70,7 @@ void testStateFlags() {
void testNonHttpGone() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.DisplayName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ public void testSharedIndexInformerGetsSingleUpdates() throws InterruptedExcepti
}
}

@Test
public void testLimit() throws Exception {
client.configMaps().delete();

client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map1").endMetadata().build());
client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map2").endMetadata().build());

CountDownLatch addEvents = new CountDownLatch(2);

TestResourceEventHandler<HasMetadata> eventHandler = new TestResourceEventHandler<>(addEvents, new CountDownLatch(1));

SharedIndexInformer<ConfigMap> informer = client.configMaps().withLimit(1L).inform(eventHandler, RESYNC_PERIOD);

try {
// When
addEvents.await(30000, TimeUnit.MILLISECONDS);

// Then
assertThat(addEvents.getCount()).isZero();
} finally {
informer.stop();
}
}

private static class TestResourceEventHandler<T extends HasMetadata> implements ResourceEventHandler<T> {
private final CountDownLatch addEventRecievedLatch;
private final CountDownLatch updateEventRecievedLatch;
Expand Down
Loading

0 comments on commit 36d6731

Please sign in to comment.