Skip to content

Commit

Permalink
fix #4081 #4082: correcting/removing versionable
Browse files Browse the repository at this point in the history
also improving informOnCondition
  • Loading branch information
shawkins authored and manusa committed Apr 28, 2022
1 parent d77e3ab commit 03ec3b1
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 71 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#### Bugs
* Fix #3832 #1883: simplifying the isHttpsAvailable check
* Fix #3745: the client will throw better exceptions when a namespace is not discernible for an operation
* Fix #4081: moving Versionable.withResourceVersion to a method on WatchAndWaitable and removing Waitable from the return type

#### Improvements
* Remove `setIntVal`, `setStrVal`, `setKind` setters from `IntOrString` class to avoid invalid combinations
Expand All @@ -12,6 +13,7 @@
* Fix #1285: removed references to manually calling registerCustomKind
* Fix #3334: adding basic support for server side apply. Use patch(PatchContext.of(PatchType.SERVER_SIDE_APPLY), service), or new PatchContext.Builder().withPatchType(PatchType.SERVER_SIDE_APPLY).withForce(true).build() to override conflicts.
* Fix #3969: relist will not trigger sync events
* Fix #4082: improving informOnCondition to test the initial list instead of individual add events
* Fix #3968: SharedIndexInformer.initialState can be used to set the store state before the informer starts.
SharedIndexInformer allows for the addition and removal of indexes even after starting, and you can remove the default namespace index if you wish.
And Store.getKey can be used rather than directly referencing static Cache functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.Watcher;

import java.util.stream.Stream;

public interface FilterWatchListDeletable<T, L, R>
extends Filterable<FilterWatchListDeletable<T, L, R>>, Watchable<Watcher<T>>, Versionable<WatchAndWaitable<T>>, Listable<L>,
extends Filterable<FilterWatchListDeletable<T, L, R>>, Listable<L>,
WatchAndWaitable<T>,
DeletableWithOptions,
Informable<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public interface Resource<T> extends
FromServerGettable<T>,
WatchAndWaitable<T>, Versionable<WatchAndWaitable<T>>,
WatchAndWaitable<T>,
WritableOperation<T>,
DryRunable<WritableOperation<T>>,
Informable<T> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@
import io.fabric8.kubernetes.client.Watcher;

