Flink 1.17: Use awaitility instead of Thread.sleep() (#8852)
nastra authored Oct 18, 2023
1 parent ad602a3 commit e837973
Showing 4 changed files with 46 additions and 57 deletions.
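The change applies a single pattern across all four files: each hand-rolled sleep-and-retry loop becomes a declarative Awaitility wait with an explicit upper bound. A minimal before/after sketch of that pattern, using hypothetical checkCondition()/assertCondition() helpers in place of the real checks:

// Before: poll by hand, sleeping between attempts (hypothetical helpers)
for (int i = 0; i < maxCheckCount; ++i) {
  if (checkCondition()) {
    break;
  }
  Thread.sleep(checkInterval.toMillis());
}
assertCondition(); // final assertion, whether or not the loop succeeded

// After: Awaitility retries the assertion until it passes or the timeout
// elapses, then fails the test with the last assertion error
Awaitility.await("condition should hold")
    .atMost(timeout)
    .untilAsserted(() -> assertCondition());

A side benefit visible below: dropping Thread.sleep() also drops the checked InterruptedException, so several test-helper signatures lose their throws clauses.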
@@ -70,6 +70,7 @@
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 
 public class SimpleDataUtil {
@@ -277,21 +278,17 @@ public static void assertRecordsEqual(List<Record> expected, List<Record> actual
   }
 
   /**
-   * Assert table contains the expected list of records after waiting up to {@code maxCheckCount}
-   * with {@code checkInterval}
+   * Assert table contains the expected list of records after waiting up to the configured {@code
+   * timeout}
    */
-  public static void assertTableRecords(
-      Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
-      throws IOException, InterruptedException {
-    for (int i = 0; i < maxCheckCount; ++i) {
-      if (equalsRecords(expected, tableRecords(table), table.schema())) {
-        break;
-      } else {
-        Thread.sleep(checkInterval.toMillis());
-      }
-    }
-    // success or failure, assert on the latest table state
-    assertRecordsEqual(expected, tableRecords(table), table.schema());
+  public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              equalsRecords(expected, tableRecords(table), table.schema());
+              assertRecordsEqual(expected, tableRecords(table), table.schema());
+            });
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
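For callers, the old (checkInterval, maxCheckCount) pair collapses into a single timeout. A sketch of a hypothetical call site for the new overload:

// Hypothetical caller: wait up to two minutes for the table to reach the expected state
SimpleDataUtil.assertTableRecords(table, expectedRecords, Duration.ofSeconds(120));

Since no pollInterval is set here, Awaitility's default (100 ms) applies, so the table is scanned far less often than the old 10 ms loops did.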
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +49,7 @@
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -401,10 +404,11 @@ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
     return results;
   }
 
-  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
-    while (getRunningJobs(client).isEmpty()) {
-      Thread.sleep(10);
-    }
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) {
+    Awaitility.await("job should be running")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofMillis(10))
+        .untilAsserted(() -> assertThat(getRunningJobs(client)).isNotEmpty());
   }
 
   public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
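untilAsserted() is not the only terminal form Awaitility offers; the same wait could be phrased as a plain boolean condition via until(Callable&lt;Boolean&gt;). A sketch of that alternative, assuming the same getRunningJobs helper:

// Alternative form: poll a boolean condition instead of an assertion
Awaitility.await("job should be running")
    .atMost(Duration.ofSeconds(30))
    .pollInterval(Duration.ofMillis(10))
    .until(() -> !getRunningJobs(client).isEmpty());

The asserted form used in the diff has the nicer failure mode: a timeout reports the failed AssertJ assertion instead of a generic unfulfilled-condition message.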
@@ -39,7 +39,6 @@
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -98,11 +97,6 @@ protected List<Record> generateRecords(int numRecords, long seed) {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
-  protected void assertRecords(
-      Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
-    SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, maxCount);
-  }
-
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -156,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -219,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep
 
     // wait longer for continuous source to reduce flakiness
    // because CI servers tend to be overloaded.
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
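The fixed 120-second timeout preserves the old worst-case budget: the removed assertRecords helper allowed 12,000 checks at a 10 ms interval, i.e. about 12,000 × 10 ms = 120 s of waiting before the final assertion.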
@@ -18,12 +18,13 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,18 +109,14 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -144,18 +142,14 @@ public void testConsumeFromStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
      // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -180,18 +174,14 @@ public void testConsumeFromStartTag() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
      runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -208,20 +198,16 @@ public void testCheckpointRestore() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, func);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       state = harness.snapshot(1, 1);
 
       // Stop the stream task.
       func.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -234,23 +220,29 @@ public void testCheckpointRestore() throws Exception {
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       newFunc.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
     }
   }
 
+  private void awaitExpectedSplits(TestSourceContext sourceContext) {
+    Awaitility.await("expected splits should be produced")
+        .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
+        .untilAsserted(
+            () -> {
+              assertThat(sourceContext.latch.getCount()).isEqualTo(0);
+              assertThat(sourceContext.splits).as("Should produce the expected splits").hasSize(1);
+            });
+  }
+
   @Test
   public void testInvalidMaxPlanningSnapshotCount() {
     ScanContext scanContext1 =
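Folding the latch.await(...), the fixed Thread.sleep(1000L), and the split-count assertion into one awaitExpectedSplits call means each test now proceeds as soon as both conditions hold instead of always paying the extra second. For intuition, roughly what untilAsserted() does — an illustrative sketch, not Awaitility's actual implementation:

// Illustrative only: rerun the assertion until it passes or time runs out
static void untilAssertedSketch(Duration timeout, Duration pollInterval, Runnable assertion)
    throws InterruptedException {
  long deadline = System.nanoTime() + timeout.toNanos();
  while (true) {
    try {
      assertion.run(); // e.g. the two assertThat checks above
      return; // condition met; stop waiting
    } catch (AssertionError e) {
      if (System.nanoTime() >= deadline) {
        throw e; // out of time: surface the last assertion failure
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }
}

(The real library wraps the last failure in a ConditionTimeoutException rather than rethrowing the AssertionError directly.)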
