夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache SparkをStandalone Modeで実行してみる

こんにちは。

色々紆余曲折ありましたが、ようやくSparkの動作確認スタートです。
以前構築したMesosクラスタはソースをビルドして作ったいまいち使いにくいものですので、
とりあえず後で再構築するとして、まずはMesosを使わなくていい状態で出来ることを確認していきます。

1.環境準備

※Mesosを使わないため、Windows上で実行しています。

以下のページからソースをダウンロードし、展開します。
http://spark-project.org/download/spark-0.8.0-incubating.tgz

展開後、展開したディレクトリに移動して以下のコマンドを実行。
そうすると依存ファイルがダウンロードされ、Sparkを動作させる準備が整います。
但し、マシンがヘボいと依存性解決とコンパイル、ビルドにやたらと時間がかかります。ひー。
#尚、以下に表示されている時間はコンパイル/ビルドにのみかかった時間。

C:\Develop\Source\Spark\spark-0.8.0-incubating>sbt\sbt.cmd assembly
(省略)
[info] Done packaging.
[info] Packaging C:\Develop\Source\Spark\spark-0.8.0-incubating\examples\target\
scala-2.9.3\spark-examples-assembly-0.8.0-incubating.jar ...
[info] Done packaging.
[success] Total time: 1265 s, completed 2013/11/04 21:36:04
2.Spark Shellの実行

ビルドが完了したので、spark shellを実行してみます。
すると以下のようにSpark Shellが起動し、Web UIが起動しました。

C:\Develop\Source\Spark\spark-0.8.0-incubating>spark-shell.cmd
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
      /_/
(省略)
13/11/04 21:45:18 INFO ui.SparkUI: Started Spark Web UI at http://haumea:4040
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.

scala>

実際にアクセスしてみると以下のように実行状態が表示されます。

では実際にshell上でファイルを読込、操作してみます。

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> textFile.count()
res1: Long = 111

scala> textFile.first()
res2: String = # Apache Spark

scala> textFile.foreach(println(_))
(README.mdの内容が表示される)

実際に結果が返ってきますね。
で、上記のコマンドを実行した結果、Spark UIでは以下のように処理状況が表示されています。

あとはフィルタなども実施することが可能です。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14

scala> linesWithSpark.foreach(println(_))
# Apache Spark
You can find the latest Spark documentation, including a programming
Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
Spark and its example programs, run:
Once you've built Spark, the easiest way to start using it is the shell:
Spark also comes with several sample programs in the `examples` directory.
    ./run-example org.apache.spark.examples.SparkLR local[2]
All of the Spark samples take a `<master>` parameter that is the cluster URL
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
Hadoop, you must build Spark against the same version that your cluster runs.
when building Spark.
When developing a Spark application, specify the Hadoop version by adding the
in the online documentation for an overview on how to configure Spark.
Apache Spark is an effort undergoing incubation at The Apache Software
## Contributing to Spark

とりあえず、これで1プロセス上での動作が可能なことがわかったため、一度Spark Shellは終了します。

scala> exit
3.サンプルのStandalone Appの実行

REPLでの実行ができたため、次はサンプルのStandalone Appを実行してみます。

まずはSparkを展開したディレクトリ配下に「TextCount」というディレクトリを作成し、
その配下に以下の2つのファイルを作成します。
TextCount/src/main/scala/TextCountApp.scala
TextCount/count.sbt

ファイルの内容は以下の通りです。
■TextCount/src/main/scala/TextCountApp.scala

/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    val logFile = "C:/Develop/Source/Spark/spark-0.8.0-incubating/README.md"
    val sc = new SparkContext("local", "TextCountApp", "C:/Develop/Source/Spark/spark-0.8.0-incubating",
      List("target/scala-2.9.3/count-project_2.9.3-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}

尚、SparkContextの引数は以下の意味を持つそうです。

  • 第1引数:リソースマネージャのURL(Standaloneの場合はlocal)
  • 第2引数:アプリケーション名称
  • 第3引数:Sparkのインストール先ディレクトリ
  • 第4引数:アプリケーションの依存ライブラリ

■TextCount/count.sbt

name := "Count Project"

version := "1.0"

scalaVersion := "2.9.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

上記の2ファイルを作成したらTextCountのディレクトリ配下に移動し、以下のコマンドを実行します。
するとビルドが行われ、Jarファイルが生成されます。

C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd package
(省略)
[info] Packaging C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount\target\scala-2.9.3\count-project_2.9.3-1.0.jar ...
[info] Done packaging.
[success] Total time: 7 s, completed 2013/11/04 22:29:24

では、ビルドされて生成されたアプリケーションを実行してみます。
すると、アプリケーションが実行されて結果が表示されます。

C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd run
[info] Set current project to Count Project (in build file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/)
[info] Running TextCountApp
13/11/04 22:33:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/04 22:33:27 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/04 22:33:27 INFO mapred.FileInputFormat: Total input paths to process : 1
Lines with a: 66, Lines with b: 35, Lines with Spark: 15 // アプリケーションの実行結果
[success] Total time: 6 s, completed 2013/11/04 22:33:28

とりあえず、これでStandaloneのアプリケーションは作成/動作させることができました。
また、アプリケーションの作り方の基本のきはわかった感じですね。

ただ、この先はクラスタにデプロイする形になるため、また環境構築祭りが続きそうですが^^;