夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafkaのレプリケーション

こんにちは。

前回に続いて、Apache Kafkaのネタです。

今期の最新バージョンからレプリケーション機能が追加されたとのため、
レプリケーション機能の設計を実際に確認してみます。
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication

レプリケーション追加の大目的

Kafkaにレプリケーション機能を追加した目的はKafkaの持続性と可用性を高めるため。
KafkaにおいてPublishされたメッセージが失われることなく消費されることを保証したかった。
これらの問題はマシン障害、ソフトウェア障害等様々なエラーで発生する可能性がある。

レプリケーション機能追加において気にしたことは下記。

  • 1.持続性の度合いを設定可能

一つ一つのデータをロストしたくないアプリケーションにおいては書込みレイテンシがかかる代わりに高い持続性を持つように、
アプリケーションによってはそれよりもスピードを重視する構成にすることを可能にする。

  • 2.レプリカ管理の自動化

Brokerへのレプリカの配置を自動化し、容易にスケールアウトが可能にしたい。

これらを解決するために以下の問題点に対してきちんと対処する必要がある。

  1. Brokerに均等にレプリカを割り振るためにはどうするか?
  2. あるパーティションにおいてメッセージを全てのレプリカまで伝播させるのにはどうするか?

レプリカ配置

初期配置

トピックを作成するときのみ、配置は現状生存しているBrokerに基づいて決定される。(トピック作成用のコマンドを用いて作成される)
リバランスコマンドを実行することで再配置される。

まずはじめに、Broker一覧の初期セットを以下のようにして作成する。

create cluster with brokers broker-0, broker-1, broker2

別の管理APIを用いてトピックを作成する。

create topic topicX with 100 partitions

この結果、以下の情報がZooKeeperに登録される。

  1. Brokerのリスト
  2. トピックとパーティションの一覧からなるトピック一覧

よりよいロードバランシングのためにはトピックにパーティションを必要としている。
基本的にはパーティションはサーバ数より多く存在し、各々のトピックごとにBrokerに対して均等にパーティションを配置したい。
実際に配置を行うアルゴリズムとして、Brokerとパーティションのリストをソートして使用する。
n個のBrokerが存在した場合、i番目のパーティションはiのn剰余番目のBrokerに配置される。
あるパーティションの最初のレプリカは上記のアルゴリズムで算出されたBroker上に配置され、この時「好ましいレプリカ」と呼ばれる。

Brokerがダウンした場合、残ったBrokerに均等に負荷が配分されるようにしたい。
そのため、Broker iに割り振られたパーティションmが存在すると仮定する。
この際、パーティションkのj番目のレプリカは(i + j + k)のn剰余によって配置先が決定される。

以下の図は15個(p0〜p14)のパーティションと5個のBroker(b0〜b4)の例となる。
Broker b0がダウンした場合、p0、p5、p10のパーティションはb1〜b4の4個のBrokerから配分される。
各々のパーティションのレプリカがどこに配置されているかについてはZooKeeperに保存される。
#と言いつつ、図は添付されていません。

Brokerの動的追加

Kafkaクラスタは徐々にスケールすることを想定しているため、以下のAPIでBrokerを動的に追加することが可能。

alter cluster add brokers broker-3, broker-4

新たなBrokerが追加された場合、自動的にパーティションの再配置がおこなれる。
目標としては再配置中のBroker間の移動を最小化し、負荷を一定に保つこととなる。
その際、Kafkaではリバランスと以後に記述したアルゴリズムを実行するために独立したプロセスを使用している。

Brokerの削除

KafkaではBrokerクラスタの縮小にも対応している。
その際、以下のコマンドを使用する。

alter cluster remove brokers broker-1

上記のコマンドを実行した場合、broker-1のホスト上でリバランスプロセスがスタートする。完了後、broker-1はオフラインとなる。
同様に、上記のコマンドによってZooKeeper上のbroker-1のBrokerホスト情報もクリアされる。

データ複製

私たちはクライアントが非同期レプリケーションと同期レプリケーションの両方を選択可能にしたいと考えている。
非同期レプリケーションの場合、Kafkaにpublishしたメッセージは1つのレプリカに保存されると公開される。
同期レプリケーションの場合、全てのレプリカから応答が返った時点で公開される。このパターンが最も持続性に優れる。
Producerがメッセージを投入しようとした場合、Kafkaはメッセージを他の全レプリカに対しても伝播させる必要がある。
その際、以下の4点を決める必要がある。

  1. どのようにしてメッセージを伝播させるか
  2. いくつのレプリカにメッセージが伝播したらProducerにackを返すか
  3. レプリカがダウンしていた時にどうするか
  4. ダウンしていたレプリカが戻った時にどうするか

