[ENG-8039] Lakeview last modified approach for non-blocking processing of commits (#108)
karankm97 authored Oct 4, 2024
1 parent dbd88ea commit c135544
Showing 8 changed files with 966 additions and 74 deletions.
@@ -24,6 +24,9 @@ public class MetadataExtractorConfig {
@Builder.Default private Optional<List<String>> pathExclusionPatterns = Optional.empty();
@Builder.Default private JobRunMode jobRunMode = JobRunMode.CONTINUOUS;

@Builder.Default
private UploadStrategy uploadStrategy = UploadStrategy.BLOCK_ON_INCOMPLETE_COMMIT;

@Builder.Default
private int presignedUrlRequestBatchSizeActiveTimeline =
PRESIGNED_URL_REQUEST_BATCH_SIZE_ACTIVE_TIMELINE;
@@ -47,4 +50,9 @@ public enum JobRunMode {
CONTINUOUS,
ONCE
}

public enum UploadStrategy {
BLOCK_ON_INCOMPLETE_COMMIT,
CONTINUE_ON_INCOMPLETE_COMMIT
}
}
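
As a usage sketch, the new strategy could be selected through the Lombok builder implied by the @Builder.Default fields above; the wiring shown here is assumed, not taken from this commit:

    // Hypothetical wiring: opt into non-blocking processing of incomplete commits.
    MetadataExtractorConfig config =
        MetadataExtractorConfig.builder()
            .uploadStrategy(MetadataExtractorConfig.UploadStrategy.CONTINUE_ON_INCOMPLETE_COMMIT)
            .build();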
@@ -44,6 +44,7 @@ private MetadataExtractorConstants() {}
.batchId(0)
.checkpointTimestamp(Instant.EPOCH)
.lastUploadedFile("")
.firstIncompleteCommitFile("")
.archivedCommitsProcessed(false)
.build();

@@ -5,33 +5,62 @@
import static ai.onehouse.constants.MetadataExtractorConstants.SAVEPOINT_ACTION;
import static ai.onehouse.constants.MetadataExtractorConstants.WHITELISTED_ACTION_TYPES;

import ai.onehouse.config.Config;
import ai.onehouse.config.models.configv1.MetadataExtractorConfig;
import ai.onehouse.metadata_extractor.models.Checkpoint;
import ai.onehouse.storage.models.File;
import com.google.inject.Inject;
import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

public class ActiveTimelineInstantBatcher {
private final MetadataExtractorConfig extractorConfig;

@Inject
ActiveTimelineInstantBatcher(@Nonnull Config config) {
this.extractorConfig = config.getMetadataExtractorConfig();
}

/**
* Creates batches of Hudi instants, ensuring related instants are grouped together.
*
* @param instants The list of Hudi instants.
* @param maxBatchSize the maximum number of instants per batch.
* @return A list of batches, each batch being a list of instants.
*/
-  public List<List<File>> createBatches(List<File> instants, int maxBatchSize) {
+  public Pair<String, List<List<File>>> createBatches(
+      List<File> instants, int maxBatchSize, Checkpoint checkpoint) {
if (maxBatchSize < 3) {
throw new IllegalArgumentException("max batch size cannot be less than 3");
}

-    List<File> sortedInstants = sortAndFilterInstants(instants);
+    List<File> sortedInstants;
+    if (extractorConfig
+        .getUploadStrategy()
+        .equals(MetadataExtractorConfig.UploadStrategy.CONTINUE_ON_INCOMPLETE_COMMIT)) {
+      // Sort the instants after grouping them by commit, keeping only the groups in which
+      // some file has a last-modified time later than the timestamp of the last uploaded
+      // checkpoint
+      sortedInstants = sortAndFilterInstants(instants, checkpoint.getCheckpointTimestamp());
+    } else {
+      sortedInstants = sortAndFilterInstants(instants);
+    }

List<List<File>> batches = new ArrayList<>();
List<File> currentBatch = new ArrayList<>();
String firstIncompleteCheckpoint = checkpoint.getFirstIncompleteCommitFile();

int startIndex = 0;
if (!sortedInstants.isEmpty()
@@ -107,9 +136,23 @@ public List<List<File>> createBatches(List<File> instants, int maxBatchSize) {
currentBatch.clear();
currentBatch.addAll(sortedInstants.subList(index, index + groupSize));
}
-      } else {
-        // Instants are not related; add what we have and stop processing
-        shouldStopIteration = true;
+      } else if (!shouldStopIteration) {
+        if (extractorConfig
+            .getUploadStrategy()
+            .equals(MetadataExtractorConfig.UploadStrategy.CONTINUE_ON_INCOMPLETE_COMMIT)) {
+          // Instead of blocking batch creation, skip the incomplete commit file and move the
+          // first incomplete checkpoint (startAfter) to one unit before the incomplete instant
+          String firstIncompleteCheckpointUpdated =
+              getFirstIncompleteCheckpoint(instant1.getTimestamp());
+          if (StringUtils.isBlank(firstIncompleteCheckpoint)
+              || firstIncompleteCheckpointUpdated.compareTo(firstIncompleteCheckpoint) < 0) {
+            firstIncompleteCheckpoint = firstIncompleteCheckpointUpdated;
+          }
+          groupSize = 1;
+        } else {
+          shouldStopIteration = true;
+        }
}

if (shouldStopIteration) {
@@ -128,30 +171,59 @@ public List<List<File>> createBatches(List<File> instants, int maxBatchSize) {
batches.add(currentBatch);
}

-    return batches;
+    return Pair.of(firstIncompleteCheckpoint, batches);
}

private static String getFirstIncompleteCheckpoint(String numericString) {
BigInteger number = new BigInteger(numericString);
BigInteger decrementedNumber = number.subtract(BigInteger.ONE);
return decrementedNumber.toString();
}
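
Concretely, getFirstIncompleteCheckpoint steps the numeric instant timestamp back by one, so that a later listing which starts after the stored value re-includes the incomplete instant; the timestamp below is illustrative:

    // Illustrative: "20241004153000123" stands in for a real Hudi instant timestamp.
    String checkpointed =
        new BigInteger("20241004153000123").subtract(BigInteger.ONE).toString();
    // checkpointed == "20241004153000122"; listing startAfter this value re-reads the instant.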

  private List<File> sortAndFilterInstants(List<File> instants) {
    return instants.stream()
+        .filter(this::filterFile)
+        .sorted(getFileComparator())
+        .collect(Collectors.toList());
+  }
+
+  private List<File> sortAndFilterInstants(List<File> instants, Instant lastModifiedFilter) {
+    return instants.stream()
+        .filter(this::filterFile)
+        .collect(Collectors.groupingBy(file -> file.getFilename().split("\\.", 3)[0]))
+        .values()
+        .stream()
        .filter(
-            file ->
-                file.getFilename().equals(HOODIE_PROPERTIES_FILE)
-                    || WHITELISTED_ACTION_TYPES.contains(
-                        getActiveTimeLineInstant(file.getFilename()).action))
-        .sorted(
-            Comparator.comparing(
-                File::getFilename,
-                (name1, name2) -> {
-                  if (HOODIE_PROPERTIES_FILE.equals(name1)) {
-                    return -1;
-                  } else if (HOODIE_PROPERTIES_FILE.equals(name2)) {
-                    return 1;
-                  }
-                  return name1.compareTo(name2);
-                }))
+            group ->
+                group.stream()
+                    .anyMatch(
+                        file ->
+                            file.getFilename().equals(HOODIE_PROPERTIES_FILE)
+                                || lastModifiedFilter.isBefore(file.getLastModifiedAt())))
+        .flatMap(List::stream)
+        .sorted(getFileComparator())
        .collect(Collectors.toList());
  }
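
The grouping key above is the portion of the filename before the first dot, so every file of a commit lands in the same group, and a group survives the filter only if some member was modified after the checkpoint timestamp. A minimal sketch with made-up filenames:

    // Both files map to the same commit group keyed by "20241004120000".
    String key1 = "20241004120000.commit".split("\\.", 3)[0];           // "20241004120000"
    String key2 = "20241004120000.commit.requested".split("\\.", 3)[0]; // "20241004120000"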

private boolean filterFile(File file) {
return file.getFilename().equals(HOODIE_PROPERTIES_FILE)
|| WHITELISTED_ACTION_TYPES.contains(
getActiveTimeLineInstant(file.getFilename()).getAction());
}

private Comparator<File> getFileComparator() {
return Comparator.comparing(
File::getFilename,
(name1, name2) -> {
if (HOODIE_PROPERTIES_FILE.equals(name1)) {
return -1;
} else if (HOODIE_PROPERTIES_FILE.equals(name2)) {
return 1;
}
return name1.compareTo(name2);
});
}

private static boolean areRelatedInstants(
ActiveTimelineInstant instant1,
ActiveTimelineInstant instant2,
@@ -41,6 +41,7 @@
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

/*
* Has the core logic for listing and uploading commit instants in a given timeline
@@ -127,8 +128,18 @@ public CompletableFuture<Checkpoint> paginatedBatchUploadWithCheckpoint(
storageUtils.constructFileUri(
table.getAbsoluteTableUri(), getPathSuffixForTimeline(commitTimelineType)));

+    // startAfter is used only in the first call to list the objects; after that the
+    // continuation token is used.
+    // Reset firstIncompleteCommitFile so that we do not process from the same commit again.
+    String startAfter = getStartAfterString(prefix, checkpoint, true);
     return executePaginatedBatchUpload(
-        tableId, table, bucketName, prefix, checkpoint, commitTimelineType);
+        tableId,
+        table,
+        bucketName,
+        prefix,
+        checkpoint.toBuilder().firstIncompleteCommitFile("").build(),
+        commitTimelineType,
+        startAfter);
}

private CompletableFuture<Checkpoint> executeFullBatchUpload(
@@ -170,15 +181,18 @@ private CompletableFuture<Checkpoint> executePaginatedBatchUpload(
String bucketName,
String prefix,
Checkpoint checkpoint,
-      CommitTimelineType commitTimelineType) {
+      CommitTimelineType commitTimelineType,
+      String startAfter) {
return asyncStorageClient
-        .fetchObjectsByPage(bucketName, prefix, null, getStartAfterString(prefix, checkpoint))
+        .fetchObjectsByPage(bucketName, prefix, null, startAfter)
.thenComposeAsync(
continuationTokenAndFiles -> {
String nextContinuationToken = continuationTokenAndFiles.getLeft();

List<File> filesToUpload =
getFilesToUploadBasedOnPreviousCheckpoint(
continuationTokenAndFiles.getRight(), checkpoint, commitTimelineType, false);

if (!filesToUpload.isEmpty()) {
return uploadInstantsInSequentialBatches(
tableId, table, filesToUpload, checkpoint, commitTimelineType)
Expand All @@ -201,7 +215,8 @@ private CompletableFuture<Checkpoint> executePaginatedBatchUpload(
bucketName,
prefix,
updatedCheckpoint,
-                      commitTimelineType);
+                      commitTimelineType,
+                      getStartAfterString(prefix, updatedCheckpoint, false));
},
executorService);
} else {
@@ -251,9 +266,17 @@ private CompletableFuture<Checkpoint> uploadInstantsInSequentialBatches(
Lists.partition(
filesToUpload, getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ARCHIVED));
} else {
-      batches =
+      Pair<String, List<List<File>>> incompleteCheckpointBatchesPair =
           activeTimelineInstantBatcher.createBatches(
-              filesToUpload, getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE));
+              filesToUpload,
+              getUploadBatchSize(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE),
+              checkpoint);
+      batches = incompleteCheckpointBatchesPair.getRight();
+      checkpoint =
+          checkpoint
+              .toBuilder()
+              .firstIncompleteCommitFile(incompleteCheckpointBatchesPair.getLeft())
+              .build();
}
int numBatches = batches.size();

@@ -394,6 +417,7 @@ private CompletableFuture<Checkpoint> updateCheckpointAfterProcessingBatch(
.lastUploadedFile(lastUploadedFile.getFilename())
.checkpointTimestamp(lastUploadedFile.getLastModifiedAt())
.archivedCommitsProcessed(archivedCommitsProcessed)
.firstIncompleteCommitFile(previousCheckpoint.getFirstIncompleteCommitFile())
.build();
try {
return onehouseApiClient
@@ -467,7 +491,6 @@ private List<File> getFilesToUploadBasedOnPreviousCheckpoint(
// for the first batch, always include hoodie properties file
filesToUpload.add(0, HOODIE_PROPERTIES_FILE_OBJ);
}

return filesToUpload;
}

@@ -490,6 +513,13 @@ private boolean isInstantAlreadyUploaded(
Checkpoint checkpoint, File file, CommitTimelineType commitTimelineType) {
if (checkpoint.getBatchId() != 0 && isInstantFile(checkpoint.getLastUploadedFile())) {
if (commitTimelineType.equals(CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE)) {
if (extractorConfig
.getUploadStrategy()
.equals(MetadataExtractorConfig.UploadStrategy.CONTINUE_ON_INCOMPLETE_COMMIT)) {
        // The commit can still be incomplete even when the condition below holds, hence it is
        // not skipped in non-blocking mode
return false;
}
return getCommitIdFromActiveTimelineInstant(file.getFilename())
.compareTo(getCommitIdFromActiveTimelineInstant(checkpoint.getLastUploadedFile()))
<= 0;
@@ -548,12 +578,28 @@ private int getNumericPartFromArchivedCommit(String archivedCommitFileName) {
}
}

-  private String getStartAfterString(String prefix, Checkpoint checkpoint) {
+  public String getStartAfterString(String prefix, Checkpoint checkpoint, boolean isFirstFetch) {
     String lastProcessedFile = checkpoint.getLastUploadedFile();
-    return lastProcessedFile.equals(HOODIE_PROPERTIES_FILE)
-            || StringUtils.isBlank(lastProcessedFile)
-        ? null
-        : storageUtils.constructFileUri(prefix, checkpoint.getLastUploadedFile());
+    // Base case: process from the beginning
+    if (lastProcessedFile.equals(HOODIE_PROPERTIES_FILE)
+        || StringUtils.isBlank(lastProcessedFile)) {
+      return null;
+    }
+
+    // When the extractor blocks on incomplete commits, or on any non-first fetch, startAfter
+    // is the last processed file
+    if (extractorConfig
+            .getUploadStrategy()
+            .equals(MetadataExtractorConfig.UploadStrategy.BLOCK_ON_INCOMPLETE_COMMIT)
+        || !isFirstFetch) {
+      return storageUtils.constructFileUri(prefix, lastProcessedFile);
+    }
+
+    // When the extractor does not block on incomplete commits, it resumes from the first
+    // incomplete commit file if present; otherwise it takes lastProcessedFile as the
+    // starting point
+    String firstIncompleteCommitFile = checkpoint.getFirstIncompleteCommitFile();
+    return StringUtils.isBlank(firstIncompleteCommitFile)
+        ? storageUtils.constructFileUri(prefix, lastProcessedFile)
+        : storageUtils.constructFileUri(prefix, firstIncompleteCommitFile);
}
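
A worked example of the resume semantics, with hypothetical checkpoint values: if lastUploadedFile is "20241004130000.commit" and firstIncompleteCommitFile is "20241004125959", then under BLOCK_ON_INCOMPLETE_COMMIT (or on any non-first fetch) the listing starts after <prefix>/20241004130000.commit, while under CONTINUE_ON_INCOMPLETE_COMMIT the first fetch starts after <prefix>/20241004125959, so the incomplete commit is picked up again.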

@@ -14,5 +14,6 @@ public class Checkpoint implements Serializable {
int batchId;
@NonNull Instant checkpointTimestamp;
@NonNull String lastUploadedFile;
String firstIncompleteCommitFile;
boolean archivedCommitsProcessed;
}
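
For reference, a checkpoint carrying the new field might be built like so; the builder mirrors the one used in the constants file above, and all values here are made up:

    Checkpoint checkpoint =
        Checkpoint.builder()
            .batchId(3)
            .checkpointTimestamp(Instant.parse("2024-10-04T12:00:00Z"))
            .lastUploadedFile("20241004120000.commit")
            .firstIncompleteCommitFile("20241004115959")
            .archivedCommitsProcessed(false)
            .build();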