Skip to content

Commit

Permalink
fix #3472 #3587: allowing for full control over informer key and state
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Apr 13, 2022
1 parent 502a14c commit af21ba4
Show file tree
Hide file tree
Showing 14 changed files with 750 additions and 192 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Fix #1285: removed references to manually calling registerCustomKind
* Fix #3334: adding basic support for server side apply. Use patch(PatchContext.of(PatchType.SERVER_SIDE_APPLY), service), or new PatchContext.Builder().withPatchType(PatchType.SERVER_SIDE_APPLY).withForce(true).build() to override conflicts.
* Fix #3969: relist will not trigger sync events
* Fix #3968: SharedInformer.initialState can be used to set the store state before the informer starts.
* Fix #3968: SharedIndexInformer.initialState can be used to set the store state before the informer starts.
SharedIndexInformer allows for the addition and removal of indexes even after starting, and you can remove the default namespace index if you wish.
And Store.getKey can be used rather than directly referencing static Cache functions.

Expand All @@ -27,6 +27,7 @@ And Store.getKey can be used rather than directly referencing static Cache funct
* Fix #3407 #3973: Added Resourceable.resource to directly associate a resource with the DSL. It can be used as an alternative to Loadable.load when you already have the item.
There is also client.resourceList(...).resources() and client.configMaps().resources() - that will provide a Resource stream.
This allows you to implement composite operations easily with lambda: client.secrets().resources().forEach(r -> r.delete());
* Fix #3472 #3587: Allowing for customization of the Informer store/cache key function and how state is stored. See BasicItemStore and ReducedStateItemStore and the SharedIndexInformer.itemStore function.

#### _**Note**_: Breaking changes in the API
Please see the [migration guide](doc/MIGRATION-v6.md)
Expand Down
2 changes: 2 additions & 0 deletions doc/MIGRATION-v6.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ KubernetesList and Template will no longer automatically sort their objects by d
- WatchListDeletable now takes three type parameters to include the Resource type.
- PodResource is no longer generic.
-
- SharedInformer was removed, there is now only SharedIndexInformer
- The following interfaces were removed:
* CascadingEditReplacePatchDeletable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformer;

import java.util.List;
import java.util.Map;
Expand All @@ -31,8 +30,8 @@
public interface Informable<T> {

/**
* The indexers to add to {@link SharedInformer}s created by subsequent inform calls;
*
* The indexers to add to {@link SharedIndexInformer}s created by subsequent inform calls;
*
* @param indexers to customize the indexing
* @return the current {@link Informable}
* @deprecated please use methods on the {@link SharedIndexInformer} to add/remove indexes
Expand All @@ -52,7 +51,7 @@ public interface Informable<T> {
* WARNING As noted in the go client: "paginated lists are always served directly from
* etcd, which is significantly less efficient and may lead to serious performance and
* scalability problems."
*
*
* @param limit of a items in a list fetch
* @return the current {@link Informable}
*/
Expand Down Expand Up @@ -132,7 +131,7 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
* The predicate will be tested against the state of the underlying informer store on every event.
* The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer
*
*
* @param condition the {@link Predicate} to test
* @return a {@link CompletableFuture} of the list of items after the condition is met
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@

import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.fabric8.kubernetes.client.informers.cache.ItemStore;
import io.fabric8.kubernetes.client.informers.cache.Store;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* SharedInxedInformer extends SharedInformer and provides indexer operability additionally.
*/
public interface SharedIndexInformer<T> extends SharedInformer<T> {
public interface SharedIndexInformer<T> extends AutoCloseable {

/**
* Add indexers
*
* @param indexers indexers
*/
void addIndexers(Map<String, Function<T, List<String>>> indexers);
SharedIndexInformer<T> addIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Remove the namesapce index
Expand All @@ -57,4 +61,102 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @return the internal indexer store
*/
Indexer<T> getIndexer();

/**
* Add event handler
*
* @param handler event handler
*/
SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler);

/**
* Adds an event handler to the shared informer using the specified resync period.
* Events to a single handler are delivered sequentially, but there is no
* coordination between different handlers.
*
* @param handle the event handler
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle, long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
*
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
*
* <br>
* If the informer is not already running, this is a blocking call
*/
SharedIndexInformer<T> run();

/**
* Stops the shared informer. The informer cannot be started again.
*/
void stop();

@Override
default void close() {
stop();
}

/**
* Return true if the informer has ever synced
*/
default boolean hasSynced() {
return lastSyncResourceVersion() != null;
}

/**
* The resource version observed when last synced with the underlying store.
* The value returned is not synchronized with access to the underlying store
* and is not thread-safe.
*
* @return string value or null if never synced
*/
String lastSyncResourceVersion();

/**
* Return true if the informer is running
*/
boolean isRunning();

/**
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();

/**
* Return true if the informer is actively watching
* <br>
* Will return false when {@link #isRunning()} is true when the watch needs to be re-established.
*/
boolean isWatching();

/**
* Return the Store associated with this informer
*
* @return the store
*/
Store<T> getStore();

/**
* Sets the initial state of the informer store, which will
* be replaced by the initial list operation. This will emit
* relevant delete and update events, rather than just adds.
* <br>
* Can only be called before the informer is running
*
* @param items
*/
SharedIndexInformer<T> initialState(Stream<T> items);

SharedIndexInformer<T> itemStore(ItemStore<T> itemStore);

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
*/
CompletableFuture<Void> start();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.ItemStore;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Stream;

public class BasicItemStore<V extends HasMetadata> implements ItemStore<V> {

private Function<V, String> keyFunction;
private ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();

public BasicItemStore(Function<V, String> keyFunction) {
this.keyFunction = keyFunction;
}

@Override
public String getKey(V obj) {
return keyFunction.apply(obj);
}

@Override
public V put(String key, V obj) {
return store.put(key, obj);
}

@Override
public V remove(String key) {
return store.remove(key);
}

@Override
public Stream<String> keySet() {
return store.keySet().stream();
}

@Override
public Stream<V> values() {
return store.values().stream();
}

@Override
public V get(String key) {
return store.get(key);
}

@Override
public int size() {
return store.size();
}

}
Loading

0 comments on commit af21ba4

Please sign in to comment.