Skip to content

Commit

Permalink
fix fabric8io#3587: adding a LongWatch for lack of a better name
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Nov 24, 2021
1 parent de44680 commit 6d99576
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
* proof of concept of something between a watch and an informer.
*
* Due to several design issues I haven't heavily refactored the informer to support this logic yet
* It also does not protect against exceptions when calling the actual watcher
*/
public class LongWatch<T extends HasMetadata> implements Watcher<T>, Watch {

/**
* Similar to a watch, but must account for a delete where the final state is not known
*/
public interface LongWatcher<T> extends Watcher<T> {

void deletedStateUnknown(String key, String resourceVersion);

@Override
default void onClose(WatcherException cause) {
LoggerFactory.getLogger(Watcher.class).warn("Watcher closed with error", cause);
}

}

public static <T extends HasMetadata> String getUidKey(T resource) {
return resource.getMetadata().getUid();
}

private ListerWatcher<T, ? extends KubernetesResourceList<T>> listerWatcher;
private Long batchSize;
private Function<T, String> keyFunction;
private LongWatcher<T> watcher;

private AtomicBoolean closed = new AtomicBoolean();
private volatile Watch watch;
private volatile ConcurrentHashMap<String, String> resources = new ConcurrentHashMap<>();

public LongWatch(ListerWatcher<T, ? extends KubernetesResourceList<T>> listerWatcher, Long batchSize, Function<T, String> keyFunction, LongWatcher<T> watcher) {
this.listerWatcher = listerWatcher;
this.batchSize = batchSize;
this.keyFunction = keyFunction;
this.watcher = watcher;
listAndWatch();
}

@Override
public void eventReceived(Action action, T resource) {
String resourceVersion = resource.getMetadata().getResourceVersion();
// the choice of key is based upon whether anyone could have access to the cache - uid is less useful
String key = keyFunction.apply(resource);
switch (action) {
case ADDED:
case MODIFIED:
updated(resource, resourceVersion, key);
break;
case DELETED:
if (resources.remove(key) != null) {
watcher.eventReceived(Action.DELETED, resource);
}
break;
case BOOKMARK:
break;
default:
// pass through
watcher.eventReceived(action, resource);
break;
}
}

private void updated(T resource, String resourceVersion, String key) {
String previous = resources.put(key, resourceVersion);
if (previous == null) {
watcher.eventReceived(Action.ADDED, resource);
} else if (!Objects.equals(resourceVersion, previous)) {
watcher.eventReceived(Action.MODIFIED, resource);
}
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
watch.close();
watcher.onClose();
}
}

@Override
public void onClose(WatcherException cause) {
if (closed.get()) {
return;
}
if (!cause.isHttpGone()) {
if (closed.compareAndSet(false, true)) {
watcher.onClose(cause);
}
return;
}
listAndWatch();
if (closed.get()) {
watch.close();
}
}

/**
* Since we don't expose the resources, they are modified as needed
*/
private void listAndWatch() {
ConcurrentHashMap<String, String> next = new ConcurrentHashMap<>();
String continueVal = null;
String resourceVersion = null;

// if there's an exception while we do this, we'll simply retry
// could consider increasing the batch size for the next run
do {
KubernetesResourceList<T> result = listerWatcher.list(new ListOptionsBuilder().withLimit(batchSize).withContinue(continueVal).build());
result.getItems().forEach(i -> {
String key = keyFunction.apply(i);
String itemResourceVersion = i.getMetadata().getResourceVersion();
// process the updates immediately so we don't need to hold the item
updated(i, itemResourceVersion, key);
next.put(key, itemResourceVersion);
});
resourceVersion = result.getMetadata().getResourceVersion();
continueVal = result.getMetadata().getContinue();
} while (Utils.isNotNullOrEmpty(continueVal));

// process the special case deletes
resources.keySet().removeAll(next.keySet());
resources.forEach((k, v) -> this.watcher.deletedStateUnknown(k, v));

resources = next;

watch = listerWatcher.watch(
new ListOptionsBuilder().withResourceVersion(resourceVersion).withAllowWatchBookmarks(true).build(), this);
}

@Override
public boolean reconnecting() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface ListerWatcher<T, L> {
Watch watch(ListOptions params, Watcher<T> watcher);

L list();
L list(ListOptions listOptions);

String getNamespace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
Expand Down Expand Up @@ -50,7 +51,7 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Synca
}

protected L getList() {
return listerWatcher.list();
return listerWatcher.list(new ListOptions());
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ReflectorTest {
void testStateFlags() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down Expand Up @@ -70,7 +70,7 @@ void testStateFlags() {
void testNonHttpGone() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.list()).thenReturn(list);
Mockito.when(mock.list(Mockito.any())).thenReturn(list);

Reflector<Pod, PodList> reflector =
new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class));
Expand Down

0 comments on commit 6d99576

Please sign in to comment.