Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Stats rework (3/4): Exposing new cache stats API #13237

Merged
merged 23 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -90,6 +91,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -642,12 +645,47 @@
return Collections.emptyMap();
}
Map<K, V> map = new HashMap<>(size);
readIntoMap(keyReader, valueReader, map, size);
return map;
}

/**
* Read a serialized map into a SortedMap using the default ordering for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader)
throws IOException {
return readOrderedMap(keyReader, valueReader, null);
}

/**
* Read a serialized map into a SortedMap, specifying a Comparator for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(
Writeable.Reader<K> keyReader,
Writeable.Reader<V> valueReader,
@Nullable Comparator<K> keyComparator
) throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptySortedMap();

Check warning on line 670 in libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamInput.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamInput.java#L670

Added line #L670 was not covered by tests
}
SortedMap<K, V> sortedMap;
if (keyComparator == null) {
sortedMap = new TreeMap<>();
} else {
sortedMap = new TreeMap<>(keyComparator);

Check warning on line 676 in libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamInput.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamInput.java#L676

Added line #L676 was not covered by tests
}
readIntoMap(keyReader, valueReader, sortedMap, size);
return sortedMap;
}

private <K, V> void readIntoMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader, Map<K, V> map, int size)
throws IOException {
for (int i = 0; i < size; i++) {
K key = keyReader.read(this);
V value = valueReader.read(this);
map.put(key, value);
}
return map;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void close() throws IOException {
}

@Override
public ImmutableCacheStatsHolder stats() {
public ImmutableCacheStatsHolder stats(String[] levels) {
return null; // TODO: in TSC stats PR
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public ImmutableCacheStatsHolder stats(String[] levels) {
return null;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
// If this cache is being used, FeatureFlags.PLUGGABLE_CACHE is already on, so we can always use the DefaultCacheStatsHolder.
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames);
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames, EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME);
}

@SuppressWarnings({ "rawtypes" })
Expand Down Expand Up @@ -446,12 +446,13 @@
}

/**
* Relevant stats for this cache.
* @return CacheStats
* Relevant stats for this cache, aggregated by levels.
* @param levels The levels to aggregate by.
* @return ImmutableCacheStatsHolder
*/
@Override
public ImmutableCacheStatsHolder stats() {
return cacheStatsHolder.getImmutableCacheStatsHolder();
public ImmutableCacheStatsHolder stats(String[] levels) {
return cacheStatsHolder.getImmutableCacheStatsHolder(levels);
}

/**
Expand Down Expand Up @@ -510,15 +511,15 @@
public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrapper> event) {
switch (event.getType()) {
case CREATED:
cacheStatsHolder.incrementEntries(event.getKey().dimensions);
cacheStatsHolder.incrementItems(event.getKey().dimensions);
cacheStatsHolder.incrementSizeInBytes(event.getKey().dimensions, getNewValuePairSize(event));
assert event.getOldValue() == null;
break;
case EVICTED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
cacheStatsHolder.incrementEvictions(event.getKey().dimensions);
assert event.getNewValue() == null;
Expand All @@ -527,15 +528,15 @@
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
case EXPIRED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);

Check warning on line 539 in plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

View check run for this annotation

Codecov / codecov/patch

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java#L539

Added line #L539 was not covered by tests
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBasicGetAndPut() throws IOException {
String value = ehcacheTest.get(getICacheKey(entry.getKey()));
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
assertEquals(randomKeys, ehcacheTest.stats().getTotalHits());
assertEquals(expectedSize, ehcacheTest.stats().getTotalSizeInBytes());
assertEquals(randomKeys, ehcacheTest.count());
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testConcurrentPut() throws Exception {
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.count());
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
ehcacheTest.close();
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ public String load(ICacheKey<String> key) {
assertEquals(1, numberOfTimesValueLoaded);
assertEquals(0, ((EhcacheDiskCache) ehcacheTest).getCompletableFutureMap().size());
assertEquals(1, ehcacheTest.stats().getTotalMisses());
assertEquals(1, ehcacheTest.stats().getTotalEntries());
assertEquals(1, ehcacheTest.stats().getTotalItems());
assertEquals(numberOfRequest - 1, ehcacheTest.stats().getTotalHits());
assertEquals(1, ehcacheTest.count());
ehcacheTest.close();
Expand Down
Loading
Loading