kinesis-storm-spoutのソースコードを読んでみる(その4
こんにちは。
前回初期化処理周りの確認が終わったので、今回はメッセージの取得→送信や、
Ack/Fail機構周りを見ていきます。
尚、実際に見てみるとわかりますが、かなり煩雑な記述です(汗
ですので、とりあえず概要を知りたい方は
下記のポイントを理解できていれば問題ないと思います。
- KinesisSpoutはKinesisから取得したメッセージを1件単位でメモリ上に保持しており、Fail時は再送を行う。
- KinesisSpoutは「どのシーケンスまで処理完了したか?」を保持しており、定期的にZooKeeperに保存している。
- 「どのシーケンスまで処理完了したか?」は確実に完了した値としている。
- そのため、それより先のシーケンスのメッセージが処理完了しているケースもある。
今回のキーとなるクラスはInflightRecordTrackerです。
InflightRecordTrackerは「CheckPoint」という値を保持しており、
ZooKeeperに保存する「どこまでShardを読み込んだか?」のチェックポイントとして用いています。
「CheckPoint」は「ここまでは処理が完了した。(=Ackが返った)」という値になっており、
障害発生時も「CheckPoint」から読み込めばメッセージの欠損は発生しない作りとなっています。
#欠損は発生しないものの、重複処理は発生しえます。
nextTuple
Stormから常時呼ばれ続けるメソッドです。
このメソッドでメッセージを取得して下流コンポーネントに送信する・・という動作を行います。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※ShardGetterが二重になっていてわかりにくいが、クラス図の通りBufferedGetter > KinesisShardGetterという入れ子になっている。
- StateManagerがShardGetterを保持しない場合はSleepして終了
- ShardGetterを次のものに切り替える
- Roundrobin方式でnextTupleメソッドを呼び出すことにGetterを切り替えている。
- StateManagerから対象のShardに対するリトライメッセージを保持している場合はリトライメッセージを送信メッセージとする。
- リトライメッセージの保持については後で詳細を確認する。
- リトライメッセージが存在しない場合はShardGetterからメッセージを取得する
- BufferedGetterはバッファリングされたメッセージが存在しない場合はKinesisShardGetterからメッセージを取得する。
- BufferedGetterはバッファリングされたメッセージが存在せず、Shardの最後まで読んでいた場合、空メッセージを返す。
- 「Shardの最後まで読んでいた」はShardがReshadingされた結果既にクローズされている場合のみ発生する。
- ConfigのIKinesisRecordSchemeを使用してStormメッセージを生成する。
- 下流のコンポーネントにStormメッセージを送信
- Ack/Failの際に用いられるキー値は「ShardId:シーケンス」となっている。
- StateManagerにStormメッセージを送信した旨を通知する。
- StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- (リトライ実行フラグがONの場合のみ下記を実施)
- InflightRecordTrackerの保持するAck待ち内部マップに送信メッセージが保持されていなかった場合、シーケンスをキーとして保存する。
- ※Ack待ち内部マップはLinkedListのような前後関係を保持するHashMapとなっており、前後のRecordへのアクセスを効率化している。
- Ack待ち内部マップに保存する際、その時点の「最後メッセージ」を「前の要素」として紐づける。
- Ack待ち内部マップに保存する際に「最後メッセージ」として記録する。
- Ack待ち内部マップに保存する際に「最前メッセージ」が空の場合、「最前メッセージ」としても記録する。
- InflightRecordTrackerは送信メッセージがリトライだった場合、リトライ数をインクリメントする。
- InflightRecordTrackerは送信メッセージがリトライだった場合、送信メッセージのシーケンスをリトライキューから除去する。
- StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
やたらと処理が多いですが、これは「CheckPoint」のシーケンスを
Ack/Fail時に算出するために必要になっています。
あと、最前メッセージ/最後メッセージは内容的には下記です。
最前メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も前のメッセージ」
最後メッセージ=「その時点でAck待ちとなっている中でシーケンスが最も後のメッセージ」
ack
メッセージに対してAckが返った場合に呼び出されるメソッドです。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。
- StateManagerにackを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
- (送信メッセージが取得できた場合のみ以後の処理を実施)
- 「送信メッセージ」のAckフラグをON
- Ack待ち内部マップから「前の要素」「次の要素」を取得し、下記の処理を実施。
- 「前の要素」が空の場合下記の処理を実施。
- 対象メッセージ=「再前メッセージ」の場合シーケンスを「CheckPoint」として設定
- 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 対象メッセージ=「再前メッセージ」の場合シーケンスを「CheckPoint」として設定
- 「前の要素」が空でない場合下記の処理を実施。
- 「前の要素」がAck済みの場合下記の処理を実施
- 「前の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「前の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「次の要素」=「再前メッセージ」の場合、「次の要素」のシーケンスを「CheckPoint」として設定。
- 「次の要素」をAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「前の要素」がAck済みでない場合下記の処理を実施
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 対象メッセージをAck待ち内部マップから除去し、各メッセージの「前の要素」「次の要素」、「最前メッセージ」「最後メッセージ」を更新。
- 「次の要素」が存在してAck受信済みの場合、下記の処理を実施。
- 「前の要素」がAck済みの場合下記の処理を実施
fail
メッセージに対してFailが返った場合に呼び出されるメソッドです。
※StateManagerの同期処理。これはZooKeeper上のShardListが更新された場合に対応するため。
※キー値からShardId、シーケンスを復元して用いている。
- StateManagerにfailを通知する。StateManager > LocalShardState > InflightRecordTrackerという順に通知される。
- InflightRecordTrackerの保持するAck待ち内部マップから「送信メッセージ」を取得する。
- (送信メッセージが取得できた場合のみ以後の処理を実施)
- リトライ数がリトライ閾値未満だった場合、対象メッセージのシーケンスをリトライキューに追加する。
- リトライ数がリトライ閾値以上だった場合、リトライカウントアウトログを出力し、対象メッセージのシーケンスに対してack処理を実施
・・と見てみるとackの処理がやたらと複雑ですが、
結局の所は「送信メッセージ」に対してAckを受信したタイミングで
「前の要素」「次の要素」の状態に応じて下記のように処理をしています。
「削除」となったタイミングで、削除対象のメッセージ=「最前メッセージ」だった場合に、
「CheckPoint」を削除対象のメッセージのシーケンスまで進め、
その次のメッセージを「最前メッセージ」と扱っている処理を行っています。
#「前の要素」が存在しない場合は「送信メッセージ」が「最前メッセージ」となるため、
#下記の表にも「CheckPoint」が進む旨を明記しています。
「次の要素」---> 「前の要素」↓ |
Ack済み | Ack待ち | 未存在 |
---|---|---|---|
Ack済み | 「送信メッセージ」を削除 「前の要素」を削除 |
「送信メッセージ」を保持 「前の要素」を削除 |
「送信メッセージ」を保持 「前の要素」を削除 |
Ack待ち | 「送信メッセージ」を削除 | 「送信メッセージ」を保持 | 「送信メッセージ」を保持 |
未存在 | 「送信メッセージ」を削除 「次の要素」を削除 「CheckPoint」を「次の要素」のシーケンスに設定 |
「送信メッセージ」を削除 「CheckPoint」を「送信メッセージ」のシーケンスに設定 |
「送信メッセージ」を削除 「CheckPoint」を「送信メッセージ」のシーケンスに設定 |
・・と、処理自体はローカルにメッセージを保存しているため複雑でしたが、
動作の概要自体は非常にわかりやすいものでした。
これでこれまでより深く理解してKinesisSpoutを使うことができますね!
kinesis-storm-spoutのソースコードを読んでみる(その3
こんにちは。
とりあえず前回で大体の構造がわかったので、
今後はStormで基本となる下記の処理単位ごとに実際に行われる処理を見てみます。
今回は初期化/有効化/停止周りまで。
コンストラクタ
Stormではコンストラクタと、Topologyを生成するクラスの中で呼ばれる各種メソッドでSpout/Boltにパラメータを設定します。
基本的にこのタイミングで設定可能なのはシリアライズ可能なオブジェクトのみですが、
KinesisSpoutは例外的にSerializationHelperというクラスによって
シリアライズ可能でないAWSの固有クラス(CredentialProvider等)がシリアライズ化され、Workerプロセスに配分されます。
コンストラクタで指定可能なのは下記の3要素です。
- KinesisSpoutConfig
- AWSCredentialsProvider
- ClientConfiguration
KinesisSpoutの動作設定と認証情報、接続情報が設定可能ということになりますね。
open
Workerプロセスに配信されて復元された後に呼び出されるメソッドです。
openメソッドの中で実行されている処理は下記でした。
- KinesisSpoutの状態管理クラス(ZookeeperStateManager)の生成
activate
openメソッドの後、Active状態で起動したTopologyや、
またはDeactivate状態で起動した後Activateイベントを受信したTopologyで呼び出されるメソッド。
- KinesisSpoutの状態管理クラス(ZookeeperStateManager)のactivate
- ShardListGetterを用いてShard一覧を取得
- Kinesis:DescribeStreamRequestを実行(Shardを取得しきるまで)
- ZooKeeper上(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)にShardId一覧を保存する
- 但し、既にShardId一覧が保存されている場合は内容に関わらず、更新されない。
- ZooKeeper(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)の監視を開始
- ただ、内容に関わらず更新されないため、実質Watcherが起動することはないと思われる。
- ShardListGetterを用いてShard一覧を取得
- KinesisSpoutの状態管理クラス(ZookeeperStateManager)のrebalance
- 「どこまでShardを読んだか?」を読込み対象のShard毎にZooKeeperに保存
- ZooKeeper上のShardId一覧から自分の読み込み対象となるShardIdを取得
- 読込み対象となるShardIdに対応する「どこまでShardを読んだか?」をZooKeeperから取得。取得できない場合は読み込みシーケンスを空で生成。
- 読込み対象はZooKeeper上のShardIdが基準となるため、Reshard後はZooKeeper上のShardIdと実際のShardIdがずれ、一切読み込めなくなる。
- 読込み対象となるShardIdに対応するShardGetterを生成
- ShardGetterをループするイテレータを生成し、RoundrobinでGetterからデータを取得する準備を行う。
- ZooKeeperへの状態保存時刻を現在時刻で初期化
deactivate
- KinesisSpoutの状態管理クラス(ZookeeperStateManager)のdeactivate
- 「どこまでShardを読んだか?」を読込み対象のShard毎にZooKeeperに保存
・・・ここまで読んだ時点でかなり萎える事実が判明していますね(汗
KinesisSpoutはKinesisのReshardingには一切対応しておらず、かつ
Reshardingを行うとZooKeeper上に保存された古いShardIdに対して読み込みを行い続けるため、
ZooKeeperの状態を消さない限りその後動作しなくなる、というもののようです。
尚、ご丁寧にソースコードには下記のように対応していない旨が明記されていました。とほほ。
「ZooKeeper上(/kinesis_storm_spout/【TopologyName】/【StreamName】/shardList)にShardId一覧を保存する」の個所の処理
/** * Initialize the shardList in ZK. This is called by every spout task on activate(), and ensures * that the shardList is up to date and correct. * * @param shards list of shards (output of DescribeStream). * @throws Exception */ void initialize(final ImmutableList<String> shards) throws Exception { NodeFunction verifyOrCreateShardList = new NodeFunction() { @Override public byte[] initialize() { LOG.info(this + " First initialization of shardList: " + shards); ShardListV0 shardList = new ShardListV0(shards); ObjectMapper objectMapper = new ObjectMapper(); byte[] data; try { data = objectMapper.writeValueAsBytes(shardList); } catch (JsonProcessingException e) { throw new KinesisSpoutException("Unable to serialize shardList " + shardList, e); } return data; } @Override public Mod<byte[]> apply(byte[] x) { // At this point, we don't support resharding. We assume the shard list is valid if one exists. ★対応していない宣言★ LOG.info("ShardList already initialized in Zookeeper. Assuming it is valid."); return Mod.noModification(); } }; atomicUpdate(SHARD_LIST_SUFFIX, verifyOrCreateShardList); }
ただ、KinesisSpoutのAck/Fail機構はそれはそれで面白そうなので、
中身を追うのは継続してみようとは思います。
kinesis-storm-spoutのソースコードを読んでみる(その2
こんにちは、前回に引き続き、
kinesis-storm-spoutのソースコードを読んでいきます。
kinesis-storm-spoutのコードを確認した所、
コンポーネント構成は大体下記のようになっていました。
#単なるエンティティ等は省いています。
尚、下記のクラスについてはインタフェースが切られており、
後から拡張する際に行いやすくはなっています。
特に、IKinesisRecordSchemeについてはまともに使用するならほぼ確実に自前で作成して埋め込む必要があると思います。
- DefaultKinesisRecordScheme(IKinesisRecordScheme)
- KinesisHelper(IShardListGetter)
- KinesisShardGetterBuilder(IShardGetterBuilder)
- ZookeeperStateManager(IKinesisSpoutStateManager)
- BufferedGetter(IShardGetter)
- KinesisShardGetter(IShardGetter)
あとは、BufferedGetterのようにKinesisの1Shardあたり秒間5リクエストしか処理できない、
という制約を上手く回避する作りも埋め込まれているのがわかります。
とりあえずこれで、こういう処理をどこでやっているか、という疑問が
わいた際には大体どのクラスを見ればわかると思います。
では、次回以降は実際にソースを読んで、動作として特徴的な所をまとめていきます。
kinesis-storm-spoutのソースコードを読んでみる(その1
こんにちは。
今更感はありますが、Amazon Kinesisが最近個人的に熱くなり、改めて使い方を見ています。
aws lambdaでKinesisのイベントを取得することも出来ますし、色々組み合わせて使えそうなんですよね。
ただ、当然Kinesisは単体では使用できないため、
何かしらのデータ処理基盤と組み合わせる形になります。
というわけで(?)、kinesis-storm-spoutのソースコードを読んでいます。
とまぁ、前置きはさておき、今回は読んでみてわかったことの概要をまず項目で挙げてしまい、
次回以降きちんと構造を書いて中身を追っていくことにします。
わかったこと一覧は下記です。
- 初回起動時はKinesisの「最初から」「最後から」のどちらかを読出し場所として指定する。デフォルトは「最後から」
- 「どこまで読んだか?」はZooKeeperに保存し、2回目起動時以降はZooKeeperから読み込む
- KinesisのShard数変更時、Topologyをdeactivate>activateすればShardに対するSpoutの再割り振りが行われる(?)
1.初回起動時はKinesisの「最初から」「最後から」のどちらかを読出し場所として指定する。デフォルトは「最後から」
kinesis-storm-spoutは初回起動時はKinesisの
「最初から(=保存されている中で一番古いレコード)」か、
「最後から(起動後に追加されたレコードのみ読む)」のどちらかを選んで読込みを開始します。
Kinesis自体がレコードは1日分保存しているため、「最初から」は1日前のレコードから読み込み始めるということになりますね。
2.「どこまで読んだか?」はZooKeeperに保存し、2回目起動時以降はZooKeeperから読み込む
これが重要な所で、kinesis-storm-spoutは
通常のKinesisApplicationのようにDynamoにどこまで読み込んだかを保存することはありません。
Kinesisにアクセスできる=Dynamoにも当然アクセスできる環境ですので、
何故そうしないのかはわかりませんが、現状はこういう動作になっています。
尚、ZooKeeperに状態を保存するのはデフォルトで60秒間隔。
Stormから常時呼び出されるTuple取得処理の中でZooKeeperに保存されます。
と、ここまで書いてわかるかとは思いますが、
複数のTopologyを起動してKinesisの読み込み状態を共有したい場合、
Zookeeperを共有しておく必要があるというわけですね。
このあたりは微妙に厄介かもしれません。
3.KinesisのShard数変更時、Topologyをdeactivate>activateすればShardに対するSpoutの再割り振りが行われる(?)
Kinesisはスケールアウト/スケールインでShard数が変動しますが、
kinesis-storm-spoutはレコード読込みをしながらShard数の変動を検知することはできません。
ではどうすればいいのかというと、
Topologyのdeactivate/activate処理において、KinesisのShardを取得しなおして
ZooKeeper上の状態を更新している処理があるため、それで適応しているように見えます。
・・ですが、いまいちこのあたりのコードが怪しいため、どこまできちんと動くかは微妙です。
尚、activate処理はTopologyを起動した際にも実行されるため、
Topologyの再起動でも実質同じことが行われることになります。
とりあえず、特徴的な所やkinesis-storm-spoutの固有事情的な所はこんな感じでしょうか。
次回以降クラス構成を起こして詳細を読んでみようと思います。
・・続くかは微妙ですが^^;
ただ、去年まではソースコードを読み込むなりしておきながら
どこにも残さずにしていたため、読んだ結果は何かしらの形で残していくかと。
それでは。
Stormクラスタのコンテナ化に向けて通信周りを整理してみる
こんにちは。
最近Dockerをはじめとしたコンテナを用いたアプリケーションの運用が広まっている・・・
ということで、Stormクラスタをコンテナのオーケストレーションツールで
構築できないか、と考えています。
出来るようになれば、よりStormがお手軽に使えるようになりますし、
CPUを多く使い、メモリはそれなり、ディスクアクセスはあまりないということで、
Storm自体がコンテナとの相性が良さそう、というのもあります。
そんなわけで、まずはStormが通信に使用しているポートと、
どのホストと通信が行われるかをまとめ、基情報にしようと思います。
1.Stormの構成プロセス
Stormの構成プロセスはZooKeeper含めて下記の7個になります。
ですが、現状Workerプロセスは「Supervisorプロセスが自分がいるマシン内で起動する」
という動作となるため、必ずSupervisorプロセスと同じコンテナ上で起動する形になります。
- Nimbus
- UI
- Supervisor
- DRPCServer
- LogViewer
- Worker(Supervisorと同コンテナ)
- ZooKeeper
現状存在しているStormのDockerfileを見てみましたが、
Workerを起動する際に別のコンテナを起動してそこに配置する・・ということは行っていないようです。
2.Stormの各プロセスが使用するポート一覧
各プロセスがデフォルトで使用しているポート番号は下記です。
コンテナ上で動作させるという時点でポート番号の重複は考えなくていいため、
デフォルト値をそのまま使用することにします。
また、「外部からアクセスされるか?」「Stormクラスタ内部のみのアクセスか?」で
コンテナ群からどう設定するかが変わってくるでしょうから、それも併せて記述します。
- Nimbus
- 6627(TopologySubmit等の通信待ち受けポート。外部からアクセスあり)
- UI
- 8080(画面表示用ポート。外部からアクセスあり)
- DRPCServer
- 3772(DRPCリクエスト受付用ポート。外部からアクセスあり)
- 3773(DRPCリクエストをWorkerプロセスが受信するためのポート)
- 3774(DRPCに対してHTTPでアクセスするためのポート。外部からアクセスあり)
- Supervisor
- (公開ポートなし)
- LogViwer
- 8000(ログ閲覧用ポート。外部からアクセスあり)
- Worker
- 6700〜6703(Worker同士の通信用ポート)
- Zookeeper
- 2181(クライアントアクセス用ポート)
- 2888(トランザクション情報送受信用ポート)
- 3888(リーダー選出通信用ポート)
また、その他の制約として下記があります。
- Supervisorプロセスが動作するマシンにおいてSupervisorのホスト名からIPアドレスを引ける必要がある。
これはSupervisorがZooKeeperに自分のホスト情報を登録する際に
IPアドレスではなくローカルで取得したホスト名を用いているからのようです。
その上で、この中でLogViewerについては使用しない方針とします。
何故なら、LogViewer自体がローカルのログをHTTPアクセスで確認するためものであり、
fluentdやkafkaでログ自体を収集してしまえば使用する必要はないプロセスだからです。
すると、Stormクラスタをコンテナで表現する場合の
ポート利用構成は下記の図のようになります。
NimbusとUI、DRPCの一部ポートだけ外部公開すればStormクラスタの機能は一通り満たせそうですね。
では、次回以降実際にStormクラスタをコンテナで運用できないか試してみます。
とはいえ、コンテナ同士を接続するのを手動でやるのは困難ですので、
k8sを使うことを前提としますかねぇ・・・
Cludera ManagerでImpalaをインストールする
こんにちは。
前回でCludera Managerが動作するようになったため、
Impalaを実際動く所までインストールしてみます。
1.OS設定実施
前回のvm.swappinessのエラーが出ていたため、それだけは解消してしまいます。
各サーバで下記のコマンドを実行することで解消可能です。
# echo "vm.swappiness = 0" >> /etc/sysctl.conf # sysctl -p
上記のコマンドを実行後、再度ホストインスペクタを実行するとエラーが解消するため、「続行」を押下します。
2. Impalaのインストール
「続行」を押下すると下記の画面が表示されるため、「Impalaがあるコア」を選択し、「続行」を押下します。
すると、必要なサービスをどこにインストールするか、を選択する画面が出るため、
下記のように修正します。
- HDFS:NameNode > cluster1
- HDFS:Secondary NameNode > cluster2
- HDFS:Balancer > cluster1
- HDFS:DataNode > cluster2、cluster3、cluster4
- Hive:Gateway > cluster1、cluster2、cluster3、cluster4
- Hive:Hive MetaStore Server > cluster1
- Hive:HiveServer2 > cluster1
- Hue:Hue Server > cluster1
- Impala:Impala Catalog Server > cluster1
- Impala:Impala State Store > cluster1
- Impala:Impala Daemon > HDFS:DataNodeと同じ
- Cloudera Management Service:Service Monitor > cluster1
- Cloudera Management Service:Host Monitor > cluster1
- Cloudera Management Service:Event Server > cluster1
- Cloudera Management Service:Alarm Publisher > cluster1
- Oozie:Oozie Server > cluster1
- Sqoop2:Sqoop2 Server > cluster1
- YARN(MR2 Included):Resource Manager > cluster1
- YARN(MR2 Included):Job History Server > cluster1
- YARN(MR2 Included):NodeManager > HDFS:DataNodeと同じ
- ZooKeeper:Server > cluster2、cluster3、cluster4
次はデータベースの設定です。
これはそのまま組み込みのデータベースを使う、として先に進めます。
わかりやすくするために下記の通りデータの出力先だけ「/opt」配下に修正し、「続行」を行います。
#記述が無いものはデフォルトです。
- データディレクトリ : /opt/zookeeper/snapshot
- トランザクションログのディレクトリ : /opt/zookeeper/transaction
- DataNode データディレクトリ(DataNode Default Group) : /opt/dfs/dn
- DataNode データディレクトリ(DataNode Group 1) : /opt/dfs/dn
- NameNode データディレクトリ : /opt/dfs/nn
- HDFS チェックポイントディレクトリ : /opt/dfs/snn
- NodeManager のローカルディレクトリリスト(NodeManager Default Group) : /opt/yarn/nm
- NodeManager のローカルディレクトリリスト(NodeManager Group 1) : /opt/yarn/nm
- Hive ウェアハウスディレクトリ : /opt/hive/warehouse
- Sqoop 2 Server の Metastore ディレクトリ : /opt/sqoop2
- Impala Daemon スクラッチディレクトリ(Impala Daemon Default Group) : /opt/impala/impalad
- Impala Daemon スクラッチディレクトリ(Impala Daemon Group 1) : /opt/impala/impalad
- Oozie Server のデータディレクトリ : /opt/oozie/data
- Host Monitor のストレージディレクトリ : /opt/cloudera-host-monitor
- Service Monitor のストレージディレクトリ : /opt/cloudera-service-monitor
すると、インストールがはじまり、各サーバにサービスがインストールされ、起動します。
・・・このあたり、以前rpmファイルを一つ一つ指定してHadoopのサービスを各サーバに
必要なものだけインストールしていたころと比べるとあっけなく感じる位楽ですね。
これでクラスタのインストールは完了です。
ちなみにトップページに戻ると設定の警告が出ているため、確認すると
下記のようにリソース系の設定エラーが出ていました。
とはいえ、今回は仮想マシンを使用しているためこの手のエラーは出るものとして継続します。
3. Impalaコマンドの動作確認
クラスタのサービス一覧からHueを選択し、Hueの画面を開いてアカウントを登録します。
アカウント登録後、サービスのテンプレートを全て登録し、継続します。
ユーザは特に何もせず次へ進みます。
続行後、画面上部のQuery Editor > Impalaを選択します。
すると、Impala Editor画面が表示されます。
まだテーブルは作成していないため、「show databases」を実行し、結果が表示されるのを確認します。
cluster2にログインし、impala-shellから同じコマンドを実行してみます。
# impala-shell Starting Impala Shell without Kerberos authentication Connected to cluster2:21000 Server version: impalad version 2.0.0-cdh5 RELEASE (build ecf30af0b4d6e56ea80297df2189367ada6b7da7) Welcome to the Impala shell. Press TAB twice to see a list of available commands. Copyright (c) 2012 Cloudera, Inc. All rights reserved. (Shell build version: Impala Shell v2.0.0-cdh5 (ecf30af) built on Sat Oct 11 13:56:06 PDT 2014) > show databases; Query: show databases +------------------+ | name | +------------------+ | _impala_builtins | | default | +------------------+
Hueの画面と同じ結果が表示されることが確認できました。
とりあえずImpala用のコマンドが実行可能になったため、次はImpalaに投入するためのデータを作成します。
ただ、CSVで投入してParquetに変換する、だといまいち効率が悪いため、
事前にParquetに変換するための処理を実際に流してみます。
あまりそのあたりの情報が無いようですが、
データを投入してからImpalaで変換を行うのは効率が悪いため、まずは試してみます。
Cludera Manager導入が実はCentOS6.6日本語環境でこける
こんにちは。
前回インストールに失敗してしまったので、その解決編です。
#CDH User JapanのMLでも色々サポートいただき、解決することができました。
#ありがとうございます。
1. インストール失敗した原因は?
結果から言うと、yumの結果が日本語表記になっていたのが問題でした。
yumの結果が下記のように日本語の結果となっていたため、
Cloudera Managerがバージョン文字列の抽出に失敗した・・と思われます。
■CentOS6.6日本語環境
# yum info cloudera-manager-agent 読み込んだプラグイン:fastestmirror, security Loading mirror speeds from cached hostfile * base: ftp.tsukuba.wide.ad.jp * extras: ftp.tsukuba.wide.ad.jp * updates: ftp.tsukuba.wide.ad.jp 利用可能なパッケージ 名前 : cloudera-manager-agent アーキテクチャ : x86_64 バージョン : 5.2.0 リリース : 1.cm520.p0.60.el6 容量 : 3.8 M リポジトリー : cloudera-manager 要約 : The Cloudera Manager Agent URL : http://www.cloudera.com ライセンス : Proprietary 説明 : The Cloudera Manager Agent. : : The Agent is deployed to machines running services managed by Cloudera Manager.
ちなみにCentOS6.5ですとインストール時に言語を日本語で
指定してインストールしてもyumの結果は英語表記になります。
そのため、CentOS6.5ですと今回の問題は発生せず、インストールに成功します。
■CentOS6.5日本語環境
# yum info cloudera-manager-agent 読み込んだプラグイン:fastestmirror, security Loading mirror speeds from cached hostfile * base: ftp.tsukuba.wide.ad.jp * extras: ftp.tsukuba.wide.ad.jp * updates: ftp.tsukuba.wide.ad.jp 利用可能なパッケージ 名前 : cloudera-manager-agent アーキテクチャ : x86_64 バージョン : 5.2.0 リリース : 1.cm520.p0.60.el6 容量 : 3.8 M リポジトリー : cloudera-manager 要約 : The Cloudera Manager Agent URL : http://www.cloudera.com ライセンス : Proprietary 説明 : The Cloudera Manager Agent. : : The Agent is deployed to machines running services managed by Cloudera Manager.
2. 解決手順
解消方法としては、「CentOS6.6の言語設定を英語に修正」となります。
/etc/sysconfig/i18n 中の「ja_JP.UTF-8」を「en_US.UTF-8」に差し替えます。
コマンド的には下記のコマンドで可能です。
# sed -i 's/ja_JP/en_US/' /etc/sysconfig/i18n
上記のコマンドを実行後再ログインすると、
CentOS6.6環境であってもyumの結果が英語で表示されるようになります。
■CentOS6.6英語環境
# yum info cloudera-manager-agent Loaded plugins: fastestmirror, security Loading mirror speeds from cached hostfile * base: ftp.tsukuba.wide.ad.jp * extras: ftp.tsukuba.wide.ad.jp * updates: ftp.tsukuba.wide.ad.jp Available Packages Name : cloudera-manager-agent Arch : x86_64 Version : 5.2.0 Release : 1.cm520.p0.60.el6 Size : 3.8 M Repo : cloudera-manager Summary : The Cloudera Manager Agent URL : http://www.cloudera.com License : Proprietary Description : The Cloudera Manager Agent. : : The Agent is deployed to machines running services managed by : Cloudera Manager.
上記の環境で再度Cloudera Managerのインストールを実行すると、
下記のようにインストールが成功することが確認できました。
#事前にcluster1は試しているため、今回の画面には表示されていません。
その後、「続行」してParcelのダウンロード/配布/アクティブ化を行います。
ホストインスペクタをかけます。
vm.swappinessはHadoopの性能が劣化するポピュラーなデフォルト設定ですが、
ClouderaManagerでも検知可能なんですね。
これはこの後設定するとして、とりあえずCloudera Managerのインストールは完了です。
次はImpalaの環境に入るか、Impala用のデータを作成するか・・迷う所ですね。
それでは。