夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafka概要確認(その11 メッセージを保存するログファイル

こんにちは。今日は実際にメッセージを保存しているログファイルの形式と形態について述べた章になります。
これまで述べられたメッセージがどういう形式でディスク上に保存されているかですね。

18.ログ

"my_topic"というトピック名称を持つ2パーティションを持つトピックは"my_topic_0"と"my_topic_1"という
2つのディレクトリと、メッセージを保持するファイル群から構成される。
ログファイルのフォーマットは"ログエントリ"の配列となっている。
ログエントリは、メッセージ長(NByte)を4Byteの整数で保持している。
各メッセージは対象トピック/パーティション上の全メッセージ中の開始位置であるオフセットを64ビット整数で保持しており、
そのオフセットによって一意に識別される。

ディスク上に保存されている際のメッセージフォーマットは以下の通り。
各ログファイルはファイル中に保持する最初のメッセージのオフセット値を名称に保持する。
そのため、最初に作成されるログファイル名称は「00000000000.kafka」となる。
以後の各ファイルの名称は大体S毎の間隔を持つログファイルとなる。
(ここでのSは設定ファイルで設定された最大ログファイルサイズの値)

このバイナリフォーマットを標準インタフェースとして厳密に維持することで、
Producer、Broker、Consumer間で通信する際に再コピーや変換を最小限に抑えている。

ディスク上のフォーマットは下記。

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

通常、メッセージのIDとしてオフセットは使用されない。
私たちの元々のアイディアはProducerによって生成されたGUIDを用い、
各BrokerでGUID→オフセットのマッピングを保持するというものだった。

しかし、上記の方式を採用した場合Consumerは各サーバ(Broker?)のIDを維持しなければならない関係上、
GUIDの一意性を確保しても意味がなくなる。
さらに、GUID→オフセットのマッピングを維持するための複雑さは
ディスクに常時同期させる必要があり、かつランダムアクセスが必要となる重いインデックス構造を必要とする。

そこで私たちはこのルックアップ構造を単純化するために
パーティション/ノードIDとペアで用いることでメッセージを一意に識別可能なアトミックなカウンタを用いることにした。
この方針変更によってConsumerから複数のシークが並列で走る状況であってもルックアップ構造は単純化された。

単調増加し、パーティション毎にユニークに割り振られるオフセットを用いることにより、
オフセットを用いてメッセージに直接飛ぶ構造が単純化され、固定化された処理となった。
=====
この辺、訳がかなり怪しいです・・・
=====

尚、オフセットはConsumerAPIから隠されているため、実装の詳細は後から変更でき、
私たちはより効率的なアプローチにするため対応を続けている。

  • 書込

ログにデータを追加する際、常に最後のファイルに直接追加することが可能。
ログファイルサイズが1GBのような設定可能な最大サイズに達した場合、次のファイルが生成される。
ログは2つのパラメータを取る。
M:OSに対して強制的にフラッシュするまでのメッセージ数
S:OSに対して強制的にフラッシュするまでの時間

上記の設定によって、システムのクラッシュが発生した際に失われる最大のメッセージ数/メッセージの配信期間を設定することが可能。

  • 読込

読み込みはオフセットと最大読み込みチャンクサイズSを渡すことで行われている。
読み込みの結果、SByteのメッセージを含むメッセージのイテレータが返される。
Sの値はどのメッセージよりも大きくなることを想定しているが、イベントが通常でない大きなサイズとなっていた場合、
読込はメッセージの読み込みが成功するまで複数回実施され、実行された回数だけSByteのメッセージ領域が合計して確保される。
最大メッセージ数/最大メッセージサイズはサーバが一定サイズを超えたメッセージを受信した際に拒否する形で実現される。
これはConsumerが受信可能な最大サイズをオーバーしないために設定されている。
もし受信側が1メッセージの途中で受信が終了してしまった場合、ログエントリのデリメータが発生しないためすぐに検知できる。

オフセットを用いた実際の読み込みプロセスはまずはじめにどのファイルに該当のメッセージが含まれるかを特定する。
その上でファイル名に記述されたオフセットと読込対象オフセットの差分を取って読込場所を特定する。
この検索処理はファイルごとに管理されるメモリに対して単純なバイナリサーチのバリエーションアルゴリズムによる検索を実施することで実現している。

このログの方式によってKafkaは「今書き込まれた」ファイルも含めてConsumerの取得対象にすることができる。
この方式はSLAとして設定したパラメータ内でCosumer側がログの処理に失敗した場合にも有用。
尚、この方式は存在しないオフセットを指定した場合にはOutOfRangeException を発生させるためその例外を持ってハンドリングを行うこと。

Consumerに送信されるメッセージのフォーマットは下記。

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes


MultiMessageSetSend (multiFetch result)

total length     : 4 bytes
error code       : 2 bytes
messageSetSend 1
...
messageSetSend n
  • 削除

ログを削除する際、同時に1つのログセグメントを削除する。
ログマネージャには各ファイルが削除対象かどうかを判断するためのポリシーをプラグイン方式で追加することが可能。
現状では「最終更新時刻がN日以上前」か、「保持するログの量が新しい方からNGB保持し、それ以外を削除」という方式が有用だと判断している。
削除対象となったファイルに対して書込み/読み込みが競合するようなパターンを回避するため、セグメントリストの実装にコピーオンライト方式を取っている。
ログマネージャでは削除処理が走っている間はイミュータブルなセグメントリストのスナップショットを作成し、
イナリサーチアルゴリズムはそちらを参照するようにしている。

  • 保証

ログ出力時、ファイルに強制的にフラッシュをかける保持メッセージ数Mを設定項目として設定可能。
Kafkaのプロセス起動時、ログのリカバリプロセスが走り、
最新のログセグメント中に保持されているメッセージエントリが有効であることを確認している。

メッセージエントリは下記の条件を満たす場合正常と判断する。

  1. メッセージサイズとオフセット値の合計値がファイルサイズより小さい
  2. 保存しているCRC32チェックサム値が保持しているメッセージペイロードチェックサム結果と一致している

メッセージの破棄はファイル中で一番最後の正常メッセージがあったオフセット以降を
削除することで行われる。

尚、2種類の問題に対処する必要があることに留意すること。

  1. システムがクラッシュしたため不正なメッセージが末尾に残っている
  2. 不正なメッセージ部が記述されている

このような問題に対処する必要がある理由として、一般的にOSはファイルのinodeと実体データの書き込み順同期は保証していないため。
結果、inodeを更新したが実体データの更新に失敗した場合、サイズは大きいが実際には中身がないといった不整合が発生する。
CRCチェックサムはこういった不整合パターンへの対処のために搭載し、ログ破壊の波及を防いでいる。

=====
これでメッセージが実際にどういう形でログファイルとして保存されるかについても確認完了。
残るは状態管理とリバランスのアルゴリズムとなります。
そろそろ終わりが近いですね。
=====