夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

KafkaクラスタでProducerからメッセージを投入した時の動作

こんにちは。

前回に引き続き、動作を確認していきます。
今回はProducerからメッセージを投入した時の動作について、です。

確認する観点としては下記あたりが考えられますので、一つ一つ確認していきます。
レプリケーションあり、なしでも変わってきそうですね。

  1. 投入した時のメッセージの保存先確認
  2. 投入した時のチェックポイントファイル確認

1-1.Producerのコマンド確認

まずはProducer用コマンドのパラメータを確認しておきます。

Producer

ProducerではTopicとBrokerListが必須指定になるようですね。
その他は・・syncやcompress、キューサイズやシリアライザあたりが実際にProducer作るにあたっては指定するパラメータになりそうです。
文字列でなくてオブジェクトをシリアライズして送るという選択肢も出てきそうではありますね。

# cd /opt/kafka
# bin/kafka-console-producer.sh
Missing required argument "[topic]"
Option                                  Description
------                                  -----------
--batch-size <Integer: size>            Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <broker-list>             REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compress                              If set, messages batches are sent compressed
--key-serializer <encoder_class>        The class name of the message encoder implementation to use for serializing keys. (default: kafka.serializer.StringEncoder)
--line-reader <reader_class>            The class name of the class to use for reading lines from standard in. By default each line is read as a separate message.
                                        (default: kafka.producer.ConsoleProducer$LineMessageReader)
--message-send-max-retries <Integer>    Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them.
                                        This property specifies the number of retires before the producer give up and drop this message. (default: 3)
--property <prop>                       A mechanism to pass user-defined properties in the form key=value to the message reader.
                                        This allows custom configuration for a user-defined message reader.
--queue-enqueuetimeout-ms <Long: queue  Timeout for event enqueue (default: 0)
  enqueuetimeout ms>
--queue-size <Long: queue_size>         If set and the producer is running in asynchronous mode,
                                        this gives the maximum amount of  messages will queue awaiting suffient batch size.(default: 10000)
--request-required-acks <Integer:       The required acks of the producer requests (default: 0)
  request required acks>                 
--request-timeout-ms <Integer: request  The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
  timeout ms>                             
--retry-backoff-ms <Long>               Before each retry, the producer refreshes the metadata of relevant topics. 
                                        Since leader election takes a bit of time, this property specifies the amount of time that
                                        the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>    The size of the tcp RECV size. (default: 102400)
--sync                                  If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Long: timeout_ms>            If set and the producer is running in asynchronous mode,
                                        this gives the maximum amount of time a message will queue awaiting suffient batch size.
                                        The value is given in ms. (default: 1000)
--topic <topic>                         REQUIRED: The topic id to produce messages to.
--value-serializer <encoder_class>      The class name of the message encoder implementation to use for serializing values.
                                        (default: kafka.serializer.StringEncoder)

1-2.レプリケーション数1のTopicに対する投入

パラメータを確認したところで、実際にメッセージを投入してみます。
Producerを起動すると入力待ちとなるため、メッセージを入力することで送信されます。

# cd /opt/kafka
# bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic TopicRep1
TopicRep1_Message1

1通目のメッセージを送信しました。
すると、下記のようにパーティション2に割り振られ、蓄積されたのがわかります。

kafka3
# ls -lR /opt/kafka-logs
.:
合計 16
drwxr-xr-x. 2 root root 4096  227 07:04 2014 TopicRep1-2
drwxr-xr-x. 2 root root 4096  228 08:01 2014 TopicRep2-1
drwxr-xr-x. 2 root root 4096  228 08:01 2014 TopicRep2-2
-rw-r--r--. 1 root root   46  36 06:34 2014 replication-offset-checkpoint

./TopicRep1-2:
合計 4
-rw-r--r--. 1 root root 10485760  227 07:04 2014 00000000000000000000.index
-rw-r--r--. 1 root root       44  36 06:34 2014 00000000000000000000.log
(省略)

logファイルとチェックポイントファイルの中身は下記のようになっていました。

logファイル

圧縮をかけていないので、ヘッダ部の管理情報は読めませんが、
メッセージ部分はそのまま文字列が保存されているのがわかります。
#尚、圧縮をかけた場合、1メッセージごとに圧縮されます。
#そのため、1メッセージのサイズが小さい場合は圧縮は逆効果となります。

# vi /opt/kafka-logs/TopicRep1-2/00000000000000000000.log
^@^@^@^@^@^@^@^@^@^@^@ T1dx^@^@yyyy^@^@^@^RTopicRep1_Message1
replication-offset-checkpoint

パーティション1については1メッセージ分同期されましたよ、ということを記録しているようです。

# cat /opt/kafka-logs/replication-offset-checkpoint
0
3
TopicRep2 1 0
TopicRep2 2 0
TopicRep1 2 1

その後、コンソールからメッセージを投入しますが、下記のようにパーティションに配分されました。
パーティションの配分定義は設定していないため、デフォルトの配分ルールに従って配分されているようです。
配分ルールについては・・ソース読めばわかりますので次回以降に。

TopicRep1_Message1     : パーティション2
TopicRep1_Message2     : パーティション2
TopicRep1_TestMessage3 : パーティション2
Message4_TopicRep1     : パーティション0
MMessage5_TopicRep1    : パーティション0
NMessage6_TopicRep1    : パーティション0
PMessage7_TopicRep1    : パーティション0
MMMessage8_TopicRep1   : パーティション0
A_TopicRep1            : パーティション0
_____                  : パーティション0
message_TopicRep1      : パーティション2

投入された際のファイル更新は1メッセージ目と同じく、
logファイルに【管理情報(バイナリ)】+【メッセージ】のペアで保存されていました。
チェックポイントファイルも1メッセージごとに末尾の数字が1ずつインクリメントされている形でした。

これで大体どうファイルが保存されるかがわかったので、次はレプリケーションありのパターンです。

# cd /opt/kafka
# bin/kafka-console-producer.sh --broker-list kafka1:9092,kka2:9092,kafka3:9092 --topic TopicRep2
TopicRep2_Message1

投入するとリーダー、フォロワーの両方にファイルが蓄積され、
チェックポイントファイルが更新されていました。

kafka3(リーダー)
# ls -lR /opt/kafka-logs
(省略)
./TopicRep2-2:
合計 4
-rw-r--r--. 1 root root 10485760  228 08:01 2014 00000000000000000000.index
-rw-r--r--. 1 root root       44  36 06:59 2014 00000000000000000000.log
(省略)
# cat /opt/kafka-logs/replication-offset-checkpoint
0
3
TopicRep2 1 0
TopicRep2 2 1
TopicRep1 2 4
kafka1(フォロワー)
# ls -lR /opt/kafka-logs
(省略)
./TopicRep2-2:
合計 4
-rw-r--r--. 1 root root 10485760  228 08:01 2014 00000000000000000000.index
-rw-r--r--. 1 root root       44  36 06:59 2014 00000000000000000000.log
(省略)
# cat /opt/kafka-logs/replication-offset-checkpoint
0
3
TopicRep2 2 1
TopicRep1 0 7
TopicRep2 0 0

まとめ

Producerからメッセージを投入した時の動作をまとめると、下記の通りでした。

  1. ファイルは【Kafkaのログ保存ディレクトリ】/【Topic名】-【パーティション番号】/*/log ファイルに保存される
  2. レプリケーションされている場合、フォロワーのサーバにもメッセージが保存される
  3. メッセージを何通受信したか、はチェックポイントファイル(replication-offset-checkpoint)に保存される

次回はConsumer側でメッセージがどう分配して取得されるか、を確認してみます。