以後、まずは既存のレプリケーション戦略について説明する。
その後、Kafkaの非同期レプリケーション、同期レプリケーションの動作について説明する。

関連研究

primary-backupレプリケーションとquorum-basedレプリケーションというレプリカを同期するための一般的な2つの方法がある。
これらに共通する内容として指定された1レプリカがリーダー、それ以外のレプリカがフォロワーと呼ばれる。
全ての書き込み要求はリーダーを通過し、フォロワーに伝播される。

primary-backupレプリケーションにおいては書込み要求受信後、全てのレプリカに伝播するまでリーダーは待機する。
もしレプリカのうち1つがダウンした場合、ダウンしたレプリカを伝播対象から外して同期を維持する。
レプリカが復旧したら再度伝播対象に入り、リーダーに追い付くよう同期が行われる。
もしレプリカがf個存在した場合、primary-backupレプリケーションにおいてはf-1個までの障害に対応可能。

quorum-basedレプリケーションにおいては書込み要求受信後、過半数のレプリカに伝播するまでリーダーは待機する。
レプリカグループのサイズはレプリカのいくつかがダウンした場合にも変わらない。
もし2f+1のレプリカが存在した場合、quorum-basedレプリケーションにおいてはf個までの障害に対応可能。
リーダーがダウンした場合は新たなリーダーを選出するのにf+1個のレプリカが必要となる。

2つのアプローチの間には以下のトレードオフがある。

  • 1.quorum-basedはprimary-backupよりも遅延時間で有利。quorum-basedはどこか1つでGCが発生した場合に全体に遅延を及ぼさないが、primary-backupは全体を引きずる。
  • 2.同じレプリカ数が存在するとおいた場合、primary-backupの方がより多くの同時障害に対応可能。
  • 3.2の要素(同時複数障害)を考慮した場合はprimary-backupの方がうまく動作する。quorum-basedは同時障害が多数発生した場合復旧する際再度起動しなおす必要が出る。

上記の結果より同時障害に対応が容易なprimary-backupを採用する。
レプリカがダウンしたり一時的に遅くなった際にしゃっくりが発生する可能性があるが、これは稀な事象でありこの「しゃっくり」現象はタイムアウトパラメータの調整で低減することができる。

同期レプリケーション

Kafkaのレプリケーションは典型的なprimary-backupレプリケーション方式に従っている。
パーティションには、レプリカをnをしているとn-1のレプリカの障害に耐えることができる。
レプリカのうち1つがリーダーとして選出され、レプリカの残りの部分は、フォロワーとされる。
リーダーは完全にリーダーに同期が追いついたレプリカのセット(ISR)を同期レプリカのセットとして保持する。
パーティションごとにKafkaはZooKeeper上で現在のリーダーと現在のISRを保持する。

各レプリカはローカルログファイルにメッセージを保持し、いくつかのログ上の読み込みポジションを保持する。(図参照とありますが、やはりこのページに図はない・・・)
ログ末尾オフセット(LEO)はログの末尾を示す。
ハイウォーターマーク(HW)は、最後にコミットされたメッセージのオフセットを示す。
各ログは定期的にディスクに同期される。オフセットをフラッシュする前のデータはディスクに永続化されることが保証される。(?)
わかるとは思うが、オフセットフラッシュはHWの前または後にすることができる。

書込み

パーティションへメッセージを登録するには、Producerは最初にZooKeeperからパーティションのリーダーを見つけ、リーダーにメッセージを送信します。
(ですが、Kafka0.8.0においては実際はProducerはZooKeeperにアクセスはせず、保持するBrokerのリストを用いて送信を行っている。
 Brokerは一定時間ごとにProducerにZooKeeperから取得した構成情報を応答として返すことで構成を維持している模様。)

リーダーは、そのローカルログにメッセージを書き込む。
フォロワーは常に単一のソケットチャネルを使用してリーダーから新しいメッセージを取得する。
フォロワーは同じ順序ですべてのメッセージを受信する。フォロワーは、各々が保持するローカルログにそれぞれ受信したメッセージを書き込み、リーダーにackを返す。
リーダーがISR内のすべてのレプリカからのackを受信したタイミングでメッセージがコミットされる。

