読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Resilient Distributed Datasetsに関する論文を読んでみます(5章

以下論文を読んでみようの続きで、今回は第5章です。

「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

内容としては、「Implementation」ということで、実際の実装方式や動作についての章になります。
この章で動きや性質は大体わかるのと、これ以降の章は検証結果と考察という形で新規情報や機構・・
という形ではなくなりますので、この論文としては今回が最後になります。

では、実際に入っていきます。

私たちはSparkを14000ステップ程のScalaで記述している。
SparkはApache Mesosというクラスタマネージャの上で動作し、HadoopやMPI等の他のアプリケーションと共存することができる。
各々のSparkプログラムは分離されたMesosアプリケーションとして動作(DriverやWorkerも)し、
アプリケーション間のリソースについてもMesosの方でハンドリングされる。

Sparkは一通りのHadoop関連のデータソースからHadoopの既存のプラグインAPIを使いデータを取得することができる。
特に変更を加えていないScalaの上で動作する。

これからこれらのシステムにおいて以下のように興味深いポイントについて記述する。
5-1. ジョブスケジューラ
5-2. Sparkインタプリタによるインタラクティブな使用方法
5-3. メモリ管理
5.4. チェックポイント管理

5.1 ジョブスケジューラ

Sparkのスケジューラは4章で示したようにRDDの表現方法を使用している。
全体としては、SparkのスケジューラはDryadのものと近いが、
追加でRDDsパーティションをメモリ上に保持するための切り替え機能を有している。

RDDに対してアクション(例:countやsave)を実行した場合、
スケジューラは以下の図のように無閉路有向グラフを計算モデルとして構築するため、系統グラフを確認する。

上記の図のGに対するアクションを実行するタイミングにおいて「広い」依存が発生するため
各段階の「狭い」依存に対するtransformationsを実行し、各ステージのデータを生成する。
今回の場合、ステージ1は既にメモリ上に存在するため、ステージ3を実行するタイミングにおいては
ステージ2とステージ3の計算を行う形になる。

各ステージは保持できるだけの「狭い」依存に対する複数パイプラインtransformationsを保持する。
#要は、「広い」依存が発生しない「狭い」依存のパイプラインは1つのステージにまとめられるということ。
ステージの境界は「広い」依存が発生するshuffle系のtransformationsや既に計算が完了しており、
RDDから短縮された経路で算出することが出来るRDDが発生したケースにおいて生じる。

スケジューラは各ステージでロストしたパーティションについては実際に使用されるタイミングまでに再計算を行うタスクを設定する。
スケジューラはデータ局所性を利用した遅延評価を行うことを前提にタスクを配分する。

もし既に同様のデータをメモリ上に保持しているノードが存在した場合、そのノードに対して配分する。
それ以外のケースにおいて、データが特定の場所に保存されている場合(例:HDFS)はデータが保存されているノード上で極力実施するよう配分する。

shuffleをはじめとした「広い」依存に対しては、
その段階でのノード上に保持する内部レコードを親RDDを基に評価し、MapReduceのmap出力と同じように出力する。

もしタスクが失敗した場合、親RDDが生存している限りは別ノードで再度タスクを実行させる。
もし複数のステージが使用不能(例:Map時のshuffleの結果がロストした等)となった場合、
ロストしたパーティションを並列で再計算するタスクを再実行させる。

尚、現状スケジューラの障害についてはRDDを複製することに比べて系統グラフを複製することの方が簡単である関係上、
まだ耐障害性を持たせていない。

Spark上のすべての計算はドライバプログラムの呼び出しに応じて実行される。
だが、現状map等のタスクにおいてはlookupオペレーション(ハッシュパーティションされたRDDに対するランダムアクセスを提供)
に応じて呼び出される機構についても検証中である。
このケースにおいてはタスク側からスケジューラに対して必要なパーティションがロストしていた場合再計算を要求する必要が出てくる。

5.2 Sparkインタプリタによるインタラクティブな使用方法

ScalaRubyPythonのようなインタラクティブシェルを提供している。
私たちはSparkユーザに対してメモリ上にデータを保持することによる高速なレスポンスを
巨大なデータセットに対するクエリをインタラクティブに実行することで提供したいと考えた。

Scalaインタプリタは基本的には1行ごとに入力されたコードをコンパイルし、結果をJVMにロードして関数を実行することで動作している。
このロードするクラスの中にはその1行で初期化された変数や関数を保持するシングルトンオブジェクトも含まれる。
例えば、以下のコードを入力した場合を考える。

var x = 5
println(x)

インタプリタはxを含むLine1というクラスを定義し、2行目のコードを以下のようにコンパイルする。

println(Line1.getInstance().x)

私たちはインタプリタをSparkに適用させるにあたり、以下2点の変更を行なった。

1. クラス運搬
Workerノードが各行ごとに生成されたバイトコードを取得できるようにした。
インタプリタは生成されたバイトコードをHTTPで送信している。

2. コード生成ロジックの修正
基本的には各行ごとに生成されたシングルトンオブジェクトは含まれるクラスとともにstaticアクセスされる。
これはそれまでの行で生成された変数に対してアクセスするクロージャ(例:Line1.x)も合わせてシリアライズされることを示す。
だが、JavaはxをオブジェクトLine1を通してしかアクセスしないため、Line1のように実際に使われるまでは送信されない。
そのため、Workerノードはxを受信することは無くなってしまう。
私たちはこれを防止するためコード生成ロジックを修正し、各行のオブジェクトを直接参照するようにした。

以下の図がユーザがコードを入力した際にインタプリタがどうJavaObjectに変換するかを示した図である。

SparkインタプリタHDFSに保存された巨大ファイルを検索する研究の一環で巨大ログを追跡するのに有効であることがわかった。
現状、SQLのようなより高次のクエリ言語で実現すべく計画中である。

5.3 メモリ管理

Sparkは3種類のRDDs格納時の永続化オプション
(デシリアライズされたJavaObject、メモリ上のシリアライズされたデータ、ディスク上データ)を提供している。

シリアライズされたJavaObjectは最も性能的には優れている。
メモリ上のシリアライズされたデータはメモリ領域が限られている場合に
JavaObjectよりもコストが低くすぐれた性能を出すことができる。
ディスク上データとして保存する場合、RDDのデータは巨大だが、再計算に時間がかかる場合に適している。

限られたメモリを管理するためにRDDsレベルでLRU退避ポリシーを使用している。
新しいRDDパーティションが算出された際に十分なメモリスペースが存在しなかった場合、最後にアクセスされたタイミングが最も早いRDDを退避する。
但し、新しく生成されたRDDと同様のパーティションだった場合は退避を行わない。
こうすることによって、古いパーティションを退避する際に同じパーティションがメモリに読みだす→退避が繰り返されることを防止している。

これはほとんどのtransformationsがRDD全体にアクセスする関係上
現状メモリ上に存在するRDDが将来的に使用される事が想定されるアプリケーションにおいては重要な要素となる。
現状このデフォルトポリシーで作成した全てのアプリケーションは効率よく動作するが、このポリシーについては差し替え可能となっている。

尚、現状クラスタ上のSparkインスタンスは各々分離されたメモリ領域を保持している。
だが、将来的には全インスタンス上のRDDを共有し、共通的な手段にアクセスする手段を調査中である。

5.4 チェックポイント管理

RDDは系統グラフを用いて復元することが可能だが、系統が長くなった場合は復元に時間がかかる場合がある。
そのため、RDDsを静的ストレージにチェックポイントとして保存することは有効。

一般的に、チェックポイントは長い系統グラフを保持し、「広い」依存のRDDsに対して有効となる。
ページランク算出のアルゴリズムのような)
これらのケースにおいて、
クラスタ上のノード障害は親RDD、および親RDDから取得したデータ断片のロストにつながり全体の再計算が必要になる場合がある。

対して、「狭い」依存で構成されたRDD、ロジスティック回帰やページランクのリストのようなRDDに対しては
RDDは別ノードで並列で算出できるためRDD自体のレプリケーションを取るのはオーバーコストとなりチェックポイントは有効ではない。

Sparkは現状チェックポイント生成のためのAPI(persist関数のフラグ)を提供しているが、利用するかどうかはユーザが判断している。
しかしながら現状チェックポイント生成を自動的に行うことができないか調査中である。
なぜなら、スケジューラはデータセットのサイズやどれほどの時間を初回の演算に使用したかを保持しており、
障害の際の復旧時間を最適化する形でのチェックポイント生成が可能だからである。

最後に読み取り専用RDDsという性質が一般的な共有メモリに比べてチェックポイントを作成するのを容易にしていることを付記する。
RDDsは読み取り専用であるため途中の一貫性を気にする必要がない。
そのため、システム上の処理を停止してスナップショットを取るなどの対処は不要になっている。

=====
これで実装の章も終りです。
きちんと性質を確認してみると「チェックポイントの作成が容易」等のなるほどという要素もありますね。
とりあえずこれで動作的にはわかってきたので、次回投稿で1〜5章を1投稿にまとめて用語統一をして、それでこの論文についてはしめようと思います。