[Feature][Connector-V2] Add prometheus source and sink (#7265)
CosmosNi authored Oct 29, 2024
1 parent 8d9c6a3 commit dde6f9f
Showing 41 changed files with 27,752 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/labeler/label-scope-conf.yml
@@ -132,6 +132,11 @@ http:
      - changed-files:
          - any-glob-to-any-file: seatunnel-connectors-v2/connector-http/**
          - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(http)/**'
prometheus:
  - all:
      - changed-files:
          - any-glob-to-any-file: seatunnel-connectors-v2/connector-prometheus/**
          - all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(prometheus)/**'
hudi:
  - all:
      - changed-files:
1 change: 1 addition & 0 deletions config/plugin_config
@@ -88,6 +88,7 @@ connector-tdengine
connector-web3j
connector-milvus
connector-activemq
connector-prometheus
connector-sls
connector-qdrant
connector-typesense
103 changes: 103 additions & 0 deletions docs/en/connector-v2/sink/Prometheus.md
@@ -0,0 +1,103 @@
# Prometheus

> Prometheus sink connector

## Supported Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

Receives the data passed in from the source and uses it to trigger webhooks.

> For example, if the data from upstream is [`label: {"__name__": "test1"}, value: 1.23, time: 2024-08-15T17:00:00`], the body content is the following: `{"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}`

**Tips: The Prometheus sink only supports `post json` webhooks, and the data from the source is used as the body content of the webhook. It does not support passing data that is too far in the past.**
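
As a minimal sketch of the mapping described above, the following Python snippet builds the JSON body from one upstream row. It only reproduces the documented example: the helper name `build_body` is hypothetical, and the connector's actual serialization code is not shown in this document.

```python
import json


def build_body(label: dict, value: float, time: str) -> str:
    """Build the JSON body from the example above (hypothetical helper)."""
    # The documented example renders the value as a string, so mirror that here.
    payload = {"label": label, "value": str(value), "time": time}
    return json.dumps(payload)


body = build_body({"__name__": "test1"}, 1.23, "2024-08-15T17:00:00")
print(body)
```

The three arguments correspond to the fields selected by `key_label`, `key_value`, and `key_timestamp` in the sink options.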

## Supported DataSource Info

To use the Prometheus connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
|------------|--------------------|------------------------------------------------------------------------------------------------------------------|
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |

## Sink Options

| Name                        | Type   | Required | Default | Description                                                                                                 |
|-----------------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------|
| url                         | String | Yes      | -       | HTTP request URL                                                                                            |
| headers                     | Map    | No       | -       | HTTP headers                                                                                                |
| retry                       | Int    | No       | -       | Maximum number of retries when the HTTP request throws an `IOException`                                     |
| retry_backoff_multiplier_ms | Int    | No       | 100     | Retry backoff multiplier (milliseconds) when the HTTP request fails                                         |
| retry_backoff_max_ms        | Int    | No       | 10000   | Maximum retry backoff time (milliseconds) when the HTTP request fails                                       |
| connect_timeout_ms          | Int    | No       | 12000   | Connection timeout in milliseconds; default 12 s                                                            |
| socket_timeout_ms           | Int    | No       | 60000   | Socket timeout in milliseconds; default 60 s                                                                |
| key_timestamp               | Int    | No       | -       | Key of the Prometheus timestamp                                                                             |
| key_label                   | String | Yes      | -       | Key of the Prometheus label                                                                                 |
| key_value                   | Double | Yes      | -       | Key of the Prometheus value                                                                                 |
| batch_size                  | Int    | No       | 1024    | Batch size for writes to Prometheus                                                                         |
| flush_interval              | Long   | No       | 300000L | Interval at which buffered data is flushed to Prometheus                                                    |
| common-options              |        | No       | -       | Sink plugin common parameters; see [Sink Common Options](../sink-common-options.md) for details             |
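
To make the interplay of `retry`, `retry_backoff_multiplier_ms`, and `retry_backoff_max_ms` concrete, here is a small Python sketch of a capped exponential backoff. The doubling formula is an assumption made for illustration; this document does not state how the connector actually computes the delay, and `retry_delays_ms` is a hypothetical name.

```python
from typing import List


def retry_delays_ms(retry: int, multiplier_ms: int = 100, max_ms: int = 10000) -> List[int]:
    """Delay before each retry, assuming multiplier * 2^attempt capped at max_ms."""
    # ASSUMPTION: exponential doubling; the real connector may use another formula.
    return [min(multiplier_ms * (2 ** attempt), max_ms) for attempt in range(retry)]


print(retry_delays_ms(8))
```

Under this assumption the delays grow from the multiplier and never exceed `retry_backoff_max_ms`, which is the role the table assigns to the cap.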

## Example

Simple example:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_double = double
        c_timestamp = timestamp
      }
    }
    result_table_name = "fake"
    rows = [
      {
        kind = INSERT
        fields = [{"__name__": "test1"}, 1.23, "2024-08-15T17:00:00"]
      },
      {
        kind = INSERT
        fields = [{"__name__": "test2"}, 1.23, "2024-08-15T17:00:00"]
      }
    ]
  }
}
sink {
  Prometheus {
    url = "http://prometheus:9090/api/v1/write"
    key_label = "c_map"
    key_value = "c_double"
    key_timestamp = "c_timestamp"
    batch_size = 1
  }
}
```
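
The example sets `batch_size = 1`, so each row would be written on its own. The Python sketch below illustrates one plausible way `batch_size` and `flush_interval` could interact: flush when either threshold is reached. This is an assumption, not the connector's confirmed behavior. The class `BatchBuffer` is hypothetical, and treating `flush_interval` as milliseconds is also an assumption, since the options table does not give a unit.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Buffers rows and flushes on size or elapsed time (illustrative only)."""

    def __init__(self, flush: Callable[[List[dict]], None], batch_size: int = 1024,
                 flush_interval_ms: int = 300000,
                 clock: Callable[[], float] = time.monotonic):
        self._flush = flush
        self._batch_size = batch_size
        self._interval_s = flush_interval_ms / 1000.0  # ASSUMPTION: unit is ms
        self._clock = clock
        self._rows: List[dict] = []
        self._last_flush = clock()

    def add(self, row: dict) -> None:
        self._rows.append(row)
        elapsed = self._clock() - self._last_flush
        if len(self._rows) >= self._batch_size or elapsed >= self._interval_s:
            self.flush()

    def flush(self) -> None:
        if self._rows:
            self._flush(self._rows)
            self._rows = []  # start a fresh list; the flushed one stays intact
        self._last_flush = self._clock()


sent: List[List[dict]] = []
buffer = BatchBuffer(sent.append, batch_size=2)
for name in ("test1", "test2", "test3"):
    buffer.add({"__name__": name})
buffer.flush()  # flush the remainder at end of input
print(sent)
```

With `batch_size=2`, the first two rows go out together and the third is sent by the final explicit flush.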

## Changelog

### 2.3.8-beta 2024-08-22

- Add Prometheus Sink Connector

152 changes: 152 additions & 0 deletions docs/en/connector-v2/source/Prometheus.md
@@ -0,0 +1,152 @@
# Prometheus

> Prometheus source connector

## Description

Used to read data from Prometheus.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|---------|----------|-----------------|
| url | String | Yes | - |
| query | String | Yes | - |
| query_type | String | Yes | Instant |
| content_field | String | Yes | $.data.result.* |
| schema.fields | Config | Yes | - |
| format | String | No | json |
| params | Map | Yes | - |
| poll_interval_millis | int | No | - |
| retry | int | No | - |
| retry_backoff_multiplier_ms | int | No | 100 |
| retry_backoff_max_ms | int | No | 10000 |
| enable_multi_lines | boolean | No | false |
| common-options | config | No | - |

### url [String]

HTTP request URL.

### query [String]

Prometheus expression query string.

### query_type [String]

Either `Instant` or `Range`:

1. Instant: evaluates the query at a single point in time.
2. Range: evaluates the query over a range of time.

See the Prometheus HTTP API documentation: https://prometheus.io/docs/prometheus/latest/querying/api/
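
The two query types correspond to the `/api/v1/query` and `/api/v1/query_range` endpoints of the Prometheus HTTP API. The Python sketch below builds the request URL for each. Whether the connector appends these paths to `url` in exactly this way is an assumption, and `build_query_url` is a hypothetical helper.

```python
from urllib.parse import urlencode


def build_query_url(base_url: str, query: str, query_type: str = "Instant", **params: str) -> str:
    """Build a Prometheus HTTP API URL for an instant or range query."""
    # Instant queries use /api/v1/query; range queries use /api/v1/query_range.
    path = "/api/v1/query_range" if query_type == "Range" else "/api/v1/query"
    query_string = urlencode({"query": query, **params})
    return f"{base_url.rstrip('/')}{path}?{query_string}"


instant_url = build_query_url("http://mockserver:1080", "up")
range_url = build_query_url(
    "http://mockserver:1080", "up", "Range",
    start="2024-07-22T20:10:30.781Z", end="2024-07-22T20:11:00.781Z", step="15s",
)
print(instant_url)
print(range_url)
```

A range query needs `start`, `end`, and `step` in addition to `query`, which is why the Range example in this document sets those three keys.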

### params [Map]

HTTP request parameters.

### poll_interval_millis [int]

Interval (in milliseconds) between HTTP API requests in stream mode.

### retry [int]

Maximum number of retries when the HTTP request throws an `IOException`.

### retry_backoff_multiplier_ms [int]

Retry backoff multiplier (in milliseconds) when the HTTP request fails.

### retry_backoff_max_ms [int]

Maximum retry backoff time (in milliseconds) when the HTTP request fails.

### format [String]

Format of the upstream data; the default is `json`.

### schema [Config]

The schema is fixed; use the following value:

```hocon
schema = {
  fields {
    metric = "map<string, string>"
    value = double
    time = long
  }
}
```

#### fields [Config]

The schema fields of the upstream data.
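
To show how the fixed schema lines up with a Prometheus response, here is a hedged Python sketch that takes the elements selected by `$.data.result.*` from an instant-query response and maps each to `metric`, `value`, and `time`. The sample response follows the shape given in the Prometheus HTTP API documentation. Truncating the timestamp to whole seconds is an assumption, and `instant_rows` is a hypothetical helper, not the connector's code.

```python
import json

SAMPLE_INSTANT = """
{"status": "success",
 "data": {"resultType": "vector",
          "result": [{"metric": {"__name__": "up", "job": "prometheus"},
                      "value": [1435781451.781, "1"]}]}}
"""


def instant_rows(response_text: str) -> list:
    """Map each element of data.result to a {metric, value, time} row."""
    results = json.loads(response_text)["data"]["result"]  # what $.data.result.* selects
    rows = []
    for item in results:
        timestamp, raw_value = item["value"]  # Prometheus sends [unix_time, "value"]
        rows.append({
            "metric": item["metric"],   # map<string, string>
            "value": float(raw_value),  # double
            "time": int(timestamp),     # long; ASSUMPTION: truncated to seconds
        })
    return rows


rows = instant_rows(SAMPLE_INSTANT)
print(rows)
```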

### common options

Source plugin common parameters; see [Source Common Options](../source-common-options.md) for details.

## Example

### Instant

```hocon
source {
  Prometheus {
    result_table_name = "http"
    url = "http://mockserver:1080"
    query = "up"
    query_type = "Instant"
    content_field = "$.data.result.*"
    format = "json"
    schema = {
      fields {
        metric = "map<string, string>"
        value = double
        time = long
      }
    }
  }
}
```

### Range

```hocon
source {
  Prometheus {
    result_table_name = "http"
    url = "http://mockserver:1080"
    query = "up"
    query_type = "Range"
    content_field = "$.data.result.*"
    format = "json"
    start = "2024-07-22T20:10:30.781Z"
    end = "2024-07-22T20:11:00.781Z"
    step = "15s"
    schema = {
      fields {
        metric = "map<string, string>"
        value = double
        time = long
      }
    }
  }
}
```
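
A range query returns a matrix: each series carries a `values` list of `[timestamp, "value"]` pairs rather than a single `value`. The Python sketch below flattens such a response into one row per sample, using the same three schema fields. The response shape follows the Prometheus HTTP API documentation. How the connector itself flattens range results is not described in this document, so `range_rows` is a hypothetical illustration.

```python
import json

SAMPLE_RANGE = """
{"status": "success",
 "data": {"resultType": "matrix",
          "result": [{"metric": {"__name__": "up", "job": "prometheus"},
                      "values": [[1435781430.781, "1"], [1435781445.781, "0"]]}]}}
"""


def range_rows(response_text: str) -> list:
    """Emit one {metric, value, time} row per sample in a matrix response."""
    rows = []
    for series in json.loads(response_text)["data"]["result"]:
        for timestamp, raw_value in series["values"]:
            rows.append({
                "metric": series["metric"],
                "value": float(raw_value),
                "time": int(timestamp),  # ASSUMPTION: truncated to whole seconds
            })
    return rows


range_result = range_rows(SAMPLE_RANGE)
print(len(range_result))
```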

## Changelog

### next version

- Add Prometheus Source Connector
- Reduce configuration items

101 changes: 101 additions & 0 deletions docs/zh/connector-v2/sink/Prometheus.md
@@ -0,0 +1,101 @@
# Prometheus

> Prometheus sink connector

## Supported Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)

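## Description

Receives the data passed in from the source and uses it to trigger webhooks.

> For example, if the data from upstream is [`label: {"__name__": "test1"}, value: 1.23, time: 2024-08-15T17:00:00`], the body content is the following: `{"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}`

**Tips: The Prometheus sink only supports `post json` webhooks, and the source data is used as the body content of the webhook. It does not support passing data that is too far in the past.**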

## Supported DataSource Info

To use the Prometheus connector, the following dependencies must be installed. They can be downloaded by running the install-plugin.sh script or from the Maven central repository.

| Datasource | Supported Versions | Dependency                                                                                                       |
|------------|--------------------|------------------------------------------------------------------------------------------------------------------|
| Http       | universal          | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |

## Sink Options

| Name                        | Type   | Required | Default | Description                                                                                     |
|-----------------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------|
| url                         | String | Yes      | -       | HTTP request URL                                                                                |
| headers                     | Map    | No       | -       | HTTP headers                                                                                    |
| retry                       | Int    | No       | -       | Maximum number of retries when the HTTP request throws an `IOException`                         |
| retry_backoff_multiplier_ms | Int    | No       | 100     | Retry backoff multiplier (milliseconds) when the HTTP request fails                             |
| retry_backoff_max_ms        | Int    | No       | 10000   | Maximum retry backoff time (milliseconds) when the HTTP request fails                           |
| connect_timeout_ms          | Int    | No       | 12000   | Connection timeout in milliseconds; default 12 s                                                |
| socket_timeout_ms           | Int    | No       | 60000   | Socket timeout in milliseconds; default 60 s                                                    |
| key_timestamp               | Int    | No       | -       | Key of the Prometheus timestamp                                                                 |
| key_label                   | String | Yes      | -       | Key of the Prometheus label                                                                     |
| key_value                   | Double | Yes      | -       | Key of the Prometheus value                                                                     |
| batch_size                  | Int    | No       | 1024    | Batch size for writes to Prometheus                                                             |
| flush_interval              | Long   | No       | 300000L | Interval for scheduled writes to Prometheus                                                     |
| common-options              |        | No       | -       | Sink plugin common parameters; see [Sink Common Options](../sink-common-options.md) for details |

## Example

Simple example:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_double = double
        c_timestamp = timestamp
      }
    }
    result_table_name = "fake"
    rows = [
      {
        kind = INSERT
        fields = [{"__name__": "test1"}, 1.23, "2024-08-15T17:00:00"]
      },
      {
        kind = INSERT
        fields = [{"__name__": "test2"}, 1.23, "2024-08-15T17:00:00"]
      }
    ]
  }
}
sink {
  Prometheus {
    url = "http://prometheus:9090/api/v1/write"
    key_label = "c_map"
    key_value = "c_double"
    key_timestamp = "c_timestamp"
    batch_size = 1
  }
}
```

## Changelog

### 2.3.8-beta 2024-08-22

- Add Prometheus Sink Connector
