読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Resilient Distributed Datasetsに関する論文まとめ(1章〜5章

こんにちは。

Resilient Distributed Datasetsに関する論文
「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
について、概念と動作概要が書かれた1章〜5章を読んだので、1投稿にまとめます。

基本的には以前見たスライドの詳細化版なんですが、一部スライド見ているだけだとわからない情報もあり、参考になりました。

Sparkが対象とするような「基本的には全データに対して同じ演算を行う」アプリケーションにおいては
読み取り専用というRDDの性質がマイナスにならないこと、読み取り専用であることで
一貫性考慮が簡略化されるなどの考えはこういうフレームワークを構築する上で大事な視点になりそうですね。

「全データに対して同じ演算を行う」ということから全体のデータの流れを示す系統グラフを作成するのも重要ですね。
Storm等でもTridentを使用した場合はDAGの計算モデル構築が行われるのですが、Sparkも同様に内部ではDAGを構築して確認を行っているとのこと。
このあたりの計算の性質を抽象化してモデルに落とすというのが分散システム構築における「肝」の一つになるので、
事例を読みためて実際に落とし込めるようにならなければなぁ、と思った次第です。

では、内容です。

Abstract

本論文において、プログラマが大規模クラスタ上でのメモリ上計算を耐障害性を確保した上で
実行するための抽象化された分散メモリである弾力性のある分散データセット(RDDs)を提示する。

RDDsは現状のクラスタ上のコンピューティングが以下の2つの非効率を抱えていることから開発された。

1.インタラクティブアルゴリズム機械学習、グラフ描画)
2.インタラクティブデータマイニング

上記の2ケースにおいて、データをメモリ上に確保したままで処理することで性能を劇的に向上させることができる。

その上で、効率的に耐障害性を確保するためにRDDsは詳細な更新が不可で『荒い変換のみ可能』な制限されたデータセットとして提供される。

しかしながら、『制限されたデータセット』であっても最近の反復計算ジョブ・・・
Google Pregel(グラフ計算)その他の計算モデルに対しては十分適用可能だと考える。

本論文ではRDDsを分散処理基盤であるSpark上で実現し、性能評価した結果を記述する。

Abstract

本論文において、プログラマが大規模クラスタ上でのメモリ上計算を耐障害性を確保した上で
実行するための抽象化された分散メモリである弾力性のある分散データセット(RDDs)を提示する。

RDDsは現状のクラスタ上のコンピューティングが以下の2つの非効率を抱えていることから開発された。

1.インタラクティブアルゴリズム機械学習、グラフ描画)
2.インタラクティブデータマイニング

上記の2ケースにおいて、データをメモリ上に確保したままで処理することで性能を劇的に向上させることができる。

その上で、効率的に耐障害性を確保するためにRDDsは詳細な更新が不可で『荒い変換のみ可能』な制限されたデータセットとして提供される。

しかしながら、『制限されたデータセット』であっても最近の反復計算ジョブ・・・
Google Pregel(グラフ計算)その他の計算モデルに対しては十分適用可能だと考える。

本論文ではRDDsを分散処理基盤であるSpark上で実現し、性能評価した結果を記述する。

1.イントロダクション

昨今、MapReduceやDryadといったクラスタコンピューティング基盤が大規模データ解析に使用されている。
これらの基盤はユーザが分散方法、耐障害性を気にすることなく高レベルの演算を記述するだけで分散並列処理を可能としている。

これらの基盤はクラスタ上のリソースに対するアクセスを抽象化して提供しますが、分散メモリを活用するための抽象化は行っていない。
そのため、複数の計算を越えて結果を再利用する場合かなりの非効率を生んでいる。

データの再利用による効率化はページランク算出、K-Meansクラスタリング、ロジスティック回帰等を含む機械学習/グラフ計算に対して共通的な性質となる。
別の有用なデータマイニングユースケースとして、あるデータセットに対するアドホックなクエリを発行して検索を行うものがある。

だが、残念ながら現状の多くのクラスタコンピューティング基盤においては
異なる計算(例:MapReduceのジョブ間)をまたがってデータを持ち越す場合、
外部ストレージ(例:分散ファイルシステムHDFS)に対してデータを保存し、それを用いて持ち越す手段しか提供されていない。

この外部ストレージに対する書込読込(データレプリケーション、ファイルIO、シリアライズも伴う)のオーバーヘッドは
分散並列処理にかかる時間の支配的な要素となってしまっている。

この問題は広く認識されており、研究者は以下のように特定用途に特化した基盤においてはメモリ上にデータを保持して計算を効率化することを行ってきた。
・Pregel:インタラクティブなグラフ描画のための基盤。内部のデータをメモリ上に保持することで計算を効率化している。
・HaLoop:反復MapReduceのインタフェースをサポートするメモリ上で演算するための基盤

