Skip to content

Commit

Permalink
[FLINK-35066] Fix the unwrap from IterationRecord during keyBy
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Apr 9, 2024
1 parent f08f275 commit 6c971ac
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
@Override
public void setKeyContextElement(StreamRecord<IterationRecord<IN>> record)
throws Exception {
reusedInput.replace(record.getValue(), record.getTimestamp());
input.setKeyContextElement(reusedInput);
if (record.getValue().getType() == IterationRecord.Type.RECORD) {
reusedInput.replace(record.getValue().getValue(), record.getTimestamp());
input.setKeyContextElement(reusedInput);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,37 @@ public TwoInputAllRoundWrapperOperator(
this.reusedInput2 = new StreamRecord<>(null, 0);
}

@Override
public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
setKeyContextElement(record, reusedInput1, wrappedOperator::setKeyContextElement1);
}

@Override
public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
setKeyContextElement(record, reusedInput2, wrappedOperator::setKeyContextElement2);
}

private void setKeyContextElement(StreamRecord<?> record,
StreamRecord<?> reusedInput,
ThrowingConsumer<StreamRecord<?>, Exception> processor) throws Exception {
if (!(record.getValue() instanceof IterationRecord)) {
super.setKeyContextElement1(record);
return;
}

IterationRecord<?> iterationRecord = (IterationRecord<?>)record.getValue();
switch (iterationRecord.getType()) {
case RECORD:
reusedInput.replace(iterationRecord.getValue(), record.getTimestamp());
processor.accept(reusedInput);
break;
case EPOCH_WATERMARK:
break;
default:
throw new FlinkRuntimeException("Not supported iteration record type: " + iterationRecord.getType());
}
}

@Override
public void processElement1(StreamRecord<IterationRecord<IN1>> element) throws Exception {
processElement(element, 0, reusedInput1, wrappedOperator::processElement1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -75,9 +76,9 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
MultipleInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.OperatorWrapper;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
OneInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.iteration.operator.WrapperOperatorFactory;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
Expand Down Expand Up @@ -74,8 +75,8 @@ public void testProcessElementsAndEpochWatermarks() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
TwoInputStreamTask::new,
new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO), 1, new ProxyKeySelector<Integer, Integer>(x -> x % 2))
.setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
.build()) {
harness.processElement(new StreamRecord<>(IterationRecord.newRecord(5, 1), 2), 0);
Expand Down

0 comments on commit 6c971ac

Please sign in to comment.