読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

SparkバッチをMesosの上で動作させる(exampleバッチ

こんにちは。

前回動かず、かつ原因がよくわからないままだったため、調べてみたのですが・・・
関連があるかもしれないという問題事例が下記のみで、かつ実際には違う事象だったため、解決しませんでした(汗

Spark running with mesos fails when submitting JavaSharkContext based HSQL queries

そのため、そもそもの環境構築が誤っているのか、それともSparkバッチの実装が誤っているだけなのか、
exampleのSparkバッチを用いて確認してみることにしました。

Sparkには以下の場所をみるとかなりの数のサンプルがあります。
一部はローカルモード専用のサンプルですが、Mesos上にデプロイするタイプのサンプル(GroupByTest.scala、SkewedGroupByTest.scala)もあります。
https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/examples

そのため、これらのサンプルを動かす形で切り分けの方を行います。

1.サンプルを動作させてみる

既にビルドされたものを使用するため、バージョンが固定されているspark-0.8.0-incubating-bin-cdh4の方を使用します。
すると・・?

# cd /opt/spark-0.8.0-incubating-bin-cdh4
# ./run-example org.apache.spark.examples.GroupByTest zk://spark1:2181/mesos
(省略)
13/12/04 18:17:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (FlatMappedRDD[1] at flatMap at GroupByTest.scala:39)
13/12/04 18:17:23 INFO cluster.ClusterScheduler: Adding task set 0.0 with 2 tasks
13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 0 on executor 201311130859-4133791936-5050-15712-0: spark3 (PROCESS_LOCAL) // ★spark3でTask 0開始★
13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1662 bytes in 3 ms
13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Starting task 0.0:1 as TID 1 on executor 201311130859-4133791936-5050-15712-2: spark1 (PROCESS_LOCAL) // ★spark1でTask 1開始★
13/12/04 18:17:23 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:1 as 1662 bytes in 0 ms
13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Re-queueing tasks for 201311130859-4133791936-5050-15712-0 from TaskSet 0.0
13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0) // ★spark3でTask 0失敗★
13/12/04 18:17:24 INFO scheduler.DAGScheduler: Executor lost: 201311130859-4133791936-5050-15712-0 (epoch 0)
13/12/04 18:17:24 INFO storage.BlockManagerMasterActor: Trying to remove executor 201311130859-4133791936-5050-15712-0 from BlockManagerMaster. // ★spark3でTask 0消去★
13/12/04 18:17:24 INFO storage.BlockManagerMaster: Removed 201311130859-4133791936-5050-15712-0 successfully in removeExecutor
13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 201311130859-4133791936-5050-15712-0: spark3 (PROCESS_LOCAL) // ★spark3でTask 2開始★
13/12/04 18:17:24 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1662 bytes in 0 ms
13/12/04 18:17:24 INFO scheduler.DAGScheduler: Host gained which was in lost list earlier: spark3
13/12/04 18:17:24 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager spark1:54395 with 324.4 MB RAM
13/12/04 18:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_1 in memory on spark1:54395 (size: 1057.4 KB, free: 323.4 MB)
13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 1792 ms on spark1 (progress: 1/2) // ★spark1でTask 1終了★
13/12/04 18:17:25 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Re-queueing tasks for 201311130859-4133791936-5050-15712-0 from TaskSet 0.0
13/12/04 18:17:25 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0) // ★spark3でTask 2失敗★
・・・

spark1に割り振られたタスクは成功しているんですが、spark3に割り振られたタスクが失敗しているようです。
mesosのUIを見てみると以下のようになり、実際spark3のタスクが「LOST」・・・?しています。

そのため、spark3での実行結果コンソール出力を確認してみると、以下の内容でした。

  • stdout
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-0/frameworks/201311130859-4150569152-5050-15300-0011/executors/201311130859-4133791936-5050-15712-0/runs/57563278-4e74-4a7e-9371-c139dfbff373'
  • stderr
sh: /opt/spark-0.8.0-incubating-bin-cdh4/spark-executor: そのようなファイルやディレクトリはありません

