Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jdconrad committed Apr 4, 2024
1 parent 64fc8a3 commit 19fa7e1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,22 +254,6 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool
Setting.Property.NodeScope
);

// used in tests
void computeDecay() {
// TODO: remove this
}

// used in tests
void maybeScheduleDecayAndNewEpoch() {
// TODO: remove this
}

// used in tests
long epoch() {
// TODO: remove this
return -1L;
}

private interface Cache<K, T> extends Releasable {
CacheEntry<T> get(K cacheKey, long fileLength, int region);

Expand Down Expand Up @@ -614,12 +598,6 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {

}

// used by tests
int getFreq(CacheFileRegion cacheFileRegion) {
// TODO: remove this
return -1;
}

@Override
public void close() {
sharedBytes.decRef();
Expand Down Expand Up @@ -1337,7 +1315,7 @@ private void pushEntryToFront(final LRUCacheEntry entry) {
front.head = entry;
front.tail = entry;
} else {
entry.next = front.head.next;
entry.next = front.head;
front.head.prev = entry;
front.head = entry;
}
Expand Down Expand Up @@ -1376,8 +1354,7 @@ private synchronized boolean invariant(final LRUCacheEntry e, boolean present) {
private boolean assertChunkActiveOrEvicted(LRUCacheEntry entry) {
synchronized (SharedBlobCacheService.this) {
// assert linked (or evicted)
assert entry.prev != null || entry.chunk.isEvicted();

assert entry.list != null || entry.chunk.isEvicted();
}
SharedBytes.IO io = entry.chunk.io;
assert io != null || entry.chunk.isEvicted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
Expand Down Expand Up @@ -251,174 +249,6 @@ public void testForceEvictResponse() throws IOException {
}
}

public void testDecay() throws IOException {
// we have 8 regions
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(400)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
)
) {
assertEquals(4, cacheService.freeRegionCount());

final var cacheKey1 = generateCacheKey();
final var cacheKey2 = generateCacheKey();
final var cacheKey3 = generateCacheKey();
// add a region that we can evict when provoking first decay
cacheService.get("evictkey", size(250), 0);
assertEquals(3, cacheService.freeRegionCount());
final var region0 = cacheService.get(cacheKey1, size(250), 0);
assertEquals(2, cacheService.freeRegionCount());
final var region1 = cacheService.get(cacheKey2, size(250), 1);
assertEquals(1, cacheService.freeRegionCount());
final var region2 = cacheService.get(cacheKey3, size(250), 1);
assertEquals(0, cacheService.freeRegionCount());

assertEquals(1, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));
AtomicLong expectedEpoch = new AtomicLong();
Runnable triggerDecay = () -> {
assertThat(taskQueue.hasRunnableTasks(), is(false));
cacheService.get(expectedEpoch.toString(), size(250), 0);
assertThat(taskQueue.hasRunnableTasks(), is(true));
taskQueue.runAllRunnableTasks();
assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet()));
};

triggerDecay.run();

cacheService.get(cacheKey1, size(250), 0);
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);

triggerDecay.run();

final var region0Again = cacheService.get(cacheKey1, size(250), 0);
assertSame(region0Again, region0);
assertEquals(3, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

triggerDecay.run();

cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.getFreq(region0));
cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));

// ensure no freq=0 entries
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);
assertEquals(2, cacheService.getFreq(region1));
assertEquals(2, cacheService.getFreq(region2));

triggerDecay.run();

assertEquals(3, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

triggerDecay.run();
assertEquals(2, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));

// ensure no freq=0 entries
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);
assertEquals(2, cacheService.getFreq(region1));
assertEquals(2, cacheService.getFreq(region2));

triggerDecay.run();
assertEquals(1, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

triggerDecay.run();
assertEquals(0, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));
}
}

/**
* Test when many objects need to decay, in particular useful to measure how long the decay task takes.
* For 1M objects (with no assertions) it took 26ms locally.
*/
public void testMassiveDecay() throws IOException {
int regions = 1024; // to measure decay time, increase to 1024*1024 and disable assertions.
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regions)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(1)).getStringRep())
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
)
) {
Runnable decay = () -> {
assertThat(taskQueue.hasRunnableTasks(), is(true));
long before = System.currentTimeMillis();
taskQueue.runAllRunnableTasks();
long after = System.currentTimeMillis();
logger.debug("took {} ms", (after - before));
};
long fileLength = size(regions + 100);
Object cacheKey = new Object();
for (int i = 0; i < regions; ++i) {
cacheService.get(cacheKey, fileLength, i);
if (Integer.bitCount(i) == 1) {
logger.debug("did {} gets", i);
}
}
assertThat(taskQueue.hasRunnableTasks(), is(false));
cacheService.get(cacheKey, fileLength, regions);
decay.run();
int maxRounds = 5;
for (int round = 2; round <= maxRounds; ++round) {
for (int i = round; i < regions + round; ++i) {
cacheService.get(cacheKey, fileLength, i);
if (Integer.bitCount(i) == 1) {
logger.debug("did {} gets", i);
}
}
decay.run();
}

Map<Integer, Integer> freqs = new HashMap<>();
for (int i = maxRounds; i < regions + maxRounds; ++i) {
int freq = cacheService.getFreq(cacheService.get(cacheKey, fileLength, i)) - 2;
freqs.compute(freq, (k, v) -> v == null ? 1 : v + 1);
if (Integer.bitCount(i) == 1) {
logger.debug("did {} gets", i);
}
}
assertThat(freqs.get(4), equalTo(regions - maxRounds + 1));
}
}

