Skip to content

Commit

Permalink
fix: defer watch until the initial processing of list is complete
Browse files Browse the repository at this point in the history
closes: #5953

Signed-off-by: Steve Hawkins <shawkins@redhat.com>
  • Loading branch information
shawkins authored May 21, 2024
1 parent c5b1920 commit c470c55
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 95 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Fix #5867: (java-generator) Add JsonFormat shape to date-time
* Fix #5954: (crd-generator) Sort required properties to ensure deterministic output
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced
* Fix #5953: Made informer watch starting deterministic with respect to list processing

#### Dependency Upgrade
* Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
*/
public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {
public class ProcessorStore<T extends HasMetadata> {

private CacheImpl<T> cache;
private SharedProcessor<T> processor;
Expand All @@ -40,12 +42,10 @@ public ProcessorStore(CacheImpl<T> cache, SharedProcessor<T> processor) {
this.processor = processor;
}

@Override
public void add(T obj) {
update(obj);
}

@Override
public void update(List<T> items) {
items.stream().map(this::updateInternal).filter(Objects::nonNull).forEach(n -> this.processor.distribute(n, false));
}
Expand All @@ -65,44 +65,37 @@ private Notification<T> updateInternal(T obj) {
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

@Override
public void delete(T obj) {
Object oldObj = this.cache.remove(obj);
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.DeleteNotification<>(obj, false), false);
}
}

@Override
public List<T> list() {
return cache.list();
}

@Override
public List<String> listKeys() {
return cache.listKeys();
}

@Override
public T get(T object) {
return cache.get(object);
}

@Override
public T getByKey(String key) {
return cache.getByKey(key);
}

@Override
public void retainAll(Set<String> nextKeys) {
public void retainAll(Set<String> nextKeys, Consumer<Executor> cacheStateComplete) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
Expand All @@ -111,7 +104,6 @@ public void retainAll(Set<String> nextKeys) {
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
return;
}
current.forEach(v -> {
String key = cache.getKey(v);
Expand All @@ -120,14 +112,15 @@ public void retainAll(Set<String> nextKeys) {
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
});
if (cacheStateComplete != null) {
cacheStateComplete.accept(this.processor::execute);
}
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
}

@Override
public void resync() {
// lock to ensure the ordering wrt other events
synchronized (cache.getLockObject()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T

private volatile String lastSyncResourceVersion;
private final ListerWatcher<T, L> listerWatcher;
private final SyncableStore<T> store;
private final ProcessorStore<T> store;
private final ReflectorWatcher watcher;
private volatile boolean watching;
private volatile CompletableFuture<AbstractWatchManager<T>> watchFuture;
Expand All @@ -64,11 +64,11 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T

private boolean cachedListing = true;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store) {
this(listerWatcher, store, Runnable::run);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store, Executor executor) {
public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store, Executor executor) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
Expand Down Expand Up @@ -124,11 +124,19 @@ public CompletableFuture<Void> listSyncAndWatch() {
}
Set<String> nextKeys = new ConcurrentSkipListSet<>();
CompletableFuture<Void> theFuture = processList(nextKeys, null).thenCompose(result -> {
store.retainAll(nextKeys);
final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
return startWatcher(latestResourceVersion);
CompletableFuture<?> cf = new CompletableFuture<>();
store.retainAll(nextKeys, executor -> {
boolean startWatchImmediately = cachedListing && lastSyncResourceVersion == null;
lastSyncResourceVersion = latestResourceVersion;
if (startWatchImmediately) {
cf.complete(null);
} else {
executor.execute(() -> cf.complete(null));
}
});
return cf.thenCompose(ignored -> startWatcher(latestResourceVersion));
}).thenAccept(w -> {
if (w != null) {
if (!isStopped()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,8 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
lock.writeLock().unlock();
}
}

public void execute(Runnable runnable) {
this.executor.execute(runnable);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ void testSyncEvents() {
List<Pod> pods = Arrays.asList(pod, pod2);
processorStore.update(pods);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));
processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()), null);

// resync two values
processorStore.resync();

// relist with deletes
processorStore.retainAll(Collections.emptySet());
processorStore.retainAll(Collections.emptySet(), null);

Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.TooFewActualInvocations;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -42,13 +44,24 @@

class ReflectorTest {

private ProcessorStore<Pod> mockStore;

@BeforeEach
void setup() {
mockStore = Mockito.mock(ProcessorStore.class);
Mockito.doAnswer(invocation -> {
((Consumer<Executor>) invocation.getArguments()[1]).accept(Runnable::run);
return null;
}).when(mockStore).retainAll(Mockito.anySet(),
Mockito.any());
}

@Test
void testStateFlags() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

SyncableStore<Pod> mockStore = Mockito.mock(SyncableStore.class);
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, mockStore) {
@Override
protected void reconnect() {
Expand Down Expand Up @@ -95,7 +108,7 @@ void testNotRunningAfterStartError() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class));
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, mockStore);

// throw an exception, then watch normally
Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
Expand All @@ -115,7 +128,7 @@ void testNonHttpGone() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore);

Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(AbstractWatchManager.class)));
Expand Down Expand Up @@ -143,7 +156,7 @@ void testTimeout() {
return null;
}).when(ex).execute(Mockito.any(Runnable.class));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex);
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore, ex);
reflector.setMinTimeout(1);

AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class);
Expand Down

0 comments on commit c470c55

Please sign in to comment.