夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

【再】SparkのサンプルをMesosの上で動作させる(REPL)

こんにちは。

前回見事に上手くいかなかったため、設定を確認して再度動かしてみることにします。
と言いつつ、エラー自体には以下のメッセージが明確に出ているため、
HDFSのポート設定がHDFSとSpark間でずれているからということは明白なのですが・・・

Twitter上で以下のページの情報を頂き、確認してみると、
8020番ポートはNameNodeのデフォルトポート(設定項目「fs.default.name」で設定)であることがわかりました。
Ports Used by Components of CDH4

そのため、HDFSの設定を以下のように更新し、サービスを再起動します。
HDFS設定

# vi /etc/hadoop/conf/core-site.xml
※内容の設定部分を下記の値にする
====
<configuration>
        <property>
                <name>fs.default.name</name>
                <value>hdfs://spark1</value>
        </property>
        <property>
                <name>fs.checkpoint.dir</name>
                <value>/home/hdfs/checkpoint</value>
        </property>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/home/hdfs/tmp</value>
        </property>
</configuration>
====

その上で、再度SparkをREPLから実行してみます。すると・・?

# cd /opt/spark-0.8.0-incubating-bin-cdh4/
# ./spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
      /_/

Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_45)
(省略)
13/11/24 18:01:21 INFO ui.SparkUI: Started Spark Web UI at http://spark1:4040
13/11/24 18:01:21 WARN spark.SparkContext: Master zk://spark1:2181/mesos does not match expected format, parsing as Mesos URL
I1124 18:01:21.642766  5955 detector.cpp:234] Master detector (scheduler(1)@192.168.100.246:43908) connected to ZooKeeper ...
I1124 18:01:21.643249  5955 detector.cpp:251] Trying to create path '/mesos' in ZooKeeper
I1124 18:01:21.643843  5955 detector.cpp:420] Master detector (scheduler(1)@192.168.100.246:43908)  found 3 registered masters
I1124 18:01:21.644157  5955 detector.cpp:467] Master detector (scheduler(1)@192.168.100.246:43908)  got new master pid: master@192.168.100.247:5050
13/11/24 18:01:21 INFO mesos.MesosSchedulerBackend: Registered as framework ID 201311130859-4150569152-5050-15300-0001
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data = 1 to 100000
※1から100000のデータセットを生成
data: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170...
scala> val paraData = sc.parallelize(data)
※生成したデータをRDD化
paraData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> paraData.filter(_< 1000).collect()
13/11/24 18:03:47 INFO spark.SparkContext: Starting job: collect at <console>:17
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:17) with 8 output partitions (allowLocal=false)
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:17)
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Missing parents: List()
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Submitting Stage 0 (FilteredRDD[1] at filter at <console>:17), which has no missing parents
13/11/24 18:03:47 INFO scheduler.DAGScheduler: Submitting 8 missing tasks from Stage 0 (FilteredRDD[1] at filter at <console>:17)
(省略)
13/11/24 18:03:52 INFO scheduler.DAGScheduler: Completed ResultTask(0, 7)
13/11/24 18:03:52 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:17) finished in 5.007 s
13/11/24 18:03:52 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
13/11/24 18:03:52 INFO spark.SparkContext: Job finished: collect at <console>:17, took 5.079664023 s
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, ...

となり、成功しました。

Spark側のWeb UIにも以下の通り表示され、成功していることが確認できました。


Mesos側も以下の通り。

尚、mesos-slaveにはSparkの実行アーカイブがダウンロード&展開されて実行されるため、
REPLを1回実行する度に170MB前後ずつファイル容量は食っていくようです。
何回か実行したくらいでは特に問題になりませんが、ファイルが蓄積されていくことだけは留意する必要がありそう。
※REPLを起動して複数回のオペレーションを実行した場合にはSparkの実行アーカイブは使いまわされました。

mesos-slaveで実行したタスクの標準出力/標準エラーへの出力は以下のようになっていました。
標準出力側ではまんまコマンドを実行してファイルを展開して実行していることがわかりますね。
標準エラー側では同時に2タスクを並列実行していることがわかります。

  • stdout
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-2/frameworks/201311130859-4150569152-5050-15300-0001/executors/201311130859-4133791936-5050-15712-2/runs/bf19bfb6-e8c3-4a2f-a823-624ea8f872b8'
Fetching resource 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Downloading resource from 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
HDFS command: hadoop fs -copyToLocal 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Extracting resource: tar xzf './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz'
Running spark-executor with framework dir = .
  • stderr
13/11/24 18:03:50 INFO executor.MesosExecutorBackend: Registered with Mesos as executor ID 201311130859-4133791936-5050-15712-2
13/11/24 18:03:51 INFO executor.Executor: Using REPL class URI: http://192.168.100.246:57540
13/11/24 18:03:51 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/11/24 18:03:51 INFO spark.SparkEnv: Connecting to BlockManagerMaster: akka://spark@spark1:46382/user/BlockManagerMaster
13/11/24 18:03:51 INFO storage.MemoryStore: MemoryStore started with capacity 324.4 MB.
13/11/24 18:03:51 INFO storage.DiskStore: Created local directory at /tmp/spark-local-20131124180351-382b
13/11/24 18:03:51 INFO network.ConnectionManager: Bound socket to port 57184 with id = ConnectionManagerId(spark1,57184)
13/11/24 18:03:51 INFO storage.BlockManagerMaster: Trying to register BlockManager
13/11/24 18:03:51 INFO storage.BlockManagerMaster: Registered BlockManager
13/11/24 18:03:51 INFO spark.SparkEnv: Connecting to MapOutputTracker: akka://spark@spark1:46382/user/MapOutputTracker
13/11/24 18:03:51 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-68034881-002c-4e7b-84d1-03802bb02c30
13/11/24 18:03:51 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/11/24 18:03:51 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:45525
13/11/24 18:03:51 INFO executor.Executor: Running task ID 4
13/11/24 18:03:51 INFO executor.Executor: Running task ID 1
13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0
13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0
13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 4 is 409
13/11/24 18:03:52 INFO executor.Executor: Finished task ID 4
13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 1 is 409
13/11/24 18:03:52 INFO executor.Executor: Finished task ID 1
13/11/24 18:03:52 INFO executor.Executor: Running task ID 7
13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0
13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 7 is 409
13/11/24 18:03:52 INFO executor.Executor: Finished task ID 7

後は、実際にタスクを実行している瞬間においてはmesos-masterのUIを確認してみるとCPUが6個(=1mesos-slave毎に2CPU)使われていることが確認できました。
おそらく、1タスク毎に1CPU使用しているため同時の実行並列数が2になっていると思われます。

また、タスク実行後はSpark shellがメモリを2GB確保した状態になっていました。
これもRDDを実際に生成したからこうなったと思われます。

ともあれ、これでSpark on Mesosの最初の1歩がようやく動いた形になります。

以下のようにたくさん試す項目は残っていますが、それはまた次回以降に。
やり方はわかってきたので、いつからかはAWS上でオンデマンドに検証用クラスタを用意したい感じでもあります。
とまぁ、それもどこまでいけるかもわかりませんが^^;

  1. REPLからではなく、バッチ実行でSparkジョブを動作させる
  2. HDFSからのデータ取得→データ出力確認
  3. 外部ライブラリを必要とするケースの確認
  4. Chronos/Marathon/Aurora on mesosの確認
  5. cgroupsを使用するケースのmesosの確認
  6. Docker on mesosの確認
  7. Storm on mesosの確認
  8. Summingbird on mesosの確認