forked from fabric8io/kubernetes-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix fabric8io#3587: adding a LongWatch for lack of a better name
- Loading branch information
Showing
4 changed files
with
153 additions
and
4 deletions.
There are no files selected for viewing
148 changes: 148 additions & 0 deletions
148
kubernetes-client/src/main/java/io/fabric8/kubernetes/client/LongWatch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package io.fabric8.kubernetes.client; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.fabric8.kubernetes.api.model.KubernetesResourceList; | ||
import io.fabric8.kubernetes.api.model.ListOptionsBuilder; | ||
import io.fabric8.kubernetes.client.informers.ListerWatcher; | ||
import io.fabric8.kubernetes.client.utils.Utils; | ||
|
||
import java.util.Objects; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* proof of concept of something between a watch and an informer. | ||
* | ||
* Due to several design issues I haven't heavily refactored the informer to support this logic yet | ||
* It also does not protect against exceptions when calling the actual watcher | ||
*/ | ||
public class LongWatch<T extends HasMetadata> implements Watcher<T>, Watch { | ||
|
||
/** | ||
* Similar to a watch, but must account for a delete where the final state is not known | ||
*/ | ||
public interface LongWatcher<T> extends Watcher<T> { | ||
|
||
void deletedStateUnknown(String key, String resourceVersion); | ||
|
||
} | ||
|
||
public static <T extends HasMetadata> String getUidKey(T resource) { | ||
return resource.getMetadata().getUid(); | ||
} | ||
|
||
private ListerWatcher<T, ? extends KubernetesResourceList<T>> listerWatcher; | ||
private Long batchSize; | ||
private Function<T, String> keyFunction; | ||
private LongWatcher<T> watcher; | ||
|
||
private AtomicBoolean closed = new AtomicBoolean(); | ||
private volatile Watch watch; | ||
private volatile ConcurrentHashMap<String, String> resources = new ConcurrentHashMap<>(); | ||
|
||
public LongWatch(ListerWatcher<T, ? extends KubernetesResourceList<T>> listerWatcher, Long batchSize, Function<T, String> keyFunction, LongWatcher<T> watcher) { | ||
this.listerWatcher = listerWatcher; | ||
this.batchSize = batchSize; | ||
this.keyFunction = keyFunction; | ||
this.watcher = watcher; | ||
listAndWatch(); | ||
} | ||
|
||
@Override | ||
public void eventReceived(Action action, T resource) { | ||
String resourceVersion = resource.getMetadata().getResourceVersion(); | ||
// the choice of key is based upon whether anyone could have access to the cache - uid is less useful | ||
String key = keyFunction.apply(resource); | ||
switch (action) { | ||
case ADDED: | ||
case MODIFIED: | ||
updated(resource, resourceVersion, key); | ||
break; | ||
case DELETED: | ||
if (resources.remove(key) != null) { | ||
watcher.eventReceived(Action.DELETED, resource); | ||
} | ||
break; | ||
case BOOKMARK: | ||
break; | ||
default: | ||
// pass through | ||
watcher.eventReceived(action, resource); | ||
break; | ||
} | ||
} | ||
|
||
private void updated(T resource, String resourceVersion, String key) { | ||
String previous = resources.put(key, resourceVersion); | ||
if (previous == null) { | ||
watcher.eventReceived(Action.ADDED, resource); | ||
} else if (!Objects.equals(resourceVersion, previous)) { | ||
watcher.eventReceived(Action.MODIFIED, resource); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
if (closed.compareAndSet(false, true)) { | ||
watch.close(); | ||
watcher.onClose(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onClose(WatcherException cause) { | ||
if (closed.get()) { | ||
return; | ||
} | ||
if (!cause.isHttpGone()) { | ||
if (closed.compareAndSet(false, true)) { | ||
watcher.onClose(cause); | ||
} | ||
return; | ||
} | ||
listAndWatch(); | ||
if (closed.get()) { | ||
watch.close(); | ||
} | ||
} | ||
|
||
/** | ||
* Since we don't expose the resources, they are modified as needed | ||
*/ | ||
private void listAndWatch() { | ||
ConcurrentHashMap<String, String> next = new ConcurrentHashMap<>(); | ||
String continueVal = null; | ||
String resourceVersion = null; | ||
|
||
// if there's an exception while we do this, we'll simply retry | ||
// could consider increasing the batch size for the next run | ||
do { | ||
KubernetesResourceList<T> result = listerWatcher.list(new ListOptionsBuilder().withLimit(batchSize).withContinue(continueVal).build()); | ||
result.getItems().forEach(i -> { | ||
String key = keyFunction.apply(i); | ||
String itemResourceVersion = i.getMetadata().getResourceVersion(); | ||
// process the updates immediately so we don't need to hold the item | ||
updated(i, itemResourceVersion, key); | ||
next.put(key, itemResourceVersion); | ||
}); | ||
resourceVersion = result.getMetadata().getResourceVersion(); | ||
continueVal = result.getMetadata().getContinue(); | ||
} while (Utils.isNotNullOrEmpty(continueVal)); | ||
|
||
// process the special case deletes | ||
resources.keySet().removeAll(next.keySet()); | ||
resources.forEach((k, v) -> this.watcher.deletedStateUnknown(k, v)); | ||
|
||
resources = next; | ||
|
||
watch = listerWatcher.watch( | ||
new ListOptionsBuilder().withResourceVersion(resourceVersion).withAllowWatchBookmarks(true).build(), this); | ||
} | ||
|
||
@Override | ||
public boolean reconnecting() { | ||
return true; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters