From 71105bb26a87c1ca138c90b8054bb29af7b9d204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 28 Aug 2022 09:46:57 +0800 Subject: [PATCH 1/4] [Feature][Connector-V2] Add Socket Connector Sink --- docs/en/connector-v2/sink/Socket.md | 89 +++++++++ plugin-mapping.properties | 2 + .../connector-socket/pom.xml | 6 + .../seatunnel/socket/config/SinkConfig.java | 25 +++ .../seatunnel/socket/sink/SocketClient.java | 178 ++++++++++++++++++ .../seatunnel/socket/sink/SocketSink.java | 73 +++++++ .../socket/sink/SocketSinkWriter.java | 52 +++++ 7 files changed, 425 insertions(+) create mode 100644 docs/en/connector-v2/sink/Socket.md create mode 100644 seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java create mode 100644 seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java create mode 100644 seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java create mode 100644 seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java diff --git a/docs/en/connector-v2/sink/Socket.md b/docs/en/connector-v2/sink/Socket.md new file mode 100644 index 00000000000..7339f7b0133 --- /dev/null +++ b/docs/en/connector-v2/sink/Socket.md @@ -0,0 +1,89 @@ +# Socket + +> Socket sink connector + +## Description + +Used to send data to Socket Server. Both support streaming and batch mode. +> For example, if the data from upstream is [`age: 12, name: jared`], the content send to socket server is the following: `{"name":"jared","age":17}` + + +## Options + +| name | type | required | default value | +| --- |--------|----------|---------------| +| host | String | Yes | - | +| port | Integer | yes | - | +| max_retries | Integer | No | 3 | + +### host [string] +socket server host + +### port [integer] + +socket server port + +### max_retries [integer] + +The number of retries to send record failed + +## Example + +simple: + +```hocon +Socket { + host = "localhost" + port = 9999 + } +``` + +test: + +* Configuring the SeaTunnel config file + +```hocon +env { + execution.parallelism = 1 + job.mode = "STREAMING" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { + sql = "select name, age from fake" +} + +sink { + Socket { + host = "localhost" + port = 9999 + } +} + +``` + +* Start a port listening + +```shell +nc -l -v 9999 +``` + +* Start a SeaTunnel task + + +* Socket Server Console print data + +```text +{"name":"jared","age":17} +``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 0efd44f1f65..7d92053adde 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -120,3 +120,5 @@ seatunnel.source.IoTDB = connector-iotdb seatunnel.sink.IoTDB = connector-iotdb seatunnel.sink.Neo4j = connector-neo4j seatunnel.sink.FtpFile = connector-file-ftp +seatunnel.sink.Socket = connector-socket + diff --git a/seatunnel-connectors-v2/connector-socket/pom.xml b/seatunnel-connectors-v2/connector-socket/pom.xml index dca9bf524fd..e196b24841f 100644 --- a/seatunnel-connectors-v2/connector-socket/pom.xml +++ b/seatunnel-connectors-v2/connector-socket/pom.xml @@ -35,6 +35,12 @@ connector-common ${project.version} + + org.apache.seatunnel + seatunnel-format-json + 2.1.3-SNAPSHOT + compile + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java new file mode 100644 index 00000000000..b509ebf52cd --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java @@ -0,0 +1,25 @@ +package org.apache.seatunnel.connectors.seatunnel.socket.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SinkConfig implements Serializable { + public static final String HOST = "host"; + public static final String PORT = "port"; + private static final String MAX_RETRIES = "max_retries"; + private static final int DEFAULT_MAX_RETRIES = 3; + private String host; + private Integer port; + private Integer maxNumRetries = DEFAULT_MAX_RETRIES; + public SinkConfig(Config config) { + this.host = config.getString(HOST); + this.port = config.getInt(PORT); + if (config.hasPath(MAX_RETRIES)) { + this.maxNumRetries = config.getInt(MAX_RETRIES); + } + } +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java new file mode 100644 index 00000000000..77a38b1c868 --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.sink; + +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +@Slf4j +public class SocketClient { + + private final String hostName; + private final int port; + private int retries; + private final int maxNumRetries; + private transient Socket client; + private transient OutputStream outputStream; + private final SerializationSchema serializationSchema; + private volatile boolean isRunning = Boolean.TRUE; + private static final int CONNECTION_RETRY_DELAY = 500; + + public SocketClient(SinkConfig config, SerializationSchema serializationSchema) { + this.hostName = config.getHost(); + this.port = config.getPort(); + this.serializationSchema = serializationSchema; + retries = config.getMaxNumRetries(); + maxNumRetries = config.getMaxNumRetries(); + } + + private void createConnection() throws IOException { + client = new Socket(hostName, port); + client.setKeepAlive(true); + client.setTcpNoDelay(true); + + outputStream = client.getOutputStream(); + } + + public void open() throws IOException { + try { + synchronized (SocketClient.class) { + createConnection(); + } + } catch (IOException e) { + throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e); + } + } + + public void wirte(SeaTunnelRow row) throws IOException { + byte[] msg = serializationSchema.serialize(row); + try { + outputStream.write(msg); + outputStream.flush(); + + } catch (IOException e) { + // if no re-tries are enable, fail immediately + if (maxNumRetries == 0) { + throw new IOException( + "Failed to send message '" + + row + + "' to socket server at " + + hostName + + ":" + + port + + ". Connection re-tries are not enabled.", + e); + } + + log.error( + "Failed to send message '" + + row + + "' to socket server at " + + hostName + + ":" + + port + + ". Trying to reconnect...", + e); + + synchronized (SocketClient.class) { + IOException lastException = null; + retries = 0; + + while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) { + + // first, clean up the old resources + try { + if (outputStream != null) { + outputStream.close(); + } + } catch (IOException ee) { + log.error("Could not close output stream from failed write attempt", ee); + } + try { + if (client != null) { + client.close(); + } + } catch (IOException ee) { + log.error("Could not close socket from failed write attempt", ee); + } + + // try again + retries++; + + try { + // initialize a new connection + createConnection(); + outputStream.write(msg); + return; + } catch (IOException ee) { + lastException = ee; + log.error( + "Re-connect to socket server and send message failed. Retry time(s): " + + retries, + ee); + } + try { + this.wait(CONNECTION_RETRY_DELAY); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "unable to write; interrupted while doing another attempt", e); + } + } + + if (isRunning) { + throw new IOException( + "Failed to send message '" + + row + + "' to socket server at " + + hostName + + ":" + + port + + ". Failed after " + + retries + + " retries.", + lastException); + } + } + } + } + + public void close() throws IOException { + isRunning = false; + synchronized (this) { + this.notifyAll(); + try { + if (outputStream != null) { + outputStream.close(); + } + } finally { + if (client != null) { + client.close(); + } + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java new file mode 100644 index 00000000000..b9534b062b0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class SocketSink extends AbstractSimpleSink { + private Config pluginConfig; + private SinkConfig sinkConfig; + private SeaTunnelRowType seaTunnelRowType; + @Override + public String getPluginName() { + return "Socket"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.pluginConfig = pluginConfig; + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SinkConfig.PORT, SinkConfig.HOST); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + } + sinkConfig = new SinkConfig(pluginConfig); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new SocketSinkWriter(sinkConfig, seaTunnelRowType); + } +} diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java new file mode 100644 index 00000000000..360085a37ee --- /dev/null +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.socket.sink; + +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig; +import org.apache.seatunnel.format.json.JsonSerializationSchema; + +import java.io.IOException; + +public class SocketSinkWriter extends AbstractSinkWriter { + private final SocketClient socketClient; + + protected final SerializationSchema serializationSchema; + + private final SinkConfig sinkConfig; + SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws IOException { + this.sinkConfig = sinkConfig; + this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType); + this.socketClient = new SocketClient(sinkConfig, serializationSchema); + socketClient.open(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + socketClient.wirte(element); + } + + @Override + public void close() throws IOException { + socketClient.close(); + } + +} From 9467e4c4da8b71b8359deb8f5c3f696635e83d3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 28 Aug 2022 09:57:38 +0800 Subject: [PATCH 2/4] add License --- .../seatunnel/socket/config/SinkConfig.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java index b509ebf52cd..6e73de34450 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.connectors.seatunnel.socket.config; import org.apache.seatunnel.shade.com.typesafe.config.Config; From 291b931d6a2d9c6ae7c0e35e73ff6267aa793678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 28 Aug 2022 14:19:12 +0800 Subject: [PATCH 3/4] fix code style --- seatunnel-connectors-v2/connector-socket/pom.xml | 2 +- .../seatunnel/socket/config/SinkConfig.java | 11 ++++++----- .../connectors/seatunnel/socket/sink/SocketSink.java | 1 + .../seatunnel/socket/sink/SocketSinkWriter.java | 6 ++---- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-socket/pom.xml b/seatunnel-connectors-v2/connector-socket/pom.xml index e196b24841f..fab1c5f3ec2 100644 --- a/seatunnel-connectors-v2/connector-socket/pom.xml +++ b/seatunnel-connectors-v2/connector-socket/pom.xml @@ -38,7 +38,7 @@ org.apache.seatunnel seatunnel-format-json - 2.1.3-SNAPSHOT + ${project.version} compile diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java index 6e73de34450..b566799f1ca 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java @@ -25,13 +25,14 @@ @Data public class SinkConfig implements Serializable { - public static final String HOST = "host"; - public static final String PORT = "port"; - private static final String MAX_RETRIES = "max_retries"; - private static final int DEFAULT_MAX_RETRIES = 3; + public static final String HOST = "host"; + public static final String PORT = "port"; + private static final String MAX_RETRIES = "max_retries"; + private static final int DEFAULT_MAX_RETRIES = 3; private String host; - private Integer port; + private int port; private Integer maxNumRetries = DEFAULT_MAX_RETRIES; + public SinkConfig(Config config) { this.host = config.getString(HOST); this.port = config.getInt(PORT); diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java index b9534b062b0..3277ef51891 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java @@ -41,6 +41,7 @@ public class SocketSink extends AbstractSimpleSink { private Config pluginConfig; private SinkConfig sinkConfig; private SeaTunnelRowType seaTunnelRowType; + @Override public String getPluginName() { return "Socket"; diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java index 360085a37ee..9241c9e57d9 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java @@ -28,10 +28,9 @@ public class SocketSinkWriter extends AbstractSinkWriter { private final SocketClient socketClient; - - protected final SerializationSchema serializationSchema; - + private final SerializationSchema serializationSchema; private final SinkConfig sinkConfig; + SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws IOException { this.sinkConfig = sinkConfig; this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType); @@ -48,5 +47,4 @@ public void write(SeaTunnelRow element) throws IOException { public void close() throws IOException { socketClient.close(); } - } From ad64ff992b02b947a19db7322a3f91862e27bad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 28 Aug 2022 15:23:34 +0800 Subject: [PATCH 4/4] fix code style --- .../connectors/seatunnel/socket/config/SinkConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java index b566799f1ca..9e93336b73b 100644 --- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java @@ -31,7 +31,7 @@ public class SinkConfig implements Serializable { private static final int DEFAULT_MAX_RETRIES = 3; private String host; private int port; - private Integer maxNumRetries = DEFAULT_MAX_RETRIES; + private int maxNumRetries = DEFAULT_MAX_RETRIES; public SinkConfig(Config config) { this.host = config.getString(HOST);