From 1b32d03cb044da2c96cd5c11e1ced2a8edd5f6ed Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Mon, 16 May 2022 10:22:10 +0800 Subject: [PATCH] Add boundedness to source reader context, so that we can control to stop this reader when produce data --- pom.xml | 43 +++++++++++++---- seatunnel-api/pom.xml | 4 -- .../seatunnel/api/source/Boundedness.java | 4 ++ .../seatunnel/api/source/Collector.java | 5 ++ .../seatunnel/api/source/SeaTunnelSource.java | 39 ++++++++++++++- .../seatunnel/api/source/SourceEvent.java | 2 +- .../seatunnel/api/source/SourceReader.java | 39 ++++++++++++++- .../api/source/SourceSplitEnumerator.java | 12 +++++ .../org/apache/seatunnel/api/state/State.java | 4 +- .../seatunnel/api/table/factory/Factory.java | 3 ++ .../api/table/factory/TableSinkFactory.java | 9 ++++ .../api/table/factory/TableSourceFactory.java | 4 ++ .../common/utils/SerializationUtils.java | 8 ++-- .../seatunnel/console/sink/ConsoleSink.java | 3 ++ ...rg.apache.seatunnel.api.sink.SeaTunnelSink | 17 ------- .../seatunnel/fake/source/FakeSource.java | 8 +++- .../fake/source/FakeSourceReader.java | 7 ++- .../fake/source/ObjectSerializer.java | 47 ------------------- ...pache.seatunnel.api.source.SeaTunnelSource | 17 ------- seatunnel-dist/release-docs/LICENSE | 3 -- .../src/main/assembly/assembly-bin.xml | 10 ++++ .../source/ParallelReaderContext.java | 9 ++++ .../translation/source/ParallelSource.java | 9 ++-- .../util/ThreadPoolExecutorFactory.java | 1 + tools/dependencies/known-dependencies.txt | 2 - 25 files changed, 195 insertions(+), 114 deletions(-) delete mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink delete mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java delete mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource diff --git a/pom.xml b/pom.xml index 893a8bd3416..7880a2aeb04 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,8 @@ true 7 1.7.25 + 19.0 + 1.0.1 @@ -607,20 +609,43 @@ ${scala.version} - - org.slf4j - slf4j-api - ${slf4j.version} - - - org.slf4j - slf4j-log4j12 - ${slf4j.version} + com.google.guava + guava + ${guava.version} + + + com.google.auto.service + auto-service + ${auto-service.version} + provided + + + org.projectlombok + lombok + provided + + + junit + junit + test + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + ${project.artifactId}-${project.version}-${scala.version} diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml index f3efb218c20..7b6829a0f93 100644 --- a/seatunnel-api/pom.xml +++ b/seatunnel-api/pom.xml @@ -38,9 +38,5 @@ seatunnel-common ${project.version} - - org.slf4j - slf4j-api - \ No newline at end of file diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java index bceb3cdb105..c7b3fcd9582 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java @@ -17,6 +17,10 @@ package org.apache.seatunnel.api.source; +/** + * Used to define the boundedness of a source. In batch mode, the source is {@link Boundedness#BOUNDED}. + * In streaming mode, the source is {@link Boundedness#UNBOUNDED}. + */ public enum Boundedness { /** * A BOUNDED stream is a stream with finite records. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java index f36a7f877d3..6bd08eec5e0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java @@ -17,6 +17,11 @@ package org.apache.seatunnel.api.source; +/** + * A {@link Collector} is used to collect data from {@link SourceReader}. + * + * @param data type. + */ public interface Collector { void collect(T record); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java index 729753fca71..7244891a97d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java @@ -27,6 +27,7 @@ * * @param The type of records produced by the source. * @param The type of splits handled by the source. + * @param The type of checkpoint states. */ public interface SeaTunnelSource extends Serializable { @@ -37,14 +38,48 @@ public interface SeaTunnelSource extends */ Boundedness getBoundedness(); + /** + * Create source reader, used to produce data. + * + * @param readerContext reader context. + * @return source reader. + * @throws Exception when create reader failed. + */ SourceReader createReader(SourceReader.Context readerContext) throws Exception; + /** + * Create split serializer, use to serialize/deserialize split generated by {@link SourceSplitEnumerator}. + * + * @return split serializer. + */ Serializer getSplitSerializer(); - SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception; + /** + * Create source split enumerator, used to generate splits. This method will be called only once when start a source. + * + * @param enumeratorContext enumerator context. + * @return source split enumerator. + * @throws Exception when create enumerator failed. + */ + SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) + throws Exception; - SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, StateT checkpointState) throws Exception; + /** + * Create source split enumerator, used to generate splits. This method will be called when restore from checkpoint. + * + * @param enumeratorContext enumerator context. + * @param checkpointState checkpoint state. + * @return source split enumerator. + * @throws Exception when create enumerator failed. + */ + SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, + StateT checkpointState) throws Exception; + /** + * Create enumerator state serializer, used to serialize/deserialize checkpoint state. + * + * @return enumerator state serializer. + */ Serializer getEnumeratorStateSerializer(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java index 4d2374b4738..2f4558fbcf0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java @@ -20,7 +20,7 @@ import java.io.Serializable; /** - * An base class for the events passed between the SourceReaders and Enumerators. + * A base class for the events passed between the {@link SourceReader} and {@link SourceSplitEnumerator}. */ public interface SourceEvent extends Serializable { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java index 8131e835333..f43b5a55413 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java @@ -22,8 +22,17 @@ import java.io.IOException; import java.util.List; +/** + * The {@link SourceReader} is used to generate source record, and it will be running at worker. + * + * @param record type. + * @param source split type. + */ public interface SourceReader extends AutoCloseable, CheckpointListener { + /** + * Open the source reader. + */ void open(); /** @@ -33,10 +42,28 @@ public interface SourceReader extends AutoCloseab @Override void close() throws IOException; + /** + * Generate the next batch of records. + * + * @param output output collector. + * @throws Exception if error occurs. + */ void pollNext(Collector output) throws Exception; + /** + * Get the current split checkpoint state by checkpointId. + * + * @param checkpointId checkpoint Id. + * @return split checkpoint state. + * @throws Exception if error occurs. + */ List snapshotState(long checkpointId) throws Exception; + /** + * Add the split checkpoint state to reader. + * + * @param splits split checkpoint state. + */ void addSplits(List splits); /** @@ -48,6 +75,11 @@ public interface SourceReader extends AutoCloseab */ void handleNoMoreSplits(); + /** + * Handle the source event form {@link SourceSplitEnumerator}. + * + * @param sourceEvent source event. + */ default void handleSourceEvent(SourceEvent sourceEvent) { } @@ -59,7 +91,12 @@ interface Context { int getIndexOfSubtask(); /** - * Indicator that the input has reached the end of data. + * @return boundedness of this reader. + */ + Boundedness getBoundedness(); + + /** + * Indicator that the input has reached the end of data. Then will cancel this reader. */ void signalNoMoreElement(); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java index 77e75c8f68c..4ad8df6afc0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java @@ -24,6 +24,12 @@ import java.util.List; import java.util.Set; +/** + * The {@link SourceSplitEnumerator} is responsible for enumerating the splits of a source. It will run at master. + * + * @param source split type + * @param source split state type + */ public interface SourceSplitEnumerator extends AutoCloseable, CheckpointListener { void open(); @@ -52,6 +58,12 @@ public interface SourceSplitEnumerator exten StateT snapshotState(long checkpointId) throws Exception; + /** + * Handle the source event from {@link SourceReader}. + * + * @param subtaskId The id of the subtask to which the source event from. + * @param sourceEvent source event. + */ default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java index d7e43d4420d..4384100b568 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java @@ -17,5 +17,7 @@ package org.apache.seatunnel.api.state; -public interface State { +import java.io.Serializable; + +public interface State extends Serializable { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java index 59b0fd515fa..cd35609bcef 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.api.table.factory; +/** + * This is the SPI interface. + */ public interface Factory { /** diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index bdc3d1a642b..8d941885208 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -19,6 +19,15 @@ import org.apache.seatunnel.api.table.connector.TableSink; +/** + * This is an SPI interface, used to create {@link TableSink}. Each plugin need to have it own implementation. + * todo: now we have not use this interface, we directly use {@link org.apache.seatunnel.api.sink.SeaTunnelSink} as the SPI interface. + * + * @param row type + * @param state type + * @param commit info type + * @param aggregated commit info type + */ public interface TableSinkFactory extends Factory { TableSink createSink(TableFactoryContext context); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java index 2206a6bb9ac..a75236e0cec 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java @@ -20,6 +20,10 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.connector.TableSource; +/** + * This is an SPI interface, used to create {@link TableSource}. Each plugin need to have it own implementation. + * todo: now we have not use this interface, we directly use {@link org.apache.seatunnel.api.source.SeaTunnelSource} as the SPI interface + */ public interface TableSourceFactory extends Factory { TableSource createSource(TableFactoryContext context); diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java index 43dead16767..a1d0aa96b42 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java @@ -26,24 +26,24 @@ public class SerializationUtils { public static String objectToString(Serializable obj) { if (obj != null) { - return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj)); + return Base64.encodeBase64String(SerializationUtils.serialize(obj)); } return null; } public static T stringToObject(String str) { if (StringUtils.isNotEmpty(str)) { - return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str)); + return SerializationUtils.deserialize(Base64.decodeBase64(str)); } return null; } public static byte[] serialize(T obj) { - return org.apache.commons.lang3.SerializationUtils.serialize(obj); + return SerializationUtils.serialize(obj); } public static T deserialize(byte[] bytes) { - return org.apache.commons.lang3.SerializationUtils.deserialize(bytes); + return SerializationUtils.deserialize(bytes); } } diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java index d7eab44b555..ea7f69cb62f 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java @@ -22,8 +22,11 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState; +import com.google.auto.service.AutoService; + import java.util.List; +@AutoService(SeaTunnelSink.class) public class ConsoleSink implements SeaTunnelSink { @Override diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink deleted file mode 100644 index 12b499834f7..00000000000 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java index 9f18668ba54..9c12e4da39b 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; +import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; @@ -25,6 +26,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState; +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSource.class) public class FakeSource implements SeaTunnelSource { @Override @@ -39,7 +43,7 @@ public SourceReader createReader(SourceReader.Con @Override public Serializer getSplitSerializer() { - return new ObjectSerializer<>(); + return new DefaultSerializer<>(); } @Override @@ -57,6 +61,6 @@ public SourceSplitEnumerator restoreEnumerator( @Override public Serializer getEnumeratorStateSerializer() { - return new ObjectSerializer<>(); + return new DefaultSerializer<>(); } } diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java index bf2cb2f67ce..e6bdd0d49bd 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -57,7 +58,6 @@ public void close() { @Override @SuppressWarnings("magicnumber") public void pollNext(Collector output) throws InterruptedException { - Thread.sleep(1000L); int i = random.nextInt(names.length); Map fieldMap = new HashMap<>(4); fieldMap.put("name", names[i]); @@ -65,6 +65,11 @@ public void pollNext(Collector output) throws InterruptedException fieldMap.put("timestamp", System.currentTimeMillis()); SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i], ages[i], System.currentTimeMillis()}, fieldMap); output.collect(seaTunnelRow); + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + context.signalNoMoreElement(); + } + Thread.sleep(1000L); } @Override diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java deleted file mode 100644 index bd53b4a2596..00000000000 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/ObjectSerializer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.fake.source; - -import org.apache.seatunnel.api.serialization.Serializer; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -public class ObjectSerializer implements Serializer { - - @Override - public byte[] serialize(T obj) throws IOException { - try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { - objectOutputStream.writeObject(obj); - return byteArrayOutputStream.toByteArray(); - } - } - - @Override - public T deserialize(byte[] serialized) throws IOException { - try { - return (T) new ObjectInputStream(new ByteArrayInputStream(serialized)).readObject(); - } catch (ClassNotFoundException e) { - throw new RuntimeException("deserialize split error", e); - } - } -} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource deleted file mode 100644 index b21a058146e..00000000000 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource \ No newline at end of file diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE index 636eb0d1a9e..0c696160250 100644 --- a/seatunnel-dist/release-docs/LICENSE +++ b/seatunnel-dist/release-docs/LICENSE @@ -755,9 +755,6 @@ The text of each license is the standard Apache 2.0 license. (The Apache Software License, Version 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client:1.26.0 - https://github.com/googleapis/google-http-java-client/google-http-client) (The Apache Software License, Version 2.0) Google OAuth Client Library for Java (com.google.oauth-client:google-oauth-client:1.26.0 - https://github.com/googleapis/google-oauth-java-client/google-oauth-client) (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/) - (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:11.0.2 - http://code.google.com/p/guava-libraries/guava) - (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:13.0.1 - http://code.google.com/p/guava-libraries/guava) - (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1 - http://code.google.com/p/guava-libraries/guava) (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava) (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc) (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc) diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml index aaa25e23820..aa5380bc543 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml @@ -108,6 +108,16 @@ /connectors/spark + + ../seatunnel-connectors/seatunnel-connectors-seatunnel-dist/target/lib + + seatunnel-connector-seatunnel*.jar + + + %regex[.*((javadoc)|(sources))\.jar] + + /connectors/seatunnel + ../seatunnel-connectors diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java index 0535645f8ec..b2667ea8fea 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java @@ -17,17 +17,20 @@ package org.apache.seatunnel.translation.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceReader; public class ParallelReaderContext implements SourceReader.Context { protected final ParallelSource parallelSource; + protected final Boundedness boundedness; protected final Integer subtaskId; public ParallelReaderContext(ParallelSource parallelSource, Integer subtaskId) { this.parallelSource = parallelSource; + this.boundedness = parallelSource.source.getBoundedness(); this.subtaskId = subtaskId; } @@ -36,8 +39,14 @@ public int getIndexOfSubtask() { return subtaskId; } + @Override + public Boundedness getBoundedness() { + return boundedness; + } + @Override public void signalNoMoreElement() { + // todo: if we have multiple subtasks, we need to know if all subtask is stopped parallelSource.handleNoMoreElement(); } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java index af67eb66cf0..59c51eb4638 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java @@ -30,10 +30,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class ParallelSource implements AutoCloseable, CheckpointListener { + private final long splitEnumeratorTimeInterval = 5L; + protected final SeaTunnelSource source; protected final ParallelEnumeratorContext parallelEnumeratorContext; protected final ParallelReaderContext readerContext; @@ -47,7 +50,7 @@ public class ParallelSource implements Au protected transient volatile SourceSplitEnumerator splitEnumerator; protected transient volatile SourceReader reader; - protected transient volatile ExecutorService executorService; + protected transient volatile ScheduledThreadPoolExecutor executorService; /** * Flag indicating whether the consumer is still running. @@ -99,7 +102,7 @@ public void open() throws Exception { } public void run(Collector collector) throws Exception { - executorService.execute(() -> splitEnumerator.run()); + executorService.scheduleAtFixedRate(() -> splitEnumerator.run(), 0L, splitEnumeratorTimeInterval, TimeUnit.SECONDS); while (running) { reader.pollNext(collector); } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java index cd5dffa6fe5..7ce81a8f98c 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java @@ -28,6 +28,7 @@ public static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(int AtomicInteger cnt = new AtomicInteger(0); return new ScheduledThreadPoolExecutor(corePoolSize, runnable -> { Thread thread = new Thread(runnable); + thread.setDaemon(true); thread.setName(name + "-" + cnt.incrementAndGet()); return thread; }); diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 9d9b4b7bc48..0b2f98228f7 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -188,8 +188,6 @@ google-http-client-1.26.0.jar google-http-client-jackson2-1.26.0.jar google-oauth-client-1.26.0.jar gson-2.2.4.jar -guava-13.0.1.jar -guava-16.0.1.jar guava-19.0.jar guice-3.0.jar guice-4.1.0.jar