だが、これらの基盤はHaLoopの反復MapReduceのように限られたコンピューティングパターンのみしかサポートしていない。
加えて、「データを共有する」ことが前提となっており、柔軟性に欠ける。
そのため、これらの基盤は汎用的なデータ再利用のための抽象化は提供せず、アドホックなクエリの繰り返し実行のようなユースケースにも対応しない。

本論文では幅広いユースケースに適用可能なデータ再利用抽象化基盤である弾力性のある分散データセット(RDDs)を提示する。
RDDsは耐障害性を保持し、並列利用可能で、ユーザが明示的に内部データとして保存させることができる。
データ配置は最適化され、豊富なオペレーションセットをサポートしている。

RDDsによるメインチャレンジとして、効率的な耐障害性を提供することがあげられる。

現状存在しているクラスタ上のメモリ保持基盤としては分散共有メモリ、KVS、データベース、Piccoloといったものがあげられる。
これらは特定のインタフェースに沿ったきめ細やかな更新(例:テーブル中のセルを更新)をサポートしている。
これらのインタフェースの基に耐障害性を提供する際に取る手段は「マシン間のレプリケーション」か「更新結果のログ記録」となる。

この2つのアプローチはデータ集約型のアプローチとなり、クラスタ内ネットワークを使用して大容量のデータをコピーする必要があるために負荷が大きくなる。
帯域幅がメモリ>>ネットワーク>ストレージという状況で行われるコピーのため、ストレージによってコピーの速度は大きく制限される。

これらのアプローチとは対照的に、RDDsは「荒い変換インタフェース(map、filter、join等)」のみしかサポートしない。
このインタフェース群を通してオペレーションを行うとRDDsに保持される大量データに対して全てインタフェースによる変換がおこなわれる。
RDDsはデータそのものを記録するのではなく、「RDDsを構築するためにデータに対して行った変換」を記録することで効率的な耐障害性を実現している。

もしRDD中の一部分がロストした場合、RDDはそのデータが他のRDDからどのように変換されて生じたデータかを保持しているため、
コストの大きなレプリケーションなしに迅速に再計算/復旧させることができる。

「荒い変換インタフェース」しか持たないRDDsは一見使えないように見えるかもしれないが、
並列アプリケーションにおいて行われる実際のオペレーションは全てのデータに同じ演算を並列実行することで実現されるため、実質的にうまく適応する。

実際に、RDDsを用いることで分散システム上で並列コンピューティングを行う際に適切に処理できることを示す。
適用可能な計算はこれまで提案されてきたMapRedude、DryadLINQ、SQL、Pregel、HaLoopといったものに加え、
これまでは存在しなかったインタラクティブデータマイニングといったアプリケーションにも及ぶ。

これまでは新しいフレームワークを導入することでしか解決しなかった新たなコンピューティングニーズに対して
RDDsを用いることで抽象化し、共通的に解決できることを示す。

我々はSparkというシステム上でRDDsを実現した。
Sparkはカリフォルニア大学バークレー校や他の複数の企業で研究/実アプリケーションとして使用されている。

SparkはScala上でDryadLINQのような使用しやすいプログラミングインタフェースを提供する。
加えて、Scalaインタプリタを用いてビッグデータに対するクエリをインタラクティブに実行可能。
我々はSparkがメモリ上のデータへのインタラクティブな汎用データマイニングを可能とする初のシステムだと考えている。

我々はRDDsとSparkをもちいてユーザアプリケーションの評価を行った。
結果、インタラクティブなアプリケーションにおいてSparkはHadoopの20倍高速に、
実世界のデータ解析を行うアプリケーションにおいて40倍高速な結果となった。
その際、1TBのデータを検索するのに必要なレイテンシが5〜7秒となっている。

RDDがより汎用的に使用できることを示すために我々はPregelとHaLoopのプログラミングモデルをSpark上で
実現するためのライブラリを作成した。(各々200行程)

2. Resilient Distributed Datasets (RDDs)

2.1 RDD概要

RDDは「読み取り専用の、分割されたレコードコレクション」と言い表すことができる。
RDDはストレージ、または他のRDDに対して決定論的な操作を行うことで生成される。
#決定論的な操作・・・結果が確定した操作なんでしょうけど、いまいちどう記述すればいいか微妙ですね。

我々はこの「決定論操作」のことをRDDに対するその他の操作と区別するためにtransformationsと呼ぶ。
transformationsにはmap、filter、join等が含まれる。

