Apache Kafkaを動かしてみる
こんにちは。
これまで以下のような記事で実際にKafkaとは何か、や
実際にビルドしてみていまいちうまくいかないとかを繰り返してきましたが、
本家サイト側でついに最新版である0.8.0(β)のバイナリリリースが公開されたということもあり、
実際に環境を構築して動作を確認してみることにします。
■Apache Kafka 概要(Design)和訳まとめ
http://d.hatena.ne.jp/kimutansk/20130520/1369064154
■Apache KafkaのレプリケーションComments
http://d.hatena.ne.jp/kimutansk/20130706/1373068963
■Apache Kafka 0.8.0の新機能/変更点
http://d.hatena.ne.jp/kimutansk/20130703/1372803004
■Apache Kafkaの最新版をビルドしてみる
http://d.hatena.ne.jp/kimutansk/20130526/1369567169
あ、なので、そもそもApache Kafkaが何かということについては上記の記事群を確認してください。
1.前提ソフトウェアのインストール/設定
Apache Kafkaを使うにあたって、以下の環境が必要になります。
- Java(バージョンは明記されていませんが、おそらくJDK6以降)
- ZooKeeper(こちらもバージョン不明ですが、Kafkaの媒体に含まれてるZooKeeperは「3.3.4」であることからおそらくそれ以降)
なので、上記の環境をまずインストールしておきます。
// JDKインストール yum install java-1.7.0-openjdk-devel // ZooKeeperインストール wget http://archive.cloudera.com/cdh4/redhat/6/x86_64/cdh/cdh4-repository-1-0.noarch.rpm rpm -ivh cdh4-repository-1-0.noarch.rpm yum install zookeeper-server
その上で、ZooKeeperの保存先ディレクトリなどを設定します。
尚、/homeディレクトリ配下にファイルを集約しているのはCentOS6.4のデフォルトでインストールした結果、
/homeディレクトリ配下に大部分の領域が割り当てられているためです。
アプリケーションログ位は許容範囲なんですが、ZooKeeperのトランザクションログをさすがに出力するのは厳しい、といった感じです。
- /usr/lib/zookeeper/conf/zoo.cfg
# the directory where the snapshot is stored. dataDir=/var/lib/zookeeper # 変更:/var/lib/zookeeper → /home/zookeeper/data # the port at which the clients will connect clientPort=2181 server.1=192.168.100.210:2888:3888 # 追加(インストール先のサーバアドレス)
その上でディレクトリを初期化し、トランザクションログのクリア処理を定期実行させます。
// ZooKeeperデータディレクトリ初期化 mkdir -p /home/zookeeper/data chown zookeeper:zookeeper -R /home/zookeeper sudo -u zookeeper /usr/lib/zookeeper/bin/zkServer-initialize.sh --myid=1 /home/zookeeper/data // ZooKeeperトランザクションログクリア処理追加 crontab -u zookeeper -e → 「0 3 * * * /usr/lib/zookeeper/bin/zkCleanup.sh /home/zookeeper/data -n 3」を追加 // ZooKeeper起動 service zookeeper-server start
これでKafkaのインストール準備は完了です。
2.Kafkaのインストール/設定
と言いつつ、Kafkaのインストール自体はダウンロードして展開するだけで完了です。
// Kafkaインストール cd /opt wget https://dist.apache.org/repos/dist/release/kafka/kafka_2.8.0-0.8.0-beta1.tgz tar xvzf kafka_2.8.0-0.8.0-beta1.tgz ln -s /opt/kafka_2.8.0-0.8.0-beta1 kafka
そのため、設定項目の確認/起動確認に入ります。
2-1.KafkaBroker
まずはKafkaBrokerから。媒体内ではserver、という名称で呼称されているようです。
- /opt/kafka/config/server.properties
# Hostname the broker will bind to and advertise to producers and consumers. # If not set, the server will bind to all interfaces and advertise the value returned from # from java.net.InetAddress.getCanonicalHostName(). host.name=chronos # 変更:ホスト名設定 # The directory under which to store log files log.dir=/home/kafka/kafka-logs #変更:/tmp/kafka-logs → /home/kafka/kafka-logs # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=192.168.100.210:2181 #変更:localhost → 192.168.100.210 # metrics reporter properties kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/home/kafka/kafka_metrics #変更:/tmp/kafka_metrics→ /home/kafka/kafka_metrics # Disable csv reporting by default. kafka.csv.metrics.reporter.enabled=true #変更:false → true
では、実際に起動してみます。すると・・・
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties & # 5秒に1回例外発生 java.io.IOException: Unable to create /home/kafka/kafka_metrics/NumDelayedRequests.csv at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141) at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257) at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22) at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156) at com.yammer.metrics.reporting.CsvReporter.processGauge(CsvReporter.java:229) at com.yammer.metrics.reporting.CsvReporter.processGauge(CsvReporter.java:22) at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28) at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724)
起動はするのですが、5秒に1回、つまりメトリクスを出力する度に例外が発生します。
ただ、実際に「/home/kafka/kafka_metrics/NumDelayedRequests.csv」の中身を見てみると、
以下のようになっており、メトリクス情報自体は出力されているようです。
まだβ版ということもあり、メトリクス出力部分はいまいち精度が低いのかもしれません。こちらはoffにして進めることにします。
# time,value 5,0 10,0 15,0 20,0
- /opt/kafka/config/server.properties
# metrics reporter properties kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/home/kafka/kafka_metrics #変更:/tmp/kafka_metrics→ /home/kafka/kafka_metrics # Disable csv reporting by default. kafka.csv.metrics.reporter.enabled=true #変更:true → false
2-2.KafkaProducer
次はKafkaProducerです。
0.7.0系までと違って予めトピックを作成する必要があるため、作成後、投入を行います。
// トピック作成 cd /opt/kafka bin/kafka-create-topic.sh --zookeeper 192.168.100.210:2181 --replica 1 --partition 1 --topic KafkaTest // トピック確認 bin/kafka-list-topic.sh --zookeeper 192.168.100.210:2181 topic: KafkaTest partition: 0 leader: 0 replicas: 0 isr: 0
作成できたことを確認したため、実際にメッセージの投入を行います。
// メッセージ投入 /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.100.210:9092 --topic KafkaTest TestMessage001 TestMessage002
結果、Kafkaのログディレクトリにファイルが作成されていることを確認しました。
ls -lR /home/kafka/kafka-logs/ /home/kafka/kafka-logs/: 合計 8 drwxr-xr-x. 2 root root 4096 8月 4 19:36 2013 KafkaTest-0 -rw-r--r--. 1 root root 18 8月 4 19:41 2013 replication-offset-checkpoint /home/kafka/kafka-logs/KafkaTest-0: 合計 4 -rw-r--r--. 1 root root 10485760 8月 4 19:36 2013 00000000000000000000.index -rw-r--r--. 1 root root 80 8月 4 19:38 2013 00000000000000000000.log
実際のファイルの中身は以下のようになっています。
.logファイルに生のログが保存されていることがわかりますね。
特に設定していないため、圧縮などは行われていないようです。
(index/logはcatで見ると表示が崩れるためodコマンドで確認してます)
// checkpointファイル確認 cat /home/kafka/kafka-logs/replication-offset-checkpoint int 0 1 KafkaTest 0 2 // indexファイル中身確認 od -c /home/kafka/kafka-logs/KafkaTest-0/00000000000000000000.index 0000000 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 * 50000000 //logファイル中身確認 od -c /home/kafka/kafka-logs/KafkaTest-0/00000000000000000000.log 0000000 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 034 345 n t \0 0000020 \0 \0 377 377 377 377 \0 \0 \0 016 T e s t M e 0000040 s s a g e 0 0 1 \0 \0 \0 \0 \0 \0 \0 001 0000060 \0 \0 \0 034 | g % 272 \0 \0 377 377 377 377 \0 \0 0000100 \0 016 T e s t M e s s a g e 0 0 2 0000120
2-3.KafkaConsumer
では、最後にKafkaConsumerです。
と言いつつ、後はただ持ってくるだけなので、取得するのみですね。
// メッセージ取得 /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.100.210:2181 --topic KafkaTest --from-beginning TestMessage001 TestMessage002
Producer側で投入したメッセージが取得可能であることを確認しました。
その後Producer側でメッセージを再度投入すると再度表示されます。
尚、Consumerを接続した結果、ZooKeeper上に以下のノードが生成されていました。
その上でどこまでのメッセージを取得したか・・・等の情報が記録されているようです。
「/consumers/console-consumer-25801」
と、こういう形でとりあえず動かすことはできました。
試してみたいことは今後たくさんありますが、まずは第一歩ということで^^