From 2a857a9e8ee3ba47e13343af2eeb0defcb0d9db9 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Tue, 5 Jul 2022 01:38:25 +0800 Subject: [PATCH 1/2] [doc][connector-v2] pulsar source options doc Simplify option's name and add detailed documentation --- docs/en/connector-v2/source/pulsar.md | 42 ++++++++++ plugin-mapping.properties | 1 + .../pulsar/config/SourceProperties.java | 55 +++++-------- .../seatunnel/pulsar/source/PulsarSource.java | 82 +++++++++---------- 4 files changed, 105 insertions(+), 75 deletions(-) create mode 100644 docs/en/connector-v2/source/pulsar.md diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md new file mode 100644 index 00000000000..86195f6a4d4 --- /dev/null +++ b/docs/en/connector-v2/source/pulsar.md @@ -0,0 +1,42 @@ +# Apache Pulsar + +## Description + +Source connector for Apache Pulsar. It can support both off-line and real-time jobs. + +## Options + +| name | type | required | default | description | +| --- | --- | --- | --- | --- | +| topic | String | No | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'.
**Note,only one of "topic-pattern" and "topic" can be specified for sources.** | +| topic-pattern | String | No | - | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running.
**Note, only one of "topic-pattern" and "topic" can be specified for sources.** | +| topic-discovery.interval | Long | No | 30000 | The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery.
**Note, This option only works if the 'topic-pattern' option is used.** | +| subscription.name | String | Yes | - | Specify the subscription name for this consumer. This argument is required when constructing the consumer. | +| client.service-url | String | Yes | - | Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
This is an example of `localhost`: `pulsar://localhost:6650,localhost:6651`. | +| admin.service-url | String | Yes | - | The Pulsar service HTTP URL for the admin endpoint. For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS. | +| auth.plugin-class | String | No | - | Name of the authentication plugin. | +| auth.params | String | No | - | Parameters for the authentication plugin.
Example: `key1:val1,key2:val2` | +| poll.timeout | Integer | No | 100 | The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. | +| poll.interval | Long | No | 50 | The interval time(in ms) when fetcing records. A shorter time increases throughput, but also increases CPU load. | +| poll.batch.size | Integer | No | 500 | The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. | +| cursor.startup.mode | Enum | No | LATEST | Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`. | +| cursor.startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).
**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.** | +| cursor.reset.mode | Enum | No | LATEST | Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`.
**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.** | +| cursor.stop.mode | Enum | No | NEVER | Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`.
**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.** | +| cursor.stop.timestamp | Long | No | - | Stop from the specified epoch timestamp (in milliseconds).
**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.** | + +## Example + +```Jdbc { +source { + + Pulsar { + topic = "example" + subscription.name = "seatunnel" + client.service-url = "localhost:pulsar://localhost:6650" + admin.service-url = "http://my-broker.example.com:8080" + result_table_name = "test" + } + +} +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index da3890a70a0..2c70402284f 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -101,3 +101,4 @@ seatunnel.sink.Clickhouse = connector-clickhouse seatunnel.sink.ClickhouseFile = connector-clickhouse seatunnel.source.Jdbc = connector-jdbc seatunnel.sink.Jdbc = connector-jdbc +seatunnel.source.Pulsar = connector-pulsar diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java index e8692322059..92453efee4c 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java @@ -19,61 +19,48 @@ public class SourceProperties { - // Pulsar client API config prefix. - public static final String CLIENT_CONFIG_PREFIX = "pulsar.client."; - // Pulsar admin API config prefix. - public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin."; - // -------------------------------------------------------------------------------------------- // The configuration for ClientConfigurationData part. - // All the configuration listed below should have the pulsar.client prefix. // -------------------------------------------------------------------------------------------- - public static final String PULSAR_SERVICE_URL = CLIENT_CONFIG_PREFIX + "serviceUrl"; - public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = CLIENT_CONFIG_PREFIX + "authPluginClassName"; - public static final String PULSAR_AUTH_PARAMS = CLIENT_CONFIG_PREFIX + "authParams"; + public static final String CLIENT_SERVICE_URL = "client.service-url"; + public static final String AUTH_PLUGIN_CLASS = "auth.plugin-class"; + public static final String AUTH_PARAMS = "auth.params"; // -------------------------------------------------------------------------------------------- // The configuration for ClientConfigurationData part. // All the configuration listed below should have the pulsar.client prefix. // -------------------------------------------------------------------------------------------- - public static final String PULSAR_ADMIN_URL = ADMIN_CONFIG_PREFIX + "adminUrl"; - - // Pulsar source connector config prefix. - public static final String SOURCE_CONFIG_PREFIX = "pulsar.source."; - // Pulsar consumer API config prefix. - public static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer."; + public static final String ADMIN_SERVICE_URL = "admin.service-url"; // -------------------------------------------------------------------------------------------- // The configuration for ConsumerConfigurationData part. - // All the configuration listed below should have the pulsar.consumer prefix. // -------------------------------------------------------------------------------------------- - public static final String PULSAR_SUBSCRIPTION_NAME = CONSUMER_CONFIG_PREFIX + "subscriptionName"; - public static final String PULSAR_SUBSCRIPTION_TYPE = CONSUMER_CONFIG_PREFIX + "subscriptionType"; - public static final String PULSAR_SUBSCRIPTION_MODE = CONSUMER_CONFIG_PREFIX + "subscriptionMode"; + public static final String SUBSCRIPTION_NAME = "subscription.name"; + public static final String SUBSCRIPTION_TYPE = "subscription.type"; + public static final String SUBSCRIPTION_MODE = "subscription.mode"; // -------------------------------------------------------------------------------------------- // The configuration for pulsar source part. - // All the configuration listed below should have the pulsar.source prefix. // -------------------------------------------------------------------------------------------- - public static final String PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs"; - public static final String PULSAR_TOPIC = SOURCE_CONFIG_PREFIX + "topic"; - public static final String PULSAR_TOPIC_PATTERN = SOURCE_CONFIG_PREFIX + "topic.pattern"; - public static final String PULSAR_POLL_TIMEOUT = SOURCE_CONFIG_PREFIX + "poll.timeout"; - public static final String PULSAR_POLL_INTERVAL = SOURCE_CONFIG_PREFIX + "poll.interval"; - public static final String PULSAR_BATCH_SIZE = SOURCE_CONFIG_PREFIX + "batch.size"; - public static final String PULSAR_CURSOR_START_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.mode"; - public static final String PULSAR_CURSOR_START_RESET_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.reset.mode"; - public static final String PULSAR_CURSOR_START_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.start.timestamp"; - public static final String PULSAR_CURSOR_START_ID = SOURCE_CONFIG_PREFIX + "scan.cursor.start.id"; - public static final String PULSAR_CURSOR_STOP_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.mode"; - public static final String PULSAR_CURSOR_STOP_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.timestamp"; + public static final String TOPIC_DISCOVERY_INTERVAL = "topic-discovery.interval"; + public static final String TOPIC = "topic"; + public static final String TOPIC_PATTERN = "topic-pattern"; + public static final String POLL_TIMEOUT = "poll.timeout"; + public static final String POLL_INTERVAL = "poll.interval"; + public static final String POLL_BATCH_SIZE = "poll.batch.size"; + public static final String CURSOR_STARTUP_MODE = "cursor.startup.mode"; + public static final String CURSOR_RESET_MODE = "cursor.reset.mode"; + public static final String CURSOR_STARTUP_TIMESTAMP = "cursor.startup.timestamp"; + public static final String CURSOR_STARTUP_ID = "cursor.startup.id"; + public static final String CURSOR_STOP_MODE = "cursor.stop.mode"; + public static final String CURSOR_STOP_TIMESTAMP = "cursor.stop.timestamp"; /** - * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}. + * Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */ public enum StartMode { /** @@ -99,7 +86,7 @@ public enum StartMode { } /** - * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}. + * Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */ public enum StopMode { /** diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java index 20028efb071..9247e1d2b8a 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java @@ -19,25 +19,25 @@ import static org.apache.seatunnel.common.PropertiesUtil.getEnum; import static org.apache.seatunnel.common.PropertiesUtil.setOption; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_ADMIN_URL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PARAMS; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PLUGIN_CLASS_NAME; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_BATCH_SIZE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_RESET_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_TIMESTAMP; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_MODE; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_TIMESTAMP; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SERVICE_URL; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SUBSCRIPTION_NAME; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC; -import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC_PATTERN; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST; import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.serialization.DefaultSerializer; @@ -100,48 +100,48 @@ public String getPluginName() { @SuppressWarnings("checkstyle:MagicNumber") @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, PULSAR_SUBSCRIPTION_NAME, PULSAR_SERVICE_URL, PULSAR_ADMIN_URL); + CheckResult result = CheckConfigUtil.checkAllExists(config, SUBSCRIPTION_NAME, CLIENT_SERVICE_URL, ADMIN_SERVICE_URL); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } // admin config PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder() - .adminUrl(config.getString(PULSAR_ADMIN_URL)); - setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, adminConfigBuilder::authPluginClassName); - setOption(config, PULSAR_AUTH_PARAMS, config::getString, adminConfigBuilder::authParams); + .adminUrl(config.getString(ADMIN_SERVICE_URL)); + setOption(config, AUTH_PLUGIN_CLASS, config::getString, adminConfigBuilder::authPluginClassName); + setOption(config, AUTH_PARAMS, config::getString, adminConfigBuilder::authParams); this.adminConfig = adminConfigBuilder.build(); // client config PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder() - .serviceUrl(config.getString(PULSAR_SERVICE_URL)); - setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, clientConfigBuilder::authPluginClassName); - setOption(config, PULSAR_AUTH_PARAMS, config::getString, clientConfigBuilder::authParams); + .serviceUrl(config.getString(CLIENT_SERVICE_URL)); + setOption(config, AUTH_PLUGIN_CLASS, config::getString, clientConfigBuilder::authPluginClassName); + setOption(config, AUTH_PARAMS, config::getString, clientConfigBuilder::authParams); this.clientConfig = clientConfigBuilder.build(); // consumer config PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder() - .subscriptionName(config.getString(PULSAR_SERVICE_URL)); + .subscriptionName(config.getString(SUBSCRIPTION_NAME)); this.consumerConfig = consumerConfigBuilder.build(); // source properties setOption(config, - PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, + TOPIC_DISCOVERY_INTERVAL, 30000L, config::getLong, v -> this.partitionDiscoveryIntervalMs = v); setOption(config, - PULSAR_POLL_TIMEOUT, + POLL_TIMEOUT, 100, config::getInt, v -> this.pollTimeout = v); setOption(config, - PULSAR_POLL_INTERVAL, + POLL_INTERVAL, 50L, config::getLong, v -> this.pollInterval = v); setOption(config, - PULSAR_BATCH_SIZE, + POLL_BATCH_SIZE, 500, config::getInt, v -> this.batchSize = v); @@ -159,7 +159,7 @@ public void prepare(Config config) throws PrepareFailException { } private void setStartCursor(Config config) { - StartMode startMode = getEnum(config, PULSAR_CURSOR_START_MODE, StartMode.class, LATEST); + StartMode startMode = getEnum(config, CURSOR_STARTUP_MODE, StartMode.class, LATEST); switch (startMode) { case EARLIEST: this.startCursor = StartCursor.earliest(); @@ -169,16 +169,16 @@ private void setStartCursor(Config config) { break; case SUBSCRIPTION: SubscriptionStartCursor.CursorResetStrategy resetStrategy = getEnum(config, - PULSAR_CURSOR_START_RESET_MODE, + CURSOR_RESET_MODE, SubscriptionStartCursor.CursorResetStrategy.class, SubscriptionStartCursor.CursorResetStrategy.LATEST); this.startCursor = StartCursor.subscription(resetStrategy); break; case TIMESTAMP: - if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_START_TIMESTAMP))) { - throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_START_TIMESTAMP, PULSAR_CURSOR_START_MODE)); + if (StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP))) { + throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STARTUP_TIMESTAMP, CURSOR_STARTUP_MODE)); } - setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp)); + setOption(config, CURSOR_STARTUP_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp)); break; default: throw new IllegalArgumentException(String.format("The %s mode is not supported.", startMode)); @@ -186,7 +186,7 @@ private void setStartCursor(Config config) { } private void setStopCursor(Config config) { - SourceProperties.StopMode stopMode = getEnum(config, PULSAR_CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER); + SourceProperties.StopMode stopMode = getEnum(config, CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER); switch (stopMode) { case LATEST: this.stopCursor = StopCursor.latest(); @@ -195,10 +195,10 @@ private void setStopCursor(Config config) { this.stopCursor = StopCursor.never(); break; case TIMESTAMP: - if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_STOP_TIMESTAMP))) { - throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_STOP_TIMESTAMP, PULSAR_CURSOR_STOP_MODE)); + if (StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP))) { + throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STOP_TIMESTAMP, CURSOR_STOP_MODE)); } - setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp)); + setOption(config, CURSOR_STARTUP_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp)); break; default: throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode)); @@ -206,19 +206,19 @@ private void setStopCursor(Config config) { } private void setPartitionDiscoverer(Config config) { - String topic = config.getString(PULSAR_TOPIC); + String topic = config.getString(TOPIC); if (StringUtils.isNotBlank(topic)) { this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ","))); } - String topicPattern = config.getString(PULSAR_TOPIC_PATTERN); + String topicPattern = config.getString(TOPIC_PATTERN); if (StringUtils.isNotBlank(topicPattern)) { if (this.partitionDiscoverer != null) { - throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN)); + throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", TOPIC, TOPIC_PATTERN)); } this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern)); } if (this.partitionDiscoverer == null) { - throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN)); + throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", TOPIC, TOPIC_PATTERN)); } } From 50142efe5a7a12cc43f86b7831e61ac493cb5b17 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Wed, 6 Jul 2022 18:59:16 +0800 Subject: [PATCH 2/2] [doc][connector-v2] Standardized Pulsar document --- docs/en/connector-v2/source/pulsar.md | 124 +++++++++++++++++++++----- 1 file changed, 104 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md index 86195f6a4d4..68f9da4df7c 100644 --- a/docs/en/connector-v2/source/pulsar.md +++ b/docs/en/connector-v2/source/pulsar.md @@ -6,30 +6,115 @@ Source connector for Apache Pulsar. It can support both off-line and real-time j ## Options -| name | type | required | default | description | -| --- | --- | --- | --- | --- | -| topic | String | No | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'.
**Note,only one of "topic-pattern" and "topic" can be specified for sources.** | -| topic-pattern | String | No | - | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running.
**Note, only one of "topic-pattern" and "topic" can be specified for sources.** | -| topic-discovery.interval | Long | No | 30000 | The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery.
**Note, This option only works if the 'topic-pattern' option is used.** | -| subscription.name | String | Yes | - | Specify the subscription name for this consumer. This argument is required when constructing the consumer. | -| client.service-url | String | Yes | - | Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
This is an example of `localhost`: `pulsar://localhost:6650,localhost:6651`. | -| admin.service-url | String | Yes | - | The Pulsar service HTTP URL for the admin endpoint. For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS. | -| auth.plugin-class | String | No | - | Name of the authentication plugin. | -| auth.params | String | No | - | Parameters for the authentication plugin.
Example: `key1:val1,key2:val2` | -| poll.timeout | Integer | No | 100 | The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. | -| poll.interval | Long | No | 50 | The interval time(in ms) when fetcing records. A shorter time increases throughput, but also increases CPU load. | -| poll.batch.size | Integer | No | 500 | The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. | -| cursor.startup.mode | Enum | No | LATEST | Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`. | -| cursor.startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).
**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.** | -| cursor.reset.mode | Enum | No | LATEST | Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`.
**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.** | -| cursor.stop.mode | Enum | No | NEVER | Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`.
**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.** | -| cursor.stop.timestamp | Long | No | - | Stop from the specified epoch timestamp (in milliseconds).
**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.** | +| name | type | required | default value | +| --- | --- | --- | --- | +| topic | String | No | - | +| topic-pattern | String | No | - | +| topic-discovery.interval | Long | No | 30000 | +| subscription.name | String | Yes | - | +| client.service-url | String | Yes | - | +| admin.service-url | String | Yes | - | +| auth.plugin-class | String | No | - | +| auth.params | String | No | - | +| poll.timeout | Integer | No | 100 | +| poll.interval | Long | No | 50 | +| poll.batch.size | Integer | No | 500 | +| cursor.startup.mode | Enum | No | LATEST | +| cursor.startup.timestamp | Long | No | - | +| cursor.reset.mode | Enum | No | LATEST | +| cursor.stop.mode | Enum | No | NEVER | +| cursor.stop.timestamp | Long | No | - | + +### topic [String] + +Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. + +**Note, only one of "topic-pattern" and "topic" can be specified for sources.** + +### topic-pattern [String] + +The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. + +**Note, only one of "topic-pattern" and "topic" can be specified for sources.** + +### topic-discovery.interval [Long] + +The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery. + +**Note, This option only works if the 'topic-pattern' option is used.** + +### subscription.name [String] + +Specify the subscription name for this consumer. This argument is required when constructing the consumer. + +### client.service-url [String] + +Service URL provider for Pulsar service. +To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL. +You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme. + +For example, `localhost`: `pulsar://localhost:6650,localhost:6651`. + +### admin.service-url [String] + +The Pulsar service HTTP URL for the admin endpoint. + +For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS. + +### auth.plugin-class [String] + +Name of the authentication plugin. + +### auth.params [String] + +Parameters for the authentication plugin. + +For example, `key1:val1,key2:val2` + +### poll.timeout [Integer] + +The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. + +### poll.interval [Long] + +The interval time(in ms) when fetcing records. A shorter time increases throughput, but also increases CPU load. + +### poll.batch.size [Integer] + +The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency. + +### cursor.startup.mode [Enum] + +Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`. + +### cursor.startup.timestamp [String] + +Start from the specified epoch timestamp (in milliseconds). + +**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.** + +### cursor.reset.mode [Enum] + +Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`. + +**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.** + +### cursor.stop.mode [String] + +Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`. + +**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.** + +### cursor.startup.timestamp [String] + +Stop from the specified epoch timestamp (in milliseconds). + +**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.** ## Example ```Jdbc { source { - Pulsar { topic = "example" subscription.name = "seatunnel" @@ -37,6 +122,5 @@ source { admin.service-url = "http://my-broker.example.com:8080" result_table_name = "test" } - } ``` \ No newline at end of file