SparkバッチをMesosの上で動作させる(自作アプリ→またしても失敗
こんにちは。
前回でエラーが発生する原因についてはわかったので、run-exampleと同様の方式で
自作アプリケーションを走らせてみます。
1.ビルド方式更新/アプリ修正
run-exampleはassemblyという形で依存Jarも含めた状態でパッケージされ、それを配布して動作していました。
そのため、自作アプリのTextCountもassemblyを使用可能な形式に更新します。
→ 依存ライブラリを含めない方が配布するJarとしては軽くなるのですが、それをやろうとするとSparkの媒体に入っている
「bin/compute-classpath.sh」(Sparkインストールディレクトリ配下のassembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jarしかクラスパスに追加しない)
も修正する必要が出てくるため、今回はそれは行いません。
本当はSparkのインストール先にdependencyのようなディレクトリを作成して、「bin/compute-classpath.sh」からは
dependencyディレクトリが存在した場合配下のJarをクラスパスに追加する・・・という動作であればいいのですが、
現状Sparkはそういう方式は取らず、sbt-assemblyでまとめられたjarを利用するアプローチだそうです。
確かにsbt-assemblyは非常に扱いやすいといえば扱いやすいのですが。・・・って長いですね(汗
assembly化するのに必要な設定は以下の通りです。
■TextCount/project/plugin.sbt(Spark自体のplugins.sbtをそのまま使用)
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" resolvers += "Spray Repository" at "http://repo.spray.cc/" addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")
■TextCount/project/build.properties(Spark自体のbuild.propertiesをそのまま使用)
sbt.version=0.12.4
■TextCount/count.sbt
import AssemblyKeys._ name := "Count Project" version := "1.0" scalaVersion := "2.9.3" libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/" assemblySettings
その上で、アプリケーションのコードも下記のように修正します。
■TextCount/src/main/scala/TextCountApp.scala
/*** TextCountApp.scala ***/ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object TextCountApp { def main(args: Array[String]) { if (args.length == 0) { System.err.println("Usage: TextCountApp <master>") System.exit(1) } val sc = new SparkContext(args(0), "TextCountApp", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR"))) val logFile = "hdfs://spark1/inputdata/README.md" val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() val numSparks = logData.filter(line => line.contains("Spark")).count() println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks)) } }
その上で実行してみますと・・?
# cd /usr/local/src/spark-0.8.0-incubating/TextCount # ../sbt/sbt assembly (省略) [error] (*:assembly) deduplicate: different file contents found in the following: [error] /root/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class [error] /root/.ivy2/cache/org.mockito/mockito-all/jars/mockito-all-1.8.5.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class [error] /root/.ivy2/cache/org.objenesis/objenesis/jars/objenesis-1.2.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class [error] Total time: 9 s, completed 2013/12/06 19:33:23
げふ。sbtビルド時に同一クラスが被っているためエラーとなっているようです。
そのためビルド定義に重複時の定義を追加します。
■TextCount/count.sbt
import AssemblyKeys._ name := "count" version := "1.0" scalaVersion := "2.9.3" libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/" assemblySettings mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case PathList("org", "objenesis", xs @ _*) => MergeStrategy.last case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last case PathList("org", "jboss", "netty", xs @ _*) => MergeStrategy.last case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.last case PathList("META-INF", xs @ _*) => MergeStrategy.discard case PathList("org", "hamcrest", xs @ _*) => MergeStrategy.last case _ => MergeStrategy.first } }
その上で再度ビルドを行うと、ビルドは成功します。
# cd /usr/local/src/spark-0.8.0-incubating/TextCount # ../sbt/sbt assembly (省略) [info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 28 s, completed 2013/12/06 20:30:21
2.実行スクリプト作成
次はspark-exampleと同様の起動をさせるためのスクリプトを作成しました。
基本的にはspark-exampleと同じですが、指定したJarをアプリケーション用のJarとして読み込めるようにしています。
■run-app
SCALA_VERSION=2.9.3 # Figure out where the Scala framework is installed FWDIR="$(cd `dirname $0`; pwd)" # Export this as SPARK_HOME export SPARK_HOME="$FWDIR" # Load environment variables from conf/spark-env.sh, if it exists if [ -e $FWDIR/conf/spark-env.sh ] ; then . $FWDIR/conf/spark-env.sh fi if [ $# -lt 2 ] then echo "Usage: run-batch <spark-app-jar-path> <spark-exec-class> [<args>]" >&2 exit 1 fi # Get spark app jar path export SPARK_APP_JAR="$FWDIR"/$1 shift # Get spark exec class SPARK_EXEC_CLASS=$1 shift # Since the examples JAR ideally shouldn't include spark-core (that dependency should be # "provided"), also add our standard Spark classpath, built using compute-classpath.sh. CLASSPATH=`$FWDIR/bin/compute-classpath.sh` CLASSPATH="$SPARK_APP_JAR:$CLASSPATH" # Find java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ `command -v java` ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then echo -n "Spark Command: " echo "$RUNNER" -cp "$CLASSPATH" "$SPARK_EXEC_CLASS" "$@" echo "========================================" echo fi exec "$RUNNER" -cp "$CLASSPATH" "$SPARK_EXEC_CLASS" "$@"
では、これで実行を行ってみます。すると・・?
# cd /usr/local/src/spark-0.8.0-incubating # spark-0.8.0-incubating-bin-cdh4]# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos Spark Command: java -cp /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar::/opt/spark-0.8.0-incubating-bin-cdh4/conf:/opt/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop2.0.0-mr1-cdh4.4.0.jar TextCountApp zk://spark1:2181/mesos (省略) 13/12/06 21:12:59 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 156.9 KB, free 1155.5 MB) Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.util.InnerClosureFinder has interface org.objectweb.asm.ClassVisitor as super class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) (省略)
・・・げふ。今度はクラス間の齟齬が発生しているようです。
実際、count-assembly-1.0.jarとexampleで使用しているspark-examples-assembly-0.8.0-incubating.jarでは
以下のように「org.objectweb.asm.ClassVisitor」のバージョンが明らかに違います。
・・・assemblyの結果、余計なものを取り込んでしまったようですねーー;
# jar tvf spark-examples-assembly-0.8.0-incubating.jar | grep ClassVisitor (省略) 1524 Tue Dec 06 19:56:20 JST 2011 org/objectweb/asm/ClassVisitor.class (省略) # jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor (省略) 853 Tue Jul 07 00:05:22 JST 2009 org/objectweb/asm/ClassVisitor.class (省略)
とはいえ、見た限り原因はsbtの除外定義等に集約されるため、わかりやすくはあります。
・・・ただ、そろそろ長いので一度切って、またビルド定義の修正を試してみます。