Skip to content

Commit

Permalink
feat: implement DBLevelAsyncIndexManagerDecorator
Browse files Browse the repository at this point in the history
  • Loading branch information
sepgh committed Jun 1, 2024
1 parent 1411c5b commit a710971
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.github.sepgh.internal.index;

import com.github.sepgh.internal.index.tree.node.BaseTreeNode;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DBLevelAsyncIndexManagerDecorator extends IndexManagerDecorator {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
public DBLevelAsyncIndexManagerDecorator(IndexManager indexManager) {
super(indexManager);
}

@Override
public BaseTreeNode addIndex(int table, long identifier, Pointer pointer) throws ExecutionException, InterruptedException, IOException {
writeLock.lock();
try {
return super.addIndex(table, identifier, pointer);
} finally {
writeLock.unlock();
}
}

@Override
public Optional<Pointer> getIndex(int table, long identifier) throws ExecutionException, InterruptedException, IOException {
readLock.lock();
try {
return super.getIndex(table, identifier);
} finally {
readLock.unlock();
}
}

@Override
public boolean removeIndex(int table, long identifier) throws ExecutionException, InterruptedException, IOException {
writeLock.lock();
try {
return super.removeIndex(table, identifier);
} finally {
writeLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.github.sepgh.internal.index.tree;
package com.github.sepgh.internal.index;

import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.IndexManagerDecorator;
import com.github.sepgh.internal.index.Pointer;
import com.github.sepgh.internal.index.tree.node.BaseTreeNode;

import java.io.IOException;
Expand All @@ -12,9 +9,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class AsyncIndexManagerDecorator extends IndexManagerDecorator {
public class TableLevelAsyncIndexManagerDecorator extends IndexManagerDecorator {
private final Map<Integer, LockManager> lockManagerPool = new ConcurrentHashMap<>();
public AsyncIndexManagerDecorator(IndexManager indexManager) {
public TableLevelAsyncIndexManagerDecorator(IndexManager indexManager) {
super(indexManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.github.sepgh.internal.EngineConfig;
import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.Pointer;
import com.github.sepgh.internal.index.tree.AsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.TableLevelAsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager;
import com.github.sepgh.internal.storage.CompactFileIndexStorageManager;
import com.github.sepgh.internal.storage.InMemoryHeaderManager;
Expand Down Expand Up @@ -88,7 +88,7 @@ public void findIndexSuccessfully() throws IOException, ExecutionException, Inte
HeaderManager headerManager = new InMemoryHeaderManager(header);
CompactFileIndexStorageManager compactFileIndexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);

IndexManager indexManager = new AsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));
IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));
Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0);

indexManager.addIndex(1, 10, dataPointer);
Expand All @@ -104,7 +104,7 @@ public void findIndexFailure() throws IOException, ExecutionException, Interrupt
HeaderManager headerManager = new InMemoryHeaderManager(header);
CompactFileIndexStorageManager compactFileIndexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);

IndexManager indexManager = new AsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));
IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));
Pointer dataPointer = new Pointer(Pointer.TYPE_DATA, 100, 0);

