読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Resilient Distributed Datasetsに関する論文を読んでみます(3章

こんにちは。

以下論文を読んでみようの続きで、今回は第章です。

「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

内容としては、「Spark Programming Interface」ということで、RDDsを扱うSpark自体に対する内容となっています。
では、入ります。


SparkはRDDを使うための基盤で、DryadLINQのような言語統合APIを静的型付け関数型プログラミング言語Scalaで実現している。
簡潔さ(対話型の使用のために便利)と効率(静的型付け)のため選択した。
但し、特に扱うにあたって関数型言語の素養が必要というわけではありません。

Sparkを使う際には開発者は以下の図のようなWorkersクラスタに接続するドライバプログラムを記述する。

ドライバは1個以上のRDDsを定義し、それらに対してアクションを実行する。
ドライバプログラム上で記述したプログラムは同様にRDDsの系統情報を記録する。
Workersは常駐プロセスでRDDパーティションを操作をまたいでメモリ上に保持する。

2.2.1のログマイニングアプリケーション例を基に説明すると、
ユーザはRDDsに対してmapオペレーションを関数型言語のようなリテラル群で記述している。
Scalaは各々のメソッド/引数部をJavaオブジェクトして表現し、
これらのオブジェクトはシリアライズされてクラスタ内の異なるworkersにネットワーク越しに配信され、別workersでロードされる。

Scalaはまた構文の中で記述された変数をJavaオブジェクトとして保持する。
例えば、以下の記述はrddの要素に対して各々5を加算するというオペレーションを示す。

var  x  =  5;
rdd.map(_  +  x)

3.1 SparkでのRDDに対する処理

以下の図はSparkにおいてRDDに対して可能となっている関数群となる。
これらの関数はプログラム中で実際に値として使用される時や出力時に評価を行う遅延評価で実現される。
いくつかの関数において注意が必要な点として、例えばjoinはkey-valueのペアにのみ実行可能といったものがある。

また、その他の関数名もScalaや他関数型言語で用いられるAPIに極力合わせた作りになっている。
例として、mapは1対1のマップとなっており、floatMapは1つの入力に対して1つまたは複数の出力を行うマッピングとなっている。
MapReduceのmapと同様の動作)

これらの関数に追加して、ユーザはRDDに対してpersist関数を呼び出すことができる。
加えて、RDDパーティション切り分け(Partitionarクラスで定義)やパーティションを示すデータセットを取得することができる。
groupByKey、reduceByKey、sortといった処理は自動的にハッシュ/レンジでパーティションが切り分けられる。

3.2 サンプルアプリケーション

2.2.1であげた例のうち、2つのインタラクティブなアプリケーション(論理回帰、ページランク)を用いて説明する。
ページランクの方はRDDのパーティショニングによって性能向上が可能であることも合わせて説明する。

3.2.1 論理回帰

多くの機械学習アルゴリズムは何度も最適化のための反復を実行し、勾配降下法を用いて効果を最大化している。
そのため、繰り返し読まれるデータをメモリ上に配置することによって性能を大きく向上させることが出来る。
例として、論理回帰によってデータ群から2値変数に対して回帰分析を行う例を示す。
(スパムか、そうでないか、の判定などに用いられる)

このアルゴリズムは勾配降下法を使用している。
wというランダム値から開始し、イテレーションを回す。
関数に対してwを適用した結果が最小となる値に収斂するまで実行する。

val  points  =  spark.textFile(...).map(parsePoint).persist()
var  w  =  // random initial vector
for  (i  <-  1  to  ITERATIONS)  {
  val  gradient  =  points.map{  p  =>
    p.x  *  (1/(1+exp(-p.y*(w  dot  p.x)))-1)*p.y
  }.reduce((a,b)  =>  a+b)
  w  -=  gradient
}

上記の処理はまずテキストファイルの各行をPointのオブジェクトに変換し、RDD化する処理を行っている。
その後RDDに対してmap/reduceを繰り返し実行し、勾配をwに対して加算する。(イテレーション回数実行する)
pointsをメモリ上に維持することでディスクに出力した場合と比較して20倍以上の高速化を行っている。

3.2.2 ページランク

より複雑なデータ共有が発生するパターンとして、ページランクを確認する。
このアルゴリズムはあるページが他のページからリンクされている数を基に「ランク」を算出するもの。
イテレーションにおいて、各ドキュメントは\frac r {n}ページランク値をリンク先のページに与える。
ここでの『r』はリンク元ページのランクで、『n』はリンク先のページ数となる。

その後ランクを\frac \alpha {N}+(1-\alpha)\Sigma c \small i の式を用いて更新する。
ここでの合計値は自分のページに対してリンクを行ったページの合計値で、Nは全ページ数となる。

Sparkでページランクを実現した場合は以下になる。

val  links  =  spark.textFile(...).map(...).persist()
var  ranks  =  //  RDD  of  (URL,  rank)  pairs
for  (i  <-  1  to  ITERATIONS)  {
  //  Build  an  RDD  of  (targetURL,  float)  pairs
  //  with  the  contributions  sent  by  each  page
  val  contribs  =  links.join(ranks).flatMap  {
    (url,  (links,  rank))  =>  links.map(dest  =>  (dest,  rank/links.size))
  }
  //  Sum  contributions  by  URL  and  get  new  ranks
  ranks  =  contribs.reduceByKey((x,y)  =>  x+y).mapValues(sum  =>  a/N  +  (1-a)*sum)
}

上記プログラムのRDD系列グラフは以下のようになる。

イテレーションにおいて、ranksデータセットを前回のイテレーション結果であるranksと静的情報であるlinksを用いて更新している。

このグラフで興味深いところは系統グラフがイテレーション数に応じて増大すること。
したがって、多くのイテレーションを実行するジョブにおいては、障害発生時の復旧時間を短縮するために
いくつかのバージョンのranksをレプリケーションする必要があるかもしれない。

このようなケースにおいて、プログラマはpersist関数をRELIABLEフラグを設定して呼ぶことができる。
但し、linksデータセットはファイルからのmap節から効率よく再生成されるためレプリケーションする必要はない。

linksデータセットは一般的にranksデータセットよりも大きくなる。
なぜなら、各ページは多数のリンク先を保持するが、ranksデータセットはページ数のサイズに収まるからである。
そのため、linksデータセットの復元に系列グラフを使用することで使用するメモリ量を抑えることができる。

また、RDDのパーティショニングを制御することでPageRankアルゴリズム実行における通信量を最適化することができる。
links RDDのパーティショニングを指定した場合(例:linklists をURL のハッシュでパーティショニング)、
ranks RDDも同じ方式でパーティショニングすることによってlinks とranksのjoin関数においてノード間の通信を行わずに実行できる。
(同じリンクに対するランクとlinklistは同じノード上に存在するため)

加えて、ページをグルーピングするカスタムパーティショナーを自作することもできる。
(例:ドメイン名でリンクを分割する場合)
これらの最適化は以下のようにpartitionBy関数を実行することで実施可能。

links  =  spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

上記の呼び出しから処理を開始した場合、linksとranksのjoin関数から生成されるcontirbutionsは
自動的に同一マシン上に保持しているURL同士で行われ、新たなrankが生成される。
その際にrankにも同一のパーティショニングが適用される。

この種のイテレーション間の一貫性はPregelのようなフレームワーク上で行われていた最適化策のうちの一つ。
RDDsではユーザが指定することでこの最適化を達成できる。

====
とりあえずどんな感じで実装するかは大体わかった気はします。
これで概要はわかったとは思いますが、実装部等も興味があるのでとりあえず読み進めてみることにします。