Stormの内部実装を解説する資料確認してます(その5
こんにちは。
では、StormのTuple処理の肝であるMessage Passingについて読んでいきます。
Message処理実装
======
本ページでは、Emit/TransferされたTupleがどう動作するかを記述する。
■Workerがメッセージの転送について統括する。
1.Workerにおいて、設定値:task.refresh.poll.secs秒経過するかZooKeeper上のアサイン情報が更新するたびにrefresh-connectionsメソッドが呼び出される。
本メソッドは他Workerとの接続維持と、「task -> worker」のマッピングを維持する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L123)
2.Workerは他タスクへのTupleの送信を行う「通信機能」を提供する。
「通信機能」はTaskIDとTupleを受け取ってシリアライズを行い、「転送待ちキュー」にPushする。「転送待ちキュー」は各Worker毎に1個存在する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L56)
#Workerが保持するQueueはLinkedBlockingQueueです。そのため、後述されていますがTupleの送信はシングルスレッドで行われる模様。
3.2で実行されるシリアライザはスレッドセーフ
(https://github.com/nathanmarz/storm/blob/0.7.1/src/jvm/backtype/storm/serialization/KryoTupleSerializer.java#L26)
#ThreadResourceManagerという独自のロック機構を用いてWorker毎に1度に1つシリアライズが走るようになっているようです。
4.Workerは転送待ちキューからTupleを取得し他Workerに転送するスレッドを1つ保持する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L185)
5.Tupleは下記のprotocolインタフェースを通して転送される。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L185)
6.分散モード(クラスタ上で実行するモード)においては、protocolインタフェースはZeroMQを用いて実現する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/messaging/zmq.clj)
7.ローカルモード(同一プロセス内で完結するモード)においては、protocolインタフェースはメモリ上のJavaQueueを用いて実現する。
このモードについてはZeroMQのインストールされていない環境であっても動作する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/messaging/local.clj)
======
Worker1つにつき転送待ちキューと転送スレッドを各々1つしか保持しない関係上、
WorkerにTaskを詰め込みすぎると性能が発揮しきれない・・・ということになる可能性がありますね。
後、通信にZeroMQを用いるのか、Thriftを用いるのか微妙に曖昧だったんですが、
結論としては、「両方つかっている」という形と推測されます。
Thriftで送信/受信のインタフェースとストリームを制御し、そのProtocolとしてZeroMQという形のように見えます。
Thriftはアプリ層と、シリアライズ層が別個定義可能のため、シリアライズ層の通信先を
ZeroMQConnectionにするか、JavaQueueにするかの違い・・・と勝手に考えています。
どれだけThriftについて理解していないんだ、という突っ込みしかでてこない状態ですが(汗
ともあれ、この辺は追加で情報を見つけたらまたまとめます。
とりあえず、きりがいいので今回はここまでで、次はTaskでのTuple処理について。中々重そうです。
======