Skip to content

Commit

Permalink
fix: index storage allocation and header management
Browse files Browse the repository at this point in the history
Headers didn't use to update after allocations occur
  • Loading branch information
sepgh committed May 4, 2024
1 parent c7532fa commit 816a78b
Show file tree
Hide file tree
Showing 7 changed files with 603 additions and 58 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ This project is implemented to practice a B+Tree implementation to index data on

- Prevent "too many open files" issue: since index chunks can grow, its safer to create a better pool for `SynchronisedFileChannel` used -currently- by `IndexFileManager`
- Searching for keys, adding keys, or key values, are all done linearly. Alternatively, we could add/modify using binary search (works better in case of large key sizes) or hold a metadata in node with sorts
- Allocation may require a flag to set that part of storage as "reserved", so write and overwrite can be different. But would this be enough to prevent writing to a location a requester (a part of code that needed allocation) has allocated itself? Or, maybe this is completely wrong. If we make addIndex() sync, and only one thread can allocate space per table, there won't be an issue? Could still be wrong since some other table may have allocated space in same chunk, and that can ruin things (race condition)
- After allocation, table chunk offsets should be updated: any table after the table we allocated for needs to change (increase) their offset position. This is the only possible way to not break all nodes child pointers.
- Allocation may require a flag to set that part of storage as "reserved", so write and overwrite can be different. But would this be enough to prevent writing to a location a requester (a part of code that needed allocation) has allocated itself? Or, maybe this is completely wrong. If we make addIndex() sync, and only one thread can allocate space per table, there won't be an issue? Could still be wrong since some other table may have allocated space in same chunk, and that can ruin things (race condition) | **update**: this may be wrong as BTree operations on a single db should be sync
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static com.github.sepgh.internal.tree.node.BaseTreeNode.TYPE_INTERNAL_NODE_BIT;
import static com.github.sepgh.internal.tree.node.BaseTreeNode.TYPE_LEAF_NODE_BIT;
Expand Down Expand Up @@ -79,6 +78,38 @@ private AsynchronousFileChannel getAsynchronousFileChannel(int chunk) {
return channel;
}

@Override
public CompletableFuture<NodeData> fillRoot(int table, byte[] data){
CompletableFuture<NodeData> output = new CompletableFuture<>();

Optional<Header.Table> optionalTable = headerManager.getHeader().getTableOfId(table);

Header.Table headerTable = optionalTable.get();
if (optionalTable.isEmpty() || headerTable.getRoot() == null){
output.completeExceptionally(new Exception("Root position is undetermined")); // Todo
return output;
}

FileUtils.write(getAsynchronousFileChannel(optionalTable.get().getRoot().getChunk()), optionalTable.get().getRoot().getOffset(), data).whenComplete((size, throwable) -> {
if (throwable != null){
output.completeExceptionally(throwable);
}

output.complete(
new NodeData(
new Pointer(
Pointer.TYPE_NODE,
optionalTable.get().getRoot().getOffset(),
optionalTable.get().getRoot().getChunk()
),
data
)
);
});

return output;
}

