diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java index ec213c69593..c17a1d580c0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java @@ -17,18 +17,20 @@ package org.apache.seatunnel.api.serialization; +import org.apache.seatunnel.common.utils.SerializationUtils; + import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.io.Serializable; -public class DefaultSerializer implements Serializer { +public class DefaultSerializer implements Serializer { @Override - public byte[] serialize(String obj) throws IOException { - return obj.getBytes(StandardCharsets.UTF_8); + public byte[] serialize(T obj) throws IOException { + return SerializationUtils.serialize(obj); } @Override - public String deserialize(byte[] serialized) throws IOException { - return new String(serialized); + public T deserialize(byte[] serialized) throws IOException { + return SerializationUtils.deserialize(serialized); } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java index 0b6e9c521ec..43dead16767 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java @@ -37,4 +37,13 @@ public static T stringToObject(String str) { } return null; } + + public static byte[] serialize(T obj) { + return org.apache.commons.lang3.SerializationUtils.serialize(obj); + } + + public static T deserialize(byte[] bytes) { + return org.apache.commons.lang3.SerializationUtils.deserialize(bytes); + } + } diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java index 8aeb4537b75..cb45b70dc1a 100644 --- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java +++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java @@ -20,7 +20,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.common.utils.SerializationUtils; -import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Row; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; @@ -31,9 +31,16 @@ public class SparkSinkInjector { private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink"; - public static DataStreamWriter inject(Dataset dataset, SeaTunnelSink sink, + public static DataStreamWriter inject(DataStreamWriter dataset, SeaTunnelSink sink, HashMap configuration) { - return dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append()) + return dataset.format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append()) + .option("configuration", SerializationUtils.objectToString(configuration)).option("sink", + SerializationUtils.objectToString(sink)); + } + + public static DataFrameWriter inject(DataFrameWriter dataset, SeaTunnelSink sink, + HashMap configuration) { + return dataset.format(SPARK_SINK_CLASS_NAME) .option("configuration", SerializationUtils.objectToString(configuration)).option("sink", SerializationUtils.objectToString(sink)); }