RDDは常時具体化している必要はない。
その代わり、RDDはそのRDDが「どうやって生成されたか」について保持しており、実体データをその履歴を基に生成できる。
これは強力なプロパティで、本質的にはプログラムは障害発生後この手のデータセットを再構築しない限りRDDを参照することはできないためである。
#ここは訳が微妙です・・・ おそらく、データセットを実体化するというスタンスでは耐障害性を維持するのが難しいということなのだとは思いますが。

最終的に、ユーザはRDDsの異なる2つのファクター・・・永続性とパーティショニングをコントロール可能。
ユーザはRDDsを再利用するか、どこに保持するかを指定することができる。
また、キーベースでマシン間でどうパーティショニングするかを指定することができる。
これは配置最適化の際に有用で、例えば2つのRDDsをjoinするとき、同一のキーを保持するデータを同一ホストに配置して高速joinさせるなどの設定が可能。

2.2 Sparkプログラミングインタフェース

SparkにおいてはRDDsをDryadLINQやFlumeJavaに似た言語統合APIを提供する。
データセットはオブジェクトとして表現され、transformationsはそれらのオブジェクトに対するメソッドとして表現される。

プログラマはまずRDDsを静的ストレージに対するtransformations(map、filter等)を用いて生成する。
その後それらのRDDsに対して値を返すアクションやデータを出力するアクションを実行可能。

アクションの例としてcount(データセットの要素数を取得)、collect(要素そのものを取得)、save(データセットをストレージに保存)等がある。
DryadLINQのようにSparkはRDDsに対して「値を実際に使用するタイミングで評価する」という遅延評価を行う。
そのため、transformationsのパイプライン化を行うことも可能。

加えて、プログラマはRDDsに対してその後の処理で再利用したい場合はpersistメソッドを呼び出すことができる。
このメソッドを呼び出すことでSparkは対象のRDDsをメモリ上に保持(メモリ上に収まらない場合はディスクも併用)される。
persistメソッドを呼び出す際のフラグ設定によってRDDsをディスクのみに保存するか、またはマシン間レプリケーションを行うなどの配置選択も可能。
これらのAPIを備えることでユーザはRDDsに対する保存優先度(ディスクに保存するか?メモリ上で抑えるか)を定義可能。

  • 2.2.1 実例:コンソールログマイニング

Webサービスのエラーを特定するためにHDFS上に保存されたテラバイト単位のログに対して検索を行いたい場合を想定する。
Sparkを用いることで、オペレータはエラーメッセージのみをメモリ上にロードし、ロードした結果に対してインタラクティブにクエリを実行することができる。
これらの下準備を以下のコードで可能。

lines  =  spark.textFile("hdfs://...") // HDFSから行単位で分割されたテキストをロード
errors  =  lines.filter(_.startsWith("ERROR")) // 行頭にERRORを含む行のみをフィルタリング
errors.persist() // フィルタリング結果をメモリ上に保持

ここでのポイントは、クラスタ上での処理はまだ行っていないということである。
だが、ユーザはRDDsに対するカウントなどのRDDsに対する操作を実行することは可能。
#局所的なオペレーションにとどまっているためクラスタ規模の処理にはまだ発展してないということなのでしょう。

errors.count()

ユーザはRDDに対して以下のようにさらにtransformationsを実行することが可能。

//  Count  errors  mentioning  MySQL:
errors.filter(_.contains("MySQL")).count()

//  Return  the  time  fields  of  errors  mentioning
//  HDFS  as  an  array  (assuming  time  is  field
//  number  3  in  a  tab-separated  format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()

errorを含む初めのアクションが実行された後、Sparkはパーティショニングされたerrorをメモリ上に保持し、以後の計算速度をスピードアップさせる。
ここで注目してほしいのは、遅延評価によってlinesは実際にはメモリにロードされていないことである。
#遅延評価される&persistメソッドを実行していないため、ロードされていないということですね。
エラーメッセージはデータのみのごく一部(メモリに収まるほど小さい)かもしれないため、この動作は非常に有用。

最後に、RDDsの耐障害性をどう実現するかを示す。
Sparkにおいては以下の図のような系統グラフをRDDsに対するクエリに対応して保持している。

上記のクエリにおいて、lineに対してフィルタをかけたerrorをインプットとして追加のfilter/map処理をcollectを行う前に実行している。
Sparkスケジューラはfilter/mapという2つのtransformationsをパイプライン化し、各ノードが保持しているerrorsの断片に対してタスクを送信&実行させる。

errorsの一部が障害でロストした場合にはSparkはロストした対象のパーティションに対してのみ
linesに対するフィルタリングを実行し、errorsを復元させる。

2.3 RDDモデルの優位性

