Skip to content

Commit

Permalink
fix #3968 adding SharedInformer.initialState
Browse files Browse the repository at this point in the history
also allowing for indexers to be added/removed at runtime, and exposing
Store.getKey
  • Loading branch information
shawkins authored and manusa committed Mar 25, 2022
1 parent 18e3895 commit 2f65ca6
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 145 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* Fix #1285: removed references to manually calling registerCustomKind
* Fix #3334: adding basic support for server side apply. Use patch(PatchContext.of(PatchType.SERVER_SIDE_APPLY), service), or new PatchContext.Builder().withPatchType(PatchType.SERVER_SIDE_APPLY).withForce(true).build() to override conflicts.
* Fix #3969: relist will not trigger sync events
* Fix #3968: SharedInformer.initialState can be used to set the store state before the informer starts.
SharedIndexInformer allows for the addition and removal of indexes even after starting, and you can remove the default namespace index if you wish.
And Store.getKey can be used rather than directly referencing static Cache functions.

#### Dependency Upgrade
* Fix #3788: Point CamelK Extension model to latest released version v1.8.0
Expand Down
2 changes: 2 additions & 0 deletions doc/MIGRATION-v6.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ Client.adapt will no longer perform the isAdaptable check - that is you may free
- Extension specific EnableXXXMockClient and XXXMockServer classes have been deprecated. You can simply use EnableKubernetesMockClient and KubernetesMockServer instead. Dependencies on the xxx-mock jar are then no longer needed, just a dependency to kubernetes-server-mock.
- Client.supportsApiPath and Client.isAdaptable have been deprecated. Please use Client.supports and Client.hasApiGroup as needed.
- Informable.withIndexers has been deprecated. Indexers can be added/removed after the creation of the informer.
## Object Sorting
KubernetesList and Template will no longer automatically sort their objects by default. You may use the HasMetadataComparator to sort the items as needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,68 +29,85 @@
import java.util.function.Predicate;

public interface Informable<T> {

/**
* The indexers to add to {@link SharedInformer}s created by subsequent inform calls;
*
* @param indexers to customize the indexing
* @return the current {@link Informable}
* @deprecated please use methods on the {@link SharedIndexInformer} to add/remove indexes
*/
@Deprecated
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Set the limit to the number of resources to list at one time. This means that longer
* Set the limit to the number of resources to list at one time. This means that longer
* lists will take multiple requests to fetch.
* <p>If the list fails to complete it will be re-attempted with this limit, rather than
* falling back to the full list. You should ensure that your handlers are either async or
* <p>
* If the list fails to complete it will be re-attempted with this limit, rather than
* falling back to the full list. You should ensure that your handlers are either async or
* fast acting to prevent long delays in list processing that may cause results to expire
* before reaching the end of the list.
* <p>WARNING As noted in the go client: "paginated lists are always served directly from
* <p>
* WARNING As noted in the go client: "paginated lists are always served directly from
* etcd, which is significantly less efficient and may lead to serious performance and
* scalability problems."
*
* @param limit of a items in a list fetch
* @return the current {@link Informable}
*/
Informable<T> withLimit(Long limit);

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This returned informer will not support resync.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>
* This returned informer will not support resync.
* <p>
* This call will be blocking for the initial list and watch.
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @return a running {@link SharedIndexInformer}
*/
default SharedIndexInformer<T> inform() {
return inform(null, 0);
}

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This returned informer will not support resync.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>
* This returned informer will not support resync.
* <p>
* This call will be blocking for the initial list and watch.
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @param handler to notify
* @return a running {@link SharedIndexInformer}
*/
default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
return inform(handler, 0);
}

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>
* This call will be blocking for the initial list and watch.
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @param handler to notify
* @param resync the resync period or 0 for no resync
* @return a running {@link SharedIndexInformer}
Expand All @@ -100,19 +117,22 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
* @param resync the resync period or 0 for no resync
* @return a non-running {@link SharedIndexInformer}
*/
SharedIndexInformer<T> runnableInformer(long resync);

