夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

KafkaクラスタでConsumerを接続した際の動作

こんにちは。

Producerからメッセージを投入時の動作を確認した前回に引き続き、動作を確認していきます。
今回はConsumerがKafkaクラスタに接続した時の動作について、です。

1-1.Consumerのコマンド確認

前回と同じく、Consumerのコマンドを確認します。

重要になるのはgroup、topic、後は必須がZooKeeperURLとなっています。

# cd /opt/kafka
# bin/kafka-console-consumer.sh
[2014-03-18 07:22:07,118] ERROR Missing required argument "[zookeeper]" (kafka.utils.CommandLineUtils$)
Option                                  Description
------                                  -----------
--autocommit.interval.ms <Integer: ms>  The time interval at which to save the current offset in ms (default: 60000)
--blacklist <blacklist>                 Blacklist of topics to exclude from consumption.
--consumer-timeout-ms <Integer: prop>   consumer throws timeout exception after waiting this much of time
                                        without incoming messages (default: -1)
--csv-reporter-enabled                  If set, the CSV metrics reporter will be enabled
--fetch-size <Integer: size>            The amount of data to fetch in a single request. (default: 1048576)
--formatter <class>                     The name of a class to use for formatting kafka messages for display. 
                                        (default: kafka.consumer.DefaultMessageFormatter)
--from-beginning                        If the consumer does not already have an established offset to consume from,
                                        start with the earliest message present in the log rather than the latest message.
--group <gid>                           The group id to consume on. (default: console-consumer-9976)
--max-messages <Integer: num_messages>  The maximum number of messages to consume before exiting. If not set, consumption is continual.
--max-wait-ms <Integer: ms>             The max amount of time each fetch request waits. (default: 100)
--metrics-dir <metrics dictory>         If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here
--min-fetch-bytes <Integer: bytes>      The min number of bytes each fetch request waits for. (default: 1)
--property <prop>
--refresh-leader-backoff-ms <Integer:   Backoff time before refreshing metadata 
  ms>                                   (default: 200)
--skip-message-on-error                 If there is an error when processing a message, skip it instead of halt.
--socket-buffer-size <Integer: size>    The size of the tcp RECV size. (default: 2097152)
--socket-timeout-ms <Integer: ms>       The socket timeout used for the connection to the broker (default: 30000)
--topic <topic>                         The topic id to consume on.
--whitelist <whitelist>                 Whitelist of topics to include for consumption.
--zookeeper <urls>                      REQUIRED: The connection string for the zookeeper connection in the form host:port.
                                        Multiple URLS can be given to allow fail-over.

1-2.GroupIdを指定しない状態での起動

GroupIdを指定しない状態でConsumerを1プロセス起動します。

# cd /opt/kafka
# bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic TopicRep1

起動したタイミングだと特にメッセージの表示は行われません。
Console Consumer自体はデフォルトでは接続後に投入されたメッセージが表示されるようですね。

Consumer起動時のZooKeeperの状態

Consumerを起動したタイミングでZooKeeperには下記の情報が追加で登録されます。
指定したConsumerGroupId配下にConsumerの情報が保存されます。
これで各ConsumerがどのPartitionを読み込んでいるか管理しているようですね。

/  Zookeeperルートディレクトリ
└―consumers/
    └―console-consumer-52756 // ★Consumer Groupのノードが生成
        ├―offsets/ 
        |  └―TopicRep1/ // ★Partition毎の読んだ場所を保持
        |       ├―0  7 
        |       ├―1  0
        |       └―2  4
        | 
        ├―owners
        |  └―TopicRep1/ // ★Partition毎の読むConsumerIDを保持
        |       ├―0  console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral)
        |       ├―1  console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral)
        |       └―2  console-consumer-52756_kafka1-1395095277646-2a649f63-0(ephemeral)
        |
        └―ids/ // ★ ConsumerIDとConsumerの設定情報を保持
            └―console-consumer-52756_kafka1-1395095277646-2a649f63 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1395095277767", "version":1 }(ephemeral)

