KafkaクラスタでConsumerを接続した際の動作
こんにちは。
Producerからメッセージを投入時の動作を確認した前回に引き続き、動作を確認していきます。
今回はConsumerがKafkaクラスタに接続した時の動作について、です。
1-1.Consumerのコマンド確認
前回と同じく、Consumerのコマンドを確認します。
重要になるのはgroup、topic、後は必須がZooKeeperURLとなっています。
# cd /opt/kafka # bin/kafka-console-consumer.sh [2014-03-18 07:22:07,118] ERROR Missing required argument "[zookeeper]" (kafka.utils.CommandLineUtils$) Option Description ------ ----------- --autocommit.interval.ms <Integer: ms> The time interval at which to save the current offset in ms (default: 60000) --blacklist <blacklist> Blacklist of topics to exclude from consumption. --consumer-timeout-ms <Integer: prop> consumer throws timeout exception after waiting this much of time without incoming messages (default: -1) --csv-reporter-enabled If set, the CSV metrics reporter will be enabled --fetch-size <Integer: size> The amount of data to fetch in a single request. (default: 1048576) --formatter <class> The name of a class to use for formatting kafka messages for display. (default: kafka.consumer.DefaultMessageFormatter) --from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message. --group <gid> The group id to consume on. (default: console-consumer-9976) --max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual. --max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 100) --metrics-dir <metrics dictory> If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here --min-fetch-bytes <Integer: bytes> The min number of bytes each fetch request waits for. (default: 1) --property <prop> --refresh-leader-backoff-ms <Integer: Backoff time before refreshing metadata ms> (default: 200) --skip-message-on-error If there is an error when processing a message, skip it instead of halt. --socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 2097152) --socket-timeout-ms <Integer: ms> The socket timeout used for the connection to the broker (default: 30000) --topic <topic> The topic id to consume on. --whitelist <whitelist> Whitelist of topics to include for consumption. --zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
1-2.GroupIdを指定しない状態での起動
GroupIdを指定しない状態でConsumerを1プロセス起動します。
# cd /opt/kafka # bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic TopicRep1
起動したタイミングだと特にメッセージの表示は行われません。
Console Consumer自体はデフォルトでは接続後に投入されたメッセージが表示されるようですね。
Consumer起動時のZooKeeperの状態
Consumerを起動したタイミングでZooKeeperには下記の情報が追加で登録されます。
指定したConsumerGroupId配下にConsumerの情報が保存されます。
これで各ConsumerがどのPartitionを読み込んでいるか管理しているようですね。
/ Zookeeperルートディレクトリ └―consumers/ └―console-consumer-52756 // ★Consumer Groupのノードが生成 ├―offsets/ | └―TopicRep1/ // ★Partition毎の読んだ場所を保持 | ├―0 7 | ├―1 0 | └―2 4 | ├―owners | └―TopicRep1/ // ★Partition毎の読むConsumerIDを保持 | ├―0 console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral) | ├―1 console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral) | └―2 console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral) | └―ids/ // ★ ConsumerIDとConsumerの設定情報を保持 └―console-consumer-52756_kafka1-1395095277646-2a649f63 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1395095277767", "version":1 }(ephemeral)
その後、同様にGroupIdを指定しないConsumerプロセスを2個起動します。
すると、下記のようにConsumerGroupが異なるツリーが生成されました。
ConsoleConsumer自体は一定の名称を持ったランダムのGroupIdを生成するようですね。
/ Zookeeperルートディレクトリ └―consumers/ |―console-consumer-52756/ (配下) // ★Kafka1で起動したConsumerのGroup情報 ├―console-consumer-57100/ (配下) // ★Kafka2で起動したConsumerのGroup情報 └―console-consumer-89259/ (配下) // ★Kafka3で起動したConsumerのGroup情報
1-3.GroupIdを指定しない状態でのメッセージ投入
では、上記の状態でメッセージを投入すると・・・下記のように全Consumerにメッセージが表示されました。
■Producer
TestMessage1
■Consumer1
TestMessage1
■Consumer2
TestMessage1
■Consumer3
TestMessage1
1-4.GroupIdを指定しない状態での起動
では、次はGroupIdを3Consumerとも同一の値を指定した状態で起動します。
# cd /opt/kafka # bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic TopicRep1 -group KafkaGroup
Consumer起動時のZooKeeperの状態
この状態でZooKeeperの中を見てみますと、下記のようになっていました。
/ Zookeeperルートディレクトリ └―consumers/ └―KafkaGroup // ★Consumer Groupのノードが生成 ├―offsets/ | └―TopicRep1/ // ★Partition毎の読んだ場所を保持 | ├―0 8 | ├―1 0 | └―2 4 | ├―owners | └―KafkaGroup/ // ★Partition毎の読むConsumerIDを保持 | ├―0 KafkaGroup_kafka1-1396390831658-54cedb19-0(ephemeral) | ├―1 KafkaGroup_kafka2-1396390834405-2850b581-0(ephemeral) | └―2 KafkaGroup_kafka3-1396390837607-e513d40d-0(ephemeral) | └―ids/ // ★ ConsumerIDとConsumerの設定情報を保持 ├―KafkaGroup_kafka1-1396390831658-54cedb19 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390831759", "version":1 } ├―KafkaGroup_kafka2-1396390834405-2850b581 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390834504", "version":1 } └―KafkaGroup_kafka3-1396390837607-e513d40d { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390837707", "version":1 }
今度はGroupを指定したため、特定のGroup配下に3つのConsumerの情報が集約された状態になっていますね。
1-5.GroupIdを指定した状態でのメッセージ投入
GroupIdを指定した状態でメッセージを投入すると・・・下記のよう対応したPartitionのOwnerであるConsumerにメッセージが表示されました。
何故かメッセージが全てpartition0に割り振られたため、確認としては微妙にバランス悪いですが(汗
■Producer
TestMessage01 // (partition=0) TestMessage02 // (partition=0) TestTestMessage // (partition=0) Message01 // (partition=0) Message02 // (partition=0) TestTest // (partition=0) ABCDEFG // (partition=0)
■Consumer1
TestMessage01 TestMessage02 TestTestMessage Message01 Message02 TestTest ABCDEFG
■Consumer2
■Consumer3
まとめ
Consumerでメッセージを受信した時の動作をまとめると、下記の通りでした。
- GroupIdが異なるConsumerを起動した場合、各Consumerで全てのメッセージが取得される
- GroupIdが同じConsumerを起動した場合、Group内で1つのConsumerでメッセージが取得される
- Consumerの接続情報はZooKeeper上にGroupId別に保存される
次回はKafka0.8.1がリリースされたため、追加された機能の一覧を作成するか、
または別の機能を試してみるか・・とまぁ、それはその時にでも。