diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala index 806d2f19f6f5c..29f40df83f24a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala @@ -379,6 +379,8 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + val clock = new StreamManualClock + val inputData = MemoryStream[InitInputRow] val kvDataSet = inputData.toDS() .groupByKey(x => x.key) @@ -390,10 +392,12 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest TimeMode.None(), OutputMode.Append(), initStateDf) testStream(query, OutputMode.Update())( + StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), // non-exist key test AddData(inputData, InitInputRow("k1", "update", 37.0)), AddData(inputData, InitInputRow("k2", "update", 40.0)), AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("non-exist", "getOption", -1.0)), Execute { q => assert(q.lastProgress @@ -402,59 +406,80 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest AddData(inputData, InitInputRow("k1", "appendList", 37.0)), AddData(inputData, InitInputRow("k2", "appendList", 40.0)), AddData(inputData, InitInputRow("non-exist", "getList", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(), AddData(inputData, InitInputRow("k1", "incCount", 37.0)), AddData(inputData, InitInputRow("k2", "incCount", 40.0)), AddData(inputData, InitInputRow("non-exist", "getCount", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("non-exist", "getCount", 0.0)), + AddData(inputData, InitInputRow("k2", "incCount", 40.0)), AddData(inputData, InitInputRow("k2", "getCount", 40.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("k2", "getCount", 2.0)), // test every row in initial State is processed AddData(inputData, InitInputRow("init_1", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getOption", 40.0)), + AddData(inputData, InitInputRow("init_2", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_2", "getOption", 100.0)), AddData(inputData, InitInputRow("init_1", "getList", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getList", 40.0)), + AddData(inputData, InitInputRow("init_2", "getList", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_2", "getList", 100.0)), AddData(inputData, InitInputRow("init_1", "getCount", 40.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getCount", 1.0)), + AddData(inputData, InitInputRow("init_2", "getCount", 100.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_2", "getCount", 1.0)), // Update row with key in initial row will work AddData(inputData, InitInputRow("init_1", "update", 50.0)), AddData(inputData, InitInputRow("init_1", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getOption", 50.0)), + AddData(inputData, InitInputRow("init_1", "remove", -1.0)), AddData(inputData, InitInputRow("init_1", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getOption", -1.0)), AddData(inputData, InitInputRow("init_1", "appendList", 50.0)), AddData(inputData, InitInputRow("init_1", "getList", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getList", 50.0), ("init_1", "getList", 40.0)), AddData(inputData, InitInputRow("init_1", "incCount", 40.0)), AddData(inputData, InitInputRow("init_1", "getCount", 40.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getCount", 2.0)), // test remove AddData(inputData, InitInputRow("k1", "remove", -1.0)), AddData(inputData, InitInputRow("k1", "getOption", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("k1", "getOption", -1.0)), AddData(inputData, InitInputRow("init_1", "clearCount", -1.0)), AddData(inputData, InitInputRow("init_1", "getCount", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer(("init_1", "getCount", 0.0)), AddData(inputData, InitInputRow("init_1", "clearList", -1.0)), AddData(inputData, InitInputRow("init_1", "getList", -1.0)), + AdvanceManualClock(1 * 1000), CheckNewAnswer() ) }