SSHを通してKafkaBrokerに対するデータ送受信を行う方法(解決編
こんにちは。
前回(前の投稿に非ず)SSHポートフォワーディングを行った結果Producerからの投入に失敗してしまいました(汗
今回はその解決エントリになります。
1.投入失敗した原因
メーリングリストで質問しながらログを確認したところ、
Kafka起動後に下記のようなログが継続して出力されていることがわかりました。
■logs/controller.log
[2014-04-09 23:55:07,709] ERROR [Controller-0-to-broker-0-send-thread], Controller 0's connection to broker id:0,host:localhost,port:19092 was unsuccessful (kafka.controller.RequestSendThread) java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:465) at sun.nio.ch.Net.connect(Net.java:457) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [2014-04-09 23:55:08,009] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 7 failed to send UpdateMetadata request with correlation id 3 to broker id:0,host:localhost,port:19092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
KafkaBrokerに含まれる「Controller」スレッドがadvertised.host.name/advertised.portに
つなぎに行って接続できず、結果KafkaBrokerの初期化が完了しない・・ように見えます。
つまり、advertised.host.name/advertised.portによって変更される接続先は下記3つというわけでした。
- KafkaProducer
- KafkaConsumer
- KafkaBrokerの中のControllerスレッド(見逃しており、動作せず)
KafkaにおけるClientとは、上記3つのプロセス/スレッドを指しているようです。
図に示すと下記みたいな感じでしょうか。
2.どのようにしたら解決できるのか?
前の節であったように、3つのClientから同じホスト/ポートでアクセス可能にする必要があります。
そのため、設定としては下記のようにする必要があるでしょう。
■server.properties
この設定変更により、「Controller」スレッドがKafkaBrokerにアクセス可能になる。
advertised.host.name=localhost advertised.port=9092
■SSHポートフォワーディング
下記のようにポートフォワーディングを行うことで、KafkaProducer/KafkaConsumerがKafkaBrokerにアクセス可能になる。
localhost:9092 > 192.168.100.XXX:9092 localhost:12181 > 192.168.100.XXX:2181
3.設定変更後確認してみると・・?
「2」での設定を行い、再度確認を行います。
まず、KafkaBrokerの起動を行った後、
「logs/controller.log」に接続できない旨のログが出力されなくなっていることは確認出来ました。
次は、Producer/Consumerから接続してみると・・?
■ポートフォワーディング用コンソール
# ssh root@XXX.XXX.XXX.XXX -L 19092:XXX.XXX.XXX.XXX:9092 -L 12181:XXX.XXX.XXX.XXX:2181 // KafkaBrokerとZooKeeperに対してポートフォワーディング
- Producer data put
========================
# ./kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
TestMessage
========================
■Producer用コンソール
# ./kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. TestMessage
■Consumer用コンソール
# ./kafka-console-consumer.sh --zookeeper localhost:12181 --topic TestTopic SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. TestMessage
4.まとめ
無事ポートフォワーディング越しにKafkaBrokerに対してデータの送受信を行うことができました。
ただ、こういうある意味強引な手法を使った場合、KafkaBrokerをクラスタ化する際に問題が発生しそうではあります。
実際、Kafkaのメーリングリストでは今回の手法はよろしくない手法で、
KafkaProducer/KafkaConsumerからKafkaBrokerに対してアクセスを行う際に
通信をProxyなどで中継することで解決する方がいい、とコメントを頂きました。
実際その通りだとは思います。
クラスタ化したい場合は今回のポートフォワーディングを適用するのは1Brokerにして、
適用したBrokerからKafkaBrokerクラスタ中の他のクラスタに改めてデータ同期ツールを使って
同期する、などの対応を行った方がよさそうです。
あとは、現在Kafkaのメーリングリストでは下記にあるKafka用のSSH認証のブランチを
マージしよう、という流れも出てきているので、それを待つか・・・ですかね。
https://github.com/relango/kafka/tree/kafka_security
ともあれ、とりあえず何とかなることは確認できました。
もし暫定対処であっても良ければ、今回の方法でSSHを通して投入することが可能です。