@Override
public CompletableFuture<Optional<NodeData>> getRoot(int table) {
CompletableFuture<Optional<NodeData>> output = new CompletableFuture<>();
Expand All @@ -101,6 +132,11 @@ public CompletableFuture<Optional<NodeData>> getRoot(int table) {
return;
}

if (bytes.length == 0 || bytes[0] == (byte) 0x00){
output.complete(Optional.empty());
return;
}

output.complete(
Optional.of(
new NodeData(new Pointer(Pointer.TYPE_NODE, root.getOffset(), root.getChunk()), bytes)
Expand Down Expand Up @@ -151,7 +187,7 @@ public CompletableFuture<NodeData> writeNewNode(int table, byte[] data, boolean
byte[] finalData1 = data;
long offset = pointer.getPosition();

// setting pointer position according to the offset. Reading table again since a new chunk may have been created
// setting pointer position according to the table offset. Reading table again since a new chunk may have been created
pointer.setPosition(offset - headerManager.getHeader().getTableOfId(table).get().getIndexChunk(pointer.getChunk()).get().getOffset());
FileUtils.write(getAsynchronousFileChannel(pointer.getChunk()), offset, data).whenComplete((size, throwable) -> {
if (throwable != null){
Expand All @@ -169,68 +205,110 @@ public CompletableFuture<NodeData> writeNewNode(int table, byte[] data, boolean
return output;
}

// Todo: as currently written in README, after allocating space, the chunk offset of tables after the tableId should be updated
private List<Header.Table> getTablesIncludingChunk(int chunk){
return headerManager.getHeader().getTables().stream().filter(table -> table.getIndexChunk(chunk).isPresent()).toList();
}

private int getIndexOfTable(List<Header.Table> tables, int table){
int index = -1;
for (int i = 0; i < tables.size(); i++)
if (tables.get(i).getId() == table){
index = i;
break;
}

return index;
}

/**
* ## How it works:
* if the chunk is new for this table, then just allocate at the end of the file and add the chunk index to header
* and return. But if there isn't space left, try next chunk.
* if file size is not 0,
* see if there is any empty space in the file (allocated before but never written to) and return the
* pointer to that space if is available.
* if file size is equal or greater than maximum file size try next chunk
* allocate space at end of the file and return pointer if the table is at end of the file
* otherwise, allocate space right before the next table in this chunk begins and push next tables to the end
* also make sure to update possible roots and chunk indexes offset for next tables
* @param tableId table to allocate space in
* @param chunk chunk to allocate space in
* @return Pointer to the beginning of allocated location
*/
private Pointer getAllocatedSpaceForNewNode(int tableId, int chunk) throws IOException, ExecutionException, InterruptedException {
Header.Table table = headerManager.getHeader().getTableOfId(tableId).get();
Optional<Header.IndexChunk> optional = table.getIndexChunk(chunk);
boolean newChunkCreated = optional.isEmpty();
boolean newChunkCreatedForTable = optional.isEmpty();

AsynchronousFileChannel asynchronousFileChannel = this.getAsynchronousFileChannel(chunk);
int indexOfTableMetaData = headerManager.getHeader().indexOfTable(tableId);

boolean isLastTable = indexOfTableMetaData == headerManager.getHeader().tablesCount() - 1;
long fileSize = asynchronousFileChannel.size();
long position = 0;
if (fileSize != 0){
position = isLastTable ?
fileSize - engineConfig.indexGrowthAllocationSize()
:
headerManager.getHeader().getTableOfIndex(indexOfTableMetaData + 1).get().getIndexChunk(chunk).get().getOffset() - engineConfig.indexGrowthAllocationSize();

Future<byte[]> future = FileUtils.readBytes(asynchronousFileChannel, position, engineConfig.indexGrowthAllocationSize());
byte[] bytes = new byte[0];
try {
bytes = future.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}

Optional<Integer> optionalAdditionalPosition = getPossibleAllocationLocation(bytes);
if (optionalAdditionalPosition.isPresent()){
long finalPosition = position + optionalAdditionalPosition.get();
return new Pointer(Pointer.TYPE_NODE, finalPosition, chunk);
}


/*
If there isn't an empty allocated location, we check if maximum size is reached.
If it is, we won't be allocating and just move on to next chunk
through recursion till we reach to a chunk where we can allocate space
*/
if (newChunkCreatedForTable){
if (fileSize >= engineConfig.getBTreeMaxFileSize()){
return getAllocatedSpaceForNewNode(tableId, chunk + 1);
} else {
Long position = FileUtils.allocate(asynchronousFileChannel, engineConfig.indexGrowthAllocationSize()).get();
List<Header.IndexChunk> newChunks = new ArrayList<>(table.getChunks());
newChunks.add(new Header.IndexChunk(chunk, position));
table.setChunks(newChunks);
headerManager.update();
return new Pointer(Pointer.TYPE_NODE, position, chunk);
}
}

List<Header.Table> tablesIncludingChunk = getTablesIncludingChunk(chunk);
int indexOfTable = getIndexOfTable(tablesIncludingChunk, tableId);
boolean isLastTable = indexOfTable == tablesIncludingChunk.size() - 1;

}

Long finalPosition;
if (isLastTable || position == 0){
finalPosition = FileUtils.allocate(asynchronousFileChannel, engineConfig.indexGrowthAllocationSize()).get();
}else {
finalPosition = FileUtils.allocate(asynchronousFileChannel, position, engineConfig.indexGrowthAllocationSize()).get();
}
if (fileSize > 0){
long positionToCheck =
isLastTable ?
fileSize - engineConfig.indexGrowthAllocationSize()
:
tablesIncludingChunk.get(indexOfTable + 1).getIndexChunk(chunk).get().getOffset() - engineConfig.indexGrowthAllocationSize();

if (newChunkCreated){
List<Header.IndexChunk> newChunks = new ArrayList<>(table.getChunks());
newChunks.add(new Header.IndexChunk(chunk, finalPosition));
table.setChunks(newChunks);
headerManager.update();
if (positionToCheck > 0) {
byte[] bytes = FileUtils.readBytes(asynchronousFileChannel, positionToCheck, engineConfig.indexGrowthAllocationSize()).get();
Optional<Integer> optionalAdditionalPosition = getPossibleAllocationLocation(bytes);
if (optionalAdditionalPosition.isPresent()){
long finalPosition = positionToCheck + optionalAdditionalPosition.get();
return new Pointer(Pointer.TYPE_NODE, finalPosition, chunk);
}
}
}

return new Pointer(Pointer.TYPE_NODE, finalPosition, chunk);
if (fileSize >= engineConfig.getBTreeMaxFileSize())
return this.getAllocatedSpaceForNewNode(tableId, chunk + 1);


long allocatedOffset;
if (isLastTable){
allocatedOffset = FileUtils.allocate(asynchronousFileChannel, engineConfig.indexGrowthAllocationSize()).get();
} else {
allocatedOffset = FileUtils.allocate(
asynchronousFileChannel,
tablesIncludingChunk.get(indexOfTable + 1).getIndexChunk(chunk).get().getOffset(),
engineConfig.indexGrowthAllocationSize()
).get();

for (int i = indexOfTable + 1; i < tablesIncludingChunk.size(); i++){
Header.Table nextTable = tablesIncludingChunk.get(i);
if (nextTable.getRoot().getChunk() == chunk) {
nextTable.getRoot().setOffset(
nextTable.getRoot().getOffset() + engineConfig.indexGrowthAllocationSize()
);
}
Header.IndexChunk indexChunk = nextTable.getIndexChunk(chunk).get();
indexChunk.setOffset(indexChunk.getOffset() + engineConfig.indexGrowthAllocationSize());
}
}
return new Pointer(Pointer.TYPE_NODE, allocatedOffset, chunk);
}

/*
* Returns the empty position within byte[] passed to the method
*/
private Optional<Integer> getPossibleAllocationLocation(byte[] bytes){
for (int i = 0; i < engineConfig.getBTreeGrowthNodeAllocationCount(); i++){
int position = i * engineConfig.getPaddedSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.concurrent.ExecutionException;

public interface IndexStorageManager {
CompletableFuture<NodeData> fillRoot(int table, byte[] data);

CompletableFuture<Optional<NodeData>> getRoot(int table);

byte[] getEmptyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ private BaseTreeNode getRoot(int table) throws ExecutionException, InterruptedEx
LeafTreeNode leafTreeNode = (LeafTreeNode) BaseTreeNode.fromBytes(emptyNode, BaseTreeNode.NodeType.LEAF);
leafTreeNode.setAsRoot();

IndexStorageManager.NodeData nodeData = indexStorageManager.writeNewNode(
table,
leafTreeNode.getData(),
true
).get();

IndexStorageManager.NodeData nodeData = indexStorageManager.fillRoot(table, leafTreeNode.getData()).get();
leafTreeNode.setNodePointer(nodeData.pointer());
return leafTreeNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import com.github.sepgh.internal.tree.node.BaseTreeNode;
import com.github.sepgh.internal.tree.node.InternalTreeNode;
import com.github.sepgh.internal.tree.node.LeafTreeNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -58,6 +55,12 @@ public void setUp() throws IOException {
.build()
)
)
.root(
Header.IndexChunk.builder()
.chunk(0)
.offset(0)
.build()
)
.initialized(true)
.build()
)
Expand All @@ -80,6 +83,7 @@ public void destroy() throws IOException {


@Test
@Timeout(value = 2)
public void addIndex() throws IOException, ExecutionException, InterruptedException {
HeaderManager headerManager = new InMemoryHeaderManager(header);
FileIndexStorageManager fileIndexStorageManager = new FileIndexStorageManager(dbPath, headerManager, engineConfig);
Expand All @@ -102,6 +106,7 @@ public void addIndex() throws IOException, ExecutionException, InterruptedExcept
}

@Test
@Timeout(value = 2)
public void testSingleSplitAddIndex() throws IOException, ExecutionException, InterruptedException {
Random random = new Random();

Expand Down Expand Up @@ -188,6 +193,7 @@ public void testSingleSplitAddIndex() throws IOException, ExecutionException, In
* └── 012
*/
@Test
@Timeout(value = 2)
public void testMultiSplitAddIndex() throws IOException, ExecutionException, InterruptedException {

List<Long> testIdentifiers = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L);
Expand Down
Loading

0 comments on commit 816a78b

Please sign in to comment.