Skip to content

Commit

Permalink
fix fabric8io#3587 fabric8io#3472 adding more control over indexing, …
Browse files Browse the repository at this point in the history
…key function, storage
  • Loading branch information
shawkins committed Mar 7, 2022
1 parent 8e453ab commit 40da2f0
Show file tree
Hide file tree
Showing 21 changed files with 621 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformer;
import io.fabric8.kubernetes.client.informers.cache.Store;

import java.util.List;
import java.util.Map;
Expand All @@ -29,19 +30,21 @@
import java.util.function.Predicate;

public interface Informable<T> {

/**
* The indexers to add to {@link SharedInformer}s created by subsequent inform calls;
* @param indexers to customize the indexing
* @return the current {@link Informable}
* @deprecated please use methods on the {@link SharedIndexInformer} to add/remove indexes
*/
@Deprecated
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Set the limit to the number of resources to list at one time. This means that longer
* lists will take multiple requests to fetch.
* <p>If the list fails to complete it will be re-attempted with this limit, rather than
* falling back to the full list. You should ensure that your handlers are either async or
* falling back to the full list. You should ensure that your handlers are either async or
* fast acting to prevent long delays in list processing that may cause results to expire
* before reaching the end of the list.
* <p>WARNING As noted in the go client: "paginated lists are always served directly from
Expand All @@ -51,46 +54,46 @@ public interface Informable<T> {
* @return the current {@link Informable}
*/
Informable<T> withLimit(Long limit);

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This returned informer will not support resync.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @return a running {@link SharedIndexInformer}
*/
default SharedIndexInformer<T> inform() {
return inform(null, 0);
}

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This returned informer will not support resync.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @param handler to notify
* @return a running {@link SharedIndexInformer}
*/
default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
return inform(handler, 0);
}

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
*
* @param handler to notify
* @param resync the resync period or 0 for no resync
* @return a running {@link SharedIndexInformer}
Expand All @@ -108,7 +111,7 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
* @return a non-running {@link SharedIndexInformer}
*/
SharedIndexInformer<T> runnableInformer(long resync);

/**
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
* The predicate will be tested against the state of the underlying informer store on every event.
Expand All @@ -118,4 +121,40 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
*/
CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition);

/**
* Use uid instead of namespace/name as the key function.
* <p>
* When you perform {@link Store} operations such as get, listKeys, or getByKey it will
* be based upon the uid.
*
* @return the current {@link Informable}
*/
Informable<T> withUidStoreKeyFunction();

