Skip to content

Commit

Permalink
feat: implement CachedIndexStorageManagerDecorator
Browse files Browse the repository at this point in the history
  • Loading branch information
sepgh committed Jun 2, 2024
1 parent a710971 commit 24d77a3
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.github.sepgh.internal.storage;

import com.github.sepgh.internal.index.Pointer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

public class CachedIndexStorageManagerDecorator extends IndexStorageManagerDecorator {
private final Cache<TablePointer, NodeData> cache;
private final Map<Integer, NodeData> rootCache = new HashMap<>();

public CachedIndexStorageManagerDecorator(IndexStorageManager decorated, int maxSize) {
this(decorated, CacheBuilder.newBuilder().maximumSize(maxSize).initialCapacity(10).build());
}

public CachedIndexStorageManagerDecorator(IndexStorageManager decorated, Cache<TablePointer, NodeData> cache) {
super(decorated);
this.cache = cache;
}

public CompletableFuture<Optional<NodeData>> getRoot(int table) throws InterruptedException {

synchronized (rootCache){
NodeData nodeData = rootCache.get(table);
if (nodeData != null){
return CompletableFuture.completedFuture(Optional.of(nodeData));
}
}

return super.getRoot(table).whenComplete((optionalNodeData, throwable) -> {
synchronized (rootCache) {
if (throwable != null && optionalNodeData.isPresent())
rootCache.put(table, optionalNodeData.get());
}
});
}

public CompletableFuture<NodeData> readNode(int table, long position, int chunk) throws InterruptedException {
NodeData optionalNodeData = cache.getIfPresent(new TablePointer(table, new Pointer(Pointer.TYPE_NODE, position, chunk)));
if (optionalNodeData != null){
return CompletableFuture.completedFuture(optionalNodeData);
}
return super.readNode(table, position, chunk).whenComplete((nodeData, throwable) -> {
if (throwable == null){
cache.put(new TablePointer(table, nodeData.pointer()), nodeData);
}
});
}

public CompletableFuture<NodeData> writeNewNode(int table, byte[] data, boolean isRoot) throws IOException, ExecutionException, InterruptedException {
return super.writeNewNode(table, data, isRoot).whenComplete((nodeData, throwable) -> {
if (throwable == null){
cache.put(new TablePointer(table, nodeData.pointer()), nodeData);
synchronized (rootCache) {
if (isRoot)
rootCache.put(table, nodeData);
}
}
});
}

public CompletableFuture<Integer> updateNode(int table, byte[] data, Pointer pointer, boolean root) throws IOException, InterruptedException {
return super.updateNode(table, data, pointer, root).whenComplete((integer, throwable) -> {
if (throwable == null) {
NodeData nodeData = new NodeData(pointer, data);
cache.put(new TablePointer(table, pointer), nodeData);
synchronized (rootCache) {
if (root)
rootCache.put(table, nodeData);
}
}
});
}

public void close() throws IOException {
this.cache.invalidateAll();
super.close();
}

public CompletableFuture<Integer> removeNode(int table, Pointer pointer) throws InterruptedException {
return super.removeNode(table, pointer).whenComplete((integer, throwable) -> {
cache.invalidate(new TablePointer(table, pointer));
synchronized (rootCache){
if (rootCache.get(table).pointer().equals(pointer)){
rootCache.remove(table);
}
}
});
}

public record TablePointer(int table, Pointer pointer) {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TablePointer that = (TablePointer) o;
return table == that.table && Objects.equals(pointer, that.pointer);
}

@Override
public int hashCode() {
return Objects.hash(table, pointer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.github.sepgh.internal.storage;

import com.github.sepgh.internal.index.Pointer;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class IndexStorageManagerDecorator implements IndexStorageManager {
protected final IndexStorageManager decorated;

public IndexStorageManagerDecorator(IndexStorageManager decorated) {
this.decorated = decorated;
}

public CompletableFuture<Optional<NodeData>> getRoot(int table) throws InterruptedException {
return this.decorated.getRoot(table);
}

public byte[] getEmptyNode() {
return this.decorated.getEmptyNode();
}

public CompletableFuture<NodeData> readNode(int table, Pointer pointer) throws InterruptedException {
return this.readNode(table, pointer.getPosition(), pointer.getChunk());
}
public CompletableFuture<NodeData> readNode(int table, long position, int chunk) throws InterruptedException {
return this.decorated.readNode(table, position, chunk);
}

public CompletableFuture<NodeData> writeNewNode(int table, byte[] data, boolean isRoot) throws IOException, ExecutionException, InterruptedException {
return this.decorated.writeNewNode(table, data, isRoot);
}

public CompletableFuture<NodeData> writeNewNode(int table, byte[] data) throws IOException, ExecutionException, InterruptedException {
return this.writeNewNode(table, data, false);
}
public CompletableFuture<Integer> updateNode(int table, byte[] data, Pointer pointer) throws IOException, InterruptedException {
return this.decorated.updateNode(table, data, pointer, false);
}
public CompletableFuture<Integer> updateNode(int table, byte[] data, Pointer pointer, boolean root) throws IOException, InterruptedException {
return this.decorated.updateNode(table, data, pointer, root);
}

public void close() throws IOException {
this.decorated.close();
}

public CompletableFuture<Integer> removeNode(int table, Pointer pointer) throws InterruptedException {
return this.decorated.removeNode(table, pointer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package com.github.sepgh.internal.index.tree.reading;

import com.github.sepgh.internal.EngineConfig;
import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.Pointer;
import com.github.sepgh.internal.index.TableLevelAsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager;
import com.github.sepgh.internal.storage.CachedIndexStorageManagerDecorator;
import com.github.sepgh.internal.storage.CompactFileIndexStorageManager;
import com.github.sepgh.internal.storage.InMemoryHeaderManager;
import com.github.sepgh.internal.storage.IndexStorageManager;
import com.github.sepgh.internal.storage.header.Header;
import com.github.sepgh.internal.storage.header.HeaderManager;
import org.junit.jupiter.api.*;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static com.github.sepgh.internal.storage.CompactFileIndexStorageManager.INDEX_FILE_NAME;

public class CacheBPlusTreeIndexManagerReadingTestCase {
private Path dbPath;
private EngineConfig engineConfig;
private Header header;
private int degree = 4;
private Path indexPath;

@BeforeEach
public void setUp() throws IOException {
dbPath = Files.createTempDirectory("TEST_CacheBPlusTreeIndexManagerReadingTestCase");
engineConfig = EngineConfig.builder()
.bTreeDegree(degree)
.bTreeGrowthNodeAllocationCount(2)
.build();
engineConfig.setBTreeMaxFileSize(12L * engineConfig.getPaddedSize());

byte[] writingBytes = new byte[]{};
indexPath = Path.of(dbPath.toString(), String.format("%s.%d", INDEX_FILE_NAME, 0));
Files.write(indexPath, writingBytes, StandardOpenOption.WRITE, StandardOpenOption.CREATE);

header = Header.builder()
.database("sample")
.tables(
Collections.singletonList(
Header.Table.builder()
.id(1)
.name("test")
.chunks(
Collections.singletonList(
Header.IndexChunk.builder()
.chunk(0)
.offset(0)
.build()
)
)
.root(
Header.IndexChunk.builder()
.chunk(0)
.offset(0)
.build()
)
.initialized(true)
.build()
)
)
.build();

Assertions.assertTrue(header.getTableOfId(1).isPresent());
Assertions.assertTrue(header.getTableOfId(1).get().getIndexChunk(0).isPresent());
}

public boolean destroy() throws IOException {
return new File(indexPath.toString()).delete();
}

@Test
@Timeout(value = 2)
public void findIndexSuccessfully() throws IOException, ExecutionException, InterruptedException {
HeaderManager headerManager = new InMemoryHeaderManager(header);
IndexStorageManager indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);
indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12);

IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager));
Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0);

for (int i = 0; i < 20; i++){
indexManager.addIndex(1, i, dataPointer);
}
Optional<Pointer> optionalPointer = indexManager.getIndex(1, 10);

Assertions.assertTrue(optionalPointer.isPresent());
Assertions.assertEquals(dataPointer, optionalPointer.get());

Assertions.assertTrue(destroy());
optionalPointer = indexManager.getIndex(1, 10);
Assertions.assertEquals(dataPointer, optionalPointer.get());

indexStorageManager.close();

indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);
indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12);

indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager));
optionalPointer = indexManager.getIndex(1, 10);
System.out.println(optionalPointer);
Assertions.assertFalse(optionalPointer.isPresent());

}

@Test
@Timeout(value = 2)
public void findIndexFailure() throws IOException, ExecutionException, InterruptedException {
HeaderManager headerManager = new InMemoryHeaderManager(header);
IndexStorageManager indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);
indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12);

IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager));
Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0);

for (int i = 0; i < 20; i++){
indexManager.addIndex(1, i, dataPointer);
}

// Forcing cache to be created
Optional<Pointer> optionalPointer = indexManager.getIndex(1, 100);
Assertions.assertFalse(optionalPointer.isPresent());

// Removing file and checking if we can still find index
Assertions.assertTrue(destroy());
optionalPointer = indexManager.getIndex(1, 100);
Assertions.assertFalse(optionalPointer.isPresent());
}
}

0 comments on commit 24d77a3

Please sign in to comment.