その後、同様にGroupIdを指定しないConsumerプロセスを2個起動します。
すると、下記のようにConsumerGroupが異なるツリーが生成されました。
ConsoleConsumer自体は一定の名称を持ったランダムのGroupIdを生成するようですね。

/  Zookeeperルートディレクトリ
└―consumers/
    |―console-consumer-52756/ (配下) // ★Kafka1で起動したConsumerのGroup情報
    ├―console-consumer-57100/ (配下) // ★Kafka2で起動したConsumerのGroup情報
    └―console-consumer-89259/ (配下) // ★Kafka3で起動したConsumerのGroup情報

1-3.GroupIdを指定しない状態でのメッセージ投入

では、上記の状態でメッセージを投入すると・・・下記のように全Consumerにメッセージが表示されました。
■Producer

TestMessage1

■Consumer1

TestMessage1

■Consumer2

TestMessage1

■Consumer3

TestMessage1

1-4.GroupIdを指定しない状態での起動

では、次はGroupIdを3Consumerとも同一の値を指定した状態で起動します。

# cd /opt/kafka
# bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic TopicRep1 -group KafkaGroup
Consumer起動時のZooKeeperの状態

この状態でZooKeeperの中を見てみますと、下記のようになっていました。

/  Zookeeperルートディレクトリ
└―consumers/
    └―KafkaGroup // ★Consumer Groupのノードが生成
        ├―offsets/ 
        |  └―TopicRep1/ // ★Partition毎の読んだ場所を保持
        |       ├―0  8 
        |       ├―1  0
        |       └―2  4
        | 
        ├―owners
        |  └―KafkaGroup/ // ★Partition毎の読むConsumerIDを保持
        |       ├―0  KafkaGroup_kafka1-1396390831658-54cedb19-0(ephemeral)
        |       ├―1  KafkaGroup_kafka2-1396390834405-2850b581-0(ephemeral)
        |       └―2  KafkaGroup_kafka3-1396390837607-e513d40d-0(ephemeral)
        |
        └―ids/ // ★ ConsumerIDとConsumerの設定情報を保持
            ├―KafkaGroup_kafka1-1396390831658-54cedb19 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390831759", "version":1 }
            ├―KafkaGroup_kafka2-1396390834405-2850b581 { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390834504", "version":1 }
            └―KafkaGroup_kafka3-1396390837607-e513d40d { "pattern":"white_list", "subscription":{ "TopicRep1" : 1 }, "timestamp":"1396390837707", "version":1 }

今度はGroupを指定したため、特定のGroup配下に3つのConsumerの情報が集約された状態になっていますね。

1-5.GroupIdを指定した状態でのメッセージ投入

GroupIdを指定した状態でメッセージを投入すると・・・下記のよう対応したPartitionのOwnerであるConsumerにメッセージが表示されました。
何故かメッセージが全てpartition0に割り振られたため、確認としては微妙にバランス悪いですが(汗
■Producer

TestMessage01      // (partition=0)
TestMessage02      // (partition=0)
TestTestMessage    // (partition=0)
Message01          // (partition=0)
Message02          // (partition=0)
TestTest           // (partition=0)
ABCDEFG            // (partition=0)

■Consumer1

TestMessage01
TestMessage02
TestTestMessage
Message01
Message02
TestTest
ABCDEFG

■Consumer2

■Consumer3


まとめ

Consumerでメッセージを受信した時の動作をまとめると、下記の通りでした。

  1. GroupIdが異なるConsumerを起動した場合、各Consumerで全てのメッセージが取得される
  2. GroupIdが同じConsumerを起動した場合、Group内で1つのConsumerでメッセージが取得される
  3. Consumerの接続情報はZooKeeper上にGroupId別に保存される

次回はKafka0.8.1がリリースされたため、追加された機能の一覧を作成するか、
または別の機能を試してみるか・・とまぁ、それはその時にでも。