indexManager.addIndex(1, 10, dataPointer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.sepgh.internal.index.tree.removing;

import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.tree.AsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.TableLevelAsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager;
import com.github.sepgh.internal.storage.CompactFileIndexStorageManager;
import com.github.sepgh.internal.storage.InMemoryHeaderManager;
Expand Down Expand Up @@ -51,7 +51,7 @@ public void testRemovingRoot() throws IOException, ExecutionException, Interrupt
public void testRemovingLeftToRightAsync() throws IOException, ExecutionException, InterruptedException {
IndexStorageManager indexStorageManager = getIndexStorageManager();
IndexManager indexManager = getIndexManager(indexStorageManager);
IndexManager asycnIndexManager = new AsyncIndexManagerDecorator(indexManager);
IndexManager asycnIndexManager = new TableLevelAsyncIndexManagerDecorator(indexManager);
super.testRemovingLeftToRightAsync(asycnIndexManager, indexStorageManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.github.sepgh.internal.EngineConfig;
import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.Pointer;
import com.github.sepgh.internal.index.tree.AsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.TableLevelAsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager;
import com.github.sepgh.internal.index.tree.node.BaseTreeNode;
import com.github.sepgh.internal.index.tree.node.InternalTreeNode;
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testMultiSplitAddIndexAsync() throws IOException, ExecutionException

HeaderManager headerManager = new InMemoryHeaderManager(header);
CompactFileIndexStorageManager compactFileIndexStorageManager = new CompactFileIndexStorageManager(dbPath, headerManager, engineConfig);
IndexManager indexManager = new AsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));
IndexManager indexManager = new TableLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, compactFileIndexStorageManager));

ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch countDownLatch = new CountDownLatch(testIdentifiers.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.sepgh.internal.index.tree.storing;

import com.github.sepgh.internal.EngineConfig;
import com.github.sepgh.internal.index.DBLevelAsyncIndexManagerDecorator;
import com.github.sepgh.internal.index.IndexManager;
import com.github.sepgh.internal.index.Pointer;
import com.github.sepgh.internal.index.tree.BPlusTreeIndexManager;
Expand All @@ -11,10 +12,7 @@
import com.github.sepgh.internal.storage.header.HeaderManager;
import com.github.sepgh.internal.storage.pool.FileHandler;
import com.github.sepgh.internal.storage.pool.LimitedFileHandlerPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import java.io.File;
import java.io.IOException;
Expand All @@ -25,8 +23,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static com.github.sepgh.internal.storage.ExtendedFileIndexStorageManager.INDEX_FILE_NAME;

Expand Down Expand Up @@ -228,25 +226,47 @@ public void testMultiSplitAddIndexLimitedOpenFiles_SuccessLimitTo2() throws IOEx
}


// Todo: there is a small chance of failure due to unordered execution of multiple threads
@Timeout(10)
@Test
public void testMultiSplitAddIndexDifferentAddOrders() throws IOException, ExecutionException, InterruptedException {
public void testMultiSplitAddIndexDifferentAddOrdersOnDBLevelAsyncIndexManager() throws IOException, ExecutionException, InterruptedException {

List<Long> testIdentifiers = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L);
Pointer samplePointer = new Pointer(Pointer.TYPE_DATA, 100, 0);

HeaderManager headerManager = new InMemoryHeaderManager(header);
ExtendedFileIndexStorageManager extendedFileIndexStorageManager = new ExtendedFileIndexStorageManager(dbPath, headerManager, engineConfig);
IndexManager indexManager = new BPlusTreeIndexManager(degree, extendedFileIndexStorageManager);
IndexManager indexManager = new DBLevelAsyncIndexManagerDecorator(new BPlusTreeIndexManager(degree, extendedFileIndexStorageManager));

ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch countDownLatch = new CountDownLatch((2 * testIdentifiers.size()) - 2);


AtomicInteger index1 = new AtomicInteger(0);
AtomicInteger index2 = new AtomicInteger(0);

int index = 0;
int runs = 0;
while (runs < testIdentifiers.size()){
indexManager.addIndex(1, testIdentifiers.get(index), samplePointer);
indexManager.addIndex(2, testIdentifiers.get(index) * 10, samplePointer);
index++;
executorService.submit(() -> {
try {
indexManager.addIndex(1, testIdentifiers.get(index1.getAndIncrement()), samplePointer);
countDownLatch.countDown();
} catch (ExecutionException | InterruptedException | IOException e) {
throw new RuntimeException(e);
}
});
executorService.submit(() -> {
try {
indexManager.addIndex(2, testIdentifiers.get(index2.getAndIncrement()) * 10, samplePointer);
countDownLatch.countDown();
} catch (ExecutionException | InterruptedException | IOException e) {
throw new RuntimeException(e);
}
});
runs++;
}

countDownLatch.await();

for (int tableId = 1; tableId <= 2; tableId++) {

Expand Down

0 comments on commit a710971

Please sign in to comment.