【再】SparkのサンプルをMesosの上で動作させる(REPL)
こんにちは。
前回見事に上手くいかなかったため、設定を確認して再度動かしてみることにします。
と言いつつ、エラー自体には以下のメッセージが明確に出ているため、
HDFSのポート設定がHDFSとSpark間でずれているからということは明白なのですが・・・
Twitter上で以下のページの情報を頂き、確認してみると、
8020番ポートはNameNodeのデフォルトポート(設定項目「fs.default.name」で設定)であることがわかりました。
Ports Used by Components of CDH4
そのため、HDFSの設定を以下のように更新し、サービスを再起動します。
・HDFS設定
# vi /etc/hadoop/conf/core-site.xml ※内容の設定部分を下記の値にする ==== <configuration> <property> <name>fs.default.name</name> <value>hdfs://spark1</value> </property> <property> <name>fs.checkpoint.dir</name> <value>/home/hdfs/checkpoint</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/home/hdfs/tmp</value> </property> </configuration> ====
その上で、再度SparkをREPLから実行してみます。すると・・?
# cd /opt/spark-0.8.0-incubating-bin-cdh4/ # ./spark-shell Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 /_/ Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_45) (省略) 13/11/24 18:01:21 INFO ui.SparkUI: Started Spark Web UI at http://spark1:4040 13/11/24 18:01:21 WARN spark.SparkContext: Master zk://spark1:2181/mesos does not match expected format, parsing as Mesos URL I1124 18:01:21.642766 5955 detector.cpp:234] Master detector (scheduler(1)@192.168.100.246:43908) connected to ZooKeeper ... I1124 18:01:21.643249 5955 detector.cpp:251] Trying to create path '/mesos' in ZooKeeper I1124 18:01:21.643843 5955 detector.cpp:420] Master detector (scheduler(1)@192.168.100.246:43908) found 3 registered masters I1124 18:01:21.644157 5955 detector.cpp:467] Master detector (scheduler(1)@192.168.100.246:43908) got new master pid: master@192.168.100.247:5050 13/11/24 18:01:21 INFO mesos.MesosSchedulerBackend: Registered as framework ID 201311130859-4150569152-5050-15300-0001 Spark context available as sc. Type in expressions to have them evaluated. Type :help for more information. scala> val data = 1 to 100000 ※1から100000のデータセットを生成 data: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170... scala> val paraData = sc.parallelize(data) ※生成したデータをRDD化 paraData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14 scala> paraData.filter(_< 1000).collect() 13/11/24 18:03:47 INFO spark.SparkContext: Starting job: collect at <console>:17 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:17) with 8 output partitions (allowLocal=false) 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:17) 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Parents of final stage: List() 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Missing parents: List() 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Submitting Stage 0 (FilteredRDD[1] at filter at <console>:17), which has no missing parents 13/11/24 18:03:47 INFO scheduler.DAGScheduler: Submitting 8 missing tasks from Stage 0 (FilteredRDD[1] at filter at <console>:17) (省略) 13/11/24 18:03:52 INFO scheduler.DAGScheduler: Completed ResultTask(0, 7) 13/11/24 18:03:52 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:17) finished in 5.007 s 13/11/24 18:03:52 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool 13/11/24 18:03:52 INFO spark.SparkContext: Job finished: collect at <console>:17, took 5.079664023 s res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, ...
となり、成功しました。
Spark側のWeb UIにも以下の通り表示され、成功していることが確認できました。
Mesos側も以下の通り。
尚、mesos-slaveにはSparkの実行アーカイブがダウンロード&展開されて実行されるため、
REPLを1回実行する度に170MB前後ずつファイル容量は食っていくようです。
何回か実行したくらいでは特に問題になりませんが、ファイルが蓄積されていくことだけは留意する必要がありそう。
※REPLを起動して複数回のオペレーションを実行した場合にはSparkの実行アーカイブは使いまわされました。
mesos-slaveで実行したタスクの標準出力/標準エラーへの出力は以下のようになっていました。
標準出力側ではまんまコマンドを実行してファイルを展開して実行していることがわかりますね。
標準エラー側では同時に2タスクを並列実行していることがわかります。
- stdout
Fetching resources into '/var/run/mesos/slaves/201311130859-4133791936-5050-15712-2/frameworks/201311130859-4150569152-5050-15300-0001/executors/201311130859-4133791936-5050-15712-2/runs/bf19bfb6-e8c3-4a2f-a823-624ea8f872b8' Fetching resource 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' Downloading resource from 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' HDFS command: hadoop fs -copyToLocal 'hdfs://spark1/sparkarchive/spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' Extracting resource: tar xzf './spark-0.8.0-2.0.0-mr1-cdh4.4.0.tgz' Running spark-executor with framework dir = .
- stderr
13/11/24 18:03:50 INFO executor.MesosExecutorBackend: Registered with Mesos as executor ID 201311130859-4133791936-5050-15712-2 13/11/24 18:03:51 INFO executor.Executor: Using REPL class URI: http://192.168.100.246:57540 13/11/24 18:03:51 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/11/24 18:03:51 INFO spark.SparkEnv: Connecting to BlockManagerMaster: akka://spark@spark1:46382/user/BlockManagerMaster 13/11/24 18:03:51 INFO storage.MemoryStore: MemoryStore started with capacity 324.4 MB. 13/11/24 18:03:51 INFO storage.DiskStore: Created local directory at /tmp/spark-local-20131124180351-382b 13/11/24 18:03:51 INFO network.ConnectionManager: Bound socket to port 57184 with id = ConnectionManagerId(spark1,57184) 13/11/24 18:03:51 INFO storage.BlockManagerMaster: Trying to register BlockManager 13/11/24 18:03:51 INFO storage.BlockManagerMaster: Registered BlockManager 13/11/24 18:03:51 INFO spark.SparkEnv: Connecting to MapOutputTracker: akka://spark@spark1:46382/user/MapOutputTracker 13/11/24 18:03:51 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-68034881-002c-4e7b-84d1-03802bb02c30 13/11/24 18:03:51 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/11/24 18:03:51 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:45525 13/11/24 18:03:51 INFO executor.Executor: Running task ID 4 13/11/24 18:03:51 INFO executor.Executor: Running task ID 1 13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0 13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0 13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 4 is 409 13/11/24 18:03:52 INFO executor.Executor: Finished task ID 4 13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 1 is 409 13/11/24 18:03:52 INFO executor.Executor: Finished task ID 1 13/11/24 18:03:52 INFO executor.Executor: Running task ID 7 13/11/24 18:03:52 INFO executor.Executor: Its epoch is 0 13/11/24 18:03:52 INFO executor.Executor: Serialized size of result for 7 is 409 13/11/24 18:03:52 INFO executor.Executor: Finished task ID 7
後は、実際にタスクを実行している瞬間においてはmesos-masterのUIを確認してみるとCPUが6個(=1mesos-slave毎に2CPU)使われていることが確認できました。
おそらく、1タスク毎に1CPU使用しているため同時の実行並列数が2になっていると思われます。
また、タスク実行後はSpark shellがメモリを2GB確保した状態になっていました。
これもRDDを実際に生成したからこうなったと思われます。
ともあれ、これでSpark on Mesosの最初の1歩がようやく動いた形になります。
以下のようにたくさん試す項目は残っていますが、それはまた次回以降に。
やり方はわかってきたので、いつからかはAWS上でオンデマンドに検証用クラスタを用意したい感じでもあります。
とまぁ、それもどこまでいけるかもわかりませんが^^;
- REPLからではなく、バッチ実行でSparkジョブを動作させる
- HDFSからのデータ取得→データ出力確認
- 外部ライブラリを必要とするケースの確認
- Chronos/Marathon/Aurora on mesosの確認
- cgroupsを使用するケースのmesosの確認
- Docker on mesosの確認
- Storm on mesosの確認
- Summingbird on mesosの確認