読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Resilient Distributed Datasetsに関する論文を読んでみます(4章

こんにちは。

以下論文を読んでみようの続きで、今回は第4章です。

「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

内容としては、「Representing RDDs」ということで、RDDsの表現方法についての章になります。
この章くらいまでが実装手前の設計部分で、それ以後が実装内容と検証結果/考察、といった感じですかね。
では、入ります。

抽象化してRDDsを提供する上での課題は様々なtransformationに対して追跡可能な系統グラフをどう表現するか、になる。
理想的にはRDDsを実現するシステムは出来る限り充実したtransformationsを提供し、ユーザにその任意の組み合わせを可能とさせる必要がある。

そのため、この目標達成を容易にするためにグラフベースのRDDs表現を提案している。
私たちはSparkを個々のスケジューラ毎に個別ロジックを追加することなく
幅広いtransformationsに対応させるためにグラフベースのRDDs表現を用いている。
こうすることでシステム設計が非常にシンプルになるためである。

一言でいえば、私たちは各RDDに対して5つの情報を保持する共通インタフェースを定義する。

  1. パーティションのセット:データセット中のアトミックなデータ群
  2. 依存性のセット:親RDDへの依存情報
  3. RDDから生成した際の関数
  4. パーティショニングを行った際のメタデータ
  5. データ配置のメタデータ

例えば、HDFS上のファイルを示すRDDは各ファイルブロックのパーティション情報と、各ブロックがどのマシン上に存在しているかの情報を保持している。
なお、このRDDに対するmap関数の結果は同一のパーティションを保持するが、それは親RDDに対してmapを実行するときに生成される。(?)
このRDDからの情報取得インタフェースについてサマリすると以下の表のようになる。

このインタフェースデザインの際の最も興味深い質問はどのようにRDDs間の依存を表現するか、である。
私たちはこの「依存」を以下の2つに分類することで有用、かつ十分に依存を表現することが可能であることを見出した。

  1. 「狭い」依存:親RDDパーティションが多くても1つの子RDDパーティションから依存されるケース
  2. 「広い」依存:親RDDパーティション複数の子RDDパーティションに依存されるケース

例えば、mapは「狭い」依存を示し、joinは「広い」依存を示す。(但し、親RDDパーティションがハッシュによるパーティショニングがされていない場合)

以下の図は他の例を示す。

この区別は2つの理由により有用。
1つ目の理由としては、「狭い」依存の場合、親パーティションクラスタ中の1ノードで全て計算可能となり、パイプラインとして実行可能となること。
例えば、図中の上の例(map,filter)の場合各要素に対してfilterに続けてmapを実行することができる。
対して、「広い」依存のはMapReduceでのshuffle等で全ノードの親RDDsが使用可能であることが必要となる。

2つ目の理由としては、ノード障害後のデータ復旧時、「狭い」依存の場合はロストしたパーティションのみを再計算すればよく、
かつ並列で実行可能なため効率的に復旧が可能ということ。
対して、「広い」依存を伴う系統グラフの場合、1ノードの障害によって対象RDDの子孫となるRDD
ノードをまたがって喪失してしまう危険があり、ノード間を含めた完全な再計算が求められるケースが出てくる。

RDDに対する共通インタフェースはSparkにおけるtransformationsの大部分を20ライン未満のコードで記述することを可能にしている。
加えて、Sparkユーザが新たな関数(例:新たなサンプリング/joinを行う関数)を定義する場合でも
スケジューラの詳細を知らなくても拡張可能となっている。
以後、いくつかのRDD関数の実装について記述する。

HDFS Files

サンプルにおいてはインプットしてHDFSを使用している。
これらのRDDsにおいて、partitions関数は各ブロックファイルに対する1つのパーティション情報を返す。
(各パーティションに対するブロックのオフセット情報も一緒に取得可能)
preferredLocations関数はブロックが実際に配置されているノードを返し、
iterator関数を用いることで実際にブロックを読み取ることが可能。

map

RDDのmap関数を実行した場合MappedRDDオブジェクトを返す。
MappedRDDは親RDDと同じパーティション情報と最適化された場所情報を保持するが、
map関数を通して親RDDに対して関数を適用した場合はそれらのiteratorメソッドから取得可能。
#訳が微妙です。but節でつないでいる割に否定表現に見えない。

sample

サンプリングメソッドRDDが親RDDsから値を抽出するために使用するランダム変数生成用のシード値を持つ他はmapと同じ。

join

2つのRDDsをjoinする場合に2つの「狭い」依存(両方とも同じパーティショナーを用いた場合)や2つの「広い」依存、
または「狭い」依存と「広い」依存の両方が1つずつ発生する。(片方だけパーティショナーを保持し、もう片方は保持しない場合)
いずれの場合でも出力されたRDDはパーティショナー情報を保持する。
(親RDDから引き継ぐ場合と、デフォルトのハッシュパーティショナーの場合がある)

=====
とりあえずどんな感じで表現され、使うかはわかりました。
ついに次回がスケジューラなども含めた実装の内容になり、ここまで読み切っておけばSparkの内部動作概要は大体分かる形になります。
その後は評価と結論付け・・・という形で新規の情報はそこまでありません。
そのため、とりあえず次回でこの論文についてはラストになりそうです。