読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafka概要確認(その9 API

今回からは実際のAPIを確認していく形になります。

13.Producer側のAPI

Producer側のAPIは以下の2つの低レベルAPIをラップしている。

  1. kafka.producer.SyncProducer
  2. kafka.producer.async.AsyncProducer
class Producer {
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List<kafka.javaapi.producer.ProducerData> producerData);

  /* Closes the producer and cleans up */	
  public void close();
}

このAPIとした目的は、クライアントへの単一のAPIを介してすべての生産機能を公開すること。
新しいProducerを作成する際には下記のAPIを記述する必要がある。

  • 1.複数のプロデューサーの要求をバッファリングして非同期ディスパッチ/キューイングを行うAPI

kafka.producer.Producerは(producer.type=async)と設定することで
シリアライズ/Brokerプロセスに対して配分する前に複数のリクエストをバッチ化する機能を提供する。
バッチのサイズは、いくつかの設定パラメータによって制御することができる。
イベントがキューに入った場合、「queue.time」「batch.size」のどちらかに達するまでキューでバッファリングされる。

バッファリングしたデータを処理する場合、
バックグラウンドスレッド(kafka.producer.async.ProducerSendThread)がキューからデータを取得し、
イベントハンドラ(kafka.producer.EventHandler)がデータのシリアライズと適切なBrokerプロセスへの送信を行う。

カスタムのイベントハンドラもevent.handlerの設定値にてプラグイン方式で追加することが可能。
この処理の流れをProduerパイプラインと呼び、各フェーズにコールバックやログ出力、
トレース用のメトリクス情報収集などの処理をプラグイン方式で追加する追加することが可能。

これらの処理は kafka.producer.async.CallbackHandler を継承したクラスを
設定値「callback.handler」に設定することで追加可能。

  • 2.エンコーダの指定

ユーザ側で下記のインタフェースを継承したエンコーダを指定する。

interface Encoder<T> {
  public Message toMessage(T data);
}

デフォルトでは何も実施しない「kafka.serializer.DefaultEncoder」クラスが使用される。

  • 3.Zookeeperベースの自動Brokerプロセス検知

Zookeeperベースの自動Brokerプロセス検知、ロードバランシングはZookeeperへの接続URLを
設定値「zk.connect」に設定することで使用可能。
ただ、いくつかのユースケースにおいてはZookeeperへの依存が不適切となるケースもある。

その場合はProducer側の「broker.list」に静的なBrokerリストを定義する形で対応可能。
この設定を行った場合、Producer要求はランダムBrokerパーティションに配分される。
もし送信時に対象のBrokerがダウンしていた場合、そのProducerリクエストは失敗する。
=====
ランダムBrokerパーティション・・・というとまるっきりランダムなのか、
他に要素があるかは気になるところですね。
=====

  • 4.必要に応じてパーティショナーを定義

ルーティングは下記のインタフェースを継承したクラスを定義することで実現可能。

interface Partitioner<T> {
   int partition(T key, int numPartitions);
}

このAPIはキーとパーティション数を渡して配分先のパーティションIDを取得する構成になっている。
このIDはProducerリクエストに対してBrokerパーティションを選択するbroker_idsと
パーティションリストへのインデックスとして使用される。

デフォルトのパーティショナーはキーのハッシュ値をnumPartitionsで剰余を取ることで配分を行う。
キーがnullの場合ランダムBrokerパーティションで配分を行う。
カスタムパーティショナーは設定値「partitioner.class」で定義が可能。

14.ConsumerのAPI

KafkaではConsumerのAPIとして2レベルのAPIを保持している。
低レベルの "Simple" APIは、単一のBrokerプロセスへの接続を維持し、
サーバに送信されるネットワーク要求に密接な対応を有している。

高レベルのAPIはConsumerからBrokerプロセスの詳細を隠し、背後のネットワークトポロジ等も気にすることなく
メッセージを取得することが可能となる。
また、自動的にConsumer側のオフセットを設定する。
加えて、高レベルAPIブラックリストホワイトリスト方式(正規表現を使用可能)でメッセージをフィルタリングして購読することが可能。
=====
こんな感じで高低のAPIを提供するあたりは好感持てます。
=====

低レベルAPIの構成は下記の通り。

class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

低レベルAPIは高レベルAPIを構成するために使用されるだけでなく、
Hadoop向けConsumer等のように状態管理が一部特殊となるConsumerの構成にも使用されている。

高レベルAPIの構成は下記の通り。

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}

このAPIはKafkaStreamクラスをイテレータで束ねる形で実現している。
各KafkaStreamは、1〜n個のサーバ上の1〜n個のパーティションからのメッセージストリームを表す。
したがって、各KafkaStreamは複数パーティションからのデータを束ねて受信することができるが、
1パーティションから送信されるデータは1Streamオブジェクトにのみ送られる。
各KafkaStreamはシングルスレッド処理のために使用される。
クライアントは任意にストリームを作成することが可能。
=====
つまりはBroker側で1パーティションに対しては1個のStreamしか接続されないようになっているんですかね。
=====

createMessageStreamsメソッドを呼び出すことにより、Consumerをトピックに対して登録する。
結果、Consumer/Broker間でのリバランスが走ることになる。
API側ではリバランスを最小限にするためにcreateMessageStreamsメソッド実行時に
複数のトピックに対するStreamを生成することを推奨している。

createMessageStreamsByFilterメソッドはフィルタに一致するトピックを発見するWatcherオブジェクトを登録する。
フィルタで複数トピックを許容した場合、createMessageStreamsByFilterが返す複数トピックが返すメッセージは
「反復するケースがある」ということに注意すること。
=====
反復と表現しましたが、Iterate over messages・・・が元の英文でした。
これはつまりは複数トピックに対してフィルタをかけた場合、同一メッセージを複数回取得する可能性が
あるということなんでしょうか・・・?
と考えると、フィルタをかけてメッセージを取得する場合は単一トピックに対して実施するが無難な使用方法になりますが。
=====

=====
実動作的には確認する必要がありますが、Producer、Consumer共にAPI自体は非常に分かりやすいものでした。
これまで概念を見てきたからわかりやすいと思えるからなのかも知れませんが。
=====