/**
* By default an informer stores every item that exists in memory.
* If that is too much memory for your application,
* you may instead reduce what is actually stored by specifying the
* fields here.
* <p>
* metadata.resourceVersion - will automatically be saved as will
* the necessary key fields.
* <p>
* If you are using custom indexers, then the fields used by those
* must be stored as well.
* <p>
* For example in level event handling systems all you may need beyond the
* key is the ownerReferences. You would use withValueFields("metadata.ownerReferences")
* for that.
* <p>
* NOTE: If you use this feature, you should only use the informer cache/store for basic
* existence checks and maintain your own cache of full resource objects.
* <p>
* Only simple names are allowed in field paths - '.' is reserved as the separator.
*
* @param fields to be held in the cache
* @return the current {@link Informable}
*/
Informable<T> withStoreValueFields(String... fields);

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public interface ExtensibleResource<T> extends Resource<T> {
@Override
ExtensibleResource<T> withLimit(Long limit);

@Override
ExtensibleResource<T> withUidStoreKeyFunction();

@Override
ExtensibleResource<T> withStoreValueFields(String... fields);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ public ExtensibleResource<T> withLimit(Long limit) {
return new ExtensibleResourceAdapter<>(resource.withLimit(limit), client);
}

@Override
public ExtensibleResource<T> withUidStoreKeyFunction() {
return new ExtensibleResourceAdapter<>(resource.withUidStoreKeyFunction(), client);
}

@Override
public ExtensibleResource<T> withStoreValueFields(String... fields) {
return new ExtensibleResourceAdapter<>(resource.withStoreValueFields(fields), client);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,14 @@ public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition
return resource.informOnCondition(condition);
}

@Override
public Informable<T> withUidStoreKeyFunction() {
return resource.withUidStoreKeyFunction();
}

@Override
public Informable<T> withStoreValueFields(String... fields) {
return resource.withStoreValueFields(fields);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;

import java.util.List;
Expand All @@ -33,6 +34,23 @@ public interface SharedIndexInformer<T> extends SharedInformer<T> {
*/
void addIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Remove the namesapce index
*
* @return this
*/
default SharedIndexInformer<T> removeNamespaceIndex() {
return removeIndexer(Cache.NAMESPACE_INDEX);
}

/**
* Remove the named index
*
* @param name
* @return this
*/
SharedIndexInformer<T> removeIndexer(String name);

/**
* returns the internal indexer store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.utils.ReflectUtils;
import io.fabric8.kubernetes.client.utils.Utils;
Expand Down Expand Up @@ -63,6 +64,14 @@ public static String metaNamespaceKeyFunc(Object obj) {
}
}

public static String metaUidKeyFunc(HasMetadata obj) {
if( obj == null || obj.getMetadata() == null) {
return "";
}
String result = obj.getMetadata().getUid();
return Utils.getNonNullOrElse(result, "");
}

/**
* Default index function that indexes based on an object's namespace and name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,12 @@ public interface Indexer<T> extends Store<T> {
* @param indexers indexers to add
*/
void addIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Remove the named index
*
* @param name
* @return this
*/
void removeIndexer(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexers) {

}

@Override
public void removeIndexer(String name) {

}

@Override
public List<T> list() {
return map.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
import io.fabric8.kubernetes.client.informers.impl.cache.CacheImpl;
import io.fabric8.kubernetes.client.readiness.Readiness;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.Serialization;
Expand Down Expand Up @@ -96,9 +98,7 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
protected String apiVersion;

protected Class<L> listType;
// informable state
private Map<String, Function<T, List<String>>> indexers;
private Long limit;
protected InformableContext<T> informableContext = InformableContext.<T>builder().build();

protected BaseOperation(OperationContext ctx) {
super(ctx);
Expand Down Expand Up @@ -931,22 +931,20 @@ public ExtensibleResource<T> dryRun(boolean isDryRun) {
@Override
public ExtensibleResource<T> withIndexers(Map<String, Function<T, List<String>>> indexers) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = indexers;
result.limit = this.limit;
result.informableContext = informableContext.toBuilder().indexers(indexers).build();
return result;
}

@Override
public BaseOperation<T, L, R> withLimit(Long limit) {
BaseOperation<T, L, R> result = newInstance(context);
result.indexers = this.indexers;
result.limit = limit;
result.informableContext = informableContext.toBuilder().limit(limit).build();
return result;
}

@Override
public Long getLimit() {
return this.limit;
return this.informableContext.getLimit();
}

@Override
Expand All @@ -973,9 +971,11 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
}

// use the local context / namespace but without a resourceVersion
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(), this.withResourceVersion(null).withLimit(this.limit), resync, Runnable::run); // just run the event notification in the websocket thread
if (indexers != null) {
informer.addIndexers(indexers);
DefaultSharedIndexInformer<T, L> informer = new DefaultSharedIndexInformer<>(getType(),
this.withResourceVersion(null).withLimit(this.informableContext.getLimit()), resync, Runnable::run,
new CacheImpl<>(informableContext)); // just run the event notification in the websocket thread
if (informableContext.getIndexers() != null) {
informer.addIndexers(informableContext.getIndexers());
}
return informer;
}
Expand Down Expand Up @@ -1017,6 +1017,20 @@ public static URL appendListOptionParams(URL base, ListOptions listOptions) {
}
return urlBuilder.build();
}

@Override
public BaseOperation<T, L, R> withUidStoreKeyFunction() {
BaseOperation<T, L, R> result = newInstance(context);
result.informableContext = informableContext.toBuilder().uidKey(true).keyFunction(Cache::metaUidKeyFunc).build();
return result;
}

@Override
public BaseOperation<T, L, R> withStoreValueFields(String... fields) {
BaseOperation<T, L, R> result = newInstance(context);
result.informableContext = informableContext.toBuilder().fields(fields).typeClass(this.type).build();
return result;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.client.informers.cache.Cache;
import lombok.Builder;
import lombok.Getter;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

@Builder(toBuilder = true)
@Getter
public class InformableContext<T> {

private Map<String, Function<T, List<String>>> indexers;
private Long limit;
@Builder.Default
private Function<T, String> keyFunction = Cache::metaNamespaceKeyFunc;
private boolean uidKey;
private String[] fields;
private Class<T> typeClass;

}
Loading

0 comments on commit 40da2f0

Please sign in to comment.