読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Spark Summit Sched & Perf1 「Spark Performance」

こんにちは。

間が空いてしまっていますが、
Spark Summitの資料をとりあえず読んでみよう、の第2弾です。

今回は「Spark Performance」を見てみました。
概要の次にいきなり性能の資料に入ってしまうあたり趣味を反映している気もしますが^^;

では、見てみます。
あと、この情報はSpark0.8.1準拠だそうです。現状の最新版ですね。

Spark deep dive
  • まず、下記のRDDの記述によって性能が変わることを理解いただきたい。

RDD

(patrick, $24), (matei, $30), (patrick, $1), (aaron, $23), (aaron, $2), (reynold, $10), (aaron, $10)…..

■パターン1(groupByKey)
このパターンの場合、groupByKeyオペレーションを実行したタイミングで全データがネットワークを飛び交い、Groupingが行われる。

spendPerUser = rdd.groupByKey().map(lambda pair: sum(pair[1])).collect()

■パターン2(reduceByKey)
このパターンの場合、shufflingの前にローカルプロセス内でReduceが実行されるため、
ネットワークを飛び交うのはローカルで計算された後の値となる。

spendPerUser = rdd.reduceByKey(lambda x, y: x + y).collect()
    • Sparkの動作を理解することで、性能的に最適なオペレーションを記述することが可能となる。
  • Sparkの動作を理解するための基本内容としては下記がある。
    • RDDパーティション分割された並行コレクション
    • UserはRDDを作成し、RDDを変換し、RDDに対してアクションを実行する。
    • アクションの実行内容はDAGに変換される。
    • DAGは「stage」という単位にコンパイルされて実行される。
    • 各「stage」はタスクの系譜として実行される。
  • stage分割の例としては下記の通り。
sc.textFile("/some-hdfs-data")               // 1.RDD[String]が生成
  .map(line => line.split("\t"))             // 2.1の各要素をsplitし、RDD[List[String]]が生成
  .map(parts => (parts[0], int(parts[1])))   // 3.2の要素を(Listの1要素目,Listの2要素目)というペアにまとめる
  .reduceByKey(_ + _, 3)                     // 4.3のListの1個目をキーにしてListの2要素目を加算する形で3パーティションのRDDを生成
  .collect()                                 // 5.4の結果を配列にまとめる

  • 上記のアプリケーションは下記のようにstageに分割される。

  • 上記のstageは下記のようにまとめることができる。
    • stage1は各RDDに分割して分割RDD毎にタスクを生成し、配分した後に局所性を保ったまま処理を継続している
    • stage2はshuffleでデータを相互通信し、最終的に結果を起動元に応答している

  • Sparkは基本的に下記のような流れでstageを実行している。
    • A. InputFormatか前stageのshuffleからデータをインプットとして受け取る
    • B. stage中のタスクを実行する
    • C. shuffleか、結果の応答/出力が発生した際に結果の出力を行う
  • これらのタスクは下記のようにパイプラインのように実行される(shuffleなど前のstageを待つ必要がある箇所は並列化はされない)

  • 各stageの性能情報はSparkのperf UIから下記のように見ることができる。
    • メモリの使用量、Shuffleの際の書込み量などを確認することが可能。


  • その他Sparkを用いていてよく発生する問題について下記の4フェーズに分けて説明する。
  • 1.タスクの配置/スケジューリング/起動
  • 2.タスクの実行
  • 3.stage間のデータの通信
  • 4.結果の取得
  • 1.タスクの配置/スケジューリング/起動
    • シリアライズするタスクの容量が大きいケース
      • 下記のようなコードを記述した場合、「hash_map」がタスクと一緒にシリアライズされてしまう。
      • 大容量のオブジェクトはRDDに格納すること。
      • 尚、このようなコードはSpark0.9.0以降ではSpark側でWarningを発する。
hash_map = some_massive_hash_map()              // コード中でRDDでない大容量オブジェクトを生成している
rdd.map(lambda x: hash_map(x)).count_by_value() // 大容量オブジェクトを処理する際、シリアライズされて通信した上で実行される
    • 多数の「空の」タスクがfilterオペレーションによって生成されるケース
      • 下記のようなコードを記述した場合、大量の「生存期間の短い(20ms未満が目安)」タスクが生成されてしまい、効率が悪い。
      • 要素が存在しないRDDのpartitionに対してもタスクは生成され、実行されるため。
      • フィルタ後に「coalesce」や「repartition」オペレーションを実行することでRDDを縮小し、処理の断片化を防ぐことができる。
