Skip to content

Commit

Permalink
feat: add support for Kafka secure connection with certificates X509 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
derberg authored Jun 22, 2021
1 parent 5a71b85 commit 12c8435
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.DS_Store
node_modules
node_modules
output
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ Property name | Reason | Fallback | Default
* [Kafka](https://en.wikipedia.org/wiki/Apache_Kafka)
* [WebSocket](https://en.wikipedia.org/wiki/WebSocket)



## How to use the template

This template must be used with the AsyncAPI Generator. You can find all available options [here](https://github.com/asyncapi/generator/).

In case you use X509 security and need to provide certificates, either place them in the root of the generated server with the following names: `ca.pem`, `service.cert`, `service.key`. You can provide a custom directory where cert files are located using `certFilesDir` parameter like `-p certFilesDir=../not/in/my/app/dir`.

Since you can have multiple different security schemes, to use the one of X509 type, you need to pass the name of the scheme like this: `-p securityScheme=SCHEME_NAME`.

> You can find a complete tutorial on AsyncAPI Generator using this template [here](https://www.asyncapi.com/docs/tutorials/streetlights).
### CLI

```bash
Expand All @@ -76,6 +79,7 @@ cd output
npm i

# Start server
# To enable production settings start the server with "NODE_ENV=production npm start"
npm start

##
Expand All @@ -95,16 +99,15 @@ mqtt pub -t 'smartylighting/streetlights/1/0/event/123/lighting/measured' -h 'te
#Notice that the server automatically validates incoming messages and logs out validation errors
```



## Template configuration

You can configure this template by passing different parameters in the Generator CLI: `-p PARAM1_NAME=PARAM1_VALUE -p PARAM2_NAME=PARAM2_VALUE`

|Name|Description|Required|Example|
|---|---|---|---|
|server|The server you want to use in the code.|Yes|`production`|

|securityScheme|Name of the security scheme. Only scheme with X509 and Kafka protocol is supported for now.|No|'mySchemeName'|
|certFilesDir|Directory where application certificates are located. This parameter is needed when you use X509 security scheme and your cert files are not located in the root of your application.|No|`../not/in/my/app/dir`|

## Development

Expand Down
20 changes: 20 additions & 0 deletions filters/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,23 @@ function replaceVariablesWithValues(url, serverVariables) {
return url;
}
filter.replaceVariablesWithValues = replaceVariablesWithValues;

function getConfig(p) {
let protocol = p;
let configName = 'broker';

if (p === 'ws') configName = 'ws';
if (p === 'kafka-secure') protocol = 'kafka';

return `config.${configName}.${protocol}`;
}
filter.getConfig = getConfig;

function getProtocol(p) {
let protocol = p;

if (p === 'kafka-secure') protocol = 'kafka';

return protocol;
}
filter.getProtocol = getProtocol;
8 changes: 8 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"amqp",
"mqtt",
"kafka",
"kafka-secure",
"ws"
],
"parameters": {
Expand All @@ -79,6 +80,13 @@
},
"asyncapiFileDir": {
"description": "Custom location of the AsyncAPI file that you provided as an input in generation. By default it is located in the root of the output directory"
},
"securityScheme": {
"description": "Name of the security scheme. Only scheme with X509 and Kafka protocol is supported for now."
},
"certFilesDir": {
"description": "Directory where application certificates are located. This parameter is needed when you use X509 security scheme and your cert files are not located in the root of your application.",
"default": "./"
}
},
"nonRenderableFiles": [
Expand Down
20 changes: 20 additions & 0 deletions template/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# {{ asyncapi.info().title() }}

{{ asyncapi.info().description() | safe }}

## Running the server

1. Install dependencies
```sh
npm i
```
{%- if params.securityScheme and (asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure') and asyncapi.components().securityScheme(params.securityScheme).type() === 'X509' %}
1. (Optional) For X509 security provide files with all data required to establish secure connection using certificates. Place files like `ca.pem`, `service.cert`, `service.key` in the root of the project or the location that you explicitly specified during generation.
{%- endif %}
1. Start the server with default configuration
```sh
npm start
```
1. (Optional) Start server with secure production configuration
```sh
NODE_ENV=production npm start
```

> NODE_ENV=production relates to `config/common.yml` that contains different configurations for different environments. Starting server without `NODE_ENV` applies default configuration while starting the server as `NODE_ENV=production npm start` applies default configuration supplemented by configuration settings called `production`.
8 changes: 5 additions & 3 deletions template/config/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ default:
retain:
subscribe: true
{%- endif %}
{%- if asyncapi.server(params.server).protocol() === "kafka" %}
{%- if asyncapi.server(params.server).protocol() === "kafka" or asyncapi.server(params.server).protocol() === "kafka-secure" %}
kafka:
clientId: {{ asyncapi.info().title() | camelCase }}
brokers:
- {{ asyncapi.server(params.server).url() | replaceVariablesWithValues(asyncapi.server(params.server).variables()) | host }}
- {{ asyncapi.server(params.server).url() | replaceVariablesWithValues(asyncapi.server(params.server).variables()) | stripProtocol }}
consumerOptions:
groupId: {{ asyncapi.info().title() | camelCase }}
topics:
Expand All @@ -56,13 +56,15 @@ test:
staging:

production:
{%- if asyncapi.server(params.server).protocol() === "kafka" %}
{%- if asyncapi.server(params.server).protocol() === "kafka" or asyncapi.server(params.server).protocol() === "kafka-secure"%}
broker:
kafka:
ssl:
rejectUnauthorized: true
{%- if params.securityScheme and asyncapi.components().securityScheme(params.securityScheme).type() !== 'X509' %}
sasl:
mechanism: 'plain'
username:
password:
{%- endif %}
{%- endif %}
2 changes: 1 addition & 1 deletion template/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{% if asyncapi.server(params.server).protocol() === 'mqtt' -%}
"hermesjs-mqtt": "2.x",
{%- endif -%}
{% if asyncapi.server(params.server).protocol() === 'kafka' -%}
{% if asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure' -%}
"hermesjs-kafka": "2.x",
{%- endif -%}
{% if asyncapi.server(params.server).protocol() === 'amqp' -%}
Expand Down
20 changes: 18 additions & 2 deletions template/src/api/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Hermes = require('hermesjs');
const app = new Hermes();
const path = require('path');
const { cyan, gray, yellow } = require('colors/safe');
const buffer2string = require('./middlewares/buffer2string');
const string2json = require('./middlewares/string2json');
Expand All @@ -8,12 +9,27 @@ const logger = require('./middlewares/logger');
const errorLogger = require('./middlewares/error-logger');
const config = require('../lib/config');
{%- set protocol = asyncapi.server(params.server).protocol() %}
const {{ protocol | capitalize }}Adapter = require('hermesjs-{{protocol}}');
const serverConfig = {{ protocol | getConfig }};
const {{ protocol | getProtocol | capitalize }}Adapter = require('hermesjs-{{ protocol | getProtocol }}');
{%- for channelName, channel in asyncapi.channels() %}
const {{ channelName | camelCase }} = require('./routes/{{ channelName | convertToFilename }}.js');
{%- endfor %}

app.addAdapter({{ protocol | capitalize }}Adapter, config.{% if protocol === 'ws' %}ws{% else %}broker.{{protocol}}{% endif %});
{%- if params.securityScheme and (asyncapi.server(params.server).protocol() === 'kafka' or asyncapi.server(params.server).protocol() === 'kafka-secure') and asyncapi.components().securityScheme(params.securityScheme).type() === 'X509' %}
const fs = require('fs')
const certFilesDir = '{{ params.certFilesDir }}';

try {
serverConfig.ssl.ca = fs.readFileSync(path.join(process.cwd(), certFilesDir, 'ca.pem'));
serverConfig.ssl.key = fs.readFileSync(path.join(process.cwd(), certFilesDir,'service.key'));
serverConfig.ssl.cert = fs.readFileSync(path.join(process.cwd(), certFilesDir,'service.cert'));
} catch (error) {
throw new Error(`Unable to set cert files in the config: ${error}`);
}

{%- endif %}

app.addAdapter({{ protocol | getProtocol | capitalize }}Adapter, serverConfig);

app.use(buffer2string);
app.use(string2json);
Expand Down

0 comments on commit 12c8435

Please sign in to comment.