夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Stormの内部実装を解説する資料確認してます(その3

少し間が空いてしまいましたが、続きになります。

今回からはLifecycle of a topologyの章を確認していきます。
・・・とりあえず書いてみましたが、ひたすらべた書きになっているため、
一通り読み終わったらわかりやすくまとめる必要がありそうですね(汗

Topologyのライフサイクル

=======
本ページでは「storm jar」コマンドを実行して
TopologyをNimbusにコミットしてから終了するまでの詳細を説明する。
また、NimbusがTopologyをどうモニタリングしているかの説明も行う。

■まずはじめに、Topologyについての重要事項を2点押さえておいてもらいたい。

1.実際にStormクラスタ上で走るTopologyはユーザが定義するTopology定義と異なる。
  実際のTopologyは暗黙的にAcker(メッセージ応答確認用のBolt)と、Ackerのメッセージをやり取りするためのStreamを保持するため。
  これらの暗黙的なTopology要素はcommon.clj#system-topology!にて実行される。
2.system-topology!は下記の2つの場所で使用される。
  A.NimbusがTopology用のTaskを生成するタイミング(nimbus.clj#mk-task-component-assignments)
  B.Workerがメッセージのルーティング情報を取得するタイミング(worker.clj#mk-topology-context)

Topology起動

■「storm jar」コマンドは指定された引数を基にTopology起動クラスを実行する。
 「storm jar」コマンドの他の起動方法との異なる点としては、
 コマンドライン上で依存ライブラリを読み込む「storm.jar」オプションを指定可能となっていること。
 指定することで、TopologyをSubmitする際に追加のJarとしてNimbusにアップロードされる。

■TopologyをSubmitするStormSubmitterの動作は下記のとおりとなる。
1.はじめに、依存ライブラリがアップロードされていない場合、アップロードを行う。(対象コード)
2.JarのアップロードはNimbusThriftインタフェース:beginFileUploadを用いて行われる。
3.NimbusThriftインタフェース:beginFileUploadはNimbusの内部パスを返す。
4.NimbusThriftインタフェース:uploadChunkを用いて15キロバイトずつ分割アップを行う
5.アップ完了後、NimbusThriftインタフェース:finishFileUploadが呼び出される
6.これらのNimbusThriftインタフェースはnimbus.cljに記述されている
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L694)
7.Jarファイルアップ終了後、StormSubmitterはNimbusThriftインタフェース:submitTopologyを呼び出す
(https://github.com/nathanmarz/storm/blob/0.7.1/src/jvm/backtype/storm/StormSubmitter.java#L60)
8.Topologyの設定値はJSON形式でシリアライズされる
9.StormSubmitterはsubmitTopologyをNimbus上にアップロードされたJarファイルパスと同時に呼び出す

NimbusはNimbusThriftインタフェース:submitTopologyを用いてTopologyのSubmitを受ける
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L639)

NimbusはTopologyの設定値を復号する。
復号を行う意図としては、各Taskが属するSupervisorに関わらず、
このタイミングでSubmitされた設定値を用いるためである。
この時点の復号化によって、再シリアライズを行うことを許容する。
#いまいち意図が読み取れませんでした。
#競合が発生した場合への対処のように読み取れますが・・・?

NimbusはTopologyの静的な状態を設定する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L661)
#・・・ここまで集約されているといまいち何やっているかわかりませんでした(汗

1.Jarファイルと設定値はローカルファイルに保持される。(ZooKeeperに保持するにはサイズが大きいため)
  保持されたJarファイルと設定値は{nimbus local dir}/stormdist/{topology id}配下にコピーされる。
2.setup-storm-staticメソッドにてTask->コンポーネントマッピングをZooKeeperに出力する。
3.setup-heartbeatsメソッドにてZookeeperにHeartBeat用のディレクトリを作成する。

Nimbusはmk-assignmentメソッドにて各マシンに対してTaskをアサインする。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L458)

1.アサイン定義は「master-code-dir node->host task->node+port task->start-time-secs」となっている。
#ノードがWorkerなのか、Supervisorなのか微妙なところではありますが・・・・
#ただ、ポート番号とペアで語られるところから、おそらくWorkerのように見えます。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/common.clj#L25)
2.各アサインのマッピング内容は下記の通り。
  −master-code-dir SupervisorがNimbusからJarファイル/設定値を取得するために用いられる
  −task->node+port タスクIDからWorkerへのマッピング(タスクがどのWorkerで走るべきか?)
           (WorkerはNode/Portのペア値で定義)
  −node->host ノードIDからホスト名のマッピング
        これは各Workerが他Workerとの通信を行うための定義となる。
        ノードIDはSupervisorの定義に用いられる。ノードIDがあるため、1マシン上で複数のSupervisorで動作させることが可能となる。
        他にも、Mesosとの統合に用いられる。
#Mesosのようなリソース管理プラットフォームと統合する際に、Supervisorが1マシン上で1台しか走らないと不都合があるよう・・・にみえます。
#こちらはMesos側も調べてみる必要がありそうですね。
  −task->start-time-secs タスクID>Nimbusが該当のタスクを開始した時刻のマッピング
              この値はNimbusがTask起動タイムアウト検知用のHeartBeatに用いる。(設定値:nimbus.task.launch.secs)

■一度Topologyがアサインされた後、Topologyの初期状態は「非Active」の状態となる。
 start-stormメソッドにて、対象のTopologyがActiveかの状態が検知できる情報/TupleをSpoutからemit開始するための情報がZooKeeper上に書き込まれる。
 (https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L504)
 #書込み後、activate-stormメソッドが呼ばれるため、このメソッドによって「Active」の状態となるようです。

※TODO 今後Clusterの状態遷移図を書く予定
#・・・と、コメントのように残されていました。

■Supervisorは下記の2メソッドをバックグラウンドで実行する。
1.synchronize-supervisor Zookeeperの状態が変わるか、10秒経過する度に実行される
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L241)
  −NimbusからTaskをアサインされたマシンがTopology動作に必要なコードをまだ取得していない場合取得
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L258)
  −ローカルファイルにノードの動作状態を書き出す。
   書き出す情報は「ポート>LocalAssignment」のマップ。
   LocalAssignmentにはTopologyIDと、対象のWorkerに割り振られたTaskIDが含まれる。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L13)
#前にも記述されている通り、WorkerはNode/Portで特定されます。
#LocalのためNodeは確定、そのためPortでWorkerを識別しているようです。

2.sync-processes ローカルファイルシステム(LFS)からsynchronize-supervisorメソッドで出力された情報を読み込み、
         ローカルマシンの実際の動作状態と一致しているかを確認する。
         LocalAssignmentの情報とWorkerプロセスが起動/終了したタイミングで同期する必要がある。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L177)

■Workerプロセスはmk-workerメソッドを用いて起動する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L67)
1.Workerプロセスは他Workerプロセスとの接続し、変化を検知するスレッドを起動する。
  もしWorkerが再アサインされた場合、Workerプロセスは自動的に他Workerの移動先に対して再接続を行う。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L123)

2.変数「storm-active-atom」を確認し、Topologyが「Active状態」かどうかを確認する。
  この変数はSpoutがnextTupleメソッドを呼び出すか否かの判断に用いられる。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L155)

3.Workerは実Taskをスレッドとして起動する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L178)
#Storm0.8.0以降はExecutorがTaskを実行するため、事情は微妙に変わってきそうではあります。

■Taskはmk-taskメソッドを用いて起動する。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L160)
1.TaskはStreamを乗せ、Tuple出力/応答や通信先のTaskIDを取得するための通信経路を確立する。
(DirectStreamの例:https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L207)

2.TaskはSpout固有/Bolt固有の設定値の設定を行う。
(https://github.com/nathanmarz/storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L241)
======

・・・と、ひたすら読んでみましたが、やはりClojureが読めないと詳細な動作はわからない・・・
となっています。
ともあれ、今回はここまで。次回はTopologyモニタリングとTopology終了について読んでみます。