Skip to content

Commit

Permalink
Merge branch 'dev' into add_file_sink_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jul 7, 2022
2 parents 07b68bc + 59ce8a2 commit b0434e7
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 94 deletions.
13 changes: 2 additions & 11 deletions docs/en/connector-v2/sink/File.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@ import TabItem from '@theme/TabItem';

Output data to local or hdfs or s3 file.

:::tip

Used to write data to file. Supports Batch and Streaming mode.

* [x] Batch
* [x] Streaming

:::

## Options

<Tabs
Expand All @@ -36,7 +27,7 @@ Used to write data to file. Supports Batch and Streaming mode.
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
| partition_by | array | no | - |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
| partition_dir_expression | string | no | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/" |
| is_partition_field_write_in_file | boolean| no | false |
| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean| no | true |
Expand Down Expand Up @@ -129,7 +120,7 @@ Streaming Job not support `overwrite`.
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
| partition_by | array | no | - |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
| partition_dir_expression | string | no | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/" |
| is_partition_field_write_in_file | boolean| no | false |
| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean| no | true |
Expand Down
126 changes: 126 additions & 0 deletions docs/en/connector-v2/source/pulsar.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Apache Pulsar

## Description

Source connector for Apache Pulsar. It can support both off-line and real-time jobs.

## Options

| name | type | required | default value |
| --- | --- | --- | --- |
| topic | String | No | - |
| topic-pattern | String | No | - |
| topic-discovery.interval | Long | No | 30000 |
| subscription.name | String | Yes | - |
| client.service-url | String | Yes | - |
| admin.service-url | String | Yes | - |
| auth.plugin-class | String | No | - |
| auth.params | String | No | - |
| poll.timeout | Integer | No | 100 |
| poll.interval | Long | No | 50 |
| poll.batch.size | Integer | No | 500 |
| cursor.startup.mode | Enum | No | LATEST |
| cursor.startup.timestamp | Long | No | - |
| cursor.reset.mode | Enum | No | LATEST |
| cursor.stop.mode | Enum | No | NEVER |
| cursor.stop.timestamp | Long | No | - |

### topic [String]

Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'.

**Note, only one of "topic-pattern" and "topic" can be specified for sources.**

### topic-pattern [String]

The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running.

**Note, only one of "topic-pattern" and "topic" can be specified for sources.**

### topic-discovery.interval [Long]

The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery.

**Note, This option only works if the 'topic-pattern' option is used.**

### subscription.name [String]

Specify the subscription name for this consumer. This argument is required when constructing the consumer.

### client.service-url [String]

Service URL provider for Pulsar service.
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.

For example, `localhost`: `pulsar://localhost:6650,localhost:6651`.

### admin.service-url [String]

The Pulsar service HTTP URL for the admin endpoint.

For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS.

### auth.plugin-class [String]

Name of the authentication plugin.

### auth.params [String]

Parameters for the authentication plugin.

For example, `key1:val1,key2:val2`

### poll.timeout [Integer]

The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency.

### poll.interval [Long]

The interval time(in ms) when fetcing records. A shorter time increases throughput, but also increases CPU load.

### poll.batch.size [Integer]

The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency.

### cursor.startup.mode [Enum]

Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`.

### cursor.startup.timestamp [String]

Start from the specified epoch timestamp (in milliseconds).

**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.**

### cursor.reset.mode [Enum]

Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`.

**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.**

