Running a Spark Batch Job on Mesos (Part 1)
Hello.
Last time I managed to run Spark on Mesos from the REPL, so this time I'll try running it as a batch application.
1. Adapting the Batch Application for the Cluster
First, modify the TextCountApp.scala created last time as follows.
There are five changes:
- Set "spark.executor.uri" as a system property
- Pass the Mesos URI to SparkContext
- Remove SparkHome and the dependency jar from the SparkContext call (done deliberately, to see whether it still works)
- Read README.md from HDFS
- Add the HDFS dependency and the Cloudera repository to count.sbt (mind the number of % signs: putting %% after the GroupId makes sbt fetch the build matching your Scala version; see the short illustration after this list)
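A quick illustration of that last point: with scalaVersion := "2.9.3", sbt expands a %% dependency by appending the Scala version to the artifact name, so the two declarations below resolve to the same jar (a standalone example, not part of count.sbt):
>||
// With scalaVersion := "2.9.3", these two lines fetch the same artifact:
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

libraryDependencies += "org.apache.spark" % "spark-core_2.9.3" % "0.8.0-incubating"
||<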
■TextCount/src/main/scala/TextCountApp.scala
>||
/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    System.setProperty("spark.executor.uri", "hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz")
    val logFile = "hdfs://spark1/inputdata/README.md"
    val sc = new SparkContext("zk://spark1:2181/mesos", "TextCountApp")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}
||<
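For reference, the fuller constructor dropped in the third bullet also takes a Spark home and a list of jars to ship to the executors. A sketch of what that looks like in the 0.8 API (the sparkHome and jar paths here are illustrative, not the ones from the previous post):
>||
// Four-argument form of the 0.8 SparkContext constructor:
// master, application name, Spark home on the workers, and jars to distribute.
val sc = new SparkContext(
  "zk://spark1:2181/mesos",
  "TextCountApp",
  "/usr/local/spark",                                     // sparkHome (illustrative)
  Seq("target/scala-2.9.3/count-project_2.9.3-1.0.jar"))  // jars (illustrative)
||<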
■TextCount/count.sbt
name := "Count Project" version := "1.0" scalaVersion := "2.9.3" libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/"
With those changes in place, move into the TextCount directory and run the command below.
sbt then builds the project and produces a jar file.
>||
C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd package
(snip)
[info] [SUCCESSFUL ] org.apache.hadoop#hadoop-yarn-server-nodemanager;2.0.0-cdh4.4.0!hadoop-yarn-server-nodemanager.jar (6345ms)
[info] Done updating.
[success] Total time: 476 s, completed 2013/11/28 8:06:48
||<
Now that the build is done, upload README.md to HDFS with the commands below and then actually run the application. And then...?
>||
# sudo -u hdfs hdfs dfs -mkdir /inputdata
# sudo -u hdfs hdfs dfs -put README.md /inputdata
||<
2. Running the Batch Application (from Windows)
>||
C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd run
[info] Set current project to Count Project (in build file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/)
[info] Running TextCountApp
(snip)
[error] (run-main) java.lang.UnsatisfiedLinkError: no mesos in java.library.path
java.lang.UnsatisfiedLinkError: no mesos in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
    at java.lang.Runtime.loadLibrary0(Runtime.java:849)
    at java.lang.System.loadLibrary(System.java:1088)
    at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52)
    at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:216)
    at TextCountApp$.main(TextCountApp.scala:9)
    at TextCountApp.main(TextCountApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[error] {file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/}default-1d6910/compile:run: Nonzero exit code: 1
[error] Total time: 4 s, completed 2013/11/28 8:14:37
||<
The run fails with an error saying the Mesos native library cannot be found.
Cutting corners and running it from Windows was apparently not a good idea (oops).
That said, one thing has become clear: because the Mesos native library is required at startup, a client that launches jobs against the cluster also has to live on Linux.
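As an aside, the check that fails inside SparkContext can be reproduced with a few lines of plain Scala. A minimal diagnostic sketch (not part of the app, and assuming nothing beyond the JDK) that prints the search path and attempts the same System.loadLibrary call the Mesos Java bindings end up making:
>||
// Minimal diagnostic: mimic the native-library lookup that creating a
// SparkContext against a Mesos master triggers.
object MesosLibCheck {
  def main(args: Array[String]) {
    // Directories the JVM searches for libmesos.so (mesos.dll on Windows).
    println("java.library.path = " + System.getProperty("java.library.path"))
    try {
      System.loadLibrary("mesos")
      println("libmesos loaded successfully")
    } catch {
      case e: UnsatisfiedLinkError => println("load failed: " + e.getMessage)
    }
  }
}
||<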
So let's move over to a Linux machine and try again.
3. Running the Batch Application (from Linux)
Move the files over to Linux and add the Mesos library path to TextCountApp.
■TextCount/src/main/scala/TextCountApp.scala
>||
/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    System.setProperty("mesos.native.library", "/usr/local/lib/libmesos.so")
    System.setProperty("spark.executor.uri", "hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz")
    val logFile = "hdfs://spark1/inputdata/README.md"
    val sc = new SparkContext("zk://spark1:2181/mesos", "TextCountApp")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}
||<
Then rebuild and run it again.
>||
# cd /usr/local/src/spark-0.8.0-incubating/TextCount
# ../sbt/sbt package
(snip)
[info] Packaging /usr/local/src/spark-0.8.0-incubating/TextCount/target/scala-2.9.3/count-project_2.9.3-1.0.jar ...
[info] Done packaging.
[success] Total time: 483 s, completed 2013/11/29 7:38:13
# ../sbt/sbt run
Failed to load native Mesos library from /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
[error] (run-main) java.lang.UnsatisfiedLinkError: no mesos in java.library.path
java.lang.UnsatisfiedLinkError: no mesos in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
    at java.lang.Runtime.loadLibrary0(Runtime.java:849)
    at java.lang.System.loadLibrary(System.java:1088)
    at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52)
    at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:216)
    at TextCountApp$.main(TextCountApp.scala:10)
    at TextCountApp.main(TextCountApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[error] {file:/usr/local/src/spark-0.8.0-incubating/TextCount/}default-f43806/compile:run: Nonzero exit code: 1
[error] Total time: 2 s, completed 2013/11/29 7:38:59
||<
Once again the run is rejected because the Mesos library cannot be found.
So when running as a batch job, it seems the library really does have to be visible on java.library.path.
As a workaround, I create a symbolic link under /usr/lib, a directory that should be on the default library path in just about any environment, on the machine that launches Spark.
Strictly speaking this probably belongs in one of the env.sh-style scripts, but it is not obvious where to hook that in when launching via sbt, so I won't go down that road this time.
>||
# ln -s /usr/local/lib/libmesos.so /usr/lib/libmesos.so
||<
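For reference, an sbt-side alternative to the symlink would be to fork the JVM that sbt run launches and hand it the library path explicitly. A hedged sketch of what might go into count.sbt (untested in this setup); forking matters because javaOptions only reach a forked JVM, while an in-process run inherits sbt's own java.library.path:
>||
// Untested alternative to the symlink: fork "sbt run" and point
// java.library.path at the directory holding libmesos.so.
fork in run := true

javaOptions in run += "-Djava.library.path=/usr/local/lib"
||<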
>||
# ../sbt/sbt run
I1129 07:45:21.053050 27173 detector.cpp:234] Master detector (scheduler(1)@192.168.100.246:46140) connected to ZooKeeper ...
I1129 07:45:21.053287 27173 detector.cpp:251] Trying to create path '/mesos' in ZooKeeper
I1129 07:45:21.053953 27173 detector.cpp:420] Master detector (scheduler(1)@192.168.100.246:46140) found 3 registered masters
I1129 07:45:21.054417 27173 detector.cpp:467] Master detector (scheduler(1)@192.168.100.246:46140) got new master pid: master@192.168.100.247:5050
[error] (run-main) org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[error] {file:/usr/local/src/spark-0.8.0-incubating/TextCount/}default-f43806/compile:run: Nonzero exit code: 1
[error] Total time: 8 s, completed 2013/11/29 7:45:27
||<
Unfortunately it fails again.
This time, however, the task did get submitted to Mesos, so let's check the logs on the Mesos side.
- stdout
>||
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-0/frameworks/201311130859-4150569152-5050-15300-0004/executors/201311130859-4133791936-5050-15712-0/runs/0ef0a3c3-3307-495f-aa31-0dca598bcac4'
Fetching resource 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Downloading resource from 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
HDFS command: hadoop fs -copyToLocal 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Extracting resource: tar xzf './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Running spark-executor with framework dir = .
||<
- stderr
>||
13/11/29 07:44:55 INFO executor.MesosExecutorBackend: Registered with Mesos as executor ID 201311130859-4133791936-5050-15712-2
13/11/29 07:44:55 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/11/29 07:44:55 INFO spark.SparkEnv: Connecting to BlockManagerMaster: akka://spark@spark1:45675/user/BlockManagerMaster
13/11/29 07:44:55 INFO storage.MemoryStore: MemoryStore started with capacity 324.4 MB.
13/11/29 07:44:55 INFO storage.DiskStore: Created local directory at /tmp/spark-local-20131129074455-7987
13/11/29 07:44:55 INFO network.ConnectionManager: Bound socket to port 40912 with id = ConnectionManagerId(spark1,40912)
13/11/29 07:44:55 INFO storage.BlockManagerMaster: Trying to register BlockManager
13/11/29 07:44:55 INFO storage.BlockManagerMaster: Registered BlockManager
13/11/29 07:44:55 INFO spark.SparkEnv: Connecting to MapOutputTracker: akka://spark@spark1:45675/user/MapOutputTracker
13/11/29 07:44:55 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-6dbae248-f2c3-42d4-9a64-8df2d69fd8b3
13/11/29 07:44:55 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/11/29 07:44:55 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48029
13/11/29 07:44:56 INFO executor.Executor: Running task ID 1
13/11/29 07:44:56 INFO broadcast.HttpBroadcast: Started reading broadcast variable 0
13/11/29 07:44:56 INFO storage.MemoryStore: ensureFreeSpace(228503) called with curMem=0, maxMem=340147568
13/11/29 07:44:56 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 223.1 KB, free 324.2 MB)
13/11/29 07:44:56 INFO broadcast.HttpBroadcast: Reading broadcast variable 0 took 0.216161677 s
13/11/29 07:44:56 ERROR executor.Executor: Exception in task ID 1
java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:258)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:250)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1956)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
||<
From the stack trace, the executor hits a java.io.EOFException inside org.apache.hadoop.mapred.FileSplit.readFields, so it looks like deserializing the Spark task it received is what fails, but the details are unclear.
I'll stop here for now and try to track down the cause from these error messages.