From 543d2c508679c5f750d72f6f76c1a970b23db7a4 Mon Sep 17 00:00:00 2001 From: lightzhao <40714172+lightzhao@users.noreply.github.com> Date: Thu, 11 Jan 2024 18:34:46 +0800 Subject: [PATCH] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) --- .../Error-Quick-Reference-Manual.md | 3 + docs/en/connector-v2/sink/Pulsar.md | 177 ++++++++++++ plugin-mapping.properties | 1 + .../pulsar/config/PulsarConfigUtil.java | 116 +++++++- .../pulsar/config/PulsarSemantics.java | 35 +++ .../pulsar/config/SinkProperties.java | 97 +++++++ .../exception/PulsarConnectorErrorCode.java | 5 +- .../seatunnel/pulsar/sink/PulsarSink.java | 99 +++++++ .../pulsar/sink/PulsarSinkCommitter.java | 83 ++++++ .../pulsar/sink/PulsarSinkFactory.java | 61 ++++ .../pulsar/sink/PulsarSinkWriter.java | 263 ++++++++++++++++++ .../source/reader/PulsarSourceReader.java | 3 +- .../state/PulsarAggregatedCommitInfo.java | 30 ++ .../pulsar/state/PulsarCommitInfo.java | 33 +++ .../pulsar/state/PulsarSinkState.java | 33 +++ .../e2e/connector/pulsar/PulsarSinkIT.java | 138 +++++++++ .../src/test/resources/fake_to_pulsar.conf | 66 +++++ .../AbstractPluginDiscoveryTest.java | 2 +- .../home/connectors/plugin-mapping.properties | 1 + 19 files changed, 1242 insertions(+), 4 deletions(-) create mode 100644 docs/en/connector-v2/sink/Pulsar.md create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSemantics.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkCommitter.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarAggregatedCommitInfo.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarCommitInfo.java create mode 100644 seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_to_pulsar.conf diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md index b7e59b74cf5..2519bfbe655 100644 --- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md +++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md @@ -208,6 +208,9 @@ problems encountered by users. | PULSAR-05 | Get last cursor of pulsar topic failed | When users encounter this error code, it means that get last cursor of pulsar topic failed, please check it | | PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that Get partition information of pulsar topic failed, please check it | | PULSAR-07 | Pulsar consumer acknowledgeCumulative failed | When users encounter this error code, it means that Pulsar consumer acknowledgeCumulative failed | +| PULSAR-08 | Pulsar create producer failed | When users encounter this error code, it means that create producer failed, please check it | +| PULSAR-09 | Pulsar create transaction failed | When users encounter this error code, it means that Pulsar create transaction failed, please check it | +| PULSAR-10 | Pulsar send message failed | When users encounter this error code, it means that Pulsar sned message failed, please check it | ## StarRocks Connector Error Codes diff --git a/docs/en/connector-v2/sink/Pulsar.md b/docs/en/connector-v2/sink/Pulsar.md new file mode 100644 index 00000000000..d3a648e2369 --- /dev/null +++ b/docs/en/connector-v2/sink/Pulsar.md @@ -0,0 +1,177 @@ +# Pulsar + +> Pulsar sink connector + +## Support Those Engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +## Description + +Sink connector for Apache Pulsar. + +## Supported DataSource Info + +| Datasource | Supported Versions | +|------------|--------------------| +| Pulsar | Universal | + +## Sink Options + +| Name | Type | Required | Default | Description | +|----------------------|--------|----------|---------------------|----------------------------------------------------------------------------------------------------------| +| topic | String | Yes | - | sink pulsar topic | +| client.service-url | String | Yes | - | Service URL provider for Pulsar service. | +| admin.service-url | String | Yes | - | The Pulsar service HTTP URL for the admin endpoint. | +| auth.plugin-class | String | No | - | Name of the authentication plugin. | +| auth.params | String | No | - | Parameters for the authentication plugin. | +| format | String | No | json | Data format. The default format is json. Optional text format. | +| field_delimiter | String | No | , | Customize the field delimiter for data format. | +| semantics | Enum | No | AT_LEAST_ONCE | Consistency semantics for writing to pulsar. | +| transaction_timeout | Int | No | 600 | The transaction timeout is specified as 10 minutes by default. | +| pulsar.config | Map | No | - | In addition to the above parameters that must be specified by the Pulsar producer client. | +| message.routing.mode | Enum | No | RoundRobinPartition | Default routing mode for messages to partition. | +| partition_key_fields | array | No | - | Configure which fields are used as the key of the pulsar message. | +| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | + +## Parameter Interpretation + +### client.service-url [String] + +Service URL provider for Pulsar service. +To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL. +You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme. + +For example, `localhost`: `pulsar://localhost:6650,localhost:6651`. + +### admin.service-url [String] + +The Pulsar service HTTP URL for the admin endpoint. + +For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS. + +### auth.plugin-class [String] + +Name of the authentication plugin. + +### auth.params [String] + +Parameters for the authentication plugin. + +For example, `key1:val1,key2:val2` + +### format [String] + +Data format. The default format is json. Optional text format. The default field separator is ",". +If you customize the delimiter, add the "field_delimiter" option. + +### field_delimiter [String] + +Customize the field delimiter for data format.The default field_delimiter is ','. + +### semantics [Enum] + +Consistency semantics for writing to pulsar. +Available options are EXACTLY_ONCE,NON,AT_LEAST_ONCE, default AT_LEAST_ONCE. +If semantic is specified as EXACTLY_ONCE, we will use 2pc to guarantee the message is sent to pulsar exactly once. +If semantic is specified as NON, we will directly send the message to pulsar, the data may duplicat/lost if +job restart/retry or network error. + +### transaction_timeout [Int] + +The transaction timeout is specified as 10 minutes by default. +If the transaction does not commit within the specified timeout, the transaction will be automatically aborted. +So you need to ensure that the timeout is greater than the checkpoint interval. + +### pulsar.config [Map] + +In addition to the above parameters that must be specified by the Pulsar producer client, +the user can also specify multiple non-mandatory parameters for the producer client, +covering all the producer parameters specified in the official Pulsar document. + +### message.routing.mode [Enum] + +Default routing mode for messages to partition. +Available options are SinglePartition,RoundRobinPartition. +If you choose SinglePartition, If no key is provided, The partitioned producer will randomly pick one single partition and publish all the messages into that partition, If a key is provided on the message, the partitioned producer will hash the key and assign message to a particular partition. +If you choose RoundRobinPartition, If no key is provided, the producer will publish messages across all partitions in round-robin fashion to achieve maximum throughput. +Please note that round-robin is not done per individual message but rather it's set to the same boundary of batching delay, to ensure batching is effective. + +### partition_key_fields [String] + +Configure which fields are used as the key of the pulsar message. + +For example, if you want to use value of fields from upstream data as key, you can assign field names to this property. + +Upstream data is the following: + +| name | age | data | +|------|-----|---------------| +| Jack | 16 | data-example1 | +| Mary | 23 | data-example2 | + +If name is set as the key, then the hash value of the name column will determine which partition the message is sent to. + +If not set partition key fields, the null message key will be sent to. + +The format of the message key is json, If name is set as the key, for example '{"name":"Jack"}'. + +The selected field must be an existing field in the upstream. + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. + +## Task Example + +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to Pulsar Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target topic is test_topic will also be 16 rows of data in the topic. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +```hocon +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +sink { + Pulsar { + topic = "example" + client.service-url = "localhost:pulsar://localhost:6650" + admin.service-url = "http://my-broker.example.com:8080" + result_table_name = "test" + pulsar.config = { + sendTimeoutMs = 30000 + } + } +} +``` + +## Changelog + +### next version + +- Add Pulsar Sink Connector + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 4243e8d45c9..b896fcdacbb 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -115,3 +115,4 @@ seatunnel.source.AmazonSqs = connector-amazonsqs seatunnel.sink.AmazonSqs = connector-amazonsqs seatunnel.source.Paimon = connector-paimon seatunnel.sink.Paimon = connector-paimon +seatunnel.sink.Pulsar = connector-pulsar \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java index 6e2d6c3e0cc..1b73085ec6a 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException; @@ -26,12 +27,29 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.impl.ProducerBase; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG; + public class PulsarConfigUtil { public static final String IDENTIFIER = "Pulsar"; @@ -50,10 +68,14 @@ public static PulsarAdmin createAdmin(PulsarAdminConfig config) { } } - public static PulsarClient createClient(PulsarClientConfig config) { + public static PulsarClient createClient( + PulsarClientConfig config, PulsarSemantics pulsarSemantics) { ClientBuilder builder = PulsarClient.builder(); builder.serviceUrl(config.getServiceUrl()); builder.authentication(createAuthentication(config)); + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + builder.enableTransaction(true); + } try { return builder.build(); } catch (PulsarClientException e) { @@ -88,4 +110,96 @@ private static Authentication createAuthentication(BasePulsarConfig config) { "Authentication parameters are required when using authentication plug-in."); } } + + /** + * get TransactionCoordinatorClient + * + * @param pulsarClient + * @return + */ + public static TransactionCoordinatorClient getTcClient(PulsarClient pulsarClient) { + TransactionCoordinatorClient coordinatorClient = + ((PulsarClientImpl) pulsarClient).getTcClient(); + // enabled transaction. + if (coordinatorClient == null) { + throw new IllegalArgumentException("You haven't enable transaction in Pulsar client."); + } + + return coordinatorClient; + } + + /** + * create transaction + * + * @param pulsarClient + * @param timeout + * @return + * @throws PulsarClientException + * @throws InterruptedException + * @throws ExecutionException + */ + public static Transaction getTransaction(PulsarClient pulsarClient, int timeout) + throws PulsarClientException, InterruptedException, ExecutionException { + Transaction transaction = + pulsarClient + .newTransaction() + .withTransactionTimeout(timeout, TimeUnit.SECONDS) + .build() + .get(); + return transaction; + } + + /** + * create a Producer + * + * @param pulsarClient + * @param topic + * @param pulsarSemantics + * @param pluginConfig + * @param messageRoutingMode + * @return + * @throws PulsarClientException + */ + public static Producer createProducer( + PulsarClient pulsarClient, + String topic, + PulsarSemantics pulsarSemantics, + ReadonlyConfig pluginConfig, + MessageRoutingMode messageRoutingMode) + throws PulsarClientException { + ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.BYTES); + producerBuilder.topic(topic); + producerBuilder.messageRoutingMode(messageRoutingMode); + producerBuilder.blockIfQueueFull(true); + + if (pluginConfig.get(PULSAR_CONFIG) != null) { + Map pulsarProperties = new HashMap<>(); + pluginConfig + .get(PULSAR_CONFIG) + .forEach((key, value) -> pulsarProperties.put(key, value)); + producerBuilder.properties(pulsarProperties); + } + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + /** + * A condition for pulsar to open a transaction Only producers disabled sendTimeout are + * allowed to produce transactional messages + */ + producerBuilder.sendTimeout(0, TimeUnit.SECONDS); + } + return producerBuilder.create(); + } + + /** + * create TypedMessageBuilder + * + * @param producer + * @param transaction + * @return + * @throws PulsarClientException + */ + public static TypedMessageBuilder createTypedMessageBuilder( + Producer producer, TransactionImpl transaction) throws PulsarClientException { + ProducerBase producerBase = (ProducerBase) producer; + return new TypedMessageBuilderImpl(producerBase, Schema.BYTES, transaction); + } } diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSemantics.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSemantics.java new file mode 100644 index 00000000000..7a371b139ea --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarSemantics.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.config; + +public enum PulsarSemantics { + + /** + * At this semantics, we will directly send the message to pulsar, the data may duplicat/lost if + * job restart/retry or network error. + */ + NON, + + /** At this semantics, we will send at least one */ + AT_LEAST_ONCE, + + /** + * AT this semantics, we will use 2pc to guarantee the message is sent to pulsar exactly once. + */ + EXACTLY_ONCE; +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java new file mode 100644 index 00000000000..ccf59e87c34 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SinkProperties.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import org.apache.pulsar.client.api.MessageRoutingMode; + +import java.util.List; +import java.util.Map; + +public class SinkProperties { + + /** The default data format is JSON */ + public static final String DEFAULT_FORMAT = "json"; + + public static final String TEXT_FORMAT = "text"; + + /** The default field delimiter is “,” */ + public static final String DEFAULT_FIELD_DELIMITER = ","; + + public static final Option FORMAT = + Options.key("format") + .stringType() + .defaultValue(DEFAULT_FORMAT) + .withDescription( + "Data format. The default format is json. Optional text format. The default field separator is \", \". " + + "If you customize the delimiter, add the \"field_delimiter\" option."); + + public static final Option FIELD_DELIMITER = + Options.key("field_delimiter") + .stringType() + .defaultValue(DEFAULT_FIELD_DELIMITER) + .withDescription( + "Customize the field delimiter for data format.The default field_delimiter is ',' "); + public static final Option TOPIC = + Options.key("topic") + .stringType() + .noDefaultValue() + .withDescription("sink pulsar topic name."); + public static final Option SEMANTICS = + Options.key("semantics") + .enumType(PulsarSemantics.class) + .defaultValue(PulsarSemantics.AT_LEAST_ONCE) + .withDescription( + "If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Pulsar transaction."); + + public static final Option TRANSACTION_TIMEOUT = + Options.key("transaction_timeout") + .intType() + .defaultValue(600) + .withDescription( + "The transaction timeout is specified as 10 minutes by default. If the transaction does not commit within the specified timeout, the transaction will be automatically aborted. So you need to ensure that the timeout is greater than the checkpoint interval"); + + public static final Option> PULSAR_CONFIG = + Options.key("pulsar.config") + .mapType() + .noDefaultValue() + .withDescription( + "In addition to the above parameters that must be specified by the Pulsar producer or consumer client, " + + "the user can also specify multiple non-mandatory parameters for the producer or consumer client, " + + "covering all the producer parameters specified in the official Pulsar document."); + + public static final Option MESSAGE_ROUTING_MODE = + Options.key("message.routing.mode") + .enumType(MessageRoutingMode.class) + .defaultValue(MessageRoutingMode.RoundRobinPartition) + .withDescription( + "Default routing mode for messages to partition. " + + "If you choose SinglePartition,If no key is provided, The partitioned producer will randomly pick one single partition and publish all the messages into that partition. " + + " If a key is provided on the message, the partitioned producer will hash the key and assign message to a particular partition." + + " If you choose RoundRobinPartition,If no key is provided, the producer will publish messages across all partitions in round-robin fashion to achieve maximum throughput. " + + "Please note that round-robin is not done per individual message but rather it's set to the same boundary of batching delay, to ensure batching is effective."); + + public static final Option> PARTITION_KEY_FIELDS = + Options.key("partition_key_fields") + .listType() + .noDefaultValue() + .withDescription( + "Configure which fields are used as the key of the pulsar message."); +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java index 4f9871b4985..4d631389b71 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java @@ -26,7 +26,10 @@ public enum PulsarConnectorErrorCode implements SeaTunnelErrorCode { SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"), GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic failed"), GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of pulsar topic failed"), - ACK_CUMULATE_FAILED("PULSAR-07", "Pulsar consumer acknowledgeCumulative failed"); + ACK_CUMULATE_FAILED("PULSAR-07", "Pulsar consumer acknowledgeCumulative failed"), + CREATE_PRODUCER_FAILED("PULSAR-08", "create producer failed"), + CREATE_TRANSACTION_FAILED("PULSAR-09", "create transaction failed"), + SEND_MESSAGE_FAILED("PULSAR-10", "send message failed"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java new file mode 100644 index 00000000000..05a007df9a7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL; + +/** + * Pulsar Sink implementation by using SeaTunnel sink API. This class contains the method to create + * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}. + */ +public class PulsarSink + implements SeaTunnelSink< + SeaTunnelRow, PulsarSinkState, PulsarCommitInfo, PulsarAggregatedCommitInfo> { + + private SeaTunnelRowType seaTunnelRowType; + private PulsarClientConfig clientConfig; + private ReadonlyConfig readonlyConfig; + + public PulsarSink(ReadonlyConfig readonlyConfig, SeaTunnelRowType seaTunnelRowType) { + this.readonlyConfig = readonlyConfig; + this.seaTunnelRowType = seaTunnelRowType; + + /** client config */ + PulsarClientConfig.Builder clientConfigBuilder = + PulsarClientConfig.builder().serviceUrl(readonlyConfig.get(CLIENT_SERVICE_URL)); + clientConfigBuilder.authPluginClassName(readonlyConfig.get(AUTH_PLUGIN_CLASS)); + clientConfigBuilder.authParams(readonlyConfig.get(AUTH_PARAMS)); + this.clientConfig = clientConfigBuilder.build(); + } + + @Override + public SinkWriter createWriter( + SinkWriter.Context context) { + return new PulsarSinkWriter( + context, clientConfig, seaTunnelRowType, readonlyConfig, Collections.emptyList()); + } + + @Override + public SinkWriter restoreWriter( + SinkWriter.Context context, List states) { + return new PulsarSinkWriter( + context, clientConfig, seaTunnelRowType, readonlyConfig, states); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> createCommitter() { + return Optional.of(new PulsarSinkCommitter(clientConfig)); + } + + @Override + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public String getPluginName() { + return PulsarConfigUtil.IDENTIFIER; + } +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkCommitter.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkCommitter.java new file mode 100644 index 00000000000..99330c889d9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkCommitter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.sink; + +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; +import org.apache.pulsar.client.api.transaction.TxnID; + +import java.io.IOException; +import java.util.List; + +public class PulsarSinkCommitter implements SinkCommitter { + + private PulsarClientConfig clientConfig; + private PulsarClient pulsarClient; + private TransactionCoordinatorClient coordinatorClient; + + public PulsarSinkCommitter(PulsarClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + public List commit(List commitInfos) throws IOException { + if (commitInfos.isEmpty()) { + return commitInfos; + } + + TransactionCoordinatorClient client = transactionCoordinatorClient(); + + for (PulsarCommitInfo pulsarCommitInfo : commitInfos) { + TxnID txnID = pulsarCommitInfo.getTxnID(); + client.commit(txnID); + } + return commitInfos; + } + + @Override + public void abort(List commitInfos) throws IOException { + if (commitInfos.isEmpty()) { + return; + } + TransactionCoordinatorClient client = transactionCoordinatorClient(); + for (PulsarCommitInfo commitInfo : commitInfos) { + TxnID txnID = commitInfo.getTxnID(); + client.abort(txnID); + } + if (this.pulsarClient != null) { + pulsarClient.close(); + } + } + + private TransactionCoordinatorClient transactionCoordinatorClient() + throws PulsarClientException { + if (coordinatorClient == null) { + this.pulsarClient = + PulsarConfigUtil.createClient(clientConfig, PulsarSemantics.EXACTLY_ONCE); + this.coordinatorClient = PulsarConfigUtil.getTcClient(pulsarClient); + } + return coordinatorClient; + } +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java new file mode 100644 index 00000000000..c5b13e2876b --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL; + +@AutoService(Factory.class) +public class PulsarSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return PulsarConfigUtil.IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL, TOPIC) + .optional(FORMAT, FIELD_DELIMITER, MESSAGE_ROUTING_MODE) + .bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS) + .build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> + new PulsarSink( + context.getOptions(), + context.getCatalogTable().getTableSchema().toPhysicalRowDataType()); + } +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java new file mode 100644 index 00000000000..9f3eae2c263 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics; +import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState; +import org.apache.seatunnel.format.json.JsonSerializationSchema; +import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; +import org.apache.seatunnel.format.text.TextSerializationSchema; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT; + +public class PulsarSinkWriter + implements SinkWriter { + + private Context context; + private Producer producer; + private PulsarClient pulsarClient; + private SerializationSchema serializationSchema; + private SerializationSchema keySerializationSchema; + private TransactionImpl transaction; + private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue(); + private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue(); + private final AtomicLong pendingMessages; + + public PulsarSinkWriter( + Context context, + PulsarClientConfig clientConfig, + SeaTunnelRowType seaTunnelRowType, + ReadonlyConfig pluginConfig, + List pulsarStates) { + this.context = context; + String topic = pluginConfig.get(TOPIC); + String format = pluginConfig.get(FORMAT); + String delimiter = pluginConfig.get(FIELD_DELIMITER); + Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT); + PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS); + MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE); + this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter); + List partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType); + this.keySerializationSchema = + createKeySerializationSchema(partitionKeyList, seaTunnelRowType); + this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics); + + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + try { + this.transaction = + (TransactionImpl) + PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout); + } catch (Exception e) { + throw new PulsarConnectorException( + PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED, + "Pulsar transaction create fail."); + } + } + try { + this.producer = + PulsarConfigUtil.createProducer( + pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode); + } catch (PulsarClientException e) { + throw new PulsarConnectorException( + PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED, + "Pulsar Producer create fail."); + } + this.pendingMessages = new AtomicLong(0); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + byte[] message = serializationSchema.serialize(element); + byte[] key = null; + if (keySerializationSchema != null) { + key = keySerializationSchema.serialize(element); + } + TypedMessageBuilder typedMessageBuilder = + PulsarConfigUtil.createTypedMessageBuilder(producer, transaction); + if (key != null) { + typedMessageBuilder.keyBytes(key); + } + typedMessageBuilder.value(message); + if (PulsarSemantics.NON == pulsarSemantics) { + typedMessageBuilder.sendAsync(); + } else { + pendingMessages.incrementAndGet(); + CompletableFuture future = typedMessageBuilder.sendAsync(); + future.whenComplete( + (id, ex) -> { + pendingMessages.decrementAndGet(); + if (ex != null) { + throw new PulsarConnectorException( + PulsarConnectorErrorCode.SEND_MESSAGE_FAILED, + "send message failed"); + } + }); + } + } + + @Override + public Optional prepareCommit() throws IOException { + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID()); + return Optional.of(pulsarCommitInfo); + } else { + return Optional.empty(); + } + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + if (PulsarSemantics.NON != pulsarSemantics) { + /** flush pending messages */ + producer.flush(); + while (pendingMessages.longValue() > 0) { + producer.flush(); + } + } + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + List pulsarSinkStates = + Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID())); + try { + this.transaction = + (TransactionImpl) + PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout); + } catch (Exception e) { + throw new PulsarConnectorException( + PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED, + "Pulsar transaction create fail."); + } + return pulsarSinkStates; + } + return Collections.emptyList(); + } + + @Override + public void abortPrepare() { + if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) { + transaction.abort(); + } + } + + @Override + public void close() throws IOException { + producer.close(); + pulsarClient.close(); + } + + private SerializationSchema createSerializationSchema( + SeaTunnelRowType rowType, String format, String delimiter) { + if (DEFAULT_FORMAT.equals(format)) { + return new JsonSerializationSchema(rowType); + } else if (TEXT_FORMAT.equals(format)) { + return TextSerializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter(delimiter) + .build(); + } else { + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format); + } + } + + public static SerializationSchema createKeySerializationSchema( + List keyFieldNames, SeaTunnelRowType seaTunnelRowType) { + if (keyFieldNames == null || keyFieldNames.isEmpty()) { + return null; + } + int[] keyFieldIndexArr = new int[keyFieldNames.size()]; + SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()]; + for (int i = 0; i < keyFieldNames.size(); i++) { + String keyFieldName = keyFieldNames.get(i); + int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName); + keyFieldIndexArr[i] = rowFieldIndex; + keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex); + } + SeaTunnelRowType keyType = + new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr); + SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType); + + Function keyDataExtractor = + row -> { + Object[] keyFields = new Object[keyFieldIndexArr.length]; + for (int i = 0; i < keyFieldIndexArr.length; i++) { + keyFields[i] = row.getField(keyFieldIndexArr[i]); + } + return new SeaTunnelRow(keyFields); + }; + return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row)); + } + + private List getPartitionKeyFields( + ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) { + if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) { + List partitionKeyFields = pluginConfig.get(PARTITION_KEY_FIELDS); + List rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames()); + for (String partitionKeyField : partitionKeyFields) { + if (!rowTypeFieldNames.contains(partitionKeyField)) { + throw new PulsarConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + String.format( + "Partition key field not found: %s, rowType: %s", + partitionKeyField, rowTypeFieldNames)); + } + } + return partitionKeyFields; + } + return Collections.emptyList(); + } +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java index 3475cd32d9e..d811f49af31 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil; import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig; +import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics; import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException; import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor; @@ -104,7 +105,7 @@ public PulsarSourceReader( @Override public void open() { - this.pulsarClient = PulsarConfigUtil.createClient(clientConfig); + this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, PulsarSemantics.NON); } @Override diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarAggregatedCommitInfo.java new file mode 100644 index 00000000000..bb75197248a --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarAggregatedCommitInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class PulsarAggregatedCommitInfo implements Serializable { + List commitInfos; +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarCommitInfo.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarCommitInfo.java new file mode 100644 index 00000000000..fc54308a335 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarCommitInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.state; + +import org.apache.pulsar.client.api.transaction.TxnID; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class PulsarCommitInfo implements Serializable { + + /** The transaction id. */ + private final TxnID txnID; +} diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java new file mode 100644 index 00000000000..a5aabebb848 --- /dev/null +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.pulsar.state; + +import org.apache.pulsar.client.api.transaction.TxnID; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class PulsarSinkState implements Serializable { + + /** The transaction id. */ + private final TxnID txnID; +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java new file mode 100644 index 00000000000..829c9b7cc5b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.pulsar; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static java.time.temporal.ChronoUnit.SECONDS; + +@Slf4j +public class PulsarSinkIT extends TestSuiteBase implements TestResource { + + private static final String PULSAR_IMAGE_NAME = "apachepulsar/pulsar:2.3.1"; + public static final String PULSAR_HOST = "pulsar.e2e.sink"; + public static final String TOPIC = "topic-test02"; + private PulsarContainer pulsarContainer; + + @Override + @BeforeAll + public void startUp() throws Exception { + pulsarContainer = + new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(PULSAR_HOST) + .withStartupTimeout(Duration.of(400, SECONDS)) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME))); + + Startables.deepStart(Stream.of(pulsarContainer)).join(); + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS); + } + + @Override + public void tearDown() throws Exception { + pulsarContainer.close(); + } + + private List getPulsarConsumerData() { + List data = new ArrayList<>(); + try { + PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build(); + + Random random = new Random(); + Consumer consumer = + client.newConsumer() + .topic(TOPIC) + .subscriptionName("PulsarSubTest" + random.nextInt()) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + int i = 0; + while (true) { + i++; + Message msg = consumer.receive(); + if (msg != null) { + data.add(new String(msg.getData())); + consumer.acknowledge(msg.getMessageId()); + log.info("value:{}", new String(msg.getData())); + } + if (i == 10) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + return data; + } + + @TestTemplate + public void testSinkPulsar(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/fake_to_pulsar.conf"); + Assertions.assertEquals(execResult.getExitCode(), 0); + + List data = getPulsarConsumerData(); + log.info("data size:{}", data.size()); + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode objectNode = objectMapper.readValue(data.get(0), ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_array")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertTrue(objectNode.has("c_boolean")); + Assertions.assertTrue(objectNode.has("c_double")); + Assertions.assertEquals(10, data.size()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_to_pulsar.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_to_pulsar.conf new file mode 100644 index 00000000000..f6590308aef --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_to_pulsar.conf @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +sink { + pulsar { + topic = "topic-test02" + client.service-url = "pulsar://pulsar.e2e.sink:6650" + admin.service-url = "http://pulsar.e2e.sink:8080" + format = json + } +} \ No newline at end of file diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java index 379fcd2aceb..18b86a65a84 100644 --- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java @@ -54,7 +54,7 @@ public void testGetAllPlugins() { Map sinkPlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK); - Assertions.assertEquals(30, sinkPlugins.size()); + Assertions.assertEquals(31, sinkPlugins.size()); } @AfterEach diff --git a/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties b/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties index b3f9707aaf5..f34f17a8aed 100644 --- a/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties +++ b/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties @@ -145,3 +145,4 @@ seatunnel.sink.StarRocks = connector-starrocks seatunnel.source.MyHours = connector-http-myhours seatunnel.sink.InfluxDB = connector-influxdb seatunnel.source.GoogleSheets = connector-google-sheets +seatunnel.sink.Pulsar = connector-pulsar