From b2eccd37debeab770a3aa902ab81542cfe661286 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Sun, 3 Apr 2022 13:46:19 -0400 Subject: [PATCH] fix #3472 #3587: allowing for full control over informer key and state --- CHANGELOG.md | 3 +- doc/MIGRATION-v6.md | 2 + .../kubernetes/client/dsl/Informable.java | 9 +- .../client/informers/SharedIndexInformer.java | 106 +++++++++- .../client/informers/SharedInformer.java | 126 ----------- .../informers/cache/BasicItemStore.java | 70 +++++++ .../client/informers/cache/Cache.java | 14 +- .../client/informers/cache/ItemStore.java | 57 +++++ .../cache/ReducedStateItemStore.java | 185 +++++++++++++++++ .../cache/ReducedStateItemStoreTest.java | 56 +++++ .../impl/DefaultSharedIndexInformer.java | 37 +++- .../informers/impl/cache/CacheImpl.java | 25 +-- .../api/model/GenericKubernetesResource.java | 57 +++-- .../kubernetes/client/mock/InformTest.java | 195 ++++++++++++++++-- 14 files changed, 750 insertions(+), 192 deletions(-) delete mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/BasicItemStore.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ItemStore.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStore.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStoreTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 581082fd2a6..c11af0f4292 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ * Fix #1285: removed references to manually calling registerCustomKind * Fix #3334: adding basic support for server side apply. Use patch(PatchContext.of(PatchType.SERVER_SIDE_APPLY), service), or new PatchContext.Builder().withPatchType(PatchType.SERVER_SIDE_APPLY).withForce(true).build() to override conflicts. * Fix #3969: relist will not trigger sync events -* Fix #3968: SharedInformer.initialState can be used to set the store state before the informer starts. +* Fix #3968: SharedIndexInformer.initialState can be used to set the store state before the informer starts. SharedIndexInformer allows for the addition and removal of indexes even after starting, and you can remove the default namespace index if you wish. And Store.getKey can be used rather than directly referencing static Cache functions. @@ -35,6 +35,7 @@ And Store.getKey can be used rather than directly referencing static Cache funct * Fix #3407 #3973: Added Resourceable.resource to directly associate a resource with the DSL. It can be used as an alternative to Loadable.load when you already have the item. There is also client.resourceList(...).resources() and client.configMaps().resources() - that will provide a Resource stream. This allows you to implement composite operations easily with lambda: client.secrets().resources().forEach(r -> r.delete()); +* Fix #3472 #3587: Allowing for customization of the Informer store/cache key function and how state is stored. See BasicItemStore and ReducedStateItemStore and the SharedIndexInformer.itemStore function. #### _**Note**_: Breaking changes in the API Please see the [migration guide](doc/MIGRATION-v6.md) diff --git a/doc/MIGRATION-v6.md b/doc/MIGRATION-v6.md index aa3a5269a73..e5bab24307e 100644 --- a/doc/MIGRATION-v6.md +++ b/doc/MIGRATION-v6.md @@ -208,6 +208,8 @@ KubernetesList and Template will no longer automatically sort their objects by d - PodResource is no longer generic. +- SharedInformer was removed, there is now only SharedIndexInformer + ## Evict Changes Evictable.evict will throw an exception rather than returning false if the pod is not found. This ensures that false strictly means that the evict failed. diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java index 89ab4951a99..6b507878af7 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java @@ -19,7 +19,6 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.SharedInformer; import java.util.List; import java.util.Map; @@ -31,8 +30,8 @@ public interface Informable { /** - * The indexers to add to {@link SharedInformer}s created by subsequent inform calls; - * + * The indexers to add to {@link SharedIndexInformer}s created by subsequent inform calls; + * * @param indexers to customize the indexing * @return the current {@link Informable} * @deprecated please use methods on the {@link SharedIndexInformer} to add/remove indexes @@ -52,7 +51,7 @@ public interface Informable { * WARNING As noted in the go client: "paginated lists are always served directly from * etcd, which is significantly less efficient and may lead to serious performance and * scalability problems." - * + * * @param limit of a items in a list fetch * @return the current {@link Informable} */ @@ -132,7 +131,7 @@ default SharedIndexInformer inform(ResourceEventHandler handler) { * Return a {@link Future} when the list at this context satisfies the given {@link Predicate}. * The predicate will be tested against the state of the underlying informer store on every event. * The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer - * + * * @param condition the {@link Predicate} to test * @return a {@link CompletableFuture} of the list of items after the condition is met */ diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index ca34ec207af..d58d6dcdc3f 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -17,22 +17,26 @@ import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; +import io.fabric8.kubernetes.client.informers.cache.Store; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.stream.Stream; /** * SharedInxedInformer extends SharedInformer and provides indexer operability additionally. */ -public interface SharedIndexInformer extends SharedInformer { +public interface SharedIndexInformer extends AutoCloseable { /** * Add indexers * * @param indexers indexers */ - void addIndexers(Map>> indexers); + SharedIndexInformer addIndexers(Map>> indexers); /** * Remove the namesapce index @@ -57,4 +61,102 @@ default SharedIndexInformer removeNamespaceIndex() { * @return the internal indexer store */ Indexer getIndexer(); + + /** + * Add event handler + * + * @param handler event handler + */ + SharedIndexInformer addEventHandler(ResourceEventHandler handler); + + /** + * Adds an event handler to the shared informer using the specified resync period. + * Events to a single handler are delivered sequentially, but there is no + * coordination between different handlers. + * + * @param handle the event handler + * @param resyncPeriod the specific resync period + */ + SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, long resyncPeriod); + + /** + * Starts the shared informer, which will be stopped when {@link #stop()} is called. + * + *
+ * Only one start attempt is made - subsequent calls will not re-start the informer. + * + *
+ * If the informer is not already running, this is a blocking call + */ + SharedIndexInformer run(); + + /** + * Stops the shared informer. The informer cannot be started again. + */ + void stop(); + + @Override + default void close() { + stop(); + } + + /** + * Return true if the informer has ever synced + */ + default boolean hasSynced() { + return lastSyncResourceVersion() != null; + } + + /** + * The resource version observed when last synced with the underlying store. + * The value returned is not synchronized with access to the underlying store + * and is not thread-safe. + * + * @return string value or null if never synced + */ + String lastSyncResourceVersion(); + + /** + * Return true if the informer is running + */ + boolean isRunning(); + + /** + * Return the class this informer is watching + */ + Class getApiTypeClass(); + + /** + * Return true if the informer is actively watching + *
+ * Will return false when {@link #isRunning()} is true when the watch needs to be re-established. + */ + boolean isWatching(); + + /** + * Return the Store associated with this informer + * + * @return the store + */ + Store getStore(); + + /** + * Sets the initial state of the informer store, which will + * be replaced by the initial list operation. This will emit + * relevant delete and update events, rather than just adds. + *
+ * Can only be called before the informer is running + * + * @param items + */ + SharedIndexInformer initialState(Stream items); + + SharedIndexInformer itemStore(ItemStore itemStore); + + /** + * A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called. + *
+ * Only one start attempt is made - subsequent calls will not re-start the informer. + */ + CompletableFuture start(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java deleted file mode 100644 index 3470b5b40bc..00000000000 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers; - -import io.fabric8.kubernetes.client.informers.cache.Store; - -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; - -/** - * SharedInformer defines basic methods of an informer. - * - * This has been ported from official go client: - * https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go - */ -public interface SharedInformer extends AutoCloseable { - - /** - * Add event handler - * - * @param handler event handler - */ - void addEventHandler(ResourceEventHandler handler); - - /** - * Adds an event handler to the shared informer using the specified resync period. - * Events to a single handler are delivered sequentially, but there is no - * coordination between different handlers. - * - * @param handle the event handler - * @param resyncPeriod the specific resync period - */ - void addEventHandlerWithResyncPeriod(ResourceEventHandler handle, long resyncPeriod); - - /** - * Starts the shared informer, which will be stopped when {@link #stop()} is called. - * - *
- * Only one start attempt is made - subsequent calls will not re-start the informer. - * - *
- * If the informer is not already running, this is a blocking call - */ - void run(); - - /** - * Stops the shared informer. The informer cannot be started again. - */ - void stop(); - - @Override - default void close() { - stop(); - } - - /** - * Return true if the informer has ever synced - */ - default boolean hasSynced() { - return lastSyncResourceVersion() != null; - } - - /** - * The resource version observed when last synced with the underlying store. - * The value returned is not synchronized with access to the underlying store - * and is not thread-safe. - * - * @return string value or null if never synced - */ - String lastSyncResourceVersion(); - - /** - * Return true if the informer is running - */ - boolean isRunning(); - - /** - * Return the class this informer is watching - */ - Class getApiTypeClass(); - - /** - * Return true if the informer is actively watching - *
- * Will return false when {@link #isRunning()} is true when the watch needs to be re-established. - */ - boolean isWatching(); - - /** - * Return the Store associated with this informer - * - * @return the store - */ - Store getStore(); - - /** - * Sets the initial state of the informer store, which will - * be replaced by the initial list operation. This will emit - * relevant delete and update events, rather than just adds. - *
- * Can only be called before the informer is running - * - * @param items - */ - SharedIndexInformer initialState(Stream items); - - /** - * A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called. - *
- * Only one start attempt is made - subsequent calls will not re-start the informer. - */ - CompletableFuture start(); -} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/BasicItemStore.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/BasicItemStore.java new file mode 100644 index 00000000000..036de6f4c6b --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/BasicItemStore.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Stream; + +public class BasicItemStore implements ItemStore { + + private Function keyFunction; + private ConcurrentHashMap store = new ConcurrentHashMap<>(); + + public BasicItemStore(Function keyFunction) { + this.keyFunction = keyFunction; + } + + @Override + public String getKey(V obj) { + return keyFunction.apply(obj); + } + + @Override + public V put(String key, V obj) { + return store.put(key, obj); + } + + @Override + public V remove(String key) { + return store.remove(key); + } + + @Override + public Stream keySet() { + return store.keySet().stream(); + } + + @Override + public Stream values() { + return store.values().stream(); + } + + @Override + public V get(String key) { + return store.get(key); + } + + @Override + public int size() { + return store.size(); + } + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java index 7e66aca03fe..401ab702e09 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.informers.cache; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.utils.ReflectUtils; import io.fabric8.kubernetes.client.utils.Utils; @@ -42,11 +43,11 @@ public interface Cache extends Indexer { */ public static String metaNamespaceKeyFunc(Object obj) { try { - if( obj == null ) { + if (obj == null) { return ""; } ObjectMeta metadata; - if(obj instanceof String) { + if (obj instanceof String) { return (String) obj; } else if (obj instanceof ObjectMeta) { metadata = (ObjectMeta) obj; @@ -63,6 +64,14 @@ public static String metaNamespaceKeyFunc(Object obj) { } } + public static String metaUidKeyFunc(HasMetadata obj) { + if (obj == null || obj.getMetadata() == null) { + return ""; + } + String result = obj.getMetadata().getUid(); + return Utils.getNonNullOrElse(result, ""); + } + /** * Default index function that indexes based on an object's namespace and name. * @@ -90,4 +99,3 @@ public static List metaNamespaceIndexFunc(Object obj) { } } } - diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ItemStore.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ItemStore.java new file mode 100644 index 00000000000..ef89eb2cf94 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ItemStore.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +import java.util.stream.Stream; + +/** + * Holds item state for the {@link SharedIndexInformer}. For events to be consistent the store should remember + * all entries until they are deleted. At its simplest this is just a map coupled with a key function. + *

+ * Modifications to this store once the informer is running, by anything other than the informer will alter the event stream. If + * for example an item is not found, an subsequent update from the api version will send notifications to + * {@link ResourceEventHandler}s as an add. + *

+ * Direct modifications to this store by anything other than the informer will not updated indexes nor emit events. + *

+ * The implementation should be safe with respect to concurrency. Modifications from the informer + * will be single threaded, but not necessarily the same thread. Reads may be concurrent with writes. + *

+ * See an example implementations {@link BasicItemStore} and {@link ReducedStateItemStore} + * + * @param + */ +public interface ItemStore { + + String getKey(V obj); + + V put(String key, V obj); + + V remove(String key); + + Stream keySet(); + + Stream values(); + + int size(); + + V get(String key); + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStore.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStore.java new file mode 100644 index 00000000000..ea42bbf160d --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStore.java @@ -0,0 +1,185 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.utils.Serialization; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * By default an informer stores every item that exists in memory. + * If that is too much memory for your application, + * you may instead reduce what is actually stored by using this item store. + */ +public class ReducedStateItemStore implements ItemStore { + + private static final String METADATA = "metadata"; + private final ConcurrentHashMap store = new ConcurrentHashMap<>(); + private final List fields = new ArrayList<>(); + private final Class typeClass; + private final KeyState keyState; + + public static class KeyState { + + final Function keyFunction; + final Function keyFieldFunction; + final List keyFields; + + /** + * The key function must decompose a given key into the given fields - in field order + * + * @param keyFieldFunction to convert a key into fields + * @param keyFields the fields represented by the key + */ + public KeyState(Function keyFunction, Function keyFieldFunction, + String[]... keyFields) { + this.keyFunction = keyFunction; + this.keyFieldFunction = keyFieldFunction; + this.keyFields = Arrays.asList(keyFields); + } + + } + + public static final KeyState NAME_KEY_STATE = new KeyState(Cache::metaNamespaceKeyFunc, + k -> { + int index = k.indexOf("/"); + if (index == -1) { + return new String[] { null, k }; + } + return new String[] { k.substring(0, index), k.substring(index + 1) }; + }, new String[] { METADATA, "namespace" }, new String[] { METADATA, "name" }); + + public static final KeyState UID_KEY_STATE = new KeyState(Cache::metaUidKeyFunc, + k -> new String[] { k }, new String[] { METADATA, "uid" }); + + /** + * Create a state store with only the fields specified. + *

+ * metadata.resourceVersion - will automatically be saved as will + * the necessary key fields. + *

+ * If you are using custom indexers, then the fields used by those + * indexes must be added to the valueFields - otherwise the indexer won't be able to delete the + * index entries when the item is removed. + *

+ * For example in level event handling systems all you may need beyond the + * key is the ownerReferences. You would use withValueFields("metadata.ownerReferences") + * for that. + *

+ * NOTE: If you use this feature, you should only use the informer cache/store for basic + * existence checks and maintain your own cache of full resource objects. + *

+ * Only simple names are allowed in field paths - '.' is reserved as the separator. + *

+ * Whatever is provided as the {@link KeyState} should match the keyFunction provided to the informer. + * + * @param keyState information about the key fields/function + * @param typeClass the expected type + * @param valueFields the additional fields to save + */ + public ReducedStateItemStore(KeyState keyState, Class typeClass, String... valueFields) { + this.keyState = keyState; + fields.add(new String[] { METADATA, "resourceVersion" }); + if (valueFields != null) { + for (int i = 0; i < valueFields.length; i++) { + fields.add(valueFields[i].split("\\.")); + } + } + this.typeClass = typeClass; + } + + Object[] store(V value) { + if (value == null) { + return null; + } + Map raw = Serialization.jsonMapper().convertValue(value, Map.class); + return fields.stream().map(f -> GenericKubernetesResource.get(raw, (Object[]) f)).toArray(); + } + + V restore(String key, Object[] values) { + if (values == null) { + return null; + } + Map raw = new HashMap<>(); + applyFields(values, raw, this.fields); + String[] keyParts = this.keyState.keyFieldFunction.apply(key); + applyFields(keyParts, raw, this.keyState.keyFields); + + return Serialization.jsonMapper().convertValue(raw, typeClass); + } + + private static void applyFields(Object[] values, Map raw, List fields) { + for (int i = 0; i < fields.size(); i++) { + Object value = values[i]; + if (value == null) { + continue; + } + String[] path = fields.get(i); + Map parent = raw; + for (int j = 0; j < path.length - 1; j++) { + parent = (Map) parent.computeIfAbsent(path[j], k -> new LinkedHashMap()); + } + parent.put(path[path.length - 1], value); + } + } + + @Override + public V put(String key, V obj) { + return restore(key, store.put(key, store(obj))); + } + + @Override + public V remove(String key) { + return restore(key, store.remove(key)); + } + + @Override + public Stream keySet() { + return store.keySet().stream(); + } + + @Override + public Stream values() { + return store.entrySet().stream().map(e -> restore(e.getKey(), e.getValue())); + } + + @Override + public V get(String key) { + return restore(key, store.get(key)); + } + + @Override + public int size() { + return store.size(); + } + + @Override + public String getKey(V obj) { + return this.keyState.keyFunction.apply(obj); + } + +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStoreTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStoreTest.java new file mode 100644 index 00000000000..f710b59ae62 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReducedStateItemStoreTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ReducedStateItemStoreTest { + + @Test + void testStoreRestore() { + ReducedStateItemStore store = new ReducedStateItemStore<>(ReducedStateItemStore.UID_KEY_STATE, Pod.class, + "metadata.labels", "foo.bar"); + + Pod pod = new PodBuilder().withNewSpec().endSpec().withNewMetadata().withUid("x").withName("y").addToLabels("one", "1") + .addToLabels("two", "2").withResourceVersion("2").endMetadata().withNewStatus().endStatus().build(); + + Object[] values = store.store(pod); + + assertEquals(3, values.length); + assertEquals("2", values[0]); // always the resourceVersion + assertEquals(pod.getMetadata().getLabels(), values[1]); + assertNull(values[2]); + + Pod restored = store.restore("x", values); + + assertNull(restored.getSpec()); + assertNull(restored.getStatus()); + assertEquals("x", restored.getMetadata().getUid()); + assertEquals(pod.getMetadata().getLabels(), restored.getMetadata().getLabels()); + + assertNull(store.put("x", pod)); + assertNotNull(store.get("x")); + assertEquals(1, store.size()); + assertNotNull(store.remove("x")); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 3e1f3f57771..8c080441b3d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.fabric8.kubernetes.client.informers.cache.Store; import io.fabric8.kubernetes.client.informers.impl.cache.CacheImpl; import io.fabric8.kubernetes.client.informers.impl.cache.ProcessorStore; @@ -59,7 +60,7 @@ public class DefaultSharedIndexInformer reflector; private final Class apiTypeClass; private final ProcessorStore processorStore; - private final CacheImpl indexer; + private final CacheImpl indexer = new CacheImpl<>(); private final SharedProcessor processor; private final Executor informerExecutor; @@ -68,6 +69,8 @@ public class DefaultSharedIndexInformer resyncFuture; + private Stream initialState; + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, Executor informerExecutor) { if (resyncPeriod < 0) { @@ -80,7 +83,6 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.informerExecutor = informerExecutor; // reuse the informer executor, but ensure serial processing this.processor = new SharedProcessor<>(informerExecutor); - this.indexer = new CacheImpl<>(); processorStore = new ProcessorStore<>(this.indexer, this.processor); this.reflector = new Reflector<>(apiTypeClass, listerWatcher, processorStore); @@ -92,15 +94,17 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis * @param handler event handler */ @Override - public void addEventHandler(ResourceEventHandler handler) { + public SharedIndexInformer addEventHandler(ResourceEventHandler handler) { addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod); + return this; } @Override - public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, long resyncPeriodMillis) { + public SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handler, + long resyncPeriodMillis) { if (stopped) { log.info("DefaultSharedIndexInformer#Handler was not added to shared informer because it has stopped already"); - return; + return this; } if (resyncPeriodMillis > 0) { @@ -127,6 +131,8 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler hand this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis), this.indexer::list); + + return this; } @Override @@ -143,6 +149,10 @@ public CompletableFuture start() { if (!started.compareAndSet(false, true)) { return CompletableFuture.completedFuture(null); } + + if (initialState != null) { + initialState.forEach(indexer::put); + } } log.debug("informer: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); @@ -160,8 +170,9 @@ public CompletableFuture start() { } @Override - public void run() { + public SharedIndexInformer run() { Utils.waitUntilReadyOrFail(start(), -1, TimeUnit.MILLISECONDS); + return this; } @Override @@ -180,8 +191,9 @@ private synchronized void stopResync() { } @Override - public void addIndexers(Map>> indexers) { + public SharedIndexInformer addIndexers(Map>> indexers) { indexer.addIndexers(indexers); + return this; } @Override @@ -250,7 +262,16 @@ public synchronized SharedIndexInformer initialState(Stream items) { if (started.get()) { throw new KubernetesClientException("Informer cannot be running when initial state is added"); } - items.forEach(i -> this.indexer.put(i)); + this.initialState = items; + return this; + } + + @Override + public synchronized SharedIndexInformer itemStore(ItemStore itemStore) { + if (started.get()) { + throw new KubernetesClientException("Informer cannot be running when setting item store"); + } + this.indexer.setItemStore(itemStore); return this; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java index a6c65de4245..586d2a2a2b9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java @@ -17,7 +17,9 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.informers.cache.BasicItemStore; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.fabric8.kubernetes.client.utils.ReflectUtils; import io.fabric8.kubernetes.client.utils.Utils; @@ -29,8 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.stream.Collectors; /** * It basically saves and indexes all the entries. @@ -38,9 +40,6 @@ * @param type for cache object */ public class CacheImpl implements Cache { - // Defines how to map objects into indices - private Function keyFunc; - // NAMESPACE_INDEX is the default index function for caching objects public static final String NAMESPACE_INDEX = "namespace"; @@ -48,7 +47,7 @@ public class CacheImpl implements Cache { private final Map>> indexers = new HashMap<>(); // items stores object instances - private final Map items = new ConcurrentHashMap<>(); + private ItemStore items; // indices stores objects' key by their indices private final Map>> indices = new HashMap<>(); @@ -58,10 +57,14 @@ public CacheImpl() { } public CacheImpl(String indexName, Function> indexFunc, Function keyFunc) { - this.keyFunc = keyFunc; + this.items = new BasicItemStore<>(keyFunc); addIndexFunc(indexName, indexFunc); } + public void setItemStore(ItemStore items) { + this.items = items; + } + /** * Returns the indexers registered with the cache. * @@ -123,7 +126,7 @@ public synchronized T remove(T obj) { */ @Override public List listKeys() { - return new ArrayList<>(this.items.keySet()); + return this.items.keySet().collect(Collectors.toList()); } /** @@ -143,7 +146,7 @@ public T get(T obj) { */ @Override public String getKey(T obj) { - String result = this.keyFunc.apply(obj); + String result = this.items.getKey(obj); return result == null ? "" : result; } @@ -154,7 +157,7 @@ public String getKey(T obj) { */ @Override public List list() { - return new ArrayList<>(this.items.values()); + return this.items.values().collect(Collectors.toList()); } /** @@ -321,9 +324,7 @@ public synchronized CacheImpl addIndexFunc(String indexName, Function updateIndex(getKey(v), v, indexFunc, index)); return this; } diff --git a/kubernetes-model-generator/kubernetes-model-core/src/main/java/io/fabric8/kubernetes/api/model/GenericKubernetesResource.java b/kubernetes-model-generator/kubernetes-model-core/src/main/java/io/fabric8/kubernetes/api/model/GenericKubernetesResource.java index a0c7a105662..c4a16326d11 100644 --- a/kubernetes-model-generator/kubernetes-model-core/src/main/java/io/fabric8/kubernetes/api/model/GenericKubernetesResource.java +++ b/kubernetes-model-generator/kubernetes-model-core/src/main/java/io/fabric8/kubernetes/api/model/GenericKubernetesResource.java @@ -37,10 +37,10 @@ @JsonDeserialize(using = com.fasterxml.jackson.databind.JsonDeserializer.None.class) @JsonInclude(JsonInclude.Include.NON_NULL) @JsonPropertyOrder({ - "apiVersion", - "kind", - "metadata", - "items", + "apiVersion", + "kind", + "metadata", + "items", }) @Getter @Setter @@ -82,15 +82,19 @@ public JsonNode getAdditionalPropertiesNode() { /** * Allows the retrieval of field values from this Resource for the provided path segments. * - *

If the path segment is of type {@link Integer}, then we assume that it is an array index to retrieve + *

+ * If the path segment is of type {@link Integer}, then we assume that it is an array index to retrieve * the value of an entry in the array. * - *

If the path segment is of type {@link String}, then we assume that it is a field name to retrieve the value + *

+ * If the path segment is of type {@link String}, then we assume that it is a field name to retrieve the value * from the resource. * - *

In any other case, the path segment is ignored and considered invalid. The method returns null. + *

+ * In any other case, the path segment is ignored and considered invalid. The method returns null. * - *

Considering the following JSON object: + *

+ * Considering the following JSON object: * *

{@code
    * {
@@ -104,27 +108,40 @@ public JsonNode getAdditionalPropertiesNode() {
    * }
    * }
* - *

The following invocations will produce the documented results: + *

+ * The following invocations will produce the documented results: *

    - *
  • {@code get("field", "value")} will result in {@code 42}
  • - *
  • {@code get("field", "1")} will result in {@code "one"}
  • - *
  • {@code get("field", 1)} will result in {@code null}
  • - *
  • {@code get("field", "list", 1, "entry")} will result in {@code 2}
  • - *
  • {@code get("field", "list", 99, "entry")} will result in {@code null}
  • - *
  • {@code get("field", "list", "1", "entry")} will result in {@code null}
  • - *
  • {@code get("field", "list", 1, false)} will result in {@code null}
  • + *
  • {@code get("field", "value")} will result in {@code 42}
  • + *
  • {@code get("field", "1")} will result in {@code "one"}
  • + *
  • {@code get("field", 1)} will result in {@code null}
  • + *
  • {@code get("field", "list", 1, "entry")} will result in {@code 2}
  • + *
  • {@code get("field", "list", 99, "entry")} will result in {@code null}
  • + *
  • {@code get("field", "list", "1", "entry")} will result in {@code null}
  • + *
  • {@code get("field", "list", 1, false)} will result in {@code null}
  • *
* * @param path of the field to retrieve. * @param type of the returned object. * @return the value of the traversed path or null if the field does not exist. */ - @SuppressWarnings("unchecked") public T get(Object... path) { - Object current = getAdditionalProperties(); + return get(getAdditionalProperties(), path); + } + + /** + * The same as {@link #get(Object...)}, but starting at any root raw object + * + * @param type of the returned object (Map, Collection, or value). + * @param root starting object + * @param path of the field to retrieve. + * @return the value of the traversed path or null if the field does not exist. + */ + @SuppressWarnings("unchecked") + public static T get(Map root, Object... path) { + Object current = root; for (Object segment : path) { - if (segment instanceof Integer && current instanceof Collection && ((Collection)current).size() > (int)segment) { - current = ((Collection) current).toArray()[(int)segment]; + if (segment instanceof Integer && current instanceof Collection && ((Collection) current).size() > (int) segment) { + current = ((Collection) current).toArray()[(int) segment]; } else if (segment instanceof String && current instanceof Map) { current = ((Map) current).get(segment.toString()); } else { diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java index 6bb380de5bf..5e89df64377 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java @@ -24,10 +24,14 @@ import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.api.model.WatchEvent; +import io.fabric8.kubernetes.api.model.apps.ReplicaSetBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.fabric8.kubernetes.client.informers.cache.BasicItemStore; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.fabric8.kubernetes.client.informers.cache.ReducedStateItemStore; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.junit.jupiter.api.Test; @@ -38,6 +42,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -52,8 +58,12 @@ class InformTest { @Test void testInformPodWithLabel() throws InterruptedException { // Given - Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") - .withResourceVersion("1").endMetadata().build(); + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .endMetadata() + .build(); server.expect() .withPath("/api/v1/namespaces/test/pods?labelSelector=my-label") @@ -62,7 +72,8 @@ void testInformPodWithLabel() throws InterruptedException { .once(); server.expect() - .withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .withPath( + "/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true") .andUpgradeToWebSocket() .open() .waitFor(EVENT_WAIT_PERIOD_MS) @@ -153,7 +164,8 @@ public void onUpdate(GenericKubernetesResource oldObj, GenericKubernetesResource .withPlural("dummies") .build(); - SharedIndexInformer informer = client.genericKubernetesResources(context).withLabel("my-label") + SharedIndexInformer informer = client.genericKubernetesResources(context) + .withLabel("my-label") .inform(handler); assertTrue(deleteLatch.await(10, TimeUnit.SECONDS)); @@ -168,8 +180,12 @@ public void onUpdate(GenericKubernetesResource oldObj, GenericKubernetesResource @Test void testGenericWithKnownType() throws InterruptedException { // Given - Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") - .withResourceVersion("1").endMetadata().build(); + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .endMetadata() + .build(); server.expect() .withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1") @@ -216,7 +232,8 @@ public void onUpdate(GenericKubernetesResource oldObj, GenericKubernetesResource .withPlural("pods") .build(); - SharedIndexInformer informer = client.genericKubernetesResources(context).withName("pod1") + SharedIndexInformer informer = client.genericKubernetesResources(context) + .withName("pod1") .inform(handler); assertTrue(deleteLatch.await(1000, TimeUnit.SECONDS)); @@ -228,8 +245,12 @@ public void onUpdate(GenericKubernetesResource oldObj, GenericKubernetesResource @Test void testRunnableInformer() throws InterruptedException { // Given - Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") - .withResourceVersion("1").endMetadata().build(); + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .endMetadata() + .build(); server.expect() .withPath("/api/v1/namespaces/test/pods?labelSelector=my-label") @@ -238,7 +259,8 @@ void testRunnableInformer() throws InterruptedException { .once(); server.expect() - .withPath("/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .withPath( + "/api/v1/namespaces/test/pods?labelSelector=my-label&resourceVersion=1&allowWatchBookmarks=true&watch=true") .andUpgradeToWebSocket() .open() .waitFor(EVENT_WAIT_PERIOD_MS) @@ -283,16 +305,28 @@ public void onUpdate(Pod oldObj, Pod newObj) { @Test void testListLimit() throws InterruptedException { // Given - Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") - .withResourceVersion("1").endMetadata().build(); + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .endMetadata() + .build(); - Pod pod2 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod2") - .withResourceVersion("2").endMetadata().build(); + Pod pod2 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod2") + .withResourceVersion("2") + .endMetadata() + .build(); server.expect() .withPath("/api/v1/namespaces/test/pods?limit=1") .andReturn(HttpURLConnection.HTTP_OK, - new PodListBuilder().withNewMetadata().withResourceVersion("2").withContinue("x").endMetadata().withItems(pod1) + new PodListBuilder().withNewMetadata() + .withResourceVersion("2") + .withContinue("x") + .endMetadata() + .withItems(pod1) .build()) .once(); @@ -335,4 +369,135 @@ public void onUpdate(Pod oldObj, Pod newObj) { informer.stop(); } + @Test + void testInformWithAlternativeKeyFunction() throws InterruptedException { + // Given + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .withUid("uid") + .endMetadata() + .build(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods") + .andReturn(HttpURLConnection.HTTP_OK, + new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().withItems(pod1).build()) + .once(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods?resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .done() + .once(); + + final CountDownLatch addLatch = new CountDownLatch(1); + final ResourceEventHandler handler = new ResourceEventHandler() { + + @Override + public void onAdd(Pod obj) { + addLatch.countDown(); + } + + @Override + public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { + } + + @Override + public void onUpdate(Pod oldObj, Pod newObj) { + + } + + }; + // When + SharedIndexInformer informer = client.pods() + .runnableInformer(0) + .itemStore(new BasicItemStore<>(Cache::metaUidKeyFunc)) + .removeNamespaceIndex() + .addEventHandler(handler) + .run(); + + assertTrue(addLatch.await(10, TimeUnit.SECONDS)); + assertTrue(informer.getIndexer().getIndexers().isEmpty()); + assertEquals(Arrays.asList("uid"), informer.getStore().listKeys()); + + informer.stop(); + } + + @Test + void testInformWithMinimalState() throws InterruptedException { + // Given + Pod pod1 = new PodBuilder().withNewMetadata() + .withNamespace("test") + .withName("pod1") + .withResourceVersion("1") + .withUid("uid") + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + + pod1.addOwnerReference(new ReplicaSetBuilder().withNewMetadata() + .withUid("owner") + .withName("rs") + .withNamespace("test") + .endMetadata() + .build()); + + server.expect() + .withPath("/api/v1/namespaces/test/pods") + .andReturn(HttpURLConnection.HTTP_OK, + new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().withItems(pod1).build()) + .once(); + + server.expect() + .withPath("/api/v1/namespaces/test/pods?resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .done() + .once(); + + final CountDownLatch addLatch = new CountDownLatch(1); + final ResourceEventHandler handler = new ResourceEventHandler() { + + @Override + public void onAdd(Pod obj) { + addLatch.countDown(); + } + + @Override + public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { + } + + @Override + public void onUpdate(Pod oldObj, Pod newObj) { + + } + + }; + // When + SharedIndexInformer informer = client.pods() + .runnableInformer(0) + .itemStore( + new ReducedStateItemStore<>(ReducedStateItemStore.NAME_KEY_STATE, Pod.class, "metadata.ownerReferences")) + .removeNamespaceIndex() + .addEventHandler(handler) + .run(); + + assertTrue(addLatch.await(10, TimeUnit.SECONDS)); + assertTrue(informer.getIndexer().getIndexers().isEmpty()); + // still using the namespace key + assertEquals(Arrays.asList("test/pod1"), informer.getStore().listKeys()); + Pod byKey = informer.getStore().getByKey("test/pod1"); + assertEquals("pod1", byKey.getMetadata().getName()); + assertEquals("test", byKey.getMetadata().getNamespace()); + assertEquals("1", byKey.getMetadata().getResourceVersion()); + assertEquals(1, byKey.getMetadata().getOwnerReferences().size()); + assertNull(byKey.getSpec()); + + informer.stop(); + } + }