夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafkaを動かしてみる

こんにちは。

これまで以下のような記事で実際にKafkaとは何か、や
実際にビルドしてみていまいちうまくいかないとかを繰り返してきましたが、
本家サイト側でついに最新版である0.8.0(β)のバイナリリリースが公開されたということもあり、
実際に環境を構築して動作を確認してみることにします。

Apache Kafka 概要(Design)和訳まとめ
http://d.hatena.ne.jp/kimutansk/20130520/1369064154

Apache KafkaのレプリケーションComments
http://d.hatena.ne.jp/kimutansk/20130706/1373068963

Apache Kafka 0.8.0の新機能/変更点
http://d.hatena.ne.jp/kimutansk/20130703/1372803004

Apache Kafkaの最新版をビルドしてみる
http://d.hatena.ne.jp/kimutansk/20130526/1369567169

あ、なので、そもそもApache Kafkaが何かということについては上記の記事群を確認してください。

1.前提ソフトウェアのインストール/設定

Apache Kafkaを使うにあたって、以下の環境が必要になります。

  • Java(バージョンは明記されていませんが、おそらくJDK6以降)
  • ZooKeeper(こちらもバージョン不明ですが、Kafkaの媒体に含まれてるZooKeeperは「3.3.4」であることからおそらくそれ以降)

なので、上記の環境をまずインストールしておきます。

// JDKインストール
yum install java-1.7.0-openjdk-devel

// ZooKeeperインストール
wget http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/cdh4-repository-1-0.noarch.rpm
rpm -ivh cdh4-repository-1-0.noarch.rpm
yum install zookeeper-server

その上で、ZooKeeperの保存先ディレクトリなどを設定します。
尚、/homeディレクトリ配下にファイルを集約しているのはCentOS6.4のデフォルトでインストールした結果、
/homeディレクトリ配下に大部分の領域が割り当てられているためです。
アプリケーションログ位は許容範囲なんですが、ZooKeeperのトランザクションログをさすがに出力するのは厳しい、といった感じです。

  • /usr/lib/zookeeper/conf/zoo.cfg
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper # 変更:/var/lib/zookeeper → /home/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=192.168.100.210:2888:3888 # 追加(インストール先のサーバアドレス)

その上でディレクトリを初期化し、トランザクションログのクリア処理を定期実行させます。

// ZooKeeperデータディレクトリ初期化
mkdir -p /home/zookeeper/data
chown zookeeper:zookeeper -R /home/zookeeper
sudo -u zookeeper /usr/lib/zookeeper/bin/zkServer-initialize.sh --myid=1 /home/zookeeper/data

// ZooKeeperトランザクションログクリア処理追加
crontab -u zookeeper -e
 → 「0 3 * * * /usr/lib/zookeeper/bin/zkCleanup.sh /home/zookeeper/data -n 3」を追加

// ZooKeeper起動
service zookeeper-server start

これでKafkaのインストール準備は完了です。

2.Kafkaのインストール/設定

と言いつつ、Kafkaのインストール自体はダウンロードして展開するだけで完了です。

// Kafkaインストール
cd /opt
wget https://dist.apache.org/repos/dist/release/kafka/kafka_2.8.0-0.8.0-beta1.tgz
tar xvzf kafka_2.8.0-0.8.0-beta1.tgz
ln -s /opt/kafka_2.8.0-0.8.0-beta1 kafka

そのため、設定項目の確認/起動確認に入ります。

2-1.KafkaBroker

まずはKafkaBrokerから。媒体内ではserver、という名称で呼称されているようです。

  • /opt/kafka/config/server.properties
# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
host.name=chronos # 変更:ホスト名設定

# The directory under which to store log files
log.dir=/home/kafka/kafka-logs #変更:/tmp/kafka-logs → /home/kafka/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.100.210:2181 #変更:localhost → 192.168.100.210

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/home/kafka/kafka_metrics #変更:/tmp/kafka_metrics→ /home/kafka/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=true #変更:false → true

では、実際に起動してみます。すると・・・

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &

