Skip to content

Commit

Permalink
remove deprecated functions
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 committed Dec 11, 2024
1 parent 1e22194 commit e1db084
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,34 +411,6 @@ public void setIdleStateRetention(Duration duration) {
configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
}

/**
* NOTE: Currently the concept of min/max idle state retention has been deprecated and only idle
* state retention time is supported. The min idle state retention is regarded as idle state
* retention and the max idle state retention is derived from idle state retention as 1.5 x idle
* state retention.
*
* @return The minimum time until state which was not updated will be retained.
* @deprecated use{@link getIdleStateRetention} instead.
*/
@Deprecated
public long getMinIdleStateRetentionTime() {
return configuration.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis();
}

/**
* NOTE: Currently the concept of min/max idle state retention has been deprecated and only idle
* state retention time is supported. The min idle state retention is regarded as idle state
* retention and the max idle state retention is derived from idle state retention as 1.5 x idle
* state retention.
*
* @return The maximum time until state which was not updated will be retained.
* @deprecated use{@link getIdleStateRetention} instead.
*/
@Deprecated
public long getMaxIdleStateRetentionTime() {
return getMinIdleStateRetentionTime() * 3 / 2;
}

/**
* @return The duration until state which was not updated will be retained.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class StreamTableEnvironmentImplTest {
tEnv.toDataStream(table)

assertThat(tEnv.getConfig.getIdleStateRetention.toMillis).isEqualTo(retention.toMillis)
assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2).isEqualTo(retention.toMillis * 3 / 2)
assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2)
.isEqualTo(retention.toMillis * 3 / 2)
}

@Test
Expand All @@ -64,7 +65,8 @@ class StreamTableEnvironmentImplTest {
tEnv.toRetractStream[Row](table)

assertThat(tEnv.getConfig.getIdleStateRetention.toMillis).isEqualTo(retention.toMillis)
assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2).isEqualTo(retention.toMillis * 3 / 2)
assertThat(tEnv.getConfig.getIdleStateRetention.toMillis * 3 / 2)
.isEqualTo(retention.toMillis * 3 / 2)
}

private def getStreamTableEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction;
Expand Down Expand Up @@ -320,7 +319,7 @@ private KeyedProcessFunction<RowData, RowData, RowData> createUnboundedOverProce
// ROWS unbounded over process function
return new RowTimeRowsUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
genAggsHandler,
flattenAccTypes,
fieldTypes,
Expand All @@ -329,7 +328,7 @@ private KeyedProcessFunction<RowData, RowData, RowData> createUnboundedOverProce
// RANGE unbounded over process function
return new RowTimeRangeUnboundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
genAggsHandler,
flattenAccTypes,
fieldTypes,
Expand Down Expand Up @@ -407,7 +406,7 @@ private KeyedProcessFunction<RowData, RowData, RowData> createBoundedOverProcess
if (isRowsClause) {
return new RowTimeRowsBoundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
genAggsHandler,
flattenAccTypes,
fieldTypes,
Expand All @@ -421,7 +420,7 @@ private KeyedProcessFunction<RowData, RowData, RowData> createBoundedOverProcess
if (isRowsClause) {
return new ProcTimeRowsBoundedPrecedingFunction<>(
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
genAggsHandler,
flattenAccTypes,
fieldTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -186,7 +185,7 @@ protected Transformation<RowData> translateToPlanInternal(
pythonFunctionInfos,
dataViewSpecs,
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
inputCountIndex,
countStarInserted);
// partitioned aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -142,7 +141,7 @@ protected Transformation<RowData> translateToPlanInternal(
pythonFunctionInfos,
dataViewSpecs,
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
generateUpdateBefore,
inputCountIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -209,7 +208,7 @@ protected Transformation<RowData> translateToPlanInternal(
precedingOffset,
group.isRows(),
config.getStateRetentionTime(),
TableConfigUtils.getMaxIdleStateRetentionTime(config),
config.getStateRetentionTime() * 3 / 2,
pythonConfig,
config,
planner.getFlinkContext().getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
Expand Down Expand Up @@ -263,7 +262,7 @@ private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(

boolean isLeftOuterJoin = joinSpec.getJoinType() == FlinkJoinType.LEFT;
long minRetentionTime = config.getStateRetentionTime();
long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);
long maxRetentionTime = config.getStateRetentionTime() * 3 / 2;
if (rightTimeAttributeIndex >= 0) {
return new TemporalRowTimeJoinOperator(
InternalTypeInfo.of(leftInputType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.calcite.CalciteConfig$;
Expand Down Expand Up @@ -105,16 +104,6 @@ public static ZoneId getLocalTimeZone(ReadableConfig tableConfig) {
return ZoneId.of(zone);
}

/**
* Similar to {@link TableConfig#getMaxIdleStateRetentionTime()}.
*
* @see TableConfig#getMaxIdleStateRetentionTime()
*/
@Deprecated
public static long getMaxIdleStateRetentionTime(ReadableConfig tableConfig) {
return tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis() * 3 / 2;
}

// Make sure that we cannot instantiate this class
private TableConfigUtils() {}
}

0 comments on commit e1db084

Please sign in to comment.