RDDsの分散抽象化メモリとしての優位性を理解するためのの分散共有メモリ(DSM systems)との比較表を以下のように作成した。

DSM systemsにおいてはアプリケーションはグローバルアドレススペースを指定して任意の場所のオブジェクトを読込/書込可能。
尚、このDSM systemsの定義として、我々は昔からあるDSM systemsだけでなくアプリケーションから詳細な状態更新が
可能とするPiccoloのような共有ハッシュテーブル、および分散データベースも定義に含めている。
DSM systemsは非常に広範・汎用的な概念のため、一般的なクラスタシステム上で効果的な耐障害性を実装するのは非常に困難。

RDDsとDSM systemsの最も大きな違いはRDDが"明示的に記述された"transformationsからのみ生成されるのに対し、
DSMはどこからであっても読込/書込を認めているところにある。

結果、RDDにおいてはアプリケーションは一括書込みしかできなくなるが、より効果的な耐障害性が提供される。
特に、RDDは系統情報をたどって復旧が可能な関係上、チェックポイントを取ることによるオーバーヘッドが発生することはない。
更に、RDDは障害によってパーティションをロストした際に
プログラム全体をロールバックすることなく異なるノードで並列して計算して復旧させることが可能。

RDDsの第2の利点はこの変更不可能性がMapReduceのように遅いタスクのバックアップ·コピーを実行することにより、
低速なノード(落伍者)による影響を軽減できることとなる。
#訳微妙です。タスクのバックアップ・コピーを行うことにより、軽減できるとありましたが・・・

DSMにおいては2つのタスクのコピーが同じ領域にアクセスし、
各々の更新によって競合が発生する関係上「バックアップタスク」の実現は困難となる。

加えて、RDDsはDSMに比べて2つの利点を提供する。
1.RDDsは一括操作のみ実行可能であるためにデータの局所性に基づいてタスクをスケジュールし実行時のパフォーマンスを向上させることが可能
2.RDDsは十分なメモリがない場合はスキャンベースのオペレーションをメモリ上で優先的に実行し、
  それ以外をディスクに移行することで「メモリがある分だけ、有効な個所から」性能向上の恩恵を受けることが可能

2.4 RDDsに適さないアプリケーション

初めに述べたように、RDDsはデータ全体に対して一括した操作を実行する場合に適している。
このケースにおいてはRDDsは大容量のログ/データを保持することなく効果的に各transformationsの系統情報を保持し、データの復旧が可能。

だが、RDDsは以下のような粒度の小さい非同期更新を共有状態に対して実行する必要があるアプリケーションには適合しない。
−Webアプリケーション向けのストレージシステム/インクリメンタルなWebクローラー
これらのアプリケーションにおいては伝統的なアップデートログとチェックポイント確保で
状態を維持するデータベースだったり、RAMCloud、Percolator、Piccoloを用いる方が有効である。

我々のゴールはバッチ解析に対する効果的なプログラミングモデルを提供することであり、
これらのアプリケーションを実行するための基盤を特定アプリケーションに対して特化した状態を打破することにある。

3.Spark Programming Interface

SparkはRDDを使うための基盤で、DryadLINQのような言語統合APIを静的型付け関数型プログラミング言語Scalaで実現している。
簡潔さ(対話型の使用のために便利)と効率(静的型付け)のため選択した。
但し、特に扱うにあたって関数型言語の素養が必要というわけではありません。

Sparkを使う際には開発者は以下の図のようなWorkersクラスタに接続するドライバプログラムを記述する。

ドライバは1個以上のRDDsを定義し、それらに対してアクションを実行する。
ドライバプログラム上で記述したプログラムは同様にRDDsの系統情報を記録する。
Workersは常駐プロセスでRDDパーティションを操作をまたいでメモリ上に保持する。

2.2.1のログマイニングアプリケーション例を基に説明すると、
ユーザはRDDsに対してmapオペレーションを関数型言語のようなリテラル群で記述している。
Scalaは各々のメソッド/引数部をJavaオブジェクトして表現し、
これらのオブジェクトはシリアライズされてクラスタ内の異なるworkersにネットワーク越しに配信され、別workersでロードされる。

Scalaはまた構文の中で記述された変数をJavaオブジェクトとして保持する。
例えば、以下の記述はrddの要素に対して各々5を加算するというオペレーションを示す。

var  x  =  5;
rdd.map(_  +  x)
3.1 SparkでのRDDに対する処理

以下の図はSparkにおいてRDDに対して可能となっている関数群となる。
これらの関数はプログラム中で実際に値として使用される時や出力時に評価を行う遅延評価で実現される。
いくつかの関数において注意が必要な点として、例えばjoinはkey-valueのペアにのみ実行可能といったものがある。

