Skip to content

Commit

Permalink
File System Caching for remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com>
  • Loading branch information
aabukhalil committed Jan 25, 2023
1 parent ac18db7 commit b940bb0
Show file tree
Hide file tree
Showing 37 changed files with 3,028 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,8 @@ public void testStrictWeightedRouting() throws Exception {

/**
* Assert that preference based search works with non-strict weighted shard routing
* @throws Exception
*/
public void testPreferenceSearchWithWeightedRouting() throws Exception {
public void testPreferenceSearchWithWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
Expand Down
20 changes: 9 additions & 11 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -513,17 +513,15 @@ private void put(K key, V value, long now) {
promote(tuple.v1(), now);
}
if (replaced) {
removalListener.onRemoval(
new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED)
);
removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
}
}

private final Consumer<CompletableFuture<Entry<K, V>>> invalidationConsumer = f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
delete(entry, RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
Expand All @@ -534,7 +532,7 @@ private void put(K key, V value, long now) {

/**
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
* entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* entries with {@link RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
*/
Expand All @@ -546,7 +544,7 @@ public void invalidate(K key) {
/**
* Invalidate the entry for the specified key and value. If the value provided is not equal to the value in
* the cache, no removal will occur. A removal notification will be issued for invalidated
* entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* entries with {@link RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
* @param value the expected value that should be associated with the key
Expand All @@ -558,7 +556,7 @@ public void invalidate(K key, V value) {

/**
* Invalidate all cache entries. A removal notification will be issued for invalidated entries with
* {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* {@link RemovalReason} INVALIDATED.
*/
public void invalidateAll() {
Entry<K, V> h;
Expand Down Expand Up @@ -589,7 +587,7 @@ public void invalidateAll() {
}
}
while (h != null) {
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalReason.INVALIDATED));
h = h.after;
}
}
Expand Down Expand Up @@ -707,7 +705,7 @@ public void remove() {
segment.remove(entry.key, entry.value, f -> {});
try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
delete(entry, RemovalReason.INVALIDATED);
}
}
}
Expand Down Expand Up @@ -796,10 +794,10 @@ private void evictEntry(Entry<K, V> entry) {
if (segment != null) {
segment.remove(entry.key, entry.value, f -> {});
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
delete(entry, RemovalReason.EVICTED);
}

private void delete(Entry<K, V> entry, RemovalNotification.RemovalReason removalReason) {
private void delete(Entry<K, V> entry, RemovalReason removalReason) {
assert lruLock.isHeldByCurrentThread();

if (unlink(entry)) {
Expand Down
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/CacheUsage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

import org.opensearch.common.cache.refcounted.RefCountedCache;

/**
* Usage metrics for {@link RefCountedCache}
*
* @opensearch.internal
*/
public class CacheUsage {
/**
* Cache usage of the system
*/
private final long usage;

/**
* Cache usage by entries which are referenced
*/
private final long activeUsage;

public CacheUsage(long usage, long activeUsage) {
this.usage = usage;
this.activeUsage = activeUsage;
}

public long usage() {
return usage;
}

public long activeUsage() {
return activeUsage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,6 @@
* @opensearch.internal
*/
public class RemovalNotification<K, V> {
/**
* Reason for notification removal
*
* @opensearch.internal
*/
public enum RemovalReason {
REPLACED,
INVALIDATED,
EVICTED
}

private final K key;
private final V value;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Reason for notification removal
*
* @opensearch.internal
*/
public enum RemovalReason {
REPLACED,
INVALIDATED,
EVICTED,
EXPLICIT,
CAPACITY
}
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/Weigher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* A class that can determine the weight of a value. The total weight threshold
* is used to determine when an eviction is required.
*
* @opensearch.internal
*/
public interface Weigher<V> {

/**
* Measures an object's weight to determine how many units of capacity that
* the value consumes. A value must consume a minimum of one unit.
*
* @param value the object to weigh
* @return the object's weight
*/
long weightOf(V value);
}
Loading

0 comments on commit b940bb0

Please sign in to comment.