Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: add pubsub develop example for kafka #7059

Merged
merged 4 commits into from
May 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions docs/en/latest/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,87 @@ Apache APISIX implement an extensible pubsub module, which is responsible for st
- Implement the required message system instruction processing functions
- Optional: Create plugins to support advanced configurations of this messaging system

### Example
### Example of Apache Kafka

TODO, an example will be added later to point out how to support other messaging systems.
#### Add new commands and response body definitions to `pubsub.proto`

The core of the protocol definition in `pubsub.proto` is the two parts `PubSubReq` and `PubSubResp`.

First, create the `CmdKafkaFetch` command and add the required parameters. Then, register this command in the list of commands for `req` in `PubSubReq`, which is named `cmd_kafka_fetch`.

Then create the corresponding response body `KafkaFetchResp` and register it in the `resp` of `PubSubResp`, named `kafka_fetch_resp`.

The protocol definition [pubsub.proto](https://github.com/apache/apisix/blob/master/apisix/include/apisix/model/pubsub.proto).

#### Add a new option to the `scheme` configuration item in upstream

Add a new option `kafka` to the `scheme` field enumeration in the `upstream` of `apisix/schema_def.lua`.

The schema definition [schema_def.lua](https://github.com/apache/apisix/blob/master/apisix/schema_def.lua).

#### Add a new `scheme` judgment branch to `http_access_phase`

Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because Apache Kafka has its clustering and partition scheme, we do not need to use the Apache APISIX built-in load balancing algorithm, so we intercept and take over the processing flow before selecting the upstream node, using the `kafka_access_phase` function.

The APISIX init file [init.lua](https://github.com/apache/apisix/blob/master/apisix/init.lua).

#### Implement the required message system commands processing functions

First, create an instance of the `pubsub` module, which is provided in the `core` package.

Then, an instance of the Apache Kafka client is created and omitted code here.

Next, add the command registered in the protocol definition above to the `pubsub` instance, which will provide a callback function that provides the parameters parsed from the communication protocol, in which the developer needs to call the kafka client to get the data and return it to the `pubsub` module as the function return value.

:::note Callback function prototype

The `params` is the data in the protocol definition; the first return value is the data, which needs to contain the fields in the response body definition, and returns the `nil` value when there is an error; the second return value is the error, and returns the error string when there is an error

:::

Finally, it enters the loop to wait for client commands, and when an error occurs, it returns the error and stops the processing flow.

The kafka pubsub implementation [kafka.lua](https://github.com/apache/apisix/blob/master/apisix/pubsub/kafka.lua).

#### Optional: Create plugins to support advanced configurations of this messaging system

Add the required fields to the plugin schema definition and write them to the context of the current request in the `access` function.

The `kafka-proxy` plugin [kafka-proxy.lua](https://github.com/apache/apisix/blob/master/apisix/plugins/kafka-proxy.lua).

Add this plugin to the list of plugins in the APISIX configuration file.

The plugins list [config-default.yaml](https://github.com/apache/apisix/blob/master/conf/config-default.yaml).

#### Results

After this is done, create a route like the one below to connect to this messaging system via APISIX using the WebSocket.

```shell
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
-H 'X-API-KEY: ${api-key}' \
-H 'Content-Type: application/json' \
-d '{
"uri": "/kafka",
"plugins": {
"kafka-proxy": {
"sasl": {
"username": "user",
"password": "pwd"
}
}
},
"upstream": {
"nodes": {
"kafka-server1:9092": 1,
"kafka-server2:9092": 1,
"kafka-server3:9092": 1
},
"type": "none",
"scheme": "kafka",
"tls": {
"verify": true
}
}
}'
```