diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java index accaf7c99cc176..9e2f803b3a0afc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java @@ -411,34 +411,6 @@ public void setIdleStateRetention(Duration duration) { configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration); } - /** - * NOTE: Currently the concept of min/max idle state retention has been deprecated and only idle - * state retention time is supported. The min idle state retention is regarded as idle state - * retention and the max idle state retention is derived from idle state retention as 1.5 x idle - * state retention. - * - * @return The minimum time until state which was not updated will be retained. - * @deprecated use{@link getIdleStateRetention} instead. - */ - @Deprecated - public long getMinIdleStateRetentionTime() { - return configuration.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis(); - } - - /** - * NOTE: Currently the concept of min/max idle state retention has been deprecated and only idle - * state retention time is supported. The min idle state retention is regarded as idle state - * retention and the max idle state retention is derived from idle state retention as 1.5 x idle - * state retention. - * - * @return The maximum time until state which was not updated will be retained. - * @deprecated use{@link getIdleStateRetention} instead. - */ - @Deprecated - public long getMaxIdleStateRetentionTime() { - return getMinIdleStateRetentionTime() * 3 / 2; - } - /** * @return The duration until state which was not updated will be retained. */ diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala index 6621a32e2afb41..025661f7d93e7a 100644 --- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala +++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala @@ -49,7 +49,8 @@ class StreamTableEnvironmentImplTest { tEnv.toDataStream(table) assertThat(tEnv.getConfig.getIdleStateRetention.toMillis).isEqualTo(retention.toMillis) - assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2).isEqualTo(retention.toMillis * 3 / 2) + assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2) + .isEqualTo(retention.toMillis * 3 / 2) } @Test @@ -64,7 +65,8 @@ class StreamTableEnvironmentImplTest { tEnv.toRetractStream[Row](table) assertThat(tEnv.getConfig.getIdleStateRetention.toMillis).isEqualTo(retention.toMillis) - assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2).isEqualTo(retention.toMillis * 3 / 2) + assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2) + .isEqualTo(retention.toMillis * 3 / 2) } private def getStreamTableEnvironment( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java index 1ec65c852e3708..b575530dc10daa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java @@ -45,7 +45,6 @@ import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.plan.utils.OverAggregateUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction; @@ -320,7 +319,7 @@ private KeyedProcessFunction createUnboundedOverProce // ROWS unbounded over process function return new RowTimeRowsUnboundedPrecedingFunction<>( config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, genAggsHandler, flattenAccTypes, fieldTypes, @@ -329,7 +328,7 @@ private KeyedProcessFunction createUnboundedOverProce // RANGE unbounded over process function return new RowTimeRangeUnboundedPrecedingFunction<>( config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, genAggsHandler, flattenAccTypes, fieldTypes, @@ -407,7 +406,7 @@ private KeyedProcessFunction createBoundedOverProcess if (isRowsClause) { return new RowTimeRowsBoundedPrecedingFunction<>( config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, genAggsHandler, flattenAccTypes, fieldTypes, @@ -421,7 +420,7 @@ private KeyedProcessFunction createBoundedOverProcess if (isRowsClause) { return new ProcTimeRowsBoundedPrecedingFunction<>( config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, genAggsHandler, flattenAccTypes, fieldTypes, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java index d3b66c4d2ff646..c646540a63fdbf 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java @@ -42,7 +42,6 @@ import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.dataview.DataViewSpec; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -186,7 +185,7 @@ protected Transformation translateToPlanInternal( pythonFunctionInfos, dataViewSpecs, config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, inputCountIndex, countStarInserted); // partitioned aggregation diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java index 1f425f1d1c39b2..810e96e8e3cb19 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java @@ -42,7 +42,6 @@ import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.dataview.DataViewSpec; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -142,7 +141,7 @@ protected Transformation translateToPlanInternal( pythonFunctionInfos, dataViewSpecs, config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, generateUpdateBefore, inputCountIndex); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java index d9e9e55dad84da..7a8db503e3bf96 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java @@ -46,7 +46,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.plan.utils.OverAggregateUtil; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedProjection; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -209,7 +208,7 @@ protected Transformation translateToPlanInternal( precedingOffset, group.isRows(), config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + config.getStateRetentionTime() * 3 / 2, pythonConfig, config, planner.getFlinkContext().getClassLoader()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java index eb6f68a2170903..e81f20001dad82 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java @@ -44,7 +44,6 @@ import org.apache.flink.table.planner.plan.utils.JoinUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; @@ -263,7 +262,7 @@ private TwoInputStreamOperator createJoinOperator( boolean isLeftOuterJoin = joinSpec.getJoinType() == FlinkJoinType.LEFT; long minRetentionTime = config.getStateRetentionTime(); - long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config); + long maxRetentionTime = config.getStateRetentionTime() * 3 / 2; if (rightTimeAttributeIndex >= 0) { return new TemporalRowTimeJoinOperator( InternalTypeInfo.of(leftInputType), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java index 59d00097ed9ec7..ceaa43c1c4a8d2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/TableConfigUtils.java @@ -21,7 +21,6 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.AggregatePhaseStrategy; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.planner.calcite.CalciteConfig; import org.apache.flink.table.planner.calcite.CalciteConfig$; @@ -105,16 +104,6 @@ public static ZoneId getLocalTimeZone(ReadableConfig tableConfig) { return ZoneId.of(zone); } - /** - * Similar to {@link TableConfig#getMaxIdleStateRetentionTime()}. - * - * @see TableConfig#getMaxIdleStateRetentionTime() - */ - @Deprecated - public static long getMaxIdleStateRetentionTime(ReadableConfig tableConfig) { - return tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis() * 3 / 2; - } - // Make sure that we cannot instantiate this class private TableConfigUtils() {} }