A Dumping Ground for Dreams and Junk

A dumping ground for the daydreams ("dreams") and abandoned projects ("junk") of this blog's admin, a third-rate dropout engineer.

Running Spark Batch Jobs on Mesos (Part 1)

Hello.

Last time, I managed to get Spark on Mesos working from the REPL, so next I will try running it as a batch application.

1. Adapting the batch application to the cluster

First, modify the TextCountApp.scala created last time as follows.

The changes are the following five points.

  1. Set "spark.executor.uri" as a system property
  2. Specify the Mesos master URI in the SparkContext
  3. Remove the SparkHome and dependency-JAR arguments from the SparkContext (done deliberately, to check whether it still works; a reference snippet follows the code below)
  4. Read README.md from HDFS
  5. Add the HDFS dependency and the Cloudera repository to count.sbt (mind the number of % signs: putting %% after the GroupId fetches the artifact built for your Scala version, as illustrated after count.sbt below)

■TextCount/src/main/scala/TextCountApp.scala

/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    System.setProperty("spark.executor.uri", "hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz")
    val logFile = "hdfs://spark1/inputdata/README.md"
    val sc = new SparkContext("zk://spark1:2181/mesos", "TextCountApp")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}
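
For reference on point 3: before this change, the SparkContext was constructed with the sparkHome and dependency-JAR arguments as well. Roughly, the removed form looked like the snippet below (the paths are taken from elsewhere in this series and are illustrative only):

    // What point 3 removed: the sparkHome and application-JAR arguments.
    val sc = new SparkContext("zk://spark1:2181/mesos", "TextCountApp",
      "/usr/local/src/spark-0.8.0-incubating",               // sparkHome
      Seq("target/scala-2.9.3/count-project_2.9.3-1.0.jar")) // dependency JARs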

■TextCount/count.sbt

name := "Count Project"

version := "1.0"

scalaVersion := "2.9.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/"

After making the above changes, move into the TextCount directory and run the following command.
sbt will build the project and produce a JAR file.

C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd package
(snip)
[info]  [SUCCESSFUL ] org.apache.hadoop#hadoop-yarn-server-nodemanager;2.0.0-cdh
4.4.0!hadoop-yarn-server-nodemanager.jar (6345ms)
[info] Done updating.
[success] Total time: 476 s, completed 2013/11/28 8:06:48

With the build done, upload README.md to HDFS using the commands below, then run the app for real. And the result...?

# sudo -u hdfs hdfs dfs -mkdir /inputdata
# sudo -u hdfs hdfs dfs -put README.md /inputdata
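
As a side note, the executor archive referenced by spark.executor.uri must already be sitting in HDFS as well; if it is not, it can be uploaded the same way. A sketch, using the path from the code above:

# sudo -u hdfs hdfs dfs -mkdir /sparkarchive
# sudo -u hdfs hdfs dfs -put spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz /sparkarchive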

2. Running the batch application (from Windows)

C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd run
[info] Set current project to Count Project (in build file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/)
[info] Running TextCountApp
(snip)
[error] (run-main) java.lang.UnsatisfiedLinkError: no mesos in java.library.path

java.lang.UnsatisfiedLinkError: no mesos in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
        at java.lang.Runtime.loadLibrary0(Runtime.java:849)
        at java.lang.System.loadLibrary(System.java:1088)
        at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52)
        at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:216)
        at TextCountApp$.main(TextCountApp.scala:9)
        at TextCountApp.main(TextCountApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[error] {file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/}default-1d6910/compile:run: Nonzero exit code: 1
[error] Total time: 4 s, completed 2013/11/28 8:14:37

The run fails with an error saying the Mesos native library cannot be found.
Cutting corners and running it straight from Windows was apparently a bad idea (oops).
Still, one thing is now clear: because the Mesos native library is required just to start up, the client that
launches a job against the cluster also needs to live on Linux.
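
As a small debugging aid for this kind of UnsatisfiedLinkError, here is a trivial check (not part of the original setup) that prints the directories the JVM actually searches for native libraries:

object LibPathCheck {
  def main(args: Array[String]) {
    // System.loadLibrary resolves library names against this path list.
    println(System.getProperty("java.library.path"))
  }
}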

So next, let's move everything over to Linux and run it there.

3. Running the batch application (from Linux)

Move the files over to a Linux machine and add the Mesos library path to TextCountApp.

■TextCount/src/main/scala/TextCountApp.scala

/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    System.setProperty("mesos.navtive.library", "/usr/local/lib/libmesos.so")
    System.setProperty("spark.executor.uri", "hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz")
    val logFile = "hdfs://spark1/inputdata/README.md"
    val sc = new SparkContext("zk://spark1:2181/mesos", "TextCountApp")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}

With that change in place, rebuild and run it again.

# cd /usr/local/src/spark-0.8.0-incubating/TextCount
# ../sbt/sbt package
(snip)
[info] Packaging /usr/local/src/spark-0.8.0-incubating/TextCount/target/scala-2.9.3/count-project_2.9.3-1.0.jar ...
[info] Done packaging.
[success] Total time: 483 s, completed 2013/11/29 7:38:13
# ../sbt/sbt run
Failed to load native Mesos library from /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
[error] (run-main) java.lang.UnsatisfiedLinkError: no mesos in java.library.path
java.lang.UnsatisfiedLinkError: no mesos in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
        at java.lang.Runtime.loadLibrary0(Runtime.java:849)
        at java.lang.System.loadLibrary(System.java:1088)
        at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52)
        at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:216)
        at TextCountApp$.main(TextCountApp.scala:10)
        at TextCountApp.main(TextCountApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[error] {file:/usr/local/src/spark-0.8.0-incubating/TextCount/}default-f43806/compile:run: Nonzero exit code: 1
[error] Total time: 2 s, completed 2013/11/29 7:38:59

Again it is rejected because the mesos library cannot be found.
So when running as a batch job, the library apparently needs to already be somewhere on java.library.path...
As a workaround, let's create a symbolic link under /usr/lib on the machine that launches Spark, since that directory should be on the default search path in just about any environment.
Setting this up through the env.sh-style scripts feels like the proper approach, but it is not obvious where to hook that in when launching via sbt, so I won't go that route this time.
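
For the record, rough sketches of the two untried alternatives, assuming the library sits at /usr/local/lib/libmesos.so; after this, we proceed with the symlink instead.

Alternative 1: export the environment variable that, as far as I can tell, the Mesos Java binding checks before falling back to java.library.path:

# export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
# ../sbt/sbt run

Alternative 2: in count.sbt, have sbt fork the run JVM and pass the path explicitly (sbt 0.12 syntax):

fork in run := true

javaOptions in run += "-Djava.library.path=/usr/local/lib"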

# ln -s /usr/local/lib/libmesos.so /usr/lib/libmesos.so
# ../sbt/sbt run
I1129 07:45:21.053050 27173 detector.cpp:234] Master detector (scheduler(1)@192.168.100.246:46140) connected to ZooKeeper ...
I1129 07:45:21.053287 27173 detector.cpp:251] Trying to create path '/mesos' in ZooKeeper
I1129 07:45:21.053953 27173 detector.cpp:420] Master detector (scheduler(1)@192.168.100.246:46140)  found 3 registered masters
I1129 07:45:21.054417 27173 detector.cpp:467] Master detector (scheduler(1)@192.168.100.246:46140)  got new master pid: master@192.168.100.247:5050
[error] (run-main) org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[error] {file:/usr/local/src/spark-0.8.0-incubating/TextCount/}default-f43806/compile:run: Nonzero exit code: 1
[error] Total time: 8 s, completed 2013/11/29 7:45:27

Unfortunately, it fails yet again.
That said, registering the task with Mesos seems to have worked, so let's check the logs on the Mesos side.

  • stdout
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-0/frameworks/201311130859-4150569152-5050-15300-0004/executors/201311130859-4133791936-5050-15712-0/runs/0ef0a3c3-3307-495f-aa31-0dca598bcac4'
Fetching resource 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Downloading resource from 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
HDFS command: hadoop fs -copyToLocal 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Extracting resource: tar xzf './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Running spark-executor with framework dir = .
  • stderr
13/11/29 07:44:55 INFO executor.MesosExecutorBackend: Registered with Mesos as executor ID 201311130859-4133791936-5050-15712-2
13/11/29 07:44:55 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/11/29 07:44:55 INFO spark.SparkEnv: Connecting to BlockManagerMaster: akka://spark@spark1:45675/user/BlockManagerMaster
13/11/29 07:44:55 INFO storage.MemoryStore: MemoryStore started with capacity 324.4 MB.
13/11/29 07:44:55 INFO storage.DiskStore: Created local directory at /tmp/spark-local-20131129074455-7987
13/11/29 07:44:55 INFO network.ConnectionManager: Bound socket to port 40912 with id = ConnectionManagerId(spark1,40912)
13/11/29 07:44:55 INFO storage.BlockManagerMaster: Trying to register BlockManager
13/11/29 07:44:55 INFO storage.BlockManagerMaster: Registered BlockManager
13/11/29 07:44:55 INFO spark.SparkEnv: Connecting to MapOutputTracker: akka://spark@spark1:45675/user/MapOutputTracker
13/11/29 07:44:55 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-6dbae248-f2c3-42d4-9a64-8df2d69fd8b3
13/11/29 07:44:55 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/11/29 07:44:55 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48029
13/11/29 07:44:56 INFO executor.Executor: Running task ID 1
13/11/29 07:44:56 INFO broadcast.HttpBroadcast: Started reading broadcast variable 0
13/11/29 07:44:56 INFO storage.MemoryStore: ensureFreeSpace(228503) called with curMem=0, maxMem=340147568
13/11/29 07:44:56 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 223.1 KB, free 324.2 MB)
13/11/29 07:44:56 INFO broadcast.HttpBroadcast: Reading broadcast variable 0 took 0.216161677 s
13/11/29 07:44:56 ERROR executor.Executor: Exception in task ID 1
java.io.EOFException
	at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
	at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
	at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68)
	at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106)
	at org.apache.hadoop.io.UTF8.readChars(UTF8.java:258)
	at org.apache.hadoop.io.UTF8.readString(UTF8.java:250)
	at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
	at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
	at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
	at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1956)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1850)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

From the look of it, the executor fetched a Spark task and then failed while deserializing it, but the details are unclear.
I'll stop here for this post and try to track down the cause from the error messages and related clues.
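
As a first candidate to rule out next time, purely a guess from the stack trace: the EOFException fires while deserializing a Hadoop FileSplit, so the Hadoop version the driver build pulls in (hadoop-client 2.0.0-cdh4.4.0) may not match the Hadoop bundled into the executor tgz. Checking is cheap: run the command below on each machine involved and compare it with the hadoop-* versions sbt reported resolving during the build.

# hadoop version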