From 24d77a3408c167004d1d890a2d9c0dcbbd998a61 Mon Sep 17 00:00:00 2001 From: sepgh Date: Sun, 2 Jun 2024 17:06:04 +0330 Subject: [PATCH] feat: implement CachedIndexStorageManagerDecorator --- .../CachedIndexStorageManagerDecorator.java | 113 ++++++++++++++ .../storage/IndexStorageManagerDecorator.java | 53 +++++++ ...eBPlusTreeIndexManagerReadingTestCase.java | 140 ++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 src/main/java/com/github/sepgh/internal/storage/CachedIndexStorageManagerDecorator.java create mode 100644 src/main/java/com/github/sepgh/internal/storage/IndexStorageManagerDecorator.java create mode 100644 src/test/java/com/github/sepgh/internal/index/tree/reading/CacheBPlusTreeIndexManagerReadingTestCase.java diff --git a/src/main/java/com/github/sepgh/internal/storage/CachedIndexStorageManagerDecorator.java b/src/main/java/com/github/sepgh/internal/storage/CachedIndexStorageManagerDecorator.java new file mode 100644 index 0000000..b4eed1f --- /dev/null +++ b/src/main/java/com/github/sepgh/internal/storage/CachedIndexStorageManagerDecorator.java @@ -0,0 +1,113 @@ +package com.github.sepgh.internal.storage; + +import com.github.sepgh.internal.index.Pointer; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +public class CachedIndexStorageManagerDecorator extends IndexStorageManagerDecorator { + private final Cache cache; + private final Map rootCache = new HashMap<>(); + + public CachedIndexStorageManagerDecorator(IndexStorageManager decorated, int maxSize) { + this(decorated, CacheBuilder.newBuilder().maximumSize(maxSize).initialCapacity(10).build()); + } + + public CachedIndexStorageManagerDecorator(IndexStorageManager decorated, Cache cache) { + super(decorated); + this.cache = cache; + } + + public CompletableFuture> getRoot(int table) throws InterruptedException { + + synchronized (rootCache){ + NodeData nodeData = rootCache.get(table); + if (nodeData != null){ + return CompletableFuture.completedFuture(Optional.of(nodeData)); + } + } + + return super.getRoot(table).whenComplete((optionalNodeData, throwable) -> { + synchronized (rootCache) { + if (throwable != null && optionalNodeData.isPresent()) + rootCache.put(table, optionalNodeData.get()); + } + }); + } + + public CompletableFuture readNode(int table, long position, int chunk) throws InterruptedException { + NodeData optionalNodeData = cache.getIfPresent(new TablePointer(table, new Pointer(Pointer.TYPE_NODE, position, chunk))); + if (optionalNodeData != null){ + return CompletableFuture.completedFuture(optionalNodeData); + } + return super.readNode(table, position, chunk).whenComplete((nodeData, throwable) -> { + if (throwable == null){ + cache.put(new TablePointer(table, nodeData.pointer()), nodeData); + } + }); + } + + public CompletableFuture writeNewNode(int table, byte[] data, boolean isRoot) throws IOException, ExecutionException, InterruptedException { + return super.writeNewNode(table, data, isRoot).whenComplete((nodeData, throwable) -> { + if (throwable == null){ + cache.put(new TablePointer(table, nodeData.pointer()), nodeData); + synchronized (rootCache) { + if (isRoot) + rootCache.put(table, nodeData); + } + } + }); + } + + public CompletableFuture updateNode(int table, byte[] data, Pointer pointer, boolean root) throws IOException, InterruptedException { + return super.updateNode(table, data, pointer, root).whenComplete((integer, throwable) -> { + if (throwable == null) { + NodeData nodeData = new NodeData(pointer, data); + cache.put(new TablePointer(table, pointer), nodeData); + synchronized (rootCache) { + if (root) + rootCache.put(table, nodeData); + } + } + }); + } + + public void close() throws IOException { + this.cache.invalidateAll(); + super.close(); + } + + public CompletableFuture removeNode(int table, Pointer pointer) throws InterruptedException { + return super.removeNode(table, pointer).whenComplete((integer, throwable) -> { + cache.invalidate(new TablePointer(table, pointer)); + synchronized (rootCache){ + if (rootCache.get(table).pointer().equals(pointer)){ + rootCache.remove(table); + } + } + }); + } + + public record TablePointer(int table, Pointer pointer) { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TablePointer that = (TablePointer) o; + return table == that.table && Objects.equals(pointer, that.pointer); + } + + @Override + public int hashCode() { + return Objects.hash(table, pointer); + } + } +} diff --git a/src/main/java/com/github/sepgh/internal/storage/IndexStorageManagerDecorator.java b/src/main/java/com/github/sepgh/internal/storage/IndexStorageManagerDecorator.java new file mode 100644 index 0000000..723ad9e --- /dev/null +++ b/src/main/java/com/github/sepgh/internal/storage/IndexStorageManagerDecorator.java @@ -0,0 +1,53 @@ +package com.github.sepgh.internal.storage; + +import com.github.sepgh.internal.index.Pointer; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class IndexStorageManagerDecorator implements IndexStorageManager { + protected final IndexStorageManager decorated; + + public IndexStorageManagerDecorator(IndexStorageManager decorated) { + this.decorated = decorated; + } + + public CompletableFuture> getRoot(int table) throws InterruptedException { + return this.decorated.getRoot(table); + } + + public byte[] getEmptyNode() { + return this.decorated.getEmptyNode(); + } + + public CompletableFuture readNode(int table, Pointer pointer) throws InterruptedException { + return this.readNode(table, pointer.getPosition(), pointer.getChunk()); + } + public CompletableFuture readNode(int table, long position, int chunk) throws InterruptedException { + return this.decorated.readNode(table, position, chunk); + } + + public CompletableFuture writeNewNode(int table, byte[] data, boolean isRoot) throws IOException, ExecutionException, InterruptedException { + return this.decorated.writeNewNode(table, data, isRoot); + } + + public CompletableFuture writeNewNode(int table, byte[] data) throws IOException, ExecutionException, InterruptedException { + return this.writeNewNode(table, data, false); + } + public CompletableFuture updateNode(int table, byte[] data, Pointer pointer) throws IOException, InterruptedException { + return this.decorated.updateNode(table, data, pointer, false); + } + public CompletableFuture updateNode(int table, byte[] data, Pointer pointer, boolean root) throws IOException, InterruptedException { + return this.decorated.updateNode(table, data, pointer, root); + } + + public void close() throws IOException { + this.decorated.close(); + } + + public CompletableFuture removeNode(int table, Pointer pointer) throws InterruptedException { + return this.decorated.removeNode(table, pointer); + } +} diff --git a/src/test/java/com/github/sepgh/internal/index/tree/reading/CacheBPlusTreeIndexManagerReadingTestCase.java b/src/test/java/com/github/sepgh/internal/index/tree/reading/CacheBPlusTreeIndexManagerReadingTestCase.java new file mode 100644 index 0000000..36d7cb1 --- /dev/null +++ b/src/test/java/com/github/sepgh/internal/index/tree/reading/CacheBPlusTreeIndexManagerReadingTestCase.java @@ -0,0 +1,140 @@ +package com.github.sepgh.internal.index.tree.reading; + +import com.github.sepgh.internal.EngineConfig; +import com.github.sepgh.internal.index.IndexManager; +import com.github.sepgh.internal.index.Pointer; +import com.github.sepgh.internal.index.TableLevelAsyncIndexManagerDecorator; +import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager; +import com.github.sepgh.internal.storage.CachedIndexStorageManagerDecorator; +import com.github.sepgh.internal.storage.CompactFileIndexStorageManager; +import com.github.sepgh.internal.storage.InMemoryHeaderManager; +import com.github.sepgh.internal.storage.IndexStorageManager; +import com.github.sepgh.internal.storage.header.Header; +import com.github.sepgh.internal.storage.header.HeaderManager; +import org.junit.jupiter.api.*; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static com.github.sepgh.internal.storage.CompactFileIndexStorageManager.INDEX_FILE_NAME; + +public class CacheBPlusTreeIndexManagerReadingTestCase { + private Path dbPath; + private EngineConfig engineConfig; + private Header header; + private int degree = 4; + private Path indexPath; + + @BeforeEach + public void setUp() throws IOException { + dbPath = Files.createTempDirectory("TEST_CacheBPlusTreeIndexManagerReadingTestCase"); + engineConfig = EngineConfig.builder() + .bTreeDegree(degree) + .bTreeGrowthNodeAllocationCount(2) + .build(); + engineConfig.setBTreeMaxFileSize(12L * engineConfig.getPaddedSize()); + + byte[] writingBytes = new byte[]{}; + indexPath = Path.of(dbPath.toString(), String.format("%s.%d", INDEX_FILE_NAME, 0)); + Files.write(indexPath, writingBytes, StandardOpenOption.WRITE, StandardOpenOption.CREATE); + + header = Header.builder() + .database("sample") + .tables( + Collections.singletonList( + Header.Table.builder() + .id(1) + .name("test") + .chunks( + Collections.singletonList( + Header.IndexChunk.builder() + .chunk(0) + .offset(0) + .build() + ) + ) + .root( + Header.IndexChunk.builder() + .chunk(0) + .offset(0) + .build() + ) + .initialized(true) + .build() + ) + ) + .build(); + + Assertions.assertTrue(header.getTableOfId(1).isPresent()); + Assertions.assertTrue(header.getTableOfId(1).get().getIndexChunk(0).isPresent()); + } + + public boolean destroy() throws IOException { + return new File(indexPath.toString()).delete(); + } + + @Test + @Timeout(value = 2) + public void findIndexSuccessfully() throws IOException, ExecutionException, InterruptedException { + HeaderManager headerManager = new InMemoryHeaderManager(header); + IndexStorageManager indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig); + indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12); + + IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager)); + Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0); + + for (int i = 0; i < 20; i++){ + indexManager.addIndex(1, i, dataPointer); + } + Optional optionalPointer = indexManager.getIndex(1, 10); + + Assertions.assertTrue(optionalPointer.isPresent()); + Assertions.assertEquals(dataPointer, optionalPointer.get()); + + Assertions.assertTrue(destroy()); + optionalPointer = indexManager.getIndex(1, 10); + Assertions.assertEquals(dataPointer, optionalPointer.get()); + + indexStorageManager.close(); + + indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig); + indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12); + + indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager)); + optionalPointer = indexManager.getIndex(1, 10); + System.out.println(optionalPointer); + Assertions.assertFalse(optionalPointer.isPresent()); + + } + + @Test + @Timeout(value = 2) + public void findIndexFailure() throws IOException, ExecutionException, InterruptedException { + HeaderManager headerManager = new InMemoryHeaderManager(header); + IndexStorageManager indexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig); + indexStorageManager = new CachedIndexStorageManagerDecorator(indexStorageManager, 12); + + IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, indexStorageManager)); + Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0); + + for (int i = 0; i < 20; i++){ + indexManager.addIndex(1, i, dataPointer); + } + + // Forcing cache to be created + Optional optionalPointer = indexManager.getIndex(1, 100); + Assertions.assertFalse(optionalPointer.isPresent()); + + // Removing file and checking if we can still find index + Assertions.assertTrue(destroy()); + optionalPointer = indexManager.getIndex(1, 100); + Assertions.assertFalse(optionalPointer.isPresent()); + } +}