読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Resilient Distributed Datasetsに関する論文を読んでみます(2章

こんにちは。

以下論文を読んでみようの続きで、今回は第2章です。

「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

内容としては、「Resilient Distributed Datasets (RDDs)」ということで一番読みたい内容になっています。

2.1 RDD概要

RDDは「読み取り専用の、分割されたレコードコレクション」と言い表すことができる。
RDDはストレージ、または他のRDDに対して決定論的な操作を行うことで生成される。
#決定論的な操作・・・結果が確定した操作なんでしょうけど、いまいちどう記述すればいいか微妙ですね。

我々はこの「決定論操作」のことをRDDに対するその他の操作と区別するためにtransformationsと呼ぶ。
transformationsにはmap、filter、join等が含まれる。

RDDは常時具体化している必要はない。
その代わり、RDDはそのRDDが「どうやって生成されたか」について保持しており、実体データをその履歴を基に生成できる。
これは強力なプロパティで、本質的にはプログラムは障害発生後この手のデータセットを再構築しない限りRDDを参照することはできないためである。
#ここは訳が微妙です・・・ おそらく、データセットを実体化するというスタンスでは耐障害性を維持するのが難しいということなのだとは思いますが。

最終的に、ユーザはRDDsの異なる2つのファクター・・・永続性とパーティショニングをコントロール可能。
ユーザはRDDsを再利用するか、どこに保持するかを指定することができる。
また、キーベースでマシン間でどうパーティショニングするかを指定することができる。
これは配置最適化の際に有用で、例えば2つのRDDsをjoinするとき、同一のキーを保持するデータを同一ホストに配置して高速joinさせるなどの設定が可能。

2.2 Sparkプログラミングインタフェース

SparkにおいてはRDDsをDryadLINQやFlumeJavaに似た言語統合APIを提供する。
データセットはオブジェクトとして表現され、transformationsはそれらのオブジェクトに対するメソッドとして表現される。

プログラマはまずRDDsを静的ストレージに対するtransformations(map、filter等)を用いて生成する。
その後それらのRDDsに対して値を返すアクションやデータを出力するアクションを実行可能。

アクションの例としてcount(データセットの要素数を取得)、collect(要素そのものを取得)、save(データセットをストレージに保存)等がある。
DryadLINQのようにSparkはRDDsに対して「値を実際に使用するタイミングで評価する」という遅延評価を行う。
そのため、transformationsのパイプライン化を行うことも可能。

加えて、プログラマはRDDsに対してその後の処理で再利用したい場合はpersistメソッドを呼び出すことができる。
このメソッドを呼び出すことでSparkは対象のRDDsをメモリ上に保持(メモリ上に収まらない場合はディスクも併用)される。
persistメソッドを呼び出す際のフラグ設定によってRDDsをディスクのみに保存するか、またはマシン間レプリケーションを行うなどの配置選択も可能。
これらのAPIを備えることでユーザはRDDsに対する保存優先度(ディスクに保存するか?メモリ上で抑えるか)を定義可能。

2.2.1 実例:コンソールログマイニング

Webサービスのエラーを特定するためにHDFS上に保存されたテラバイト単位のログに対して検索を行いたい場合を想定する。
Sparkを用いることで、オペレータはエラーメッセージのみをメモリ上にロードし、ロードした結果に対してインタラクティブにクエリを実行することができる。
これらの下準備を以下のコードで可能。

lines  =  spark.textFile("hdfs://...") // HDFSから行単位で分割されたテキストをロード
errors  =  lines.filter(_.startsWith("ERROR")) // 行頭にERRORを含む行のみをフィルタリング
errors.persist() // フィルタリング結果をメモリ上に保持

ここでのポイントは、クラスタ上での処理はまだ行っていないということである。
だが、ユーザはRDDsに対するカウントなどのRDDsに対する操作を実行することは可能。
#局所的なオペレーションにとどまっているためクラスタ規模の処理にはまだ発展してないということなのでしょう。

errors.count()

ユーザはRDDに対して以下のようにさらにtransformationsを実行することが可能。

//  Count  errors  mentioning  MySQL:
errors.filter(_.contains("MySQL")).count()

//  Return  the  time  fields  of  errors  mentioning
//  HDFS  as  an  array  (assuming  time  is  field
//  number  3  in  a  tab-separated  format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()

errorを含む初めのアクションが実行された後、Sparkはパーティショニングされたerrorをメモリ上に保持し、以後の計算速度をスピードアップさせる。
ここで注目してほしいのは、遅延評価によってlinesは実際にはメモリにロードされていないことである。
#遅延評価される&persistメソッドを実行していないため、ロードされていないということですね。
エラーメッセージはデータのみのごく一部(メモリに収まるほど小さい)かもしれないため、この動作は非常に有用。

最後に、RDDsの耐障害性をどう実現するかを示す。
Sparkにおいては以下の図のような系統グラフをRDDsに対するクエリに対応して保持している。

上記のクエリにおいて、lineに対してフィルタをかけたerrorをインプットとして追加のfilter/map処理をcollectを行う前に実行している。
Sparkスケジューラはfilter/mapという2つのtransformationsをパイプライン化し、各ノードが保持しているerrorsの断片に対してタスクを送信&実行させる。

errorsの一部が障害でロストした場合にはSparkはロストした対象のパーティションに対してのみ
linesに対するフィルタリングを実行し、errorsを復元させる。

2.3 RDDモデルの優位性

RDDsの分散抽象化メモリとしての優位性を理解するためのの分散共有メモリ(DSM systems)との比較表を以下のように作成した。

DSM systemsにおいてはアプリケーションはグローバルアドレススペースを指定して任意の場所のオブジェクトを読込/書込可能。
尚、このDSM systemsの定義として、我々は昔からあるDSM systemsだけでなくアプリケーションから詳細な状態更新が
可能とするPiccoloのような共有ハッシュテーブル、および分散データベースも定義に含めている。
DSM systemsは非常に広範・汎用的な概念のため、一般的なクラスタシステム上で効果的な耐障害性を実装するのは非常に困難。

RDDsとDSM systemsの最も大きな違いはRDDが"明示的に記述された"transformationsからのみ生成されるのに対し、
DSMはどこからであっても読込/書込を認めているところにある。

結果、RDDにおいてはアプリケーションは一括書込みしかできなくなるが、より効果的な耐障害性が提供される。
特に、RDDは系統情報をたどって復旧が可能な関係上、チェックポイントを取ることによるオーバーヘッドが発生することはない。
更に、RDDは障害によってパーティションをロストした際に
プログラム全体をロールバックすることなく異なるノードで並列して計算して復旧させることが可能。

RDDsの第2の利点はこの変更不可能性がMapReduceのように遅いタスクのバックアップ·コピーを実行することにより、
低速なノード(落伍者)による影響を軽減できることとなる。
#訳微妙です。タスクのバックアップ・コピーを行うことにより、軽減できるとありましたが・・・

DSMにおいては2つのタスクのコピーが同じ領域にアクセスし、
各々の更新によって競合が発生する関係上「バックアップタスク」の実現は困難となる。

加えて、RDDsはDSMに比べて2つの利点を提供する。
1.RDDsは一括操作のみ実行可能であるためにデータの局所性に基づいてタスクをスケジュールし実行時のパフォーマンスを向上させることが可能
2.RDDsは十分なメモリがない場合はスキャンベースのオペレーションをメモリ上で優先的に実行し、
  それ以外をディスクに移行することで「メモリがある分だけ、有効な個所から」性能向上の恩恵を受けることが可能

2.4 RDDsに適さないアプリケーション

初めに述べたように、RDDsはデータ全体に対して一括した操作を実行する場合に適している。
このケースにおいてはRDDsは大容量のログ/データを保持することなく効果的に各transformationsの系統情報を保持し、データの復旧が可能。

だが、RDDsは以下のような粒度の小さい非同期更新を共有状態に対して実行する必要があるアプリケーションには適合しない。
−Webアプリケーション向けのストレージシステム/インクリメンタルなWebクローラー
これらのアプリケーションにおいては伝統的なアップデートログとチェックポイント確保で
状態を維持するデータベースだったり、RAMCloud、Percolator、Piccoloを用いる方が有効である。

我々のゴールはバッチ解析に対する効果的なプログラミングモデルを提供することであり、
これらのアプリケーションを実行するための基盤を特定アプリケーションに対して特化した状態を打破することにある。

======
タスクのバックアップ・コピーを行うことにより遅いノードの影響を軽減できる・・・という個所が
いまいち微妙ではありますが、それ以外は大体わかった感じです。

今まで共有状態というとKey単位で更新可能な分散KVSみたいなものしか考えていませんでしたが、
「系統情報」を保持することで局所的な障害を全体に影響与えずに復旧可能というRDDsのスタンスは面白いですね。

単にデータを処理するだけでなく、解析を分散並列で行うようなアプリケーションには非常に有用に見えます。

では、次は3章ですか。
プログラムばかりなので早く読み進められそうですが、とりあえず見てみます。