Apache Kafka概要確認(その8 Producerプロセス
こんにちは。ついに設計の大方針の内容が終わり、
個々のプロセスの設計の内容に入っていくようです。
11.Producer
- Producerプロセスの自動ロードバランシング
Kafkaは、ProducerのTCP接続のバランスをとるための専用ロードバランサを使用するために
クライアントサイドのロードバランシングをサポートしている。
専用のレイヤ4ロードバランサはBrokerプロセス経由でTCP接続のバランスを取ることによって動作する。
この構成の場合、Producerプロセスが生産したメッセージは全て単一のBrokerプロセスに集約される。
レイヤ4ロードバランサを使う利点は各Producerプロセスは単一のTCPコネクションのみを必要とし、
Zookeeperへの接続を行う必要がないことである。
欠点はロードバランシングがTCP接続レベルで行われており、結果として上手くバランシングが行われないかもしれない点。
もしいくつかのProducerプロセスが他のものより多くのメッセージを生成する場合、
コネクションを均等にはったとしても均等にメッセージが割り振られるわけではなくなる。
クライアントサイドのZookeeperベースのロードバランシングは、これらの問題のいくつかを解決する。
ZookeeperベースのロードバランシングはProducerプロセスに対して下記2機能を提供する。
- 動的なBrokerプロセス発見
- リクエスト毎のロードバランシング
同様に、ZookeeperベースのロードバランシングはProducerプロセスに対して
単なるランダムでは無いメッセージのIDや利用するユーザIDに応じたバランシングを可能にする。
この機能を「セマンティックパーティショニング」と呼ばれる。
以下でより詳細に説明する。
Zookeeperベースのロードバランシングの動作は下記の通り。
ZookeeperWatherは下記のイベントに対してコールバックを受け取ることができる。
- 新規Brokerプロセス追加
- Brokerプロセスがダウン
- 新規トピックの追加
- Brokerプロセスが既存トピックに対して登録
内部的にはProducerプロセスはBrokerプロセスへの接続プールをBrokerプロセス毎に保持する。
この接続プールはZookeeperからのコールバックを受け、
全ての生存Brokerプロセスへの接続の確立/維持を行っている。
特定のトピックのプロデューサー要求が来たときに、Brokerパーティションはパーティショナで選択される。
(詳細はセマンティックパーティショニングのセクションを参照)
プールから取得したBrokerへの接続オブジェクトを用いて選択したBrokerパーティションへのデータ送信を行っている。
- 非同期送信
非同期ノンブロッキング操作はメッセージングシステムをスケールさせるための基本の要素。
KafkaではProducerプロセスがメッセージを生産する際に非同期リクエストディスパッチオプションを指定可能。
これは一定時間ごと、または一定サイズごとを事前に設定し、Producerプロセス内のメモリキューにバッファリングした要求をまとめて送付するもの。
データの生成レートは通常ノードごとに異なるため、非同期オプションを指定することで
ネットワーク内のトラフィックを軽減/効率的な利用を可能にし、Brokerプロセスへのリクエスト負荷を一定にするのに役立つ。
- セマンティックパーティショニング
各メンバーのプロファイルに対する訪問者数を維持したいアプリケーションを考えるとする。
この場合、あるメンバーのプロファイルに対する訪問履歴イベントは出来れば同じパーティション、同じConsumerスレッドにおいて処理したいと考える。
KafkaのProducerプロセスは生存中のKafkaノード、およびパーティションへメッセージをマッピングさせる機能を持っている。
メッセージ中のいくつかの値を元にBrokerプロセス達に対してストリームを分割する機能・・という形で実現される。
このパーティショニング機能はkafka.producer.Partitionerインタフェースを継承したパーティショナーを作成することで
カスタマイズ可能。デフォルトではランダムでパーティショニングが行われる。
尚、上記の例においてはパーティショニングを行うキーはMEMBER_IDになり、
パーティショナーは Hash(MEMBER_ID) % num_partitions で配信先のBrokerプロセスを算出するだろう。
=====
比較的わかりやすい章でしたが、一つ気になることとしては
「セマンティックパーティショニング」を実行中にノードの追加削除が行われた場合、
num_partitions は変わるのか、否か、がありますね。
ノードを追加した際にパーティションも再配分されるのであれば完全に動的に追加削除が可能となるわけですが、
パーティション自体の保持するデータ量が大きくなってくればそういうわけにもいかない。
とまぁ、これは後で確認すればいいことですか。
=====
12.Hadoopその他のバッチデータロードのサポート
Kafkaはスケーラブルな永続性を保持しているため、バッチシステムのために
定期的にデータをロードすることも可能。
LinkedIn(?)ではHadoopクラスタとDWHにデータをロードするために使用している。
バッチ処理はデータロードフェーズと、非循環グラフ的な処理フローを実行してデータを出力するフェーズと
段階的に実行される。
このモデルをサポートする際にKafkaのある時点からのデータロードを再度実施できるという特長は
障害が発生した際に都合がいい要素となっている。
Hadoopのケースにおいて、Kafkaは各ノード/トピック/パーティション単位でデータを分割配分可能なため、
各Mapperタスクに対してデータロード負荷を均等に配分可能。
HadoopはTaskの実行管理を提供するが、Kafkaを利用することで重複データのロードの危険なしに再起動が可能。
=====
こちらについてもこれまで読んできた内容があるのでわかりやすい内容ですね。
後はパーティション=1つのトピックを複数のパーティションに分割するための概念のように見えますが。。。
断言はまたあとで。
=====