夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

SSHを通してKafkaBrokerに対するデータ送受信を行う方法(失敗

こんにちは。

Kafkaのようなメッセージ収集サーバを使用していると、
遠隔地からもデータの送受信を行いたい、というケースが出てきます。

ですが、Kafkaって通信は基本的に生のデータをネットワークに投入するので、
それはそれで気になるところがありますよね。

そのため、SSHを通してKafkaクラスタにデータを投入したい・・・という動機が出てきます。
#あるということにしてください^^;

ですが、Kafkaクラスタに対してProducerからデータを投入する場合、下記のような流れとなっています。

  1. ProducerがKafkaクラスタに対してBrokerのアドレスを問い合わせる
  2. KafkaクラスタがProducerに対してBrokerのアドレスを返す
  3. Producerは2で返されたBrokerのアドレスに対してデータの投入を行う

そのため、素直にSSHのポートフォワーディングを介してKafkaクラスタにデータを投入しようとすると、
下記のような流れとなり、問題が発生します。

  1. SSHポートフォワーディングを設定する
  2. Producerがフォワーディングされた通信路を通してKafkaクラスタにBrokerのアドレスを問い合わせる
  3. KafkaクラスタがProducerに対してフォワーディングされないBrokerのアドレスを返す
  4. Producerは3で返されたBrokerのアドレスに対してデータの投入を行い、フォワーディングされないアドレスのため失敗する

Brokerのアドレスをどれか1つ知っていればクラスタ全体のアドレスを知ることができ、
クラスタの構成変更にも追従できる・・という柔軟性と引き換えに、
この手の小細工(?)が行いにくくなっているわけですね。

そのため、MLで質問したところ、
「advertised.host.name」「advertised.port」という設定を行えば
「KafkaBrokerからKafkaProducerに通知するアドレス」を変更できるという回答をもらいました。
そのため、今回は実際にそれを試してみようと思います。

1.前提条件

前提条件として、Kafka0.8.1のインストールを行っておきます。
若干設定ファイルやコマンドの構成が変わっていますが、Kafka0.8.0と手順自体は同じです。

2.設定

まず、接続側のマシンからlocalhost:19092からKafkaクラスタサーバの9092に
ポートフォワーディングを行う前提とし、下記のように設定を行います。

log.dirs=/tmp/kafka-logs
#advertised.host.name=<hostname routable by clients>
#advertised.port=<port accessible by clients>

log.dirs=/opt/kafka/kafka-logs
advertised.host.name=localhost
advertised.port=19092

3.実行してみますが・・・?

では、KafkaBrokerプロセスを起動し、実際にSSHポートフォワーディングを通してアクセス可能か試してみます。
なお、クラスタ外からSSHポートフォワーディングで操作可能であることを確認するため、
ZooKeeperに対してもSSHポートフォワーディングを行い、その上で確認を行っています。

■ポートフォワーディング用コンソール

# ssh root@XXX.XXX.XXX.XXX -L 19092:XXX.XXX.XXX.XXX:9092 -L 12181:XXX.XXX.XXX.XXX:2181
// KafkaBrokerとZooKeeperに対してポートフォワーディング

■操作用コンソール

# cd /opt/kafka/bin
# ./kafka-topics.sh --create --zookeeper localhost:12181 --replication-factor 1 --partitions 3 --topic TestTopic
Created topic "TestTopic".
// ZooKeeperに対してポートフォワーディングし、Topicを作成成功。
# ./kafka-topics.sh --list --zookeeper localhost:12181
TestTopic
// 実際にTopicが作成されていることを確認
# ./kafka-console-producer.sh --broker-list localhost:19092 --topic TestTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Test
[2014-04-06 20:42:01,352] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,364] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,365] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,476] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,481] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,481] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,589] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,595] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,595] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,709] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartit
ionInfo)
[2014-04-06 20:42:01,719] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,719] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic (kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,826] WARN Error while fetching metadata [{TopicMetadata for topic TestTopic ->No partition metadata for topic TestTopic due to kafka.common.LeaderNotAvailableException}] for topic [TestTopic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-04-06 20:42:01,829] ERROR Failed to send requests for topics TestTopic with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2014-04-06 20:42:01,830] ERROR Error in handling batch of 2 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

・・・という形で、BrokerからTopic情報の取得に失敗した、となり、投入を行うことができませんでした・・・

尚、advertisedの設定を行わない場合は同じ操作を行っても下記のエラーとなるため、
advertisedの設定によって「3. Producerは2で返されたBrokerのアドレスに対してデータの投入を行う」の
タイミングにおける接続先は変わっているのは確かなのですが。

Test
[2014-04-06 21:11:44,231] ERROR Producer connection to localhost:9092 unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-04-06 21:11:44,238] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [TestTopic,0] (kafka.producer.async.DefaultEventHandler)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

4.まとめ

今回やってみたまとめは・・・こんな感じでしょうか。

  1. KafkaはBrokerがデータの投入先をProducerに通知するため、単にSSHポートフォワーディングをしただけではSSHを通して投入できない。
  2. 「advertised.host.name」「advertised.port」の設定でBrokerが通知するアドレスを切り替えることができる。
  3. だが、SSHポートフォワーディング+「advertised.host.name」「advertised.port」で投入を試みた場合、Brokerからエラーが返る。

どなたか解決方法をご存知の方はいらっしゃらないでしょうか。