StormのコマンドはClojureを使用しているので起動が遅い?
こんにちは。
では、引き続きStormのコマンドが遅い理由について確認してみます。
1.Topology起動と、他のコマンドの違いは何なのか?
前回は、単にJVMの起動が遅いのではないか、レベルでとどまってしまいましたが、それではわからない点が1つあります。
「Topology起動と、他のコマンドの違いは何なのか?」ということです。
Topology起動時のJVM起動は1秒ほどで動いているにも関わらず、
他のコマンド(list/activate/deactivate/rebalance/kill)は5秒ほどかかっていました。
この違いは一体何なのでしょうか。
というわけで、前回の結果より、各コマンドを起動した時のJavaコマンド(クラスパス除く)を取得してみました。
#クラスパスは見た感じほぼ同じでしたので、それは影響しないとまずは仮定します。
■Topology起動
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) storm.starter.ExclamationTopology ExclamationTopology
■Topology一覧取得(list)
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.list
■Topology有効化(activate)
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.activate ExclamationTopology
■Topology無効化(deactivate)
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.deactivate ExclamationTopology
■Topology再配置(rebalance)
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.rebalance ExclamationTopology
■Topology終了(kill)
java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.kill_topology ExclamationTopology
■設定値取得(追加)
java -client -Dstorm.options= -Dstorm.conf.file= (クラスパス) backtype.storm.command.config_value storm.log.dir
Topology起動とそれ以外のコマンドの違いは何でしょうか。
それは、「Javaのクラスを起動しているか、Clojureのクラスを起動しているか」の違いです。
とはいえ、Clojureのクラスといっても実際にJVMで起動する際にJavaのクラスと違いが発生するものとは思えません。
では、何が違うんでしょう。
2.Javaクラス起動時とClojureクラス起動時の違い
とりあえず、違いがわかっているわけではないため、適当な仮定をつけながらあたりをつけていきます。
まず一つ目は、「Clojureのクラスはコンパイル時に依存するクラスが大量に追加されているのではないか?」ということですね。
当然、JVM上で違う言語を動作させるわけですからコンパイル時等にそのあたりをラッピングさせる機構が働いているはずです。
ですので、そのラッピングするためのクラスが重いのではないか、という予測です。
というわけで、実際に起動時のロードクラスを見て確認してみましょう。
■Topology起動
Javaで書かれたTopology起動の結果は下記のようになり、ロードされたクラス数は1594クラスでした。
思ったより依存するクラスは多いわけですね。
java -client -verbose:class -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) storm.starter.ExclamationTopology ExclamationTopology [Loaded java.lang.Object from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.io.Serializable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.Comparable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.CharSequence from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.String from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] (省略) [Loaded backtype.storm.generated.Nimbus$submitTopology_args from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded backtype.storm.generated.Nimbus$submitTopology_args$_Fields from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded org.apache.thrift7.protocol.TMap from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded backtype.storm.generated.ComponentObject$1 from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded backtype.storm.generated.Grouping$1 from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded backtype.storm.generated.Nimbus$submitTopology_result from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] [Loaded backtype.storm.generated.Nimbus$submitTopology_result$_Fields from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar] 619 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology [Loaded java.lang.Shutdown from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.Shutdown$Lock from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
■Topology一覧取得(list)
で、次はClojureで書かれたTopology一覧取得コマンドの実行時です。
実行してみると・・・?
java -client -verbose:class -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.list [Loaded java.lang.Object from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.io.Serializable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.Comparable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.CharSequence from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.String from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] (省略) [Loaded clojure.lang.IDeref from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar] [Loaded clojure.lang.IRef from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar] [Loaded clojure.lang.Settable from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar] [Loaded clojure.lang.IMeta from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar] [Loaded clojure.lang.IReference from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar] (省略) No topologies running. [Loaded java.lang.Shutdown from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar] [Loaded java.lang.Shutdown$Lock from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
・・・あれ?なんかやたらと多いですね。
省略した分も含めると4720クラスのロードが行われていました。
特に、clojure、という名のつくクラスが多く、2700をオーバーする数のロードが行われていました。
他のコマンドも実行してみた結果は下記になります。
No | コマンド | 機能 | ロードクラス数 |
1 | jar | TopologyをStormクラスタにデプロイ | 1594 クラス |
2 | list | StormクラスタのTopology一覧取得 | 4720 クラス |
3 | activate | Topologyを有効化 | 4714 クラス |
4 | deactivate | Topologyを無効化 | 4714 クラス |
5 | rebalance | Topologyを再配置 | 4755 クラス |
6 | kill | Topologyを終了 | 4752 クラス |
7 | configvalue | 設定値読込 | 4544 クラス |
Stormのコマンドが遅い理由とは?
こんにちは。
とりあえず気になることができたので、しばらくStormネタです。
1.Stormを使っていて気になる所
皆さんはStormを実際に使ってみたことがあるでしょうか?
私は使って最初のうちは気にならなかったのですが、最近気になってきたことがあります。
それは、Stormのコマンド(storm)の遅さです。
8コア、32GBクラスのマシンの上で動作させていても、
StormのクラスタからTopologyの一覧取得するだけで普通に10秒位使っちゃうんですよね。
ということで、まずは実際にコマンドを実行した結果かかった時間を示してみます。
前提となるマシンスペックは下記です。
■ホストマシン
CPU : Core i7 3770 Memory : 32GB HDD : 2TB
■仮想マシン(stormtest)
CPUコア数 : 2 Memory : 8GB HDD : 200GB OS : CentOS6.6 64bit(Basic Server)
仮想マシンの性能は低いですが、その場合の値、として見てもらえればOKです。
尚、他の仮想マシンは稼働させずに確認しています。
2.Stormコマンド実行にかかる時間
では、実際にコマンドを実行してみます。
Topology起動
所要時間:11秒
[root@stormtest storm]# date;bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.ExclamationTopology ExclamationTopology;date 2015年 4月 8日 水曜日 23:55:42 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) storm.starter.ExclamationTopology ExclamationTopology 340 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar... 347 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar examples/storm-starter/storm-starter-topologies-0.9.4.jar to assigned location: /opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar Start uploading file 'examples/storm-starter/storm-starter-topologies-0.9.4.jar' to '/opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar' (3244061 bytes) [==================================================] 3244061 / 3244061 File 'examples/storm-starter/storm-starter-topologies-0.9.4.jar' uploaded to '/opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar' (3244061 bytes) 395 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar 395 [main] INFO backtype.storm.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"topology.workers":3,"topology.debug":true} 598 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology 2015年 4月 8日 水曜日 23:55:53 JST
Topology一覧取得
所要時間:16秒
[root@stormtest storm]# date;bin/storm list;date 2015年 4月 8日 水曜日 23:58:49 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.list 1815 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 Topology_name Status Num_tasks Num_workers Uptime_secs ------------------------------------------------------------------- ExclamationTopology ACTIVE 18 3 192 2015年 4月 8日 水曜日 23:59:05 JST
TopologyActivate
所要時間:15秒
[root@stormtest storm]# date;bin/storm activate ExclamationTopology;date 2015年 4月 9日 木曜日 00:00:43 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.activate ExclamationTopology 1323 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 1348 [main] INFO backtype.storm.command.activate - Activated topology: ExclamationTopology 2015年 4月 9日 木曜日 00:00:58 JST
TopologyDeactivate
所要時間:16秒
[root@stormtest storm]# date;bin/storm deactivate ExclamationTopology;date 2015年 4月 8日 水曜日 23:59:51 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.deactivate ExclamationTopology 1776 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 1813 [main] INFO backtype.storm.command.deactivate - Deactivated topology: ExclamationTopology 2015年 4月 9日 木曜日 00:00:07 JST
TopologyRebalance
所要時間:16秒
[root@stormtest storm]# date;bin/storm rebalance ExclamationTopology;date 2015年 4月 9日 木曜日 00:02:51 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.rebalance ExclamationTopology 1013 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 1047 [main] INFO backtype.storm.command.rebalance - Topology ExclamationTopology is rebalancing 2015年 4月 9日 木曜日 00:03:07 JST
Topology kill
所要時間:16秒
[root@stormtest storm]# date;bin/storm kill ExclamationTopology;date 2015年 4月 9日 木曜日 00:04:25 JST Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.kill_topology ExclamationTopology 711 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 753 [main] INFO backtype.storm.command.kill-topology - Killed topology: ExclamationTopology 2015年 4月 9日 木曜日 00:04:41 JST
・・・遅い。
かつ、何故か一番時間がかかりそうなTopology起動よりも
他のコマンド群の方が明らかに時間がかかっているというのはどういうことなんでしょう・・・
ともあれ、とりあえずまとめてみた結果は下記でした。
No | コマンド | 機能 | 所要時間 |
1 | jar | TopologyをStormクラスタにデプロイ | 11秒 |
2 | list | StormクラスタのTopology一覧取得 | 16秒 |
3 | activate | Topologyを有効化 | 15秒 |
4 | deactivate | Topologyを無効化 | 16秒 |
5 | rebalance | Topologyを再配置 | 16秒 |
6 | kill | Topologyを終了 | 16秒 |
3.どこに時間がかかっている?
実際に、Stormのコマンドの中身を確認してどこに時間がかかっているかを追ってみます。
今回試した6コマンドは全てStormコマンド内部では「exec_storm_class」という関数を呼び出しています。
ですので、exec_storm_classに下記のように時刻を出力する処理を追記し、再実行してみます。
尚、exec_storm_classの最後で出力を行っていないのは、最後まで実行しないままforkされてしまうからです。
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False): import datetime # 追記 print("Start exec_storm_class " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記 global CONFFILE storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) print("Config get " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記 if(storm_log_dir == None or storm_log_dir == "nil"): storm_log_dir = STORM_DIR+"/logs" all_args = [ JAVA_CMD, jvmtype, get_config_opts(), "-Dstorm.home=" + STORM_DIR, "-Dstorm.log.dir=" + storm_log_dir, "-Djava.library.path=" + confvalue("java.library.path", extrajars), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrajars), ] + jvmopts + [klass] + list(args) print("Config get and args construct " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記 print("Running: " + " ".join(all_args)) if fork: os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) else: os.execvp(JAVA_CMD, all_args) # replaces the current process and # never returnsa
再実行してみた結果は下記になりました。
Topology起動
[root@stormtest storm]# date;bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.ExclamationTopology ExclamationTopology;date 2015年 4月 9日 木曜日 00:29:24 JST Start exec_storm_class 2015-04-09 00:29:24 Config get 2015-04-09 00:29:29 Config get and args construct 2015-04-09 00:29:34 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) storm.starter.ExclamationTopology ExclamationTopology (省略) 391 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology 2015年 4月 9日 木曜日 00:29:34 JST
Topology一覧取得
所要時間:16秒
2015年 4月 9日 木曜日 00:30:53 JST Start exec_storm_class 2015-04-09 00:30:53 Config get 2015-04-09 00:30:59 Config get and args construct 2015-04-09 00:31:05 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.list 1815 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 Topology_name Status Num_tasks Num_workers Uptime_secs ------------------------------------------------------------------- ExclamationTopology ACTIVE 18 3 192 2015年 4月 9日 木曜日 00:31:10 JST
TopologyActivate
所要時間:15秒
[root@stormtest storm]# date;bin/storm activate ExclamationTopology;date 2015年 4月 9日 木曜日 00:33:54 JST Start exec_storm_class 2015-04-09 00:33:54 Config get 2015-04-09 00:33:59 Config get and args construct 2015-04-09 00:34:04 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.activate ExclamationTopology 1628 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 1641 [main] INFO backtype.storm.command.activate - Activated topology: ExclamationTopology 2015年 4月 9日 木曜日 00:34:09
TopologyDeactivate
所要時間:16秒
[root@stormtest storm]# date;bin/storm deactivate ExclamationTopology;date 2015年 4月 9日 木曜日 00:33:04 JST Start exec_storm_class 2015-04-09 00:33:04 Config get 2015-04-09 00:33:09 Config get and args construct 2015-04-09 00:33:14 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.deactivate ExclamationTopology 2085 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 2098 [main] INFO backtype.storm.command.deactivate - Deactivated topology: ExclamationTopology 2015年 4月 9日 木曜日 00:33:20 JST
TopologyRebalance
所要時間:16秒
[root@stormtest storm]# date;bin/storm rebalance ExclamationTopology;date 2015年 4月 9日 木曜日 00:36:22 JST Start exec_storm_class 2015-04-09 00:36:22 Config get 2015-04-09 00:36:27 Config get and args construct 2015-04-09 00:36:32 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.rebalance ExclamationTopology 691 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 705 [main] INFO backtype.storm.command.rebalance - Topology ExclamationTopology is rebalancing 2015年 4月 9日 木曜日 00:36:38 JST
Topology kill
所要時間:16秒
[root@stormtest storm]# date;bin/storm kill ExclamationTopology;date 2015年 4月 9日 木曜日 00:37:21 JST Start exec_storm_class 2015-04-09 00:37:21 Config get 2015-04-09 00:37:26 Config get and args construct 2015-04-09 00:37:32 Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.kill_topology ExclamationTopology 583 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627 605 [main] INFO backtype.storm.command.kill-topology - Killed topology: ExclamationTopology 2015年 4月 9日 木曜日 00:37:37 JST
4.時間がかかっている場所
上記の結果より、下記のことが言えるようです。
- 設定ファイルから設定値を読み込む(関数「confvalue」実行)に5秒程かかる
- Topologyの起動自体は実は1秒程しかかからない
- Topology起動以外のlist/activate/deactivate/rebalance/killは5秒程かかる
そのうち「設定ファイルから設定値を読み込むのに5秒程かかる」については、
下記の通りStormのコマンドを見るに、1項目の設定値を読み込むためにJVMを起動しているためのようではあります。
def confvalue(name, extrapaths): global CONFFILE command = [ JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name ] p = sub.Popen(command, stdout=sub.PIPE) #Javaプロセスを起動している! output, errors = p.communicate() # python 3 if not isinstance(output, str): output = output.decode('utf-8') lines = output.split("\n") for line in lines: tokens = line.split(" ") if tokens[0] == "VALUE:": return " ".join(tokens[1:]) return ""
ただ、単純にJVMを起動するのに時間がかかる・・
というだけであれば、Topologyを起動するのに1秒ですむ所が、
やること自体は確実に少ないlist/activate/deactivate/rebalance/killは5秒程かかるのは筋が通らないんですよね。
とまぁ、とりあえずどこが遅いかはわかったので、
何故遅いか、については次回に詳細を追ってみることにします。
Storm-0.9.4のバージョンアップ内容
こんにちは。
少し間が空いてしまったのですが、
Stormが0.9.4にバージョンアップしたので、バージョンアップ内容をざっとまとめてみます。
- STORM-559: ZkHosts in README should use 2181 as port.
storm-kafkaにおいて、READMEにあるZooKeeperの接続設定例が「localhost:9092」となっており、
ZooKeeperのデフォルトポートである2181で無い値になっていたので、それを修正したようです。
READMEの修正のため、機能的な変更点はありません。
- STORM-682: supervisor should handle worker state corruption gracefully.
supervisorがローカルに保持するWorkerプロセスのHeatBeat情報が不整合を起こし、
次起動する際に起動に失敗する事象が発生する問題への対処ですね。
Xと、X.versionという2ファイルが存在するのですが、消す時にX消してX.version失敗すると
その後Supervisorが起動しなくなってしまうというものです。
そう起こる物ではないですが、起こるとStormのローカルファイルを消さないと復旧しないという厄介なものでした。
- STORM-693: when kafka bolt fails to write tuple, it should report error instead of silently acking.
KafkaBoltでKafkaにメッセージを書き込む際に、失敗しても内容をログ出力するのみでメッセージを破棄する、
という動作だったものをメッセージの処理自体を失敗として扱うようにしてます。
このあたりは設計方針にも依る感は強いですが、とりあえず安全側に倒したというようですね。
- STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages
他Workerプロセスが何かしらの原因でダウンした場合の通信エラーハンドリングの改善が行われています。
今までは他のプロセスのダウンによって、他のWorkerプロセスも芋ずる式にダウンすることがあったのですが、
それが発生しにくくなるよう、再接続間隔の調整等が行われています。
- STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
Supervisorがrebalance等の要因によって動作中のTopologyを自ノード上で動作させる時に落ちる問題への対処ですね。
設定ファイル等が各Supervisorに配信されるのはTopologySubmitのタイミングとなるため、
後からクラスタに追加する等、そのタイミングで配信されなかった場合や、何かしらの原因でファイルを保持しない
SupervisorにTopologyをrebalanceすることは出来ない問題がありました。
それが解消するようです。
今回は機能の追加はないため小さいアップデートになりますが、
STORM-682、STORM-329、STORM-130あたりは地味に重要なアップデートになりますね。
STORM-682は頻度は低いですが一度発生するとわかっていないと対応不可、
STORM-329、STORM-130は普通におき得る問題ですので。
というわけで、内容も確認したのでインストーラや依存バージョンを更新しますか〜。
並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その4:LocalExecutor概要
こんにちは。
Embulkが前回の投稿から今回の投稿までの間にJava用のプラグインもサポートしていますね。
これでようやくプラグインを書けるようになった・・・のですが、
とりあえずプラグインを書くのは裏で行っておくとして、ここではソースコードリーディングを続けます。
Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン - Blog by Sadayuki Furuhashi
今回はLocalExecutor、つまりローカルでデータの取得→書込処理を行うためのクラスです。
1. LocalExecutorに関連するクラス群
LocalExecutorに関連するクラス群をまとめてみますと以下のような図になります。
正確にはExecクラスによってThreadLocalな変数を生成してExecSessionを保持できるようにして実行・・
といったことも関わってくるのですが、動作を流してみる分にはそれほど影響が無いため、
図中からは省略しています。
実際に動かすにあたっては非常に重要な機構ですが、まずはわかりやすく処理を一本通してみようということで^^;
各クラスの役目は前回書いた内容とほぼ同じですので省略します。
2. LocalExecutorの動作
では、実際のLocalExecutorの動作を見ていきます。
初期化処理(コンストラクタ)
- 最大スレッド数、システム設定などを初期化する。
- ExecutorServiceのスレッド数はデフォルトコア数×2、設定可能
- ExecutorServiceが実際に使用されるのは本記事的にはしばらく後。
その後、前回の実行結果がErrorで終わっており、再実行状態が存在するかを確認しています。
再実行状態が存在しない場合は通常実行(run)、存在する場合は再実行処理(resume)、となります。
差分としては下記の内容が存在しますが、大まかな流れはrunで大体わかるはずですので、
今回はrunのみ読んでいきます。
#resumeはまた機会があれば。
実行処理(run):実行前ブロック
実行処理(run):処理実行部
では、実処理部に入るのですが、文章だけで書いてもわかりにくいため、
ソースをインデントを浅くして1行の長さを長くしたものを下記にはっておきます。
一度ソースを見てから以後の文章を見てください。
private ExecutionResult doRun(ConfigSource config) { final ExecutorTask task = config.loadConfig(ExecutorTask.class); final InputPlugin in = newInputPlugin(task); final List<FilterPlugin> filterPlugins = newFilterPlugins(task); final OutputPlugin out = newOutputPlugin(task); final ProcessState state = new ProcessState(Exec.getLogger(LocalExecutor.class)); try { ConfigDiff inputConfigDiff = in.transaction(task.getInputConfig(), new InputPlugin.Control() { public List<CommitReport> run(final TaskSource inputTask, final Schema inputSchema, final int taskCount) { state.initialize(taskCount); state.setInputSchema(inputSchema); Filters.transaction(filterPlugins, task.getFilterConfigs(), inputSchema, new Filters.Control() { public void run(final List<TaskSource> filterTasks, final List<Schema> filterSchemas) { Schema outputSchema = last(filterSchemas); state.setOutputSchema(outputSchema); ConfigDiff outputConfigDiff = out.transaction(task.getOutputConfig(), outputSchema, taskCount, new OutputPlugin.Control() { public List<CommitReport> run(final TaskSource outputTask) { task.setInputTask(inputTask); task.setFilterTasks(filterTasks); task.setOutputTask(outputTask); if (taskCount > 0) { process(task.dump(), filterSchemas, taskCount, state); if (!state.isAllCommitted()) { throw state.getRepresentativeException(); } } else { // TODO warning? } return state.getOutputCommitReports(); } }); state.setOutputConfigDiff(outputConfigDiff); } }); return state.getInputCommitReports(); } }); state.setInputConfigDiff(inputConfigDiff); try { doCleanup(config, state.buildResumeState(task, Exec.session())); } catch (Exception ex) { state.logger.warn("Commit succeeded but cleanup failed. Ignoring this exception.", ex); // TODO } return state.buildExecuteResult(); } catch (Throwable ex) { if (state.isAllCommitted()) { // ignore the exception return state.buildExecuteResultWithWarningException(ex); } if (!state.isAnyStarted()) { throw ex; } throw state.buildPartialExecuteException(ex, task, Exec.session()); } }
実処理はまずPluginの設定を連結して実際に実行するパイプライン(Taskと呼ぶのが正しい?)を生成し、
それをスレッドに割り振ってExecutorServiceで実行する形を取っています。
あと、InputPlugin、FilterPlugin、OutputPluginは実際はインタフェースのため
実処理は記述されていないのですが、わかりやすくするため下記のクラス群を参考に記述しています。
下記のクラス群に移行している間はその旨を書いています。
あと、FilterPluginは処理として必須の流れではないようなので、今回は省略です。
・InputPlugin>FileInputRunner>FileInputPlugin(LocalFileInputPlugin)
・OutputPlugin>FileOutputRunner>FileOutputPlugin(LocalFileOutputPlugin)
では、これから実処理部の流れを見ていきます。
- ExecutorTask、InputPlugin、FilterPluginのリスト、OutputPluginを設定/PluginManagerを用いてロード
- InputPluginのtransaction開始
- (FileInputRunner)RunnerTaskを設定からロード
- (FileInputRunner)FileInputPluginのtransaction開始
- (LocalFileInputPlugin)PluginTaskを設定からロード
- (LocalFileInputPlugin)PluginTaskの情報を基に、ファイル一覧を取得する。
- (LocalFileInputPlugin)処理スレッド数をファイルと同一にして読込み処理開始
- (FileInputRunner)Decoders、ParserPluginのtransactionを通し、結果をLocalExecutorから与えられたInputPlugin.Control()に渡す。
- この段階でようやくLocalExecutor(内部無名クラス)に処理が返ってくるわけですね。
- InputPlugin.Control()に渡された結果を受けて、List
に対するtransactionを開始 - List中のFilterPluginのtransactionを順々に実行、最後のFilterPluginは結果をLocalExecutorから与えられたFilterPlugin.Control()に渡す。
- FilterPlugin.Control()に与えられた結果を受けてOutputPluginのtransaction開始
- (FileOutputRunner)RunnerTaskを設定からロード
- (FileOutputRunner)FileOutputPluginのtransaction開始
- (LocalFileOutputPlugin)PluginTaskを設定からロード
- (LocalFileOutputPlugin)シーケンスのチェックを実施し、失敗した場合はエラーとする
- (FileOutputRunner)Encoders、FormatterPluginのtransactionを通し、結果をLocalExecutorから与えられたOutputPlugin.Control()に渡す。
- 再度LocalExecutor(内部無名クラス)に処理が返ります。
- それまでのInputTask、FilterTask、OutputTaskをTask情報に設定してLocalExecutorで実際に処理を行うためのTaskを生成する。
- TaskをstartProcessorメソッドで非同期/並列に実行。
- 並列度は初期化処理で書かれた「ExecutorServiceのスレッド数」で決まる。
- 実行結果を取得し、各InputPlugin、OutputPluginのCleanup処理を呼び出す
- cleanup処理も並列実行可能な基本構造となっています。標準Pluginではそこまで本格的なCleanup処理を呼び出しているものはありませんでしたが。
- 実行結果を生成する。
その後、Runnerクラス側の方で失敗していた場合は再実行状態を生成し、成功した場合は次回実行用の状態を生成してファイル出力・・
という流れになっています。
とりあえず、LocalExecutorを読んでみましたが、1回では流れを通すのがやっとという感じですね。
ですので、次回以降も内部のEntity構成やInputPluginといったPlugin群の構造等、読んでいこうと思います。
並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要
こんにちは。
前回で初期化部分の確認が終わったため、今回は本処理の方に入ってきます。
尚、ServiceLoaderでJava製プラグインがロード出来るかについてはとりあえず一通り読んでからの方針で^^;
まず、基本構造としては
上記の図にあるRunnerが起動の起点となり、初期化を行った後に
run/cleanup/guess/previewの各々の処理に分岐する流れとなっています。
まず今回はrun処理の流れを追ってみることにします。
1. run処理の流れ概要
run処理の流れは下記のようになっています。
- 設定ファイル(YAML形式)を読み込む。
- 出力ファイル(次回実行用の設定出力先、Resume状態の出力先)の出力可能確認を行う。
- Resume状態ファイルを読み込む。
- Executorクラスを生成する。
- Resume状態にあわせて処理を実行する。
- 実行失敗した場合、Resume出力先設定がある場合はそこに状態を記録、設定が存在しない場合はTransactionをクリーンアップする。
- 実行成功した場合、Resume設定ファイルを削除する。
- 次回実行用の設定ファイルを出力する。
流れ自体は非常にオードソックスなものですね。
2. run処理を実行するクラス群
次はrun処理を実行するクラス群を確認します。
とりあえず、流れとデータの保存先が大体わかる感じまで見てみると、下記のようなクラス構成となっていました。
各クラスの役割は下記になります。
- ConfigLoader
各種設定ファイルからConfigSourceを生成する設定読込クラス。
今回の範囲ではYamlファイルを読み込んでJsonObjectに変換して返している。
- ConfigSource/TaskSource/CommitReport/ConfigDiff
設定の設定と取得が行えるインタフェース。
入れ子構造になっており、内部要素を取得したり、内部要素をEntityに変換して返すことが可能。
今回の範囲の実体はModelManagerとJsonObjectを設定したDataSourceImplであり、
ModelManagerをJsonObjectをクラスに変換する際に使用している。
- Schema
データの読み込み/書き込み先となるデータソースの
のスキーマをカラム(インデックス、名称、型情報を保持するカラム定義)のリストとして
保持するスキーマ定義クラス。
- ResumeState
前回実行失敗時に状態を保存するクラス。
下記の情報を保持する。
- ExecSession
実行時の情報を保持するクラス。
設定関連のオブジェクトの他にPluginManager、BufferAllocatorや
トランザクション時間(記録するための値)を保持する。
- LocalExecutor
ローカルで実際のInput/Output処理を実行するクラス。
与えられた設定を元にInput/Output処理を実行し、結果を記録する。
現状、embulkの実処理を実行するコアクラス。
- ProcessState
プロセスの現在の実行状態を保持するクラス。
Processorの数だけ実行情報を保持する。
保持している状態は下記のとおり。
と、こう見てみるとよく見た構造を持つ4クラス(ConfigSource/TaskSource/CommitReport/ConfigDiff)が気になりますね。
ですので、まずはこのデータソース周りを見て見ます。
3. データソース関連クラス群
データソース関連クラス群をまとめてみると、下記のようになります。
クラス構造を見てもわかるとおり、ConfigSource/TaskSource/CommitReport/ConfigDiffの実体はDataSourceImplクラスであり、
実際にデータはJacksonのObjectNodeとして保持されています。
その上で、DataSourceSerDeクラスに記述されたSerDeModule(下記)によって
各種データソースのシリアライザ/デシリアライザが紐付けられています。
この紐付けによってこれらのデータソースのクラスを指定することで
設定ファイルから読み込んでキャストし、ロードすることができるわけですね。
public static class SerDeModule extends SimpleModule { public SerDeModule(final ModelManager model) { // DataSourceImpl addSerializer(DataSourceImpl.class, new DataSourceSerializer<DataSourceImpl>()); addDeserializer(DataSourceImpl.class, new DataSourceDeserializer<DataSourceImpl>(model)); // ConfigSource addSerializer(ConfigSource.class, new DataSourceSerializer<ConfigSource>()); addDeserializer(ConfigSource.class, new DataSourceDeserializer<ConfigSource>(model)); // TaskSource addSerializer(TaskSource.class, new DataSourceSerializer<TaskSource>()); addDeserializer(TaskSource.class, new DataSourceDeserializer<TaskSource>(model)); // CommitReport addSerializer(CommitReport.class, new DataSourceSerializer<CommitReport>()); addDeserializer(CommitReport.class, new DataSourceDeserializer<CommitReport>(model)); // ConfigDiff addSerializer(ConfigDiff.class, new DataSourceSerializer<ConfigDiff>()); addDeserializer(ConfigDiff.class, new DataSourceDeserializer<ConfigDiff>(model)); } }
ModelManagerはさまざまなクラスのシリアライザ/デシリアライザを
内包したObjectMapperを保持しており、それと上記のシリアライザ/デシリアライザを連携させ、
インジェクションすることでValueObjectをデータを保持するところから
非常に短い、シンプルなコードでロードすることが可能になっています。
なお、このあたりを見ていくとSessionTask、ExecutorTaskといったTaskオブジェクトの
シリアライザ/デシリアライザ定義なども見えてくるわけですが、
そこまで見ていくときりがないんですよね(汗
そのあたりのエンティティのシリアライズ/デシリアライズ周りは動作自体にはそう影響してこない・・
と言いたいのですが、このあたりのValueObjectの管理をいかに楽に行うかが
ValueObjectを大量に扱うプロダクトの鍵ですので、このシリアライズ/デシリアライズ機構も
embulkの重要な機能のひとつになります。
とはいえ、run処理の流れとデータソース周りの概要が見えた段階で今回はここまでにして、
次回はLocalExecutorの詳細か、ModelManagerの詳細を見ていきます。
どちらにするかはまぁその時にでも。
並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その2:初期化
こんにちは。
前回は起動時のさわりだけでしたので、今回からまともに中身を読む形になりますね^^;
1.クラス概要構成
JRubyによる起動処理部分が終わり、
Javaに戻ったのでまずはJavaクラスの概要構成から確認してみます。
概要の構成はこれだけです。
EmbulkはGuiceによるインジェクションで必要なオブジェクトを取得して
使用する形になるので、固定的に起点となるRunnerクラスから参照が行われているのは
上記の図の要素だけになります。
各クラスの解説は下記の通りです。
Runner
JRubyから起動されるEmbulkの本来のメインクラス。
各種オブジェクトの初期化後、run/cleanup/guess/previewといった個別コマンドの処理を行うクラス。
DataSourceImpl
システムプロパティに設定されたembulk関連の定義(embulk.で始まるもの)を
JSON形式で読み込んで保持し、Embulkプロセス内で自由に使用するために用いられるクラス。
EmbulkService
下記のモジュール類を用いて
インジェクション用のオブジェクト(Guice:Injector、SpringだったらContextみたいなもの)を
生成するクラス。
- SystemConfigModule
- ExecModule
- ExtensionServiceLoaderModule
- BuiltinPluginSourceModule
- JRubyScriptingModule
・・という動作のため、上記の〜Moduleクラス群が
実際にインジェクションされるクラスを設定している形になりますね。
2.Moduleクラス群の設定内容
〜Moduleクラスの処理を見てみると下記のようになります。
SystemConfigModule
- bind定義:DataSourceImpl > ConfigSource(@ForSystemConfig)
ExecModule
- bind定義:LoggerProvider > ILoggerFactory(Provider)
- bind定義:ModelManager(Singleton)
- bind定義:BufferAllocator > BufferAllocator(Singleton)
- bind定義:GuessExecutor.GuessParserPlugin > ParserPlugin(@Named("ParserPlugin.system_guess"))
- bind定義:SamplingParserPlugin > ParserPlugin(@Named("ParserPlugin.system_sampling"))
Plugin系のクラスについてはアノテーションでPluginと名前を指定してインジェクションする形になりますが、
FileInputPlugin実装クラスとFileOutputPlugin実装クラスについては
Pluginの実行制御を行うFileInputRunner/FileOutputRunnerという形で登録されるようです。
理由は今後読み進めた際に確認してみます。
- bind定義:ObjectMapper > ObjectMapperProvider
上記のObjectMapperProviderにおいて、
TimeZone、Timestamp、Charset、GuavaDataType、JodaDataTypeのオブジェクト変換に対応する
ObjectMapperが生成されています。
文字列が時刻として上手く解釈されるのはここで生成されるObjectMapperのおかげですかね。
ExtensionServiceLoaderModule
ServiceLoaderを用いてExtension定義(META-INF/services/org.embulk.exec.Extensionから読み込んだ結果)を
読み込み、Extensionを介してModuleのBindを行う。
現状、「org.embulk.standards.StandardPluginExtension」が定義されており、
embulk-standardプロジェクトで作成されたPluginがBindされています。
これと同じ方式でPluginをロードする流れを作れれば、JVM系言語で開発したPluginもロードが出来そうです。
JVM系言語で追加のPluginを記述したければ、下記の4手順を踏む必要がありそうです。
- 追加Pluginを開発する。
- 「追加Plugin」をロードするModuleを開発する。
- 「追加PluginをロードするModule」を返すExtensionを開発する。
- Extension定義に「追加PluginをロードするModuleを返すExtension」のクラスを記述する。
ただ、これは自前でEmbulkをビルドするならいくらでも可能でしょうけど、
Pluginとして提供は・・可能なんですかね。
ServiceLoaderがクラスパス上に存在する「META-INF/services/org.embulk.exec.Extension」を全て読み込む、
という動作であれば、Pluginとして提供するJarに「META-INF/services/org.embulk.exec.Extension」を含めておき、
Pluginとして提供するJarファイルがEmbulk起動時のクラスパスに配置されていれば可能にはなります。
実際どうなるかはやってみるしかなさそうではあります。
BuiltinPluginSourceModule
- bind定義:InjectedPluginSource > PluginSource(Multi)
Multibinderを使用していますが、現状InjectedPluginSourceのみがBindされています。
JRubyScriptingModule
- bind定義:ScriptingContainerProvider > ScriptingContainer(Provider、Singleton)
- bind定義:JRubyPluginSource > PluginSource(Multi)
PluginSourceに対するMultibinderにJRubyPluginSourceも設定されていますが、
この場合PluginManagerで両方のPluginSourceがロードされるんでしょうか・・?
・・と、こんな感じの初期化処理でした。
JVM系言語で作成されたPluginをロードするための流れが見えたのは非常に収穫がありました。
尚、今回Guiceを使ったコードを始めてきちんと読んでみましたが、
インジェクション用の定義をコードに書く分、Springよりも直観的に読みやすくわかりやすいですね。
ただ、その代わりインジェクションするものを変える場合はコードを修正する=再ビルドする、
が絡んでくるわけですが。
今回のEmbulkのようにインジェクションするものは固定で良くて、
プラグインのロードによって機能を設定する、という用途では非常に使いやすそうです。
並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その1
こんにちは。
ようやくKinesisSpoutが一段楽したので次のネタを。
先日「データ転送ミドルウェア勉強会」が開催され、
そこでバルクデータロードツール『Embulk』が公開されました。
データのバルクロードというと、定番のOSSというのがなくて、
HDFSにバルクデータをロードする時はhadoopコマンドで行う・・などを行っていたのですが、
それがツールでできるというのは非常にありがたいですね。
で、既に使ってみた方の事例はいくつか挙がっていますので、実際にどう作られているかを見てみようと思います。
・・・ええ、Javaでプラグインが書けるようになるまで実際に動かすかソース読むしか出来ないからですね。
1.embulkのモジュール構成
embulkのGitHubを確認してみますと、下記3つのモジュールで構成されています。
- embulk-cli
- embulk-core
- embulk-standards
各モジュールが何か、を見てみるとどうやら下記のような感じのようです。
embulk-cli
embulkをjavaコマンドで起動した際に呼ばれるMainクラスのみを保持するモジュール。
起動引数の前に「classpath:embulk/command/embulk.rb」を追加し、JRubyを呼び出しているのみです。
・・ソースがJavaと期待して読み始めるとしょっぱなからRuby突入している!?
embulk-core
embulkのコアモジュールでモジュールをロードする機能や実行する機能を保持。
2.起動の流れ
軌道制御部分がJRubyで書かれていることがわかったため、KinesisSpoutのようにJavaコードから追っていくやり方は多分出来ない・・・
ということで、JRubyの起動処理部分を追ってみます。
#Rubyについては構文を知っている位のレベルなのでボケかましていたら生温かく突っ込んで頂けると幸いです。
尚、Rubyのコードはlibディレクトリ配下に配置されていました。
起動する際にはJRuby経由で「embulk/command/embulk.rb」が呼び出され、そこから起動します。
embulk.rb
- 環境変数「EMBULK_BUNDLE_PATH」または起動引数を基にGemのインストールパスを取得する。
- Gemのインストールパスを指定してEmbulk#run(embulk_run.rb)を呼び出す。
- 指定されていない場合はembulk_runで設定される。
embulk_run.rb
- 起動引数から「-」が付与されない引数のうちはじめの引数を取得し、「サブコマンド」とする。
- 「サブコマンド」が存在しない場合はその時点でusageエラー
- 「サブコマンド」が下記のいずれかの場合は指定引数に応じて後のusageメッセージに内容を追加。
- bundle/run/preview/guess/example(usageメッセージの追加はなし)
- 「サブコマンド」が上記のいずれでもなく、下記のいずれかの場合は処理を起動
- gem(Rubyのgemコマンドを起動)、exec(引数を用いてそのままプロセス起動)
- その他のオプションの形式チェックを実施、NGであればusageエラー
- 「サブコマンド」に応じて下記のように処理を分岐
・・というわけで、CLIからJRubyを起動してGemやClasspathの解決を行い、
org.embulk.command.Runnerから再度Javaコードに戻ってツール本体が本格的に起動する・・
という流れのようです。
早い段階でJavaに戻ってきてちょっと安心。