読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafka概要確認(その12 分散方式

こんにちは。今回は実際分散協調をどうやっているかという話。
ある意味分散システムとしてのコアです。

19.分散方式

以後、BrokerとConsumerの協調に用いるZookeeperのディレクトリ構成とアルゴリズムについて記述する。

  • Zookeeper上のディレクトリ構成判例

[xyz]という形で[]で囲った個所はトピック名やナンバーによって変動する値を示す。
そのまま"xyz"というZnodeが存在しているとは限らない。

例として、「/topics/[topic]」は/topicというディレクトリの配下にトピック名の名前を持つZnodeが存在することを示す。
同様に「[0...5]」と記述した場合は0、1、2、3、4・・・という形でZnodeが存在することを示す。

また、「->」は実際のZnodeの値を示すことに使用される。
例えば、「/hello -> world」と記述した場合、「/hello」というZnodeが存在し、内容に「world」を保持することを示す。

/brokers/ids/[0...N] --> host:port (ephemeral node)

BrokerIDレジストリにはConsumer側からBrokerを論理的に一意に識別できるIDを名称として持つZnodeを保持する。
(論理的に一意に識別できるID=論理IDは設定項目として指定する必要がある)
Brokerは起動時に「/brokers/ids/」配下に自分の論理IDを持つZnodeを登録する。
このように論理IDを用いる目的は、Brokerが物理的に別のノードに移動せざるを得ない状況になった場合に
Consumer側でそのことをハンドリングする必要がないようにしたいため。

論理ID登録時に既にそのIDが使用されていた場合はエラーとなる。
「ephemeral node」のため、これらのZnodeはBrokerプロセスが起動している間のみ維持され、終了すると消える。
これによってConsumer側はBrokerが落ちたことを検知可能。

  • Brokerトピックディレクトリ
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

各Brokerは[topic]ディレクトリ配下にトピックに対して自分が保持しているパーティション数を登録する。
=====
ようやくこれでトピックとパーティションの包含関係が明確になりました・・・
今さらかよ、という突っ込みはあるとは思いますが^^;
=====

  • Consumer/Consumerグループ

Consumer側もZookeeper上に自分の状態について登録を行う。
登録を行う理由は、消費量の計算とトピック/パーティションごとに
どこまでメッセージを消費したかを示すオフセットを共有するため。

複数のConsumerはConsumerグループを構成し、グループであるトピックに対するデータを共同で使用することができる。
あるConsumerグループに所属するConsumerに対してIDである「group_id」を共有する。
例えば、"foobar"という名称のConsumerプロセスが3つのノードにまたがって実行される場合、
"foobar"というIDをこれらのConsumerプロセス群に対して与える。

このグループIDはConsumerプロセスの設定値として指定する。
この設定によってConsumerプロセスは自分の所属するグループを認識する。

Consumerグループに所属するConsumerプロセス達はあるトピックに対するパーティション
出来るだけ均等になるように分け合う。
パーティションは1Consumerプロセスによって消費される。

「group_id」の他にConsumerにはConsumerの識別のために「consumer_id」(UUID形式のホスト名)が一時的に割り当てられる。
「consumer_id」は下記のディレクトリ配下に保存される。

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

同一グループ内のConsumerは同一の「group_id」ディレクトリ配下に「consumer_id」のZnodeを作成することで登録する。
Znodeは のマップを保持する。
このIDは単純にグループ内でどれだけのConsumerが生きているかどうかの判別に用いられる。
BrokerIDと同様に「ephemeral node」のため、Consumerが落ちたらこのZnodeも消える。

  • Consumerのオフセットトレース

Consumerプロセスはオフセットの最大値を各パーティションごとにどこまでメッセージを消費したかを判別するために記録する。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

BrokerのパーティションはConsumerグループ毎にある1Consumerによって消費される。
Consumerはメッセージの消費を開始する前に該当のパーティションに対してオーナーであることを確定させる必要がある。
オーナーであることを確定させるためにConsumerは下記のディレクトリに「ephemeral node」を作成する。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
  • Broker起動時処理

Brokerは基本独立しているため、Brokerが保持している情報についてのみ登録を行う。
Brokerが起動するとBrokerのレジストリディレクトリ配下にZnodeを作成し、ホストとポートの情報を登録する。
Brokerはあわせて自らが保持しているトピックとパーティション一覧をBrokerトピックレジストリ配下に登録する。
新しいトピックが作成された場合はその都度レジストリ配下にトピックZnodeが追加される。

  • Consumer起動時処理

Consumer起動時、下記の処理が行われる

  1. 「consumer_id」を所属グループのディレクトリ配下に登録する
  2. ConsumerIDレジストリ配下に対してZookeeperWatcherを設定し、新しいConsumerの追加や削除を検知できるようにする。(Consumerグループ配下のConsumerの状態が変更された場合、グループ内の全Consumer内でリバランスが起動される)
  3. BrokerIDレジストリ配下に対してZookeeperWatcherを設定し、新しいBrokerの追加や削除を検知できるようにする。(BrokerIDレジストリ配下のBrokerの状態が変更された場合、グループ内の全Consumer内でリバランスが起動される)
  4. トピックフィルタを使用している場合、Brokerトピックディレクトリ配下に対してZookeeperWatcherを設定し、新しいトピックの追加を検知できるようにする。(トピックが追加された場合、それがトピックフィルタに合致するかの確認が行われ、合致する場合はグループ内の全Consumer内でリバランスが起動される)
  5. 起動Consumerが所属するConsumerグループに対してリバランスを強制実行する

ConsumerリバランスはあるConsumerグループ内でどのConsumerがどのパーティションを消費するかを同意するために行われる。
Consumerリバランスはグループ内のConsumerの追加削除、Brokerの追加削除をトリガとして行われる。
1トピック、1Consumerグループを見た場合、Brokerパーティションはグループ内のConsumer毎に均等に割り振られる。
パーティションは常に1つのConsumerによって消費されるという実装によってこれらのリバランスはシンプルになっている。
こうした理由として、1パーティションに対して複数のConsumerが紐づけられた場合、ロックアルゴリズムが必要となり、競合も発生するためである。
尚、パーティションの数よりConsumerの数の方が多い場合、いくつかのConsumerはデータを一切受信することができないためその点は注意。

Consumerリバランスの際、以下のアルゴリズムを用いてリバランスを行うことにより、1Consumerが接続しに行くBrokerの数を抑えている。
Consumerリバランスの際行われる処理は下記。

前提:

  1. PTはあるトピックに対するパーティションの全体集合をさす
  2. PTに含まれるパーティションはPi(iの値は1〜Tまで変動)と記述する
  3. CGはあるConsumerグループに含まれるConsumer全体集合をさす
  4. CGに所属するConsumerはCi(iの値は1〜Gまで変動)と記述する

リバランス処理:

  1. PTをソートする(結果、同一Broker上のパーティションは固まる)
  2. CGをソートする
  3. パーティション数(PT)/Consumer数(CG)を実行して値Nを算出
  4. パーティション i * N 〜 (i + 1) * N - 1 をConsumer iに対して割り振る
  5. Consumerが元々保持していたパーティションオーナレジストリを削除する
  6. Consumerがリバランス後に読み込むパーティションオーナレジストリを登録する

リバランスがあるConsumerで起動された場合、同一Consumerグループ内のConsumerも同時にリバランスを実行する必要がある

=====
とりあえず、これでKafkaの設計方針の資料は全て確認しました。
実際どんなものか、というのはわかった気がしますね。
最後に1投稿にこれまでの結果をまとめた投稿を用意し、その後はClojureとバランスとりながら実際にKafkaクラスタを構築することを行っていきます。
=====