.output chapter8.wd
.bookmark moving-pieces
++ A Universe of Moving Pieces
////
AO: This title does not tell the reader what you're offering in this chapter, and therefore should be replaced. You want people to come read it, so give them a title that indicates how they'll benefit by knowing the content. I'm a little fuzzy on what the benefit is, myself!
////
So far in this book I've aimed to take you through a journey of understanding 0MQ in its many aspects. By now you may have started to build your own products using the techniques I explained, and others you've figured out yourself. You will start to face questions about how to make these products work in the real world.
But what is that "real world"? My view is that it is becoming a world of ever increasing numbers of moving pieces. Some people use the phrase the "Internet of Things", suggesting that we'll see a new category of devices that are more numerous but rather stupider than our current smart phones and tablets and laptops and servers. However, I don't think the data points this way at all. Yes, more and more devices, but they're not stupid at all. They're smart and powerful and getting more so all the time.
The mechanism at work is something I call "Cost Gravity" and it has the effect of cutting the cost of technology by half every 18-24 months. Or, put another way, our global computing capacity doubles every two years, over and over and over. The future is filled with trillions of devices that are fully powerful multi-core computers: they don't run some cut-down "operating system for things" but full operating systems and full applications.
And this is the world we're aiming at with 0MQ. When we talk of "scale" we don't mean hundreds of computers, or even thousands. Think of clouds of tiny smart and perhaps self-replicating machines surrounding every person, filling every space, covering every wall, filling the cracks and eventually, becoming so much a part of us that we get them before birth and they follow us to death.
These clouds of tiny machines talk to each other, all the time, over short-range wireless links, using the Internet Protocol. They create mesh networks, pass information and tasks around like nervous signals. They augment our memory, vision, every aspect of our communications, and physical functions. And it's 0MQ that powers their conversations and events and exchanges of work and information.
Now, to make even a thin imitation of this come true today, we need to solve a set of technical problems (how do peers discover each other, how do they talk to existing networks like the Web, how do they protect the information they carry, how do we track and monitor them, to get some idea of what they're doing). Then we need to do what most engineers forget about: package this solution into a framework that is dead easy for ordinary developers to use.
This is what we'll attempt in this chapter: to build a framework for distributed applications, as an API, protocols, and implementations. It's not a small challenge but I've claimed often that 0MQ makes such problems simple, so let's see if that's still true.
We'll cover:
* Building a basic framework for distributed computing.
+++ Design for The Real World
Whether we're connecting a roomful of mobile devices over WiFi, or a cluster of virtual boxes over simulated Ethernet, we will hit the same kinds of problems. These are:
* //Discovery// - how do we learn about other nodes on the network? Do we use a discovery service, centralized mediation, or some kind of broadcast beacon?
* //Presence// - how do we track when other nodes come and go? Do we use some kind of central registration service, or heartbeating or beacons?
* //Connectivity// - how do we actually connect one node to another? Do we use local networking, wide-area networking, or do we use a central message broker to do the forwarding?
* //Point-to-point messaging// - how do we send a message from one node to another? Do we send this to the node's network address, or do we use some indirect addressing via a centralized message broker?
* //Group messaging// - how do we send a message from one node to a group of others? Do we work via a centralized message broker, or do we use a publish-subscribe model like 0MQ?
* //Testing and simulation// - how do we simulate large numbers of nodes so we can test performance properly? Do we have to buy two dozen Android tablets, or can we use pure software simulation?
* //Distributed logging// - how do we track what this cloud of nodes is doing so we can detect performance problems and failures? Do we create a main logging service, or do we allow every device to log the world around it?
* //Content distribution// - how do we send content from one node to another? Do we use server-centric protocols like FTP or HTTP, or do we use decentralized protocols like FileMQ?
If we can solve these problems reasonably well, and the further problems that will emerge (like security and wide-area bridging) we get something like a framework for what I might call "Really Cool Distributed Applications", or as my grandkids call it, "the software our world runs on".
You should have guessed from my rhetorical questions that there are two broad directions we can go. One is to centralize everything. The other is to distribute everything. I'm going to bet on decentralization. If you want centralization, you don't really need 0MQ; there are other options you can use.
So very roughly, here's the story. One, the number of moving pieces increases exponentially over time (doubles every 24 months). Two, these pieces stop using wires since dragging cables everywhere gets //really// boring. Three, future applications run across clusters of these pieces using the Benevolent Tyrant pattern from [#the-community]. Four, today it's really difficult, nay still rather impossible, to build such applications. Five, let's make it cheap and easy using all the techniques and tools we've built up. Six, partay!
+++ The Secret Life of WiFi
The future is clearly wireless, and while many big businesses live by concentrating data in their clouds, the future doesn't look quite so centralized. The devices at the edges of our networks get smarter every year, not dumber. They're hungry for work and information to digest and profit from. And they don't drag cables around, except once a night for power. It's all wireless, and more and more, 802.11-branded WiFi of different alphabetical flavors.
++++ Why Mesh isn't Here Yet
As such a vital part of our future, WiFi has a big problem that's not often discussed but which anyone betting on it needs to be aware of. The phone companies of the world have built themselves nice profitable mobile phone cartels in nearly every country with a functioning government, based on convincing governments that without monopoly rights to airwaves and ideas, the world would fall apart. Technically, we call this "regulatory capture" and "patents", but in fact it's just a form of blackmail and corruption. If you, the state, give me, a business, the right to overcharge and tax the market, and ban all real competitors, I'll give you 5%. Not enough? How about 10%? OK, 15% plus snacks. If you refuse, we pull service.
But WiFi snuck past this, borrowing unlicensed airspace and riding on the back of the open and unpatented and remarkably innovative Internet protocol stack. So today we have the curious situation where it costs me several Euro a minute to call from Seoul to Brussels if I use the state-backed infrastructure that we've subsidized over decades, but nothing at all if I can find an unregulated WiFi access point. Oh, and I can do video, and send files, and photos, and download entire home movies all for the same amazing price point of precisely zero point zero zero (in any currency you like). God help me if I try to send just one photo home using the service I actually pay for. That would cost me more than the camera I took it on.
It is the price we pay for having tolerated the "trust us, we're the experts" patent system for so long. But more than that, it's a massive economic incentive to chunks of the technology sector -- and especially chipset makers who own patents on the anti-Internet GSM, GPRS, 3G, and LTE stacks, and who treat the telcos as prime clients -- to actively throttle WiFi development. And of course it's these firms that bulk out the IEEE committees that define WiFi.
The reason for this rant against lawyer-driven "innovation" is to steer your thinking into "what if WiFi was really free?" Because this will happen one day, not too far off, and it's worth betting on. We'll see several things happen. First, much more aggressive use of airspace especially for near-distance communications where there is no risk of interference. Secondly, big capacity improvements as we learn to use more airspace in parallel. Thirdly, acceleration of the standardization process. Lastly, broader support in devices for really interesting connectivity.
Right now streaming a movie from your phone to your TV is considered "leading edge". This is ridiculous. Let's get truly ambitious. How about a stadium of people watching a game, sharing photos and HD video with each other in real time, creating an ad-hoc event that literally saturates the airspace with a digital frenzy? I should be able to collect terabytes of imagery from those around me, in an hour. Why does this have to go through Twitter or Facebook and that tiny expensive mobile data connection? How about a home with hundreds of devices all talking to each other over mesh, so when someone rings the doorbell, the porch lights stream video through to your phone or TV? How about a car that can talk to your phone and play your dubstep playlist //without you plugging in wires//?
Why, in 2012, and to get more serious, is our digital society in the hands of central points that are monitored, censored, logged, used to track who we talk to, collect evidence against us, and then shut down when the authorities decide we have too much free speech? The loss of privacy we're living through is only a problem when it's one-sided, but then the problem is calamitous. A truly wireless world would bypass all central censorship. It's how the Internet was designed, and it's quite feasible. Technically.
++++ Some Physics
Naive developers of distributed software treat the network as infinitely fast and perfectly reliable. While this is approximately true for simple applications over Ethernet, WiFi rapidly proves the difference between magical thinking and science. That is, WiFi breaks so easily and dramatically under stress that I sometimes wonder how anyone would dare use it for real work. The ceiling moves up, as WiFi gets better, but never fast enough to stop us hitting it.
To understand how WiFi performs technically you need to understand a basic law of physics: the power required to connect two points increases according to the square of the distance. People who grow up in larger houses have exponentially louder voices, as I learned in Dallas. For a WiFi network this means that as two radios get further apart, they have to either use more power, or lower their signal rate.
There's only so much power you can pull out of a battery before users treat the device as hopelessly broken. So though a WiFi network may be rated at some speed, the real bit rate between the access point (AP) and a client depends on how far apart the two are. As you move your WiFi-enabled phone away from the AP, the two radios trying to talk to each other will first increase their power but then reduce their bit rate.
This effect has some consequences we need to be aware of if we want to build robust distributed applications that don't dangle wires behind them like puppets:
* If you have a group of devices talking to an AP, when the AP is talking to the slowest device, the //whole network has to wait//. It's like having to repeat a joke at a party to the designated driver, who has no sense of humor, is still fully and tragically sober, and has in any case a poor grasp of the language.
* If you use unicast TCP and send a message to multiple devices, the AP must send the packets to each device separately. Yes, you knew this; it's also how Ethernet works. But now understand that one distant (or low-powered) device means everything waits for that slowest device to catch up.
* If you use multicast or broadcast (which work the same, in most cases), the AP will send single packets to the whole network at once, which is awesome, but it will do it at the slowest possible bit rate (usually 1Mbps). You can adjust this rate manually in some APs. That just reduces the reach of your AP. You can also buy more expensive APs that have a little more intelligence and will figure out the highest bit rate they can safely use. You can also use enterprise APs with IGMP (Internet Group Management Protocol) support and 0MQ's PGM transport to send only to subscribed clients. I'd not however bet on such APs being widely available, ever.
As you try to put more devices onto an AP, performance rapidly gets worse to the point where adding one more device can break the whole network, for everyone. Many APs solve this by randomly disconnecting clients when they reach some limit, four to eight devices for a mobile hotspot, 30-50 devices for a consumer AP, perhaps 100 devices for an enterprise AP.
++++ What's the Current Status?
Despite its uncomfortable role as enterprise technology that somehow escaped into the wild, WiFi is already useful for more than getting a free Skype call. It's not ideal but it works well enough to let us solve some interesting problems. Let me give you a rapid status report.
First, point-to-point versus access point-to-client. Traditional WiFi is all AP-client. Every packet has to go from client A to AP, thence to client B. You cut your bandwidth by 50% but that's only half the problem. I explained about the inverse-square law. If A and B are very close together but both far from the AP, they'll both be using a low bit rate. Imagine your AP is in the garage, and you're in the living room trying to stream video from your phone to your TV. Good luck!
There is an old "ad-hoc" mode that lets A and B talk to each other but it's way too slow for anything fun, and of course, it's disabled on all mobile chipsets. Actually, it's disabled in the top-secret drivers that the chipset makers kindly provide to hardware makers. There is a new "Tunneled Direct Link Setup" (TDLS) protocol that lets two devices create a direct link, using an AP for discovery but not for traffic. And there's a "5G" WiFi standard (it's a marketing term, so goes in quotes) that boosts link speeds to a gigabit. TDLS and 5G together make HD movie streaming from your phone to your TV a plausible reality. I assume TDLS will be restricted in various ways so as to placate the telcos.
Lastly, we saw standardization of the 802.11s mesh protocol in 2012, after a remarkably speedy ten years or so of work. Mesh removes the access point completely, at least in the imaginary future where it exists and is widely used. Devices talk to each other directly, and maintain little routing tables of neighbors that let them forward packets. Imagine the AP software embedded into every device but smart enough (it's not as impressive as it sounds) to do multiple hops.
No-one who is making money from the mobile data extortion racket wants to see 802.11s available, because city-wide mesh is such a nightmare for the bottom line, so it's happening as slowly as possible. The only large organization with the power (and, I assume the surface-to-surface missiles) to get mesh technology into wide use is the US Army. But mesh will emerge and I'd bet on 802.11s being widely available in consumer electronics by 2020 or so.
Second, if we don't have point-to-point, how far can we trust APs today? Well, if you go to a Starbucks in the USA and try the 0MQ Hello World example using two laptops connected via the free WiFi, you'll find they cannot connect. Why? Well, the answer is in the name: "attwifi". AT&T is a good old incumbent telco that hates WiFi and presumably provides the service cheaply to Starbucks and others so that independents can't get into the market. But any access point you buy will support client-to-AP-to-client access, and outside the USA I've never found a public AP locked down the AT&T way.
Third, performance. The AP is clearly a bottleneck; you cannot get better than half of its advertised speed even if you put A and B literally beside the AP. Worse, if there are other APs in the same airspace, they'll shout each other out. In my home, WiFi barely works at all because the neighbors two houses down have an AP which they've amplified. Even on a different channel it interferes with our home WiFi. In the cafe where I'm sitting now there are over a dozen networks. Realistically, as long as we're dependent on AP-based WiFi, we're subject to random interference and unpredictable performance.
Fourth, battery life. There's no inherent reason that WiFi, when idle, is hungrier than Bluetooth, for example. They use the same radios and low-level framing. The main difference is in the tuning and in the protocols. For wireless power-saving to work well, devices have to mostly sleep, and beacon out to other devices only once every so often. For this to work, they need to synchronize their clocks. This happens properly for the mobile phone part, which is why my old flip phone can run five days on a charge. When WiFi is working, it will use more power. Current power amplifier technology is also inefficient, meaning you draw a lot more energy from your battery than you pump into the air (the waste turns into a hot phone). Power amplifiers are improving as people focus more on mobile WiFi.
Lastly, mobile access points. If we can't trust centralized APs, and if our devices are smart enough to run full operating systems, can't we make them work as APs? I'm //so glad// you asked that question. Yes, we can, and it works quite nicely. Especially since you can switch this on and off in software, on a modern OS like Android. Again, the villains of the peace are the US telcos, who mostly detest this feature and kill it or cripple it on the phones they control. Smarter telcos realize that it's a way to amplify their "last mile" and bring higher-value products to more users, but crooks don't compete on smarts.
++++ Conclusions
WiFi is not Ethernet and although I believe future 0MQ applications will have a very important decentralized wireless presence, it's not going to be an easy road. Much of the basic reliability and capacity that you expect from Ethernet is missing. When you run a distributed application over WiFi you have to allow for frequent timeouts, random latencies, arbitrary disconnections, whole interfaces going down and coming up, and so on.
The technological evolution of wireless networking is best described as "slow and joyless". Applications and frameworks that try to exploit decentralized wireless are mostly absent or poor. The only existing open source framework for proximity networking is [https://www.alljoyn.org AllJoyn], from Qualcomm. But with 0MQ we proved that the inertia and decrepit incompetence of existing players was no reason for us to sit still. When we accurately understand problems, we can solve them. What we imagine, we can make real.
+++ Discovery
One of the great things about short-range wireless is the proximity. WiFi maps closely to the physical space, which maps closely to how we naturally organize. In fact the Internet is quite abstract and this confuses a lot of people who kind of "get" it but in fact don't really. With WiFi we have technical connectivity that is potentially super-tangible. You see what you get and you get what you see. Tangible means easy to understand and that should mean love from users instead of the typical frustration and quiet hate.
Proximity is the key. We have a bunch of WiFi radios in a room, happily beaconing to each other. For lots of applications it makes sense that they can find each other and start chatting, without any user input. After all, most real world data isn't private, it's just highly localized.
As a first step towards 0MQ-based proximity networking, let's look at how to do discovery. There exist libraries that do this. I don't like them. They seem too complex and too specific and somehow to date from a prehistoric era before people realized that distributed computing could be //fundamentally simple//.
++++ Preemptive Discovery over Raw Sockets
I'm in a hotel room in Gangnam, Seoul, with a 4G wireless hotspot, a Linux laptop, and a couple of Android phones. The phones and laptop are talking to the hotspot. The {{ifconfig}} command says my IP address is 192.168.1.2. Let me try some {{ping}} commands. DHCP servers tend to dish out addresses in sequence, so my phones are probably close by, numerically speaking:
[[code]]
$ ping 192.168.1.1
PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data.
64 bytes from 192.168.1.1: icmp_req=1 ttl=64 time=376 ms
64 bytes from 192.168.1.1: icmp_req=2 ttl=64 time=358 ms
64 bytes from 192.168.1.1: icmp_req=4 ttl=64 time=167 ms
^C
--- 192.168.1.1 ping statistics ---
3 packets transmitted, 2 received, 33% packet loss, time 2001ms
rtt min/avg/max/mdev = 358.077/367.522/376.967/9.445 ms
[[/code]]
Found one! 150-300 msec round-trip latency... that's a surprisingly high figure, something to keep in mind for later. Now I ping myself, just to try to double check things:
[[code]]
$ ping 192.168.1.2
PING 192.168.1.2 (192.168.1.2) 56(84) bytes of data.
64 bytes from 192.168.1.2: icmp_req=1 ttl=64 time=0.054 ms
64 bytes from 192.168.1.2: icmp_req=2 ttl=64 time=0.055 ms
64 bytes from 192.168.1.2: icmp_req=3 ttl=64 time=0.061 ms
^C
--- 192.168.1.2 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 1998ms
rtt min/avg/max/mdev = 0.054/0.056/0.061/0.009 ms
[[/code]]
The response time is a bit faster now, which is what we'd expect. Let's try the next couple of addresses:
[[code]]
$ ping 192.168.1.3
PING 192.168.1.3 (192.168.1.3) 56(84) bytes of data.
64 bytes from 192.168.1.3: icmp_req=1 ttl=64 time=291 ms
64 bytes from 192.168.1.3: icmp_req=2 ttl=64 time=271 ms
64 bytes from 192.168.1.3: icmp_req=3 ttl=64 time=132 ms
^C
--- 192.168.1.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2001ms
rtt min/avg/max/mdev = 132.781/231.914/291.851/70.609 ms
[[/code]]
That's the second phone, with the same kind of latency as the first one. Let's continue, see if there are any other devices connected to the hotspot:
[[code]]
$ ping 192.168.1.4
PING 192.168.1.4 (192.168.1.4) 56(84) bytes of data.
^C
--- 192.168.1.4 ping statistics ---
3 packets transmitted, 0 received, 100% packet loss, time 2016ms
[[/code]]
And that is it. Now, {{ping}} uses raw IP sockets to send ICMP_ECHO messages. The useful thing about ICMP_ECHO is that it gets a response from any IP stack that has not deliberately had echo switched off. That's still a common practice on corporate websites that fear the old "ping of death" exploit where malformed messages could crash the machine.
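To make that a little more concrete, here is a minimal sketch of what an ICMP_ECHO request looks like on the wire: an 8-byte header holding a type, a code, a checksum, an identifier, and a sequence number. The names {{icmp_checksum}} and {{icmp_echo_build}} are mine, invented for illustration, and not part of any library. The sketch only builds the packet. Sending it would still need a raw socket.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ICMP_ECHO_SIZE 8                //  Header only, no payload

//  Internet checksum: one's-complement sum of big-endian 16-bit words
static uint16_t
icmp_checksum (const uint8_t *data, size_t length)
{
    uint32_t sum = 0;
    size_t index;
    for (index = 0; index + 1 < length; index += 2)
        sum += (uint32_t) ((data [index] << 8) | data [index + 1]);
    if (length & 1)
        sum += (uint32_t) (data [length - 1] << 8);
    //  Fold the carries back into the low 16 bits
    while (sum >> 16)
        sum = (sum & 0xFFFF) + (sum >> 16);
    return (uint16_t) ~sum;
}

//  Fill 'packet' (at least ICMP_ECHO_SIZE bytes) with an echo request
static void
icmp_echo_build (uint8_t *packet, uint16_t ident, uint16_t sequence)
{
    assert (packet);
    memset (packet, 0, ICMP_ECHO_SIZE);
    packet [0] = 8;                     //  Type 8 is "echo request"
    packet [1] = 0;                     //  Code is always zero
    packet [4] = (uint8_t) (ident >> 8);
    packet [5] = (uint8_t) (ident & 0xFF);
    packet [6] = (uint8_t) (sequence >> 8);
    packet [7] = (uint8_t) (sequence & 0xFF);
    //  Checksum is computed while its own field is still zero
    uint16_t checksum = icmp_checksum (packet, ICMP_ECHO_SIZE);
    packet [2] = (uint8_t) (checksum >> 8);
    packet [3] = (uint8_t) (checksum & 0xFF);
}
```

A receiver can validate the packet by running the same checksum over the whole thing, including the checksum field: a result of zero means the packet is intact.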
I call this //preemptive discovery// since it doesn't take any cooperation from the device. As long as the phones are not actively ignoring us, we can see them sitting there.
You might ask why this is useful. We don't know that the peers responding to ICMP_ECHO run 0MQ, that they are interested in talking to us, that they have any services we can use, or even what kind of device they are. However, knowing that there's //something// on address 192.168.1.3 is already useful. We also know how far away the device is, relatively, we know how many devices are on the network, and we know the rough state of the network (as in, good, poor, or terrible).
It isn't even hard to create ICMP_ECHO messages and send them. A few dozen lines of code, and we could use 0MQ multithreading to do this in parallel for addresses stretching out above and below our own IP address. Could be kind of fun.
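As a rough sketch of the "above and below our own IP address" part, the function below lists candidate addresses nearest-first, alternating below and above our own. It assumes a /24 network like the hotspot in my hotel room, which is a simplification: real code would take the netmask from the interface. The name {{neighbor_candidates}} is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <arpa/inet.h>

//  Fill 'candidates' with up to 'limit' IPv4 addresses (network byte
//  order) on the same /24 as 'own_address', nearest first. Skips our
//  own address, the network address (.0), and the broadcast address
//  (.255). Returns the number of entries written, 0 if the address
//  does not parse.
static size_t
neighbor_candidates (const char *own_address, uint32_t *candidates,
                     size_t limit)
{
    assert (own_address && candidates);
    struct in_addr own;
    if (inet_pton (AF_INET, own_address, &own) != 1)
        return 0;                       //  Not a valid IPv4 address

    uint32_t host = ntohl (own.s_addr);
    uint32_t network = host & 0xFFFFFF00;
    int last_octet = (int) (host & 0xFF);
    size_t count = 0;
    int distance;
    for (distance = 1; distance < 255 && count < limit; distance++) {
        int below = last_octet - distance;
        int above = last_octet + distance;
        if (below >= 1)
            candidates [count++] = htonl (network | (uint32_t) below);
        if (above <= 254 && count < limit)
            candidates [count++] = htonl (network | (uint32_t) above);
    }
    return count;
}
```

For my laptop on 192.168.1.2 this gives 192.168.1.1, then 192.168.1.3, then 192.168.1.4, and so on upwards. Each candidate could then be handed to its own worker thread for probing.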
However, sadly, there's a fatal flaw in my idea of using ICMP_ECHO to discover devices. To open a raw IP socket requires root privileges on a POSIX box. It stops rogue programs getting data meant for others. We can get the power to open raw sockets on Linux by giving sudo privileges to our command ({{ping}} traditionally has the setuid bit set for this reason). On a mobile OS like Android, it requires root access, i.e. rooting the phone or tablet. That's out of the question for most people and so ICMP_ECHO is out of reach for most devices.
//Expletive deleted!// Let's try something in user space. The next step most people take is UDP multicast or broadcast. Let's follow that trail.
++++ Cooperative Discovery using UDP Broadcasts
Multicast tends to be seen as more modern and "better" than broadcast. In IPv6, broadcast doesn't work at all: you always have to use multicast. Nonetheless, all IPv4 local network discovery protocols end up using UDP broadcast anyhow. The reasons: broadcast and multicast end up working much the same, except broadcast is simpler and less risky. Multicast is seen by network admins as kind of dangerous, as it can leak over network segments.
If you've never used UDP, you'll discover it's quite a nice protocol. In some ways it reminds us of 0MQ, sending whole messages to peers using two different patterns: one-to-one, and one-to-many. The main problems with UDP are that (a) the POSIX socket API was designed for universal flexibility not simplicity, (b) UDP messages are limited for practical purposes to about 512 bytes, and (c) when you start to use UDP for real data, you find that a lot of messages get dropped, especially as infrastructure tends to favor TCP over UDP.
Here is a minimal ping program that uses UDP instead of ICMP_ECHO:
[[code type="example" title="UDP discovery, model 1" name="udpping1"]]
[[/code]]
This code uses a single socket to broadcast 1-byte messages and receive anything that other nodes are broadcasting. When I run it, it shows just one node, which is itself:
[[code]]
Pinging peers...
Found peer 192.168.1.2:9999
Pinging peers...
Found peer 192.168.1.2:9999
[[/code]]
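For readers who want to see the moving parts, here is a minimal sketch of the socket setup that a broadcast ping of this kind needs. It is not the udpping1 example itself. The names {{beacon_socket_open}} and {{beacon_send}} are hypothetical, and the port number is simply the one that shows up in the output above.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#define PING_PORT_NUMBER 9999           //  Port seen in the output above

//  Create a UDP socket that is allowed to broadcast, bound to 'port'
//  on all interfaces (0 lets the system choose a port). Returns the
//  socket handle, or -1 on failure.
static int
beacon_socket_open (int port)
{
    assert (port >= 0 && port <= 65535);
    int handle = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (handle == -1)
        return -1;

    //  Without SO_BROADCAST, sending to a broadcast address is refused
    int on = 1;
    if (setsockopt (handle, SOL_SOCKET, SO_BROADCAST,
                    &on, sizeof (on)) == -1) {
        close (handle);
        return -1;
    }
    //  Bind so that we also receive what other nodes broadcast
    struct sockaddr_in address;
    memset (&address, 0, sizeof (address));
    address.sin_family = AF_INET;
    address.sin_port = htons ((uint16_t) port);
    address.sin_addr.s_addr = htonl (INADDR_ANY);
    if (bind (handle, (struct sockaddr *) &address,
              sizeof (address)) == -1) {
        close (handle);
        return -1;
    }
    return handle;
}

//  Broadcast a 1-byte beacon to 255.255.255.255 on 'port'. Returns the
//  number of bytes sent, or -1 with errno set.
static ssize_t
beacon_send (int handle, int port)
{
    struct sockaddr_in broadcast;
    memset (&broadcast, 0, sizeof (broadcast));
    broadcast.sin_family = AF_INET;
    broadcast.sin_port = htons ((uint16_t) port);
    broadcast.sin_addr.s_addr = htonl (INADDR_BROADCAST);
    char beacon = '!';
    return sendto (handle, &beacon, 1, 0,
                   (struct sockaddr *) &broadcast, sizeof (broadcast));
}
```

Because the same socket both sends and receives on the one port, a node hears its own beacons, which is why the program finds itself as a peer.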
If I switch off all networking and try again, sending a message fails, as I'd expect:
[[code]]
Pinging peers...
sendto: Network is unreachable
[[/code]]
Working on the basis of //solve the problems currently aiming at your throat//, let's fix the most urgent issues in this first model. These issues are:
* Using the 255.255.255.255 broadcast address is a bit dubious. On the one hand, this broadcast address means precisely "send to all nodes on the local network, and don't forward". However, if you have several interfaces (wired Ethernet, WiFi) then broadcasts will go out on your default route only, and via just one interface. What we want to do is either send our broadcast on each interface's broadcast address, or find the WiFi interface and its broadcast address.
* Like many aspects of socket programming, getting information on network interfaces is not portable. Do we want to write non-portable code in our applications? No, this is better hidden in a library.
* There's no handling for errors except "abort", which is too brutal for transient problems like "your WiFi is switched off". The code should distinguish between soft errors (ignore and retry) and hard errors (assert).
* The code needs to know its own IP address and ignore beacons that it sent out. Like finding the broadcast address, this requires inspecting the available interfaces.
The simplest answer to these issues is to push the UDP code into a separate library that provides a clean API, like this:
[[code type="fragment" name="zyre-udp"]]
// Constructor
static udp_t *
udp_new (int port_nbr);
// Destructor
static void
udp_destroy (udp_t **self_p);
// Returns UDP socket handle
static int
udp_handle (udp_t *self);
// Send message using UDP broadcast
static void
udp_send (udp_t *self, byte *buffer, size_t length);
// Receive message from UDP broadcast
static ssize_t
udp_recv (udp_t *self, byte *buffer, size_t length);
[[/code]]
Here is the refactored UDP ping program that calls this library, which is much cleaner and nicer:
[[code type="example" title="UDP discovery, model 2" name="udpping2"]]
[[/code]]
The library, udplib, hides a lot of the unpleasant code (which will become uglier as we make this work on more systems). I'm not going to print that code here. You can read it [https://github.com/imatix/zguide/blob/master/examples/C/udplib.c in the repository].
Now, there are more problems sizing us up and wondering if they can make lunch out of us. First, IPv4 versus IPv6 and multicast vs. broadcast. In IPv6, broadcast doesn't exist at all; one uses multicast. From my experience with WiFi, IPv4 multicast and broadcast work identically except that multicast breaks in some situations where broadcast works fine. Some access points do not forward multicast packets. When you have a device (e.g. a tablet) that acts as a mobile AP, then it's possible it won't get multicast packets. Meaning, it won't see other peers on the network.
The simplest plausible solution is simply to ignore IPv6 for now, and use broadcast. A perhaps smarter solution would be to use multicast, and deal with asymmetric beacons if they happen.
We'll stick with stupid and simple for now. There's always time to make it more complex.
++++ Multiple Nodes on One Device
So we can discover nodes on the WiFi network, as long as they're sending out beacons as we expect. So I try to test with two processes. But when I run udpping2 twice, the second instance complains "'Address already in use' on bind" and exits. Oh, right. UDP and TCP both return an error if you try to bind two different sockets to the same port. This is right. The semantics of two readers on one socket would be weird to say the least. Odd/even bytes? You get all the 1s, I get all the 0s?
However, a quick check of stackoverflow and some memory of a socket option called SO_REUSEADDR turns up gold. If I use that, I can bind several processes to the same UDP port, and they will all receive any message arriving on that port. It's almost as if the guys who designed this were reading my mind! (That's way more plausible than maybe I'm reinventing the wheel.)
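Here is a minimal sketch of what that looks like, assuming Linux behavior. The function name {{open_shared_udp}} is mine. Some BSD-derived systems may want {{SO_REUSEPORT}} instead of, or as well as, {{SO_REUSEADDR}}, so treat this as a starting point and not as portable code:

```c
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

//  Open a UDP socket bound to the given port, sharing the port with
//  any other socket that also sets SO_REUSEADDR. Returns the socket
//  handle, or -1 on failure.
static int
open_shared_udp (int port_nbr)
{
    assert (port_nbr > 0 && port_nbr < 65536);
    int handle = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (handle == -1)
        return -1;

    //  Must be set before bind, on every socket that shares the port
    int on = 1;
    if (setsockopt (handle, SOL_SOCKET, SO_REUSEADDR,
                    &on, sizeof (on)) == -1) {
        close (handle);
        return -1;
    }
    struct sockaddr_in address;
    memset (&address, 0, sizeof (address));
    address.sin_family = AF_INET;
    address.sin_port = htons ((uint16_t) port_nbr);
    address.sin_addr.s_addr = htonl (INADDR_ANY);
    if (bind (handle, (struct sockaddr *) &address,
              sizeof (address)) == -1) {
        close (handle);
        return -1;
    }
    return handle;
}
```

Calling {{open_shared_udp (9999)}} twice in the same process, or once in each of two processes, should give two valid handles where the plain version gives "Address already in use".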
A quick test shows that SO_REUSEADDR works as promised. This is great because the next thing I want to do is design an API and then start dozens of nodes to see them discovering each other. It would be really cumbersome to have to test each node on a separate device. And when we get to testing how real traffic behaves on a large, flaky network, the two alternatives are simulation or temporary insanity.
And I speak from experience: we were, this summer, testing on dozens of devices at once. It takes about an hour to set up a full test run, and you need a space shielded from WiFi interference if you want any kind of reproducibility (unless your test case is "prove that interference kills WiFi networks faster than Orval can kill a thirst").
If I was a whiz Android developer with a free weekend I'd immediately (as in, it would take me two days) port this code to my phone and get it sending beacons to my PC. But sometimes lazy is more profitable. I //like// my Linux laptop. I like being able to start a dozen threads from one process, and have each thread acting like an independent node. I like not having to work in a real Faraday cage when I can simulate one on my laptop.
++++ Designing the API
I'm going to run N nodes on a device, and they are going to have to discover each other, and also discover a bunch of other nodes out there on the local network. I can use UDP for local discovery as well as remote discovery. It's arguably not as efficient as using, e.g., the 0MQ inproc:// transport, but has the great advantage that the exact same code will work in simulation and in real deployment.
If I have multiple nodes on one device, we clearly can't use the IP address and port number as node address. I need some logical node identifier. Arguably, the node identifier only has to be unique within the context of the device. My mind fills with complex stuff I could make, like supernodes that sit on real UDP ports and forward messages to internal nodes. I hit my head on the table until the idea of //inventing new concepts// leaves it.
Experience tells us that WiFi does things like disappear and reappear while applications are running. Users click on things. Which does interesting things like change the IP address halfway through a session. We cannot depend on IP addresses, nor on established connections (in the TCP fashion). We need some long-lasting addressing mechanism that survives interfaces and connections being torn down, and then recreated.
Here's the simplest solution I can see: we give every node a UUID, and specify that nodes, represented by their UUIDs, can appear or reappear at certain IP address:port endpoints, and then disappear again. We'll deal with recovery from lost messages later. A UUID is 16 bytes. So if I have 100 nodes on a WiFi network, each sending one beacon a second, that's (double it for other random stuff) 3,200 bytes a second of beacon data that the air has to carry just for discovery and presence. Seems acceptable.
Back to concepts. We do need some names for our API. At the least we need a way to distinguish between the node object that is "us", and node objects that are our peers. We'll be doing things like creating an "us" and then asking it how many peers it knows about and who they are. The term "peer" is clear enough.
From the developer point of view, a node (the application) needs a way to talk to the outside world. Let's borrow a term from networking and call this an "interface". The interface represents us to the rest of the world and presents the rest of the world to us, as a set of other peers. It automatically does whatever discovery it has to. When we want to talk to a peer, we get the interface to do that for us. And when a peer talks to us, it's the interface that delivers us the message.
This seems like a clean API design. How about the internals?
* The interface has to be multithreaded, so that one thread can do I/O in the background, while the foreground API talks to the application. We used this design in the Clone and Freelance client APIs.
* The interface background thread does the discovery business: bind to the UDP port, send out UDP beacons, and receive beacons.
* We need to at least send UUIDs in the beacon message so that we can distinguish our own beacons from those of our peers.
* We need to track peers that appear, and that disappear. For this I'll use a hash table that stores all known peers, and expire peers after some timeout.
* We need a way to report peers and events to the caller. Here we get into a juicy question. How does a background I/O thread tell a foreground API thread that stuff is happening? Callbacks maybe? //Heck no.// We'll use 0MQ messages, of course.
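To make the peer tracking concrete, here is a small sketch of a peer table with expiry. It uses a fixed array where the real code uses a hash table, and the names ({{peer_table_t}}, {{peer_table_seen}}, {{peer_table_expire}}) and the capacity are assumptions for illustration only:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define PEER_MAX     64         //  Assumed table capacity
#define PEER_EXPIRY  5000       //  Expire after 5 seconds of silence

typedef struct {
    uint8_t uuid [16];          //  Peer identity from its beacon
    int64_t last_seen;          //  Time of last beacon, in msec
    int in_use;                 //  Non-zero if slot holds a peer
} peer_t;

typedef struct {
    peer_t peers [PEER_MAX];
} peer_table_t;

//  Record a beacon from a peer. Returns 1 if the peer is new (caller
//  would report JOINED), 0 if already known, -1 if the table is full.
static int
peer_table_seen (peer_table_t *self, const uint8_t *uuid, int64_t now)
{
    assert (self && uuid);
    int free_slot = -1;
    int index;
    for (index = 0; index < PEER_MAX; index++) {
        peer_t *peer = &self->peers [index];
        if (peer->in_use && memcmp (peer->uuid, uuid, 16) == 0) {
            peer->last_seen = now;
            return 0;
        }
        if (!peer->in_use && free_slot == -1)
            free_slot = index;
    }
    if (free_slot == -1)
        return -1;
    peer_t *peer = &self->peers [free_slot];
    memcpy (peer->uuid, uuid, 16);
    peer->last_seen = now;
    peer->in_use = 1;
    return 1;
}

//  Drop peers that have been silent too long. Returns the number of
//  peers removed (caller would report LEFT for each).
static int
peer_table_expire (peer_table_t *self, int64_t now)
{
    assert (self);
    int removed = 0;
    int index;
    for (index = 0; index < PEER_MAX; index++) {
        peer_t *peer = &self->peers [index];
        if (peer->in_use && now - peer->last_seen >= PEER_EXPIRY) {
            peer->in_use = 0;
            removed++;
        }
    }
    return removed;
}
```

The background thread would call {{peer_table_seen}} for each beacon that is not its own, and {{peer_table_expire}} once per ping interval, turning the return values into messages for the foreground thread.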
The third iteration of the UDP ping program is even simpler and more beautiful than the second. The main body, in C, is just ten lines of code.
[[code type="example" title="UDP discovery, model 3" name="udpping3"]]
[[/code]]
The interface code should be familiar if you've studied how we make multithreaded API classes:
[[code type="example" title="UDP ping interface" name="interface"]]
[[/code]]
When I run this in two windows, it reports one peer joining the network. I kill that peer and a few seconds later it tells me the peer left:
[[code]]
--------------------------------------
[006] JOINED
[032] 418E98D4B7184844B7D5E0EE5691084C
--------------------------------------
[004] LEFT
[032] 418E98D4B7184844B7D5E0EE5691084C
[[/code]]
What's nice about a 0MQ-message based API is that I can wrap this any way I like. For instance, turning it into callbacks if I really want those. I can also trace all activity on the API very easily.
Some notes about tuning. On Ethernet, five seconds (the expiry time I used in this code) seems like a lot. On a badly stressed WiFi network you can get ping latencies of 30 seconds or more. If you use a too-aggressive value for the expiry, you'll disconnect nodes that are still there. On the other side, end user applications expect a certain liveliness. If it takes 30 seconds to report that a node has gone, users will get annoyed.
A decent strategy is to detect and report disappeared nodes rapidly, but only delete them after a longer interval. Visually, a node would be green when it's alive, then gray for a while as it went out of reach, then finally disappear. We're not doing this now, but will do it in the real implementation of the as-yet-unnamed framework we're making.
As we will also see later, we have to treat any input from a node, not just UDP beacons, as a sign of life. UDP may get squashed when there's a lot of TCP traffic. This is perhaps the main reason we're not using an existing UDP discovery library: we have to integrate this tightly with our 0MQ messaging for it to work.
++++ More about UDP
So we have discovery and presence working over UDP IPv4 broadcasts. It's not ideal, but it works for the local networks we have today. However we can't use UDP for real work, not without additional work to make it reliable. There's a joke about UDP but sometimes you'll get it, and sometimes you won't.
We'll stick to TCP for all one-to-one messaging. There is one more use-case for UDP after discovery, which is multicast file distribution. I'll explain why and how, then shelve that for another day. The why is simple: what we call "social networks" is just augmented culture. We create culture by sharing, and this means more and more, sharing works that we make or remix. Photos, documents, contracts, tweets. The clouds of devices we're aiming towards do more of this, not less.
Now, there are two principal patterns for sharing content. One is the "pubsub" pattern where one node sends out content to a set of other nodes, at the same time. Second is the "late joiner" pattern, where a node arrives somewhat later and wants to catch up to the conversation. We can deal with the late joiner using TCP unicast. But doing TCP unicast to a group of clients at the same time has some disadvantages. First, it can be slower than multicast. Second, it's unfair since some will get the content before others.
Before you jump off to design a UDP multicast protocol, realize that it's not a simple calculation. When you send a multicast packet, the WiFi access point uses a low bit rate, to ensure that even the furthest devices will get it safely. Most normal APs don't do the obvious optimization, which is to measure the distance of the furthest device and use that bit rate. Instead they just use a fixed value. So if you have a few devices, close to the AP, multicast will be insanely slow. But if you have a roomful of devices which all want to get the next chapter of the textbook, multicast can be insanely effective.
The curves cross around 6-12 devices depending on the network. You could in theory measure the curves in real-time and create an adaptive protocol. That would be cool but probably too hard for even the smartest of us.
If you do sit down and sketch out a UDP multicast protocol, realize that you need a channel for recovery, to get lost packets. You'd probably want to do this over TCP, using 0MQ. For now, however, we'll forget about multicast UDP, and assume all traffic goes over TCP.
+++ Spinning off a Library Project
At this stage however, the code is growing larger than an example should be, so it's time to create a proper GitHub project. It's a rule: build your projects in public view, and tell people about them as you go, so your marketing and community building starts on day 1. I'll walk through what this involves. I explained in [#the-community] about growing communities around projects. We need a few things:
* A name.
* A slogan.
* A public GitHub repository.
* A README that links to the C4 process.
* License files.
* An issue tracker.
* Two maintainers.
* A first bootstrap version.
The name and slogan first. The trademarks of the 21st century are domain names. So the first thing I do when spinning off a project is to look for a domain name that might work. Quite randomly, one of our old messaging products was called "Zyre" and I have the domain names for it. So, the "ZeroMQ Realtime Experience" project, which is a terribly forced construction but more or less accurate. We are aiming to create a framework for real time experiences (sharing games, photos, stories) over 0MQ.
I'm somewhat shy about pushing new projects into the 0MQ community too aggressively, and normally would start a project in either my personal account or the iMatix organization. But we've learned that moving projects after they become popular is counter-productive. My predictions of a future filled with moving pieces are either valid, or wrong. If this chapter is valid, we might as well launch this as a 0MQ project from the start. If it's wrong, we can delete the repository later or let it sink to the bottom of a long list of forgotten starts.
Start with the basics. The protocol (UDP and 0MQ/TCP) will be ZRE and the project will be ZyRE, with different capitalization to reduce confusion with the old project. I need a second maintainer, so I invite my friend Dong Min (the Korean hacker behind JeroMQ, a pure-Java 0MQ stack) to join. He's been working on very similar ideas so he's enthusiastic. We discuss this and we get the idea of building ZyRE on top of JeroMQ as well as on top of CZMQ and libzmq. This would make it a lot easier to run ZyRE on Android. It would also give us two fully separate implementations from the start, always a good thing for a protocol.
So we take the FileMQ project I built in [#the-human-scale] as a template for a new GitHub project. The GNU autoconf tools are quite decent but have a painful syntax. It's easiest to copy existing project files, and modify them. The FileMQ project builds a library, has test tools, license files, man pages, and so on. It's not too large so it's a good starting point.
I put together a README to summarize the goals of the project and point to C4. The issue tracker is enabled by default on new GitHub projects, so once we've pushed the UDP ping code as a first version, we're ready to go. However it's always good to recruit more maintainers, so I create an issue "Call for maintainers" that says:
> If you'd like to help click that lovely green "Merge Pull Request" button, and get eternal karma, add a comment confirming that you've read and understand the C4 process at http://rfc.zeromq.org/spec:16.
Finally, I change the issue tracker labels. GitHub by default offers the usual variety of issue types but with C4 we don't use them. Instead we need just two labels ("Urgent", in red, and "Ready", in black).
+++ Point-to-point Messaging
I'm going to take the last UDP ping program and build a point-to-point messaging layer on top of that. Our goal is that we can detect peers as they join and leave the network, that we can send messages to them, and that we can get replies. It is a non-trivial problem to solve and takes Min and me two days to get a "hello world" version working.
We had to solve a number of issues:
* What information to send in the UDP beacon, and how to format it.
* What 0MQ socket types to use to interconnect nodes.
* What 0MQ messages to send, and how to format them.
* How to send a message to a specific node.
* How to know the sender of any message so we could send a reply.
* How to recover from lost UDP beacons.
* How to avoid overloading the network with beacons.
I'll explain these in enough detail that you understand why we made each choice we did, with some code fragments to illustrate. We tagged this code as [https://github.com/zeromq/zyre/zipball/v0.1.0 version 0.1.0] so you can look at the code: most of the hard work is done in zre_interface.c.
++++ UDP Beacon Framing
Sending UUIDs across the network is the bare minimum for a logical addressing scheme. However we have a few more aspects to get working before this will work in real use:
* We need some protocol identification so that we can check for, and reject, invalid packets.
* We need some version information so that we can change this protocol over time.
* We need to tell other nodes how to reach us via TCP, i.e. a 0MQ port they can talk to us on.
Let's start with the beacon message format. We probably want a fixed protocol header that will never change in future versions, and a body that depends on the version!figref().
[[code type="textdiagram" title="ZRE discovery message"]]
+---+---+---+------+ +------+------+
| Z | R | E | %x01 | | UUID | port |
+---+---+---+------+ +------+------+
Header Body
[[/code]]
The version can be a 1-byte counter starting at 1. The UUID is 16 bytes, and the port is a 2-byte port number, since UDP nicely tells us the sender's IP address for every message we receive. This gives us a 22-byte frame.
The C language (and a few others like Erlang) make it simple to read and write binary structures. We define the beacon frame structure:
[[code type="fragment" name="zyre-beacon"]]
#define BEACON_PROTOCOL "ZRE"
#define BEACON_VERSION 0x01
typedef struct {
byte protocol [3];
byte version;
uuid_t uuid;
uint16_t port;
} beacon_t;
[[/code]]
Which makes sending and receiving beacons quite simple. Here is how we send a beacon, using the zre_udp class to do the non-portable network calls:
[[code type="fragment" name="zyre-beacon-send"]]
// Beacon object
beacon_t beacon;
// Format beacon fields
beacon.protocol [0] = 'Z';
beacon.protocol [1] = 'R';
beacon.protocol [2] = 'E';
beacon.version = BEACON_VERSION;
memcpy (beacon.uuid, self->uuid, sizeof (uuid_t));
beacon.port = htons (self->port);
// Broadcast the beacon to anyone who is listening
zre_udp_send (self->udp, (byte *) &beacon, sizeof (beacon_t));
[[/code]]
When we receive a beacon we need to guard against bogus data. We're not going to be paranoid against, for example, denial-of-service attacks. We just want to make sure we're not going to crash when a bad ZRE implementation sends us erroneous frames.
To validate a frame we check its size and header. If those are OK, we assume the body is usable. When we get a UUID that isn't ourselves (recall, we'll get our own UDP broadcasts back), we can treat this as a peer:
[[code type="fragment" name="zyre-beacon-recv"]]
// Get beacon frame from network
beacon_t beacon;
ssize_t size = zre_udp_recv (self->udp, (byte *) &beacon, sizeof (beacon_t));
// Basic validation on the frame
if (size != sizeof (beacon_t)
|| beacon.protocol [0] != 'Z'
|| beacon.protocol [1] != 'R'
|| beacon.protocol [2] != 'E'
|| beacon.version != BEACON_VERSION)
return 0; // Ignore invalid beacons
// If we got a UUID and it's not our own beacon, we have a peer
if (memcmp (beacon.uuid, self->uuid, sizeof (uuid_t))) {
char *identity = s_uuid_str (beacon.uuid);
s_require_peer (self, identity,
zre_udp_from (self->udp), ntohs (beacon.port));
free (identity);
}
[[/code]]
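Casting a struct to bytes works as long as the compiler lays out {{beacon_t}} in exactly 22 bytes with no padding, which C does not promise on every platform. As an alternative sketch, here is the same frame written and read one field at a time. The function names {{beacon_encode}} and {{beacon_decode}} are hypothetical and this is not the code used in ZyRE:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define BEACON_SIZE     22      //  3 + 1 + 16 + 2 bytes
#define BEACON_VERSION  0x01

//  Write a beacon into a 22-byte buffer, one field at a time
static void
beacon_encode (uint8_t *buffer, const uint8_t *uuid, uint16_t port)
{
    assert (buffer && uuid);
    memcpy (buffer, "ZRE", 3);
    buffer [3] = BEACON_VERSION;
    memcpy (buffer + 4, uuid, 16);
    buffer [20] = (uint8_t) (port >> 8);        //  Network byte order
    buffer [21] = (uint8_t) (port & 0xFF);
}

//  Validate and read a beacon. Returns 0 if OK, -1 if invalid
static int
beacon_decode (const uint8_t *buffer, size_t size,
               uint8_t *uuid, uint16_t *port)
{
    assert (buffer && uuid && port);
    if (size != BEACON_SIZE
    ||  memcmp (buffer, "ZRE", 3) != 0
    ||  buffer [3] != BEACON_VERSION)
        return -1;              //  Ignore invalid beacons
    memcpy (uuid, buffer + 4, 16);
    *port = (uint16_t) ((buffer [20] << 8) | buffer [21]);
    return 0;
}
```

This costs a few more lines than the struct cast, and in return the bytes on the wire no longer depend on the compiler or the byte order of the host.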
++++ True Peer Connectivity (Harmony Pattern)
Since 0MQ is designed to make distributed messaging easy, people often ask how to interconnect a set of true peers (as compared to obvious clients and servers). It is a thorny question and 0MQ doesn't really provide a single clear answer.
TCP, which is the most commonly-used transport in 0MQ, is not symmetric; one side must bind and one must connect and though 0MQ tries to be neutral about this, it's not. When you connect, you create an outgoing message pipe. When you bind, you do not. When there is no pipe, you cannot write messages (0MQ will return EAGAIN).
Developers who study 0MQ and then try to create N-to-N connections between sets of equal peers often try a ROUTER-to-ROUTER flow. It's obvious why: each peer needs to address a set of peers, which requires ROUTER. It usually ends with a plaintive email to the list.
My conclusion after trying several times from different angles is that ROUTER-to-ROUTER does not work. And the 0MQ reference manual does not allow it when it discusses ROUTER sockets in {{zmq_socket[3]}}. At a minimum, one peer must bind and one must connect, meaning the architecture is not symmetrical. But also because you simply can't tell when you are allowed to safely send a message to a peer. It's Catch-22: you can talk to a peer after it's talked to you. But the peer can't talk to you until you've talked to it. One side or the other will be losing messages and thus has to retry, which means the peers cannot be equal.
I'm going to explain the Harmony pattern, which solves this problem, and which we use in ZyRE.
We want a guarantee that when a peer "appears" on our network, we can talk to it safely, without 0MQ dropping messages. For this, we have to use a DEALER or PUSH socket which //connects out to the peer// so that even if that connection takes some non-zero time, there is immediately a pipe, and 0MQ will accept outgoing messages.
A DEALER socket cannot address multiple peers individually. But if we have one DEALER per peer, and we connect that DEALER to the peer, we can safely send messages to a peer as soon as we've connected to it.
Now, the next problem is to know who sent us a particular message. We need a reply address, that is the UUID of the node who sent any given message. DEALER can't do this unless we prefix every single message with that 16-byte UUID, which would be wasteful. ROUTER does, if we set the identity properly before connecting to the router.
And so the Harmony pattern comes down to:
* One ROUTER socket that we bind to a transient port, which we broadcast in our beacons.
* One DEALER socket //per peer// that we connect to the peer's ROUTER socket.
* Reading from our ROUTER socket.
* Writing to the DEALER socket we hold for each peer.
Next problem is that discovery isn't neatly synchronized. We can get the first beacon from a peer //after// we start to receive messages from it. A message comes in on the ROUTER socket and has a nice UUID attached to it. But no physical IP address and port. We have to force discovery over TCP. To do this, our first command to any new peer we connect to is an OHAI command with our IP address and port. This ensures that the receiver connects back to us before trying to send us any command.
Breaking this down into steps:
* If we receive a UDP beacon we connect to the peer.
* We read messages from our ROUTER socket, and each message comes with the UUID of the sender.
* If it's an OHAI message we connect back to that peer if not already connected to it.
* If it's any other message, we //must// already be connected to the peer (a good place for an assertion).
* We send messages to each peer using a dedicated per-peer DEALER socket, which //must// be connected.
* When we connect to a peer we also tell our application that the peer exists.
* Every time we get a message from a peer, we treat that as a heartbeat (it's alive).
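The steps above can be sketched without any sockets at all, which makes the logic easier to see. In this sketch the actual DEALER connect is reduced to setting a flag, and all the names ({{peer_on_beacon}}, {{peer_on_command}}, {{command_t}}) are hypothetical and not from the ZyRE code:

```c
#include <assert.h>
#include <stdint.h>

typedef enum { CMD_OHAI, CMD_OTHER } command_t;

typedef struct {
    int connected;              //  Non-zero once our DEALER is connected
    int64_t last_seen;          //  Time of last sign of life, in msec
} peer_t;

//  Connect to the peer if we have not already done so. Returns 1 if
//  this is a new connection (caller tells the application), else 0.
//  A real version would create a DEALER socket here and connect it
//  to the peer's ROUTER endpoint.
static int
s_peer_connect (peer_t *self)
{
    if (self->connected)
        return 0;
    self->connected = 1;
    return 1;
}

//  Handle a UDP beacon from the peer
static int
peer_on_beacon (peer_t *self, int64_t now)
{
    assert (self);
    self->last_seen = now;
    return s_peer_connect (self);
}

//  Handle a command read from our ROUTER socket
static int
peer_on_command (peer_t *self, command_t command, int64_t now)
{
    assert (self);
    int is_new = 0;
    if (command == CMD_OHAI)
        is_new = s_peer_connect (self);
    else
        //  Any other command before OHAI or a beacon is a bug
        assert (self->connected);
    self->last_seen = now;      //  Every message counts as a heartbeat
    return is_new;
}
```

Whichever arrives first, the beacon or the OHAI, creates the connection, and the other one is then just another sign of life.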
If we were not using UDP but some other discovery mechanism, I'd still use the Harmony pattern for a true peer network: one ROUTER for input from all peers, and one DEALER per peer for output. Bind the ROUTER, connect the DEALER, and start each conversation with an OHAI equivalent that provides the return IP address and port. You would need some external mechanism to bootstrap each connection.
++++ Detecting Disappearances
Heartbeating sounds simple but it's not. UDP packets get dropped when there's a lot of TCP traffic, so if we depend on UDP beacons we'll get false disconnections. TCP traffic can be delayed for five, ten, 30 seconds if the network is really busy. So if we kill peers when they go quiet, we'll have false disconnections.
Since UDP beacons aren't reliable, it's tempting to add in TCP beacons. After all, TCP will deliver them reliably. One little problem. Imagine you have 100 nodes on a network, and each node sends a TCP beacon once a second. Each beacon is 22 bytes not counting TCP's framing overhead. That is 100 * 99 * 22 bytes per second, or about 217,800 bytes/second just for heartbeating. That's about 1-2% of a typical WiFi network's ideal capacity, which sounds OK. But when a network is stressed, or fighting other networks for airspace, that extra 200K a second will break what's left. UDP broadcasts are at least low cost.
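The arithmetic is worth keeping around as code so you can try other network sizes. This little sketch compares all-to-all TCP heartbeats with one UDP broadcast per node. The function names are mine, and both ignore framing overhead, as the text does:

```c
#include <assert.h>

//  Bytes per second if every node sends one heartbeat per second to
//  every other node over TCP
static long
tcp_heartbeat_traffic (long nodes, long beacon_size)
{
    assert (nodes > 0 && beacon_size > 0);
    return nodes * (nodes - 1) * beacon_size;
}

//  Bytes per second if every node sends one UDP broadcast per second
static long
udp_broadcast_traffic (long nodes, long beacon_size)
{
    assert (nodes > 0 && beacon_size > 0);
    return nodes * beacon_size;
}
```

For 100 nodes and 22-byte beacons, the TCP version gives 217,800 bytes per second and the UDP version 2,200. The TCP cost grows with the square of the number of nodes, which is the real problem.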
So what we do is switch to TCP heartbeats only when a specific peer hasn't sent us any UDP beacons in a while. And then, we send TCP heartbeats only to that one peer. If the peer continues to be silent, we conclude it's gone away. If the peer comes back, with a different IP address and/or port, we have to disconnect our DEALER socket and reconnect to the new port.
This gives us a set of states for each peer, though at this stage the code doesn't use a formal state machine:
* Peer visible thanks to UDP beacon (we connect using IP address and port from beacon)
* Peer visible thanks to OHAI command (we connect using IP address and port from command)
* Peer seems alive (we got a UDP beacon or command over TCP recently)
* Peer seems quiet (no activity in some time, so we send a HUGZ command)
* Peer has disappeared (no reply to our HUGZ commands, so we destroy peer)
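Leaving aside how the peer first became visible, the last three states depend only on how long the peer has been silent. Here is a minimal sketch of that classification. The two thresholds and the names are assumptions for illustration, not the values used in ZyRE:

```c
#include <assert.h>
#include <stdint.h>

#define QUIET_AFTER     5000    //  Assumed: msec of silence before HUGZ
#define EXPIRED_AFTER  10000    //  Assumed: msec of silence before giving up

typedef enum {
    PEER_ALIVE,                 //  Heard from it recently
    PEER_QUIET,                 //  Silent for a while, send it a HUGZ
    PEER_GONE                   //  No reply to HUGZ, destroy the peer
} peer_state_t;

//  Work out the state of a peer from the time we last heard from it,
//  whether by UDP beacon or by any command over TCP
static peer_state_t
peer_state (int64_t last_seen, int64_t now)
{
    assert (now >= last_seen);
    int64_t silence = now - last_seen;
    if (silence >= EXPIRED_AFTER)
        return PEER_GONE;
    if (silence >= QUIET_AFTER)
        return PEER_QUIET;
    return PEER_ALIVE;
}
```

Any beacon or command from the peer resets {{last_seen}}, which takes a quiet peer straight back to alive.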
There's one remaining scenario we didn't address in the code at this stage. It's possible for a peer to change IP addresses and ports without actually triggering a disappearance event. For example if the user switches off WiFi and then switches it back on, then the access point can assign the peer a new IP address. We'll need to handle a disappeared WiFi interface on our node by unbinding the ROUTER socket and rebinding it when we can. Since this is not central to the design now, I decide to log an issue on the GitHub tracker and leave it for a rainy day.
+++ Group Messaging
Group messaging is a common and very useful pattern. The concept is simple: instead of talking to a single node, you talk to a "group" of nodes. The group is just a name, a string that you agree on in the application. It's precisely like using the publish-subscribe prefixes in PUB and SUB sockets. In fact the only reason I say "group messaging" and not "pub-sub" is to prevent confusion, since we're not going to use PUB/SUB sockets for this.
PUB/SUB would almost work. But we've just done such a lot of work to solve the late joiner problem. Applications are inevitably going to wait for peers to arrive before sending messages to groups, so we have to build on the Harmony pattern rather than start again beside it.
Let's look at the operations we want to do on groups:
* We want to join and leave groups.
* We want to know what other nodes are in any given group.
* We want to send a message to (all nodes in) a group.
Which look familiar to anyone who's used Internet Relay Chat, except we have no server. Every node will need to keep track of what each group represents. This information will not always be fully consistent across the network but it will be close enough.
Our interface will track a set of groups (each an object). These are all the known groups with one or more member nodes, excluding ourselves. We'll track nodes as they leave and join groups. Since nodes can join the network at any time, we have to tell new peers what groups we're in. When a peer disappears, we'll remove it from all groups we know about.
This gives us some new protocol commands:
* JOIN - we send this to all peers when we join a group.
* LEAVE - we send this to all peers when we leave a group.
Plus, we add a 'groups' field to the first command we send (renamed from OHAI to HELLO at this point because I need a larger lexicon of command verbs).
Lastly, let's add a way for peers to double-check the accuracy of their group data. The risk is that we miss one of the above messages. Though we are using Harmony to avoid the typical message loss at startup, it's worth being paranoid. For now, all we need is a way to detect such a failure. We'll deal with recovery later, if the problem actually happens.
I'll use the UDP beacon for this. What we want is a rolling counter that simply tells how many join and leave operations ("transitions") there have been for a node. It starts at 0 and increments for each group we join or leave. We can use a minimal 1-byte value since that will catch all failures except the astronomically rare "we lost precisely 256 messages in a row" failure (this is the one that hits during the first demo). We will also put the transitions counter into the JOIN, LEAVE, and HELLO commands. And to try to provoke the problem, we'll test by joining/leaving several hundred groups, with a high-water mark set to 10 or so.
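Here is a minimal sketch of how a receiver might check the counter. I assume that a JOIN or LEAVE carries the sender's counter after the transition, and that a beacon carries the current counter. The function names are hypothetical:

```c
#include <assert.h>
#include <stdint.h>

//  Check the counter on a JOIN or LEAVE command against what we
//  expect, then adopt the received value. Returns 0 if in sync, -1
//  if we seem to have missed one or more transitions.
static int
peer_status_on_command (uint8_t *status, uint8_t received)
{
    assert (status);
    uint8_t expected = (uint8_t) (*status + 1);     //  Wraps 255 to 0
    *status = received;
    return received == expected? 0: -1;
}

//  Check the counter on a UDP beacon, which should match what we
//  already hold for this peer. Returns 0 if in sync, -1 if not.
static int
peer_status_on_beacon (uint8_t status, uint8_t received)
{
    return received == status? 0: -1;
}
```

Since the counter is a single unsigned byte, the wraparound from 255 to 0 happens by itself. All this does is detect the failure, which is as far as we want to go for now.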
Time to choose verbs for the group messaging. We need a command that means "talk to one peer" and one that means "talk to many peers". After some attempts, my best choices are "WHISPER" and "SHOUT", and this is what the code uses. The SHOUT command needs to tell the user the group name, as well as the sender peer.
Since groups are like publish-subscribe, you might be tempted to use this to broadcast the JOIN and LEAVE commands as well, perhaps by creating a "global" group that all nodes join. My advice is to keep groups purely as user-space concepts for two reasons. First, how do you join the global group if you need the global group to send out a JOIN command? Second, it creates special cases ('reserved names') which are messy.
It's simpler just to send JOINs and LEAVEs explicitly to all connected peers, period.
I'm not going to work through the implementation of group messaging in detail since it's fairly pedantic and not exciting. The data structures for group and peer management aren't optimal but they're workable. We need:
* A list of groups for our interface, which we can send to new peers in a HELLO command;
* A hash of groups for other peers, which we update with information from HELLO, JOIN, and LEAVE commands;
* A hash of peers for each group, which we update with the same three commands.
At this stage I'm starting to get pretty happy with the binary serialization (our codec generator from [#the-human-scale]), which handles lists and dictionaries as well as strings and integers.
This version is tagged in the repository as v0.2.0 and you can [https://github.com/zeromq/zyre/tags download the tarball] if you want to check what the code looked like at this stage.
+++ Testing and Simulation
++++ On Assertions
The proper use of assertions is one of the hallmarks of a professional programmer.
Our confirmation bias as creators makes it hard to test our work properly. We tend to write tests to prove the code works, rather than trying to prove it doesn't. There are many reasons for this. We pretend to ourselves and others that we can be (could be) perfect, when in fact we consistently make mistakes. Bugs in code are seen as "bad", rather than "inevitable", so psychologically we want to see fewer of them, not uncover more of them. "He writes perfect code" is a compliment rather than a euphemism for "he never takes risks so his code is as boring and heavily used as cold spaghetti".
Some cultures teach us to aspire to perfection, and punish mistakes, in education and work, which makes this attitude worse. To accept that we're fallible, and then to learn how to turn that into profit rather than shame is one of the hardest intellectual exercises, in any profession. We leverage our fallibilities by working with others, and by challenging our own work sooner, not later.
One trick that makes it easier is to use assertions. Assertions are not a form of error handling. They are executable theories of fact. The code asserts, "at this point, such and such must be true" and if the assertion fails, the code kills itself.
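A small sketch of the difference, using made-up names: bad data arriving from the network is something we check and report, while a broken promise inside our own code is something we assert on.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    int connected;              //  Non-zero once we have a route to the peer
    int sent;                   //  Number of messages sent so far
} peer_t;

//  Data from the network can be wrong through no fault of ours, so
//  we check it and report the result. This is error handling.
static int
frame_size_ok (size_t size)
{
    return size == 22;
}

//  Our own code must only send to connected peers. If this fails, we
//  have a bug, and we want to die right here. This is an assertion.
static void
peer_send (peer_t *self)
{
    assert (self);
    assert (self->connected);
    self->sent++;
}
```

If {{peer_send}} is ever called on an unconnected peer, the program stops at the exact line where the theory turned out to be false, which is the cheapest place to find out.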
The faster you can prove code incorrect, the faster and more accurately you can fix it. Believing that code works and proving that it behaves as expected is less science, and more magical thinking. It's far better to be able to say, "libzmq has five hundred assertions and despite all my efforts, not one of them fails".
So the ZyRE code base is scattered with assertions, and particularly a couple in the code that deals with the state of peers. This is the hardest aspect to get right: peers need to track each other and exchange state accurately, or things stop working. The algorithms depend on asynchronous messages flying around and I'm pretty sure the initial design has flaws. They always do.
And as I test the original ZyRE code by starting and stopping instances of zre_ping by hand, every so often I get an assertion failure. Running by hand doesn't reproduce these often enough, so let's make a proper tester tool.
++++ On Up-front Testing
Being able to fully test the real behavior of individual components in the laboratory can make a 10x or 100x difference to the cost of your project. That confirmation bias engineers have to their own work makes upfront testing incredibly profitable, and late-stage testing incredibly expensive.
I'll tell you a short story about a project we worked on in the late 1990's. We provided the software, and other teams the hardware, for a factory automation project. Three or four teams brought their experts on-site, which was a remote factory (funny how the polluting factories are always in remote border country).
One of these teams, a firm specializing in industrial automation, built ticket machines: kiosks, and software to run on them. Nothing unusual: swipe a badge, choose an option, receive a ticket. They assembled two of these kiosks, on-site, each week bringing some more bits and pieces. Ticket printers, monitor screens, special keypads from Israel. The stuff had to be resistant against dust since the kiosks sat outside. Nothing worked. The screens were unreadable in the sun. The ticket printers continually jammed and misprinted. The internals of the kiosk were just sat on wooden shelving. The kiosk software crashed regularly. It was comedic except that the project really, //really// had to work and so we spent weeks and then months on-site helping the other teams debug their bits and pieces until it worked.
A year later, a second factory, and the same story. By this time the client was getting impatient. So when they came to the third and largest factory, a year later, we jumped up and said, "please let us make the kiosks and the software and everything".
So we made a detailed design for the software and hardware and found suppliers for all the pieces. It took us three months to search the Internet for each component, and another two months to get them assembled into stainless-steel bricks each weighing about twenty kilos. These bricks were 60cm square and 20cm deep, with a large flat-screen panel behind unbreakable glass, and two connectors: one for power, one for Ethernet. You loaded up the paper bin with enough for six months, then screwed the brick into a housing, and it automatically booted, found its DNS server, loaded its Linux OS and then application software. It connected to the real server, and showed the main menu. You got access to the configuration screens by swiping a special badge and then entering a code.
The software was portable so we could test that as we wrote it, and as we collected the pieces from our suppliers we kept one of each so we had a disassembled kiosk to play with. When we got our finished kiosks, they all worked immediately. We shipped them to the client, who plugged them into their housing, switched them on, and went to business. We spent a week or so on site, and in ten years, one kiosk broke (the screen died, and was replaced).
Lesson is, test up-front so that when you plug the thing in, you know precisely how it's going to behave. If you haven't tested it up-front, you're going to be spending weeks and months in the field, ironing out problems that should never have been there.
++++ The ZyRE Tester
During manual testing I did hit an assertion rarely. It then disappeared. Since I don't believe in magic, that means the code is still wrong somewhere. So, next step is heavy-duty testing of the ZyRE 0.2.0 code to try to break its assertions, and get a good idea of how it will behave in the field.
We packaged the discovery and messaging functionality as an 'interface' object that the main program creates, works with, and then destroys. We don't use any global variables. This makes it easy to start large numbers of interfaces and simulate real activity, all within one process. And if there's one thing we've learned from writing lots of examples, it's that 0MQ's ability to orchestrate multiple threads in a single process is //much// easier to work with than multiple processes.
The first version of the tester consists of a main thread which starts and stops a set of child threads, each running one interface, each with a ROUTER, DEALER, and UDP socket ('R', 'D', and 'U' in the diagram)!figref().
[[code type="textdiagram" title="ZyRE Tester Tool"]]
                        +----------+
                        |   Main   |
                        |  thread  |
                        +-----+----+
                              |
                              |
      /---------------+-------+-------+---------------\
      |               |               |               |
      v               v               v               v
+-----------+   +-----------+   +-----------+   +-----------+
|   Child   |   |   Child   |   |   Child   |   |   Child   |
|  thread   |   |  thread   |   |  thread   |   |  thread   |
+-----+-----+   +-----+-----+   +-----+-----+   +-----+-----+
      |               |               |               |
      v               v               v               v
+-----------+   +-----------+   +-----------+   +-----------+
| Interface |   | Interface |   | Interface |   | Interface |
+---+---+---+   +---+---+---+   +---+---+---+   +---+---+---+
| R | D | U |   | R | D | U |   | R | D | U |   | R | D | U |
+---+---+---+   +---+---+---+   +---+---+---+   +---+---+---+
[[/code]]
The nice thing is that when I am connected to a WiFi access point, all ZyRE traffic (even between two interfaces in the same process) goes across the AP. This means I can fully stress test any WiFi infrastructure with just a couple of PCs running in a room. It's hard to emphasize how valuable this is: if we had built ZyRE as, say, a dedicated service for Android, we'd literally need dozens of Android tablets or phones to do any large-scale testing. Kiosks, and all that.
The focus is now on breaking the current code, trying to prove it wrong. There's //no point// at this stage in testing how well it runs, how fast it is, how much memory it uses, or anything else. We'll work up to trying (and failing) to break each individual functionality but first, we try to break some of the core assertions I've put into the code.
These are:
* The first command that any node receives from a peer MUST be "HELLO". In other words, messages //cannot// be lost during the peer-to-peer connection process.
* The state each node calculates for its peers matches the state each peer calculates for itself. In other words, again, no messages are lost in the network.
* When my application sends a message to a peer, we have a connection to that peer. In other words, the application only "sees" a peer after we have established a 0MQ connection to it.
With 0MQ, there are several cases where we may lose messages. One is the "late joiner" syndrome. Two is when we close sockets without sending everything. Three is when we overflow the high-water mark on a ROUTER or PUB socket. Four is when we use an unknown address with a ROUTER socket.
Now, I //think// Harmony gets around all these potential cases. But we're also adding UDP to the mix. So the first version of the tester simulates an unstable and dynamic network, where nodes come and go randomly. It's here that things will break.
Here is the main thread of the tester, which manages a pool of 100 threads, starting and stopping each one randomly. Every ~750 msecs it either starts or stops one random thread. We randomize the timing so that threads aren't all synchronized. After a few minutes we have an average of 50 threads happily chatting to each other like Korean teenagers in Gangnam subway station:
[[code type="fragment" name="zyre-tester-main"]]
int main (int argc, char *argv [])
{
    // Initialize context for talking to tasks
    zctx_t *ctx = zctx_new ();
    zctx_set_linger (ctx, 100);
    // Get number of interfaces to simulate, default 100
    int max_interface = 100;
    int nbr_interfaces = 0;
    if (argc > 1)
        max_interface = atoi (argv [1]);
    // We address interfaces as an array of pipes
    void **pipes = zmalloc (sizeof (void *) * max_interface);
    // We will randomly start and stop interface threads
    while (!zctx_interrupted) {
        uint index = randof (max_interface);
        // Toggle interface thread
        if (pipes [index]) {
            zstr_send (pipes [index], "STOP");
            zsocket_destroy (ctx, pipes [index]);
            pipes [index] = NULL;
            zclock_log ("I: Stopped interface (%d running)", --nbr_interfaces);
        }
        else {
            pipes [index] = zthread_fork (ctx, interface_task, NULL);
            zclock_log ("I: Started interface (%d running)", ++nbr_interfaces);
        }
        // Sleep ~750 msecs randomly so we smooth out activity
        zclock_sleep (randof (500) + 500);
    }
    zctx_destroy (&ctx);
    return 0;
}
[[/code]]
Note that we maintain a 'pipe' to each child thread (CZMQ creates the pipe automatically when we use the zthread_fork() method). It's via this pipe that we tell child threads to stop, when it's time for them to leave. The child threads do the following (I'm switching to pseudo-code for clarity):
[[code]]
create an interface
while true:
    poll on pipe to parent, and on interface
    if parent sent us a message:
        break
    if interface sent us a message:
        if message is ENTER:
            send a WHISPER to the new peer
        if message is EXIT:
            send a WHISPER to the departed peer
        if message is WHISPER:
            send back a WHISPER 1/2 of the time
        if message is SHOUT:
            send back a WHISPER 1/3 of the time
            send back a SHOUT 1/3 of the time
    once per second:
        join or leave one of 10 random groups
destroy interface
[[/code]]
++++ Test Results
Yes, we broke the code. Several times, in fact. This was satisfying. I'll work through the different things we found.
Getting nodes to agree on consistent group status was the most difficult. Every node needs to track the group membership of the whole network, as I already explained in the section "Group Messaging". Group messaging is a publish-subscribe pattern. JOINs and LEAVEs are analogous to 'subscribe' and 'unsubscribe' messages. It's essential none of these ever get lost, or we'll find nodes dropping randomly off groups.
So each node counts the total number of JOINs and LEAVEs it's ever done, and broadcasts this status (as 1-byte rolling counter) in its UDP beacon. Other nodes pick up the status, compare it to their own calculations, and if there's a difference, the code asserts.
First problem was that UDP beacons get delayed randomly, so they're useless for carrying the status. When a beacon arrives late, the status is inaccurate and we get a 'false negative'. To fix this we moved the status information into the JOIN and LEAVE commands. We also added it to the HELLO command. The logic then becomes:
* Get initial status for a peer from its HELLO command.
* When getting a JOIN or LEAVE from a peer, increment the status counter.
* Check that the new status counter matches the value in the JOIN or LEAVE command.
* If it doesn't, assert.
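The steps above can be sketched roughly as follows. This is an illustration under assumptions, not the ZyRE source: the {{peer_status_t}} type and both function names are hypothetical, and the update function returns a boolean where the real code would assert.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

//  Hypothetical per-peer tracking state, for illustration only
typedef struct {
    uint8_t status;     //  Our running count of the peer's JOINs and LEAVEs
    bool ready;         //  True once we have seen HELLO
} peer_status_t;

//  HELLO carries the peer's current counter; take it as our baseline
void
peer_status_hello (peer_status_t *self, uint8_t status)
{
    assert (self);
    self->status = status;
    self->ready = true;
}

//  On JOIN or LEAVE, bump our counter and compare it with the value the
//  peer sent. Returns true if the two agree, false if a message was lost.
bool
peer_status_update (peer_status_t *self, uint8_t status)
{
    assert (self);
    assert (self->ready);
    self->status++;     //  One-byte counter rolls over from 255 to 0
    return self->status == status;
}
```

Because the counter is a single byte on both sides, the rollover needs no special handling: both ends wrap at the same point.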
Next problem we got was that messages were arriving unexpectedly on new connections. The Harmony pattern connects, then sends HELLO as the first command. This means the receiving peer should always get HELLO as the first command from a new peer. We were seeing PING, JOIN, and other commands arriving.
This turned out to be due to CZMQ's ephemeral port logic. An ephemeral port is just a dynamically assigned port that a service can get rather than asking for a fixed port number. A POSIX system usually assigns ephemeral ports in the range 0xC000 to 0xFFFF. CZMQ's logic is to look for a free port in this range, bind to that, and return the port number to the caller.
Which sounds fine, until you get one node stopping and another node starting, close together, and the new node getting the port number of the old node. Remember that 0MQ tries to re-establish a broken connection. So when the first node stopped, its peers would retry to connect. When the new node appears on that same port, suddenly all the peers connect to it, and start chatting like they're old buddies.
It's a general problem that affects any larger-scale dynamic 0MQ application. There are a number of plausible answers. One is to not reuse ephemeral ports, which is easier said than done when you have multiple processes on one system. Another solution would be to select a random port each time, which at least reduces the risk of hitting a just-freed port. This brings the risk of a garbage connection down to perhaps 1/1000 but it's still there. Perhaps the best solution is to accept that this can happen, understand the causes, and deal with it on the application level.
We have a stateful protocol that always starts with a HELLO command. We know that it's possible for peers to connect to us, thinking we're an existing node that went away and came back, and send us other commands. Step one is when we discover a new peer, to destroy any existing peer connected to the same endpoint. It's not a full answer but it's polite, at least. Step two is to ignore anything coming in from a new peer until that peer says HELLO.
This doesn't require any change to the protocol but it has to be specified in the protocol when we come to it: due to the way 0MQ connections work, it's possible to receive unexpected commands from a //well-behaving// peer and there is no way to return an error code, or otherwise tell that peer to reset its connection. Thus, a peer must discard any command from a peer until it receives HELLO.
In fact, if you draw this on a piece of paper and think it through, you'll see that you never get a HELLO from such a connection. The peer will send PINGs and JOINs and LEAVEs and then eventually time-out and close, as it fails to get any heartbeats back from us.
You'll also see that there's no risk of confusion, no way for commands from two peers to get mixed into a single stream on our DEALER socket.
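The "discard until HELLO" rule is small enough to sketch in a few lines. The {{peer_t}} structure and {{peer_accept}} function here are hypothetical stand-ins, and commands are shown as strings for readability; the real protocol uses binary command IDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

//  Hypothetical peer record, for illustration only
typedef struct {
    bool ready;         //  True once this peer has said HELLO
} peer_t;

//  Returns true if the command should be processed, false if it must be
//  silently discarded because the peer has not yet said HELLO
bool
peer_accept (peer_t *self, const char *command)
{
    assert (self);
    assert (command);
    if (strcmp (command, "HELLO") == 0) {
        self->ready = true;
        return true;
    }
    return self->ready;
}
```

A stale peer that sends PING or JOIN without HELLO is simply never marked ready, so everything it sends falls on the floor until it times out.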
When you are satisfied this works, we're ready to move on. This version is tagged in the repository as v0.3.0 and you can [https://github.com/zeromq/zyre/tags download the tarball] if you want to check what the code looked like at this stage.
Note that doing heavy simulation of lots of nodes will probably cause your process to run out of file handles, giving an assertion failure in libzmq. I raised the per-process limit to 30,000 by running (on my Linux box):
[[code]]
ulimit -n 30000
[[/code]]
++++ Tracing Activity
To debug the kinds of problems we saw here, we need extensive logging. There's a lot happening in parallel but every problem can be traced down to a specific exchange between two nodes, consisting of a set of events that happen in strict sequence. We know how to make very sophisticated logging but as usual it's wiser to make just what we need, no more. We have to capture:
* Time and date for each event.
* In which node the event occurred.
* The peer node, if any.
* What the event was (e.g. which command arrived).
* Event data, if any.
The very simplest technique is to print the necessary information to the console, with a timestamp. That's the approach I used. Then it's simple to find the nodes affected by a failure, filter the log file for only messages referring to them, and see exactly what happened.
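As a rough sketch of what such a trace line could look like (the {{trace_format}} function and the line layout are assumptions made for illustration, not the format ZyRE actually prints), we put the five fields on one line so that the log can be filtered by node or peer with standard text tools:

```c
#include <assert.h>
#include <stdio.h>
#include <time.h>

//  Format one trace line into 'buffer': timestamp (UTC), node, peer,
//  event, and data. 'peer' and 'data' may be NULL. Returns the length
//  the full line needs, as snprintf does.
int
trace_format (char *buffer, size_t size, time_t when,
              const char *node, const char *peer,
              const char *event, const char *data)
{
    assert (buffer && node && event);
    char stamp [20];
    struct tm *tm = gmtime (&when);
    assert (tm);
    strftime (stamp, sizeof (stamp), "%Y-%m-%d %H:%M:%S", tm);
    return snprintf (buffer, size, "%s node=%s peer=%s event=%s data=%s",
                     stamp, node, peer? peer: "-", event, data? data: "");
}
```

Printing the result with {{puts}} and piping the output through {{grep "node=..."}} is then enough to isolate the exchange between two nodes.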
++++ Dealing with Blocked Peers
In any performance-sensitive 0MQ architecture you need to solve the problem of flow control. You cannot simply send unlimited messages to a socket and hope for the best. At the one extreme, you can exhaust memory. This is a classic failure pattern for a message broker: one slow client stops receiving messages; the broker starts to queue them, and eventually exhausts memory and the whole process dies. At the other extreme, the socket drops messages, or blocks, as you hit the high-water mark.
With ZyRE we want to distribute messages to a set of peers, and we want to do this fairly. Using a single ROUTER socket for output would be problematic since any one blocked peer would block outgoing traffic to all peers. TCP does have good algorithms for spreading the network capacity across a set of connections. And we're using a separate DEALER socket to talk to each peer, so in theory each DEALER socket will send its queued messages in the background reasonably fairly.
The normal behavior of a DEALER socket that hits its high-water mark is to block. This is usually ideal, but it's a problem for us here. Our current interface design uses one thread that distributes messages to all peers. If one of those send calls were to block, all output would block.
There are a few options to avoid blocking. One is to use {{zmq_poll[3]}} on the whole set of DEALER sockets, and only write to sockets that are ready. I don't like this for a few reasons. First, the DEALER socket is hidden inside the peer class, and it is cleaner to allow each class to handle this opaquely. Second, what do we do with messages we can't yet deliver to a DEALER socket? Where do we queue them? Third, it seems to be side-stepping the issue. If a peer is really so busy it can't read its messages, something is wrong. Most likely, it's dead.
So no polling for output. The second option is to use one thread per peer. I quite like the idea of this, since it fits into the 0MQ design pattern of "do one thing in one thread". But this is going to create a //lot// of threads (square of the number of nodes we start) in the simulation, and we're already running out of file handles.
A third option is to use a non-blocking send. This is nicer and it's the solution I choose. We can then provide each peer with a reasonable outgoing queue (the HWM) and if that gets full, treat it as a fatal error on that peer. This will work for smaller messages. If we're sending large chunks -- e.g. for content distribution -- we'll need a credit-based flow control on top.
First step therefore is to prove to ourselves that we can turn the normal blocking DEALER socket into a non-blocking socket. This example creates a normal DEALER socket, connects it to some endpoint (so there's an outgoing pipe and the socket will accept messages), sets the high-water mark to four, and then sets the send timeout to zero:
[[code type="example" title="Checking EAGAIN on DEALER socket" name="eagain"]]
[[/code]]
When we run this, we send four messages successfully (they go nowhere, the socket just queues them), and then we get a nice EAGAIN error:
[[code]]
Sending message 0
Sending message 1
Sending message 2
Sending message 3
Sending message 4
Resource temporarily unavailable
[[/code]]
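If you don't have 0MQ at hand, the behavior can be modeled without it. The following is a library-free sketch, not the example program itself: {{mailbox_t}}, {{mailbox_send}}, and {{mailbox_flood}} are hypothetical names, and the "socket" is just a counter that accepts messages up to its high-water mark and never drains, like a DEALER whose peer is not reading.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

//  Hypothetical stand-in for a socket's outgoing pipe
typedef struct {
    int hwm;            //  High-water mark
    int queued;         //  Messages currently queued
} mailbox_t;

//  Non-blocking send: queue the message, or fail at once with EAGAIN
int
mailbox_send (mailbox_t *self)
{
    assert (self);
    if (self->queued >= self->hwm) {
        errno = EAGAIN;
        return -1;
    }
    self->queued++;
    return 0;
}

//  Send until the mailbox refuses; returns number of messages accepted
int
mailbox_flood (mailbox_t *self, int attempts)
{
    int count;
    for (count = 0; count < attempts; count++) {
        printf ("Sending message %d\n", count);
        if (mailbox_send (self)) {
            printf ("%s\n", strerror (errno));
            break;
        }
    }
    return count;
}
```

With a high-water mark of four, {{mailbox_flood}} announces five sends and reports an error on the fifth, following the same pattern as the trace above. The exact error text comes from {{strerror}} and depends on the platform.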
Next step is to decide what a reasonable high water mark would be for a peer. ZyRE is meant for human interactions, that is applications which chat at a low frequency. Perhaps two games, or a shared drawing program. I'd expect a hundred messages per second to be quite a lot. Our "peer is really dead" timeout is 10 seconds. So a high-water mark of 1,000 seems fair.
Rather than set a fixed HWM, or use the default (which by coincidence also happens to be 1,000) we calculate it as 100 * the timeout. Here's how we configure a new DEALER socket for a peer:
[[code type="fragment" name="zyre-peer-new-socket"]]
// Create new outgoing socket (drop any messages in transit)
self->mailbox = zsocket_new (self->ctx, ZMQ_DEALER);
// Set our caller 'From' identity so that receiving node knows
// who each message came from.
zsocket_set_identity (self->mailbox, reply_to);
// Set a high-water mark that allows for reasonable activity
zsocket_set_sndhwm (self->mailbox, PEER_EXPIRED * 100);
// Send messages immediately or return EAGAIN
zsocket_set_sndtimeo (self->mailbox, 0);
// Connect through to peer node
zsocket_connect (self->mailbox, "tcp://%s", endpoint);
[[/code]]
And finally, what do we do when we get an EAGAIN on a peer? We don't need to go through all the work of destroying the peer since the interface will do this automatically if it doesn't get any message from the peer within the expiry timeout. Just dropping the last message seems very weak - it will give the receiving peer gaps.
I'd rather have a more brutal response. Brutal is good because it forces the design to a "good" or "bad" decision rather than a fuzzy "should work but to be honest there are a lot of edge cases so let's worry about it later". Destroy the socket, disconnect the peer, and stop sending anything to it. The peer will eventually have to reconnect and re-initialize any state. It's kind of an assertion that 100 messages a second is enough for anyone. So, in the zre_peer_send method:
[[code type="fragment" name="zyre-peer-send"]]
int
zre_peer_send (zre_peer_t *self, zre_msg_t **msg_p)
{
    assert (self);
    if (self->connected) {
        if (zre_msg_send (msg_p, self->mailbox) && errno == EAGAIN) {
            zre_peer_disconnect (self);
            return -1;
        }
    }
    return 0;
}
[[/code]]
Where the disconnect method looks like this:
[[code type="fragment" name="zyre-peer-disconnect"]]
void
zre_peer_disconnect (zre_peer_t *self)
{
    // If connected, destroy socket and drop all pending messages
    assert (self);
    if (self->connected) {
        zsocket_destroy (self->ctx, self->mailbox);
        free (self->endpoint);
        self->endpoint = NULL;
        self->connected = false;
    }
}
[[/code]]
+++ Distributed Logging and Monitoring
Let's look at logging and monitoring. If you've ever managed a real server (like a web server) you know how vital it is to have a capture of what is going on. There is a long list of reasons, not least:
* To measure the performance of the system over time.
* To see what kinds of work are done the most, to optimize performance.
* To track errors and how often they occur.
* To do postmortems of failures.
* To provide an audit trail in case of dispute.
Let's scope this in terms of the problems we think we'll have to solve:
* We want to track key events (such as nodes leaving and rejoining the network).
* For each event we want to track a consistent set of data: the date/time, node which observed the event, peer that created the event, type of event itself, and other event data.
* We want to be able to switch logging on and off at any time.
* We want to be able to process log data mechanically, since it will be sizable.
* We want to be able to monitor a running system, that is, collect logs and analyze in real-time.
* We want log traffic to have minimal effect on the network.
* We want to be able to collect log data at a single point on the network.
As in any design, some of these requirements are hostile to each other. For example, collecting log data in real-time means sending it over the network, which will affect network traffic to some extent. However as in any design these requirements are also hypothetical until we have running code, so we can't take them too seriously. We'll aim for //plausibly good enough// and improve over time.
++++ A Plausible Minimal Implementation
Arguably, just dumping log data to disk is one solution, and it's what most mobile applications do (using 'debug logs'). But most failures require correlation of events from two nodes. This means searching lots of debug logs by hand to find the ones that matter. It's not a very clever approach.
We want to send log data somewhere central, either immediately, or opportunistically (i.e. store and forward). For now, let's focus on immediate logging. My first idea, when it comes to sending data, is to use ZyRE for this. Just send log data to a group called "LOG", and hope someone collects it.
But using ZyRE to log ZyRE itself is a Catch-22. Who logs the logger? What if we want a verbose log of every message sent? Do we include logging messages in that, or not? It quickly gets messy. We want a logging protocol that's independent of ZyRE's main ZRE protocol. The simplest approach is a PUB-SUB protocol, where all nodes publish log data on a PUB socket, and a collector picks that up via a SUB socket!figref().
[[code type="textdiagram" title="Distributed Log Collection"]]
+--------+      +--------+      +--------+
|        |      |        |      |        |
|  Node  |      |  Node  |      |  Node  |
|        |      |        |      |        |
+--------+      +--------+      +--------+
|  PUB   |      |  PUB   |      |  PUB   |
+---+----+      +---+----+      +---+----+
    |               |               |
    |               |               |
    +---------------+---------------+
                    |
                    |
                    v
              +-----------+
              |    SUB    |
              +-----------+
              |           |
              | Collector |
              |           |
              +-----------+
[[/code]]
The collector can, of course, run on any node. This gives us a nice range of use-cases:
* A passive log collector that stores log data on disk for eventual statistical analysis; this would be a PC with sufficient hard disk space for weeks or months of log data.
* A collector that stores log data into a database where it can be used in real-time by other applications. This might be overkill for a small workgroup but would be snazzy for tracking the performance of larger groups. The collector could collect log data over WiFi and then forward it over Ethernet to a database somewhere.
* A live meter application that joined the ZyRE network and then collected log data from nodes, showing events and statistics in real-time.
Next question is how to interconnect the nodes and collector. Which side binds, and which connects? Both ways will work here but it's marginally better if the PUB sockets connect to the SUB socket. If you recall, 0MQ's internal buffers only pop into existence when there are connections. It means as soon as a node connects to the collector it can start sending log data without loss.
How do we tell nodes what endpoint to connect to? We may have any number of collectors on the network, and they'll be using arbitrary network addresses and ports. We need some kind of service announcement mechanism, and here we can use ZyRE to do the work for us. We could use group messaging, but it seems neater to build service discovery into the ZRE protocol itself. It's nothing complex: if a node provides a service X, it can tell other nodes about that when it sends them a HELLO command.
We'll extend the HELLO command with a //headers// field that holds a set of name=value pairs. Let's define that the header {{LOG_COLLECTOR}} specifies the collector endpoint (the SUB socket). A node that acts as a collector can add a header like this (for example):
[[code]]
LOG_COLLECTOR=tcp://192.168.1.122:9992
[[/code]]
When another node sees this header it simply connects its PUB socket to that endpoint. Log data now gets distributed to all collectors (zero or more) on the network.
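Handling such a header comes down to splitting a name=value string. Here is a minimal sketch, assuming headers arrive as plain C strings; the {{header_parse}} function is hypothetical and is not how the generated codec stores headers.

```c
#include <assert.h>
#include <string.h>

//  Split a "NAME=value" header into its two parts. Returns 0 if OK,
//  -1 if there is no '=' or if either output buffer is too small.
int
header_parse (const char *header,
              char *name, size_t name_size,
              char *value, size_t value_size)
{
    assert (header && name && value);
    const char *equals = strchr (header, '=');
    if (!equals)
        return -1;

    size_t name_len = (size_t) (equals - header);
    size_t value_len = strlen (equals + 1);
    if (name_len >= name_size || value_len >= value_size)
        return -1;

    memcpy (name, header, name_len);
    name [name_len] = 0;
    //  Copy the value together with its terminating null
    memcpy (value, equals + 1, value_len + 1);
    return 0;
}
```

A node that parses the header shown above gets the name {{LOG_COLLECTOR}} and the value {{tcp://192.168.1.122:9992}}, and can pass the value straight to its PUB socket's connect call.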
Making this first version was fairly simple and took half a day. Here are the pieces we had to make or change:
* We made a new class {{zre_log}} that accepts log data and manages the connection to the collector, if any.
* We added some basic management for peer headers, taken from the HELLO command.
* When a peer has the LOG_COLLECTOR header, we connect to the endpoint it specifies.
* Where we were logging to stdout, we switched to logging via the zre_log class.
* We extended the interface API with a method that lets the application set headers.
* We wrote a simple logger application that manages the SUB socket and sets the LOG_COLLECTOR header.
* We send our own headers when we send a HELLO command.
This version is tagged in the ZyRE repository as v0.4.0 and you can [https://github.com/zeromq/zyre/tags download the tarball] if you want to check what the code looked like at this stage.
At this stage the log message is just a string. We'll make more professionally structured log data in a little while.
First, a note on dynamic ports. In the zre_tester app that we use for testing, we create and destroy interfaces aggressively. One consequence is that a new interface can easily reuse a port that was just freed by another application. If there's a 0MQ socket somewhere trying to connect to this port, the results can be hilarious.
Here's the scenario I had, which caused a few minutes' confusion. The logger was running on a dynamic port:
* Start logger application
* Start tester application
* Stop logger
* Tester receives invalid message (and asserts as designed)
When the tester created a new interface, it reused the dynamic port freed by the (just stopped) logger, and suddenly the interface began to receive log data from nodes, on its mailbox. We saw a similar situation before, where a new interface could reuse the port freed by an old interface, and start getting old data.
The lesson is, if you use dynamic ports, be prepared to receive random data from ill-informed applications that are reconnecting to you. Switching to a static port stopped the misbehaving connection. That's not a full solution though. There are two more weaknesses:
* As I write this, libzmq doesn't check socket types when connecting. The [http://rfc.zeromq.org/spec:15 ZMTP/2.0 protocol] does announce each peer's socket type, so this check is doable.
* The ZRE protocol has no fail-fast (assertion) mechanism; we need to read and parse a whole message before realizing that it's invalid.
Let's address the second one. Socket pair validation wouldn't solve this fully anyhow.
++++ Protocol Assertions
As Wikipedia puts it, "Fail-fast systems are usually designed to stop normal operation rather than attempt to continue a possibly flawed process." A protocol like HTTP has a fail-fast mechanism in that the first line a client sends to an HTTP server must be a request line that ends with the protocol version, such as "HTTP/1.1". If it's not, the server can close the connection without reading anything more.
Our ROUTER socket is not connection-oriented so there's no way to "close the connection" when we get bad incoming messages. However we can throw out the entire message if it's not valid. The problem is going to be worse when we use ephemeral ports, but it applies broadly to all protocols.
So let's define a //protocol assertion// as being a unique signature that we place at the start of each message, and which identifies the intended protocol. When we read a message, we check the signature, and if it's not what we expect, we discard the message silently. A good signature should be hard to confuse with regular data, and give us enough space for a number of protocols.
I'm going to use a 16-bit signature consisting of a twelve-bit pattern and a 4-bit protocol ID!figref(). The pattern %xAAA is meant to stay away from values we might otherwise expect to see at the start of a message: %x00, %xFF, and printable characters.
[[code type="textdiagram" title="Protocol Signature"]]
+---+---+---+---+---+---+---+---+   +---+---+---+---+---------------+
| 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 |   | 1 | 0 | 1 | 0 |   Signature   |
+---+---+---+---+---+---+---+---+   +---+---+---+---+---------------+
             Byte 0                              Byte 1
[[/code]]
As our protocol codec is generated, it's relatively easy to add this assertion. The logic is:
* Get first frame of message.
* Check if the first two bytes hold the pattern %xAAA followed by the expected 4-bit protocol ID.
* If so, continue to parse rest of message.
* If not, skip all 'more' frames, get first frame, and repeat.
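The signature check itself is only a few lines. This sketch assumes the first frame is available as a byte buffer; {{signature_valid}} is a hypothetical helper, not the function the codec generator emits, and the protocol ID values are whatever each protocol is assigned.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

//  Check that a frame starts with the 12-bit pattern %xAAA followed by
//  the 4-bit protocol ID we expect. Frames shorter than two bytes fail.
bool
signature_valid (const uint8_t *frame, size_t size, uint8_t protocol_id)
{
    assert (frame);
    if (size < 2)
        return false;
    //  Signature is sent high byte first: 0xAA, then 0xA0 + protocol ID
    uint16_t signature = (uint16_t) ((frame [0] << 8) | frame [1]);
    return signature == (0xAAA0 | (protocol_id & 0x0F));
}
```

For a protocol with ID 1, a valid message starts with the bytes 0xAA 0xA1. A message that starts with printable text such as "HE" fails the check at once, without any further parsing.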
And to test, I switched the logger back to using an ephemeral port. The interface now properly detects and discards any messages that don't have a valid signature. If the message has a valid signature and is //still// wrong, that's a proper bug.
++++ Binary Logging Protocol
Now that we have the logging framework working properly, let's look at the protocol itself. Sending strings around the network is simple but when it comes to WiFi we really cannot afford to waste bandwidth. We have the tools to work with efficient binary protocols, so let's design one for logging.
This is going to be a PUB-SUB protocol and in 0MQ/3.x we do publisher-side filtering. This means we can do multi-level logging (errors, warnings, information), if we put the logging level at the start of the message. So, our message starts with a protocol signature (two bytes), a logging level (one byte), and an event type (one byte).
In the first version we send UUID strings to identify each node. As text, these are 32 characters each. We can send binary UUIDs but it's still verbose and wasteful. In the log files we don't care about the node identifiers. All we need is some way to correlate events. So what's the shortest identifier we can use that's going to be unique enough for logging? I say unique 'enough' because while we really want zero chance of duplicate UUIDs in the live code, log files are not so critical.
Looking for the simplest plausible answer: hash the IP address and port into a 2-byte value. We'll get some collisions but they'll be rare. How rare? As a quick sanity check I write a small program that generates a bunch of addresses and hashes them into 16-bit values, looking for collisions. To be sure, I generate 10,000 addresses across a small number of IP addresses (matching a simulation set-up), and then across a large number of addresses (matching a real-life setup). The hashing algorithm is a //modified Bernstein//:
[[code type="fragment" name="endpoint-hashing"]]
uint16_t hash = 0;
while (*endpoint)
    hash = 33 * hash ^ *endpoint++;
[[/code]]
Over several runs I don't get any collisions, so this will work as identifier for the log data. This adds four bytes (two for the node recording the event, and two for its peer in events that come from a peer).
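If you want to repeat this kind of sanity check, here is a minimal sketch. It is not the test program used above: {{endpoint_hash}} wraps the same modified Bernstein loop, while {{count_collisions}} and its "address:port" endpoint format are assumptions made for illustration. The number of collisions you see depends entirely on how many endpoints you generate and what they look like.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

//  Modified Bernstein hash of an endpoint string, folded to 16 bits
uint16_t
endpoint_hash (const char *endpoint)
{
    assert (endpoint);
    uint16_t hash = 0;
    while (*endpoint)
        hash = (uint16_t) (33 * hash ^ (unsigned char) *endpoint++);
    return hash;
}

//  Hash 'count' endpoints on one address, using consecutive ports, and
//  return how many landed on a hash value that was already taken
int
count_collisions (const char *address, int first_port, int count)
{
    static bool seen [65536];
    memset (seen, 0, sizeof (seen));

    int collisions = 0;
    int index;
    for (index = 0; index < count; index++) {
        char endpoint [64];
        snprintf (endpoint, sizeof (endpoint), "%s:%d",
                  address, first_port + index);
        uint16_t hash = endpoint_hash (endpoint);
        if (seen [hash])
            collisions++;
        seen [hash] = true;
    }
    return collisions;
}
```

Calling {{count_collisions}} with the address range and port range of your own network gives a quick feel for whether two-byte identifiers are unique enough for your log files.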
Next, we want to store the date and time of the event. The POSIX time_t type used to be 32 bits but since this overflows in 2038, it's a 64-bit value. We'll use this; there's no need for millisecond resolution in a log file: events are sequential, clocks are unlikely to be that tightly synchronized, and network latencies mean that precise times aren't that meaningful.
We're up to 16 bytes, which is decent. Finally we want to allow some additional data, formatted as text and depending on the type of event. Putting this all together gives the following message specification: