夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

kinesis-storm-spoutのソースコードを読んでみる(その4

こんにちは。

前回初期化処理周りの確認が終わったので、今回はメッセージの取得→送信や、
Ack/Fail機構周りを見ていきます。
尚、実際に見てみるとわかりますが、かなり煩雑な記述です(汗

ですので、とりあえず概要を知りたい方は
下記のポイントを理解できていれば問題ないと思います。

  1. KinesisSpoutはKinesisから取得したメッセージを1件単位でメモリ上に保持しており、Fail時は再送を行う。
  2. KinesisSpoutは「どのシーケンスまで処理完了したか?」を保持しており、定期的にZooKeeperに保存している。
  3. 「どのシーケンスまで処理完了したか?」は確実に完了した値としている。
    • そのため、それより先のシーケンスのメッセージが処理完了しているケースもある。


今回のキーとなるクラスはInflightRecordTrackerです。
InflightRecordTrackerは「CheckPoint」という値を保持しており、
ZooKeeperに保存する「どこまでShardを読み込んだか?」のチェックポイントとして用いています。

「CheckPoint」は「ここまでは処理が完了した。(=Ackが返った)」という値になっており、
障害発生時も「CheckPoint」から読み込めばメッセージの欠損は発生しない作りとなっています。
#欠損は発生しないものの、重複処理は発生しえます。

nextTuple

Stormから常時呼ばれ続けるメソッドです。
このメソッドでメッセージを取得して下流コンポーネントに送信する・・という動作を行います。

※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※ShardGetterが二重になっていてわかりにくいが、クラス図の通りBufferedGetter > KinesisShardGetterという入れ子になっている。

  1. StateManagerがShardGetterを保持しない場合はSleepして終了
  2. ShardGetterを次のものに切り替える
    1. Roundrobin方式でnextTupleメソッドを呼び出すことにGetterを切り替えている。
  3. StateManagerから対象のShardに対するリトライメッセージを保持している場合はリトライメッセージを送信メッセージとする。
    1. リトライメッセージの保持については後で詳細を確認する。
  4. リトライメッセージが存在しない場合はShardGetterからメッセージを取得する
    1. BufferedGetterはバッファリングされたメッセージが存在しない場合はKinesisShardGetterからメッセージを取得する。
    2. BufferedGetterはバッファリングされたメッセージが存在せず、Shardの最後まで読んでいた場合、空メッセージを返す。
      1. 「Shardの最後まで読んでいた」はShardがReshadingされた結果既にクローズされている場合のみ発生する。
  5. ConfigのIKinesisRecordSchemeを使用してStormメッセージを生成する。
  6. 下流のコンポーネントにStormメッセージを送信
    1. Ack/Failの際に用いられるキー値は「ShardId:シーケンス」となっている。
  7. StateManagerにStormメッセージを送信した旨を通知する。
    1. StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
      1. (リトライ実行フラグがONの場合のみ下記を実施)
      2. InflightRecordTrackerの保持するAck待ち内部マップに送信メッセージが保持されていなかった場合、シーケンスをキーとして保存する。
      3. ※Ack待ち内部マップはLinkedListのような前後関係を保持するHashMapとなっており、前後のRecordへのアクセスを効率化している。
      4. Ack待ち内部マップに保存する際、その時点の「最後メッセージ」を「前の要素」として紐づける。
      5. Ack待ち内部マップに保存する際に「最後メッセージ」として記録する。
      6. Ack待ち内部マップに保存する際に「最前メッセージ」が空の場合、「最前メッセージ」としても記録する。
      7. InflightRecordTrackerは送信メッセージがリトライだった場合、リトライ数をインクリメントする。
      8. InflightRecordTrackerは送信メッセージがリトライだった場合、送信メッセージのシーケンスをリトライキューから除去する。

やたらと処理が多いですが、これは「CheckPoint」のシーケンスを
Ack/Fail時に算出するために必要になっています。
あと、最前メッセージ/最後メッセージは内容的には下記です。
最前メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も前のメッセージ」
最後メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も後のメッセージ」

ack

メッセージに対してAckが返った場合に呼び出されるメソッドです。

※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。

  1. StateManagerにackを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
  2. InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
  3. (送信メッセージが取得できた場合のみ以後の処理を実施)
  4. 「送信メッセージ」のAckフラグをON
  5. Ack待ち内部マップから「前の要素」「次の要素」を取得し、下記の処理を実施。
  6. 「前の要素」が空の場合下記の処理を実施。
    1. 対象メッセージ=「再前メッセージ」の場合シーケンスを「CheckPoint」として設定
      1. 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
    2. 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
      1. 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
      2. 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
  7. 「前の要素」が空でない場合下記の処理を実施。
    1. 「前の要素」がAck済みの場合下記の処理を実施
      1. 「前の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
      2. 「前の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
      3. 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
        1. 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
        2. 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
    2. 「前の要素」がAck済みでない場合下記の処理を実施
      1. 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
        1. 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。

fail

メッセージに対してFailが返った場合に呼び出されるメソッドです。

※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。

  1. StateManagerにfailを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
  2. InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
  3. (送信メッセージが取得できた場合のみ以後の処理を実施)
  4. リトライ数がリトライ閾値未満だった場合、対象メッセージのシーケンスをリトライキューに追加する。
  5. リトライ数がリトライ閾値以上だった場合、リトライカウントアウトログを出力し、対象メッセージのシーケンスに対してack処理を実施


・・と見てみるとackの処理がやたらと複雑ですが、
結局の所は「送信メッセージ」に対してAckを受信したタイミングで
「前の要素」「次の要素」の状態に応じて下記のように処理をしています。

「削除」となったタイミングで、削除対象のメッセージ=「最前メッセージ」だった場合に、
「CheckPoint」を削除対象のメッセージのシーケンスまで進め、
その次のメッセージを「最前メッセージ」と扱っている処理を行っています。
#「前の要素」が存在しない場合は「送信メッセージ」が「最前メッセージ」となるため、
#下記の表にも「CheckPoint」が進む旨を明記しています。

「次の要素」--->
「前の要素」↓
Ack済み Ack待ち 未存在
Ack済み 「送信メッセージ」を削除
「前の要素」を削除
「送信メッセージ」を保持
「前の要素」を削除
「送信メッセージ」を保持
「前の要素」を削除
Ack待ち 「送信メッセージ」を削除 「送信メッセージ」を保持 「送信メッセージ」を保持
未存在 「送信メッセージ」を削除
「次の要素」を削除
「CheckPoint」を「次の要素」のシーケンスに設定
「送信メッセージ」を削除
「CheckPoint」を「送信メッセージ」のシーケンスに設定
「送信メッセージ」を削除
「CheckPoint」を「送信メッセージ」のシーケンスに設定

・・と、処理自体はローカルにメッセージを保存しているため複雑でしたが、
動作の概要自体は非常にわかりやすいものでした。
これでこれまでより深く理解してKinesisSpoutを使うことができますね!