[HUDI-7581] Add a multi writer test with index updates and cleaning
codope committed Sep 30, 2024
1 parent 4e98278 commit 14a6c5c
Showing 2 changed files with 100 additions and 1 deletion.
@@ -1080,7 +1080,7 @@ private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata,
private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition);
List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
-commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value) -> {
+commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
List<FileSlice> fileSlices = getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), dataPartition);
fileSlices.forEach(fileSlice -> {
// Filter log files for the instant time and add to this partition fileSlice pairs
@@ -24,8 +24,10 @@
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
@@ -48,6 +50,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.marker.SimpleDirectMarkerBasedDetectionStrategy;
import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedDetectionStrategy;
@@ -903,6 +907,101 @@ public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception
client2.close();
}

/**
 * Test case for the multi-writer scenario in which one writer commits updates (including
 * metadata index updates) while a second writer concurrently runs an aggressive clean.
 */
@Test
public void testMultiWriterWithIndexingAndAggressiveCleaning() throws Exception {
// setting up MOR table so that we can have a log file in the file slice
setUpMORTestTable();
// common write configs for both writers
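// optimistic concurrency control with a filesystem-based lock and direct markers;
// auto-commit is off so each writer decides exactly when to commit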
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMetadataIndexColumnStats(true)
.withEnableRecordIndex(true).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.withAutoArchive(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProvider.class)
.withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
.build()).withAutoCommit(false);
HoodieWriteConfig writeConfig1 = writeConfigBuilder.build();

// writer 2 cleans aggressively: retain only the latest commit, with auto-clean off so the clean is triggered manually below
HoodieWriteConfig writeConfig2 = writeConfigBuilder
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(1)
.withAutoClean(false).build())
.build();

// Simulate the first commit with Writer 1
final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig1);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
createCommitWithInserts(writeConfig1, client1, "000", client1.createNewInstantTime(), 200, true);
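// seed the table with an initial committed batch so later updates have file slices (and index entries) to target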

// multi-writer setup
final int threadCount = 2;
final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
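// The two barriers pin the interleaving: both writers reach the first barrier only after
// Writer 1 has written (but not committed) its update, so Writer 2's clean runs
// concurrently with Writer 1's commit and its index update.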

// Writer 1 - Simulating the index update process
Future future1 = executors.submit(() -> {
try {
final String nextCommitTime = client1.createNewInstantTime();
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig1, client1, nextCommitTime, 100);
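// at this point the 100 updates are written but intentionally not yet committed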

// Rendezvous with Writer 2: after this barrier, the clean and the commit run concurrently
cyclicBarrier.await(60, TimeUnit.SECONDS);

// Commit the update (including the metadata index update) and assert it succeeds despite the concurrent clean
assertDoesNotThrow(() -> {
client1.commit(nextCommitTime, writeStatusList);
});

// Signal Writer 2 to continue
cyclicBarrier.await(60, TimeUnit.SECONDS);
writer1Completed.set(true);
} catch (Exception e) {
writer1Completed.set(false);
}
});

// Writer 2 - Simulating aggressive cleaning
Future future2 = executors.submit(() -> {
try {
// Wait for Writer 1 to make progress
cyclicBarrier.await(60, TimeUnit.SECONDS);

// Simulate aggressive cleaning
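// reload the timeline from storage so this writer sees Writer 1's latest instants before planning the clean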
metaClient.reloadActiveTimeline();
HoodieTable table = HoodieSparkTable.create(writeConfig2, context, metaClient);
table.clean(context, client2.createNewInstantTime()); // clean old file slices
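// with retainCommits(1), the cleaner removes file slices no longer needed by the latest committed instant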

// Signal Writer 1 to complete its update
cyclicBarrier.await(60, TimeUnit.SECONDS);
writer2Completed.set(true);
} catch (Exception e) {
writer2Completed.set(false);
}
});

// Wait for both writers to complete
future1.get();
future2.get();

// Assertions to ensure both writers completed their operations
assertTrue(writer1Completed.get(), "Writer 1 should have completed without exceptions");
assertTrue(writer2Completed.get(), "Writer 2 should have completed without exceptions");

// Cleanup
client1.close();
client2.close();
}

private void ingestBatch(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient writeClient, String commitTime, JavaRDD<HoodieRecord> records,
CountDownLatch countDownLatch) throws IOException, InterruptedException {
