夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

StormのコマンドはClojureを使用しているので起動が遅い?

こんにちは。

では、引き続きStormのコマンドが遅い理由について確認してみます。

1.Topology起動と、他のコマンドの違いは何なのか?

前回は、単にJVMの起動が遅いのではないか、レベルでとどまってしまいましたが、それではわからない点が1つあります。
「Topology起動と、他のコマンドの違いは何なのか?」ということです。

Topology起動時のJVM起動は1秒ほどで動いているにも関わらず、
他のコマンド(list/activate/deactivate/rebalance/kill)は5秒ほどかかっていました。
この違いは一体何なのでしょうか。

というわけで、前回の結果より、各コマンドを起動した時のJavaコマンド(クラスパス除く)を取得してみました。
#クラスパスは見た感じほぼ同じでしたので、それは影響しないとまずは仮定します。

■Topology起動

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) storm.starter.ExclamationTopology ExclamationTopology

■Topology一覧取得(list)

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.list

■Topology有効化(activate)

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.activate ExclamationTopology

■Topology無効化(deactivate)

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.deactivate ExclamationTopology

■Topology再配置(rebalance)

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.rebalance ExclamationTopology

■Topology終了(kill)

java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.kill_topology ExclamationTopology

■設定値取得(追加)

java -client -Dstorm.options= -Dstorm.conf.file= (クラスパス) backtype.storm.command.config_value storm.log.dir

Topology起動とそれ以外のコマンドの違いは何でしょうか。
それは、Javaのクラスを起動しているか、Clojureのクラスを起動しているか」の違いです。
とはいえ、Clojureのクラスといっても実際にJVMで起動する際にJavaのクラスと違いが発生するものとは思えません。
では、何が違うんでしょう。

2.Javaクラス起動時とClojureクラス起動時の違い

とりあえず、違いがわかっているわけではないため、適当な仮定をつけながらあたりをつけていきます。
まず一つ目は、Clojureのクラスはコンパイル時に依存するクラスが大量に追加されているのではないか?」ということですね。
当然、JVM上で違う言語を動作させるわけですからコンパイル時等にそのあたりをラッピングさせる機構が働いているはずです。
ですので、そのラッピングするためのクラスが重いのではないか、という予測です。

というわけで、実際に起動時のロードクラスを見て確認してみましょう。
■Topology起動
Javaで書かれたTopology起動の結果は下記のようになり、ロードされたクラス数は1594クラスでした。
思ったより依存するクラスは多いわけですね。