また、その他の関数名もScalaや他関数型言語で用いられるAPIに極力合わせた作りになっている。
例として、mapは1対1のマップとなっており、floatMapは1つの入力に対して1つまたは複数の出力を行うマッピングとなっている。
MapReduceのmapと同様の動作)

これらの関数に追加して、ユーザはRDDに対してpersist関数を呼び出すことができる。
加えて、RDDパーティション切り分け(Partitionarクラスで定義)やパーティションを示すデータセットを取得することができる。
groupByKey、reduceByKey、sortといった処理は自動的にハッシュ/レンジでパーティションが切り分けられる。

3.2 サンプルアプリケーション

2.2.1であげた例のうち、2つのインタラクティブなアプリケーション(ロジスティック回帰、ページランク)を用いて説明する。
ページランクの方はRDDのパーティショニングによって性能向上が可能であることも合わせて説明する。

  • 3.2.1 ロジスティック回帰

多くの機械学習アルゴリズムは何度も最適化のための反復を実行し、勾配降下法を用いて効果を最大化している。
そのため、繰り返し読まれるデータをメモリ上に配置することによって性能を大きく向上させることが出来る。
例として、ロジスティック回帰によってデータ群から2値変数に対して回帰分析を行う例を示す。
(スパムか、そうでないか、の判定などに用いられる)

このアルゴリズムは勾配降下法を使用している。
wというランダム値から開始し、イテレーションを回す。
関数に対してwを適用した結果が最小となる値に収斂するまで実行する。

val  points  =  spark.textFile(...).map(parsePoint).persist()
var  w  =  // random initial vector
for  (i  <-  1  to  ITERATIONS)  {
  val  gradient  =  points.map{  p  =>
    p.x  *  (1/(1+exp(-p.y*(w  dot  p.x)))-1)*p.y
  }.reduce((a,b)  =>  a+b)
  w  -=  gradient
}

上記の処理はまずテキストファイルの各行をPointのオブジェクトに変換し、RDD化する処理を行っている。
その後RDDに対してmap/reduceを繰り返し実行し、勾配をwに対して加算する。(イテレーション回数実行する)
pointsをメモリ上に維持することでディスクに出力した場合と比較して20倍以上の高速化を行っている。

3.2.2 ページランク

より複雑なデータ共有が発生するパターンとして、ページランクを確認する。
このアルゴリズムはあるページが他のページからリンクされている数を基に「ランク」を算出するもの。
イテレーションにおいて、各ドキュメントは\frac r {n}ページランク値をリンク先のページに与える。
ここでの『r』はリンク元ページのランクで、『n』はリンク先のページ数となる。

その後ランクを\frac \alpha {N}+(1-\alpha)\Sigma c \small i の式を用いて更新する。
ここでの合計値は自分のページに対してリンクを行ったページの合計値で、Nは全ページ数となる。

Sparkでページランクを実現した場合は以下になる。

val  links  =  spark.textFile(...).map(...).persist()
var  ranks  =  //  RDD  of  (URL,  rank)  pairs
for  (i  <-  1  to  ITERATIONS)  {
  //  Build  an  RDD  of  (targetURL,  float)  pairs
  //  with  the  contributions  sent  by  each  page
  val  contribs  =  links.join(ranks).flatMap  {
    (url,  (links,  rank))  =>  links.map(dest  =>  (dest,  rank/links.size))
  }
  //  Sum  contributions  by  URL  and  get  new  ranks
  ranks  =  contribs.reduceByKey((x,y)  =>  x+y).mapValues(sum  =>  a/N  +  (1-a)*sum)
}

上記プログラムのRDD系列グラフは以下のようになる。

イテレーションにおいて、ranksデータセットを前回のイテレーション結果であるranksと静的情報であるlinksを用いて更新している。

このグラフで興味深いところは系統グラフがイテレーション数に応じて増大すること。
したがって、多くのイテレーションを実行するジョブにおいては、障害発生時の復旧時間を短縮するために
いくつかのバージョンのranksをレプリケーションする必要があるかもしれない。

このようなケースにおいて、プログラマはpersist関数をRELIABLEフラグを設定して呼ぶことができる。
但し、linksデータセットはファイルからのmap節から効率よく再生成されるためレプリケーションする必要はない。

linksデータセットは一般的にranksデータセットよりも大きくなる。
なぜなら、各ページは多数のリンク先を保持するが、ranksデータセットはページ数のサイズに収まるからである。
そのため、linksデータセットの復元に系列グラフを使用することで使用するメモリ量を抑えることができる。

