夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

SparkバッチをMesosの上で動作させる(自作アプリ→一部成功

こんにちは。
前回はビルド時にバージョン違いのJarが紛れてしまうことで動作しなかったので、
まずはビルド定義の見直しから行ってみます。

1.sbtのビルド定義を修正

Sparkのビルド定義(SparkBuild.scala)を見てみると、下記のように依存性を除外するルールが存在していました。
#共通的なもののみ。Cassandra等は別途大量に依存性を除外している。

■SparkBuild.scala

  val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
  val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
  val excludeAsm = ExclusionRule(organization = "asm")
  val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy")

その上で、今回使用しているライブラリを見てみると、hadoop-clientに以下のような除外定義が設定されていました。
hadoop-clientの依存性を一部除外すればクラスのバージョンは統一できそうです。

"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),

そのため、自作アプリのビルド定義(hadoop-client部)を下記のように修正しました。

■count.sbt

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0" excludeAll(
    ExclusionRule(organization = "org.codehaus.jackson"),
    ExclusionRule(organization = "org.jboss.netty"),
    ExclusionRule(organization = "asm")
)

その上で、ビルドをかけてみてクラスファイルのサイズを比較してみますと・・?
■count.sbt

# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount
# ../sbt/sbt clean assembly
(省略)
[info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 33 s, completed 2013/12/13 8:35:46
# jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor
(省略)
  1524 Tue Dec 06 19:56:20 JST 2011 org/objectweb/asm/ClassVisitor.class
(省略)

「spark-examples-assembly-0.8.0-incubating.jar」と同じバージョンのクラスファイルになっていることを確認しました。

2.再度アプリケーションを実行してみると・・?

ビルドがうまく行ったのを確認したので、再度アプリケーションを走らせてみます。

# cd /opt/spark-0.8.0-incubating-bin-cdh4
# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos
(省略)
[info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 33 s, completed 2013/12/13 8:35:46
# jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor
13/12/13 08:39:04 INFO cluster.ClusterTaskSetManager: Lost TID 1 (task 0.0:1)
13/12/13 08:39:04 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException
(省略)
java.io.EOFException
        at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
        at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106)
        at org.apache.hadoop.io.UTF8.readChars(UTF8.java:258)
        at org.apache.hadoop.io.UTF8.readString(UTF8.java:250)
        at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
        at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
        at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
        at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
(省略)
13/12/13 08:39:04 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
13/12/13 08:39:04 INFO scheduler.DAGScheduler: Failed to run count at TextCountApp.scala:19
Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 0.0:0 failed more than 4 times
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
13/12/13 08:39:04 INFO cluster.ClusterScheduler: Ignoring update from TID 9 because its task set is gon

と、残念ながらまたしても実行に失敗しました。
ですが、エラーメッセージを見てみるとTaskがLostする形で失敗したのではなく、
「TextCountApp.scala」の19行目で明示的に失敗した旨が出力されています。
「TextCountApp.scala」は下記のようになっているため、hdfsからのファイル取得/展開に失敗した・・という形のようです。
おそらく遅延評価が走っているので、実際に例外が発生したのは「sc.textFile」メソッド部と推測されます。
■TextCountApp.scala

(省略)
13    val sc = new SparkContext(args(0), "TextCountApp",
14      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR")))
15
16    val logFile = "hdfs://spark1/inputdata/README.md"
17
18    val logData = sc.textFile(logFile, 2).cache()
19    val numAs = logData.filter(line => line.contains("a")).count()
(省略)

そのため、読み取り元を修正し、ローカルのファイルを読み込むよう修正してみます。
■TextCountApp.scala

(省略)
13    val sc = new SparkContext(args(0), "TextCountApp",
14      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR")))
15
16    val logFile = "/opt/spark-0.8.0-incubating-bin-cdh4/README.md"
17
18    val logData = sc.textFile(logFile, 2).cache()
19    val numAs = logData.filter(line => line.contains("a")).count()
(省略)

その上でビルド→実行をしてみますと・・?

# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount
# ../sbt/sbt clean assembly
# cd /opt/spark-0.8.0-incubating-bin-cdh4
# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos
(省略)
13/12/13 08:49:41 INFO scheduler.DAGScheduler: Failed to run count at TextCountApp.scala:19
Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
13/12/13 08:49:41 INFO cluster.ClusterScheduler: Ignoring update from TID 9 because its task set is gone

残念ながら再度失敗。
ただ、やはりcountに失敗したという形になっているため、データの初期化の問題のようです。
ファイル入出力系は単に書くだけでなく、他の対応も必要なのかもしれません。

3.アプリケーションの構成を最小構成に変更し、再実行

そのため、ファイル入出力を一度省き、下記のようにアプリケーションを修正しました。
まずは最小のセットから確認を行ってみます。
■TextCountApp.scala

/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {

    if (args.length == 0) {
      System.err.println("Usage: TextCountApp <master>")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "TextCountApp",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR")))

    val exampleApacheLogs = List(
      """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
        | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
        | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
        | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
        | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
        | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.replace("\n", ""),
      """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "POST http://images.com/2013/Generic.jpg
        | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
        | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
        | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
        | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
        | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.replace("\n", "")
    )

    val dataSet = sc.parallelize(exampleApacheLogs)

    val numGet = dataSet.filter(line => line.contains("GET")).count()
    val numPost = dataSet.filter(line => line.contains("POST")).count()
    val numFred = dataSet.filter(line => line.contains("FRED")).count()
    println("Lines with GET: %s, Lines with POST: %s, Lines with PRED: %s".format(numGet, numPost, numFred))
  }
}

再度ビルド→実行しますと・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4/TextCount
# ../sbt/sbt clean assembly
# cd /opt/spark-0.8.0-incubating-bin-cdh4
# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos
(省略)
13/12/14 13:32:47 INFO scheduler.DAGScheduler: Completed ResultTask(2, 6)
13/12/14 13:32:47 INFO scheduler.DAGScheduler: Stage 2 (count at TextCountApp.scala:35) finished in 0.962 s
13/12/14 13:32:47 INFO spark.SparkContext: Job finished: count at TextCountApp.scala:35, took 0.967689091 s
Lines with GET: 1, Lines with POST: 1, Lines with PRED: 2
|

・・と、ようやくアプリケーションの方が実行成功しました。
Mesosの画面上も「FINISHED」という表示となっているため、問題なさそうです。

ようやくこれでSparkの自作アプリがMesos上で動作することが確認できました。
ファイル入出力周りはまだ問題が発生していますが、そちらはまた継続して対応ということで^^;