読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Spark Streamingの論文を読んでみます(4章

こんにちは。

以下論文を読んでみようの続きで、今回は第4章です。

「Discretized Streams: A Fault-Tolerant Model for Scalable Stream Processing」
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf

内容としては、「System Architecture」ということで、
実際のシステムの構造を解説した章となります。

4. システムアーキテクチャ

私たちはD-StreamsをSpark Streamingという名称のシステム上に構築している。
Spark StreamingはSpark処理エンジンを拡張する形で作成している。
Spark Streamingは下図の3つのコンポーネントから構成される。

  • MasterはD-Streamsの系統グラフの追跡と生成されたRDDへのタスクのスケジューリングを行う
  • Workerはデータを受信し、入出力のRDDパーティションを保持し、タスクを実行する。
  • Client Libralyはシステムに対してデータを送信する

Spark Streamingと伝統的なストリーム処理システムの間の大きな違いはSpark Streamingは
タスクを小さく分割し、ステートレスにし、クラスタ上のどのノードであっても実行可能にするということ。
明確なトポロジとしてタスクを定義する伝統的なメッセージ処理システムと異なり、
別のマシンに処理を一部移動するということが度々発生する。
このアプローチは障害が発生したり「遅いタスク」に投機的にリソースを配分するなどの対応に対し柔軟に動作する。

このアプローチ自体は本来MapReduceのようなバッチ処理に向いている措置だが、
Spark Streamingにおいてもtaskが細分化されて、RDDがメモリ上に存在いているため50〜200msの遅延で対応可能になっている。

更に、Spark Streamingの状態は伝統的なシステムの常駐オペレータではなく、耐障害性を持つRDDに保持している。
RDDパーティションは任意のノード上に存在することができ、かつ複数ノード上で並列して処理を実行することが可能。
Spark Streamingにおいては、データ局所性を極力活用できるよう配置を行っている。
この配置によって決定論的な推測とデータの並列復旧が可能となる。

これらの利点はSpark本体から引き継いでいる。
だが、私たちは現状更にレイテンシを小さくできないか検討を継続している。

4.1 アプリケーション実行

Spark Streamingのアプリケーションは1つかそれ以上のインプットストリームを基に開始する。
Spark Streamingはクライアントからレコードを受信したタイミングか、
HDFSのような外部ストレージに定期的にデータをロードするタイミングでStreamsを取得し、動作する。
尚、このデータ取得部はflumeのようなログ収集ツールでも構わない。

前者のケース(クライアントからレコードを受信するパターン)においては取得する新規データを
データ取得元に応答を返す前に2つのWorkerノードでレプリケーションしている。
なぜなら、D-Streamsはインプットデータを再計算を可能とするために保存しておく必要があるためである。
Workerの障害が発生した時、クライアントライブラリは応答が返ってきていないデータについて再送する必要がある。

全データは各Worker毎の「block store」によって管理される。
「block store」はマスターを介して他のノードの上にある「block storre」を検索可能となっている。
なぜなら、「入力値」とRDDはイミュータブルである関係上「block store」を検索することは容易だからである。
各「block store」はユニークなIDを割り振られ、各ノードは保持している各対象パーティションのユニークIDを提供している。
「block store」は基本的にメモリ上に配置するが、スペースが足りなくなった場合はLRUアルゴリズムでディスクへの退避を行っている。

いつ処理の新規インターバルを開始するかについてはノード同士がNTPによって時刻同期されていることを前提に、
各ノードがマスタに対してブロックリストを送信し、処理の終了時に受信している。
マスタはそれ以上の同期処理は行わず、各インターバルごとに出力RDDを作成するタスクを実行するよう起動している。
タスクが起動する際は他のバッチ処理スケジューラと同じように前のタスクが終了していた場合起動するようになっている。

Spark Streamingが各時間インターバルごとにタスクを実行するには元々存在していたSparkのスケジューラを用いている。
また、DryadLINQシステムのような最適化も行っている。

  • 各タスクがmapの次にmapが来るなどまとめられる構成の場合、パイプラインオペレータとしてまとめている。
  • データ局所性を基にタスクを配分している
  • RDDパーティションがネットワーク越しにshuffleを行うことを避けている。

ネットワーク越しのshuffle実行防止について、例えばreduceByKeyオペレータについて考える。
各インターバルごとに新たに追加された結果を"add"し、もっとも古いインターバルの結果を減算する必要がある。
その際、スケジューラは各インターバル間でRDDパーティションを同じ方式で分割を行い、共通のキーが使い続けられるようにする。
結果、加算の際にネットワーク越しの通信は発生しなくなる。(詳細はSparkのパーティション対応スケジューリングを参照)

現状の私たちのコードベースにおいて、Workerの障害は復旧可能だが、マスタは耐障害性を保持していない。
私たちはD-Streamsの系統グラフのバックアップを保持することで
マスターに障害が発生した場合でも処理を継続することが可能だと考えているが、
ストリーム処理の復旧を優先したためそこは後回しにしている。

4.2 ストリーム処理の最適化

私たちはSparkエンジンに対してStream処理に対応させるための最適化を行っている。

  • Block placement

ストリームデータの入力時、ランダムでレプリカを作成するとホットスポットが発生してしまうため、
現状のノードの負荷に応じてレプリカを作成するノードを選択するようにしている。

  • Network communication

私たちはSparkのデータ通信部を改造時、reduce等の一部のタスクにおいて非同期IOを使用するようにした。
その結果高速化している。

  • Timestep pipelining

現状、各インターバルごとの実行はクラスタに完全に適用していない。(例:インターバルの最後にタスクがまだ残留して走っている場合がある)
そのため、次インターバルのタスクを前インターバルのタスクが残っている場合でも実行可能としている。
上記のためにSparkスケジューラを改造し、現状実行されているタスクが生成しているRDDをインプットとしてタスクを開始できるようにしている。

インターバルごとにバッチ処理が実行される関係上、系統グラフが無限に伸びる形になってしまう。
そのため、一定時間以上経過した系統グラフはチェックポイント設定時に除去するようにした。
無限に系統グラフが肥大化することはなくなっている。
=====
完全にシームレス・・というか統合はできずに、一部個別対応になっている部分はありますが、
それでも同じ基盤の上で実行しているというのは中々凄いことだとは思います。

次回は障害復旧の話で、次回がこの論文読みのラストになります。