Stormの内部実装を解説する資料確認してます(その6
こんにちは。引き続きメッセージ処理のページを読み進めます。
今回はTaskがメッセージを受信したときの動作について。
Message処理実装
======
■Taskがメッセージ受信時に行う動作はローカルモードか、分散モードかで異なる
1.ローカルモードにおいては、送信先Task受信用のメモリ上のキューにPushされる
2.分散モードにおいては、各Workerが1個のTCPポートでメッセージの到着を待ち受けており、各Task用のメモリ上のキューに配信する。
このTCPポートは『Virtual Port』と呼ばれる。なぜなら、TCPポートにおいて『Task ID,メッセージ』のペアを受信し、そのペアを実際のTaskに配信するという動作となっているため。
(htthttps://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L204)
#ディスパッチャのような動作をしているからVirtual Port?・・・というのは微妙な気はしますが。
#ただ、ローカルモードと分散モードの両方に利用するということを考えれば仮想的とおいておくのはありだとは思います。
−Virtual Portの実現形態は下記の通り
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/zilch/virtual_port.clj)
#Clojureです。読めません・・・
−TaskではインメモリZeroMQを用いてVirtual Portから配信されるメッセージを待ち受ける
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L201)
#こちらもさっぱり・・・
―Boltのメッセージ受信部は下記の通り
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L489)
#Tuple処理開始時の時間を保持した上で、Boltのexecuteメソッドに引き渡しているようです。
―Spoutのメッセージ受信部は下記の通り
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L382)
#受信するのは当然Ack系のメッセージとなるので、処理時間を記録したうえで、
#AckかFailかの判定を行っているようです。判定基準は受信元ストリームIDのようですね・・
======
徐々に解説が無くなり、コードがしめされるのみとなりつつあります(汗
やはりClojureか!Clojureなのか・・・!
======
■Taskはメッセージルーティングを統括している。TaskからemitされたTupleはDirectStream(TaskID指定)か、RegularStreamにemitされる。
DirectStreamにおいては、Tupleは単に指定されたTaskに対して送信されるだけである。
RegularStreamにおいては、Tupleの送信先Taskを決定するためにStreamGroupingが起動される。
1.Taskは各々メッセージルーティングマップ({stream id} -> {component id} -> {stream grouping function})を保持する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L198)
#ここでのcomponentとは、WordCountBoltのようなクラスをさします。Topology初期化の時にComponentとStreamGroupingをペアで指定しているからですね。
2.tasks-fnメソッドはTaskIDとStreamの種類、Tupleの値を指定し、送信先のTaskIDリストを返す。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L207)
3.送信先のTaskID取得後、Bolt/SpoutはWorkerのtransfer-fnメソッドを用いて実際にTupleを送信する。
―Bolt用の送信メソッドは下記
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L429)
#付与情報も全て含めたTupleを作成して、それで送信している形のようです。
―Spout用の送信メソッドは下記
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L329)
======
とりあえず、概要はわかったような気もしますが、詳細なところは全てClojureという形でした。
Clojureを勉強しなければ、というモチベーションだけは燃え上がるに十分なドキュメントでしたね!(おい
尚、このドキュメントは0.7.1の時点で記述された者のため、0.8.0系とは違う可能性もあります。
そのため、あくまで参考レベルになってしまいそうですが、理解には間違いなく役立つはず・・・