複数台マシンを用いたKafkaクラスタの構築方法
こんにちは。
しばらく前にKakfa0.8.0正式版がリリースされているのですが、
0.8.0から追加されたレプリケーションの機能をきちんと試していなかったなぁ・・・
ということで、Kafkaクラスタを構築するところから再度確認しなおしてみます。
まず、構築したい環境としては下記です。
3台の別マシン上にZooKeeperとKafkaのクラスタを構築することをゴールとします。
その他の前提条件としては下記です。
尚、ZooKeeperの構築手順や前提条件の設定方法についてはMesos-MasterをZooKeeperで冗長化させるを参照してください。
1-1.KafkaServerインストール
バイナリをダウンロードし、展開します。
# cd /usr/local/src # wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz # tar xvzpf kafka_2.8.0-0.8.0.tar.gz # mv kafka_2.8.0-0.8.0 /opt/ # ln -s /opt/kafka_2.8.0-0.8.0 /opt/kafka
1-2.KafkaServer設定
次はサーバ設定とログ設定を行います。
サーバ設定
# vi /opt/kafka/config/server.properties
下記の個所の設定を行います。
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # ★kafka1=1、kafka2=2、kafka3=3の設定を行う 〜〜〜〜〜 # A comma seperated list of directories under which to store log files log.dirs=/opt/kafka-logs # ディレクトリを設定 〜〜〜〜〜 # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 # ZooKeeperの接続先を設定
設定したディレクトリを作成しておきます。
# mkdir /opt/kafka-logs
ログ設定
次はログ設定です。デフォルトだと一部コンソールに出力されるなど使い勝手が悪い点があるため、
全て/var/log/kafka 配下にファイルとして出力されるように設定しておきます。
# vi /opt/kafka/config/log4j.properties
log4j.rootLogger=INFO, kafkaAppender # デフォルトの出力先をServer.logに修正 〜〜〜〜〜 log4j.appender.kafkaAppender.File=/var/log/kafka/server.log # 出力先を/var/log/kafka 配下に修正 〜〜〜〜〜 log4j.appender.stateChangeAppender.File=/var/log/kafka/state-change.log # 出力先を/var/log/kafka 配下に修正 〜〜〜〜〜 log4j.appender.requestAppender.File=/var/log/kafka/kafka-request.log # 出力先を/var/log/kafka 配下に修正 〜〜〜〜〜 log4j.appender.controllerAppender.File=/var/log/kafka/controller.log # 出力先を/var/log/kafka 配下に修正
1-2.KafkaServer起動
では起動してクラスタ化されることを確認してみます。
# cd /opt/kafka # nohup bin/kafka-server-start.sh config/server.properties &
すると、下記のようなログが/var/log/kafka/server.logに出力され、
各々のKafkaServerがZooKeeperに登録された上でリーダーが1に選出されたことがわかりました。
・・・ただ、初期化周りのログが2重に出力されているように見える所が微妙ですね。
1回しか出力されていないログもあるため、ログ設定が根本から誤っている・・・ということはなさそうなのですが。
- kafka1
[2014-02-26 06:30:21,580] INFO Registered broker 1 at path /brokers/ids/1 with address kafka1:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★ [2014-02-26 06:30:21,580] INFO Registered broker 1 at path /brokers/ids/1 with address kafka1:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜 [2014-02-26 06:30:21,690] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // ★リーダーに1が選出された★ [2014-02-26 06:30:21,690] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // ★上のログと同じ?★ [2014-02-26 06:30:22,945] INFO No state transitions triggered since no partitions are assigned to brokers 2 (kafka.utils.ZkUtils$) // ★broker 2がZooKeeperに登録された★ [2014-02-26 06:30:22,945] INFO No state transitions triggered since no partitions are assigned to brokers 2 (kafka.utils.ZkUtils$) // ★上のログと同じ?★ [2014-02-26 06:30:24,084] INFO No state transitions triggered since no partitions are assigned to brokers 3 (kafka.utils.ZkUtils$) // ★broker 3がZooKeeperに登録された★ [2014-02-26 06:30:24,084] INFO No state transitions triggered since no partitions are assigned to brokers 3 (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜
- kafka2
[2014-02-26 06:30:23,046] INFO Registered broker 2 at path /brokers/ids/2 with address kafka2:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★ [2014-02-26 06:30:23,046] INFO Registered broker 2 at path /brokers/ids/2 with address kafka2:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜 [2014-02-26 06:30:23,133] INFO conflict in /controller data: { "brokerid":2, "timestamp":"1393363823123", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★リーダーとして既に1が存在している★ [2014-02-26 06:30:23,133] INFO conflict in /controller data: { "brokerid":2, "timestamp":"1393363823123", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜
- kafka3
[2014-02-26 06:30:24,281] INFO Registered broker 3 at path /brokers/ids/3 with address kafka3:9092. (kafka.utils.ZkUtils$) // ★自身の情報を登録★ [2014-02-26 06:30:24,281] INFO Registered broker 3 at path /brokers/ids/3 with address kafka3:9092. (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜 [2014-02-26 06:30:24,407] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1393363824399", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★リーダーとして既に1が存在している★ [2014-02-26 06:30:24,407] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1393363824399", "version":1 } stored data: { "brokerid":1, "timestamp":"1393363821686", "version":1 } (kafka.utils.ZkUtils$) // ★上のログと同じ?★ 〜〜〜〜〜
ZooKeeperに保存される内容
尚、Kafka起動の時点でZooKeeperに登録されていた情報は下記の通りでした。
ZooKeeperのルートに作成してしまうあたりは微妙にいけていないですが、
そのあたりはZooKeeperの接続設定変えればいいだけの話ですので、気にしない方向で。
/ Zookeeperルートディレクトリ(Kafkaはルートディレクトリに直に作成する模様) ├―controller { "brokerid":1, "timestamp":"1393363821686", "version":1 } (ephemeral) | ├―brokers/ 各KafkaServerの情報が保存 | └―ids/ | 1 { "host":"kafka1", "jmx_port":-1, "port":9092, "timestamp":"1393363821490", "version":1 } (ephemeral) | 2 { "host":"kafka2", "jmx_port":-1, "port":9092, "timestamp":"1393363822999", "version":1 } (ephemeral) | 3 { "host":"kafka3", "jmx_port":-1, "port":9092, "timestamp":"1393363824238", "version":1 } (ephemeral) | └―controller_epoch 1 という情報が保存。リーダー選出時のID?
最後に
Kafkaクラスタを構築し、ZooKeeperを介してお互いが認識されることを確認しました。
ただ、毎回これを実行するのも面倒、かつnohupを用いた起動なども必要になってくるため、
このあたりはservices化し、更新した設定ファイルとともにRPMにまとめてしまいたいところではありますね。
なお、KafkaのRPMの事例は下記のようにいくつか見つかりますが、RPMファイル自体を公開している事例は無いようです。
また、ソースをダウンロードしてビルドしたり、ZooKeeperと同一サーバにインストールする必要があったり、
微妙に使いにくいというのも現状の模様。自前で作ってしまうのもありかもしれませんね。
https://github.com/nielsbasjes/kafka-rpm
https://github.com/edwardcapriolo/kafka-rpm
https://github.com/reedox/kafka-rpm
とりあえずクラスタ化が確認できたため、次回は実際にTopicを作成してデータを投入した際に
データがどこに保存/レプリケーションされるか・・・あたりを確認してみようと思います。