/**
* Exercise SharedBlobCacheService#get in multiple threads to trigger any assertion errors.
* @throws IOException
Expand Down Expand Up @@ -829,8 +659,8 @@ public void testCacheSizeChanges() throws IOException {
}
}

public void testMaybeEvictLeastUsed() throws Exception {
final int numRegions = 10;
public void testMaybeEvictRecentUsed() throws Exception {
final int numRegions = 20;
final long regionSize = size(1L);
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
Expand All @@ -839,7 +669,6 @@ public void testMaybeEvictLeastUsed() throws Exception {
.put("path.home", createTempDir())
.build();

final AtomicLong relativeTimeInMillis = new AtomicLong(0L);
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
Expand All @@ -852,8 +681,6 @@ public void testMaybeEvictLeastUsed() throws Exception {
BlobCacheMetrics.NOOP
)
) {
final Map<Object, SharedBlobCacheService<Object>.CacheFileRegion> cacheEntries = new HashMap<>();

assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions));
assertThat("Cache has no entries", cacheService.maybeEvictLeastRecent(), is(false));

Expand All @@ -867,53 +694,15 @@ public void testMaybeEvictLeastUsed() throws Exception {
taskQueue.getThreadPool().generic(),
ActionListener.noop()
);
assertThat(cacheService.getFreq(entry), equalTo(1));
cacheEntries.put(cacheKey, entry);
}

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastRecent(), is(false));
assertThat("Last entry has other references", cacheService.maybeEvictLeastRecent(), is(false));

taskQueue.runAllRunnableTasks();

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastRecent(), is(false));

cacheService.maybeScheduleDecayAndNewEpoch();
taskQueue.runAllRunnableTasks();

cacheEntries.keySet().forEach(key -> cacheService.get(key, regionSize, 0));
cacheService.maybeScheduleDecayAndNewEpoch();
taskQueue.runAllRunnableTasks();

// touch some random cache entries
var usedCacheKeys = Set.copyOf(randomSubsetOf(cacheEntries.keySet()));
usedCacheKeys.forEach(key -> cacheService.get(key, regionSize, 0));

cacheEntries.forEach(
(key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? equalTo(3) : equalTo(1))
);

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastRecent(), is(false));

cacheService.maybeScheduleDecayAndNewEpoch();
taskQueue.runAllRunnableTasks();

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
cacheEntries.forEach(
(key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? equalTo(2) : equalTo(0))
);

var zeroFrequencyCacheEntries = cacheEntries.size() - usedCacheKeys.size();
for (int i = 0; i < zeroFrequencyCacheEntries; i++) {
assertThat(cacheService.freeRegionCount(), equalTo(i));
assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvictLeastRecent(), is(true));
assertThat(cacheService.freeRegionCount(), equalTo(i + 1));
}

assertThat("No more cache entries old enough to be evicted", cacheService.maybeEvictLeastRecent(), is(false));
assertThat(cacheService.freeRegionCount(), equalTo(zeroFrequencyCacheEntries));
assertThat("Cache should evict last entry", cacheService.maybeEvictLeastRecent(), is(true));
}
}

Expand Down Expand Up @@ -1004,25 +793,6 @@ public void execute(Runnable command) {
assertEquals(1 + remainingFreeRegions, bulkTaskCount.get());
}
{
// cache fully used, no entry old enough to be evicted
assertEquals(0, cacheService.freeRegionCount());
final var cacheKey = generateCacheKey();
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
cacheService.maybeFetchRegion(
cacheKey,
randomIntBetween(0, 10),
randomLongBetween(1L, regionSize),
(channel, channelPos, relativePos, length, progressUpdater) -> {
throw new AssertionError("should not be executed");
},
future
);
assertThat("Listener is immediately completed", future.isDone(), is(true));
assertThat("Region already exists in cache", future.get(), is(false));
}
{
cacheService.computeDecay();

// fetch one more region should evict an old cache entry
final var cacheKey = generateCacheKey();
assertEquals(0, cacheService.freeRegionCount());
Expand Down

0 comments on commit 19fa7e1

Please sign in to comment.