読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

複数台マシンを用いたKafkaクラスタの構築方法

こんにちは。

しばらく前にKakfa0.8.0正式版がリリースされているのですが、
0.8.0から追加されたレプリケーションの機能をきちんと試していなかったなぁ・・・
ということで、Kafkaクラスタを構築するところから再度確認しなおしてみます。

まず、構築したい環境としては下記です。
3台の別マシン上にZooKeeperとKafkaのクラスタを構築することをゴールとします。

その他の前提条件としては下記です。
尚、ZooKeeperの構築手順や前提条件の設定方法についてはMesos-MasterをZooKeeperで冗長化させるを参照してください。

1-1.KafkaServerインストール

バイナリをダウンロードし、展開します。

# cd /usr/local/src
# wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
# tar xvzpf kafka_2.8.0-0.8.0.tar.gz
# mv kafka_2.8.0-0.8.0 /opt/
# ln -s /opt/kafka_2.8.0-0.8.0 /opt/kafka

1-2.KafkaServer設定

次はサーバ設定とログ設定を行います。

サーバ設定
# vi /opt/kafka/config/server.properties

下記の個所の設定を行います。

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1 # ★kafka1=1、kafka2=2、kafka3=3の設定を行う
〜〜〜〜〜
# A comma seperated list of directories under which to store log files
log.dirs=/opt/kafka-logs # ディレクトリを設定
〜〜〜〜〜
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 # ZooKeeperの接続先を設定

設定したディレクトリを作成しておきます。

# mkdir /opt/kafka-logs
ログ設定

次はログ設定です。デフォルトだと一部コンソールに出力されるなど使い勝手が悪い点があるため、
全て/var/log/kafka 配下にファイルとして出力されるように設定しておきます。

# vi /opt/kafka/config/log4j.properties
log4j.rootLogger=INFO, kafkaAppender # デフォルトの出力先をServer.logに修正
〜〜〜〜〜
log4j.appender.kafkaAppender.File=/var/log/kafka/server.log # 出力先を/var/log/kafka 配下に修正
〜〜〜〜〜
log4j.appender.stateChangeAppender.File=/var/log/kafka/state-change.log # 出力先を/var/log/kafka 配下に修正
〜〜〜〜〜
log4j.appender.requestAppender.File=/var/log/kafka/kafka-request.log # 出力先を/var/log/kafka 配下に修正
〜〜〜〜〜
log4j.appender.controllerAppender.File=/var/log/kafka/controller.log # 出力先を/var/log/kafka 配下に修正

1-2.KafkaServer起動

では起動してクラスタ化されることを確認してみます。

# cd /opt/kafka
# nohup bin/kafka-server-start.sh config/server.properties &

すると、下記のようなログが/var/log/kafka/server.logに出力され、
各々のKafkaServerがZooKeeperに登録された上でリーダーが1に選出されたことがわかりました。
・・・ただ、初期化周りのログが2重に出力されているように見える所が微妙ですね。
1回しか出力されていないログもあるため、ログ設定が根本から誤っている・・・ということはなさそうなのですが。

  • kafka1
[2014-02-26 06:30:21,580] INFO Registered broker 1 at path /brokers/ids/1 with address kafka1:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★
[2014-02-26 06:30:21,580] INFO Registered broker 1 at path /brokers/ids/1 with address kafka1:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
[2014-02-26 06:30:21,690] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // ★リーダーに1が選出された★
[2014-02-26 06:30:21,690] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // ★上のログと同じ?★
[2014-02-26 06:30:22,945] INFO No state transitions triggered since no partitions are assigned to brokers 2 (kafka.utils.ZkUtils$) // ★broker 2がZooKeeperに登録された★
[2014-02-26 06:30:22,945] INFO No state transitions triggered since no partitions are assigned to brokers 2 (kafka.utils.ZkUtils$) // ★上のログと同じ?★
[2014-02-26 06:30:24,084] INFO No state transitions triggered since no partitions are assigned to brokers 3 (kafka.utils.ZkUtils$) // ★broker 3がZooKeeperに登録された★
[2014-02-26 06:30:24,084] INFO No state transitions triggered since no partitions are assigned to brokers 3 (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
  • kafka2
[2014-02-26 06:30:23,046] INFO Registered broker 2 at path /brokers/ids/2 with address kafka2:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★
[2014-02-26 06:30:23,046] INFO Registered broker 2 at path /brokers/ids/2 with address kafka2:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
[2014-02-26 06:30:23,133] INFO conflict in /controller data: { "brokerid":2, "timestamp":"1393363823123", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★リーダーとして既に1が存在している★
[2014-02-26 06:30:23,133] INFO conflict in /controller data: { "brokerid":2, "timestamp":"1393363823123", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
  • kafka3
[2014-02-26 06:30:24,281] INFO Registered broker 3 at path /brokers/ids/3 with address kafka3:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★
[2014-02-26 06:30:24,281] INFO Registered broker 3 at path /brokers/ids/3 with address kafka3:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
[2014-02-26 06:30:24,407] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1393363824399", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★リーダーとして既に1が存在している★
[2014-02-26 06:30:24,407] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1393363824399", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★上のログと同じ?★
〜〜〜〜〜
ZooKeeperに保存される内容

尚、Kafka起動の時点でZooKeeperに登録されていた情報は下記の通りでした。
ZooKeeperのルートに作成してしまうあたりは微妙にいけていないですが、
そのあたりはZooKeeperの接続設定変えればいいだけの話ですので、気にしない方向で。

/  Zookeeperルートディレクトリ(Kafkaはルートディレクトリに直に作成する模様)
├―controller { "brokerid":1, "timestamp":"1393363821686", "version":1 } (ephemeral)
|
├―brokers/ 各KafkaServerの情報が保存
|    └―ids/
|         1 { "host":"kafka1", "jmx_port":-1, "port":9092, "timestamp":"1393363821490", "version":1 } (ephemeral)
|         2 { "host":"kafka2", "jmx_port":-1, "port":9092, "timestamp":"1393363822999", "version":1 } (ephemeral)
|         3 { "host":"kafka3", "jmx_port":-1, "port":9092, "timestamp":"1393363824238", "version":1 } (ephemeral)
|
└―controller_epoch 1 という情報が保存。リーダー選出時のID?

最後に

Kafkaクラスタを構築し、ZooKeeperを介してお互いが認識されることを確認しました。
ただ、毎回これを実行するのも面倒、かつnohupを用いた起動なども必要になってくるため、
このあたりはservices化し、更新した設定ファイルとともにRPMにまとめてしまいたいところではありますね。

なお、KafkaのRPMの事例は下記のようにいくつか見つかりますが、RPMファイル自体を公開している事例は無いようです。
また、ソースをダウンロードしてビルドしたり、ZooKeeperと同一サーバにインストールする必要があったり、
微妙に使いにくいというのも現状の模様。自前で作ってしまうのもありかもしれませんね。
https://github.com/nielsbasjes/kafka-rpm
https://github.com/edwardcapriolo/kafka-rpm
https://github.com/reedox/kafka-rpm

とりあえずクラスタ化が確認できたため、次回は実際にTopicを作成してデータを投入した際に
データがどこに保存/レプリケーションされるか・・・あたりを確認してみようと思います。