[Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. #4382

Merged
merged 35 commits into from
Jan 11, 2024
Changes from 31 commits
3491092
Add Pulsar sink connector.
Mar 21, 2023
3046c38
Add license header.
Mar 22, 2023
43aad76
Merge branch 'dev' into create-pulsar-sink
Mar 23, 2023
ab1e71c
Modify asynchronous write mode to improve write throughput performance
Mar 24, 2023
0b8d97f
update doc.
Mar 27, 2023
5584b72
Merge branch 'dev' into create-pulsar-sink
Mar 27, 2023
b7cde11
Merge branch 'dev' into create-pulsar-sink
May 24, 2023
0a94a8d
Merge branch 'dev' into create-pulsar-sink
May 25, 2023
50d3ad7
Merge branch 'dev' into create-pulsar-sink
hailin0 May 25, 2023
ce34460
add e2e test.
May 26, 2023
86f1c0d
Merge branch 'create-pulsar-sink' of github.com:lightzhao/incubator-s…
May 26, 2023
273452d
Merge branch 'dev' into create-pulsar-sink
Aug 14, 2023
89157b8
add pulsar sink config.
Aug 15, 2023
3c80879
Merge branch 'dev' into create-pulsar-sink
Aug 28, 2023
5ee01e1
fix sink test and miss class.
Aug 28, 2023
de45934
Merge branch 'dev' into create-pulsar-sink
Sep 1, 2023
8a08656
Merge branch 'dev' into create-pulsar-sink
Sep 12, 2023
8f1a1ec
Merge branch 'dev' into create-pulsar-sink
Sep 18, 2023
e16c7dd
Merge branch 'dev' into create-pulsar-sink
Sep 21, 2023
64da469
Merge branch 'dev' into create-pulsar-sink
Nov 22, 2023
d00a87b
update doc and add e2e test case.
Nov 22, 2023
7200d1f
update test.
Nov 24, 2023
78bfe21
update test.
Nov 24, 2023
7f67ac5
update test.
Nov 25, 2023
49b0a69
update test.
Nov 25, 2023
9438f90
update e2e test.
Nov 25, 2023
a7a6dff
update e2e test.
Nov 25, 2023
918729d
Merge branch 'dev' into create-pulsar-sink
Dec 1, 2023
a80e32c
Merge branch 'dev' into create-pulsar-sink
Dec 8, 2023
c9ca686
Support multi-table sink feature and update doc.
Dec 11, 2023
612d000
Support multi-table sink feature and update doc.
Dec 12, 2023
994242b
Merge branch 'dev' into create-pulsar-sink
Dec 13, 2023
ba5a187
remove unless code.
Dec 19, 2023
0122d77
Merge branch 'dev' into create-pulsar-sink
Dec 20, 2023
126874d
Update seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/…
EricJoy2048 Jan 5, 2024
3 changes: 3 additions & 0 deletions docs/en/connector-v2/Error-Quick-Reference-Manual.md
Original file line number Diff line number Diff line change
@@ -208,6 +208,9 @@ problems encountered by users.
| PULSAR-05 | Get last cursor of pulsar topic failed | When users encounter this error code, it means that get last cursor of pulsar topic failed, please check it |
| PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that Get partition information of pulsar topic failed, please check it |
| PULSAR-07 | Pulsar consumer acknowledgeCumulative failed | When users encounter this error code, it means that Pulsar consumer acknowledgeCumulative failed |
| PULSAR-08 | Pulsar create producer failed | When users encounter this error code, it means that create producer failed, please check it |
| PULSAR-09 | Pulsar create transaction failed | When users encounter this error code, it means that Pulsar create transaction failed, please check it |
| PULSAR-10 | Pulsar send message failed | When users encounter this error code, it means that Pulsar send message failed, please check it |

## StarRocks Connector Error Codes

177 changes: 177 additions & 0 deletions docs/en/connector-v2/sink/Pulsar.md
@@ -0,0 +1,177 @@
# Pulsar

> Pulsar sink connector

## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)

## Description

Sink connector for Apache Pulsar.

## Supported DataSource Info

| Datasource | Supported Versions |
|------------|--------------------|
| Pulsar | Universal |

## Sink Options

| Name | Type | Required | Default | Description |
|----------------------|--------|----------|---------------------|----------------------------------------------------------------------------------------------------------|
| topic | String | Yes | - | The Pulsar topic to write to. |
| client.service-url | String | Yes | - | Service URL provider for Pulsar service. |
| admin.service-url | String | Yes | - | The Pulsar service HTTP URL for the admin endpoint. |
| auth.plugin-class | String | No | - | Name of the authentication plugin. |
| auth.params | String | No | - | Parameters for the authentication plugin. |
| format | String | No | json | Data format. Supported values are json (default) and text. |
| field_delimiter | String | No | , | Customize the field delimiter for data format. |
| semantics | Enum | No | AT_LEAST_ONCE | Consistency semantics for writing to pulsar. |
| transaction_timeout | Int | No | 600 | Transaction timeout in seconds. The default is 600 (10 minutes). |
| pulsar.config | Map | No | - | Additional optional parameters for the Pulsar producer client. |
| message.routing.mode | Enum | No | RoundRobinPartition | Default routing mode for messages to partition. |
| partition_key_fields | array | No | - | Configure which fields are used as the key of the pulsar message. |
| common-options | config | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |

## Parameter Interpretation

### client.service-url [String]

Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
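You can assign Pulsar protocol URLs to specific clusters; they use the `pulsar` scheme.

For example, for a broker on `localhost`: `pulsar://localhost:6650`. Multiple brokers can be listed, separated by commas: `pulsar://localhost:6650,localhost:6651`.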

### admin.service-url [String]

The Pulsar service HTTP URL for the admin endpoint.

For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS.

### auth.plugin-class [String]

Name of the authentication plugin.

### auth.params [String]

Parameters for the authentication plugin.

For example, `key1:val1,key2:val2`
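
As an illustration of how `auth.plugin-class` and `auth.params` fit together, the fragment below sketches token authentication. It assumes the `AuthenticationToken` plugin from the Pulsar Java client and its `token:` parameter prefix; the service URLs and the token value are placeholders, not values taken from this connector.

```hocon
sink {
  Pulsar {
    topic = "example"
    # Placeholder endpoints for a local broker
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Assumed plugin: token authentication from the Pulsar Java client
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # Placeholder; replace with a real token
    auth.params = "token:<your-token>"
  }
}
```

Other plugins may expect the `key1:val1,key2:val2` form shown above instead.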

### format [String]

Data format. Supported values are json (the default) and text.
For the text format, the default field separator is ",". To use a different separator, set the "field_delimiter" option.

### field_delimiter [String]

Customize the field delimiter for the data format. The default field_delimiter is ','.
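
A minimal sketch of a sink that writes text-formatted messages with a custom delimiter. The service URLs are placeholders, and `|` is only an example separator.

```hocon
sink {
  Pulsar {
    topic = "example"
    # Placeholder endpoints for a local broker
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Write each row as delimited text instead of json
    format = "text"
    # Fields are separated by "|" instead of the default ","
    field_delimiter = "|"
  }
}
```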

### semantics [Enum]

Consistency semantics for writing to Pulsar.
Available options are EXACTLY_ONCE, NON and AT_LEAST_ONCE. The default is AT_LEAST_ONCE.
If semantics is set to EXACTLY_ONCE, two-phase commit (2PC) is used to guarantee that each message is sent to Pulsar exactly once.
If semantics is set to NON, messages are sent to Pulsar directly, and data may be duplicated or lost if the job
restarts or retries, or if a network error occurs.

### transaction_timeout [Int]

The transaction timeout, in seconds. The default is 600 (10 minutes).
If the transaction does not commit within the specified timeout, it is automatically aborted,
so make sure that the timeout is greater than the checkpoint interval.
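
The relationship between `semantics`, `transaction_timeout` and the checkpoint interval can be sketched as follows. This is a fragment rather than a complete job: the source block is omitted, the service URLs are placeholders, and the `checkpoint.interval` value (in milliseconds) is an assumption chosen only to sit well below the timeout. Exactly-once delivery also presumably requires transactions to be enabled on the Pulsar broker side.

```hocon
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  # Assumed checkpoint interval: 30 seconds, expressed in milliseconds
  checkpoint.interval = 30000
}

sink {
  Pulsar {
    topic = "example"
    # Placeholder endpoints for a local broker
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Use 2PC so that each message is written exactly once
    semantics = "EXACTLY_ONCE"
    # In seconds; must be greater than the checkpoint interval above
    transaction_timeout = 600
  }
}
```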

### pulsar.config [Map]

In addition to the required parameters above, you can specify any number of optional parameters
for the Pulsar producer client. All producer parameters listed in the official Pulsar
documentation are covered.

### message.routing.mode [Enum]

Default routing mode for assigning messages to partitions.
Available options are SinglePartition and RoundRobinPartition.
With SinglePartition, if no key is provided, the partitioned producer randomly picks a single partition and publishes all messages to it. If a key is provided on the message, the partitioned producer hashes the key and assigns the message to a particular partition.
With RoundRobinPartition, if no key is provided, the producer publishes messages across all partitions in round-robin fashion to achieve maximum throughput.
Please note that round-robin is not applied per individual message; it switches partitions on the same boundary as the batching delay, so that batching stays effective.

### partition_key_fields [array]

Configure which fields are used as the key of the pulsar message.

For example, if you want to use value of fields from upstream data as key, you can assign field names to this property.

Upstream data is the following:

| name | age | data |
|------|-----|---------------|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

If partition_key_fields is not set, messages are sent with a null key.

The message key is formatted as JSON. For example, if name is set as the key, the key is '{"name":"Jack"}'.

The selected field must be an existing field in the upstream.
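
For the upstream data above, a sink that keys messages by the name column might be configured as in this sketch. The service URLs are placeholders; only `partition_key_fields` is the point of the example.

```hocon
sink {
  Pulsar {
    topic = "example"
    # Placeholder endpoints for a local broker
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Use the upstream "name" field as the message key,
    # so the row for Jack is sent with the key {"name":"Jack"}
    partition_key_fields = ["name"]
  }
}
```

Rows with the same name then hash to the same partition, which keeps their relative order within that partition.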

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Task Example

### Simple:

> This example defines a SeaTunnel synchronization task that generates data with FakeSource and sends it to the Pulsar sink. FakeSource generates 16 rows in total (row.num=16), each with two fields: name (string type) and age (int type). The target topic `example` will then also contain 16 rows. If you have not yet installed and deployed SeaTunnel, first follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md), then follow [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.

```hocon
# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    result_table_name = "test"
    pulsar.config = {
      sendTimeoutMs = 30000
    }
  }
}
```

## Changelog

### next version

- Add Pulsar Sink Connector

1 change: 1 addition & 0 deletions plugin-mapping.properties
@@ -115,3 +115,4 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
seatunnel.sink.AmazonSqs = connector-amazonsqs
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
seatunnel.sink.Pulsar = connector-pulsar
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;

@@ -26,12 +27,29 @@
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG;

public class PulsarConfigUtil {

public static final String IDENTIFIER = "Pulsar";
@@ -50,10 +68,14 @@ public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
}
}

public static PulsarClient createClient(PulsarClientConfig config) {
public static PulsarClient createClient(
PulsarClientConfig config, PulsarSemantics pulsarSemantics) {
ClientBuilder builder = PulsarClient.builder();
builder.serviceUrl(config.getServiceUrl());
builder.authentication(createAuthentication(config));
if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
builder.enableTransaction(true);
}
try {
return builder.build();
} catch (PulsarClientException e) {
@@ -88,4 +110,96 @@ private static Authentication createAuthentication(BasePulsarConfig config) {
"Authentication parameters are required when using authentication plug-in.");
}
}

    /**
     * Get the TransactionCoordinatorClient of a Pulsar client.
     *
     * @param pulsarClient a Pulsar client created with transactions enabled
     * @return the transaction coordinator client
     */
    public static TransactionCoordinatorClient getTcClient(PulsarClient pulsarClient) {
        TransactionCoordinatorClient coordinatorClient =
                ((PulsarClientImpl) pulsarClient).getTcClient();
        // A null coordinator client means transactions were not enabled on the client.
        if (coordinatorClient == null) {
            throw new IllegalArgumentException("You haven't enable transaction in Pulsar client.");
        }

        return coordinatorClient;
    }

    /**
     * Create a transaction.
     *
     * @param pulsarClient the Pulsar client used to open the transaction
     * @param timeout the transaction timeout, in seconds
     * @return the new transaction
     * @throws PulsarClientException
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static Transaction getTransaction(PulsarClient pulsarClient, int timeout)
            throws PulsarClientException, InterruptedException, ExecutionException {
        Transaction transaction =
                pulsarClient
                        .newTransaction()
                        .withTransactionTimeout(timeout, TimeUnit.SECONDS)
                        .build()
                        .get();
        return transaction;
    }

    /**
     * Create a Producer.
     *
     * @param pulsarClient the Pulsar client used to create the producer
     * @param topic the topic to write to
     * @param pulsarSemantics the consistency semantics of the sink
     * @param pluginConfig the sink configuration
     * @param messageRoutingMode the routing mode for messages to partitions
     * @return the new producer
     * @throws PulsarClientException
     */
    public static Producer<byte[]> createProducer(
            PulsarClient pulsarClient,
            String topic,
            PulsarSemantics pulsarSemantics,
            ReadonlyConfig pluginConfig,
            MessageRoutingMode messageRoutingMode)
            throws PulsarClientException {
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer(Schema.BYTES);
        producerBuilder.topic(topic);
        producerBuilder.messageRoutingMode(messageRoutingMode);
        producerBuilder.blockIfQueueFull(true);

        if (pluginConfig.get(PULSAR_CONFIG) != null) {
            Map<String, String> pulsarProperties = new HashMap<>();
            pluginConfig
                    .get(PULSAR_CONFIG)
                    .forEach((key, value) -> pulsarProperties.put(key, value));
            producerBuilder.properties(pulsarProperties);
        }
        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
            // A condition for Pulsar to open a transaction: only producers with sendTimeout
            // disabled are allowed to produce transactional messages.
            producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
        }
        return producerBuilder.create();
    }

    /**
     * Create a TypedMessageBuilder.
     *
     * @param producer the producer that will send the message
     * @param transaction the transaction the message belongs to
     * @return the new message builder
     * @throws PulsarClientException
     */
    public static TypedMessageBuilder<byte[]> createTypedMessageBuilder(
            Producer<byte[]> producer, TransactionImpl transaction) throws PulsarClientException {
        ProducerBase<byte[]> producerBase = (ProducerBase<byte[]>) producer;
        return new TypedMessageBuilderImpl<byte[]>(producerBase, Schema.BYTES, transaction);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

public enum PulsarSemantics {

    /**
     * With this semantics, messages are sent to Pulsar directly; data may be duplicated or lost
     * if the job restarts or retries, or if a network error occurs.
     */
    NON,

    /** With this semantics, each message is sent at least once. */
    AT_LEAST_ONCE,

    /**
     * With this semantics, 2PC is used to guarantee that each message is sent to Pulsar exactly
     * once.
     */
    EXACTLY_ONCE;
}