読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

kinesis-storm-spoutのソースコードを読んでみる(その3

こんにちは。

とりあえず前回で大体の構造がわかったので、
今後はStormで基本となる下記の処理単位ごとに実際に行われる処理を見てみます。
今回は初期化/有効化/停止周りまで。

コンストラクタ

Stormではコンストラクタと、Topologyを生成するクラスの中で呼ばれる各種メソッドでSpout/Boltにパラメータを設定します。
基本的にこのタイミングで設定可能なのはシリアライズ可能なオブジェクトのみですが、
KinesisSpoutは例外的にSerializationHelperというクラスによって
シリアライズ可能でないAWSの固有クラス(CredentialProvider等)がシリアライズ化され、Workerプロセスに配分されます。

コンストラクタで指定可能なのは下記の3要素です。

  1. KinesisSpoutConfig
  2. AWSCredentialsProvider
  3. ClientConfiguration

KinesisSpoutの動作設定と認証情報、接続情報が設定可能ということになりますね。

open

Workerプロセスに配信されて復元された後に呼び出されるメソッドです。
openメソッドの中で実行されている処理は下記でした。

  1. KinesisSpoutの状態管理クラス(ZookeeperStateManager)の生成

activate

openメソッドの後、Active状態で起動したTopologyや、
またはDeactivate状態で起動した後Activateイベントを受信したTopologyで呼び出されるメソッド

  1. KinesisSpoutの状態管理クラス(ZookeeperStateManager)のactivate
    1. ShardListGetterを用いてShard一覧を取得
      1. Kinesis:DescribeStreamRequestを実行(Shardを取得しきるまで)
    2. ZooKeeper上(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)にShardId一覧を保存する
      1. 但し、既にShardId一覧が保存されている場合は内容に関わらず、更新されない。
    3. ZooKeeper(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)の監視を開始
      1. ただ、内容に関わらず更新されないため、実質Watcherが起動することはないと思われる。
  2. KinesisSpoutの状態管理クラス(ZookeeperStateManager)のrebalance
    1. 「どこまでShardを読んだか?」を読込み対象のShard毎にZooKeeperに保存
    2. ZooKeeper上のShardId一覧から自分の読み込み対象となるShardIdを取得
      1. 読込み対象となるShardIdに対応する「どこまでShardを読んだか?」をZooKeeperから取得。取得できない場合は読み込みシーケンスを空で生成。
      2. 読込み対象はZooKeeper上のShardIdが基準となるため、Reshard後はZooKeeper上のShardIdと実際のShardIdがずれ、一切読み込めなくなる。
    3. 読込み対象となるShardIdに対応するShardGetterを生成
    4. ShardGetterをループするイテレータを生成し、RoundrobinでGetterからデータを取得する準備を行う。
  3. ZooKeeperへの状態保存時刻を現在時刻で初期化

deactivate

  1. KinesisSpoutの状態管理クラス(ZookeeperStateManager)のdeactivate
    1. 「どこまでShardを読んだか?」を読込み対象のShard毎にZooKeeperに保存

・・・ここまで読んだ時点でかなり萎える事実が判明していますね(汗
KinesisSpoutはKinesisのReshardingには一切対応しておらず、かつ
Reshardingを行うとZooKeeper上に保存された古いShardIdに対して読み込みを行い続けるため、
ZooKeeperの状態を消さない限りその後動作しなくなる
、というもののようです。

尚、ご丁寧にソースコードには下記のように対応していない旨が明記されていました。とほほ。
「ZooKeeper上(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)にShardId一覧を保存する」の個所の処理

/**
 * Initialize the shardList in ZK. This is called by every spout task on activate(), and ensures
 * that the shardList is up to date and correct.
 *
 * @param shards  list of shards (output of DescribeStream).
 * @throws Exception
 */
void initialize(final ImmutableList<String> shards) throws Exception {
    NodeFunction verifyOrCreateShardList = new NodeFunction() {
        @Override
        public byte[] initialize() {
            LOG.info(this + " First initialization of shardList: " + shards);
            ShardListV0 shardList = new ShardListV0(shards);
            ObjectMapper objectMapper = new ObjectMapper();
            byte[] data;
            try {
                data = objectMapper.writeValueAsBytes(shardList);
            } catch (JsonProcessingException e) {
                throw new KinesisSpoutException("Unable to serialize shardList " + shardList, e);
            }
            return data;
        }

        @Override
        public Mod<byte[]> apply(byte[] x) {
            // At this point, we don't support resharding. We assume the shard list is valid if one exists. ★対応していない宣言★
            LOG.info("ShardList already initialized in Zookeeper. Assuming it is valid.");
            return Mod.noModification();
        }
    };

    atomicUpdate(SHARD_LIST_SUFFIX, verifyOrCreateShardList);
}

ただ、KinesisSpoutのAck/Fail機構はそれはそれで面白そうなので、
中身を追うのは継続してみようとは思います。