kinesis-storm-spoutのソースコードを読んでみる(その4
こんにちは。
前回初期化処理周りの確認が終わったので、今回はメッセージの取得→送信や、
Ack/Fail機構周りを見ていきます。
尚、実際に見てみるとわかりますが、かなり煩雑な記述です(汗
ですので、とりあえず概要を知りたい方は
下記のポイントを理解できていれば問題ないと思います。
- KinesisSpoutはKinesisから取得したメッセージを1件単位でメモリ上に保持しており、Fail時は再送を行う。
- KinesisSpoutは「どのシーケンスまで処理完了したか?」を保持しており、定期的にZooKeeperに保存している。
- 「どのシーケンスまで処理完了したか?」は確実に完了した値としている。
- そのため、それより先のシーケンスのメッセージが処理完了しているケースもある。
今回のキーとなるクラスはInflightRecordTrackerです。
InflightRecordTrackerは「CheckPoint」という値を保持しており、
ZooKeeperに保存する「どこまでShardを読み込んだか?」のチェックポイントとして用いています。
「CheckPoint」は「ここまでは処理が完了した。(=Ackが返った)」という値になっており、
障害発生時も「CheckPoint」から読み込めばメッセージの欠損は発生しない作りとなっています。
#欠損は発生しないものの、重複処理は発生しえます。
nextTuple
Stormから常時呼ばれ続けるメソッドです。
このメソッドでメッセージを取得して下流コンポーネントに送信する・・という動作を行います。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※ShardGetterが二重になっていてわかりにくいが、クラス図の通りBufferedGetter > KinesisShardGetterという入れ子になっている。
- StateManagerがShardGetterを保持しない場合はSleepして終了
- ShardGetterを次のものに切り替える
- Roundrobin方式でnextTupleメソッドを呼び出すことにGetterを切り替えている。
- StateManagerから対象のShardに対するリトライメッセージを保持している場合はリトライメッセージを送信メッセージとする。
- リトライメッセージの保持については後で詳細を確認する。
- リトライメッセージが存在しない場合はShardGetterからメッセージを取得する
- BufferedGetterはバッファリングされたメッセージが存在しない場合はKinesisShardGetterからメッセージを取得する。
- BufferedGetterはバッファリングされたメッセージが存在せず、Shardの最後まで読んでいた場合、空メッセージを返す。
- 「Shardの最後まで読んでいた」はShardがReshadingされた結果既にクローズされている場合のみ発生する。
- ConfigのIKinesisRecordSchemeを使用してStormメッセージを生成する。
- 下流のコンポーネントにStormメッセージを送信
- Ack/Failの際に用いられるキー値は「ShardId:シーケンス」となっている。
- StateManagerにStormメッセージを送信した旨を通知する。
- StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- (リトライ実行フラグがONの場合のみ下記を実施)
- InflightRecordTrackerの保持するAck待ち内部マップに送信メッセージが保持されていなかった場合、シーケンスをキーとして保存する。
- ※Ack待ち内部マップはLinkedListのような前後関係を保持するHashMapとなっており、前後のRecordへのアクセスを効率化している。
- Ack待ち内部マップに保存する際、その時点の「最後メッセージ」を「前の要素」として紐づける。
- Ack待ち内部マップに保存する際に「最後メッセージ」として記録する。
- Ack待ち内部マップに保存する際に「最前メッセージ」が空の場合、「最前メッセージ」としても記録する。
- InflightRecordTrackerは送信メッセージがリトライだった場合、リトライ数をインクリメントする。
- InflightRecordTrackerは送信メッセージがリトライだった場合、送信メッセージのシーケンスをリトライキューから除去する。
- StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
やたらと処理が多いですが、これは「CheckPoint」のシーケンスを
Ack/Fail時に算出するために必要になっています。
あと、最前メッセージ/最後メッセージは内容的には下記です。
最前メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も前のメッセージ」
最後メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も後のメッセージ」
ack
メッセージに対してAckが返った場合に呼び出されるメソッドです。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。
- StateManagerにackを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
- (送信メッセージが取得できた場合のみ以後の処理を実施)
- 「送信メッセージ」のAckフラグをON
- Ack待ち内部マップから「前の要素」「次の要素」を取得し、下記の処理を実施。
- 「前の要素」が空の場合下記の処理を実施。
- 対象メッセージ=「再前メッセージ」の場合シーケンスを「CheckPoint」として設定
- 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 対象メッセージ=「再前メッセージ」の場合シーケンスを「CheckPoint」として設定
- 「前の要素」が空でない場合下記の処理を実施。
- 「前の要素」がAck済みの場合下記の処理を実施
- 「前の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「前の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「前の要素」がAck済みでない場合下記の処理を実施
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「前の要素」がAck済みの場合下記の処理を実施
fail
メッセージに対してFailが返った場合に呼び出されるメソッドです。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。
- StateManagerにfailを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
- (送信メッセージが取得できた場合のみ以後の処理を実施)
- リトライ数がリトライ閾値未満だった場合、対象メッセージのシーケンスをリトライキューに追加する。
- リトライ数がリトライ閾値以上だった場合、リトライカウントアウトログを出力し、対象メッセージのシーケンスに対してack処理を実施
・・と見てみるとackの処理がやたらと複雑ですが、
結局の所は「送信メッセージ」に対してAckを受信したタイミングで
「前の要素」「次の要素」の状態に応じて下記のように処理をしています。
「削除」となったタイミングで、削除対象のメッセージ=「最前メッセージ」だった場合に、
「CheckPoint」を削除対象のメッセージのシーケンスまで進め、
その次のメッセージを「最前メッセージ」と扱っている処理を行っています。
#「前の要素」が存在しない場合は「送信メッセージ」が「最前メッセージ」となるため、
#下記の表にも「CheckPoint」が進む旨を明記しています。
「次の要素」---> 「前の要素」↓ |
Ack済み | Ack待ち | 未存在 |
---|---|---|---|
Ack済み | 「送信メッセージ」を削除 「前の要素」を削除 |
「送信メッセージ」を保持 「前の要素」を削除 |
「送信メッセージ」を保持 「前の要素」を削除 |
Ack待ち | 「送信メッセージ」を削除 | 「送信メッセージ」を保持 | 「送信メッセージ」を保持 |
未存在 | 「送信メッセージ」を削除 「次の要素」を削除 「CheckPoint」を「次の要素」のシーケンスに設定 |
「送信メッセージ」を削除 「CheckPoint」を「送信メッセージ」のシーケンスに設定 |
「送信メッセージ」を削除 「CheckPoint」を「送信メッセージ」のシーケンスに設定 |
・・と、処理自体はローカルにメッセージを保存しているため複雑でしたが、
動作の概要自体は非常にわかりやすいものでした。
これでこれまでより深く理解してKinesisSpoutを使うことができますね!