Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Kafka secure connection with certificates X509 #70

Merged
merged 8 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.DS_Store
node_modules
node_modules
output
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ Property name | Reason | Fallback | Default
* [Kafka](https://en.wikipedia.org/wiki/Apache_Kafka)
* [WebSocket](https://en.wikipedia.org/wiki/WebSocket)



## How to use the template

This template must be used with the AsyncAPI Generator. You can find all available options [here](https://github.com/asyncapi/generator/).

In case you use X509 security and need to provide certificates, either place them in the root of the generated server with the following names: `ca.pem`, `service.cert`, `service.key`. You can provide a custom directory where cert files are located using `certFilesDir` parameter like `-p certFilesDir=../not/in/my/app/dir`.

Since you can have multiple different security schemes, to use the one of X509 type, you need to pass the name of the scheme like this: `-p securityScheme=SCHEME_NAME`.

> You can find a complete tutorial on AsyncAPI Generator using this template [here](https://www.asyncapi.com/docs/tutorials/streetlights).

### CLI

```bash
Expand All @@ -76,6 +79,7 @@ cd output
npm i

# Start server
# To enable production settings start the server with "NODE_ENV=production npm start"
npm start

##
Expand All @@ -95,16 +99,15 @@ mqtt pub -t 'smartylighting/streetlights/1/0/event/123/lighting/measured' -h 'te
#Notice that the server automatically validates incoming messages and logs out validation errors
```



## Template configuration

You can configure this template by passing different parameters in the Generator CLI: `-p PARAM1_NAME=PARAM1_VALUE -p PARAM2_NAME=PARAM2_VALUE`

|Name|Description|Required|Example|
|---|---|---|---|
|server|The server you want to use in the code.|Yes|`production`|

|securityScheme|Name of the security scheme. Only scheme with X509 and Kafka protocol is supported for now.|No|'mySchemeName'|
|certFilesDir|Directory where application certificates are located. This parameter is needed when you use X509 security scheme and your cert files are not located in the root of your application.|No|`../not/in/my/app/dir`|

## Development

Expand Down
20 changes: 20 additions & 0 deletions filters/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,23 @@ function replaceVariablesWithValues(url, serverVariables) {
return url;
}
filter.replaceVariablesWithValues = replaceVariablesWithValues;

function getConfig(p) {
let protocol = p;
let configName = 'broker';

if (p === 'ws') configName = 'ws';
if (p === 'kafka-secure') protocol = 'kafka';

return `config.${configName}.${protocol}`;
}
filter.getConfig = getConfig;

function getProtocol(p) {
let protocol = p;

if (p === 'kafka-secure') protocol = 'kafka';

return protocol;
}
filter.getProtocol = getProtocol;
8 changes: 8 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"amqp",
"mqtt",
"kafka",
"kafka-secure",
"ws"
],
"parameters": {
Expand All @@ -79,6 +80,13 @@
},
"asyncapiFileDir": {
"description": "Custom location of the AsyncAPI file that you provided as an input in generation. By default it is located in the root of the output directory"
},
"securityScheme": {
"description": "Name of the security scheme. Only scheme with X509 and Kafka protocol is supported for now."
},
"certFilesDir": {
"description": "Directory where application certificates are located. This parameter is needed when you use X509 security scheme and your cert files are not located in the root of your application.",
"default": "./"
}
},
"nonRenderableFiles": [
Expand Down
20 changes: 20 additions & 0 deletions template/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# {{ asyncapi.info().title() }}

{{ asyncapi.info().description() | safe }}

## Running the server

1. Install dependencies
```sh
npm i
```
{%- if params.securityScheme and (asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure') and asyncapi.components().securityScheme(params.securityScheme).type() === 'X509' %}
1. (Optional) For X509 security provide files with all data required to establish secure connection using certificates. Place files like `ca.pem`, `service.cert`, `service.key` in the root of the project or the location that you explicitly specified during generation.
{%- endif %}
1. Start the server with default configuration
```sh
npm start
```
1. (Optional) Start server with secure production configuration
```sh
NODE_ENV=production npm start
```

> NODE_ENV=production relates to `config/common.yml` that contains different configurations for different environments. Starting server without `NODE_ENV` applies default configuration while starting the server as `NODE_ENV=production npm start` applies default configuration supplemented by configuration settings called `production`.
8 changes: 5 additions & 3 deletions template/config/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ default:
retain:
subscribe: true
{%- endif %}
{%- if asyncapi.server(params.server).protocol() === "kafka" %}
{%- if asyncapi.server(params.server).protocol() === "kafka" or asyncapi.server(params.server).protocol() === "kafka-secure" %}
kafka:
clientId: {{ asyncapi.info().title() | camelCase }}
brokers:
- {{ asyncapi.server(params.server).url() | replaceVariablesWithValues(asyncapi.server(params.server).variables()) | host }}
- {{ asyncapi.server(params.server).url() | replaceVariablesWithValues(asyncapi.server(params.server).variables()) | stripProtocol }}
consumerOptions:
groupId: {{ asyncapi.info().title() | camelCase }}
topics:
Expand All @@ -56,13 +56,15 @@ test:
staging:

production:
{%- if asyncapi.server(params.server).protocol() === "kafka" %}
{%- if asyncapi.server(params.server).protocol() === "kafka" or asyncapi.server(params.server).protocol() === "kafka-secure"%}
broker:
kafka:
ssl:
rejectUnauthorized: true
{%- if params.securityScheme and asyncapi.components().securityScheme(params.securityScheme).type() !== 'X509' %}
sasl:
mechanism: 'plain'
username:
password:
{%- endif %}
{%- endif %}
2 changes: 1 addition & 1 deletion template/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{% if asyncapi.server(params.server).protocol() === 'mqtt' -%}
"hermesjs-mqtt": "2.x",
{%- endif -%}
{% if asyncapi.server(params.server).protocol() === 'kafka' -%}
{% if asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure' -%}
"hermesjs-kafka": "2.x",
{%- endif -%}
{% if asyncapi.server(params.server).protocol() === 'amqp' -%}
Expand Down
20 changes: 18 additions & 2 deletions template/src/api/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Hermes = require('hermesjs');
const app = new Hermes();
const path = require('path');
const { cyan, gray, yellow } = require('colors/safe');
const buffer2string = require('./middlewares/buffer2string');
const string2json = require('./middlewares/string2json');
Expand All @@ -8,12 +9,27 @@ const logger = require('./middlewares/logger');
const errorLogger = require('./middlewares/error-logger');
const config = require('../lib/config');
{%- set protocol = asyncapi.server(params.server).protocol() %}
const {{ protocol | capitalize }}Adapter = require('hermesjs-{{protocol}}');
const serverConfig = {{ protocol | getConfig }};
const {{ protocol | getProtocol | capitalize }}Adapter = require('hermesjs-{{ protocol | getProtocol }}');
{%- for channelName, channel in asyncapi.channels() %}
const {{ channelName | camelCase }} = require('./routes/{{ channelName | convertToFilename }}.js');
{%- endfor %}

app.addAdapter({{ protocol | capitalize }}Adapter, config.{% if protocol === 'ws' %}ws{% else %}broker.{{protocol}}{% endif %});
{%- if params.securityScheme and (asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure') and asyncapi.components().securityScheme(params.securityScheme).type() === 'X509' %}
const fs = require('fs')
const certFilesDir = '{{ params.certFilesDir }}';

try {
serverConfig.ssl.ca = fs.readFileSync(path.join(process.cwd(), certFilesDir, 'ca.pem'));
serverConfig.ssl.key = fs.readFileSync(path.join(process.cwd(), certFilesDir,'service.key'));
serverConfig.ssl.cert = fs.readFileSync(path.join(process.cwd(), certFilesDir,'service.cert'));
} catch (error) {
throw new Error(`Unable to set cert files in the config: ${error}`);
}

{%- endif %}

app.addAdapter({{ protocol | getProtocol | capitalize }}Adapter, serverConfig);

app.use(buffer2string);
app.use(string2json);
Expand Down