夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

kinesis-storm-spoutのソースコードを読んでみる(その1

こんにちは。

今更感はありますが、Amazon Kinesisが最近個人的に熱くなり、改めて使い方を見ています。
aws lambdaでKinesisのイベントを取得することも出来ますし、色々組み合わせて使えそうなんですよね。

ただ、当然Kinesisは単体では使用できないため、
何かしらのデータ処理基盤と組み合わせる形になります。
というわけで(?)、kinesis-storm-spoutのソースコードを読んでいます。

とまぁ、前置きはさておき、今回は読んでみてわかったことの概要をまず項目で挙げてしまい、
次回以降きちんと構造を書いて中身を追っていくことにします。

わかったこと一覧は下記です。

  1. 初回起動時はKinesisの「最初から」「最後から」のどちらかを読出し場所として指定する。デフォルトは「最後から」
  2. 「どこまで読んだか?」はZooKeeperに保存し、2回目起動時以降はZooKeeperから読み込む
  3. KinesisのShard数変更時、Topologyをdeactivate>activateすればShardに対するSpoutの再割り振りが行われる(?)
1.初回起動時はKinesisの「最初から」「最後から」のどちらかを読出し場所として指定する。デフォルトは「最後から」

kinesis-storm-spoutは初回起動時はKinesis
「最初から(=保存されている中で一番古いレコード)」か、
「最後から(起動後に追加されたレコードのみ読む)」のどちらかを選んで読込みを開始します。
Kinesis自体がレコードは1日分保存しているため、「最初から」は1日前のレコードから読み込み始めるということになりますね。

2.「どこまで読んだか?」はZooKeeperに保存し、2回目起動時以降はZooKeeperから読み込む

これが重要な所で、kinesis-storm-spoutは
通常のKinesisApplicationのようにDynamoにどこまで読み込んだかを保存することはありません。
Kinesisにアクセスできる=Dynamoにも当然アクセスできる環境ですので、
何故そうしないのかはわかりませんが、現状はこういう動作になっています。

尚、ZooKeeperに状態を保存するのはデフォルトで60秒間隔。
Stormから常時呼び出されるTuple取得処理の中でZooKeeperに保存されます。

と、ここまで書いてわかるかとは思いますが、
複数のTopologyを起動してKinesisの読み込み状態を共有したい場合、
Zookeeperを共有しておく必要があるというわけですね。

このあたりは微妙に厄介かもしれません。

3.KinesisのShard数変更時、Topologyをdeactivate>activateすればShardに対するSpoutの再割り振りが行われる(?)

Kinesisはスケールアウト/スケールインでShard数が変動しますが、
kinesis-storm-spoutはレコード読込みをしながらShard数の変動を検知することはできません。

ではどうすればいいのかというと、
Topologyのdeactivate/activate処理において、KinesisのShardを取得しなおして
ZooKeeper上の状態を更新している処理があるため、それで適応しているように見えます。
・・ですが、いまいちこのあたりのコードが怪しいため、どこまできちんと動くかは微妙です。

尚、activate処理はTopologyを起動した際にも実行されるため、
Topologyの再起動でも実質同じことが行われることになります。

とりあえず、特徴的な所やkinesis-storm-spoutの固有事情的な所はこんな感じでしょうか。
次回以降クラス構成を起こして詳細を読んでみようと思います。
・・続くかは微妙ですが^^;

ただ、去年まではソースコードを読み込むなりしておきながら
どこにも残さずにしていたため、読んだ結果は何かしらの形で残していくかと。
それでは。