夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

KafkaクラスタにおけるTopic生成時動作の確認

こんにちは。

前回に引き続き、Kafkaの動作を確認してみよう、の回です。
今回はトピックを作成した際の動作確認を行います。

1-1.レプリケーション数1のTopicの作成

まずはTopicを作成します。
作成する際にはパーティション数、レプリカ数等が指定可能ですが、
パーティション数3、レプリカ数1で作成を行います。

# cd /opt/kafka
# bin/kafka-create-topic.sh
Missing required argument "[topic]"
Option                                  Description
------                                  -----------
--partition <Integer: # of partitions>  number of partitions in the topic
                                          (default: 1)
--replica <Integer: replication factor> replication factor for each partitions
                                          in the topic (default: 1)
--replica-assignment-list               for manually assigning replicas to
  <broker_id_for_part1_replica1 :         brokers (default: )
  broker_id_for_part1_replica2,
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2, ...>
--topic <topic>                         REQUIRED: The topic to be created.
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.
# bin/kafka-create-topic.sh --partition 3 --replica 1 --topic TopicRep1 --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
creation succeeded!

作成完了したので、ログとファイル、ZooKeeperの中身を確認してみます。

ログ
kafka1(/var/log/kafka/state-change.log)
[2014-02-27 07:04:40,089] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 2 for partition [TopicRep1,1] (state.change.logger) // ★broker2がパーティション1のリーダーに決定★
[2014-02-27 07:04:40,089] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 1 for partition [TopicRep1,0] (state.change.logger) // ★broker1がパーティション0のリーダーに決定★
[2014-02-27 07:04:40,091] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 12 to broker 3 for partition [TopicRep1,2] (state.change.logger) // ★broker3がパーティション2のリーダーに決定★
kafka1(/var/log/kafka/server.log)
[2014-02-27 07:04:40,145] INFO [Log Manager on Broker 1] Created log for partition [TopicRep1,0] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★
[2014-02-27 07:04:40,151] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,0] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka2(/var/log/kafka/server.log)
[2014-02-27 07:04:40,331] INFO [Log Manager on Broker 2] Created log for partition [TopicRep1,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★
[2014-02-27 07:04:40,332] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,1] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka3(/var/log/kafka/server.log)
[2014-02-27 07:04:40,411] INFO [Log Manager on Broker 3] Created log for partition [TopicRep1,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★ログファイル生成★
[2014-02-27 07:04:40,411] WARN No highwatermark file is found. Returning 0 as the highwatermark for partition [TopicRep1,2] (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★

ログを確認してみると、各パーティションのリーダーが決定し、Topicを保存するファイルが生成されていることがわかりました。

ファイルは下記のようになっていました。
Topic用のディレクトリ/ファイルとレプリケーションのチェックポイントファイルが更新された形になっていますね。

kafka1
# ls -lR /opt/kafka-logs
/opt/kafka-logs:
合計 8
drwxr-xr-x. 2 root root 4096  227 07:04 2014 TopicRep1-0
-rw-r--r--. 1 root root   18  228 06:57 2014 replication-offset-checkpoint 

/opt/kafka-logs/TopicRep1-0:
合計 0
-rw-r--r--. 1 root root 10485760  227 07:04 2014 00000000000000000000.index
-rw-r--r--. 1 root root        0  227 07:04 2014 00000000000000000000.log
ZooKeeperに保存される内容

次はZooKeeperを確認してみます。
作成したTopicの情報が追加されているのがわかりますね。
リーダーの情報も実際に選出されたものと一致しています。

/  Zookeeperルートディレクトリ
└―brokers/
    └―topics/
        └―TopicRep1/ ({ "partitions":{ "0":[ 1 ], "1":[ 2 ], "2":[ 3 ] }, "version":1 })
            └―partitions/
                 ├―0/
                 |  └―state  { "controller_epoch":1, "isr":[ 1 ], "leader":1, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
                 ├―1/
                 |  └―state  { "controller_epoch":1, "isr":[ 2 ], "leader":2, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
                 └― 2 
                     └―state  { "controller_epoch":1, "isr":[ 3 ], "leader":3, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★

1-2.レプリケーション数2のTopicの作成

では、次はレプリケーション数2のTopicを作成して動作を確認してみます。

# cd /opt/kafka
# bin/kafka-create-topic.sh --partition 3 --replica 2 --topic TopicRep2 --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
creation succeeded!

レプリケーション1の時と同じようにログとファイル、ZooKeeperの中身を確認してみます。

ログ
kafka1(/var/log/kafka/state-change.log)
[2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 2 for partition [TopicRep2,0] (state.change.logger) // ★broker2がパーティション0のフォロワーに決定★
[2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 2 for partition [TopicRep2,1] (state.change.logger) // ★broker2がパーティション1のリーダーに決定★
[2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 1 for partition [TopicRep2,0] (state.change.logger) // ★broker1がパーティション0のリーダーに決定★
[2014-02-28 08:01:32,019] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 1 for partition [TopicRep2,2] (state.change.logger) // ★broker1がパーティション2のフォロワーに決定★
[2014-02-28 08:01:32,020] TRACE Controller 1 epoch 1 sending become-leader LeaderAndIsr request with correlationId 16 to broker 3 for partition [TopicRep2,2] (state.change.logger) // ★broker3がパーティション2のリーダーに決定★
[2014-02-28 08:01:32,020] TRACE Controller 1 epoch 1 sending become-follower LeaderAndIsr request with correlationId 16 to broker 3 for partition [TopicRep2,1] (state.change.logger) // ★broker1がパーティション1のフォロワーに決定★
kafka1(/var/log/kafka/server.log)
[2014-02-28 08:01:32,022] INFO [Log Manager on Broker 1] Created log for partition [TopicRep2,0] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★
[2014-02-28 08:01:32,024] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
[2014-02-28 08:01:32,026] INFO [Log Manager on Broker 1] Created log for partition [TopicRep2,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★フォロワー用ログファイル生成★
[2014-02-28 08:01:32,027] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 2. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka2(/var/log/kafka/server.log)
[2014-02-28 08:01:32,185] INFO [Log Manager on Broker 2] Created log for partition [TopicRep2,0] in /opt/kafka-logs. (kafka.log.LogManager)
[2014-02-28 08:01:32,187] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
[2014-02-28 08:01:32,207] INFO [Log Manager on Broker 2] Created log for partition [TopicRep2,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★
[2014-02-28 08:01:32,208] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 1. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
kafka3(/var/log/kafka/server.log)
[2014-02-28 08:01:32,307] INFO [Log Manager on Broker 3] Created log for partition [TopicRep2,1] in /opt/kafka-logs. (kafka.log.LogManager) // ★フォロワー用ログファイル生成★
[2014-02-28 08:01:32,309] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 1. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★
[2014-02-28 08:01:32,333] INFO [Log Manager on Broker 3] Created log for partition [TopicRep2,2] in /opt/kafka-logs. (kafka.log.LogManager) // ★リーダー用ログファイル生成★
[2014-02-28 08:01:32,343] WARN No previously checkpointed highwatermark value found for topic TopicRep2 partition 2. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint) // ★ 同期済みインデックス値生成 ★

とりあえずログから動作の方はわかりますね。
ただ、レプリケーションを行うTopicを作成すると、下記のように「kafka-request.log」がバックグラウンドのレプリケーションの同期によって
出力され続ける状態になってしまいます。
このあたりは抑制できるようログ設定を考える必要がありそうですね。

kafka1
# ls -l /var/log/kafka/kafka-request.log*
-rw-r--r--. 1 root root  254890  31 12:03 2014 /var/log/kafka/kafka-request.log
-rw-r--r--. 1 root root       0  226 06:30 2014 /var/log/kafka/kafka-request.log.2014-02-26-06
-rw-r--r--. 1 root root    2496  227 07:04 2014 /var/log/kafka/kafka-request.log.2014-02-27-07
-rw-r--r--. 1 root root 4646397  228 08:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-08
-rw-r--r--. 1 root root 4783059  228 09:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-09
-rw-r--r--. 1 root root 4791753  228 10:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-10
-rw-r--r--. 1 root root 4792411  228 11:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-11
-rw-r--r--. 1 root root 4792419  228 12:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-12
-rw-r--r--. 1 root root 4792418  228 13:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-13
-rw-r--r--. 1 root root 4791754  228 14:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-14
-rw-r--r--. 1 root root 4793075  228 15:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-15
-rw-r--r--. 1 root root 4793079  228 16:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-16
-rw-r--r--. 1 root root 4792415  228 17:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-17
-rw-r--r--. 1 root root 4792414  228 18:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-18
-rw-r--r--. 1 root root 4792418  228 19:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-19
-rw-r--r--. 1 root root 4793074  228 20:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-20
-rw-r--r--. 1 root root 4793855  228 21:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-21
-rw-r--r--. 1 root root 4806792  228 22:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-22
-rw-r--r--. 1 root root 4806783  228 23:59 2014 /var/log/kafka/kafka-request.log.2014-02-28-23
-rw-r--r--. 1 root root 4806790  31 00:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-00
-rw-r--r--. 1 root root 4806782  31 01:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-01
-rw-r--r--. 1 root root 4808119  31 02:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-02
-rw-r--r--. 1 root root 4806781  31 03:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-03
-rw-r--r--. 1 root root 4807455  31 04:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-04
-rw-r--r--. 1 root root 4807446  31 05:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-05
-rw-r--r--. 1 root root 4807448  31 06:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-06
-rw-r--r--. 1 root root 4807454  31 07:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-07
-rw-r--r--. 1 root root 4806119  31 08:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-08
-rw-r--r--. 1 root root 4806791  31 09:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-09
-rw-r--r--. 1 root root 4807450  31 10:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-10
-rw-r--r--. 1 root root 4807443  31 11:59 2014 /var/log/kafka/kafka-request.log.2014-03-01-11
ファイル

ログに出力されていた通り、TopicRep2用のファイルはリーダー用、フォロワー用両方生成されています。

kafka1
# ls -lR /opt/kafka-logs
.:
合計 16
drwxr-xr-x. 2 root root 4096  227 07:04 2014 TopicRep1-0
drwxr-xr-x. 2 root root 4096  228 08:01 2014 TopicRep2-0
drwxr-xr-x. 2 root root 4096  228 08:01 2014 TopicRep2-2
-rw-r--r--. 1 root root   46  31 12:07 2014 replication-offset-checkpoint

./TopicRep1-0:
合計 0
-rw-r--r--. 1 root root 10485760  227 07:04 2014 00000000000000000000.index
-rw-r--r--. 1 root root        0  227 07:04 2014 00000000000000000000.log

./TopicRep2-0:
合計 0
-rw-r--r--. 1 root root 10485760  228 08:01 2014 00000000000000000000.index
-rw-r--r--. 1 root root        0  228 08:01 2014 00000000000000000000.log

./TopicRep2-2:
合計 0
-rw-r--r--. 1 root root 10485760  228 08:01 2014 00000000000000000000.index
-rw-r--r--. 1 root root        0  228 08:01 2014 00000000000000000000.log

尚、replication-offset-checkpointの中身は下記の通りでした。
保持しているTopicとパーティションID、
あとはどこまでレプリケーションを行ったかのオフセットを保存しているようです。
オフセットは現状は0ですね。

# cat /opt/kafka-logs/replication-offset-checkpoint
0
3
TopicRep2 2 0
TopicRep1 0 0
TopicRep2 0 0
ZooKeeper

次はZooKeeperです。
今回のTopicの追加で追加されたZooKeeperの情報は下記でした。
レプリケーションが加わったことでフォロワーの情報も保持されるようになっているようですね。

/  Zookeeperルートディレクトリ
└―brokers/
    └―topics/
        └―TopicRep2/ ({ "partitions":{ "0":[ 1, 2 ], "1":[ 2, 3 ], "2":[ 3, 1 ] }, "version":1 })
            └―partitions/
                 ├―0/
                 |  └―state  { "controller_epoch":1, "isr":[ 1, 2 ], "leader":1, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
                 ├―1/
                 |  └―state  { "controller_epoch":1, "isr":[ 2, 3 ], "leader":2, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★
                 └― 2 
                     └―state  { "controller_epoch":1, "isr":[ 3, 1 ], "leader":3, "leader_epoch":0, "version":1 } // ★パーティションのリーダー情報を保持★

この後は情報を投入してレプリケーションの確認・・・
といきたかったところですが、記事が長くなりすぎるため、ここで一度切ります。

最後に

Topic作成を行った際にKafkaがどう動くか、を確認することができました。
レプリケーションを行った際にはログのファイル自体はフォロワーにも同じく作成され、
後はリーダーをZooKeeperで管理する形になっているようですね。

では、次は実際にログを流してみてファイルがどう変化するかを確認してみます。