SparkバッチをMesosの上で動作させる(自作アプリ→一部成功
こんにちは。
前回はビルド時にバージョン違いのJarが紛れてしまうことで動作しなかったので、
まずはビルド定義の見直しから行ってみます。
1.sbtのビルド定義を修正
Sparkのビルド定義(SparkBuild.scala)を見てみると、下記のように依存性を除外するルールが存在していました。
#共通的なもののみ。Cassandra等は別途大量に依存性を除外している。
■SparkBuild.scala
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") val excludeNetty = ExclusionRule(organization = "org.jboss.netty") val excludeAsm = ExclusionRule(organization = "asm") val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy")
その上で、今回使用しているライブラリを見てみると、hadoop-clientに以下のような除外定義が設定されていました。
hadoop-clientの依存性を一部除外すればクラスのバージョンは統一できそうです。
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
そのため、自作アプリのビルド定義(hadoop-client部)を下記のように修正しました。
■count.sbt
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0" excludeAll( ExclusionRule(organization = "org.codehaus.jackson"), ExclusionRule(organization = "org.jboss.netty"), ExclusionRule(organization = "asm") )
その上で、ビルドをかけてみてクラスファイルのサイズを比較してみますと・・?
■count.sbt
# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount # ../sbt/sbt clean assembly (省略) [info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 33 s, completed 2013/12/13 8:35:46 # jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor (省略) 1524 Tue Dec 06 19:56:20 JST 2011 org/objectweb/asm/ClassVisitor.class (省略)
「spark-examples-assembly-0.8.0-incubating.jar」と同じバージョンのクラスファイルになっていることを確認しました。
2.再度アプリケーションを実行してみると・・?
ビルドがうまく行ったのを確認したので、再度アプリケーションを走らせてみます。
# cd /opt/spark-0.8.0-incubating-bin-cdh4 # SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos (省略) [info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 33 s, completed 2013/12/13 8:35:46 # jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor 13/12/13 08:39:04 INFO cluster.ClusterTaskSetManager: Lost TID 1 (task 0.0:1) 13/12/13 08:39:04 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException (省略) java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:258) at org.apache.hadoop.io.UTF8.readString(UTF8.java:250) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) (省略) 13/12/13 08:39:04 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool 13/12/13 08:39:04 INFO scheduler.DAGScheduler: Failed to run count at TextCountApp.scala:19 Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 0.0:0 failed more than 4 times at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 13/12/13 08:39:04 INFO cluster.ClusterScheduler: Ignoring update from TID 9 because its task set is gon
と、残念ながらまたしても実行に失敗しました。
ですが、エラーメッセージを見てみるとTaskがLostする形で失敗したのではなく、
「TextCountApp.scala」の19行目で明示的に失敗した旨が出力されています。
「TextCountApp.scala」は下記のようになっているため、hdfsからのファイル取得/展開に失敗した・・という形のようです。
おそらく遅延評価が走っているので、実際に例外が発生したのは「sc.textFile」メソッド部と推測されます。
■TextCountApp.scala
(省略) 13 val sc = new SparkContext(args(0), "TextCountApp", 14 System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR"))) 15 16 val logFile = "hdfs://spark1/inputdata/README.md" 17 18 val logData = sc.textFile(logFile, 2).cache() 19 val numAs = logData.filter(line => line.contains("a")).count() (省略)
そのため、読み取り元を修正し、ローカルのファイルを読み込むよう修正してみます。
■TextCountApp.scala
(省略) 13 val sc = new SparkContext(args(0), "TextCountApp", 14 System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR"))) 15 16 val logFile = "/opt/spark-0.8.0-incubating-bin-cdh4/README.md" 17 18 val logData = sc.textFile(logFile, 2).cache() 19 val numAs = logData.filter(line => line.contains("a")).count() (省略)
その上でビルド→実行をしてみますと・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount # ../sbt/sbt clean assembly # cd /opt/spark-0.8.0-incubating-bin-cdh4 # SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos (省略) 13/12/13 08:49:41 INFO scheduler.DAGScheduler: Failed to run count at TextCountApp.scala:19 Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 13/12/13 08:49:41 INFO cluster.ClusterScheduler: Ignoring update from TID 9 because its task set is gone
残念ながら再度失敗。
ただ、やはりcountに失敗したという形になっているため、データの初期化の問題のようです。
ファイル入出力系は単に書くだけでなく、他の対応も必要なのかもしれません。
3.アプリケーションの構成を最小構成に変更し、再実行
そのため、ファイル入出力を一度省き、下記のようにアプリケーションを修正しました。
まずは最小のセットから確認を行ってみます。
■TextCountApp.scala
/*** TextCountApp.scala ***/ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object TextCountApp { def main(args: Array[String]) { if (args.length == 0) { System.err.println("Usage: TextCountApp <master>") System.exit(1) } val sc = new SparkContext(args(0), "TextCountApp", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR"))) val exampleApacheLogs = List( """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 "" | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""), """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "POST http://images.com/2013/Generic.jpg | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 "" | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "") ) val dataSet = sc.parallelize(exampleApacheLogs) val numGet = dataSet.filter(line => line.contains("GET")).count() val numPost = dataSet.filter(line => line.contains("POST")).count() val numFred = dataSet.filter(line => line.contains("FRED")).count() println("Lines with GET: %s, Lines with POST: %s, Lines with PRED: %s".format(numGet, numPost, numFred)) } }
再度ビルド→実行しますと・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount
# ../sbt/sbt clean assembly
# cd /opt/spark-0.8.0-incubating-bin-cdh4
# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos
(省略)
13/12/14 13:32:47 INFO scheduler.DAGScheduler: Completed ResultTask(2, 6)
13/12/14 13:32:47 INFO scheduler.DAGScheduler: Stage 2 (count at TextCountApp.scala:35) finished in 0.962 s
13/12/14 13:32:47 INFO spark.SparkContext: Job finished: count at TextCountApp.scala:35, took 0.967689091 s
Lines with GET: 1, Lines with POST: 1, Lines with PRED: 2
|
・・と、ようやくアプリケーションの方が実行成功しました。
Mesosの画面上も「FINISHED」という表示となっているため、問題なさそうです。
ようやくこれでSparkの自作アプリがMesos上で動作することが確認できました。
ファイル入出力周りはまだ問題が発生していますが、そちらはまた継続して対応ということで^^;