rdd = sc.textFile(“s3n://bucket/2013-data”)
             .map(lambda x: x.split(“\t”))
             .filter(lambda parts: parts[0] == “2013-10-17”)
             .filter(lambda parts: parts[1] == “19:00”)
// フィルタした結果、空になるRDDが大量に発生するフィルタをかけている。

rdd.map(lambda parts: (parts[2], parts[3]).reduceBy…
// mapオペレーションは空のRDDに対しても実行されるため、Taskの起動のオーバーヘッドだけがかかる。
  • 2.タスクの実行
    • 1レコードあたりの負荷が高いタスクを作ってしまうケース
      • 下記のようなコードを記述した場合、「1レコードあたりの負荷」が高いため、実行時間が延びる。
rdd.map(lambda x: 
  conn = new_mongo_db_cursor()
  conn.write(str(x))
  conn.close()) // RDDの1partitionをシリアルに登録となってしまう
      • 上記の場合、下記のようにレコードを分割して登録することによって並列化が可能となり、実行時間が短縮される。
        • その際、mapPartitions か mapWith のオペレーションが適用できる。
rdd.mapPartitions(lambda records:
  conn = new_mong_db_cursor()
  [conn.write(str(x)) for x in records]
  conn.close()) // RDD中の1レコードごとに分割して登録するため、並列化が可能
    • タスク間に偏りがあるケース
      • タスク間に偏りがある場合、stageのレスポンスタイムが一部の遅いタスクによって決定されてしまう
      • データ偏り:データの偏りがある場合はpartitioningに使用するkey値が不適切
        • key値を見直すか、内部でaggregationを行うコンポーネントを作成して対応する。
      • Worker偏り:一部のexecutorが遅い/不安定なノードで実行されてしまう場合
        • spark.speculationを設定するか、そもそも遅い/不安定なノードを取り除く
  • 3.stage間のデータの通信
    • バッファキャッシュが十分に確保できないケース
      • Sparkはshuffleに使用するデータをOSのバッファキャッシュに書き込むため。
        • shuffleに非常に時間がかかっている場合、この疑いがある。
      • もし大容量のshuffleを大容量のヒープ上で実施する場合、何GBかバッファキャッシュ用に確保すること。
        • 経験則では、OSとキャッシュにメモリの20%を残すくらいがちょうどいい。
    • Sparkのローカルディレクトリが設定されていないケース
      • Sparkのローカルディレクトリ(spark.local.dir)はshuffleに使用されるデータが出力される
        • 指定したパスに出力されるが、下記のように複数のパスを指定して負荷分散させることも可能。
        • その上で、noattime、nodiratime設定を行うこと。
spark.local.dir=/mnt1/spark,/mnt2/spark,/mnt3/spark
    • Reducerの数が設定されていないケース
      • デフォルトではReducerは親RDDと同様の値となるが、多すぎる/少なすぎる場合は下記のような問題が発生する。
        • 多すぎる場合:タスクの起動コストが問題となる。(「空の」タスク生成パターンと同じ)
        • 少なすぎる場合:並列化が十分ではなく、時間がかかる。
  • 4.結果の取得
    • 大きすぎる結果を収集してしまうケース
      • 下記のようにファイルから読み込んだ結果を直接収集しようとした場合に問題が発生する場合がある。
sc.textFile(“/big/hdfs/file/”).collect() // 大容量のRDDを直接結果として取得
      • 処理を行う場合、SparkのTaskで演算を行い、小さいサイズの結果を取得するようにする。
      • 出力を行う場合、collectで収集してから書きこむのではなく、SparkのTask中で直接並列出力するようにする。

・・と、こんな感じでした。
やはりどういう形でSparkの性能を活かすか、についてはある程度ベース知識が無いと難しいようですね。
ただ、Spark0.9.0においては警告を出すなどの対応があるため、やってみたらよくわからないけど遅かった・・・
というケースは多少回避できそうでもあります。