-
Notifications
You must be signed in to change notification settings - Fork 0
/
drip.txt
1258 lines (949 loc) · 60.3 KB
/
drip.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
*ストリーム指向のストレージDrip
この章では筆者が最近夢中になっているストリーム指向のストレージ、Dripについて紹介します。Dripはストレージであると同時に、プロセス間を協調のメカニズムでもあります。このくだりを聴いてもRindaとの共通部分が多くあると感じるでしょう。実際にRindaのアプリケーションを書いた経験を元にして書かれました。DripはRindaを置き換えるものではありません。どちらかというとオブジェクトの貯蔵庫であって、オブジェクト指向データベース、Key Value Storeやマルチディメンジョンのリストなど一連のストレージの習作を出発点としました。
**Dripとはなにか
Dripは追記型のストレージの一種で、Rubyオブジェクトを時系列にログします。Dripにはオブジェクトの追記のみ可能で、削除や更新はできません。dRubyのRMIを考慮した、局所的で安価なブラウズ用APIを用意してあります。オブジェクトのまとめ転送や、簡単なパターンによるフィルタ、シークをなど、です。
また、Dripはプロセス間の同期メカニズムでもあります。新しいオブジェクトの到着を待合せることができます。Dripでは一度保存されたオブジェクトは変化することはありません。複数のプロセスがばらばらの時刻に読み出した情報はどれも同じものですし、誰かが読んだオブジェクトを別の誰かが変更することはありません。この特性は分散ファイルシステムでよく見られる特性で、情報を排他的にアクセスしなくてはならない状況を減らすことができます。
Dripはちょっとしたオブジェクトの保存先であり、プロセス間通信、バッチ処理のエーテルであり、ライフログです。単純な仕組みであるため、さまざまな用途への応用が考えられますし、それ故に使い途を想像するのが難しいとも言えます。私のDripを次のようなアプリケーションに使いました。
- バッチ処理のミドルウェア
- Wikiシステムのストレージと全文検索
- Twitterのタイムラインのアーカイブとbotフレームワーク
- irbでの作業中のメモ
ちょっと雲をつかむような感じですね。次の節から、身近な同期メカニズムであるQueue、身近なオブジェクトの貯蔵庫であるHashとの違いをそれぞれ見ながら、Dripを紹介します。
**Queueとの比較
まずQueueと比較しながらDripにおけるプロセス間の協調の違いを見てましょう。
ここでのQueueとはRubyに付属のQueueクラスです。QueueはFIFOのバッファで、要素は任意のオブジェクトです。Queueにオブジェクトを追加するのはpush、オブジェクトを取り出すのはpopです。popはオブジェクトを返すと同時に、Queueの中からそのオブジェクトを削除します。
同時に複数のスレッドからpopすることも可能ですが、一つの要素は一つのスレッドにだけ届きます。同じ要素が複数のpopに届くことはありません。
空のQueueに対してpopを行うとpopはブロックします。新しいオブジェクトが追加され、そしてそのオブジェクトを獲得したただ一人のスレッドに対してオブジェクトを届けます。
Dripにおいてpopに相当する操作はreadです。readは指定したカーソルより新しい要素を返します。ただし、Dripの中から要素を削除することはありません。複数のスレッドが同じカーソルでreadした場合には、それぞれのスレッドに同じ要素を返します。
カーソルよりも新しい要素がない場合、readはブロックします。新しいオブジェクトがwriteされるとreadのブロックはとけて、新しい要素を返します。この場合も、複数のスレッドに同じ要素が届きます。
DripがQueueやRindaとよく似ているポイントは、要素の到着を待つことができるところです。
また異なるポイントは要素を消費するかどうかです。Queueのpopは要素を消費しますが、Dripのreadでは要素は減りません。これは何度でも/何人でも読めるということです。Rindaではアプリケーションのバグやクラッシュによるタプルの紛失はシステム全体のダウンを意味することがありますが、Dripでは要素の紛失を気にする必要はありません。
具体的なコードでDripのreadの様子を見ていきましょう。
***ここで使用するメソッド
ここで使用するメソッドは主に二つです。
>|ruby|
Drip#write(obj, *tags)
||<
writeメソッドはDripの状態を変化させる唯一の操作で、要素を追加します。要素objをDripに格納し、格納されたキーを返します。objへのアクセスを容易にするために、複数のタグをしていできます。タグの使い方はあとで説明します。
もう一つのメソッドはreadです。
>|ruby|
Drip#read(key, n=1, at_least=1, timeout=nil)
||<
Dripをブラウズする基本となるメソッドがreadです。keyは注目点(カーソル)で、keyよりも後に追加された要素のキーと値の組をn個の配列で返します。要素がat_least個そろうまで、readはブロックします。timeoutを指定することができます。
説明が長いですね。要するに「新しい要素をn返せ。at_least個揃うまでは待機せよ。」です。
***Dripのインストールと起動
おっと。Dripのインストールを忘れていました。DripはRBTreeという赤黒木の外部ライブラリを使用します。gemを用意していただいたので次のようにインストールして下さい。
>||
% gem ?????
||<
次にDripサーバを起動します。
Dripはデフォルトでは二次記憶としてプレーンなファイルを使います。Dripを生成するにはファイルを置くディレクトリを指定します。次のスクリプト(drip_s.rb)はDripを生成しdRubyでサービスするものです。
>|ruby|
require 'drip'
require 'drb'
class Drip
def quit
Thread.new do
synchronize do |key|
exit(0)
end
end
end
end
drip = Drip.new('drip_dir')
DRb.start_service('druby://localhost:54321', drip)
DRb.thread.join
||<
Dripにquitメソッドを追加しています。これはRMI経由でこのプロセスを終了させるもので、Dripが二次記憶への操作をしていないとき(synchronize中)を待ってから終わらせます。
次のように起動できます。
>||
% ruby drip_s.rb
||<
***MyDrip
MacOSXなどPOSIXなOS専用ですが、MyDripという1人用の起動が簡単なDripサーバも用意されています。これは、ホームディレクトリの直下に.dripというディレクトリを作成し、この中をストレージとするDripで、UNIXドメインソケットを使ってサービスします。UNIXドメインソケットですから、ファイルの権限、可視性によって利用者を制限できます。また、UNIXドメインソケットのファイル名はホームディレクトリ以下のいつも決まったパスで接続できます。
TCPの場合、固定にするにはそのマシンの中である番号のポートをあるサービスに割り当てる、とみんなで約束を守る必要があり、dRubyのURIを固定にするのに面倒なところがあります。それに対して、各ユーザのホームディレクトリの下のファイルを使う場合にはみんなで約束しあう必要がありませんから、URIを機械的に決めるのが簡単です。
MyDripを利用するにはmy_dripをrequireします。
起動してみましょう。
>||
ターミナル1
% irb -r my_drip --simple-prompt
>> MyDrip.invoke
=> 51252
>> MyDrip.class
=> DRb::DRbObject
||<
MyDripはこの固定のポートを指すDRbObjectですが、特別にinvokeメソッドが定義されています。MyDrip.invokeは新しいプロセスをforkし、必要であればDripデーモン起動します。すでに自分用のMyDripが動いている場合にはなにもせずに終了します。なお、MyDripを終了させるにはMyDrip.quitメソッドを使います。
MyDripはirb実行中にちょっとしたオブジェクトのメモをとるのにも使える便利なデーモンです。筆者の環境ではいつもMyDripを起動してあり、Twitterのタイムラインを常にアーカイブしたり、メモをしたりbotのミドルウェアになったりしています。
私の.irbrcは次のようにmy_dripをrequireしています。irbを使っているときはいつでもMyDripにメモできます。
>|ruby|
require 'my_drip'
||<
以降の実験では、主にMyDripを利用します。MyDripが利用できない環境の方は、次のように定義した"my_drip.rb"を用意することでdrip_s.rbのサービスを代用して使えます。
>|ruby|
MyDrip = DRbObject.new_with_uri('druby://localhost:54321')
||<
***再びQueueとの比較
MyDripデーモン(あるいは代用となるdrip_s.rb)が起動している状態で実験です。
writeメソッドを使ってオブジェクトを二つ追加します。writeはDripを変化させる唯一のメソッドです。writeメソッドの戻り値は追加された要素と関連付けられたキーです。キーは時刻(usec)から作られた正の整数で、64bitマシンではしばらくの間はFixnumとなります。
>||
ターミナル2
% irb -r my_drip --simple-prompt
>> MyDrip.write('Hello')
=> 1312541947966187
>> MyDrip.write('world')
=> 1312541977245158
||<
つぎにDripからデータを読んでみます。
>||
ターミナル3
% irb -r my_drip --simple-prompt
>> MyDrip.read(0, 1)
=> [[1312541947966187, "Hello"]]
||<
readはカーソルからn個の要素を読むメソッドで、キーと値のペアの配列を返します。
順に読むには次のようにカーソルを動かしながらreadすると良いでしょう。
>||
>> k = 0
=> 0
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541947966187, "Hello"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541977245158, "World"]
||<
二つ読めました。さらに読むとどうなるでしょう。
>||
>> k, v = MyDrip.read(k, 1)[0]
||<
kよりも新しい要素がないのでブロックします。ターミナル2から新しい要素を追加するとブロックがとけ、そのオブジェクトが読めるはずです。
>||
ターミナル2
>> MyDrip.write('Hello, Again')
=> 1312542657718320
||<
>||
>> k, v = MyDrip.read(k, 1)[0]
=> [1312542657718320, "Hello, Again"]
||<
どうですか?待合せできていますか?
読み手を増やしてまた0から読んでみましょう。
>||
ターミナル4
% irb -r my_drip --simple-prompt
>> k = 0
=> 0
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541947966187, "Hello"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312541977245158, "World"]
>> k, v = MyDrip.read(k, 1)[0]
=> [1312542657718320, "Hello, Again"]
||<
同じ要素が読めました。DripではQueueとちがって要素を消費しませんから、同じ情報をなんども読めます。その代わりにどの辺りの要素を読むのか、readのたびに指定しなくてはなりません。
ここでMyDripを再起動させましょう。quitメソッドを呼ぶとだれもwriteしていないときを見計らってプロセスを終了させます。再起動するにはinvokeを呼びます。MyDrip.invokeはログが大きいと時間がかかるときがあります。
>||
ターミナル1
>> MyDrip.quit
=> #<Thread:...>
>> MyDrip.invoke
=> 61470
||<
readメソッドで先ほどの状態になっているか確認してみましょう。
>||
ターミナル1
>> MyDrip.read(0, 3)
=> [[1312541947966187, "Hello"], [1312541977245158, "World"], [1312542657718320, "Hello, Again"]]
||<
***実験のまとめ
Queueと似ている点は、時系列に並んだデータを順に取り出せるところ、データの到着を待合せできるところです。Queueと異なる点はデータが減らないところです。同じ要素を複数のプロセスから読めますし、同じプロセスが何度もよむこともできます。経験上、バッチ処理は開発中も運用中も何度も停まりますよね。Dripでは工夫すれば先ほどの状態から処理を再開できます。途中からでも最初からでもやり直すチャンスがあります。
またQueueとの比較を通じて基本となる二つの操作、write、readを紹介しました。
**Hashとの比較
ここではKVS、あるいはHashとDripを比較し、それを通じてDripの操作を学びます。
RubyのHashはキーと値が組になった連想配列で、連想配列の実装にハッシュ表を使うことからHashと呼ばれています。あるキーと関連するのは一つの値です。Dripではwrite時に指定できるタグを使ってHashを模倣することができます。
***タグ
Drip#writeには格納したいオブジェクトのほかにタグを指定することができます。タグはStringです。一つのオブジェクトに複数のタグをつけることができます。あるタグを指定してreadすることができるため、オブジェクトをあとで取り出すのが容易になります。このタグを利用するとHashを模倣することができます。
タグをHashのキーと考えてみましょう。Dripにおいて「タグをつけてwriteする」のはHashにおいては「キーに関連する値を設定する」ことになります。「タグをもつ最新の値をreadする」のはHashではキーに関連す値を取り出すことと同じです。「最新の値」を取り出せばHashと同様ですが、それ以前の値を取り出すことができますから、この方法で模倣したHashは、変更履歴を持つHashと言えます。
***ここで使用するAPI
ここで新たに使用するAPIはheadとread_tagです。
>|ruby|
Drip#head(n=1, tag=nil)
||<
headは先頭からn個の要素の配列を返します。tagを指定すると、そのtagを持つ要素だけを選んでn個返します。Drip中の要素数がnより小さくてもHeadはブロックしません。先頭のn個を覗くだけです。
>|ruby|
Drip#read_tag(key, tag, n=1, at_least=1, timeout=nil)
||<
read_tagの基本的な動作はreadと同じですが、tagを指定するところが違います。tagをもつ要素だけをreadします。readと同じですから、keyより新しい要素の数がat_least個に満たない場合は、新しいデータが追加されるまでブロックします。あるタグを持つ要素の追加を待ち合わせることができるわけです。
***実験
タグとhead、read_tagを組み合わせてHashを模倣してみましょう。先ほどのMyDripをそのまま使います。
まず値の設定です。
>|ruby|
hash['seki.age'] = 29
||<
上記のhashへの操作に相当するのは次の通りです。'seki.age'というタグをつけて29をwriteします。
>||
ターミナル2
>> MyDrip.write(29, 'seki.age')
=> 1313358208178481
||<
値の取り出しにはheadが良いでしょう。'seki.age'タグを持つ要素を先頭から一つ要求します。
>||
ターミナル2
>> MyDrip.head(1, 'seki.age')
=> [[1313358208178481, 29, "seki.age"]]
||<
一つの要素は[キー, 値, 任意個のタグ]で、これらの配列が返ります。値だけを見たいのであれば次のようにしても良いでしょう。
>||
ターミナル2
>> k, v = MyDrip.head(1, 'seki.age')
=> [[1313358208178481, 29, "seki.age"]]
>> v
=> 29
||<
今度は値を再設定してみます。
>|ruby|
hash['seki.age'] = 49
||<
Hashでいうと上記のような操作です。'seki.age'に関連する値を49と変更するには、先ほどと同様に'seki.age'というタグをつけて49をwriteすればよいです。writeして、headで確認してみましょう。
>||
ターミナル2
>> MyDrip.write(49, 'seki.age')
=> 1313358584380683
>> MyDrip.head(1, 'seki.age')
=> [[1313358584380683, 49, "seki.age"]]
||<
変更履歴は過去のデータを取り出せばわかります。headを使って最新10バージョンの履歴を調べます。
>||
ターミナル2
>> MyDrip.head(10, 'seki.age')
=> [[1313358208178481, 29, "seki.age"], [1313358584380683, 49, "seki.age"]]
||<
先頭から10個の要素を要求しましたが、いまDripの中にある'seki.age'を持つ要素は二つだけなので、2要素のArrayが返りました。結果が複数返る場合、配列は旧い方から新しい方へ向けて並んでいます。
では存在しないキー(Hashでいうところのキー)を問い合わせるとどうなるでしょう。
>||
ターミナル2
>> MyDrip.head(1, 'sora_h.age')
=> []
||<
空の配列が返りました。ブロックもしません。headはブロックしない操作なので、要素が見つからないときは空の配列を返します。
狙った要素が追加を待ち合わせするにはread_tagを使います。
>||
ターミナル2
>> MyDrip.read_tag(0, 'sora_h.age')
||<
ブロックしますね。別の端末から値を設定してみます。
>||
ターミナル3
>> MyDrip.write(12, 'sora_h.age')
=> 1313359385886937
||<
read_tagのブロックは解けて、いま追加したオブジェクトが返ります。
>||
ターミナル2
>> MyDrip.read_tag(0, 'sora_h.age')
=> [[1313359385886937, 12, "sora_h.age"]]
||<
***実験のまとめ
タグをうまく使うとHashの基本操作である値の設定と取り出しが模倣できることがわかりました。Hashと違うところは次の点です。
- 要素は消せない
- 履歴がある
- keys/eachがない
Hashと違い要素を削除することはできませんが、nil、あるいは削除状態を表わす特別なオブジェクトを設定するなどによって代用できると思います。また、要素を削除できない副産物として変更の履歴を全て見ることができます。
keysとeachが用意されないのは意図してのことです。簡単に作れるので一度作成しましたが削除しました。現在、DripにそのAPIは残っていません。keysを実装するには全ての要素を一度集める必要がありますが、要素数が大きくなったときに破綻する可能性があるからです。多くの分散ハッシュテーブルでもkeysは用意されていないのではないかと思います。
TupleSpaceと似ている点があります。read_tagを使うと要素の追加や更新を待ち合わせることができます。これはRindaのTupleSpaceにおけるreadのパターンマッチングを非常に限定したものと考えられます。ある特定のタグをもつ要素が追加されるまでプロセスを待たせることができます。このパターンマッチはRindaと比較すると非常に貧弱なものですが、実際のアプリケーションの多くには充分ではないか予想しています。
DripではRindaで広げすぎた仕様を狭くして、最適化しやすい単純な仕組みに挑戦しています。Rindaはインメモリを中心にRuby的な豪華な世界を表現しました。これに対しDripでは永続化を前提として協調機構を考え直しより単純なもの目指しました。
この予想を検証するにはもっと多くのアプリケーションが必要ですね。
この二つの節ではQueue、Hashとの比較を通じてDripを説明してきました。単純な追記しかないストリームでもちょっと凝ったデータ構造が表現できそうです。多くのデータ構造においてeachが定義できるわけですから、世界のほとんどは一直線にならべることができるかもしれませんしね。
QueueやHashと比較してDripを説明しました。
**キーとブラウズ
ここではDripに格納されたデータをブラウズする方法を学びます。Dripでは全ての要素はwriteされた順に並んでいますから、Dripにおけるデータのブラウズは時間軸に沿って旅をするようなものです。
ブラウズに使うAPIのほとんどは注目点(カーソル)のキーを引数にとります。まずキーの規則を説明し、次にブラウズの実際を見ていきます。
***キー
Drip#writeすると、その要素に対応するキーが返ります。キーは単調増加の整数で、あたらしいキーはこれまでに格納されたどのキーよりも大きくなります。現在の実装ではキーは次のように計算されます。
>|ruby|
def time_to_key(time)
time.tv_sec * 1000000 + time.tv_usec
end
||<
キーは時刻から計算された整数です。64bitマシンにおいて(当面の間は)Fixnumです。usecの分解能しかありませんから、1 usecのうちに複数の要素を書き込める際に衝突が発生します。この場合、最も大きなキーより一つ大きな値が選択されます。
>|ruby|
# lastは最後の(最大の)キー
key = [time_to_key(at), last + 1].max
||<
0が最古のキーとなります。一番旧い要素を指定するときに思い出して下さい。
***ブラウズ
これまでの実験でread、read_tag、headを試しました。他にも次のようなAPIがあります。
- 未来方向へのブラウズ / read, read_tag, newer
- 過去方向へのブラウズ / head, older
DripのアプリケーションではこれらのAPIを使って時間軸を前後に移動します。タグをうまく使ってスキップすることもあります。
この節では、タグを使って任意の要素へシークしそこから順に読む操作を紹介します。
次の疑似コードは全ての要素を4つずつ読み出していく例です。kが注目点です。kをreadした要素の最後のキーとしながら繰り返すことで、要素を順に読んでいくことができます。
>|ruby|
while true
ary = drip.read(k, 4, 1)
...
k = ary[-1][0]
end
||<
この疑似コードをirbで分解しながら実行していきます。MyDripは動いていますか?この実験もMyDripを使います。まずirbからMyDripにテスト用のデータを書き込みます。
>||
ターミナル1
% irb -r my_drip --simple-prompt
>> MyDrip.write('sentinel', 'test1')
=> 1313573767321912
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573806023712
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573808504784
>> MyDrip.write(:blue, 'test1=blue')
=> 1313573823137557
>> MyDrip.write(:green, 'test1=green')
=> 1313573835145049
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573840760815
>> MyDrip.write(:orange, 'test1=orange')
=> 1313573842988144
>> MyDrip.write(:green, 'test1=green')
=> 1313573844392779
||<
はじめに書いたのは実験を始めた時点を記録するための錨のような要素です。それ以降、オレンジ、オレンジ、青、緑、オレンジ、オレンジ、緑とオブジェクトをwriteしました。色の名前に対応したタグをつけてあります。
>||
ターミナル2
% irb -r my_drip --simple-prompt
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]
>> k
=> 1313573767321912
||<
まず"test1"というタグをつけた錨の要素をキーを手に入れます。実験の出発点となります。headを使うのが良いでしょう。
次に錨の以降の要素を4つreadします。
>||
ターミナル2
>> ary = MyDrip.read(k, 4)
=> [[1313573806023712, :orange, "test1=orange"], [1313573808504784, :orange, "test1=orange"], [1313573823137557, :blue, "test1=blue"], [1313573835145049, :green, "test1=green"]]
||<
読めましたか?次に注目点を更新して、もう一度4つreadしてみましょう。
>||
ターミナル2
>> k = ary[-1][0]
=> 1313573835145049
>> ary = MyDrip.read(k, 4)
=> [[1313573840760815, :orange, "test1=orange"], [1313573842988144, :orange, "test1=orange"], [1313573844392779, :green, "test1=green"]]
||<
続きの3つの要素が返りました。これはk以降の要素が3つしかないからです。さらに読むとどうなるでしょう。みなさんの予想通り、readはブロックするはずです。
>||
ターミナル2
>> k = ary[-1][0]
=> 1313573844392779
>> ary = MyDrip.read(k, 4)
||<
別の端末からなにかwriteしてreadが動き出すか、確認します。
>||
ターミナル1
>> MyDrip.write('hello')
=> 1313574622814421
||<
解除されましたか?
次はread_tagを使ってフィルタする例を示します。注目点を巻き戻してもう一度実験です。
>||
ターミナル2
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]
||<
注目点より新しいデータで、タグが'test1=orange'のものを4つ(最低でも2つ)readせよ、としてみましょう。
>||
>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)
=> [[1313573806023712, :orange, "test1=orange"], [1313573808504784, :orange, "test1=orange"], [1313573840760815, :orange, "test1=orange"], [1313573842988144, :orange, "test1=orange"]]
||<
オレンジばかり、4つ手に入りました。
注目点を更新して、もう一度同じ操作をしてみます。
>||
>> k = ary[-1][0]
=> 1313573842988144
>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)
||<
新しい注目点よりも後には、オレンジの要素が一つもありませんからブロックします。別の端末からオレンジを2つwriteすれば、このread_tagは動き出すでしょう。
>||
ターミナル1
>> MyDrip.write('more orange', 'test1=orange')
=> 1313575076451864
>> MyDrip.write('more orange', 'test1=orange')
=> 1313575077963911
||<
>||
ターミナル2
>> ary = MyDrip.read_tag(k, 'test1=orange', 4, 2)
=> [[1313575076451864, "more orange", "test1=orange"], [1313575077963911, "more orange", "test1=orange"]]
||<
ここではタグを使ったシークと、そこからのread、フィルタを使ったread_tagの例を示しました。Dripのデータの中をブラウズする際の基本となるイディオムです。
ほかにもいくつかユーティリティメソッドがあります。
>|ruby|
Drip#newer(key, tag=nil)
||<
keyより新しいものを一つ返します。tagを指定することもできます。newerはread/read_tagのラッパーです。新しい要素が無い場合ブロックせず、nilを返します。
>|ruby|
Drip#older(key, tag=nil)
||<
keyよりも旧いものを一つ返します。tagを指定することもできます。新しい要素が無い場合ブロックせず、nilを返します。
そうそう。普通過ぎてわすれていましたが、キーがわかっているときに対応する値を取り出すAPIもあります。
>|ruby|
Drip#[](key)
||<
>||
>> k, = MyDrip.head(1, 'test1')[0]
=> [1313573767321912, "sentinel", "test1"]
>> MyDrip[k]
=> ["sentinel", "test1"]
||<
値とタグからなる配列を返します。キーは返りません。
これで主要なAPIの説明は終わりです。そういえばRubyでよく見かけるeachはありませんでしたね。それについてはつぎのクドい話を読んで下さい。
**APIの設計指針
DripはdRubyと組み合わせて使うのを前提としてAPIを設計しました。dRubyの弱点はいくつかありますが、特に苦手なのはサーバ側のオブジェクトの寿命と排他制御の管理、そしてRMIの遅さです。サーバ側に状態をもつオブジェクトを作らないこと、RMIの回数を減らすことはAPIの選択の指針となります。
さきほどのreadメソッドに与えるキーについて、もう一度よく見てみましょう。readのキーは、データベース中の視点、カーソル、ページといった概念に近いものです。よくあるデータベースのAPIでは「カーソル」はコンテキストの中に隠されています。例えばRubyのFileオブジェクトは現在の位置を覚えていて、ファイル中のその位置から読んだり、その位置へ書いたりします。これに対し、DripではFileオブジェクトのような状態/コンテキストをもつオブジェクトを用いません。Dripへの質問は状態の変化を伴わない、関数のようになっています。位置などのコンテキストを管理するオブジェクトの代わりに、注目点となるキーを使うのです。このAPIを選択した理由は、コンテキストを管理するオブジェクトをDripサーバの中で生成しないためです。DripはdRubyを経由したRMIで利用されることを前提としています。生成と消滅(beginとend、openとclose)があるようなコンテキストを導入すると、その寿命をサーバが気にする必要が生まれます。分散環境でのGCといった難しい問題に向かい合わなくてはなりません。このため、Dripではそのような面倒を嫌ってInteger(キー)だけの付き合いとなるようにAPIを設計しました。
この節で示した通り、コンテキストを管理するオブジェクトを使う代わりに、readのたびに返されるキーを使ってアクセスすることで、同様な操作を実現できます。もしこのAPIでの操作が面倒と感じるなら、ローカルなプロセスの中でキーを隠すようなコンテキストを準備することを勧めます。間違ってDripサーバ側にコンテキストを用意しないよう注意して下さいね。
readでは、自分の知らない情報を一度に最大n個、少なくともm個を返せ、と指示します。n回のreadで構成すると、RMIの回数が増えてしまいますが、このように一度に転送すればRMIの回数を削減できます。応答時間よりも処理時間が重要なバッチ処理などのケースで有効です。「少なくともm個」を指定することで、イベントの(データの)発生の都度RMIを発生させずにすみます。ほどほどにデータがたまるのを待って一度に転送することができるからです。
Dripはストレージに関する一連の習作の経験から、「作りすぎない」ことに留意しました。「作る」ことは楽しいので、請われるままに機能を増やしてしまうことがしばしば起こります(私はそういう経験があります)。Dripのポリシーを明確にして、機能を増やしてしまう誘惑と戦いました。
**アプリケーション
***簡易検索システム
ここでは非常に小さな検索システムを作ります。検索システムのミニチュアの作成を通じてDripの応用のヒントとして下さい。
このシステムには主に三つのプロセスが登場します。自分のマシンにあるRubyスクリプトを探してはDripに登録するクロウラ、Dripへ登録されたファイルを検索のために索引をつけるインデクサ、そして中心となるMyDripサーバです。
***動かし方
この実験でもMyDripを使用しますので、事前にMyDrip.invokeするか、Windows環境では代替となるサーバを起動しておいて下さいね。
>||
$ irb -r drip -r my_drip
>> MyDrip.invoke
=> 45616
||<
今回のサンプルはDripのソースコードの中にも含まれています。まずはダウンロードしてみましょう。
>||
$ cd ~
$ git clone git://github.com/seki/Drip.git
$ cd Drip/sample/demo4book
||<
実際にcrawlerを動かす前にcrawler.rbの10行目に、検索したいディレクトリして下さい。
ファイル数が多いと実験に時間が非常にかかるので、少ないディレクトリを選んでください。500ファイル程度が実験しやすいのではないかと思います。今回はソースコードのディレクトリを指定しました。
>||
@root = File.expand_path('~/Drip/')
||<
以下のようにcrawl.rbを実行するとcrawlするごとにファイルの一覧が表示されます。
>||
$ ruby crawl.rb
["install.rb",
"lib/drip/version.rb",
"lib/drip.rb",
"lib/my_drip.rb",
"sample/copocopo.rb",
"sample/demo4book/crawl.rb",
"sample/demo4book/index.rb",
"sample/drip_s.rb",
"sample/drip_tw.rb",
"sample/gca.rb",
"sample/hello_tw.rb",
"sample/my_status.rb",
"sample/simple-oauth.rb",
"sample/tw_markov.rb",
"test/basic.rb"]
||<
次に別のターミナルでインデクサを起動し、探したい単語を入力すると、その単語が存在するファイル名を一覧として表示します。
ここでは「def」という単語を検索しています。起動してすぐはまだ索引が完全でないので、急いでなんども検索すると索引対象が増えていく様子を見られるかもしれません。
>||
$ ruby index.rb
def
["sample/demo4book/index.rb", "sample/demo4book/crawl.rb"]
2
def
["sample/drip_s.rb",
"lib/drip.rb",
"lib/my_drip.rb",
"sample/copocopo.rb",
"sample/demo4book/index.rb",
"sample/demo4book/crawl.rb"]
6
||<
クロウラは60秒置きに更新を調べるようになっています。標準入力からなにか入力すると、更新の合間を待ってから終了します。これは、一般的な検索システムのクロウラを模倣して、適度に休むようにしてあります。とくにWebページなど検索対象が広い場合などは頻繁な更新情報の収集にはムリがあります。
なお、クローラを休ませる時間を短くすればファイルを更新してすぐに索引に反映されるようになります。このクローラを改造していくことで、自分だけのちょっとしたリアルタイム検索ツールになるかもしれません。また最近のOSでしたらファイルの更新自体をイベントとして知ることができると思うので、そういった機構をトリガーとするのも面白いと思います。
ここからはソースコードを解説していきます。
***投入する要素
このシステムでDripに投入するオブジェクトとタグについて説明します。主に使用するのは「ファイル更新通知」です。
-ファイル更新通知 - ファイル名、内容、更新日の配列です。'rbcrawl'と'rbcrawl-fname=ファイル名'の二つのタグを持ちます。
クロウラは更新されたファイルを見つけるたびにこの情報をwriteします。これはファイル内容のアーカイブであると同時に、更新を通知するイベントになります。インデクサは更新通知を見つけるたびに索引を更新します。
補助的に利用するものもあります。
-クロウラの足跡 - ひとまとまりの処理のなかで更新したファイル名の一覧と、その時刻をメモします。'rbcrawl-footprint'というタグを持ちます。
-実験開始を示すアンカー - 'rbcrawl-begin'というタグを持ちます。何度か実験を繰り返しているうちにはじめからやり直したくなったらこのタグでなにかwriteしてください。
ではこれらのオブジェクトやタグがどのように使われているか見てみましょう
***クロウラ
簡易クロウラの動作を説明します。
>|ruby|
class Crawler
include MonitorMixin
def initialize
super()
@root = File.expand_path('~/develop/git-repo/')
@drip = MyDrip
k, = @drip.head(1, 'rbcrawl-begin')[0]
@fence = k || 0
end
def last_mtime(fname)
k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
(v && k > @fence) ? v[1] : Time.at(1)
end
def do_crawl
synchronize do
ary = []
Dir.chdir(@root)
Dir.glob('**/*.rb').each do |fname|
mtime = File.mtime(fname)
next if last_mtime(fname) >= mtime
@drip.write([fname, mtime, File.read(fname)],
'rbcrawl', 'rbcrawl-fname=' + fname)
ary << fname
end
@drip.write(ary, 'rbcrawl-footprint')
ary
end
end
def quit
synchronize do
exit(0)
end
end
end
||<
まず、指定したディレクトリ(@root)以下にある*.rbのファイルを探します。そしてその更新時刻を調べ、新しいファイルを見つけたらその内容や時刻をwriteします。
これは実際には以下のようなデータを書き込んでいます。
>|ruby|
@drip.write(
["sample/demo4book/index.rb", 2011-08-23 23:50:44 +0100, "ファイルの中身"],
"rbcrawl", "rbcrawl-fname=sample/demo4book/index.rb"
)
||<
値はファイル名、時刻、ファイルの中身からなる配列で、それに対して二つのタグがついています。
クロウラは60秒置きに更新を調べるようになっています。標準入力からなにか入力すると、更新の合間を待ってから終了します。一回の処理で見つけたファイル名の配列を'rbcrawl-footprint'というタグをつけて覚えておきます。たとえば、以下のようなデータを書き込みます。
>|ruby|
@drip.write(["sample/demo4book/index.rb"], 'rbcrawl-footprint')
||<
このバージョンのクロウラはファイルの削除を追いかけませんが、この足跡情報を使えば削除を知ることができるかもしれません。
更新されたか否かは、headメソッドで一つ前のバージョンを探し比較して検査します。
'rbcrawl-fname=ファイル名'というタグでheadすることで、直前のバージョン(つまりDripに書かれている最新のバージョン)を調べることができます。
>|ruby|
k, v = @drip.head(1, "rbcrawl-fname=sample/demo4book/index.rb")[0]
||<
以下に完全なクロウラを載せます。
>|ruby|
require 'pp'
require 'my_drip'
require 'monitor'
class Crawler
include MonitorMixin
def initialize
super()
@root = File.expand_path('~/develop/git-repo/')
@drip = MyDrip
k, = @drip.head(1, 'rbcrawl-begin')[0]
@fence = k || 0
end
def last_mtime(fname)
k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
(v && k > @fence) ? v[1] : Time.at(1)
end
def do_crawl
synchronize do
ary = []
Dir.chdir(@root)
Dir.glob('**/*.rb').each do |fname|
mtime = File.mtime(fname)
next if last_mtime(fname) >= mtime
@drip.write([fname, mtime, File.read(fname)],
'rbcrawl', 'rbcrawl-fname=' + fname)
ary << fname
end
@drip.write(ary, 'rbcrawl-footprint')
ary
end
end
def quit
synchronize do
exit(0)
end
end
end
if __FILE__ == $0
crawler = Crawler.new
Thread.new do
while true
pp crawler.do_crawl
sleep 60
end
end
gets
crawler.quit
end
||<
***インデクサ
このインデクサは索引の作成、更新と、検索そのものも提供します。指定した単語を含んでいるファイルの名前を返します。このサンプルは実験用のミニチュアなので、インメモリに索引を作ることにしました。rbtreeが必要ですが、Dripが動いているならrbtreeはインストールされていると思います。
>|ruby|
class Indexer
def initialize(cursor=0)
@drip = MyDrip
@dict = Dict.new
k, = @drip.head(1, 'rbcrawl-begin')[0]
@fence = k || 0
@cursor = [cursor, @fence].max
end
attr_reader :dict
def update_dict
each_document do |cur, prev|
@dict.delete(*prev) if prev
@dict.push(*cur)
end
end
def each_document
while true
ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
ary.each do |k, v|
prev = prev_version(k, v[0])
yield(v, prev)
@cursor = k
end
end
end
def prev_version(cursor, fname)
k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
(v && k > @fence) ? v : nil
end
end
||<
インデクサはDripから'rbcrawl'タグのついたオブジェクトを取り出し、その都度、索引を更新します。
>|ruby|
@drip.read_tag(@cursor, 'rbcrawl', 10, 1)
||<
第4引数の「1」に注目して下さい。先ほど「keyより新しい要素の数がat_least個に満たない場合は、新しいデータが追加されるまでブロックします」と説明したのを覚えていますか?一度に10個ずつ、最低でも1個ずつ返せ、という指示ですから返せる要素が一つもないときにはブロックします。
これによりクロウラが'rbcrawl'タグのデータを挿入するのをブロックしながら待ち合わせている事になります。
インデクサにとってrbcrawlタグのオブジェクトは更新イベントであると同時に文書でもあります。更新されたファイル名、更新時刻、内容がまとめて手に入ります。
また、DripはQueueとちがい、すでに読んだ要素を再び読むことが可能です。注目点の直前の要素を調べるolderなどで調べることが可能です。
>|ruby|
def prev_version(cursor, fname)
k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
(v && k > @fence) ? v : nil
end
||<
通知されたファイルに旧いバージョンの文書があった場合、インデクサは旧い内容を使って索引を削除してから、新しい内容で索引を追加します。
>|ruby|
def update_dict
each_document do |cur, prev|
@dict.delete(*prev) if prev
@dict.push(*cur)
end
end
||<
インデクサは起動されるとスレッドを生成してサブスレッドでDripからのread_tagと索引づけを行います。
>|ruby|
indexer ||= Indexer.new(0)
Thread.new do
indexer.update_dict
end
||<
メインスレッドではユーザーからの入力を待ち、入力されるとその単語を探して検索結果を印字します。
>|ruby|
while line = gets
ary = indexer.dict.query(line.chomp)
pp ary
pp ary.size
end
||<
以下に完全なインデクサを載せます。
>|ruby|
require 'nkf'
require 'rbtree'
require 'my_drip'
require 'monitor'
require 'pp'
class Indexer
def initialize(cursor=0)
@drip = MyDrip
@dict = Dict.new
k, = @drip.head(1, 'rbcrawl-begin')[0]
@fence = k || 0
@cursor = [cursor, @fence].max
end
attr_reader :dict
def update_dict
each_document do |cur, prev|
@dict.delete(*prev) if prev
@dict.push(*cur)
end
end
def each_document
while true
ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
ary.each do |k, v|
prev = prev_version(k, v[0])
yield(v, prev)
@cursor = k
end
end
end
def prev_version(cursor, fname)
k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
(v && k > @fence) ? v : nil
end
end
class Dict
include MonitorMixin
def initialize
super()
@tree = RBTree.new
end
def query(word)
synchronize do
@tree.bound([word, 0, ''], [word + "\0", 0, '']).collect {|k, v| k[2]}
end
end
def delete(fname, mtime, src)
synchronize do
each_tree_key(fname, mtime, src) do |key|
@tree.delete(key)
end
end
end
def push(fname, mtime, src)
synchronize do
each_tree_key(fname, mtime, src) do |key|
@tree[key] = true
end
end
end
def intern(word)
k, v = @tree.lower_bound([word, 0, ''])
return k[0] if k && k[0] == word
word
end
def each_tree_key(fname, mtime, src)
NKF.nkf('-w', src).scan(/\w+/m).uniq.each do |word|
yield([intern(word), mtime.to_i, fname])
end
end
end
if __FILE__ == $0
indexer ||= Indexer.new(0)
Thread.new do
indexer.update_dict
end
while line = gets
ary = indexer.dict.query(line.chomp)
pp ary
pp ary.size
end
end
||<
***クロウラの動作間隔とインデクサの同期
このサンプルで示したかったものの一つに、複数の処理が自分の都合のよいタイミングで動作するというものがあります。
クロウラは定期的に動作を開始します。クロウラはインデクサの状態など気にせずに処理を行い、更新を見つけてはwriteします。
インデクサも同様です。インデクサはクロウラの動作状況を気にせず、これまでDripに格納されていた文書をまとめて取り出しては索引の更新を行います。文書を処理し終わったら、新しい文書がwriteされるまで休眠状態になります。
データの流れとしては、クロウラが発生源で、Dripに蓄えられて、インデクサがそれを取り出し索引を作ります。しかし、クロウラが発生させた処理の中でインデクサが動作するわけではありません。たとえば、オブザーバーパターンでクロウラ→インデクサとコールバック等のメソッド呼び出しの連鎖のなかで索引更新が行われると想像してみてください。クロウラ側の更新を調べる処理は、索引の更新と直列に動作し律速してしまいます。
Dripにおけるイベントの通知は、受動的ではありません。リスナ側が自分の都合のよいときに能動的に行われます。このスタイルはアクターモデルともよく似ています。インデクサは自分の担当する仕事が一通り終わって、自分の状態が安定してから次の文書を取り出します。dRubyのRMIがサブスレッドにより気付かないうちに実行されるのと対照的ですね。
ややこしい喩え話はともかく、クロウラはインデクサの処理を待つことなく動きますし、インデクサはクロウラの処理の頻度と関係なく自分のペースで動きます。Dripはメッセージングのミドルウェアとして彼らの間をゆるく仲介します。
***フェンスと足跡
実験を繰り返していると、最初の状態からやり直したくなることがあるでしょう。Dripのデータベースを作り直せばやりなおせますが、でもMyDripはこのアプリケーション以外からも雑多な情報をwriteされているでそれは抵抗がありますよね。
そこでこのアプリケーションの始まりの点を閉めすオブジェクトを導入することに。'rbcrawl-begin'というタグを持つオブジェクトがあるときは、それよりも旧い情報を無視することで、それ以前のオブジェクトに影響されずに実験できます。@fenceはクロウラ、インデクサのどちらでも使っているので読んでみて下さい。
具体的にはolderやheadの際にそのキーをチェックして、@fenceよりも旧かったら無視することにします。
>||
=> MyDrip.write('fence', 'rbcrawl-begin')