リーダーは進捗HWを保持し、Producerにackを返す。
パフォーマンスを向上させるため、それぞれのフォロワーはメッセージがメモリに書き込まれた後にackを返す。
そのためコミットされた各メッセージは、メッセージが複数ノードのメモリ上に複製されていることを保証する。
しかし、当然ながら任意のレプリカがディスクにコミットメッセージを持続しているという保証はない。
だが、相関障害は比較的まれであることを考えると、このアプローチは、Kafkaのレスポンスと対障害性の間の良好なバランスを提供する。
その上で、今後はさらに強力な保証を提供するオプションを追加することを検討している。

リーダーは定期的にすべてのフォロワーにHWをブロードキャストする。
放送はフォロワーからのフェッチ要求の戻り値に上乗せして返している。随時、各レプリカのチェックポイントHWもディスクに出力している。

読み込み

簡易化のため応答は全てリーダーから返す。HWの場所までのメッセージがConsumerに公開される。

障害シナリオ
  • フォロワーの障害

リーダーは設定したタイムアウト時間が経過した場合、障害が発生したフォロワーをISRから外し、残ったISRのフォロワーに対してレプリケーションを継続する。
フォロワーが復旧した場合、HW以後のログを全て除去し、HW以降のログをリーダーから受け取る。
HW以降のログを全て受け取って追いついた場合、リーダーは対象のフォロワーを再度ISRに加える。

  • リーダーの障害

リーダーの障害は下記の3パターンについて考える必要がある。
1.リーダーがローカルログにメッセージを保存する前にクラッシュした場合。その場合Producer側が失敗を検知可能なため、新たなリーダーに再送する。
2.リーダーがローカルログにメッセージを保存したが、Producerに応答を返す前にクラッシュした場合。
a.原子性を保証する必要性:全レプリカは受信したメッセージを全てローカルファイルに記述したか、または一切記述していない状態にする
b.Producerはメッセージを再送する。この際、システムは理想的には重複書き込みにならないか確認する必要がある。おそらく受信メッセージをローカルファイルに出力したフォロワーがリーダーとなるから。(?)
3.リーダーがProducerに応答を送信した後にクラッシュした場合:この場合、新しいリーダーが選出され、要求の受信を開始する。

これらのケースが発生する場合、新たなリーダーの選出には以下の手順を実行する必要がある。
1.ISR内の生き残ったレプリカはZooKeeperに自分のステータスを登録する
2.最初に登録したレプリカが新しいリーダーになる。新しいリーダーは新しいHWとしてLEOを選択する。
3.リーダーの更新が通知されるようにフォロワー側はZooKeeper側にリスナを登録しておく。
a.リーダー確定後、フォロワーは通知されたHW以後のメッセージを切り捨て、以後リーダーからのメッセージを受信する。
4.ISR内の全レプリカが追いつくか、設定された時間が経過するまでリーダーは待機する。リーダーはZooKeeperに現行ISRを書き込み、読出しと書込み処理を開始する。
(ISRが空の場合、任意のレプリカがリーダーになることができるため初期起動時に注意すること)

非同期レプリケーション

非同期レプリケーションをサポートするためにリーダーはローカルログに書き込み完了した段階でProducerを認識する必要がある(?)
注意点として、リーダーにフォロワーが追いつくキャッチアップフェーズにおいてHWより前のデータを切り捨ててしまう可能性がある。(?)
レプリケーションは非同期であるため、コミットしたメッセージは任意のレプリカの障害発生時に耐える保証はない。

未解決の問題

  1. リーダーの障害(2)においてどのように原子性を保証することができるか?
  2. 同時に同パーティションに対して複数リーダーが発生する事象をどうやったら防げるか?
  3. レプリカが複数のラック内にある場合は、どのように少なくとも一つのレプリカが別のラックに行くことを保証するか?

=====
と、こんな感じでした。
とりあえずどのように同期を行っているかは何となくわかってきた気がします。
ただ、当然ながら分散システム環境におけるレプリケーションには困難が伴うわけで、現状はまだ課題が残っている状態のようですね。
現状はまだベータ版ですが、いつ正式版になるかは楽しみなところです。

-