/**
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
* The predicate will be tested against the state of the underlying informer store on every event.
* The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer
*
* @param condition the {@link Predicate} to test
* @return a {@link CompletableFuture} of the list of items after the condition is met
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;

import java.util.List;
Expand All @@ -33,6 +34,23 @@ public interface SharedIndexInformer<T> extends SharedInformer<T> {
*/
void addIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Remove the namesapce index
*
* @return this
*/
default SharedIndexInformer<T> removeNamespaceIndex() {
return removeIndexer(Cache.NAMESPACE_INDEX);
}

/**
* Remove the named index
*
* @param name
* @return this
*/
SharedIndexInformer<T> removeIndexer(String name);

/**
* returns the internal indexer store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import io.fabric8.kubernetes.client.informers.cache.Store;

import java.util.stream.Stream;

/**
* SharedInformer defines basic methods of an informer.
*
* This has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go
* This has been ported from official go client:
* https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go
*/
public interface SharedInformer<T> extends AutoCloseable {

Expand All @@ -43,18 +46,20 @@ public interface SharedInformer<T> extends AutoCloseable {

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
*
* <br>Only one start attempt is made - subsequent calls will not re-start the informer.
*
* <br>If the informer is not already running, this is a blocking call
*
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
*
* <br>
* If the informer is not already running, this is a blocking call
*/
void run();

/**
* Stops the shared informer. The informer cannot be started again.
* Stops the shared informer. The informer cannot be started again.
*/
void stop();

@Override
default void close() {
stop();
Expand All @@ -75,7 +80,7 @@ default boolean hasSynced() {
* @return string value or null if never synced
*/
String lastSyncResourceVersion();

/**
* Return true if the informer is running
*/
Expand All @@ -85,16 +90,29 @@ default boolean hasSynced() {
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();

/**
* Return true if the informer is actively watching
* <br>Will return false when {@link #isRunning()} is true when the watch needs to be re-established.
* <br>
* Will return false when {@link #isRunning()} is true when the watch needs to be re-established.
*/
boolean isWatching();

/**
* Return the Store associated with this informer
*
* @return the store
*/
Store<T> getStore();

/**
* Sets the initial state of the informer store, which will
* be replaced by the initial list operation. This will emit
* relevant delete and update events, rather than just adds.
* <br>
* Can only be called before the informer is running
*
* @param items
*/
SharedIndexInformer<T> initialState(Stream<T> items);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
/**
* Indexer extends Store interface and add index/de-index methods.
*
* This implementation has been taken from official client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/Indexer.java
* This implementation has been taken from official client:
* https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/Indexer.java
* which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/index.go
*
* @param <T> resource
Expand Down Expand Up @@ -68,4 +69,11 @@ public interface Indexer<T> extends Store<T> {
* @param indexers indexers to add
*/
void addIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Remove the named index
*
* @param name
*/
void removeIndexer(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
*
* This is ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go
*
* <br>Refactored to only expose read methods
* <br>
* Refactored to only expose read methods
*
* @param <T> resource
*/
public interface Store<T> {

/**
* Returns a list of all the items.
* Returns a list of all the items.
*
* @return list of all items
*/
Expand Down Expand Up @@ -65,4 +66,12 @@ public interface Store<T> {
*/
T getByKey(String key);

/**
* Use the key function to extract the object's key.
*
* @param object object
* @return the key
*/
String getKey(T object);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexers) {

}

@Override
public void removeIndexer(String name) {

}

@Override
public List<T> list() {
return map.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toList());
Expand Down Expand Up @@ -82,4 +87,9 @@ public T getByKey(String key) {
protected void put(String index, String key, T object) {
map.compute(index, (k, v) -> v == null ? new HashMap<>() : v).put(key, object);
}

@Override
public String getKey(T object) {
return null;
}
}
Loading

0 comments on commit 2f65ca6

Please sign in to comment.