SSHを通してKafkaBrokerに対するデータ送受信を行う方法(失敗
こんにちは。
Kafkaのようなメッセージ収集サーバを使用していると、
遠隔地からもデータの送受信を行いたい、というケースが出てきます。
ですが、Kafkaって通信は基本的に生のデータをネットワークに投入するので、
それはそれで気になるところがありますよね。
そのため、SSHを通してKafkaクラスタにデータを投入したい・・・という動機が出てきます。
#あるということにしてください^^;
ですが、Kafkaクラスタに対してProducerからデータを投入する場合、下記のような流れとなっています。
- ProducerがKafkaクラスタに対してBrokerのアドレスを問い合わせる
- KafkaクラスタがProducerに対してBrokerのアドレスを返す
- Producerは2で返されたBrokerのアドレスに対してデータの投入を行う
そのため、素直にSSHのポートフォワーディングを介してKafkaクラスタにデータを投入しようとすると、
下記のような流れとなり、問題が発生します。
- SSHポートフォワーディングを設定する
- Producerがフォワーディングされた通信路を通してKafkaクラスタにBrokerのアドレスを問い合わせる
- KafkaクラスタがProducerに対してフォワーディングされないBrokerのアドレスを返す
- Producerは3で返されたBrokerのアドレスに対してデータの投入を行い、フォワーディングされないアドレスのため失敗する
Brokerのアドレスをどれか1つ知っていればクラスタ全体のアドレスを知ることができ、
クラスタの構成変更にも追従できる・・という柔軟性と引き換えに、
この手の小細工(?)が行いにくくなっているわけですね。
そのため、MLで質問したところ、
「advertised.host.name」「advertised.port」という設定を行えば
「KafkaBrokerからKafkaProducerに通知するアドレス」を変更できるという回答をもらいました。
そのため、今回は実際にそれを試してみようと思います。
1.前提条件
前提条件として、Kafka0.8.1のインストールを行っておきます。
若干設定ファイルやコマンドの構成が変わっていますが、Kafka0.8.0と手順自体は同じです。
2.設定
まず、接続側のマシンからlocalhost:19092からKafkaクラスタサーバの9092に
ポートフォワーディングを行う前提とし、下記のように設定を行います。
log.dirs=/tmp/kafka-logs #advertised.host.name=<hostname routable by clients> #advertised.port=<port accessible by clients>
↓
log.dirs=/opt/kafka/kafka-logs advertised.host.name=localhost advertised.port=19092
3.実行してみますが・・・?
では、KafkaBrokerプロセスを起動し、実際にSSHポートフォワーディングを通してアクセス可能か試してみます。
なお、クラスタ外からSSHポートフォワーディングで操作可能であることを確認するため、
ZooKeeperに対してもSSHポートフォワーディングを行い、その上で確認を行っています。
■ポートフォワーディング用コンソール
# ssh root@XXX.XXX.XXX.XXX -L 19092:XXX.XXX.XXX.XXX:9092 -L 12181:XXX.XXX.XXX.XXX:2181 // KafkaBrokerとZooKeeperに対してポートフォワーディング
■操作用コンソール
# cd /opt/kafka/bin # ./kafka-topics.sh --create --zookeeper localhost:12181 --replication-factor 1 --partitions 3 --topic TestTopic Created topic "TestTopic". // ZooKeeperに対してポートフォワーディングし、Topicを作成成功。 # ./kafka-topics.sh --list --zookeeper localhost:12181 TestTopic // 実際にTopicが作成されていることを確認 # ./kafka-console-producer.sh --broker-list localhost:19092 --topic TestTopic SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Test [2014-04-06 20:42:01,352] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,364] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,365] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler) [2014-04-06 20:42:01,476] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,481] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,481] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler) [2014-04-06 20:42:01,589] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,595] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,595] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler) [2014-04-06 20:42:01,709] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartit ionInfo) [2014-04-06 20:42:01,719] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,719] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler) [2014-04-06 20:42:01,826] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2014-04-06 20:42:01,829] ERROR Failed to send requests for topics TestTopic with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) [2014-04-06 20:42:01,830] ERROR Error in handling batch of 2 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
・・・という形で、BrokerからTopic情報の取得に失敗した、となり、投入を行うことができませんでした・・・
尚、advertisedの設定を行わない場合は同じ操作を行っても下記のエラーとなるため、
advertisedの設定によって「3. Producerは2で返されたBrokerのアドレスに対してデータの投入を行う」の
タイミングにおける接続先は変わっているのは確かなのですが。
Test [2014-04-06 21:11:44,231] ERROR Producer connection to localhost:9092 unsuccessful (kafka.producer.SyncProducer) java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:465) at sun.nio.ch.Net.connect(Net.java:457) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:100) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap.foreach(HashMap.scala:95) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) [2014-04-06 21:11:44,238] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [TestTopic,0] (kafka.producer.async.DefaultEventHandler) java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:465) at sun.nio.ch.Net.connect(Net.java:457) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:100) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) at scala.collection.Iterator$class.foreach(Iterator.scala:772) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) at scala.collection.mutable.HashMap.foreach(HashMap.scala:95) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)