### cursor.stop.mode [String]

Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`.

**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.**

### cursor.startup.timestamp [String]

Stop from the specified epoch timestamp (in milliseconds).

**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.**

## Example

```Jdbc {
source {
Pulsar {
topic = "example"
subscription.name = "seatunnel"
client.service-url = "localhost:pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
result_table_name = "test"
}
}
```
8 changes: 4 additions & 4 deletions docs/en/contribution/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ as an example, when you run it successfully you could see the output as below:


```log
+I[Gary, 1647423592505]
+I[Kid Xiong, 1647423593510]
+I[Ricky Huo, 1647423598537]
+I[Ricky Huo, 71]
+I[Gary, 12]
+I[Ricky Huo, 93]
...
...
+I[Gary, 1647423597533]
+I[Ricky Huo, 83]
```

## What's More
Expand Down
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,4 @@ seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,48 @@

public class SourceProperties {

// Pulsar client API config prefix.
public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
// Pulsar admin API config prefix.
public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";

// --------------------------------------------------------------------------------------------
// The configuration for ClientConfigurationData part.
// All the configuration listed below should have the pulsar.client prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_SERVICE_URL = CLIENT_CONFIG_PREFIX + "serviceUrl";
public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = CLIENT_CONFIG_PREFIX + "authPluginClassName";
public static final String PULSAR_AUTH_PARAMS = CLIENT_CONFIG_PREFIX + "authParams";
public static final String CLIENT_SERVICE_URL = "client.service-url";
public static final String AUTH_PLUGIN_CLASS = "auth.plugin-class";
public static final String AUTH_PARAMS = "auth.params";

// --------------------------------------------------------------------------------------------
// The configuration for ClientConfigurationData part.
// All the configuration listed below should have the pulsar.client prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_ADMIN_URL = ADMIN_CONFIG_PREFIX + "adminUrl";

// Pulsar source connector config prefix.
public static final String SOURCE_CONFIG_PREFIX = "pulsar.source.";
// Pulsar consumer API config prefix.
public static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer.";
public static final String ADMIN_SERVICE_URL = "admin.service-url";

// --------------------------------------------------------------------------------------------
// The configuration for ConsumerConfigurationData part.
// All the configuration listed below should have the pulsar.consumer prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_SUBSCRIPTION_NAME = CONSUMER_CONFIG_PREFIX + "subscriptionName";
public static final String PULSAR_SUBSCRIPTION_TYPE = CONSUMER_CONFIG_PREFIX + "subscriptionType";
public static final String PULSAR_SUBSCRIPTION_MODE = CONSUMER_CONFIG_PREFIX + "subscriptionMode";
public static final String SUBSCRIPTION_NAME = "subscription.name";
public static final String SUBSCRIPTION_TYPE = "subscription.type";
public static final String SUBSCRIPTION_MODE = "subscription.mode";

// --------------------------------------------------------------------------------------------
// The configuration for pulsar source part.
// All the configuration listed below should have the pulsar.source prefix.
// --------------------------------------------------------------------------------------------

public static final String PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs";
public static final String PULSAR_TOPIC = SOURCE_CONFIG_PREFIX + "topic";
public static final String PULSAR_TOPIC_PATTERN = SOURCE_CONFIG_PREFIX + "topic.pattern";
public static final String PULSAR_POLL_TIMEOUT = SOURCE_CONFIG_PREFIX + "poll.timeout";
public static final String PULSAR_POLL_INTERVAL = SOURCE_CONFIG_PREFIX + "poll.interval";
public static final String PULSAR_BATCH_SIZE = SOURCE_CONFIG_PREFIX + "batch.size";
public static final String PULSAR_CURSOR_START_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.mode";
public static final String PULSAR_CURSOR_START_RESET_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.reset.mode";
public static final String PULSAR_CURSOR_START_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.start.timestamp";
public static final String PULSAR_CURSOR_START_ID = SOURCE_CONFIG_PREFIX + "scan.cursor.start.id";
public static final String PULSAR_CURSOR_STOP_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.mode";
public static final String PULSAR_CURSOR_STOP_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.timestamp";
public static final String TOPIC_DISCOVERY_INTERVAL = "topic-discovery.interval";
public static final String TOPIC = "topic";
public static final String TOPIC_PATTERN = "topic-pattern";
public static final String POLL_TIMEOUT = "poll.timeout";
public static final String POLL_INTERVAL = "poll.interval";
public static final String POLL_BATCH_SIZE = "poll.batch.size";
public static final String CURSOR_STARTUP_MODE = "cursor.startup.mode";
public static final String CURSOR_RESET_MODE = "cursor.reset.mode";
public static final String CURSOR_STARTUP_TIMESTAMP = "cursor.startup.timestamp";
public static final String CURSOR_STARTUP_ID = "cursor.startup.id";
public static final String CURSOR_STOP_MODE = "cursor.stop.mode";
public static final String CURSOR_STOP_TIMESTAMP = "cursor.stop.timestamp";

/**
* Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
* Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
*/
public enum StartMode {
/**
Expand All @@ -99,7 +86,7 @@ public enum StartMode {
}

/**
* Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
* Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
*/
public enum StopMode {
/**
Expand Down
Loading

0 comments on commit b0434e7

Please sign in to comment.