java -client -verbose:class -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) storm.starter.ExclamationTopology ExclamationTopology
[Loaded java.lang.Object from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.io.Serializable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.Comparable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.CharSequence from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.String from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
(省略)
[Loaded backtype.storm.generated.Nimbus$submitTopology_args from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded backtype.storm.generated.Nimbus$submitTopology_args$_Fields from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded org.apache.thrift7.protocol.TMap from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded backtype.storm.generated.ComponentObject$1 from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded backtype.storm.generated.Grouping$1 from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded backtype.storm.generated.Nimbus$submitTopology_result from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
[Loaded backtype.storm.generated.Nimbus$submitTopology_result$_Fields from file:/opt/storm-0.9.4/lib/storm-core-0.9.4.jar]
619  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology
[Loaded java.lang.Shutdown from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.Shutdown$Lock from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]

■Topology一覧取得(list)
で、次はClojureで書かれたTopology一覧取得コマンドの実行時です。
実行してみると・・・?

java -client -verbose:class -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (クラスパス) backtype.storm.command.list
[Loaded java.lang.Object from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.io.Serializable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.Comparable from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.CharSequence from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.String from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
(省略)
[Loaded clojure.lang.IDeref from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar]
[Loaded clojure.lang.IRef from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar]
[Loaded clojure.lang.Settable from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar]
[Loaded clojure.lang.IMeta from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar]
[Loaded clojure.lang.IReference from file:/opt/storm-0.9.4/lib/clojure-1.5.1.jar]
(省略)
No topologies running.
[Loaded java.lang.Shutdown from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]
[Loaded java.lang.Shutdown$Lock from /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.65.x86_64/jre/lib/rt.jar]

・・・あれ?なんかやたらと多いですね。
省略した分も含めると4720クラスのロードが行われていました。
特に、clojure、という名のつくクラスが多く、2700をオーバーする数のロードが行われていました。

他のコマンドも実行してみた結果は下記になります。

No コマンド 機能 ロードクラス数
1 jar TopologyをStormクラスタにデプロイ 1594 クラス
2 list StormクラスタのTopology一覧取得 4720 クラス
3 activate Topologyを有効化 4714 クラス
4 deactivate Topologyを無効化 4714 クラス
5 rebalance Topologyを再配置 4755 クラス
6 kill Topologyを終了 4752 クラス
7 configvalue 設定値読込 4544 クラス

3.わかったことは?

この結果からわかることは、下記あたりでしょうか。

  • JVM起動時にClojureのプログラムはJavaプログラムと比して3倍以上のクラスをロードする。
    • 結果、Clojureを実行するプログラムは起動に時間がかかる。

ともあれ、今回の結論としては、
StormのコマンドはClojureのクラスをロードして使用しているため、遅い
ということになりますね。

逆に言えば、StormのコマンドをClojureを使用せずにJavaで置き換えることができたなら、
それだけで大幅な高速化が出来るのはないでしょうか。

ただ、それは次回にでも。

Stormのコマンドが遅い理由とは?

こんにちは。

とりあえず気になることができたので、しばらくStormネタです。

1.Stormを使っていて気になる所

皆さんはStormを実際に使ってみたことがあるでしょうか?

私は使って最初のうちは気にならなかったのですが、最近気になってきたことがあります。
それは、Stormのコマンド(storm)の遅さです。

8コア、32GBクラスのマシンの上で動作させていても、
StormのクラスタからTopologyの一覧取得するだけで普通に10秒位使っちゃうんですよね。

ということで、まずは実際にコマンドを実行した結果かかった時間を示してみます。
前提となるマシンスペックは下記です。

■ホストマシン

CPU                 : Core i7 3770
Memory              : 32GB
HDD                 : 2TB

仮想マシン(stormtest)

CPUコア数           : 2
Memory              : 8GB
HDD                 : 200GB
OS                  : CentOS6.6 64bit(Basic Server)

仮想マシンの性能は低いですが、その場合の値、として見てもらえればOKです。
尚、他の仮想マシンは稼働させずに確認しています。

2.Stormコマンド実行にかかる時間

では、実際にコマンドを実行してみます。

Topology起動

所要時間:11秒

[root@stormtest storm]# date;bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.ExclamationTopology ExclamationTopology;date
2015年  4月  8日 水曜日 23:55:42 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) storm.starter.ExclamationTopology ExclamationTopology
340  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
347  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar examples/storm-starter/storm-starter-topologies-0.9.4.jar to assigned location: /opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar
Start uploading file 'examples/storm-starter/storm-starter-topologies-0.9.4.jar' to '/opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar' (3244061 bytes)
[==================================================] 3244061 / 3244061
File 'examples/storm-starter/storm-starter-topologies-0.9.4.jar' uploaded to '/opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar' (3244061 bytes)
395  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/nimbus/inbox/stormjar-1238b151-b1f8-48ad-acc5-80cf4107e688.jar
395  [main] INFO  backtype.storm.StormSubmitter - Submitting topology ExclamationTopology in distributed mode with conf {"topology.workers":3,"topology.debug":true}
598  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology
2015年  4月  8日 水曜日 23:55:53 JST
Topology一覧取得

所要時間:16秒

[root@stormtest storm]# date;bin/storm list;date
2015年  4月  8日 水曜日 23:58:49 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.list
1815 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
ExclamationTopology  ACTIVE     18         3            192
2015年  4月  8日 水曜日 23:59:05 JST
TopologyActivate

所要時間:15秒

[root@stormtest storm]# date;bin/storm activate ExclamationTopology;date
2015年  4月  9日 木曜日 00:00:43 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.activate ExclamationTopology
1323 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
1348 [main] INFO  backtype.storm.command.activate - Activated topology: ExclamationTopology
2015年  4月  9日 木曜日 00:00:58 JST
TopologyDeactivate

所要時間:16秒

[root@stormtest storm]# date;bin/storm deactivate ExclamationTopology;date
2015年  4月  8日 水曜日 23:59:51 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.deactivate ExclamationTopology
1776 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
1813 [main] INFO  backtype.storm.command.deactivate - Deactivated topology: ExclamationTopology
2015年  4月  9日 木曜日 00:00:07 JST
TopologyRebalance

所要時間:16秒

[root@stormtest storm]# date;bin/storm rebalance ExclamationTopology;date
2015年  4月  9日 木曜日 00:02:51 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.rebalance ExclamationTopology
1013 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
1047 [main] INFO  backtype.storm.command.rebalance - Topology ExclamationTopology is rebalancing
2015年  4月  9日 木曜日 00:03:07 JST
Topology kill

所要時間:16秒

[root@stormtest storm]# date;bin/storm kill ExclamationTopology;date
2015年  4月  9日 木曜日 00:04:25 JST
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.kill_topology ExclamationTopology
711  [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
753  [main] INFO  backtype.storm.command.kill-topology - Killed topology: ExclamationTopology
2015年  4月  9日 木曜日 00:04:41 JST

・・・遅い。
かつ、何故か一番時間がかかりそうなTopology起動よりも
他のコマンド群の方が明らかに時間がかかっているというのはどういうことなんでしょう・・・

ともあれ、とりあえずまとめてみた結果は下記でした。

No コマンド 機能 所要時間
1 jar TopologyをStormクラスタにデプロイ 11秒
2 list StormクラスタのTopology一覧取得 16秒
3 activate Topologyを有効化 15秒
4 deactivate Topologyを無効化 16秒
5 rebalance Topologyを再配置 16秒
6 kill Topologyを終了 16秒

3.どこに時間がかかっている?

実際に、Stormのコマンドの中身を確認してどこに時間がかかっているかを追ってみます。
今回試した6コマンドは全てStormコマンド内部では「exec_storm_class」という関数を呼び出しています。
ですので、exec_storm_classに下記のように時刻を出力する処理を追記し、再実行してみます。
尚、exec_storm_classの最後で出力を行っていないのは、最後まで実行しないままforkされてしまうからです。

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    import datetime # 追記
    print("Start exec_storm_class " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記
    global CONFFILE
    storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])
    print("Config get " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記
    if(storm_log_dir == None or storm_log_dir == "nil"):
        storm_log_dir = STORM_DIR+"/logs"
    all_args = [
        JAVA_CMD, jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Config get and args construct " + datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")) # 追記
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
    else:
        os.execvp(JAVA_CMD, all_args) # replaces the current process and
        # never returnsa

再実行してみた結果は下記になりました。

Topology起動
[root@stormtest storm]# date;bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.ExclamationTopology ExclamationTopology;date
2015年  4月  9日 木曜日 00:29:24 JST
Start exec_storm_class 2015-04-09 00:29:24
Config get 2015-04-09 00:29:29
Config get and args construct 2015-04-09 00:29:34
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) storm.starter.ExclamationTopology ExclamationTopology
 (省略) 
391  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: ExclamationTopology
2015年  4月  9日 木曜日 00:29:34 JST
Topology一覧取得

所要時間:16秒

2015年  4月  9日 木曜日 00:30:53 JST
Start exec_storm_class 2015-04-09 00:30:53
Config get 2015-04-09 00:30:59
Config get and args construct 2015-04-09 00:31:05
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.list
1815 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
ExclamationTopology  ACTIVE     18         3            192
2015年  4月  9日 木曜日 00:31:10 JST
TopologyActivate

所要時間:15秒

[root@stormtest storm]# date;bin/storm activate ExclamationTopology;date
2015年  4月  9日 木曜日 00:33:54 JST
Start exec_storm_class 2015-04-09 00:33:54
Config get 2015-04-09 00:33:59
Config get and args construct 2015-04-09 00:34:04
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.activate ExclamationTopology
1628 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
1641 [main] INFO  backtype.storm.command.activate - Activated topology: ExclamationTopology
2015年  4月  9日 木曜日 00:34:09
TopologyDeactivate

所要時間:16秒

[root@stormtest storm]# date;bin/storm deactivate ExclamationTopology;date
2015年  4月  9日 木曜日 00:33:04 JST
Start exec_storm_class 2015-04-09 00:33:04
Config get 2015-04-09 00:33:09
Config get and args construct 2015-04-09 00:33:14
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.deactivate ExclamationTopology
2085 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
2098 [main] INFO  backtype.storm.command.deactivate - Deactivated topology: ExclamationTopology
2015年  4月  9日 木曜日 00:33:20 JST
TopologyRebalance

所要時間:16秒

[root@stormtest storm]# date;bin/storm rebalance ExclamationTopology;date
2015年  4月  9日 木曜日 00:36:22 JST
Start exec_storm_class 2015-04-09 00:36:22
Config get 2015-04-09 00:36:27
Config get and args construct 2015-04-09 00:36:32
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.rebalance ExclamationTopology
691  [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
705  [main] INFO  backtype.storm.command.rebalance - Topology ExclamationTopology is rebalancing
2015年  4月  9日 木曜日 00:36:38 JST
Topology kill

所要時間:16秒

[root@stormtest storm]# date;bin/storm kill ExclamationTopology;date
2015年  4月  9日 木曜日 00:37:21 JST
Start exec_storm_class 2015-04-09 00:37:21
Config get 2015-04-09 00:37:26
Config get and args construct 2015-04-09 00:37:32
Running: java -client -Dstorm.options= -Dstorm.home=/opt/storm-0.9.4 (省略) backtype.storm.command.kill_topology ExclamationTopology
583  [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
605  [main] INFO  backtype.storm.command.kill-topology - Killed topology: ExclamationTopology
2015年  4月  9日 木曜日 00:37:37 JST

4.時間がかかっている場所

上記の結果より、下記のことが言えるようです。

  1. 設定ファイルから設定値を読み込む(関数「confvalue」実行)に5秒程かかる
  2. Topologyの起動自体は実は1秒程しかかからない
  3. Topology起動以外のlist/activate/deactivate/rebalance/killは5秒程かかる

そのうち「設定ファイルから設定値を読み込むのに5秒程かかる」については、
下記の通りStormのコマンドを見るに、1項目の設定値を読み込むためにJVMを起動しているためのようではあります。

def confvalue(name, extrapaths):
    global CONFFILE
    command = [
        JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name
    ]
    p = sub.Popen(command, stdout=sub.PIPE) #Javaプロセスを起動している!
    output, errors = p.communicate()
    # python 3
    if not isinstance(output, str):
        output = output.decode('utf-8')
    lines = output.split("\n")
    for line in lines:
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            return " ".join(tokens[1:])
    return ""

ただ、単純にJVMを起動するのに時間がかかる・・
というだけであれば、Topologyを起動するのに1秒ですむ所が、
やること自体は確実に少ないlist/activate/deactivate/rebalance/killは5秒程かかるのは筋が通らないんですよね。

とまぁ、とりあえずどこが遅いかはわかったので、
何故遅いか、については次回に詳細を追ってみることにします。

Storm-0.9.4のバージョンアップ内容

こんにちは。
少し間が空いてしまったのですが、
Stormが0.9.4にバージョンアップしたので、バージョンアップ内容をざっとまとめてみます。

  • STORM-559: ZkHosts in README should use 2181 as port.

storm-kafkaにおいて、READMEにあるZooKeeperの接続設定例が「localhost:9092」となっており、
ZooKeeperのデフォルトポートである2181で無い値になっていたので、それを修正したようです。
READMEの修正のため、機能的な変更点はありません。

  • STORM-682: supervisor should handle worker state corruption gracefully.

supervisorがローカルに保持するWorkerプロセスのHeatBeat情報が不整合を起こし、
次起動する際に起動に失敗する事象が発生する問題への対処ですね。
Xと、X.versionという2ファイルが存在するのですが、消す時にX消してX.version失敗すると
その後Supervisorが起動しなくなってしまうというものです。

そう起こる物ではないですが、起こるとStormのローカルファイルを消さないと復旧しないという厄介なものでした。

  • STORM-693: when kafka bolt fails to write tuple, it should report error instead of silently acking.

KafkaBoltでKafkaにメッセージを書き込む際に、失敗しても内容をログ出力するのみでメッセージを破棄する、
という動作だったものをメッセージの処理自体を失敗として扱うようにしてます。

このあたりは設計方針にも依る感は強いですが、とりあえず安全側に倒したというようですね。

  • STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages

他Workerプロセスが何かしらの原因でダウンした場合の通信エラーハンドリングの改善が行われています。
今までは他のプロセスのダウンによって、他のWorkerプロセスも芋ずる式にダウンすることがあったのですが、
それが発生しにくくなるよう、再接続間隔の調整等が行われています。

  • STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.

Supervisorがrebalance等の要因によって動作中のTopologyを自ノード上で動作させる時に落ちる問題への対処ですね。
設定ファイル等が各Supervisorに配信されるのはTopologySubmitのタイミングとなるため、
後からクラスタに追加する等、そのタイミングで配信されなかった場合や、何かしらの原因でファイルを保持しない
SupervisorにTopologyをrebalanceすることは出来ない問題がありました。

それが解消するようです。

今回は機能の追加はないため小さいアップデートになりますが、
STORM-682、STORM-329、STORM-130あたりは地味に重要なアップデートになりますね。
STORM-682は頻度は低いですが一度発生するとわかっていないと対応不可、
STORM-329、STORM-130は普通におき得る問題ですので。

というわけで、内容も確認したのでインストーラや依存バージョンを更新しますか〜。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その4:LocalExecutor概要

こんにちは。

Embulkが前回の投稿から今回の投稿までの間にJava用のプラグインもサポートしていますね。
これでようやくプラグインを書けるようになった・・・のですが、
とりあえずプラグインを書くのは裏で行っておくとして、ここではソースコードリーディングを続けます。
Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン - Blog by Sadayuki Furuhashi

今回はLocalExecutor、つまりローカルでデータの取得→書込処理を行うためのクラスです。

1. LocalExecutorに関連するクラス群

LocalExecutorに関連するクラス群をまとめてみますと以下のような図になります。
正確にはExecクラスによってThreadLocalな変数を生成してExecSessionを保持できるようにして実行・・
といったことも関わってくるのですが、動作を流してみる分にはそれほど影響が無いため、
図中からは省略しています。

実際に動かすにあたっては非常に重要な機構ですが、まずはわかりやすく処理を一本通してみようということで^^;

各クラスの役目は前回書いた内容とほぼ同じですので省略します。

2. LocalExecutorの動作

では、実際のLocalExecutorの動作を見ていきます。

初期化処理(コンストラクタ
  1. 最大スレッド数、システム設定などを初期化する。
  2. ExecutorServiceのスレッド数はデフォルトコア数×2、設定可能
    • ExecutorServiceが実際に使用されるのは本記事的にはしばらく後。

その後、前回の実行結果がErrorで終わっており、再実行状態が存在するかを確認しています。
再実行状態が存在しない場合は通常実行(run)、存在する場合は再実行処理(resume)、となります。
差分としては下記の内容が存在しますが、大まかな流れはrunで大体わかるはずですので、
今回はrunのみ読んでいきます。
#resumeはまた機会があれば。

  1. 通常実行(run)では入力スキーマを設定から読み込む、再実行処理(resume)では入力スキーマを再実行状態から読み込む。
  2. 通常実行(run)ではInputPlugin/OutputPluginのtransactionメソッドを実行、再実行処理(resume)ではresumeメソッドを実行。
  3. 再実行処理(resume)ではresumeState→processStateの結果の移し替えが行われる。
実行処理(run):実行前ブロック
  1. doWith(ExecSession session, ExecAction action)メソッドにてスレッドローカルにExecSessionを保存し、処理を起動。
    • Exec#doWithメソッド内以外ではスレッドローカルにExecSessionが存在しないため、防護機構となります。
  2. スレッド名を一時的に"transaction"に変更する。
    • try-with-resourcesをこういう形で使うのは面白いですね。
  3. LocalExecutor#doRun(ConfigSource config)メソッドで実処理開始
実行処理(run):処理実行部

では、実処理部に入るのですが、文章だけで書いてもわかりにくいため、
ソースをインデントを浅くして1行の長さを長くしたものを下記にはっておきます。
一度ソースを見てから以後の文章を見てください。

private ExecutionResult doRun(ConfigSource config) {
  final ExecutorTask task = config.loadConfig(ExecutorTask.class);

  final InputPlugin in = newInputPlugin(task);
  final List<FilterPlugin> filterPlugins = newFilterPlugins(task);
  final OutputPlugin out = newOutputPlugin(task);

  final ProcessState state = new ProcessState(Exec.getLogger(LocalExecutor.class));
  try {
    ConfigDiff inputConfigDiff = in.transaction(task.getInputConfig(), new InputPlugin.Control() {
      public List<CommitReport> run(final TaskSource inputTask, final Schema inputSchema, final int taskCount) {
        state.initialize(taskCount);
        state.setInputSchema(inputSchema);
        Filters.transaction(filterPlugins, task.getFilterConfigs(), inputSchema, new Filters.Control() {
          public void run(final List<TaskSource> filterTasks, final List<Schema> filterSchemas) {
            Schema outputSchema = last(filterSchemas);
            state.setOutputSchema(outputSchema);
            ConfigDiff outputConfigDiff = out.transaction(task.getOutputConfig(), outputSchema, taskCount,
                new OutputPlugin.Control() {
                  public List<CommitReport> run(final TaskSource outputTask) {
                    task.setInputTask(inputTask);
                    task.setFilterTasks(filterTasks);
                    task.setOutputTask(outputTask);

                    if (taskCount > 0) {
                      process(task.dump(), filterSchemas, taskCount, state);
                      if (!state.isAllCommitted()) {
                        throw state.getRepresentativeException();
                      }
                    } else {
                      // TODO warning?
                    }
                    return state.getOutputCommitReports();
                  }
                });
            state.setOutputConfigDiff(outputConfigDiff);
          }
        });
        return state.getInputCommitReports();
      }
    });
    state.setInputConfigDiff(inputConfigDiff);

    try {
      doCleanup(config, state.buildResumeState(task, Exec.session()));
    } catch (Exception ex) {
      state.logger.warn("Commit succeeded but cleanup failed. Ignoring this exception.", ex); // TODO
    }

    return state.buildExecuteResult();

  } catch (Throwable ex) {
    if (state.isAllCommitted()) {
      // ignore the exception
      return state.buildExecuteResultWithWarningException(ex);
    }
    if (!state.isAnyStarted()) {
      throw ex;
    }
    throw state.buildPartialExecuteException(ex, task, Exec.session());
  }
}

実処理はまずPluginの設定を連結して実際に実行するパイプライン(Taskと呼ぶのが正しい?)を生成し、
それをスレッドに割り振ってExecutorServiceで実行する形を取っています。

あと、InputPlugin、FilterPlugin、OutputPluginは実際はインタフェースのため
実処理は記述されていないのですが、わかりやすくするため下記のクラス群を参考に記述しています。
下記のクラス群に移行している間はその旨を書いています。
あと、FilterPluginは処理として必須の流れではないようなので、今回は省略です。
・InputPlugin>FileInputRunner>FileInputPlugin(LocalFileInputPlugin)
・OutputPlugin>FileOutputRunner>FileOutputPlugin(LocalFileOutputPlugin)

では、これから実処理部の流れを見ていきます。

  1. ExecutorTask、InputPlugin、FilterPluginのリスト、OutputPluginを設定/PluginManagerを用いてロード
  2. InputPluginのtransaction開始
  3. (FileInputRunner)RunnerTaskを設定からロード
  4. (FileInputRunner)FileInputPluginのtransaction開始
  5. (LocalFileInputPlugin)PluginTaskを設定からロード
  6. (LocalFileInputPlugin)PluginTaskの情報を基に、ファイル一覧を取得する。
  7. (LocalFileInputPlugin)処理スレッド数をファイルと同一にして読込み処理開始
  8. (FileInputRunner)Decoders、ParserPluginのtransactionを通し、結果をLocalExecutorから与えられたInputPlugin.Control()に渡す。
    • この段階でようやくLocalExecutor(内部無名クラス)に処理が返ってくるわけですね。
  9. InputPlugin.Control()に渡された結果を受けて、Listに対するtransactionを開始
  10. List中のFilterPluginのtransactionを順々に実行、最後のFilterPluginは結果をLocalExecutorから与えられたFilterPlugin.Control()に渡す。
  11. FilterPlugin.Control()に与えられた結果を受けてOutputPluginのtransaction開始
  12. (FileOutputRunner)RunnerTaskを設定からロード
  13. (FileOutputRunner)FileOutputPluginのtransaction開始
  14. (LocalFileOutputPlugin)PluginTaskを設定からロード
  15. (LocalFileOutputPlugin)シーケンスのチェックを実施し、失敗した場合はエラーとする
  16. (FileOutputRunner)Encoders、FormatterPluginのtransactionを通し、結果をLocalExecutorから与えられたOutputPlugin.Control()に渡す。
    • 再度LocalExecutor(内部無名クラス)に処理が返ります。
  17. それまでのInputTask、FilterTask、OutputTaskをTask情報に設定してLocalExecutorで実際に処理を行うためのTaskを生成する。
  18. TaskをstartProcessorメソッドで非同期/並列に実行。
    • 並列度は初期化処理で書かれた「ExecutorServiceのスレッド数」で決まる。
  19. 実行結果を取得し、各InputPlugin、OutputPluginのCleanup処理を呼び出す
    • cleanup処理も並列実行可能な基本構造となっています。標準Pluginではそこまで本格的なCleanup処理を呼び出しているものはありませんでしたが。
  20. 実行結果を生成する。

その後、Runnerクラス側の方で失敗していた場合は再実行状態を生成し、成功した場合は次回実行用の状態を生成してファイル出力・・
という流れになっています。

とりあえず、LocalExecutorを読んでみましたが、1回では流れを通すのがやっとという感じですね。
ですので、次回以降も内部のEntity構成やInputPluginといったPlugin群の構造等、読んでいこうと思います。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要

こんにちは。

前回で初期化部分の確認が終わったため、今回は本処理の方に入ってきます。

尚、ServiceLoaderでJavaプラグインがロード出来るかについてはとりあえず一通り読んでからの方針で^^;

まず、基本構造としては
上記の図にあるRunnerが起動の起点となり、初期化を行った後に
run/cleanup/guess/previewの各々の処理に分岐する流れとなっています。

まず今回はrun処理の流れを追ってみることにします。

1. run処理の流れ概要

run処理の流れは下記のようになっています。

  1. 設定ファイル(YAML形式)を読み込む。
  2. 出力ファイル(次回実行用の設定出力先、Resume状態の出力先)の出力可能確認を行う。
  3. Resume状態ファイルを読み込む。
  4. Executorクラスを生成する。
  5. Resume状態にあわせて処理を実行する。
    1. 実行失敗した場合、Resume出力先設定がある場合はそこに状態を記録、設定が存在しない場合はTransactionをクリーンアップする。
    2. 実行成功した場合、Resume設定ファイルを削除する。
  6. 次回実行用の設定ファイルを出力する。

流れ自体は非常にオードソックスなものですね。

2. run処理を実行するクラス群

次はrun処理を実行するクラス群を確認します。
とりあえず、流れとデータの保存先が大体わかる感じまで見てみると、下記のようなクラス構成となっていました。

各クラスの役割は下記になります。

  • ConfigLoader

各種設定ファイルからConfigSourceを生成する設定読込クラス。
今回の範囲ではYamlファイルを読み込んでJsonObjectに変換して返している。

  • ConfigSource/TaskSource/CommitReport/ConfigDiff

設定の設定と取得が行えるインタフェース。
入れ子構造になっており、内部要素を取得したり、内部要素をEntityに変換して返すことが可能。
今回の範囲の実体はModelManagerとJsonObjectを設定したDataSourceImplであり、
ModelManagerをJsonObjectをクラスに変換する際に使用している。

  • Schema

データの読み込み/書き込み先となるデータソースの
スキーマをカラム(インデックス、名称、型情報を保持するカラム定義)のリストとして
保持するスキーマ定義クラス。

  • ResumeState

前回実行失敗時に状態を保存するクラス。
下記の情報を保持する。

    • 設定オブジェクト
    • インプットタスク情報/アウトプットタスク情報
    • インプットスキーマ/アウトプットスキーマ
    • インプットのコミット結果/アウトプットのコミット結果
  • ExecSession

実行時の情報を保持するクラス。
設定関連のオブジェクトの他にPluginManager、BufferAllocatorや
トランザクション時間(記録するための値)を保持する。

  • LocalExecutor

ローカルで実際のInput/Output処理を実行するクラス。
与えられた設定を元にInput/Output処理を実行し、結果を記録する。
現状、embulkの実処理を実行するコアクラス。

  • ProcessState

プロセスの現在の実行状態を保持するクラス。
Processorの数だけ実行情報を保持する。
保持している状態は下記のとおり。

    • 開始/終了状態
    • インプットタスク情報/アウトプットタスク情報
    • 発生例外情報
    • インプットスキーマ/アウトプットスキーマ
    • インプットのコミット結果/アウトプットのコミット結果
    • インプットの前回との設定差分/アウトプットの前回との設定差分

と、こう見てみるとよく見た構造を持つ4クラス(ConfigSource/TaskSource/CommitReport/ConfigDiff)が気になりますね。
ですので、まずはこのデータソース周りを見て見ます。

3. データソース関連クラス群

データソース関連クラス群をまとめてみると、下記のようになります。

クラス構造を見てもわかるとおり、ConfigSource/TaskSource/CommitReport/ConfigDiffの実体はDataSourceImplクラスであり、
実際にデータはJacksonのObjectNodeとして保持されています。

その上で、DataSourceSerDeクラスに記述されたSerDeModule(下記)によって
各種データソースのシリアライザ/デシリアライザが紐付けられています。
この紐付けによってこれらのデータソースのクラスを指定することで
設定ファイルから読み込んでキャストし、ロードすることができるわけですね。

public static class SerDeModule extends SimpleModule
{
    public SerDeModule(final ModelManager model)
    {
        // DataSourceImpl
        addSerializer(DataSourceImpl.class, new DataSourceSerializer<DataSourceImpl>());
        addDeserializer(DataSourceImpl.class, new DataSourceDeserializer<DataSourceImpl>(model));

        // ConfigSource
        addSerializer(ConfigSource.class, new DataSourceSerializer<ConfigSource>());
        addDeserializer(ConfigSource.class, new DataSourceDeserializer<ConfigSource>(model));

        // TaskSource
        addSerializer(TaskSource.class, new DataSourceSerializer<TaskSource>());
        addDeserializer(TaskSource.class, new DataSourceDeserializer<TaskSource>(model));

        // CommitReport
        addSerializer(CommitReport.class, new DataSourceSerializer<CommitReport>());
        addDeserializer(CommitReport.class, new DataSourceDeserializer<CommitReport>(model));

        // ConfigDiff
        addSerializer(ConfigDiff.class, new DataSourceSerializer<ConfigDiff>());
        addDeserializer(ConfigDiff.class, new DataSourceDeserializer<ConfigDiff>(model));
    }
}

ModelManagerはさまざまなクラスのシリアライザ/デシリアライザを
内包したObjectMapperを保持しており、それと上記のシリアライザ/デシリアライザを連携させ、
インジェクションすることでValueObjectをデータを保持するところから
非常に短い、シンプルなコードでロードすることが可能になっています。

なお、このあたりを見ていくとSessionTask、ExecutorTaskといったTaskオブジェクトの
リアライザ/デシリアライザ定義なども見えてくるわけですが、
そこまで見ていくときりがないんですよね(汗

そのあたりのエンティティのシリアライズ/デシリアライズ周りは動作自体にはそう影響してこない・・
と言いたいのですが、このあたりのValueObjectの管理をいかに楽に行うかが
ValueObjectを大量に扱うプロダクトの鍵ですので、このシリアライズ/デシリアライズ機構も
embulkの重要な機能のひとつになります。

とはいえ、run処理の流れとデータソース周りの概要が見えた段階で今回はここまでにして、
次回はLocalExecutorの詳細か、ModelManagerの詳細を見ていきます。

どちらにするかはまぁその時にでも。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その2:初期化

こんにちは。

前回は起動時のさわりだけでしたので、今回からまともに中身を読む形になりますね^^;

1.クラス概要構成

JRubyによる起動処理部分が終わり、
Javaに戻ったのでまずはJavaクラスの概要構成から確認してみます。

概要の構成はこれだけです。
EmbulkはGuiceによるインジェクションで必要なオブジェクトを取得して
使用する形になるので、固定的に起点となるRunnerクラスから参照が行われているのは
上記の図の要素だけになります。

各クラスの解説は下記の通りです。

Runner

JRubyから起動されるEmbulkの本来のメインクラス。
各種オブジェクトの初期化後、run/cleanup/guess/previewといった個別コマンドの処理を行うクラス。

DataSourceImpl

システムプロパティに設定されたembulk関連の定義(embulk.で始まるもの)を
JSON形式で読み込んで保持し、Embulkプロセス内で自由に使用するために用いられるクラス。

EmbulkService

下記のモジュール類を用いて
インジェクション用のオブジェクト(Guice:Injector、SpringだったらContextみたいなもの)を
生成するクラス。

  • SystemConfigModule
  • ExecModule
  • ExtensionServiceLoaderModule
  • BuiltinPluginSourceModule
  • JRubyScriptingModule

・・という動作のため、上記の〜Moduleクラス群が
実際にインジェクションされるクラスを設定している形になりますね。

2.Moduleクラス群の設定内容

〜Moduleクラスの処理を見てみると下記のようになります。

SystemConfigModule
  • bind定義:DataSourceImpl > ConfigSource(@ForSystemConfig)
ExecModule
  • bind定義:LoggerProvider > ILoggerFactory(Provider)
  • bind定義:ModelManager(Singleton)
  • bind定義:BufferAllocator > BufferAllocator(Singleton)
  • bind定義:GuessExecutor.GuessParserPlugin > ParserPlugin(@Named("ParserPlugin.system_guess"))
  • bind定義:SamplingParserPlugin > ParserPlugin(@Named("ParserPlugin.system_sampling"))

Plugin系のクラスについてはアノテーションでPluginと名前を指定してインジェクションする形になりますが、
FileInputPlugin実装クラスとFileOutputPlugin実装クラスについては
Pluginの実行制御を行うFileInputRunner/FileOutputRunnerという形で登録されるようです。
理由は今後読み進めた際に確認してみます。

  • bind定義:ObjectMapper > ObjectMapperProvider

上記のObjectMapperProviderにおいて、
TimeZone、Timestamp、Charset、GuavaDataType、JodaDataTypeのオブジェクト変換に対応する
ObjectMapperが生成されています。
文字列が時刻として上手く解釈されるのはここで生成されるObjectMapperのおかげですかね。

ExtensionServiceLoaderModule

ServiceLoaderを用いてExtension定義(META-INF/services/org.embulk.exec.Extensionから読み込んだ結果)を
読み込み、Extensionを介してModuleのBindを行う。

現状、「org.embulk.standards.StandardPluginExtension」が定義されており、
embulk-standardプロジェクトで作成されたPluginがBindされています。
これと同じ方式でPluginをロードする流れを作れれば、JVM系言語で開発したPluginもロードが出来そうです。

JVM系言語で追加のPluginを記述したければ、下記の4手順を踏む必要がありそうです。

  1. 追加Pluginを開発する。
  2. 「追加Plugin」をロードするModuleを開発する。
  3. 「追加PluginをロードするModule」を返すExtensionを開発する。
  4. Extension定義に「追加PluginをロードするModuleを返すExtension」のクラスを記述する。

ただ、これは自前でEmbulkをビルドするならいくらでも可能でしょうけど、
Pluginとして提供は・・可能なんですかね。
ServiceLoaderがクラスパス上に存在する「META-INF/services/org.embulk.exec.Extension」を全て読み込む、
という動作であれば、Pluginとして提供するJarに「META-INF/services/org.embulk.exec.Extension」を含めておき、
Pluginとして提供するJarファイルがEmbulk起動時のクラスパスに配置されていれば可能にはなります。

実際どうなるかはやってみるしかなさそうではあります。

BuiltinPluginSourceModule
  • bind定義:InjectedPluginSource > PluginSource(Multi)

Multibinderを使用していますが、現状InjectedPluginSourceのみがBindされています。

JRubyScriptingModule
  • bind定義:ScriptingContainerProvider > ScriptingContainer(Provider、Singleton)
  • bind定義:JRubyPluginSource > PluginSource(Multi)

PluginSourceに対するMultibinderにJRubyPluginSourceも設定されていますが、
この場合PluginManagerで両方のPluginSourceがロードされるんでしょうか・・?


・・と、こんな感じの初期化処理でした。
JVM系言語で作成されたPluginをロードするための流れが見えたのは非常に収穫がありました。

尚、今回Guiceを使ったコードを始めてきちんと読んでみましたが、
インジェクション用の定義をコードに書く分、Springよりも直観的に読みやすくわかりやすいですね。
ただ、その代わりインジェクションするものを変える場合はコードを修正する=再ビルドする、
が絡んでくるわけですが。

今回のEmbulkのようにインジェクションするものは固定で良くて、
プラグインのロードによって機能を設定する、という用途では非常に使いやすそうです。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その1

こんにちは。

ようやくKinesisSpoutが一段楽したので次のネタを。

先日「データ転送ミドルウェア勉強会」が開催され、
そこでバルクデータロードツール『Embulk』が公開されました。

データのバルクロードというと、定番のOSSというのがなくて、
HDFSにバルクデータをロードする時はhadoopコマンドで行う・・などを行っていたのですが、
それがツールでできるというのは非常にありがたいですね。

で、既に使ってみた方の事例はいくつか挙がっていますので、実際にどう作られているかを見てみようと思います。
・・・ええ、Javaプラグインが書けるようになるまで実際に動かすかソース読むしか出来ないからですね。

1.embulkのモジュール構成

embulkのGitHubを確認してみますと、下記3つのモジュールで構成されています。

  1. embulk-cli
  2. embulk-core
  3. embulk-standards

各モジュールが何か、を見てみるとどうやら下記のような感じのようです。

embulk-cli

embulkをjavaコマンドで起動した際に呼ばれるMainクラスのみを保持するモジュール。
起動引数の前に「classpath:embulk/command/embulk.rb」を追加し、JRubyを呼び出しているのみです。
・・ソースがJavaと期待して読み始めるとしょっぱなからRuby突入している!?

embulk-core

embulkのコアモジュールでモジュールをロードする機能や実行する機能を保持。

embulk-standards

下記のような基本機能と、ロード処理を行うモジュール。

  1. CSVファイルフォーマッタ、パーサ、トークナイザー
  2. GZipファイルのエンコーダ/デコーダ
  3. ローカルファイル出力/入力
  4. NullOutput
  5. S3への出力
  6. 標準出力

2.起動の流れ

軌道制御部分がJRubyで書かれていることがわかったため、KinesisSpoutのようにJavaコードから追っていくやり方は多分出来ない・・・
ということで、JRubyの起動処理部分を追ってみます。
Rubyについては構文を知っている位のレベルなのでボケかましていたら生温かく突っ込んで頂けると幸いです。
尚、Rubyのコードはlibディレクトリ配下に配置されていました。

起動する際にはJRuby経由で「embulk/command/embulk.rb」が呼び出され、そこから起動します。

embulk.rb
  1. 環境変数「EMBULK_BUNDLE_PATH」または起動引数を基にGemのインストールパスを取得する。
  2. Gemのインストールパスを指定してEmbulk#run(embulk_run.rb)を呼び出す。
    • 指定されていない場合はembulk_runで設定される。
embulk_run.rb
  1. 起動引数から「-」が付与されない引数のうちはじめの引数を取得し、「サブコマンド」とする。
    • 「サブコマンド」が存在しない場合はその時点でusageエラー
  2. 「サブコマンド」が下記のいずれかの場合は指定引数に応じて後のusageメッセージに内容を追加。
    • bundle/run/preview/guess/example(usageメッセージの追加はなし)
  3. 「サブコマンド」が上記のいずれでもなく、下記のいずれかの場合は処理を起動
    • gem(Rubyのgemコマンドを起動)、exec(引数を用いてそのままプロセス起動)
  4. その他のオプションの形式チェックを実施、NGであればusageエラー
  5. 「サブコマンド」に応じて下記のように処理を分岐
    • bundle
      • bundleディレクトリをコピー後、bundlerをインストール後、Bundler::CLI開始(詳細はわからず)
    • example
      • 指定パスに対してexampleを出力する
    • 上記以外
      1. org.embulk.command.Runnerのコンストラクタに引数をJSONに変換して実行
      2. 「サブコマンド」「設定ファイルパス」を引数として指定してmainメソッドを実行

・・というわけで、CLIからJRubyを起動してGemやClasspathの解決を行い、
org.embulk.command.Runnerから再度Javaコードに戻ってツール本体が本格的に起動する・・
という流れのようです。

早い段階でJavaに戻ってきてちょっと安心。

とりあえず、次回はJavaコードの構造を確認した上でJava側の流れを追ってみます。