夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

SparkバッチをMesosの上で動作させる(自作アプリ→またしても失敗

こんにちは。

前回でエラーが発生する原因についてはわかったので、run-exampleと同様の方式で
自作アプリケーションを走らせてみます。

1.ビルド方式更新/アプリ修正

run-exampleはassemblyという形で依存Jarも含めた状態でパッケージされ、それを配布して動作していました。
そのため、自作アプリのTextCountもassemblyを使用可能な形式に更新します。
→ 依存ライブラリを含めない方が配布するJarとしては軽くなるのですが、それをやろうとするとSparkの媒体に入っている
  「bin/compute-classpath.sh」(Sparkインストールディレクトリ配下のassembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jarしかクラスパスに追加しない)
  も修正する必要が出てくるため、今回はそれは行いません。
  本当はSparkのインストール先にdependencyのようなディレクトリを作成して、「bin/compute-classpath.sh」からは
  dependencyディレクトリが存在した場合配下のJarをクラスパスに追加する・・・という動作であればいいのですが、
  現状Sparkはそういう方式は取らず、sbt-assemblyでまとめられたjarを利用するアプローチだそうです。
  確かにsbt-assemblyは非常に扱いやすいといえば扱いやすいのですが。・・・って長いですね(汗

assembly化するのに必要な設定は以下の通りです。

■TextCount/project/plugin.sbt(Spark自体のplugins.sbtをそのまま使用)

resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += "Spray Repository" at "http://repo.spray.cc/"

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")

■TextCount/project/build.properties(Spark自体のbuild.propertiesをそのまま使用)

sbt.version=0.12.4

■TextCount/count.sbt

import AssemblyKeys._

name := "Count Project"

version := "1.0"

scalaVersion := "2.9.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/"

assemblySettings

その上で、アプリケーションのコードも下記のように修正します。
■TextCount/src/main/scala/TextCountApp.scala

/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {

    if (args.length == 0) {
      System.err.println("Usage: TextCountApp <master>")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "TextCountApp",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_APP_JAR")))

    val logFile = "hdfs://spark1/inputdata/README.md"

    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}

その上で実行してみますと・・?

# cd /usr/local/src/spark-0.8.0-incubating/TextCount
# ../sbt/sbt assembly
(省略)
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /root/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class
[error] /root/.ivy2/cache/org.mockito/mockito-all/jars/mockito-all-1.8.5.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class
[error] /root/.ivy2/cache/org.objenesis/objenesis/jars/objenesis-1.2.jar:org/objenesis/instantiator/gcj/GCJInstantiator.class
[error] Total time: 9 s, completed 2013/12/06 19:33:23

げふ。sbtビルド時に同一クラスが被っているためエラーとなっているようです。
そのためビルド定義に重複時の定義を追加します。
■TextCount/count.sbt

import AssemblyKeys._

name := "count"

version := "1.0"

scalaVersion := "2.9.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/repo/"

assemblySettings

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("org", "objenesis", xs @ _*) => MergeStrategy.last
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("org", "jboss", "netty", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "commons", "beanutils", xs @ _*) => MergeStrategy.last
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case PathList("org", "hamcrest", xs @ _*) => MergeStrategy.last
    case _ => MergeStrategy.first
  }
}

その上で再度ビルドを行うと、ビルドは成功します。

# cd /usr/local/src/spark-0.8.0-incubating/TextCount
# ../sbt/sbt assembly
(省略)
[info] Packaging /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar ...
[info] Done packaging.
[success] Total time: 28 s, completed 2013/12/06 20:30:21
2.実行スクリプト作成

次はspark-exampleと同様の起動をさせるためのスクリプトを作成しました。
基本的にはspark-exampleと同じですが、指定したJarをアプリケーション用のJarとして読み込めるようにしています。
■run-app

SCALA_VERSION=2.9.3

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ $# -lt 2 ]
then
  echo "Usage: run-batch <spark-app-jar-path> <spark-exec-class> [<args>]" >&2
  exit 1
fi

# Get spark app jar path
export SPARK_APP_JAR="$FWDIR"/$1
shift

# Get spark exec class
SPARK_EXEC_CLASS=$1
shift

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_APP_JAR:$CLASSPATH"

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" "$SPARK_EXEC_CLASS" "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" "$SPARK_EXEC_CLASS" "$@"

では、これで実行を行ってみます。すると・・?

# cd /usr/local/src/spark-0.8.0-incubating
# spark-0.8.0-incubating-bin-cdh4]# SPARK_PRINT_LAUNCH_COMMAND=1 ./run-app TextCount/target/scala-2.9.3/count-assembly-1.0.jar TextCountApp zk://spark1:2181/mesos
Spark Command: java -cp /opt/spark-0.8.0-incubating-bin-cdh4/TextCount/target/scala-2.9.3/count-assembly-1.0.jar::/opt/spark-0.8.0-incubating-bin-cdh4/conf:/opt/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop2.0.0-mr1-cdh4.4.0.jar TextCountApp zk://spark1:2181/mesos
(省略)
13/12/06 21:12:59 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 156.9 KB, free 1155.5 MB)
Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.util.InnerClosureFinder has interface org.objectweb.asm.ClassVisitor as super class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
(省略)

・・・げふ。今度はクラス間の齟齬が発生しているようです。
実際、count-assembly-1.0.jarとexampleで使用しているspark-examples-assembly-0.8.0-incubating.jarでは
以下のように「org.objectweb.asm.ClassVisitor」のバージョンが明らかに違います。
・・・assemblyの結果、余計なものを取り込んでしまったようですねーー;

# jar tvf spark-examples-assembly-0.8.0-incubating.jar | grep ClassVisitor
(省略)
  1524 Tue Dec 06 19:56:20 JST 2011 org/objectweb/asm/ClassVisitor.class
(省略)
# jar tvf target/scala-2.9.3/count-assembly-1.0.jar | grep ClassVisitor
(省略)
   853 Tue Jul 07 00:05:22 JST 2009 org/objectweb/asm/ClassVisitor.class
(省略)

とはいえ、見た限り原因はsbtの除外定義等に集約されるため、わかりやすくはあります。
・・・ただ、そろそろ長いので一度切って、またビルド定義の修正を試してみます。