Skip to content

Commit

Permalink
[FLINK-35066] Fix the unwrap from IterationRecord during keyBy
Browse files — browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Apr 9, 2024
1 parent f08f275 commit 90bf4ad
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
/**
 * Sets the key context on the wrapped input for an incoming iteration record.
 *
 * <p>NOTE(review): this span looks like a diff rendering whose +/- markers were lost. The first
 * two statements forward the whole {@code IterationRecord} as the value, while the guarded block
 * after them forwards only the unwrapped inner value, and only for {@code RECORD}-typed elements.
 * Presumably the first two statements are the removed (pre-fix) lines and the guarded block is
 * the replacement; confirm against the commit before treating both as live code.
 *
 * @param record the incoming stream record wrapping an {@code IterationRecord}
 * @throws Exception propagated from the wrapped input's {@code setKeyContextElement}
 */
@Override
public void setKeyContextElement(StreamRecord<IterationRecord<IN>> record)
throws Exception {
// Passes record.getValue() (the IterationRecord wrapper itself) together with the timestamp.
reusedInput.replace(record.getValue(), record.getTimestamp());
input.setKeyContextElement(reusedInput);
// Forward only RECORD-typed elements, unwrapping the inner value before setting the key context.
if (record.getValue().getType() == IterationRecord.Type.RECORD) {
reusedInput.replace(record.getValue().getValue(), record.getTimestamp());
input.setKeyContextElement(reusedInput);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,40 @@ public TwoInputAllRoundWrapperOperator(
this.reusedInput2 = new StreamRecord<>(null, 0);
}

/**
 * Sets the key context of the wrapped operator for an element arriving on the first input.
 *
 * @param record the incoming stream record; its value is normally an {@code IterationRecord}
 * @throws Exception propagated from the wrapped operator or the superclass
 */
@Override
public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
    setKeyContextElement(
            record,
            reusedInput1,
            wrappedOperator::setKeyContextElement1,
            super::setKeyContextElement1);
}

/**
 * Sets the key context of the wrapped operator for an element arriving on the second input.
 *
 * @param record the incoming stream record; its value is normally an {@code IterationRecord}
 * @throws Exception propagated from the wrapped operator or the superclass
 */
@Override
public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
    setKeyContextElement(
            record,
            reusedInput2,
            wrappedOperator::setKeyContextElement2,
            super::setKeyContextElement2);
}

/**
 * Shared implementation behind {@link #setKeyContextElement1} and {@link
 * #setKeyContextElement2}.
 *
 * <p>If the record's value is not an {@code IterationRecord}, the record is handed unchanged to
 * {@code fallback}. Otherwise a {@code RECORD} element is unwrapped into {@code reusedInput}
 * (keeping the original timestamp) and passed to {@code processor}; an {@code EPOCH_WATERMARK}
 * element is ignored.
 *
 * @param record the incoming stream record
 * @param reusedInput the reusable record that receives the unwrapped value
 * @param processor sets the key context on the wrapped operator for the matching input
 * @param fallback the superclass handler of the <em>same</em> input, used when the value is
 *     not an {@code IterationRecord}; previously input 1's handler was used for both inputs
 * @throws FlinkRuntimeException if the iteration record has an unsupported type
 * @throws Exception propagated from {@code processor} or {@code fallback}
 */
private void setKeyContextElement(
        StreamRecord<?> record,
        StreamRecord<?> reusedInput,
        ThrowingConsumer<StreamRecord<?>, Exception> processor,
        ThrowingConsumer<StreamRecord<?>, Exception> fallback)
        throws Exception {
    if (!(record.getValue() instanceof IterationRecord)) {
        // Route to the handler of the input the record actually arrived on.
        fallback.accept(record);
        return;
    }

    IterationRecord<?> iterationRecord = (IterationRecord<?>) record.getValue();
    switch (iterationRecord.getType()) {
        case RECORD:
            // The key is extracted from the unwrapped value, not from the wrapper.
            reusedInput.replace(iterationRecord.getValue(), record.getTimestamp());
            processor.accept(reusedInput);
            break;
        case EPOCH_WATERMARK:
            // No key context is set for epoch watermarks.
            break;
        default:
            throw new FlinkRuntimeException(
                    "Not supported iteration record type: " + iterationRecord.getType());
    }
}

@Override
public void processElement1(StreamRecord<IterationRecord<IN1>> element) throws Exception {
processElement(element, 0, reusedInput1, wrappedOperator::processElement1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void setKeyContextElement(StreamRecord<IterationRecord<IN>> element)
if (element.getValue().getType() == IterationRecord.Type.RECORD) {
// Ensures the operators are created.
getWrappedOperator(element.getValue().getEpoch());
reusedInput.replace(element.getValue(), element.getTimestamp());
reusedInput.replace(element.getValue().getValue(), element.getTimestamp());
operatorInputsByEpoch
.get(element.getValue().getEpoch())
.get(inputIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -75,9 +76,18 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
MultipleInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.OperatorWrapper;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -79,7 +80,10 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
OneInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -74,8 +75,14 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
TwoInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.operator.allround.LifeCycle;
import org.apache.flink.iteration.operator.allround.OneInputAllRoundWrapperOperator;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -77,9 +78,18 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
MultipleInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.iteration.operator.OperatorWrapper;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.operator.allround.LifeCycle;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -93,7 +94,10 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
OneInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.operator.allround.LifeCycle;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -75,8 +76,14 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
TwoInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO),
1,
new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down

0 comments on commit 90bf4ad

Please sign in to comment.