Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #3587: adding inform support for limit/batch fetching #3753

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#### Improvements
* Fix #3674: allows the connect and websocket timeouts to apply to watches instead of a hardcoded timeout
* Fix #3651: Introduce SchemaFrom annotation as escape hatch (CRD Generator)
* Fix #3587: adding inform support for limit/batch fetching
* Fix #3734: extract static finalizer validation method

#### Dependency Upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ public interface Informable<T> {
*/
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Set the limit to the number of resources to list at one time. This means that longer
* lists will take multiple requests to fetch.
* <p>If the list fails to complete it will be re-attempted with this limit, rather than
* falling back to the full list. You should ensure that your handlers are either async or
* fast acting to prevent long delays in list processing that may cause results to expire
* before reaching the end of the list.
* <p>WARNING As noted in the go client: "paginated lists are always served directly from
* etcd, which is significantly less efficient and may lead to serious performance and
* scalability problems."
* @param limit of a items in a list fetch
* @return the current {@link Informable}
*/
Informable<T> withLimit(Long limit);

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
protected String apiVersion;

protected Class<L> listType;
// informable state
private Map<String, Function<T, List<String>>> indexers;
private Long limit;

protected BaseOperation(OperationContext ctx) {
super(ctx);
Expand Down Expand Up @@ -404,7 +406,7 @@ public L list() {

@Override
public L list(Integer limitVal, String continueVal) {
return list(new ListOptionsBuilder().withLimit(Long.parseLong(limitVal.toString())).withContinue(continueVal).build());
return list(new ListOptionsBuilder().withLimit(limitVal.longValue()).withContinue(continueVal).build());
}

@Override
Expand Down Expand Up @@ -931,8 +933,22 @@ public WritableOperation<T> dryRun(boolean isDryRun) {
public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = indexers;
result.limit = this.limit;
return result;
}

@Override
public BaseOperation<T, L, R> withLimit(Long limit) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = this.indexers;
result.limit = limit;
return result;
}

@Override
public Long getLimit() {
return this.limit;
}

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler, long resync) {
Expand All @@ -958,7 +974,7 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
}

// use the local context / namespace but without a resourceVersion
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null), resync, Runnable::run); // just run the event notification in the websocket thread
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null).withLimit(this.limit), resync, Runnable::run); // just run the event notification in the websocket thread
if (indexers != null) {
informer.addIndexers(indexers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
public interface ListerWatcher<T, L> {
Watch watch(ListOptions params, Watcher<T> watcher);

L list();
L list(ListOptions listOptions);

Long getLimit();

String getNamespace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -45,7 +44,8 @@ public void add(T obj) {
public void update(T obj) {
T oldObj = this.cache.put(obj);
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), false);
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj),
Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion()));
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
}
Expand Down Expand Up @@ -80,26 +80,24 @@ public T getByKey(String key) {
}

@Override
public void replace(List<T> list) {
Map<String, T> oldState = cache.replace(list);

if (list.isEmpty() && oldState.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
public void retainAll(Set<String> nextKeys) {
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
return;
}

// now that the store is up-to-date, process the notifications
for (T newValue : list) {
T old = oldState.remove(cache.getKey(newValue));
if (old == null) {
this.processor.distribute(new ProcessorListener.AddNotification<>(newValue), false);
} else {
boolean same = Objects.equals(KubernetesResourceUtil.getResourceVersion(old), KubernetesResourceUtil.getResourceVersion(newValue));
this.processor.distribute(new ProcessorListener.UpdateNotification<>(old, newValue), same);
current.forEach(v -> {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
}
// deletes are not marked as sync=true in keeping with the old code
oldState.values()
.forEach(old -> this.processor.distribute(new ProcessorListener.DeleteNotification<>(old, true), false));
});
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {
Expand All @@ -49,10 +52,6 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Synca
this.watch = new AtomicReference<>(null);
}

protected L getList() {
return listerWatcher.list();
}

public void stop() {
running = false;
stopWatcher();
Expand All @@ -74,11 +73,26 @@ private synchronized void stopWatcher() {
*/
public void listSyncAndWatch() {
running = true;
final L list = getList();
final String latestResourceVersion = list.getMetadata().getResourceVersion();
KubernetesResourceList<T> result = null;
String continueVal = null;
Set<String> nextKeys = new LinkedHashSet<>();
do {
result = listerWatcher
.list(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build());
result.getItems().forEach(i -> {
String key = store.getKey(i);
// process the updates immediately so we don't need to hold the item
store.update(i);
nextKeys.add(key);
});
continueVal = result.getMetadata().getContinue();
} while (Utils.isNotNullOrEmpty(continueVal));

store.retainAll(nextKeys);

final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion);
store.replace(list.getItems());
log.debug("Listing items ({}) for resource {} v{}", nextKeys.size(), apiTypeClass, latestResourceVersion);
startWatcher(latestResourceVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import java.util.List;
import java.util.Set;

public interface SyncableStore<T> extends Store<T> {

Expand All @@ -41,17 +41,21 @@ public interface SyncableStore<T> extends Store<T> {
void delete(T obj);

/**
* Deletes the contents of the store, using instead the given list.
* Store takes ownership of the list, you should not reference it
* after calling this function
*
* @param list list of objects
* Sends a resync event for each item.
*/
void replace(List<T> list);
void resync();

/**
* Get the key for the given object
* @param obj object
* @return the key
*/
String getKey(T obj);

/**
* Sends a resync event for each item.
* Retain only the values with keys in the given set
* @param nextKeys to retain
*/
void resync();
void retainAll(Set<String> nextKeys);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -70,7 +69,12 @@ public void testEvents() {
assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class);
assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class);

assertTrue(syncCaptor.getAllValues().stream().allMatch(s->!s.booleanValue()));
List<Boolean> syncValues = syncCaptor.getAllValues();

assertThat(syncValues.get(0)).isFalse();
assertThat(syncValues.get(1)).isFalse();
assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync
assertThat(syncValues.get(3)).isFalse();
}

@Test
Expand All @@ -86,13 +90,14 @@ public void testSyncEvents() {
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build();

// replace empty store with two values
processorStore.replace(Arrays.asList(pod, pod2));
processorStore.add(pod);
processorStore.add(pod2);

// resync two values
processorStore.resync();

// relist with deletes
processorStore.replace(Collections.emptyList());
processorStore.retainAll(Collections.emptySet());

Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ReflectorTest {
void testStateFlags() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down Expand Up @@ -70,7 +70,7 @@ void testStateFlags() {
void testNonHttpGone() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.DisplayName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ public void testSharedIndexInformerGetsSingleUpdates() throws InterruptedExcepti
}
}

@Test
public void testLimit() throws Exception {
client.configMaps().delete();

client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map1").endMetadata().build());
client.configMaps().create(new ConfigMapBuilder().withNewMetadata().withName("my-map2").endMetadata().build());

CountDownLatch addEvents = new CountDownLatch(2);

TestResourceEventHandler<HasMetadata> eventHandler = new TestResourceEventHandler<>(addEvents, new CountDownLatch(1));

SharedIndexInformer<ConfigMap> informer = client.configMaps().withLimit(1L).inform(eventHandler, RESYNC_PERIOD);

try {
// When
addEvents.await(30000, TimeUnit.MILLISECONDS);

// Then
assertThat(addEvents.getCount()).isZero();
} finally {
informer.stop();
}
}

private static class TestResourceEventHandler<T extends HasMetadata> implements ResourceEventHandler<T> {
private final CountDownLatch addEventRecievedLatch;
private final CountDownLatch updateEventRecievedLatch;
Expand Down
Loading