diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java index fe619944c8a..ffdd9421d6b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.io.Serializable; -public interface Serializer extends Serializable{ +public interface Serializer extends Serializable { /** * Serializes the given object. diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java new file mode 100644 index 00000000000..65500b719b5 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.serialization; + +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.translation.flink.sink.CommitWrapper; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class CommitWrapperSerializer implements SimpleVersionedSerializer> { + private final Serializer serializer; + + public CommitWrapperSerializer(Serializer serializer) { + this.serializer = serializer; + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(CommitWrapper commitWrapper) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + byte[] serialize = serializer.serialize(commitWrapper.getCommit()); + out.writeInt(serialize.length); + out.write(serialize); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public CommitWrapper deserialize(int version, byte[] serialized) throws IOException { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + final DataInputStream in = new DataInputStream(bais)) { + final int size = in.readInt(); + final byte[] stateBytes = new byte[size]; + in.read(stateBytes); + T commitT = serializer.deserialize(stateBytes); + return new CommitWrapper<>(commitT); + } + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java new file mode 100644 index 00000000000..1b91085af34 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.sink; + +public class CommitWrapper { + private final CommitT commit; + + public CommitWrapper(CommitT commit) { + this.commit = commit; + } + + public CommitT getCommit() { + return commit; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java index 6e4a1902447..4cdfeb30124 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java @@ -26,8 +26,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; -public class FlinkCommitter implements Committer { +public class FlinkCommitter implements Committer> { private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCommitter.class); @@ -38,8 +39,10 @@ public class FlinkCommitter implements Committer { } @Override - public List commit(List committables) throws IOException { - List reCommittable = sinkCommitter.commit(committables); + public List> commit(List> committables) throws IOException { + List reCommittable = sinkCommitter.commit(committables.stream() + .map(CommitWrapper::getCommit) + .collect(Collectors.toList())); if (reCommittable != null && !reCommittable.isEmpty()) { LOGGER.warn("this version not support re-commit when use flink engine"); } diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java index 774b8e62194..f71634351c1 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java @@ -27,8 +27,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; -public class FlinkGlobalCommitter implements GlobalCommitter { +public class FlinkGlobalCommitter implements GlobalCommitter, GlobalCommT> { private static final Logger LOGGER = LoggerFactory.getLogger(FlinkGlobalCommitter.class); @@ -44,8 +45,10 @@ public List filterRecoveredCommittables(List globalCommittables) th } @Override - public GlobalCommT combine(List committables) throws IOException { - return aggregatedCommitter.combine(committables); + public GlobalCommT combine(List> committables) throws IOException { + return aggregatedCommitter.combine(committables.stream() + .map(CommitWrapper::getCommit) + .collect(Collectors.toList())); } @Override diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java index 5f755eca8b1..c90fd86eb75 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.sink.DefaultSinkWriterContext; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer; import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer; import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer; @@ -36,7 +37,7 @@ import java.util.stream.Collectors; @SuppressWarnings("unchecked") -public class FlinkSink implements Sink implements Sink, FlinkWriterState, GlobalCommT> { private final SeaTunnelSink sink; @@ -49,7 +50,7 @@ public FlinkSink(SeaTunnelSink s } @Override - public SinkWriter> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List> states) throws IOException { + public SinkWriter, FlinkWriterState> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List> states) throws IOException { // TODO add subtask and parallelism. org.apache.seatunnel.api.sink.SinkWriter.Context stContext = new DefaultSinkWriterContext(configuration, 0, 0); @@ -63,18 +64,18 @@ public SinkWriter> createWriter(or } @Override - public Optional> createCommitter() throws IOException { + public Optional>> createCommitter() throws IOException { return sink.createCommitter().map(FlinkCommitter::new); } @Override - public Optional> createGlobalCommitter() throws IOException { + public Optional, GlobalCommT>> createGlobalCommitter() throws IOException { return sink.createAggregatedCommitter().map(FlinkGlobalCommitter::new); } @Override - public Optional> getCommittableSerializer() { - return sink.getCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new); + public Optional>> getCommittableSerializer() { + return sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new); } @Override diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java index cc6d7659273..5e221a289c6 100644 --- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java @@ -31,7 +31,7 @@ import java.util.Optional; import java.util.stream.Collectors; -public class FlinkSinkWriter implements SinkWriter> { +public class FlinkSinkWriter implements SinkWriter, FlinkWriterState> { private final org.apache.seatunnel.api.sink.SinkWriter sinkWriter; private final FlinkRowConverter rowSerialization; @@ -55,9 +55,9 @@ public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter } @Override - public List prepareCommit(boolean flush) throws IOException { + public List> prepareCommit(boolean flush) throws IOException { Optional commTOptional = sinkWriter.prepareCommit(); - return commTOptional.map(Collections::singletonList).orElse(Collections.emptyList()); + return commTOptional.map(CommitWrapper::new).map(Collections::singletonList).orElse(Collections.emptyList()); } @Override