# 5秒に1回例外発生
java.io.IOException: Unable to create /home/kafka/kafka_metrics/NumDelayedRequests.csv
        at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
        at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
        at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
        at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
        at com.yammer.metrics.reporting.CsvReporter.processGauge(CsvReporter.java:229)
        at com.yammer.metrics.reporting.CsvReporter.processGauge(CsvReporter.java:22)
        at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
        at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

起動はするのですが、5秒に1回、つまりメトリクスを出力する度に例外が発生します。
ただ、実際に「/home/kafka/kafka_metrics/NumDelayedRequests.csv」の中身を見てみると、
以下のようになっており、メトリクス情報自体は出力されているようです。
まだβ版ということもあり、メトリクス出力部分はいまいち精度が低いのかもしれません。こちらはoffにして進めることにします。

# time,value
5,0
10,0
15,0
20,0
  • /opt/kafka/config/server.properties
# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/home/kafka/kafka_metrics #変更:/tmp/kafka_metrics→ /home/kafka/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=true #変更:true → false
2-2.KafkaProducer

次はKafkaProducerです。
0.7.0系までと違って予めトピックを作成する必要があるため、作成後、投入を行います。

// トピック作成
cd /opt/kafka
bin/kafka-create-topic.sh --zookeeper 192.168.100.210:2181 --replica 1 --partition 1 --topic KafkaTest
// トピック確認
bin/kafka-list-topic.sh --zookeeper 192.168.100.210:2181
topic: KafkaTest        partition: 0    leader: 0       replicas: 0     isr: 0

作成できたことを確認したため、実際にメッセージの投入を行います。

// メッセージ投入
/opt/kafka/bin/kafka-console-producer.sh  --broker-list 192.168.100.210:9092 --topic KafkaTest

TestMessage001
TestMessage002

結果、Kafkaのログディレクトリにファイルが作成されていることを確認しました。

ls -lR /home/kafka/kafka-logs/
/home/kafka/kafka-logs/:
合計 8
drwxr-xr-x. 2 root root 4096  8月  4 19:36 2013 KafkaTest-0
-rw-r--r--. 1 root root   18  8月  4 19:41 2013 replication-offset-checkpoint

/home/kafka/kafka-logs/KafkaTest-0:
合計 4
-rw-r--r--. 1 root root 10485760  8月  4 19:36 2013 00000000000000000000.index
-rw-r--r--. 1 root root       80  8月  4 19:38 2013 00000000000000000000.log

実際のファイルの中身は以下のようになっています。
.logファイルに生のログが保存されていることがわかりますね。
特に設定していないため、圧縮などは行われていないようです。
(index/logはcatで見ると表示が崩れるためodコマンドで確認してます)

// checkpointファイル確認
cat /home/kafka/kafka-logs/replication-offset-checkpoint

int
0
1
KafkaTest 0 2

// indexファイル中身確認
od -c /home/kafka/kafka-logs/KafkaTest-0/00000000000000000000.index

0000000  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0
*
50000000

//logファイル中身確認
od -c /home/kafka/kafka-logs/KafkaTest-0/00000000000000000000.log

0000000  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0  \0 034 345   n   t  \0
0000020  \0  \0 377 377 377 377  \0  \0  \0 016   T   e   s   t   M   e
0000040   s   s   a   g   e   0   0   1  \0  \0  \0  \0  \0  \0  \0 001
0000060  \0  \0  \0 034   |   g   % 272  \0  \0 377 377 377 377  \0  \0
0000100  \0 016   T   e   s   t   M   e   s   s   a   g   e   0   0   2
0000120
2-3.KafkaConsumer

では、最後にKafkaConsumerです。
と言いつつ、後はただ持ってくるだけなので、取得するのみですね。

// メッセージ取得
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.100.210:2181 --topic KafkaTest --from-beginning

TestMessage001
TestMessage002

Producer側で投入したメッセージが取得可能であることを確認しました。
その後Producer側でメッセージを再度投入すると再度表示されます。

尚、Consumerを接続した結果、ZooKeeper上に以下のノードが生成されていました。
その上でどこまでのメッセージを取得したか・・・等の情報が記録されているようです。
「/consumers/console-consumer-25801」


と、こういう形でとりあえず動かすことはできました。
試してみたいことは今後たくさんありますが、まずは第一歩ということで^^