Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[doc][connector-v2] pulsar source options doc #2128

Merged
merged 2 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions docs/en/connector-v2/source/pulsar.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Apache Pulsar

## Description

Source connector for Apache Pulsar. It can support both off-line and real-time jobs.

## Options
ashulin marked this conversation as resolved.
Show resolved Hide resolved

| name | type | required | default value |
| --- | --- | --- | --- |
| topic | String | No | - |
| topic-pattern | String | No | - |
| topic-discovery.interval | Long | No | 30000 |
| subscription.name | String | Yes | - |
| client.service-url | String | Yes | - |
| admin.service-url | String | Yes | - |
| auth.plugin-class | String | No | - |
| auth.params | String | No | - |
| poll.timeout | Integer | No | 100 |
| poll.interval | Long | No | 50 |
| poll.batch.size | Integer | No | 500 |
| cursor.startup.mode | Enum | No | LATEST |
| cursor.startup.timestamp | Long | No | - |
| cursor.reset.mode | Enum | No | LATEST |
| cursor.stop.mode | Enum | No | NEVER |
| cursor.stop.timestamp | Long | No | - |

### topic [String]

Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'.

**Note, only one of "topic-pattern" and "topic" can be specified for sources.**

### topic-pattern [String]

The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running.

**Note, only one of "topic-pattern" and "topic" can be specified for sources.**

### topic-discovery.interval [Long]

The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery.

**Note, This option only works if the 'topic-pattern' option is used.**

### subscription.name [String]

Specify the subscription name for this consumer. This argument is required when constructing the consumer.

### client.service-url [String]

Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.

For example, `localhost`: `pulsar://localhost:6650,localhost:6651`.

### admin.service-url [String]

The Pulsar service HTTP URL for the admin endpoint.

For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS.

### auth.plugin-class [String]

Name of the authentication plugin.

### auth.params [String]

Parameters for the authentication plugin.

For example, `key1:val1,key2:val2`

### poll.timeout [Integer]

The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency.

### poll.interval [Long]

The interval time(in ms) when fetcing records. A shorter time increases throughput, but also increases CPU load.

### poll.batch.size [Integer]

The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency.

### cursor.startup.mode [Enum]

Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`.

### cursor.startup.timestamp [String]

Start from the specified epoch timestamp (in milliseconds).

**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.**

### cursor.reset.mode [Enum]

Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`.

**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.**

### cursor.stop.mode [String]

Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`.

**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.**

### cursor.startup.timestamp [String]

Stop from the specified epoch timestamp (in milliseconds).

**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.**

## Example

```Jdbc {
source {
Pulsar {
topic = "example"
subscription.name = "seatunnel"
client.service-url = "localhost:pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
result_table_name = "test"
}
}
```
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,4 @@ seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.sink.ClickhouseFile = connector-clickhouse
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
seatunnel.source.Pulsar = connector-pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,48 @@

public class SourceProperties {

// Pulsar client API config prefix.
public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
// Pulsar admin API config prefix.
public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";

// --------------------------------------------------------------------------------------------
// The configuration for ClientConfigurationData part.
// All the configuration listed below should have the pulsar.client prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_SERVICE_URL = CLIENT_CONFIG_PREFIX + "serviceUrl";
public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = CLIENT_CONFIG_PREFIX + "authPluginClassName";
public static final String PULSAR_AUTH_PARAMS = CLIENT_CONFIG_PREFIX + "authParams";
public static final String CLIENT_SERVICE_URL = "client.service-url";
public static final String AUTH_PLUGIN_CLASS = "auth.plugin-class";
public static final String AUTH_PARAMS = "auth.params";

// --------------------------------------------------------------------------------------------
// The configuration for ClientConfigurationData part.
// All the configuration listed below should have the pulsar.client prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_ADMIN_URL = ADMIN_CONFIG_PREFIX + "adminUrl";

// Pulsar source connector config prefix.
public static final String SOURCE_CONFIG_PREFIX = "pulsar.source.";
// Pulsar consumer API config prefix.
public static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer.";
public static final String ADMIN_SERVICE_URL = "admin.service-url";

// --------------------------------------------------------------------------------------------
// The configuration for ConsumerConfigurationData part.
// All the configuration listed below should have the pulsar.consumer prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_SUBSCRIPTION_NAME = CONSUMER_CONFIG_PREFIX + "subscriptionName";
public static final String PULSAR_SUBSCRIPTION_TYPE = CONSUMER_CONFIG_PREFIX + "subscriptionType";
public static final String PULSAR_SUBSCRIPTION_MODE = CONSUMER_CONFIG_PREFIX + "subscriptionMode";
public static final String SUBSCRIPTION_NAME = "subscription.name";
public static final String SUBSCRIPTION_TYPE = "subscription.type";
public static final String SUBSCRIPTION_MODE = "subscription.mode";

// --------------------------------------------------------------------------------------------
// The configuration for pulsar source part.
// All the configuration listed below should have the pulsar.source prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs";
public static final String PULSAR_TOPIC = SOURCE_CONFIG_PREFIX + "topic";
public static final String PULSAR_TOPIC_PATTERN = SOURCE_CONFIG_PREFIX + "topic.pattern";
public static final String PULSAR_POLL_TIMEOUT = SOURCE_CONFIG_PREFIX + "poll.timeout";
public static final String PULSAR_POLL_INTERVAL = SOURCE_CONFIG_PREFIX + "poll.interval";
public static final String PULSAR_BATCH_SIZE = SOURCE_CONFIG_PREFIX + "batch.size";
public static final String PULSAR_CURSOR_START_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.mode";
public static final String PULSAR_CURSOR_START_RESET_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.reset.mode";
public static final String PULSAR_CURSOR_START_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.start.timestamp";
public static final String PULSAR_CURSOR_START_ID = SOURCE_CONFIG_PREFIX + "scan.cursor.start.id";
public static final String PULSAR_CURSOR_STOP_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.mode";
public static final String PULSAR_CURSOR_STOP_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.timestamp";
public static final String TOPIC_DISCOVERY_INTERVAL = "topic-discovery.interval";
public static final String TOPIC = "topic";
public static final String TOPIC_PATTERN = "topic-pattern";
public static final String POLL_TIMEOUT = "poll.timeout";
public static final String POLL_INTERVAL = "poll.interval";
public static final String POLL_BATCH_SIZE = "poll.batch.size";
public static final String CURSOR_STARTUP_MODE = "cursor.startup.mode";
public static final String CURSOR_RESET_MODE = "cursor.reset.mode";
public static final String CURSOR_STARTUP_TIMESTAMP = "cursor.startup.timestamp";
public static final String CURSOR_STARTUP_ID = "cursor.startup.id";
public static final String CURSOR_STOP_MODE = "cursor.stop.mode";
public static final String CURSOR_STOP_TIMESTAMP = "cursor.stop.timestamp";

/**
* Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
* Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
*/
public enum StartMode {
/**
Expand All @@ -99,7 +86,7 @@ public enum StartMode {
}

/**
* Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
* Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
*/
public enum StopMode {
/**
Expand Down
Loading