また、RDDのパーティショニングを制御することでPageRankアルゴリズム実行における通信量を最適化することができる。
links RDDのパーティショニングを指定した場合(例:linklists をURL のハッシュでパーティショニング)、
ranks RDDも同じ方式でパーティショニングすることによってlinks とranksのjoin関数においてノード間の通信を行わずに実行できる。
(同じリンクに対するランクとlinklistは同じノード上に存在するため)

加えて、ページをグルーピングするカスタムパーティショナーを自作することもできる。
(例:ドメイン名でリンクを分割する場合)
これらの最適化は以下のようにpartitionBy関数を実行することで実施可能。

links  =  spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

上記の呼び出しから処理を開始した場合、linksとranksのjoin関数から生成されるcontirbutionsは
自動的に同一マシン上に保持しているURL同士で行われ、新たなrankが生成される。
その際にrankにも同一のパーティショニングが適用される。

この種のイテレーション間の一貫性はPregelのようなフレームワーク上で行われていた最適化策のうちの一つ。
RDDsではユーザが指定することでこの最適化を達成できる。

4. Representing RDDs

抽象化してRDDsを提供する上での課題は様々なtransformationに対して追跡可能な系統グラフをどう表現するか、になる。
理想的にはRDDsを実現するシステムは出来る限り充実したtransformationsを提供し、ユーザにその任意の組み合わせを可能とさせる必要がある。

そのため、この目標達成を容易にするためにグラフベースのRDDs表現を提案している。
私たちはSparkを個々のスケジューラ毎に個別ロジックを追加することなく
幅広いtransformationsに対応させるためにグラフベースのRDDs表現を用いている。
こうすることでシステム設計が非常にシンプルになるためである。

一言でいえば、私たちは各RDDに対して5つの情報を保持する共通インタフェースを定義する。

  1. パーティションのセット:データセット中のアトミックなデータ群
  2. 依存性のセット:親RDDへの依存情報
  3. RDDから生成した際の関数
  4. パーティショニングを行った際のメタデータ
  5. データ配置のメタデータ

例えば、HDFS上のファイルを示すRDDは各ファイルブロックのパーティション情報と、各ブロックがどのマシン上に存在しているかの情報を保持している。
なお、このRDDに対するmap関数の結果は同一のパーティションを保持するが、それは親RDDに対してmapを実行するときに生成される。(?)
このRDDからの情報取得インタフェースについてサマリすると以下の表のようになる。

このインタフェースデザインの際の最も興味深い質問はどのようにRDDs間の依存を表現するか、である。
私たちはこの「依存」を以下の2つに分類することで有用、かつ十分に依存を表現することが可能であることを見出した。

  1. 「狭い」依存:親RDDパーティションが多くても1つの子RDDパーティションから依存されるケース
  2. 「広い」依存:親RDDパーティション複数の子RDDパーティションに依存されるケース

例えば、mapは「狭い」依存を示し、joinは「広い」依存を示す。(但し、親RDDパーティションがハッシュによるパーティショニングがされていない場合)

以下の図は他の例を示す。

この区別は2つの理由により有用。
1つ目の理由としては、「狭い」依存の場合、親パーティションクラスタ中の1ノードで全て計算可能となり、パイプラインとして実行可能となること。
例えば、図中の上の例(map,filter)の場合各要素に対してfilterに続けてmapを実行することができる。
対して、「広い」依存のはMapReduceでのshuffle等で全ノードの親RDDsが使用可能であることが必要となる。

2つ目の理由としては、ノード障害後のデータ復旧時、「狭い」依存の場合はロストしたパーティションのみを再計算すればよく、
かつ並列で実行可能なため効率的に復旧が可能ということ。
対して、「広い」依存を伴う系統グラフの場合、1ノードの障害によって対象RDDの子孫となるRDD
ノードをまたがって喪失してしまう危険があり、ノード間を含めた完全な再計算が求められるケースが出てくる。

RDDに対する共通インタフェースはSparkにおけるtransformationsの大部分を20ライン未満のコードで記述することを可能にしている。
加えて、Sparkユーザが新たな関数(例:新たなサンプリング/joinを行う関数)を定義する場合でも
スケジューラの詳細を知らなくても拡張可能となっている。
以後、いくつかのRDD関数の実装について記述する。

サンプルにおいてはインプットしてHDFSを使用している。
これらのRDDsにおいて、partitions関数は各ブロックファイルに対する1つのパーティション情報を返す。
(各パーティションに対するブロックのオフセット情報も一緒に取得可能)
preferredLocations関数はブロックが実際に配置されているノードを返し、
iterator関数を用いることで実際にブロックを読み取ることが可能。

map

