diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 5efb7413e70c..f48764f772b4 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -70,6 +70,7 @@
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 
 public class SimpleDataUtil {
@@ -277,21 +278,13 @@ public static void assertRecordsEqual(List<Record> expected, List<Record> actual
   }
 
   /**
-   * Assert table contains the expected list of records after waiting up to {@code maxCheckCount}
-   * with {@code checkInterval}
+   * Assert table contains the expected list of records after waiting up to the configured {@code
+   * timeout}
    */
-  public static void assertTableRecords(
-      Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
-      throws IOException, InterruptedException {
-    for (int i = 0; i < maxCheckCount; ++i) {
-      if (equalsRecords(expected, tableRecords(table), table.schema())) {
-        break;
-      } else {
-        Thread.sleep(checkInterval.toMillis());
-      }
-    }
-    // success or failure, assert on the latest table state
-    assertRecordsEqual(expected, tableRecords(table), table.schema());
+  public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(() -> assertRecordsEqual(expected, tableRecords(table), table.schema()));
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
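The hunk above swaps a hand-rolled sleep-and-retry loop for Awaitility, which re-runs the asserted block (every 100 ms by default) until it stops throwing AssertionError or the atMost deadline expires, at which point it fails with a ConditionTimeoutException reporting the last assertion failure. A minimal self-contained sketch of the same idiom, outside the Iceberg test harness; the RESULTS list and the producer thread are hypothetical stand-ins for the sink table, not code from this patch:

    import static org.assertj.core.api.Assertions.assertThat;

    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import org.awaitility.Awaitility;

    public class AwaitRecordsSketch {
      // Hypothetical shared state filled in by another thread,
      // standing in for tableRecords(table) in the patch.
      private static final List<String> RESULTS = new CopyOnWriteArrayList<>();

      public static void main(String[] args) {
        new Thread(
                () -> {
                  try {
                    Thread.sleep(500L); // simulate a slow asynchronous writer
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                  RESULTS.add("row-1");
                })
            .start();

        // Re-evaluates the lambda until the assertion passes or 10 seconds elapse.
        Awaitility.await("expected rows should be produced")
            .atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> assertThat(RESULTS).containsExactly("row-1"));
      }
    }

Note that untilAsserted takes a ThrowingRunnable, so helpers such as tableRecords(table) that declare checked exceptions can be called in the lambda without a try/catch.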
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 6d26f933b334..31e9733fcd60 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +49,7 @@
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -401,10 +404,11 @@ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
     return results;
   }
 
-  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
-    while (getRunningJobs(client).isEmpty()) {
-      Thread.sleep(10);
-    }
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) {
+    Awaitility.await("job should be running")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofMillis(10))
+        .untilAsserted(() -> assertThat(getRunningJobs(client)).isNotEmpty());
   }
 
   public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index cad1fa67ae19..70e7a79d8373 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -39,7 +39,6 @@
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -98,11 +97,6 @@ protected List<Record> generateRecords(int numRecords, long seed) {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
-  protected void assertRecords(
-      Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
-    SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, maxCount);
-  }
-
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -156,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -219,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep
 
     // wait longer for continuous source to reduce flakiness
    // because CI servers tend to be overloaded.
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
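The new 120-second budget is not arbitrary: it preserves the old worst case, since 12000 checks x 10 ms sleep = 120 s. The difference is that Awaitility now owns the polling cadence and failure reporting instead of each helper hand-rolling both. A tiny sketch of how an explicit pollInterval changes behavior relative to the defaults; the class and variable names here are illustrative only, not code from this patch:

    import static org.assertj.core.api.Assertions.assertThat;

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.awaitility.Awaitility;

    public class PollIntervalSketch {
      public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();

        // Mirrors waitUntilJobIsRunning above: poll every 10 ms (like the old
        // Thread.sleep(10) loop) under an explicit 30 s ceiling, rather than
        // relying on Awaitility's defaults (100 ms interval, 10 s timeout).
        // The assertion here passes on the sixth poll.
        Awaitility.await("condition should eventually hold")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofMillis(10))
            .untilAsserted(() -> assertThat(attempts.incrementAndGet()).isGreaterThan(5));
      }
    }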
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 1b9049f1bbfc..494c633088d9 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,18 +109,14 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -144,18 +142,14 @@ public void testConsumeFromStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -180,18 +174,14 @@ public void testConsumeFromStartTag() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       function.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -208,20 +198,16 @@ public void testCheckpointRestore() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, func);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       state = harness.snapshot(1, 1);
 
       // Stop the stream task.
       func.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -234,23 +220,29 @@
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       newFunc.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
     }
   }
 
+  private void awaitExpectedSplits(TestSourceContext sourceContext) {
+    Awaitility.await("expected splits should be produced")
+        .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
+        .untilAsserted(
+            () -> {
+              assertThat(sourceContext.latch.getCount()).isEqualTo(0);
+              assertThat(sourceContext.splits).as("Should produce the expected splits").hasSize(1);
+            });
+  }
+
   @Test
   public void testInvalidMaxPlanningSnapshotCount() {
     ScanContext scanContext1 =