内容的にはSparkの実行環境が存在しないというものでした。
尚、「run-example」からは「conf/spark-env.sh」も読みこまれており、下記の環境変数は持ったまま起動しています。

export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz
export MASTER=zk://spark1:2181/mesos

そのため、「SPARK_EXECUTOR_URI」自体がSparkバッチの際には有効化されず、
REPLの場合にのみ使用されるようなものではないか・・・と考えられます。
尚、GroupByTest.scalaの内容は下記のようになっていました。

package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object GroupByTest {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
      System.exit(1)
    }
    
    var numMappers = if (args.length > 1) args(1).toInt else 2
    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
    var valSize = if (args.length > 3) args(3).toInt else 1000
    var numReducers = if (args.length > 4) args(4).toInt else numMappers

    val sc = new SparkContext(args(0), "GroupBy Test",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
    
    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache
    // Enforce that everything has been calculated and in cache
    pairs1.count
    
    println(pairs1.groupByKey(numReducers).count)

    System.exit(0)
  }
}

「SPARK_EXECUTOR_URI」を指定するのではなく「SPARK_HOME」を指定しています。
別サーバ上に展開されたSparkの実行ディレクトリパスを指定しておく必要がある・・・となりますね。

「SPARK_EXECUTOR_URI」指定によって実行環境も全て取得&展開されるのは便利ではありましたが、
「SPARK_HOME」方式は毎回Sparkの実行環境をコピーする必要がないため、実際何度も使うようになればこちらの方がよさそうです。
起動も早そうですし。

2.SlaveサーバへのSpark実行環境展開

mesos-masterを配置していたspark1と同じアーカイブであるspark-0.8.0-incubating-bin-cdh4.tgzを展開して準備をします。
尚、slave側ではsbtによるビルドは行っていません。ファイルを展開し、spark-env.shを準備したのみです。

# wget http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz
# tar xvzf spark-0.8.0-incubating-bin-cdh4.tgz
# mv spark-0.8.0-incubating-bin-cdh4 /opt/
# cd /opt/spark-0.8.0-incubating-bin-cdh4/
# cp -p conf/spark-env.sh.template  conf/spark-env.sh
# vi  conf/spark-env.sh
(下記の内容を追記)
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz
export MASTER=zk://spark1:2181/mesos

SPARK_EXECUTOR_URIとかは不要だとは思いますが、後で別サーバの追加を楽にするためにmesos-masterと極力環境を合わせています。

3.サンプルを再動作させてみる

では、この状態でサンプルを再動作させてみます。
すると・・・?

# cd /opt/spark-0.8.0-incubating-bin-cdh4
# ./run-example org.apache.spark.examples.GroupByTest zk://spark1:2181/mesos
(省略)
13/12/04 18:44:42 INFO spark.MapOutputTracker: Size of output statuses for shuffle 0 is 177 bytes
13/12/04 18:44:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to spark1:51128
13/12/04 18:44:42 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 137 ms on spark3 (progress: 1/2)
13/12/04 18:44:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
13/12/04 18:44:42 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 139 ms on spark1 (progress: 2/2)
13/12/04 18:44:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
13/12/04 18:44:42 INFO cluster.ClusterScheduler: Remove TaskSet 1.0 from pool
13/12/04 18:44:42 INFO scheduler.DAGScheduler: Stage 1 (count at GroupByTest.scala:52) finished in 0.157 s
13/12/04 18:44:42 INFO spark.SparkContext: Job finished: count at GroupByTest.scala:52, took 0.291269363 s
2000

成功しました。
念のためmesosの画面の方も見てみますが、Taskの結果は以下のようになっており、実行成功しています。

というわけで、単にSparkバッチの実装方法が誤っていただけだということがわかりました。ーー;
4.まとめ
とりあえず、今回わかったことは以下です。

  • Spark実行環境のアドホックな展開はREPLでのみ使用できると思われる
  • Sparkバッチを実行する場合は予め実行環境をmesos-slaveサーバ上に展開しておく必要がある

なので、「Spark実行環境のアドホックな展開をSparkバッチで実行しようとしたこと」が誤りだったようですね。

とりあえず原因はわかったので、次回自作Sparkバッチを修正して、試してみます・・・