public interface WatchAndWaitable<T> extends Watchable<Watcher<T>>, Waitable<T, T> {

Watchable<Watcher<T>> withResourceVersion(String resourceVersion);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.fabric8.kubernetes.client.dsl.Informable;
import io.fabric8.kubernetes.client.dsl.ReplaceDeletable;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.WatchAndWaitable;
import io.fabric8.kubernetes.client.dsl.Watchable;
import io.fabric8.kubernetes.client.dsl.WritableOperation;
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
Expand Down Expand Up @@ -91,7 +91,7 @@ public WritableOperation<T> dryRun() {
}

@Override
public WatchAndWaitable<T> withResourceVersion(String resourceVersion) {
public Watchable<Watcher<T>> withResourceVersion(String resourceVersion) {
return resource.withResourceVersion(resourceVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.ItemStore;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,18 @@ public interface ItemStore<V> {

V get(String key);

/**
* Used to determine if initial add events can be deferred until
* the entire list operation has completed - when using a limit
* it may take several batches to complete.
* <br>
* If false, then the initial add events must be processed as they
* occur - meaning that the store state may not be complete.
*
* @return
*/
default boolean isFullState() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public V get(String key) {
return restore(key, store.get(key));
}

public String getResourceVersion(String key) {
return (String) store.getOrDefault(key, new Object[1])[0];
}

@Override
public int size() {
return store.size();
Expand All @@ -182,4 +186,9 @@ public String getKey(V obj) {
return this.keyState.keyFunction.apply(obj);
}

@Override
public boolean isFullState() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void testStoreRestore() {

assertNull(store.put("x", pod));
assertNotNull(store.get("x"));
assertEquals("2", store.getResourceVersion("x"));
assertEquals(1, store.size());
assertNotNull(store.remove("x"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -795,7 +794,7 @@ public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUn
if (l.isEmpty()) {
return condition.test(null);
}
return condition.test(l.get(0));
return l.stream().allMatch(condition);
});

if (!Utils.waitUntilReady(futureCondition, amount, timeUnit)) {
Expand All @@ -812,7 +811,6 @@ public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUn
@Override
public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition) {
CompletableFuture<List<T>> future = new CompletableFuture<>();
AtomicReference<Runnable> tester = new AtomicReference<>();

// create an informer that supplies the tester with events and empty list handling
SharedIndexInformer<T> informer = this.createInformer(0);
Expand All @@ -821,41 +819,38 @@ public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition
future.whenComplete((r, t) -> informer.stop());

// use the cache to evaluate the list predicate, trapping any exceptions
Runnable test = () -> {
Consumer<List<T>> test = list -> {
try {
// could skip if lastResourceVersion has not changed
List<T> list = informer.getStore().list();
if (condition.test(list)) {
future.complete(list);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
};
tester.set(test);

informer.addEventHandler(new ResourceEventHandler<T>() {
@Override
public void onAdd(T obj) {
test.run();
test.accept(informer.getStore().list());
}

@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
test.run();
test.accept(informer.getStore().list());
}

@Override
public void onUpdate(T oldObj, T newObj) {
test.run();
test.accept(informer.getStore().list());
}

@Override
public void onNothing() {
test.run();
test.accept(informer.getStore().list());
}
});
informer.run();
}).run();
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,8 @@ public synchronized void removeIndexer(String name) {
this.indexers.remove(name);
}

public boolean isFullState() {
return items.isFullState();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.impl.cache.ProcessorListener.Notification;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -30,6 +33,8 @@ public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {

private CacheImpl<T> cache;
private SharedProcessor<T> processor;
private AtomicBoolean synced = new AtomicBoolean();
private List<String> deferredAdd = new ArrayList<>();

public ProcessorStore(CacheImpl<T> cache, SharedProcessor<T> processor) {
this.cache = cache;
Expand All @@ -42,14 +47,30 @@ public void add(T obj) {
}

@Override
public void update(T obj) {
public void update(List<T> items) {
items.stream().map(this::updateInternal).filter(Objects::nonNull).forEach(n -> this.processor.distribute(n, false));
}

private Notification<T> updateInternal(T obj) {
T oldObj = this.cache.put(obj);
Notification<T> notification = null;
if (oldObj != null) {
if (!Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj), false);
notification = new ProcessorListener.UpdateNotification<>(oldObj, obj);
}
} else if (synced.get() || !cache.isFullState()) {
notification = new ProcessorListener.AddNotification<>(obj);
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
deferredAdd.add(getKey(obj));
}
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

Expand Down Expand Up @@ -83,6 +104,11 @@ public T getByKey(String key) {

@Override
public void retainAll(Set<String> nextKeys) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
deferredAdd.clear();
}
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ private CompletableFuture<L> processList(Set<String> nextKeys, String continueVa
return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
String key = store.getKey(i);
// process the updates immediately so we don't need to hold the item
store.update(i);
nextKeys.add(key);
});
store.update(result.getItems());
String nextContinueVal = result.getMetadata().getContinue();
if (Utils.isNotNullOrEmpty(nextContinueVal)) {
return processList(nextKeys, nextContinueVal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.fabric8.kubernetes.client.informers.cache.Store;

import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -58,4 +59,11 @@ public interface SyncableStore<T> extends Store<T> {
*/
void retainAll(Set<String> nextKeys);

/**
* Process a batch of updates
*
* @param items
*/
void update(List<T> items);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.impl.cache.ProcessorListener.AddNotification;
import io.fabric8.kubernetes.client.informers.impl.cache.ProcessorListener.DeleteNotification;
import io.fabric8.kubernetes.client.informers.impl.cache.ProcessorListener.Notification;
Expand All @@ -25,8 +26,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -84,12 +87,14 @@ void testSyncEvents() {

ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);

Pod pod = new PodBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").withResourceVersion("2").endMetadata().build();

// replace empty store with two values
processorStore.add(pod);
processorStore.add(pod2);
List<Pod> pods = Arrays.asList(pod, pod2);
processorStore.update(pods);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));

// resync two values
processorStore.resync();
Expand Down
Loading

0 comments on commit 03ec3b1

Please sign in to comment.