RDDのmap関数を実行した場合MappedRDDオブジェクトを返す。
MappedRDDは親RDDと同じパーティション情報と最適化された場所情報を保持するが、
map関数を通して親RDDに対して関数を適用した場合はそれらのiteratorメソッドから取得可能。
#訳が微妙です。but節でつないでいる割に否定表現に見えない。

  • sample

サンプリングメソッドRDDが親RDDsから値を抽出するために使用するランダム変数生成用のシード値を持つ他はmapと同じ。

  • join

2つのRDDsをjoinする場合に2つの「狭い」依存(両方とも同じパーティショナーを用いた場合)や2つの「広い」依存、
または「狭い」依存と「広い」依存の両方が1つずつ発生する。(片方だけパーティショナーを保持し、もう片方は保持しない場合)
いずれの場合でも出力されたRDDはパーティショナー情報を保持する。
(親RDDから引き継ぐ場合と、デフォルトのハッシュパーティショナーの場合がある)

5. Implementation

私たちはSparkを14000ステップ程のScalaで記述している。
SparkはApache Mesosというクラスタマネージャの上で動作し、HadoopやMPI等の他のアプリケーションと共存することができる。
各々のSparkプログラムは分離されたMesosアプリケーションとして動作(DriverやWorkerも)し、
アプリケーション間のリソースについてもMesosの方でハンドリングされる。

Sparkは一通りのHadoop関連のデータソースからHadoopの既存のプラグインAPIを使いデータを取得することができる。
特に変更を加えていないScalaの上で動作する。

これからこれらのシステムにおいて以下のように興味深いポイントについて記述する。
5-1. ジョブスケジューラ
5-2. Sparkインタプリタによるインタラクティブな使用方法
5-3. メモリ管理
5.4. チェックポイント管理

5.1 ジョブスケジューラ

Sparkのスケジューラは4章で示したようにRDDの表現方法を使用している。
全体としては、SparkのスケジューラはDryadのものと近いが、
追加でRDDsパーティションをメモリ上に保持するための切り替え機能を有している。

RDDに対してアクション(例:countやsave)を実行した場合、
スケジューラは以下の図のように無閉路有向グラフを計算モデルとして構築するため、系統グラフを確認する。

上記の図のGに対するアクションを実行するタイミングにおいて「広い」依存が発生するため
各段階の「狭い」依存に対するtransformationsを実行し、各ステージのデータを生成する。
今回の場合、ステージ1は既にメモリ上に存在するため、ステージ3を実行するタイミングにおいては
ステージ2とステージ3の計算を行う形になる。

各ステージは保持できるだけの「狭い」依存に対する複数パイプラインtransformationsを保持する。
#要は、「広い」依存が発生しない「狭い」依存のパイプラインは1つのステージにまとめられるということ。
ステージの境界は「広い」依存が発生するshuffle系のtransformationsや既に計算が完了しており、
RDDから短縮された経路で算出することが出来るRDDが発生したケースにおいて生じる。

スケジューラは各ステージでロストしたパーティションについては実際に使用されるタイミングまでに再計算を行うタスクを設定する。
スケジューラはデータ局所性を利用した遅延評価を行うことを前提にタスクを配分する。

もし既に同様のデータをメモリ上に保持しているノードが存在した場合、そのノードに対して配分する。
それ以外のケースにおいて、データが特定の場所に保存されている場合(例:HDFS)はデータが保存されているノード上で極力実施するよう配分する。

shuffleをはじめとした「広い」依存に対しては、
その段階でのノード上に保持する内部レコードを親RDDを基に評価し、MapReduceのmap出力と同じように出力する。

もしタスクが失敗した場合、親RDDが生存している限りは別ノードで再度タスクを実行させる。
もし複数のステージが使用不能(例:Map時のshuffleの結果がロストした等)となった場合、
ロストしたパーティションを並列で再計算するタスクを再実行させる。

尚、現状スケジューラの障害についてはRDDを複製することに比べて系統グラフを複製することの方が簡単である関係上、
まだ耐障害性を持たせていない。

Spark上のすべての計算はドライバプログラムの呼び出しに応じて実行される。
だが、現状map等のタスクにおいてはlookupオペレーション(ハッシュパーティションされたRDDに対するランダムアクセスを提供)
に応じて呼び出される機構についても検証中である。
このケースにおいてはタスク側からスケジューラに対して必要なパーティションがロストしていた場合再計算を要求する必要が出てくる。

5.2 Sparkインタプリタによるインタラクティブな使用方法

ScalaRubyPythonのようなインタラクティブシェルを提供している。
私たちはSparkユーザに対してメモリ上にデータを保持することによる高速なレスポンスを
巨大なデータセットに対するクエリをインタラクティブに実行することで提供したいと考えた。

