SparkバッチをMesosの上で動作させる(exampleバッチ
こんにちは。
前回動かず、かつ原因がよくわからないままだったため、調べてみたのですが・・・
関連があるかもしれないという問題事例が下記のみで、かつ実際には違う事象だったため、解決しませんでした(汗
Spark running with mesos fails when submitting JavaSharkContext based HSQL queries
そのため、そもそもの環境構築が誤っているのか、それともSparkバッチの実装が誤っているだけなのか、
exampleのSparkバッチを用いて確認してみることにしました。
Sparkには以下の場所をみるとかなりの数のサンプルがあります。
一部はローカルモード専用のサンプルですが、Mesos上にデプロイするタイプのサンプル(GroupByTest.scala、SkewedGroupByTest.scala)もあります。
https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/examples
そのため、これらのサンプルを動かす形で切り分けの方を行います。
1.サンプルを動作させてみる
既にビルドされたものを使用するため、バージョンが固定されているspark-0.8.0-incubating-bin-cdh4の方を使用します。
すると・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4 # ./run-example org.apache.spark.examples.GroupByTest zk://spark1:2181/mesos (省略) 13/12/04 18:17:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (FlatMappedRDD[1] at flatMap at GroupByTest.scala:39) 13/12/04 18:17:23 INFO cluster.ClusterScheduler: Adding task set 0.0 with 2 tasks 13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 0 on executor 201311130859-4133791936-5050-15712-0: spark3 (PROCESS_LOCAL) // ★spark3でTask 0開始★ 13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1662 bytes in 3 ms 13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Starting task 0.0:1 as TID 1 on executor 201311130859-4133791936-5050-15712-2: spark1 (PROCESS_LOCAL) // ★spark1でTask 1開始★ 13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:1 as 1662 bytes in 0 ms 13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Re-queueing tasks for 201311130859-4133791936-5050-15712-0 from TaskSet 0.0 13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0) // ★spark3でTask 0失敗★ 13/12/04 18:17:24 INFO scheduler.DAGScheduler: Executor lost: 201311130859-4133791936-5050-15712-0 (epoch 0) 13/12/04 18:17:24 INFO storage.BlockManagerMasterActor: Trying to remove executor 201311130859-4133791936-5050-15712-0 from BlockManagerMaster. // ★spark3でTask 0消去★ 13/12/04 18:17:24 INFO storage.BlockManagerMaster: Removed 201311130859-4133791936-5050-15712-0 successfully in removeExecutor 13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 201311130859-4133791936-5050-15712-0: spark3 (PROCESS_LOCAL) // ★spark3でTask 2開始★ 13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1662 bytes in 0 ms 13/12/04 18:17:24 INFO scheduler.DAGScheduler: Host gained which was in lost list earlier: spark3 13/12/04 18:17:24 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager spark1:54395 with 324.4 MB RAM 13/12/04 18:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_1 in memory on spark1:54395 (size: 1057.4 KB, free: 323.4 MB) 13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 1792 ms on spark1 (progress: 1/2) // ★spark1でTask 1終了★ 13/12/04 18:17:25 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1) 13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Re-queueing tasks for 201311130859-4133791936-5050-15712-0 from TaskSet 0.0 13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0) // ★spark3でTask 2失敗★ ・・・
spark1に割り振られたタスクは成功しているんですが、spark3に割り振られたタスクが失敗しているようです。
mesosのUIを見てみると以下のようになり、実際spark3のタスクが「LOST」・・・?しています。
そのため、spark3での実行結果コンソール出力を確認してみると、以下の内容でした。
- stdout
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-0/frameworks/201311130859-4150569152-5050-15300-0011/executors/201311130859-4133791936-5050-15712-0/runs/57563278-4e74-4a7e-9371-c139dfbff373'
- stderr
sh: /opt/spark-0.8.0-incubating-bin-cdh4/spark-executor: そのようなファイルやディレクトリはありません
内容的にはSparkの実行環境が存在しないというものでした。
尚、「run-example」からは「conf/spark-env.sh」も読みこまれており、下記の環境変数は持ったまま起動しています。
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz export MASTER=zk://spark1:2181/mesos
そのため、「SPARK_EXECUTOR_URI」自体がSparkバッチの際には有効化されず、
REPLの場合にのみ使用されるようなものではないか・・・と考えられます。
尚、GroupByTest.scalaの内容は下記のようになっていました。
- GroupByTest.scala
package org.apache.spark.examples import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import java.util.Random object GroupByTest { def main(args: Array[String]) { if (args.length == 0) { System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) args(4).toInt else numMappers val sc = new SparkContext(args(0), "GroupBy Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random var arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { val byteArr = new Array[Byte](valSize) ranGen.nextBytes(byteArr) arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 }.cache // Enforce that everything has been calculated and in cache pairs1.count println(pairs1.groupByKey(numReducers).count) System.exit(0) } }
「SPARK_EXECUTOR_URI」を指定するのではなく「SPARK_HOME」を指定しています。
別サーバ上に展開されたSparkの実行ディレクトリパスを指定しておく必要がある・・・となりますね。
「SPARK_EXECUTOR_URI」指定によって実行環境も全て取得&展開されるのは便利ではありましたが、
「SPARK_HOME」方式は毎回Sparkの実行環境をコピーする必要がないため、実際何度も使うようになればこちらの方がよさそうです。
起動も早そうですし。
2.SlaveサーバへのSpark実行環境展開
mesos-masterを配置していたspark1と同じアーカイブであるspark-0.8.0-incubating-bin-cdh4.tgzを展開して準備をします。
尚、slave側ではsbtによるビルドは行っていません。ファイルを展開し、spark-env.shを準備したのみです。
# wget http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz # tar xvzf spark-0.8.0-incubating-bin-cdh4.tgz # mv spark-0.8.0-incubating-bin-cdh4 /opt/ # cd /opt/spark-0.8.0-incubating-bin-cdh4/ # cp -p conf/spark-env.sh.template conf/spark-env.sh # vi conf/spark-env.sh (下記の内容を追記) export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz export MASTER=zk://spark1:2181/mesos
SPARK_EXECUTOR_URIとかは不要だとは思いますが、後で別サーバの追加を楽にするためにmesos-masterと極力環境を合わせています。
3.サンプルを再動作させてみる
では、この状態でサンプルを再動作させてみます。
すると・・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4 # ./run-example org.apache.spark.examples.GroupByTest zk://spark1:2181/mesos (省略) 13/12/04 18:44:42 INFO spark.MapOutputTracker: Size of output statuses for shuffle 0 is 177 bytes 13/12/04 18:44:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to spark1:51128 13/12/04 18:44:42 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 137 ms on spark3 (progress: 1/2) 13/12/04 18:44:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0) 13/12/04 18:44:42 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 139 ms on spark1 (progress: 2/2) 13/12/04 18:44:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1) 13/12/04 18:44:42 INFO cluster.ClusterScheduler: Remove TaskSet 1.0 from pool 13/12/04 18:44:42 INFO scheduler.DAGScheduler: Stage 1 (count at GroupByTest.scala:52) finished in 0.157 s 13/12/04 18:44:42 INFO spark.SparkContext: Job finished: count at GroupByTest.scala:52, took 0.291269363 s 2000
成功しました。
念のためmesosの画面の方も見てみますが、Taskの結果は以下のようになっており、実行成功しています。
というわけで、単にSparkバッチの実装方法が誤っていただけだということがわかりました。ーー;
4.まとめ
とりあえず、今回わかったことは以下です。
- Spark実行環境のアドホックな展開はREPLでのみ使用できると思われる
- Sparkバッチを実行する場合は予め実行環境をmesos-slaveサーバ上に展開しておく必要がある
なので、「Spark実行環境のアドホックな展開をSparkバッチで実行しようとしたこと」が誤りだったようですね。
とりあえず原因はわかったので、次回自作Sparkバッチを修正して、試してみます・・・