-
Notifications
You must be signed in to change notification settings - Fork 20
/
README
272 lines (210 loc) · 9.92 KB
/
README
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
== Fubar ==
An MQTT message broker written in erlang targeted to support internet scale
applications.
== Getting Started ==
1. Getting dependencies and building
$ make deps
$ make
2. Starting a broker in interactive mode
$ make test
3. Testing
3.1. Starting a node for client
$ make client
3.2. Connecting clients (in the client shell)
1> C1 = mqtt_client:start([{client_id, <<"c1">>}]).
2> C2 = mqtt_client:start([{client_id, <<"c2">>}]).
3.3. Subscribing and publishing messages
3> C1 ! mqtt:subscribe([{topics, [<<"t1">>]}]).
4> C2 ! mqtt:publish([{topic, <<"t1">>}, {payload, <<"hello!">>}]).
3.4. Direct messaging
This is an extra feature not originally stated in MQTT specification.
One client may use the other client's client id as a topic name to
send a message directly to the client.
5> C2 ! mqtt:publish([{topic, <<"c1">>}, {payload, <<"world!">>}]).
4. Shutting down the broker gracefully.
(fubar@host)1> fubar:stop().
(fubar@host)2> q().
== More Features ==
1. Adding a new broker node to existing broker cluster
$ make test master=name@host
Note) If you want to start more than one broker node in a computer,
You have to use different node name and listen port as:
$ make test master=name@host node=other mqtt_port=1884
2. Using account control
Uncomment {auth, mqtt_account} in src/fubar.app.src to enable auth.
Then, call mqtt_account:update/2 as:
(fubar@host)1> mqtt_account:update(<<"romeo">>, <<"1234">>).
3. More parameters for a client
Full list of client parameters are:
1> C3 = mqtt_client:start([{hostname, "localhost"},
{port, 1884},
{username, <<"romeo">>},
{password, <<"1234">>},
{client_id, <<"c1">>},
{keep_alive, 60},
clean_session,
{will_topic, <<"will">>},
{will_message, <<"bye-bye">>},
{will_qos, at_least_once},
will_retain,
socket_options, [...]]).
Refer inet:setopts/2 for socket_options.
4. Starting a broker in daemon mode
Use 'run' instead of 'test'.
$ make run
Note) You may not start more than one daemon in a machine.
5. Getting a daemon shell
$ make debug
Note) The debug shell is closely bound with the daemon. If you exit from
the debug shell in a normal way, the daemon will stop. Use CTRL-D to stop
debugging.
6. Broker state is preserved
The broker restores all the state -- accounts, topics and subscriptions -
on restart. To clear this state, do:
$ make reset
7. Dumping logs to text files
SASL logs can be taken as:
$ make dump
SASL logs show events happened in the start-up process and events not dealt
with by the application.
Application logs can be taken from the shell as:
(fubar@host)1> fubar_log:dump(trace, "trace.log").
Or you can dump all the logs by:
(fubar@host)1> fubar_log:dump().
Log classes are:
- access % network addresses of clients and disconnect events
- packet % bytes sent and received by the broker
- protocol % mqtt messages sent and received by the broker
- resource % long lasting resources such as sessions and topics
- debug % message put for debugging purpose
- info % irrelevant events taken but dropped
- warning % abnormal conditions that are not considered critical
- error % irrecoverable failures
- trace % fubar packets marked as trace
8. Log management
Basically, two types of logs are used in fubar. One is error_logger and the
other is disk_log. Most of the runtime events use disk_log and system
start-up, shutdown events use error_logger. error_logger is also called
as SASL log. The other difference is the fact that disk_log supports
distributed logging but SASL log doesn't, which means that you can read
all the disk_logs created in many nodes in one place but you have to go
to each node to read SASL log. Also, you can open/close/redirect disk_log
at runtime. Refer fubar_log:open/1, close/1, out/1, out/2 for more info.
Log configuration stored in src/fubar.app.src controls both logs.
~ {fubar_log, [{dir, "priv/log"},
~ {max_bytes, 10485760},
~ {max_files, 10},
~ {classes, [{trace, standard_io}, {warning, "fubar"}]}
~ ]},
- dir % Log files are left under <dir>/<node> usually "priv/log/fubar"
- max_bytes % Maximum individual log file size per class
- max_files % Maximum number of rolling log files per class
- classes % Initial disk_log classes to open
Classes take {Class, Redirect} as values. Class is one of the log classes
listed in section 7. Redirect is one of none, standard_io or "prefix".
The meaning of the redirect is as follows.
- none % No redirect. Logs are left in the binary files only.
- standard_io % Redirect to tty.
- "prefix" % Redirect to daily log files with the prefix.
% e.g, "prefix_2012-12-29.log.1"
As noted before, disk_log is distributed log. Common practice to use
distributed log in production is to use one node for central log consumer
and set all the other nodes as log producer. To configure your system like
this, start a node with 'make run mqtt=undefined' command from a computer.
Then you have a node 'fubar@host1' without a listener. Now start other
nodes with 'make run master=fubar@host1' command from other computers. You
may turn all the log classes off in all the other nodes and control in only
one place, the monitor node.
You will find following files under the log directory.
- 1, 2,..., index % SASL logs, binary rolling files.
- access.*,... % disk_logs for each class, binary rolling files.
- run_erl.log % Broker start-up linux command (production only).
- erlang.log.* % Erlang shell input/output history (production only).
- *_YYYY-MM-DD.log.* % Daily logs, text files without size limit.
If you have daily logs in your log directory, you have to delete them
regularly. They are produced if you redirect any disk_log to file by
setting {Class, "prefix"} in src/fubar.app.src or by calling
fubar_log:out(Class, "prefix").
9. SSL support
9.1. Prepare a certificate authority
$ cd priv/ssl/ca
$ mkdir certs private
$ chmod 700 private
$ echo 01 > serial
$ touch index.txt
$ openssl req -x509 -config openssl.cnf -newkey rsa:2048 -days 365 \
-out cacert.pem -outform PEM -subj /CN=fubar_ca/ -nodes
$ openssl x509 -in cacert.pem -out cacert.cer -outform DER
9.2. Create a server certificate
$ openssl genrsa -out ../key.pem 2048
$ openssl req -new -key ../key.pem -out ../req.pem -outform PEM \
-subj /CN=SomeHostName/O=YourOrganizationName/ -nodes
$ openssl ca -config openssl.cnf -in ../req.pem -out ../cert.pem
-notext -batch -extensions server_ca_extensions
$ openssl pkcs12 -export -in ../cert.pem -out ../keycert.p12 \
-inkey ../key.pem -passout pass:YourPassword
The processes in 9.1 and 9.2 are stored in batch script
priv/ssl/ca/new-certs.sh. So you can just,
$ cd priv/ssl/ca
$ ./new-certs.sh
9.3. Testing
Use mqtts_port command line parameter to start an SSL listener when
starting the broker.
$ cd ../../..
$ make test mqtts_port=8883
(fubar@host)1>
Test basic connection with openssl s_client.
$ openssl s_client -connect localhost:8883
If it succeeds, use {transport, ranch_ssl} option on the client side.
$ make client
1> ssl:start().
2> mqtt_client:start([{port, 8883},{transport, ranch_ssl},
{client_id, <<"ssltest">>}]).
10. Benchmarking
You can simulate many client connections with a computer.
First, start a client shell.
$ make client
1>
mqtt_client module provides batch functions, batch_start/2,3,
batch_restart/0,1 and batch_stop/0. Prepare a list of client id.
1> Id = [list_to_binary(io_lib:format("~4..0B", [N])) ||
N <- lists:seq(1, 1000)].
[<<"0001">>,<<"0002">>,...,<<"1000">>]
Now you can use the list to call batch_start/2,3 as:
2> mqtt_client:batch_start(Id, [{host, "broker addr"}, clean_session]).
[<0.121.0>,<0.122.0>,...]
It may take some time to complete the batch start operation.
You should be able to connect more than 50k clients from a computer.
But of course you need to do proper kernel tuning and ulimit setting to
do that.
Start another client for probing. We'll use this client later to
generate load.
3> Probe = mqtt_client:start([{host, "broker addr"},
{client_id, <<"probe">>}, clean_session]).
Now, we are ready to generate load. Let's try direct messaging.
4> Worker = mqtt_probe:start(Probe, Id, <<"...">>, 0).
You should find endless messages printed by the batch clients.
The above command spawns a Worker that publishes a message <<"...">> to one
of the Id list every 0 millisecond using the client Probe. You may want to
monitor the broker's cpu/memory/io performance during this benchmark. Use
top or sar in that case.
You can stop the worker any time.
5> mqtt_probe:stop(Worker).
Let's try another type of load now. Make all the clients subscribe to a
common topic.
6> [Pid ! mqtt:subscribe([{topics,<<"t1">>}]) ||
{_, Pid} <- mqtt_client_sup:running()].
Then, use the probe to publish messages to the common topic periodically.
7> f(Worker).
8> Worker = mqtt_probe:start(Probe, <<"t1">>, <<"...">>, 1000).
And so it goes.
== Configurations ==
Initial broker configuration is read from ebin/fubar.app which is compiled
from src/fubar.app.src. You have to edit src/fubar.app.src to make your
configuration applied whenever you restart the broker. Or you can call
fubar:settings/2 to apply changes while the broker is running.
(fubar@host)1> fubar:settings(mqtt_protocol, {max_packet_size, 8192}).
The above sample shows changing maximum MQTT packet size to 8kB. This change
affects all the clients connection from this point on. Note that some change
applies immediately but others require further intervention.
Refer src/fubar.app.src for details.