Scalaインタプリタは基本的には1行ごとに入力されたコードをコンパイルし、結果をJVMにロードして関数を実行することで動作している。
このロードするクラスの中にはその1行で初期化された変数や関数を保持するシングルトンオブジェクトも含まれる。
例えば、以下のコードを入力した場合を考える。

var x = 5
println(x)

インタプリタはxを含むLine1というクラスを定義し、2行目のコードを以下のようにコンパイルする。

println(Line1.getInstance().x)

私たちはインタプリタをSparkに適用させるにあたり、以下2点の変更を行なった。

1. クラス運搬
Workerノードが各行ごとに生成されたバイトコードを取得できるようにした。
インタプリタは生成されたバイトコードをHTTPで送信している。

2. コード生成ロジックの修正
基本的には各行ごとに生成されたシングルトンオブジェクトは含まれるクラスとともにstaticアクセスされる。
これはそれまでの行で生成された変数に対してアクセスするクロージャ(例:Line1.x)も合わせてシリアライズされることを示す。
だが、JavaはxをオブジェクトLine1を通してしかアクセスしないため、Line1のように実際に使われるまでは送信されない。
そのため、Workerノードはxを受信することは無くなってしまう。
私たちはこれを防止するためコード生成ロジックを修正し、各行のオブジェクトを直接参照するようにした。

以下の図がユーザがコードを入力した際にインタプリタがどうJavaObjectに変換するかを示した図である。

SparkインタプリタHDFSに保存された巨大ファイルを検索する研究の一環で巨大ログを追跡するのに有効であることがわかった。
現状、SQLのようなより高次のクエリ言語で実現すべく計画中である。

5.3 メモリ管理

Sparkは3種類のRDDs格納時の永続化オプション
(デシリアライズされたJavaObject、メモリ上のシリアライズされたデータ、ディスク上データ)を提供している。

シリアライズされたJavaObjectは最も性能的には優れている。
メモリ上のシリアライズされたデータはメモリ領域が限られている場合に
JavaObjectよりもコストが低くすぐれた性能を出すことができる。
ディスク上データとして保存する場合、RDDのデータは巨大だが、再計算に時間がかかる場合に適している。

限られたメモリを管理するためにRDDsレベルでLRU退避ポリシーを使用している。
新しいRDDパーティションが算出された際に十分なメモリスペースが存在しなかった場合、最後にアクセスされたタイミングが最も早いRDDを退避する。
但し、新しく生成されたRDDと同様のパーティションだった場合は退避を行わない。
こうすることによって、古いパーティションを退避する際に同じパーティションがメモリに読みだす→退避が繰り返されることを防止している。

これはほとんどのtransformationsがRDD全体にアクセスする関係上
現状メモリ上に存在するRDDが将来的に使用される事が想定されるアプリケーションにおいては重要な要素となる。
現状このデフォルトポリシーで作成した全てのアプリケーションは効率よく動作するが、このポリシーについては差し替え可能となっている。

尚、現状クラスタ上のSparkインスタンスは各々分離されたメモリ領域を保持している。
だが、将来的には全インスタンス上のRDDを共有し、共通的な手段にアクセスする手段を調査中である。

5.4 チェックポイント管理

RDDは系統グラフを用いて復元することが可能だが、系統が長くなった場合は復元に時間がかかる場合がある。
そのため、RDDsを静的ストレージにチェックポイントとして保存することは有効。

一般的に、チェックポイントは長い系統グラフを保持し、「広い」依存のRDDsに対して有効となる。
ページランク算出のアルゴリズムのような)
これらのケースにおいて、
クラスタ上のノード障害は親RDD、および親RDDから取得したデータ断片のロストにつながり全体の再計算が必要になる場合がある。

対して、「狭い」依存で構成されたRDD、ロジスティック回帰やページランクのリストのようなRDDに対しては
RDDは別ノードで並列で算出できるためRDD自体のレプリケーションを取るのはオーバーコストとなりチェックポイントは有効ではない。

Sparkは現状チェックポイント生成のためのAPI(persist関数のフラグ)を提供しているが、利用するかどうかはユーザが判断している。
しかしながら現状チェックポイント生成を自動的に行うことができないか調査中である。
なぜなら、スケジューラはデータセットのサイズやどれほどの時間を初回の演算に使用したかを保持しており、
障害の際の復旧時間を最適化する形でのチェックポイント生成が可能だからである。

最後に読み取り専用RDDsという性質が一般的な共有メモリに比べてチェックポイントを作成するのを容易にしていることを付記する。
RDDsは読み取り専用であるため途中の一貫性を気にする必要がない。
そのため、システム上の処理を停止してスナップショットを取るなどの対処は不要になっている。