読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafka概要確認(その6 メッセージ消費状態管理

こんにちは。とりあえず続きです。

10.メッセージ消費側駆動の状態管理方式

  • メッセージ配信セマンティクス

これまでからわかるように、メッセージ配信の保証方式として下記の3パターンが考えられる。

  1. 1回以下:前述した第1のケース。メッセージは提供後すぐ削除される。結果、消費側の障害によっては処理されないメッセージも発生する。
  2. 少なくとも1回:前述した第2のケース。各メッセージについて1回処理されることが保証されるが、失敗の場合は2回以上配信される。
  3. 1度だけ:何をしたいかによるが、各メッセージに対して「1回だけ」配信されることが保証されるケース

これらはつまりは「トランザクションの問題」のバリエーション。
2または3相コミット、Paxosのバリエーションのアルゴリズムによって「厳密に1回だけ」処理させることも可能。
ただし、いくつかの欠点も存在する。

上記のアルゴリズムを用いた場合複数回の往復通信が必要となり、
かつ特に時間制限も存在しないアルゴリズムのため状況次第では完了しないまま放置されるようなこともあり得る。

分散同意アルゴリズムには上記のような制約がどうしても発生してくる。

Kafkaではこれらのメタデータに対して2つ珍しい対処を行っている。
1つ目として、ストリーム(データの流れ)はBrokerプロセスごとに全く異なるパーティションに分割している。
これらのパーティションの意味と、メッセージをどのパーティションに送信するかはProducerプロセスに委ねられている。
同一パーティション内のメッセージはBrokerプロセス内で受信順にソートされ、同じ順でConsumerプロセスに提供される。

これはつまりは各メッセージごとに状態管理(消費されたか、まだか)をするのではなく、
Consumer、トピック、パーティションごとにのみ状態管理をすればいいことを示している。

そのため、メッセージの消費状態は個々のメッセージに対してではなく
Consumer、トピック、パーティションについて保存される。
これをオフセットと呼び、この機構のおかげでKafkaはメッセージの消費状態について非常に小さいデータで管理可能となる。
詳細については実装セクション参照。
=====
これがかなり飛んだ感じにしか訳せませんでした。
Consumerはさておき、トピック、パーティションの包含関係が明確になっていないため、わかりません。
現状わかっている情報としては、
「トピックの中に複数のメッセージが含まれる」
パーティションの中に複数のメッセージが含まれる」
パーティション内ではメッセージの順序が維持される」の3点のみ。
トピックとパーティションの関係が1対Nなのか、N対1なのか、N対Nなのか・・・
ともあれ、読み進めてみます。
=====

  • メッセージの消費状態管理

KafkaにおいてはConsumer側でメッセージをどこまで消費したかの状態(オフセット)を維持する必要がある。
KafkaのConsumerライブラリは基本自分がどこまでメッセージを処理したかの「状態値」をZookeeperに書き込む。
しかしながら、状態値をConsumerが処理結果を書き込むデータストアと同じデータストアに書き込むことが有益かもしれない。

例えば、Consumerはいくつかの集計値を集中型のトランザクショナルOLTPデータベースに入力することもできる。
このケースにおいてはConsumerはどこまでメッセージを処理したかの状態をデータベース更新と同トランザクションとして反映することもできる。
トランザクションで実施することにより分散部分がなくなり、分散同意問題をそもそも発生させなくすることにつながる。
類似の対処はトランザクショナルでは無いデータストアにおいても良好に動作する。
=====
確かに、その通りなんですが、Kafkaはそこまで柔軟にオフセット保存先を切り替えることができるんでしょうか・・・?
こちらについても読み進めてみないとわかりませんね。
=====

検索システムはメッセージの処理状態を検索インデックスと共に保存することができる。
それは耐久性の保証を提供しない場合があるけれども「メッセージのオフセットと検索インデックスを同期させている」
そのため、検索インデックスを出力する場合にクラッシュして消えた場合、
「検索インデックス出力前のオフセット」から開始可能なため、保存した最新の状態から再開することが可能。
=====
なんか、このあたり訳が限りなく怪しいですね・・・
=====

同様に、HadoopでKafkaからのパラレルロードを実行している当社のシステムにおいては
似たような状態同期と復旧を行うことが可能。
個々のマッパーはマップタスクの終了時にHDFSへの最後の消費メッセージのオフセットを書き込む。
HDFSに格納されたオフセットからジョブが失敗して再起動される場合、各マッパーは単に再起動すればよい。

この設計方針を取ったことにより、副次的な利点も生じている。
Consumerはオフセットを古い値に巻き戻すことにより、前の状態から再度データを消費することが可能。
これにより、一般的なキューの概念とは異なるが、消費者サイドから見ると都合がいい動作が実現可能。
例えば、消費する側のコードにバグがあり、メッセージを処理した後にバグが修正された場合、
バグが発現する前のオフセットから再スタートすることによりメッセージを再処理させることが可能。

=====
このあたりはなるほど、と思います。
一般的なメッセージキューとは異なる方式を取り、
消費者側でオフセットを管理することになったために出来るようになったこともあるということですね。
で、元々Kafkaはローカルに大量にデータをため込んでいたとしても、
ディスクの容量内に収まっていればアクセスする際のコストは変わらないという。

そんな関係からKafkaはBrokerでデータを保持できるだけ保持し、
溢れそうになったらオフセットなどは関係なく既に終わっているはずのデータを削除可能・・・
ということなんでしょうかね。
=====