Fixing a few test failures
nsivabalan committed Sep 30, 2024
1 parent ec072e7 commit ff162eb
Showing 6 changed files with 111 additions and 68 deletions.
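
The common thread across these changes is replacing hardcoded instant times ("0000001", "100", "200", ...) with timestamps minted by the timeline, so each test instant is unique and ordered consistently with instants created elsewhere in the test (for example by table services). A minimal sketch of the pattern, assuming an already-configured write client named client:

// Before: a fixed literal can collide with, or sort inconsistently
// against, instants generated elsewhere during the test.
//   String newCommitTime = "0000001";
// After: mint a fresh instant time from the timeline.
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);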
@@ -1786,15 +1786,15 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
HoodieJavaWriteClient client = getHoodieWriteClient(config);

// Write 1 (Bulk insert)
String newCommitTime = "0000001";
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(records, newCommitTime);
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (inserts)
newCommitTime = "0000002";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
@@ -1827,7 +1827,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));

// trigger new write to mimic other writes succeeding before re-attempt.
newCommitTime = "0000003";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
@@ -1319,25 +1319,29 @@ public void testRollbackFailedCommits() throws Exception {
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));

// perform 1 successful commit
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
String commit1 = HoodieActiveTimeline.createNewInstantTime();
writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, true);

// Perform 2 failed writes to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
String commit2 = HoodieActiveTimeline.createNewInstantTime();
writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, true);
HoodieTableMetaClient metaClient = createMetaClient();
@@ -1349,13 +1353,14 @@ public void testRollbackFailedCommits() throws Exception {
// Await till enough time passes such that the first 2 failed commits' heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
client.close();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write
writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
String commit5 = HoodieActiveTimeline.createNewInstantTime();
writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit5)), commit5,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, true);
client.clean();
@@ -1396,47 +1401,53 @@ public void testRollbackFailedCommitsToggleCleaningPolicy() throws Exception {
HoodieTestUtils.init(storageConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
// Perform 1 successful write to table
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, true);

// Perform 1 failed write to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
String commit2 = HoodieActiveTimeline.createNewInstantTime();
writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit2,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to LAZY
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
// Perform 2 failed writes to table
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)), commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Await till enough time passes such that the 2 failed commits' heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit4);
Thread.sleep(2000);
}
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
// Perform 2 failed commits
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit3)), commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
+ String commit6 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
writeBatch(client, commit6, commit5, Option.of(Arrays.asList(commit4)), commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
@@ -1458,28 +1469,33 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTestUtils.init(storageConf, basePath);
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
// perform 1 successful write
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)), commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
0, true);

// Perform 2 failed writes to table
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
String commit2 = HoodieActiveTimeline.createNewInstantTime();
writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)), commit2,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();

+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)), commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
// Create a successful commit
Future<List<WriteStatus>> commit3 = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
"400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
Future<List<WriteStatus>> commit4Future = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
commit4, commit3, Option.of(Arrays.asList(commit4)), commit3, 100, dataGen::generateInserts,
HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
- commit3.get();
+ commit4Future.get();
HoodieTableMetaClient metaClient = createMetaClient();

assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
@@ -1490,14 +1506,15 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
// Await till enough time passes such that the first 2 failed commits' heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
Future<List<WriteStatus>> commit4 = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
"500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
String commit5 = HoodieActiveTimeline.createNewInstantTime();
Future<List<WriteStatus>> commit5Future = service.submit(() -> writeBatch(new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
commit5, commit4, Option.of(Arrays.asList(commit5)), commit5, 100, dataGen::generateInserts,
HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
- commit4.get();
+ commit5Future.get();
clean1.get();
client.close();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
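
The rollback tests above gate cleaning on heartbeat expiry of the failed writers, and the expiry checks now reference the generated commit variables instead of literals. A minimal sketch of that polling idiom, assuming a configured write client client, a captured failed instant failedInstant (illustrative name), and an enclosing test method that declares throws Exception:

// Block until the failed writer's heartbeat lapses; only then does the
// LAZY cleaning policy treat the commit as abandoned and roll it back.
while (!client.getHeartbeatClient().isHeartbeatExpired(failedInstant)) {
  Thread.sleep(2000); // re-check every 2 seconds
}
client.clean();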
@@ -124,7 +124,6 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
// Create upserts, schedule cleaning, schedule compaction in parallel
- //String instant4 = HoodieActiveTimeline.createNewInstantTime();
Future future1 = executors.submit(() -> {
int numRecords = 100;
String commitTimeBetweenPrevAndNew = instantTime2;
@@ -151,7 +150,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
Future future2 = executors.submit(() -> {
try {
int counter = 0;
- while (counter++ < 5) {
+ while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
try {
String instant5 = HoodieActiveTimeline.createNewInstantTime();
client2.scheduleTableService(instant5, Option.empty(), TableServiceType.COMPACT);
@@ -172,7 +171,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
AtomicReference<String> instant6Ref = new AtomicReference<>();
Future future3 = executors.submit(() -> {
int counter = 0;
- while (counter++ < 5) {
+ while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
try {
String instant6 = HoodieActiveTimeline.createNewInstantTime();
client2.scheduleTableService(instant6, Option.empty(), TableServiceType.CLEAN);
@@ -194,7 +193,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
future1 = executors.submit(() -> {
int numRecords = 100;
int counter = 0;
- while (counter++ < 5) {
+ while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
try {
String instant7 = HoodieActiveTimeline.createNewInstantTime();
createCommitWithInserts(cfg, client1, instantTime3, instant7, numRecords);
@@ -211,7 +210,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
});
future2 = executors.submit(() -> {
int counter = 0;
- while (counter++ < 5) {
+ while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
try {
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(instant5Ref.get());
client2.commitCompaction(instant5Ref.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
@@ -223,17 +222,14 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
}
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
- if (!(e2 instanceof HoodieWriteConflictException)) {
- System.out.println("asdfad");
- }
Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
}
}
}
});
future3 = executors.submit(() -> {
int counter = 0;
- while (counter++ < 5) {
+ while (counter++ < 5) { // we can't really time which writer triggers first and which one triggers later, so let's add a few rounds of retries.
try {
client2.clean(instant6Ref.get(), false);
validInstants.add(instant6Ref.get());
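
The multi-writer changes wrap each racing operation in a bounded retry loop because, as the added comments note, there is no way to control which writer acts first. A minimal sketch of that idiom, mirroring the schedule call used in the test above (error handling simplified; assumes client2 is a configured write client):

// Bounded retry for a racy scheduling call: a few attempts give the
// competing writer's conflicting operation time to finish.
int counter = 0;
while (counter++ < 5) {
  try {
    String instant = HoodieActiveTimeline.createNewInstantTime();
    client2.scheduleTableService(instant, Option.empty(), TableServiceType.COMPACT);
    break; // scheduled successfully; stop retrying
  } catch (Exception e) {
    // likely a transient conflict with the concurrent writer; retry
  }
}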
@@ -2202,15 +2202,15 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
SparkRDDWriteClient client = getHoodieWriteClient(config);

// Write 1 (Bulk insert)
String newCommitTime = "0000001";
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (inserts)
newCommitTime = "0000002";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
@@ -2240,7 +2240,7 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));

// trigger new write to mimic other writes succeeding before re-attempt.
newCommitTime = "0000003";
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
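
A consequence of this migration: instant times are no longer predictable constants, so the tests thread the generated values through (the commitN variables above) rather than re-typing literals. A small sketch of the ordering property these tests rely on, assuming instants are minted within the same process:

String first = HoodieActiveTimeline.createNewInstantTime();
String second = HoodieActiveTimeline.createNewInstantTime();
// Instant times are minted in increasing order, so creation order
// matches timeline order.
Assertions.assertTrue(first.compareTo(second) < 0);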