Skip to content

Commit

Permalink
fix: index storage allocation and header management for multi table
Browse files Browse the repository at this point in the history
Allocation implementation had bugs in multi table environment
  • Loading branch information
sepgh committed May 4, 2024
1 parent 816a78b commit bcd934e
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public CompletableFuture<NodeData> fillRoot(int table, byte[] data){
return output;
}

FileUtils.write(getAsynchronousFileChannel(optionalTable.get().getRoot().getChunk()), optionalTable.get().getRoot().getOffset(), data).whenComplete((size, throwable) -> {
long rootAbsolutePosition = headerTable.getIndexChunk(headerTable.getRoot().getChunk()).get().getOffset() + headerTable.getRoot().getOffset();
FileUtils.write(getAsynchronousFileChannel(headerTable.getRoot().getChunk()),rootAbsolutePosition, data).whenComplete((size, throwable) -> {
if (throwable != null){
output.completeExceptionally(throwable);
}
Expand All @@ -115,12 +116,12 @@ public CompletableFuture<Optional<NodeData>> getRoot(int table) {
CompletableFuture<Optional<NodeData>> output = new CompletableFuture<>();

Optional<Header.Table> optionalTable = headerManager.getHeader().getTableOfId(table);
Header.Table headerTable = optionalTable.get();
if (optionalTable.isEmpty() || headerTable.getRoot() == null){
if (optionalTable.isEmpty() || optionalTable.get().getRoot() == null){
output.complete(Optional.empty());
return output;
}

Header.Table headerTable = optionalTable.get();
Header.IndexChunk root = headerTable.getRoot();
FileUtils.readBytes(
getAsynchronousFileChannel(root.getChunk()),
Expand Down Expand Up @@ -192,6 +193,7 @@ public CompletableFuture<NodeData> writeNewNode(int table, byte[] data, boolean
FileUtils.write(getAsynchronousFileChannel(pointer.getChunk()), offset, data).whenComplete((size, throwable) -> {
if (throwable != null){
output.completeExceptionally(throwable);
return;
}

output.complete(
Expand Down Expand Up @@ -268,7 +270,7 @@ private Pointer getAllocatedSpaceForNewNode(int tableId, int chunk) throws IOExc
:
tablesIncludingChunk.get(indexOfTable + 1).getIndexChunk(chunk).get().getOffset() - engineConfig.indexGrowthAllocationSize();

if (positionToCheck > 0) {
if (positionToCheck > 0 && positionToCheck > tablesIncludingChunk.get(indexOfTable).getIndexChunk(chunk).get().getOffset()) {
byte[] bytes = FileUtils.readBytes(asynchronousFileChannel, positionToCheck, engineConfig.indexGrowthAllocationSize()).get();
Optional<Integer> optionalAdditionalPosition = getPossibleAllocationLocation(bytes);
if (optionalAdditionalPosition.isPresent()){
Expand All @@ -294,15 +296,12 @@ private Pointer getAllocatedSpaceForNewNode(int tableId, int chunk) throws IOExc

for (int i = indexOfTable + 1; i < tablesIncludingChunk.size(); i++){
Header.Table nextTable = tablesIncludingChunk.get(i);
if (nextTable.getRoot().getChunk() == chunk) {
nextTable.getRoot().setOffset(
nextTable.getRoot().getOffset() + engineConfig.indexGrowthAllocationSize()
);
}
Header.IndexChunk indexChunk = nextTable.getIndexChunk(chunk).get();
indexChunk.setOffset(indexChunk.getOffset() + engineConfig.indexGrowthAllocationSize());
}
headerManager.update();
}

return new Pointer(Pointer.TYPE_NODE, allocatedOffset, chunk);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.github.sepgh.internal.tree.node.BaseTreeNode;
import com.github.sepgh.internal.tree.node.InternalTreeNode;
import com.github.sepgh.internal.tree.node.LeafTreeNode;
import com.google.common.hash.HashCode;

import java.io.IOException;
import java.util.*;
Expand All @@ -20,9 +21,8 @@ public BTreeIndexManager(int order, IndexStorageManager indexStorageManager){
this.indexStorageManager = indexStorageManager;
}

private BaseTreeNode getRoot(int table) throws ExecutionException, InterruptedException, IOException {
CompletableFuture<Optional<IndexStorageManager.NodeData>> completableFuture = indexStorageManager.getRoot(table);
Optional<IndexStorageManager.NodeData> optionalNodeData = completableFuture.get();
private BaseTreeNode getRoot(int table) throws ExecutionException, InterruptedException {
Optional<IndexStorageManager.NodeData> optionalNodeData = indexStorageManager.getRoot(table).get();
if (optionalNodeData.isPresent()){
BaseTreeNode root = BaseTreeNode.fromBytes(optionalNodeData.get().bytes());
root.setNodePointer(optionalNodeData.get().pointer());
Expand Down Expand Up @@ -95,7 +95,7 @@ public BaseTreeNode addIndex(int table, long identifier, Pointer pointer) throws
parentInternalTreeNode.setChildAtIndex(0, currentNode.getNodePointer());
parentInternalTreeNode.setChildAtIndex(1, newLeafTreeNode.getNodePointer());
indexStorageManager.writeNewNode(table, parentInternalTreeNode.toBytes(), true).get();
indexStorageManager.updateNode(table, currentNode.getData(), currentNode.getNodePointer());
indexStorageManager.updateNode(table, currentNode.getData(), currentNode.getNodePointer()).get();
return newLeafTreeNode;
}

Expand Down
78 changes: 26 additions & 52 deletions src/main/java/com/github/sepgh/internal/utils/FileUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.sepgh.internal.utils;

import com.google.common.hash.HashCode;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
Expand Down Expand Up @@ -39,12 +41,7 @@ public static CompletableFuture<Long> allocate(AsynchronousFileChannel asynchron
asynchronousFileChannel.write(ByteBuffer.allocate(size), fileSize, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
try {
asynchronousFileChannel.force(true);
future.complete(fileSize);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(fileSize);
}

@Override
Expand All @@ -58,54 +55,31 @@ public void failed(Throwable exc, Object attachment) {
public static CompletableFuture<Long> allocate(AsynchronousFileChannel asynchronousFileChannel, long position, int size) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>();

int capacity = (int) Math.subtractExact(
asynchronousFileChannel.size(),
position
);

ByteBuffer buffer = ByteBuffer.allocate(capacity);

asynchronousFileChannel.read(buffer, position, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
buffer.flip();
ByteBuffer dataAfterPosition = ByteBuffer.allocate(capacity);
dataAfterPosition.put(buffer);

// Write the empty area at the desired position
ByteBuffer emptyBuffer = ByteBuffer.allocate(size);
asynchronousFileChannel.write(emptyBuffer, position, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
asynchronousFileChannel.write(dataAfterPosition, position + size, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
try {
asynchronousFileChannel.force(true);
future.complete(position);
} catch (IOException e){
future.completeExceptionally(e);
}
}

@Override
public void failed(Throwable exc, Object attachment) {
future.completeExceptionally(exc);
}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
future.completeExceptionally(exc);
int readSize = (int) (asynchronousFileChannel.size() - position);
allocate(asynchronousFileChannel, size).whenComplete((allocatedBeginningPosition, throwable) -> {
if (throwable != null){
future.completeExceptionally(throwable);
return;
}
FileUtils.readBytes(asynchronousFileChannel, position, readSize).whenComplete((readBytes, throwable1) -> {
if (throwable1 != null){
future.completeExceptionally(throwable1);
return;
}
FileUtils.write(asynchronousFileChannel, position, new byte[readBytes.length]).whenComplete((integer, throwable2) -> {
if (throwable2 != null){
future.completeExceptionally(throwable2);
return;
}
FileUtils.write(asynchronousFileChannel, position + size, readBytes).whenComplete((integer1, throwable3) -> {
if (throwable3 != null){
future.completeExceptionally(throwable3);
return;
}
future.complete(position);
});
});
}

@Override
public void failed(Throwable exc, Object attachment) {
future.completeExceptionally(exc);
}
});
});

return future;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.github.sepgh.internal.helper;

import com.github.sepgh.internal.EngineConfig;
import com.github.sepgh.internal.storage.header.HeaderManager;
import com.github.sepgh.internal.tree.Pointer;
import com.github.sepgh.internal.tree.node.BaseTreeNode;
import com.github.sepgh.internal.tree.node.InternalTreeNode;
import com.github.sepgh.internal.tree.node.LeafTreeNode;
import com.github.sepgh.internal.utils.FileUtils;
import com.google.common.hash.HashCode;
import lombok.AllArgsConstructor;

import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;


@AllArgsConstructor
public class IndexFileDescriptor {
private final AsynchronousFileChannel asynchronousFileChannel;
private final HeaderManager headerManager;
private final EngineConfig engineConfig;

public void describe() throws IOException, ExecutionException, InterruptedException {
long paddingCounts = asynchronousFileChannel.size() / engineConfig.getPaddedSize();

for (int i = 0; i < paddingCounts; i++){
int offset = i * engineConfig.getPaddedSize();
byte[] bytes = FileUtils.readBytes(this.asynchronousFileChannel, offset, engineConfig.getPaddedSize()).get();

if (bytes.length == 0 || bytes[0] == 0){
this.printEmptyNode(offset);
continue;
}

BaseTreeNode baseTreeNode = BaseTreeNode.fromBytes(bytes);
if (baseTreeNode.isLeaf())
this.printLeafNode((LeafTreeNode) baseTreeNode, offset);
else
this.printInternalNode((InternalTreeNode) baseTreeNode, offset);
}

}

private void printInternalNode(InternalTreeNode node, int offset) {
System.out.println();
System.out.println(HashCode.fromBytes(node.toBytes()));
System.out.printf("Offset: %d%n", offset);
System.out.printf("Node Header: root(%s) [internal] %n", node.isRoot() ? "T" : "F");
System.out.println("Keys:" + node.keyList());
System.out.println("Children:" + node.childrenList());
System.out.println("===========================");
}

private void printEmptyNode(int offset) {
System.out.println();
System.out.printf("Offset: %d%n", offset);
System.out.println("EMPTY");
System.out.println("===========================");
}

private void printLeafNode(LeafTreeNode node, int offset){
System.out.println();
System.out.println(HashCode.fromBytes(node.toBytes()));
System.out.printf("Offset: %d%n", offset);
System.out.printf("Node Header: root(%s) [leaf] %n", node.isRoot() ? "T" : "F");
StringBuilder stringBuilder = new StringBuilder();
Iterator<Map.Entry<Long, Pointer>> entryIterator = node.keyValues();
while (entryIterator.hasNext()) {
Map.Entry<Long, Pointer> next = entryIterator.next();
stringBuilder
.append("\t")
.append("K: ")
.append(next.getKey())
.append(" V: ")
.append(next.getValue())
.append(" OFFSET: TBD")
.append("\n")
;
}
System.out.println("Key Values:");
System.out.println(stringBuilder);
System.out.println("===========================");
}


}
Loading

0 comments on commit bcd934e

Please sign in to comment.