KafkaクラスタにおけるTopic生成時動作の確認
こんにちは。
前回に引き続き、Kafkaの動作を確認してみよう、の回です。
今回はトピックを作成した際の動作確認を行います。
1-1.レプリケーション数1のTopicの作成
まずはTopicを作成します。
作成する際にはパーティション数、レプリカ数等が指定可能ですが、
パーティション数3、レプリカ数1で作成を行います。
# cd /opt/kafka # bin/kafka-create-topic.sh Missing required argument "[topic]" Option Description ------ ----------- --partition <Integer: # of partitions> number of partitions in the topic (default: 1) --replica <Integer: replication factor> replication factor for each partitions in the topic (default: 1) --replica-assignment-list for manually assigning replicas to <broker_id_for_part1_replica1 : brokers (default: ) broker_id_for_part1_replica2, broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...> --topic <topic> REQUIRED: The topic to be created. --zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. # bin/kafka-create-topic.sh --partition 3 --replica 1 --topic TopicRep1 --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 creation succeeded!
作成完了したので、ログとファイル、ZooKeeperの中身を確認してみます。
ログ
kafka1(/var/log/kafka/state-change.log)
[2014-02-27 07:04:40,089] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 2 for partition [TopicRep1,1] (state.change.logger) // ★broker2がパーティション1のリーダーに決定★ [2014-02-27 07:04:40,089] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 1 for partition [TopicRep1,0] (state.change.logger) // ★broker1がパーティション0のリーダーに決定★ [2014-02-27 07:04:40,091] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 3 for partition [TopicRep1,2] (state.change.logger) // ★broker3がパーティション2のリーダーに決定★
kafka1(/var/log/kafka/server.log)
[2014-02-27 07:04:40,145] INFO [Log Manager on Broker 1] Created log for partition [TopicRep1,0] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★ [2014-02-27 07:04:40,151] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,0] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka2(/var/log/kafka/server.log)
[2014-02-27 07:04:40,331] INFO [Log Manager on Broker 2] Created log for partition [TopicRep1,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★ [2014-02-27 07:04:40,332] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,1] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka3(/var/log/kafka/server.log)
[2014-02-27 07:04:40,411] INFO [Log Manager on Broker 3] Created log for partition [TopicRep1,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★ [2014-02-27 07:04:40,411] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,2] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
ログを確認してみると、各パーティションのリーダーが決定し、Topicを保存するファイルが生成されていることがわかりました。
ファイルは下記のようになっていました。
Topic用のディレクトリ/ファイルとレプリケーションのチェックポイントファイルが更新された形になっていますね。
kafka1
# ls -lR /opt/kafka-logs /opt/kafka-logs: 合計 8 drwxr-xr-x. 2 root root 4096 2月 27 07:04 2014 TopicRep1-0 -rw-r--r--. 1 root root 18 2月 28 06:57 2014 replication-offset-checkpoint /opt/kafka-logs/TopicRep1-0: 合計 0 -rw-r--r--. 1 root root 10485760 2月 27 07:04 2014 00000000000000000000.index -rw-r--r--. 1 root root 0 2月 27 07:04 2014 00000000000000000000.log
ZooKeeperに保存される内容
次はZooKeeperを確認してみます。
作成したTopicの情報が追加されているのがわかりますね。
リーダーの情報も実際に選出されたものと一致しています。
/ Zookeeperルートディレクトリ └―brokers/ └―topics/ └―TopicRep1/ ({ "partitions":{ "0":[ 1 ], "1":[ 2 ], "2":[ 3 ] }, "version":1 }) └―partitions/ ├―0/ | └―state { "controller_epoch":1, "isr":[ 1 ], "leader":1, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★ ├―1/ | └―state { "controller_epoch":1, "isr":[ 2 ], "leader":2, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★ └― 2 └―state { "controller_epoch":1, "isr":[ 3 ], "leader":3, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
1-2.レプリケーション数2のTopicの作成
では、次はレプリケーション数2のTopicを作成して動作を確認してみます。
# cd /opt/kafka # bin/kafka-create-topic.sh --partition 3 --replica 2 --topic TopicRep2 --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 creation succeeded!
レプリケーション1の時と同じようにログとファイル、ZooKeeperの中身を確認してみます。
ログ
kafka1(/var/log/kafka/state-change.log)
[2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 2 for partition [TopicRep2,0] (state.change.logger) // ★broker2がパーティション0のフォロワーに決定★ [2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 2 for partition [TopicRep2,1] (state.change.logger) // ★broker2がパーティション1のリーダーに決定★ [2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 1 for partition [TopicRep2,0] (state.change.logger) // ★broker1がパーティション0のリーダーに決定★ [2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 1 for partition [TopicRep2,2] (state.change.logger) // ★broker1がパーティション2のフォロワーに決定★ [2014-02-28 08:01:32,020] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 3 for partition [TopicRep2,2] (state.change.logger) // ★broker3がパーティション2のリーダーに決定★ [2014-02-28 08:01:32,020] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 3 for partition [TopicRep2,1] (state.change.logger) // ★broker1がパーティション1のフォロワーに決定★
kafka1(/var/log/kafka/server.log)
[2014-02-28 08:01:32,022] INFO [Log Manager on Broker 1] Created log for partition [TopicRep2,0] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★ [2014-02-28 08:01:32,024] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★ [2014-02-28 08:01:32,026] INFO [Log Manager on Broker 1] Created log for partition [TopicRep2,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★フォロワー用ログファイル生成★ [2014-02-28 08:01:32,027] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 2. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka2(/var/log/kafka/server.log)
[2014-02-28 08:01:32,185] INFO [Log Manager on Broker 2] Created log for partition [TopicRep2,0] in /opt/kafka-logs. (kafka.log.LogManager) [2014-02-28 08:01:32,187] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★ [2014-02-28 08:01:32,207] INFO [Log Manager on Broker 2] Created log for partition [TopicRep2,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★ [2014-02-28 08:01:32,208] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 1. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka3(/var/log/kafka/server.log)
[2014-02-28 08:01:32,307] INFO [Log Manager on Broker 3] Created log for partition [TopicRep2,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★フォロワー用ログファイル生成★ [2014-02-28 08:01:32,309] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 1. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★ [2014-02-28 08:01:32,333] INFO [Log Manager on Broker 3] Created log for partition [TopicRep2,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★ [2014-02-28 08:01:32,343] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 2. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
とりあえずログから動作の方はわかりますね。
ただ、レプリケーションを行うTopicを作成すると、下記のように「kafka-request.log」がバックグラウンドのレプリケーションの同期によって
出力され続ける状態になってしまいます。
このあたりは抑制できるようログ設定を考える必要がありそうですね。
kafka1
# ls -l /var/log/kafka/kafka-request.log* -rw-r--r--. 1 root root 254890 3月 1 12:03 2014 /var/log/kafka/kafka-request.log -rw-r--r--. 1 root root 0 2月 26 06:30 2014 /var/log/kafka/kafka-request.log.2014-02-26-06 -rw-r--r--. 1 root root 2496 2月 27 07:04 2014 /var/log/kafka/kafka-request.log.2014-02-27-07 -rw-r--r--. 1 root root 4646397 2月 28 08:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-08 -rw-r--r--. 1 root root 4783059 2月 28 09:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-09 -rw-r--r--. 1 root root 4791753 2月 28 10:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-10 -rw-r--r--. 1 root root 4792411 2月 28 11:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-11 -rw-r--r--. 1 root root 4792419 2月 28 12:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-12 -rw-r--r--. 1 root root 4792418 2月 28 13:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-13 -rw-r--r--. 1 root root 4791754 2月 28 14:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-14 -rw-r--r--. 1 root root 4793075 2月 28 15:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-15 -rw-r--r--. 1 root root 4793079 2月 28 16:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-16 -rw-r--r--. 1 root root 4792415 2月 28 17:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-17 -rw-r--r--. 1 root root 4792414 2月 28 18:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-18 -rw-r--r--. 1 root root 4792418 2月 28 19:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-19 -rw-r--r--. 1 root root 4793074 2月 28 20:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-20 -rw-r--r--. 1 root root 4793855 2月 28 21:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-21 -rw-r--r--. 1 root root 4806792 2月 28 22:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-22 -rw-r--r--. 1 root root 4806783 2月 28 23:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-23 -rw-r--r--. 1 root root 4806790 3月 1 00:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-00 -rw-r--r--. 1 root root 4806782 3月 1 01:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-01 -rw-r--r--. 1 root root 4808119 3月 1 02:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-02 -rw-r--r--. 1 root root 4806781 3月 1 03:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-03 -rw-r--r--. 1 root root 4807455 3月 1 04:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-04 -rw-r--r--. 1 root root 4807446 3月 1 05:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-05 -rw-r--r--. 1 root root 4807448 3月 1 06:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-06 -rw-r--r--. 1 root root 4807454 3月 1 07:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-07 -rw-r--r--. 1 root root 4806119 3月 1 08:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-08 -rw-r--r--. 1 root root 4806791 3月 1 09:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-09 -rw-r--r--. 1 root root 4807450 3月 1 10:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-10 -rw-r--r--. 1 root root 4807443 3月 1 11:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-11
ファイル
ログに出力されていた通り、TopicRep2用のファイルはリーダー用、フォロワー用両方生成されています。
kafka1
# ls -lR /opt/kafka-logs .: 合計 16 drwxr-xr-x. 2 root root 4096 2月 27 07:04 2014 TopicRep1-0 drwxr-xr-x. 2 root root 4096 2月 28 08:01 2014 TopicRep2-0 drwxr-xr-x. 2 root root 4096 2月 28 08:01 2014 TopicRep2-2 -rw-r--r--. 1 root root 46 3月 1 12:07 2014 replication-offset-checkpoint ./TopicRep1-0: 合計 0 -rw-r--r--. 1 root root 10485760 2月 27 07:04 2014 00000000000000000000.index -rw-r--r--. 1 root root 0 2月 27 07:04 2014 00000000000000000000.log ./TopicRep2-0: 合計 0 -rw-r--r--. 1 root root 10485760 2月 28 08:01 2014 00000000000000000000.index -rw-r--r--. 1 root root 0 2月 28 08:01 2014 00000000000000000000.log ./TopicRep2-2: 合計 0 -rw-r--r--. 1 root root 10485760 2月 28 08:01 2014 00000000000000000000.index -rw-r--r--. 1 root root 0 2月 28 08:01 2014 00000000000000000000.log
尚、replication-offset-checkpointの中身は下記の通りでした。
保持しているTopicとパーティションID、
あとはどこまでレプリケーションを行ったかのオフセットを保存しているようです。
オフセットは現状は0ですね。
# cat /opt/kafka-logs/replication-offset-checkpoint 0 3 TopicRep2 2 0 TopicRep1 0 0 TopicRep2 0 0
ZooKeeper
次はZooKeeperです。
今回のTopicの追加で追加されたZooKeeperの情報は下記でした。
レプリケーションが加わったことでフォロワーの情報も保持されるようになっているようですね。
/ Zookeeperルートディレクトリ └―brokers/ └―topics/ └―TopicRep2/ ({ "partitions":{ "0":[ 1, 2 ], "1":[ 2, 3 ], "2":[ 3, 1 ] }, "version":1 }) └―partitions/ ├―0/ | └―state { "controller_epoch":1, "isr":[ 1, 2 ], "leader":1, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★ ├―1/ | └―state { "controller_epoch":1, "isr":[ 2, 3 ], "leader":2, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★ └― 2 └―state { "controller_epoch":1, "isr":[ 3, 1 ], "leader":3, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
この後は情報を投入してレプリケーションの確認・・・
といきたかったところですが、記事が長くなりすぎるため、ここで一度切ります。
最後に
Topic作成を行った際にKafkaがどう動くか、を確認することができました。
レプリケーションを行った際にはログのファイル自体はフォロワーにも同じく作成され、
後はリーダーをZooKeeperで管理する形になっているようですね。
では、次は実際にログを流してみてファイルがどう変化するかを確認してみます。