夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Spark Streamingの論文まとめ(1章〜5章

こんにちは。

Apache Spark Streamingに関する論文
「Discretized Streams: A Fault-Tolerant Model for Scalable Stream Processing」
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
について、概念と動作概要が書かれた1章〜5章を読んだので、1投稿にまとめます。

こちらも以前見たスライドの詳細化版なんですが、一部スライド見ているだけだとわからない情報もあり、参考になりました。


ストリーム処理を1秒の間に受信したイベント群に対するバッチ処理の連鎖として
バッチ処理の性質を保ったまま実行してしまうというのがSpark Streamingの肝でした。

こうすることで遅延は0.5〜2.0秒程発生してしまうため純粋なストリーム処理(大抵数百ミリ秒程の遅延)よりは遅くなるものの、
スループットはこれまでのストリーム処理基盤に比べて向上し、バッチ処理の障害復旧/遅延対応をそのまま使いまわせるという利点があります。

計算の性質としては非常にStorm-Tridentに近いのですが、
Storm-Tridentがストリーム処理プロダクトをベースにしている関係上
レスポンスタイムは高速ではあるもののスループットや処理効率では劣る形になり、
それを補うために他のプロダクトと組み合わせる必要が出てきます。

対して、Spark Streamingはバッチ処理プロダクトをベースにしているため、
レスポンスタイムでは劣るものの、スループット、障害復旧と優れた性質を誇ります。
ただし、リアルタイムトレーディングのような複合イベント処理はターゲットとしていないため、
複合イベント処理には向きませんし、組み合わせることもできません。

その場合は素直にストリーム処理プロダクトを組み合わせてね、ということなのだと思います。
そのあたりの割り切りは面白かったです。

ただ、3章の一貫性の所は内容がよくわからない状態のため、実際に動かしてみた段階で判断する必要がありそうです。


では、内容です。

概要

多数のビッグデータアプリケーションがデータを受信したタイミングでリアルタイムに処理することを求め始めている。
しかしながら、現状の分散ストリーム処理のプログラミングモデルは比較的低レベルであり、
状態の一貫性やシステム障害への対応をユーザが個別に気にしなければならない状況が発生している。
更に、障害回復を行う際には動作中の随時レプリケーションや長時間の復旧時間などコストがかかる状況になっている。

そのため、私たちは離散ストリーム(D-Streams)という新しいプログラミングモデルを提案する。
これは高レベルの機能API、強一貫性、効果的な障害復旧を提供するものである。
D-Streamsは新たな復旧メカニズムをサポートする。
これは伝統的なレプリケーションやストリームデータベースにおける上流バックアップ手法をより効率的にしたもので、
”ロストした状態の並行復旧”が可能。
加えて、以前のシステムとは異なり、システムの中の「ボトルネック(落後者?)」の影響も軽減可能。

私たちはD-StreamsをSparkというクラスタコンピューティング基盤を拡張して実現し、
ストリーム処理とバッチ処理インタラクティブなクエリ実行をシームレスに実行可能にした。
このシステムにおいて、100ノードのクラスタでは6000万レコード/秒のペースで数秒単位の遅延での処理を可能とする。

1. イントロダクション

ビッグデータはリアルタイムに到達しつつあり、受信したタイミングで処理し続けることの価値が高まっている。
例えば、SNSで話されたはやりのトピックを素早く検知する、ニュースサイトに訪れたユーザの動線を把握する、
プログラムログから数秒の間にエラーを検知する・・など。

これだけの大量データを扱うアプリケーションを処理するためにはクラスタ上で並列処理をする必要がある。
しかしながら、これまで開発されてきたクラスタ上で並列処理をするためのプロダクト・・・
MapReduceやDryadは分散することや耐障害性についてはうまくハンドリングできるが、ストリーム処理としては適さない。
これまで開発されてきたストリーム処理のプロダクト・・・Twitter Storm、Yahoo S4やストリームデータベースは
「その場で処理する」モデルを確立しており、状態を保持したオペレータが流れてくるイベントを処理し続け、出力する。

このモデルより、大規模クラスタ上でのいくつかの技術的挑戦が提示される。

  • 障害と遅いノードへの対応

その場で処理するシステムは耐障害性をレプリケーションで実現する。
その場合実際に処理するオペレータのコピーを保持し、あるノードで障害が発生した場合、
オペレータのコピーに対してメッセージを再送する形になる。

だが、このアプローチは大規模クラスタでは有効ではない。
レプリケーションを使うことによって常時2倍近いマシンを消費する形になってしまうし、
マスタとレプリケーションで両方障害が発生した場合は上流から処理をやり直す必要が出てきて、
全体のシステムが障害から復旧するまでの時間停止してしまう。

また、私たちは処理が遅いノードをどう扱うかについても議論中である。

  • 一貫性

いくつかの「その場で処理する」システムにおいては異なるノードが
非同期にデータを処理し続けているため、「グローバル状態」を確保するのが困難である。
例えば、ページビュー数を男性ユーザの値をあるノードが、女性ユーザの値を違うノードがカウントし続けていたとする。
その際、処理中のあるタイミングで男性ユーザと女性ユーザのアクセス割合を算出した場合、誤っている可能性が出てくる。

ストリーム処理を行うシステムは常時動作&イベント駆動となる関係上、
ユーザのオペレーションで開始するバッチ処理システムとはモデルが異なってくる。
そのため、同じ解析をする際にユーザはストリーム処理用、バッチ用と2つの解析タスクを作成する必要が出てくる。

加えて、ストリームデータと過去データの結合は困難となる。
例えば、ストリームイベント処理と過去データのjoinを考えた場合、
どうハンドリングすればいいかは困難であることがわかるはず。

これらの挑戦が重要であるからこそ、
既に一部の要素を解決しているクラスタコンピューティングシステムの確認を行った。

バッチ処理システム:
MapReduceやDryad。これらのシステムは各アプリケーションをグラフ中の細かい、決定論的なタスクに分解している。
こういう形で細かく分割することによって、部分的な再実行や遅いノードに対する投機的実行といった
効果的な障害復旧メカニズムを実現している。

一貫性の観点においては、これらのシステムは「少なくとも1回実行する」というセマンティクスの上で動作する。
そのため、障害が発生しても発生しなくても同一の出力が得られるようになっている。

結果、これらのシステムは高いスケーラビリティを確保し、
常時何千ものノードで処理を実行することが可能となっている。

この確認結果に基づいて、私たちはストリーミング処理システムが
より効果的にスケールする重要かつ決定的な設計ポイントを提案する。

それは、「ストリーミング処理を決定論的なバッチ処理を短い間隔で実行することの連続として扱う」ことである。
尚、私たちはその「短い間隔」を0.5秒未満、その上でエンドツーエンドの処理時間を1秒未満に抑えることを目標としている。

この方式は実世界の「ビッグデータ」アプリケーションの多くに適合し、追跡するイベントの時間精度も上がると考えている。
私たちはこのモデルを「離散ストリーム」、またはD-Streamsと呼ぶ。

残念ながら、現状のバッチ処理基盤で秒単位のレイテンシを達成することは難しい。
小規模なジョブを実行する場合でも、大抵分単位のレイテンシが発生してしまう。
こうなってしまう理由はジョブは結果をレプリケーションされたディスクストレージシステムに書き込むためで、
ディスクIOとネットワーク上のレプリケーションによって多くの時間が使用されている。

今回秒単位のレイテンシで処理を可能としたキーポイントはRDDsを活用することである。
RDDsは以前提案したメモリ上の共有状態のことで、ディスクに出力することなく耐障害性を実現している。
RDDはディスクにデータそのものを出力する代わりにオペレーションの系統グラフを保持することにより、
ロストした個所を再演算可能とすることで耐障害性を確保している。

RDDはD-Streamsに理想的に適合し、各コンピューティング処理を50〜200msのオーダーで実行可能となっている。
私たちはRDDsを用いて典型的なストリーム処理をどう実現するかを示す。
その中には状態をもった処理やインクリメンタルなスライディングウィンドウを含むものもあり、
それが数秒単位のレイテンシで実行し続けることができている。

尚、D-StreamsはRDDという更新が制限された状態を介するため、効果的な障害復旧も可能となっている。
既存のこれらのシステムにおいてはレプリケーション
上流データのバックアップによって障害復旧を行っていた。

だが、バッチ処理モデルにおけるD-Streamsは障害発生時は
障害が発ししたノードの保持していた情報のみを並列に再計算することで復旧が可能となっている。
これはレプリケーションやバックアップと比して非常にコストが少ない。

この並列な状態復旧は「レコードをその場で処理する」システムにおいては通常
複雑な状態を保持していたり、レプリケーションを行っているストリーム処理においては困難。
だが、バッチジョブの状態を復旧することはそう難しいことではない。
そのため、D-Streamはシンプルな復旧が可能となる。

また、「遅いノード」についても同様の方式でシンプルな投機的実行が可能。
「遅いノード」問題はクラスタコンピューティングにおいてはしばしば発生する事象のため、非常に有効。
既存のストリーム処理プロダクトはこの「遅い」問題に対応するのが困難である。

私たちはD-StreamsをSpark Streaming(Sparkの拡張コンポーネント)として実装した。
このシステムは6000万レコード/秒の処理を100ノード上で実行することで秒単位のレイテンシで処理が可能となっており、
障害や「遅いノード」の問題に対する復旧も1秒未満で可能となっている。

これは現状広く使用されているOSSのストリーム処理プロダクト(Storm)の5倍のスループットを誇る。
(障害復旧、イベントの抜け検知等は同等に行った上で)
尚、パフォーマンスの章においてはSpark Streamingを使用して
「ビデオ配信モニタリングシステム」「オンライン機械学習」を実施した結果を示している。

D-Streamsを用いるより重要な理由として、処理モデルやデータモデル(RDD)が
バッチ処理と同じものを用いてストリーム処理を実施することができるというものがある。
Spark Streamingは内部的にはSparkのバッチ処理インタラクティブクエリ処理とシームレスになっている。

これは実際に非常に有用な特徴で、この特徴を持つためにSpark Streamingはストリームに対して
アドホックなクエリを実行したりストリームを過去データと結合することが同一API上で可能となっている。

このようなバッチ処理とストリーム処理の境界を曖昧にして共通的に実行する方法についても記述する。

2. ゴールと背景

多くの重要なアプリケーションが大量データをリアルタイムに処理することで実現できる。
私たちの今回の施策のターゲットは数百ノードのマシン上で動作して、秒単位の遅延で実現したいアプリケーションとなる。
例としては以下のような感じ。

  • サイトアクセス統計

FacebookはPumaという分散統合システムを構築しており、ユーザに対して最適な広告を常時算出している。
遅延は10〜30秒程で毎秒100万のイベントを処理している。

  • スパム検知

TwitterのようなSNSは新たなスパムをリアルタイムに検出するために
統計機械学習アルゴリズムを常時実行している。

データセンターのオペレータはFlumeのようなログ収集プロダクトを使用して数百ノードの監視を常時行っている。

  • ネットワーク侵入検知

NIDSのようなシステムにおいて、毎秒数百万のイベントを解析して不正なアクティビティがないか監視を行っている。

これらのアプリケーションにおいては0.5〜2.0秒程の遅延で実現可能なD-Streamsは十分だと考えている。
D-Streamsを使用することで元のシステムよりより小さいタイムスケールで傾向をつかむことができ、
かつ障害復旧向けのレプリケーションも不要であるためマシンコストも削減可能。

尚、私たちは数百ミリ秒のレイテンシで対応が必要なリアルタイムトレーディングについては意図的に対象としていない。

秒単位のレイテンシに加えて、私たちが実現を目指すものは
耐障害性(障害や「遅いノード」問題から早期復旧)と効率性(基本的な処理のために必要となるハード以上に大きなリソースを極力消費しない)です。

耐障害性は私たちがターゲットとしている規模のシステムにおいて重要なファクターとなる。
加えて、障害復旧は高速(時間に厳しいストリーム処理では数秒での復旧が必要)である必要がある。
効率性も同じようにターゲットとしている規模のシステムにおいては重要なファクター。
例えば、常時処理をレプリケーションする必要があるシステムを数百ノード規模のクラスタで実行した場合、
どれだけのリソースが追加で必要になるか考えて欲しい。

2.1 既存のストリーム処理システム

分散ストリーム処理を実現するプロダクトはいくつかあるが、
ほとんどの既存システムは同じ「record-at-a-time」処理モデルを採用している。
このモデルにおいては、ストリーム処理は常時実行される状態を持つオペレーションに分割されて表現され、
そのオペレーションはメッセージを受信する度に処理を行い、内部情報を更新し、出力を送信している。
(例:全ウィンドウのページビューテーブル等)
下記の図(a)はその概要を示したものである。

この「record-at-a-time」処理モデルはレイテンシを最小化する。
だが、状態を持ったオペレーションをネットワークを介して非決定的論的に結合しているため、
効率的な耐障害性を提供するのは困難となる。

私たちは今回のアプローチを記述する前にまずはこの困難さについて明確化する。

2.2 障害や「遅いノード」問題に対する耐性を確保するための取組

「record-at-a-time」システムにおいては復旧方式はロストした状態の再構築や「遅いノード」の排除となる。
既存のシステムにおいてはこれらは2つのアプローチ
レプリケーション」か「上流データのバックアップ」で行われてきた。
このアプローチには障害復旧の際の所要時間とコストでトレードオフがある。

レプリケーション方式(データベースで取られるアプローチ)においては、
2つの演算描画グラフをレプリケートし、入力イベントは両方に送信する。

しかしながら、単にレプリケーションを行うだけでは不足で、2つのレプリケーション間で
同期をとって処理を行う必要がある(FluxやBorealisのDPC等)上、レプリケートされたタスクは
お互いに同じ順序で上流からイベントを受信し続ける必要がある。

例えば、オペレータが2つの親オペレータの結果を結合して出力するものだとする。
(順序はどちらか片方から受信した順番)
その場合、親オペレータは同じ順序で結果を出力する必要が出てくるため、
高速での復旧は可能だが、非常にコストが高くなる。

上流データのバックアップアプローチにおいては、
各ノードはあるタイミングから送信したメッセージをコピーして保持しておく必要がある。
ノード障害が発生した場合、待機系のマシンが役割を引き継ぎ、親ノードはメッセージを再送する必要がある。
このアプローチはノード障害発生時に情報を再計算・ストリーム間の送信も
再送する必要があるため、障害発生時に復旧時間が長くなる。

現状最も広まっているStormのようなメッセージキューイングシステムも同じアプローチをとっている。
このアプローチをとった場合「少なくとも1回処理する」という障害対応しか提供できない上、
ユーザ側でその対応について記述する必要があるという問題が生じる。
(Storm-Tridentの場合は毎回の結果は自動的にDB等のデータストアにバックアップすることで
 プログラミングモデルを単純化している。だが、その状態更新をトランザクション的に実行するために同期コストが高くなる。)

ここで重要なことは、「レプリケーション」「上流データのバックアップ」の両方のアプローチとも、
「遅いノード」問題については対処を行っていないことである。

もし「レプリケーション」のアプローチにおいてノードの処理が遅くなった場合、
メッセージを同じ順で処理し、同期が必要になる関係上システム全体が遅くなる。
同種の問題は「上流データのバックアップ」アプローチにおいても発生する。

現状、「遅いノード」を検知する手段は障害として扱う他なく、
それは遅くなったのが一過性かもしれない状況下において影響が大きくコストも大きくなる。

そのため、伝統的なストリーム処理プロダクトは小さめのクラスタか、
リソース的に余裕があるクラスタで有効になるケースが多くなる。
大規模なクラスタに適用した場合、障害や「遅いノード」という問題に苦しめられることになる。

3. Discretized Streams (D-Streams)

私たちのモデルは伝統的なストリーム処理が抱えている問題を
メッセージの内部状態に関わらずオペレーションを完全に決定論的にすることで解消する。
伝統的なストリーム処理での常駐する状態を保持し、ストリーム間のメッセージ順に依存するオペレータの代わりに、
D-Steamsはステートレスで決定論的なタスクの短い塊を実行することで代替する。

このタスク間で状態を保持する耐障害性を持ったデータ構造(RDDs)は決定論的に再演算を行うことができる。
RDDsを用いることで並列復旧障害など有効な障害復旧手法を実施可能になる。
耐障害性だけでなく、このモデルは明確な一貫性確保、シンプルなAPIバッチ処理との統合など重要な利点をもたらす。

言い表すと、私たちのこのストリーム処理を扱うモデルは「離散時間間隔で決定論的なバッチ計算のシリーズ」と言える。
一定時間間隔で受信したデータはクラスタをまたいだ耐障害性を確保した領域に保持される。
一定間隔が終了すると、そのデータセットは決定論的な並行オペレーション・・・map、reduce、groupByといった
新たなデータセットを生成するか、他プログラムへのアウトプットを行うオペレーションによって処理される。

私たちはそのデータセットの結果をRDDsという領域に保持する。
RDDsはこれまでも説明したように、データの「系統情報」を保持することでレプリケーションを行わずに
耐障害性を確保可能な高速アクセス可能なデータセットとなる。
下記の図の(b)のように。

ユーザはこれらのデータセットをD-Streamsという形でプログラム中で定義し、操作を行う。
D-Streamsはイミュータブルで分割されたデータセットの配列として表現され、決定論的なオペレータによって処理される。
これらのオペレータは新たなD-Streamsを生成し、RDD形式の中間データ(内部状態)も生成するケースがある。

私たちはSpark StreamingプログラムをURL毎にページビューイベントをカウントするプログラムを用いて説明する。
Spark StreamingはD-StreamsをScala言語におけるDryadLINQやFlumeJavaのような関数型APIを用いて生成する。
尚、これはSparkを拡張して実現している。

コードは以下のようになる。

pageViews  =  readStream("http://...",  "1s")
ones  =  pageViews.map(event  =>  (event.url,  1))
counts  =  ones.runningReduce((a,  b)  =>  a  +  b)

上記のコードはまずpageViewsというD-Streamsをhttp越しにStreamを受信することで生成する。
pageViewsは1秒ごとに取得が行われる。

その後、イベントをイベントのURLと1をペアとしたonesというD-Streamsを生成する。

onesに対しては個数を加算してマージするrunningReduceオペレータを用いてcountsの生成を行う。
上記コード中のmap/runningReduceに対して与えている引数はScalaの関数クロージャ

このプログラムを実行することで、システムはmapタスクを毎秒受信したイベントに対して実行する。
その後reduceタスクが起動し、mapの結果とRDDに保持されている前回のreduceの結果のマージを行う。
これらのタスクは実行の結果新たなRDDを生成し、そこに更新後のカウント情報を保持する。

もし障害や「遅いノード」が発生した場合、D-StreamsとRDDは自分自身が保持する系統グラフを基に再計算を行う。
#現状障害や「遅いノード」をどうやって検知するかはないですが、そこが重要なポイントにはなりそうです。
このSparkシステムは各データセットがどんなグルーピングを用いてパーティション分割されているかも下記の図のように保持している。

ノード障害が発生した場合、ロストしたRDDパーティションをタスクを再実行することでクラスタ内に存在するデータから復旧する。
#いまいちこの辺がわからないところで、ストリームとして取得したデータは二度と取得ができないため、
#定期的なチェックポイント+系統グラフを保持しているだけでは復旧できないはずなんですが・・・
RDD自体をどこかに毎回バックアップする=「上流バックアップ方式」と同じ方式を取らざるを得ないはず・・・

Sparkシステムは一定間隔(RDDが5回生成される度・・など)ごとにRDDをチェックポイントとして保存し、無限な再計算を防止している。
だが、これらは全データに対して行う必要はないため、復旧は高速、かつ並列に実行され、ノード間をまたぐこともない。

同じような方法でもしノードの処理が遅くなった場合、投機的にタスクのコピーを生成して実行することができる。
コピーした結果、決定論的に元のタスクと同じ結果を生成する。
このようにすることで、D-Streamsは全体トポロジを複製することなく障害や「遅いノード」から復旧することができる。

この章の残りで処理保証についてとD-Streamのプログラミングインタフェースをより詳細に記述する。
実際の実装方式は4章で記述する。

3.1 タイミング考慮

注意点として、D-Streamsはイベントがシステムに到着したタイミングでデータセットに記録している。
これはシステムがいつでも新しいバッチを開始するために必要で、
かつ、同じ場所で生成されるレコードは同じストリームとして扱うためにも必要となる。
(例:同じデータセンターで動作するサービスは同じデータセンターからのデータとして扱う)
これは意味論としては特に問題はない。

他のアプリケーションにおいては開発者は「外部タイムスタンプ(イベント発生元の時刻)」に
基づいてレコードをグルーピングしたいケースがあるかもしれない。
その場合はネットワークの遅延などによって実際に到着する時刻とタイムスタンプは前後する可能性がある。
D-Streamsはそのようなケースにおいては以下の2つのハンドリング方法が可能になっている。

1.
システムは各プロセスにおけるバッチが開始するまでに「余剰時間」を確保して待つことができる。
待つことにより、レコード送信に遅れが発生した場合であっても正しいバッチに含めることが可能。
但し、この場合はバッチ起動までに必ず一定の空き時間が生じてしまう。

2.
ユーザプログラム側で遅延レコードの補正をアプリケーションレベルで行う。
例えば、「t」「t+1」の間にクリックされた回数をカウントするアプリケーションがあったと仮定する。
D-Streamsのインターバル時間を1秒とおいた場合、Spark Streamingは「t」「t+1」の間に受信したレコードのカウントを行うため、
「t+1」のタイミングでクリックされた情報は落ちる。
そのため、次の間隔において外部タイムスタンプが「t」「t+1」の間に収まっているイベントの計算を行い、結果の更新を行う必要がある。

例えば、「t」「t+1」の間のカウント処理を新たにt+5のタイミングで実施することを考える。
その場合、「t」〜「t+5」のタイミングで受信したレコードの中から総計を算出する必要がある。
この処理はインクリメンタルなreduceを実行することでより効率的に実行可能。
前回の「t」「t+1」のカウント結果に新たに受信したイベント分のみを加算すればいいことになる。
このアプローチは “order-independent processing”と似ている。

このタイミング問題はストリーム処理全般が内在している問題で、外部システムから取得する際の遅延を常に考慮する必要がある。
これらの問題についてはストリームデータベース等で詳しく触れられている。
これらの技術の多くはD-Streamsでも実装されており、ユーザが逐一この問題について考える必要はない。
ただ、この論文においてはこれ以上の詳細については触れない。

3.2 D-Streamsオペレータ

Spark StreamingでD-Streamsを使用するにあたり、ユーザは「ドライバプログラム」を記述し、
1つまたはそれ以上のstreamsに対して関数型APIを通して処理を記述する必要がある。
このプログラムにおいて1つまたはそれ以上の外部ストリームからの入力を定義することが可能。
定義可能な例として、ポート上での待ち受けや、HDFSに対する定期的なロード等がある。
D-Streamsに対して実行可能なオペレータは以下の種類に分類分けすることができる。

  • Transformationオペレータ

このオペレータは1つまたはそれ以上の親D-Streamsから新しいD-Streamsを生成する。
これらはステートレス変換処理のケースにおいては親RDDにのみ依存する処理を定義可能。
ステートフル変換処理のケースにおいては更に過去のデータを用いることが可能。

  • Outputオペレータ

このオペレータは外部システムに出力を行う。
例えば、saveオペレーションでD-Streamsに含まれる各RDDをデータベースに出力することが可能。

D-Streamsはステートレス変換処理については典型的なバッチ処理フレームワークと同様の演算を扱うことができる。
(map、reduce、groupby、join)
尚、Spark StreamingにおいてはSparkの変換処理を再利用している。
例えば、プログラムが標準的なMapReduceであるワードカウントをD-Streamsの各インターバルごとに実施する場合、
以下のようにコードを記述することができる。

words  =  sentences.flatMap(s  =>  s.split("  "))
pairs  =  words.map(w  =>  (w,  1))
counts  =  pairs.reduceByKey((a,  b)  =>  a  +  b)

加えて、D-Streamsは複数インターバルをまたいだ処理のためいくつかのステートフルな変換処理の提供も行っている。
このステートフルな変換処理は典型的なストリーム処理の技術であるスライディングウィンドウを基に実装されている。
ステートフルな変換処理は以下の変換処理を持つ。

  • Windowing

windowオペレータはRDD中の過去のスライディングウィンドウを基に全レコードをグループ化する。
例えば、「sentences.window("5s")」というコードを実行するとD-Streams中のRDDから「0〜5」「1〜6」という形で取得するようになる。

  • Incremental aggregation

統合処理の共通的なユースケースにおいてはcountやmaxをスライディングウィンドウに対して実行する形になる。
D-Streamsはいくつかの拡張系を持つオペレータ「reduceByWindow」によってインクリメンタルな演算を可能としている。
シンプルな例として、marge処理を行って複数の値を結合する例を考える。
その場合、コードには以下のように記述する。下記のコードは過去5秒分の合計値を算出するオペレーションとなる。

pairs.reduceByWindow("5s",  (a,  b)  =>  a  +  b)

この処理はインターバル内のカウント処理は1回のみしか実行されないが、
5インターバル分の値を加算して結果を算出するという処理は下記の図(a)のように毎回実行される。

尚、もし統合処理を行うaggregate関数が可逆処理だった場合、
より効果的なsubtractingパターンを用いて上記図(b)のようにインクリメンタル処理を行うことができる。

pairs.reduceByWindow("5s",  (a,b)  =>  a+b,  (a,b)  =>  a-b)
  • State tracking

しばしば過去の状態を確認してレスポンス/状態に反映しなければならないアプリケーションが存在する。
例えば、ビデオデリバリーシステムで現在アクティブなセッションを追跡するプログラムを考える。
セッションが開始した際には「join」イベントを受信し、セッション終了時には「exit」イベントを受信するとする。
尚、この解析を発展させると「ビットレート閾値をオーバーしているセッションはいくつか?」等もカウントすることが可能。

D-Streamsはtrackオペレータを提供している。
これはstreamsを(Key, Event)というレコードから(Key, State)というストリームに変換するもので、
以下3つの引数を取る。

  1. 初期化関数。存在しないKeyに対するイベントを受信した際に実行される。
  2. 状態更新関数。Keyと現状のState、受信Eventを基にどのようなStateになるかを記述する。
  3. 古い状態を破棄するタイムアウト

例えば、アクティブセッションをカウントするための(ClientId, Event)を保持するstreamを変換する関数は下記のようになる。

sessions  =  events.track(
(key,  ev)  =>  1, //  initialize  function
(key,  st,  ev)  =>  ev==Exit  ?  null  :  1,//  update
"30s") //  timeout
counts  =  sessions.count()   //  a  stream  of  ints

上記のコードは各クライアントに対して"1"という状態をアクティブな場合保持し、
ログアウトする場合nullとして更新する処理として記述している。

これらのオペレータはバッチ処理を実行するSparkの段階で既に実装されていたもので、
RDDに適用するか、親Streamsに適用するかだけが異なってくる。
例えば、下図はtrack関数で構築されたRDDsが古い状態とのマージをどのように行っているかを示している。

Spark Streamingのプログラムは最終的にoutputオペレータを呼び出し、
Spark Streamingから外部のシステム(ダッシュボード等)に結果を送信する。

その際に私たちは2つのオペレータを提供している。

save:D-Streams中のRDDを外部システムに書き込む(例:HDFS、HBase)

foreachRDD:RDDに対してユーザ定義したコードを実行する。
例えば、ユーザは「counts.foreachRDD(rdd => print(rdd.getTop(K)))」
というコードを記述することでトップのKカウントを出力することができる。

3.3 一貫性

#この章については一応読んでみましたが、かなり訳が怪しいです。
#StormやS4が状態矛盾を気にしない云々も書かれていますが、
#その場合StormではRedis等のデータキャッシュを使ってハンドリングするわけで、いまいち問題としているポイントがわからない状態です。
#ページビューのデータがそもそもおかしくなるならSpark Streamingだろうが補正はできないわけですし。
ノード間の状態一貫性は各レコードを
プロセス間で分離して処理するストリーム処理では問題となる。
例えば、国ごとのページビューをカウントする分散システムを考える。
その場合、ページビューイベントは国ごとの統計を取るノードとは別ノードに送信される。

もし英国の統計を取るノードが途中でダウンし、フランスのノードの後ろに下がった場合、
その段階での各ノード間の状態スナップショットに差分が出る可能性が出てくる。
#おそらく、国ごとの経路があって、途中で通った経路についてもページビューとして含めるモデルを想定している?
英国のカウント値は古いストリームからの受信分も反映するものの、
フランスの後に下がった後の値は一般的にフランスより小さくなるだろう。

いくつかのシステム(Borealis)等においてはノード間の同期を行うことで対処を行っているが、
StormやS4のようなシステムではこの影響を無視する。

D-Streamsを用いた場合、セマンティクスはクリアになる。
各インターバルでの出力結果はそれまでに受信した結果を全て反映したものになるためである。
もしアウトプットが複数ノードに分散していたとしても、
決定論的なバッチジョブの結果出力したものなので、ずれが発生することはない。

こういった事情があるため、Spark Streamingの分散状態管理は容易になり、
障害発生時に「厳密に1回しか処理しない」ということも達成することができる。

3.4 バッチ処理インタラクティブな処理の統合

D-Streamsはバッチ処理と同じデータモデル(RDDs)を用いることでバッチ処理と同様の障害復旧メカニズムを用いることができ、
2つの処理をシームレスに結合することも可能となっている。
Spark Streamingは上記の2処理の統合のための有効な要素をいくつか提供している。

1つ目として、D-Steamsはバッチ処理で生成された静的なRDDsと結合することが可能。
例えば、受信したメッセージを以前計算されたスパムフィルタと結合したり、過去データと比較することが可能。

2つ目として、ユーザはD-Streamsのプログラムを過去データを利用したバッチモードで実行することが可能。
このモードを使用することで、過去データとの容易なマージが可能になっている。

3つ目として、ユーザはD-Streamsに対するインタラクティブクエリを
動作中のプログラムに対してScalaコンソールを接続することで実行可能。
SparkはSclaインタプリタ改造版を保持しており、
RDDsに対してアドホッククエリを秒単位の遅延で実行することが可能な作りになっている。
Streamingプログラムに対してインタプリタで接続することでそのタイミングのスナップショットに対して
アドホックなクエリを実行することが可能になる。

例として、ユーザは以下のクエリを実行することで、指定した時間帯の人気ワードを取得することが可能。

counts.slice("21:00",  "21:05").topK(10)

開発者がオフライン処理(Hadoop等)とオンライン処理の両方の開発経験者より、
この機能が大きな価値を持つという議論に至っている。
同じコードベース内でバッチ処理とストリーム処理を同時にかけるというのは
別々のシステム上で開発して実行することに比べて開発時間を節約可能。

ストリーム処理システムにおいてインタラクティブにクエリを実行できるということはそれ以上に有用なポイントを持っている。
それは、実行中の処理のデータに対する解析が容易になるということ。
初めに想定されていないクエリを実行することも可能であるため、ストリームに後から新規のクエリを追加することも可能。

このインタラクティブな実行が出来ない場合、メモリ上に既に存在する情報であるにもかかわらず、
最終的な結果を確認できるのは数十分後という形になってしまう。

3.5 結果まとめ

D-Streams概要章の最後に「record-at-a-time」システムとの比較結果を下記の表にまとめておく。
最も大きな違いはバッチ処理を分割することで決定論的な塊のまま処理するか、イベントを1件ずつ処理するかの違いとなるだろう。
バッチ化することでバッチ処理を走らせるインターバルという待ち時間は発生するものの、有効な障害復旧が可能となる。
実際、「record-at-a-time」システムにおいても同期や一貫性確保、遅れデータへの対応のために
Spark Streamingの秒単位の遅延と同じように、数百ミリ秒単位の遅延が発生してくる。

4. システムアーキテクチャ

私たちはD-StreamsをSpark Streamingという名称のシステム上に構築している。
Spark StreamingはSpark処理エンジンを拡張する形で作成している。
Spark Streamingは下図の3つのコンポーネントから構成される。

  • MasterはD-Streamsの系統グラフの追跡と生成されたRDDへのタスクのスケジューリングを行う
  • Workerはデータを受信し、入出力のRDDパーティションを保持し、タスクを実行する。
  • Client Libralyはシステムに対してデータを送信する

Spark Streamingと伝統的なストリーム処理システムの間の大きな違いはSpark Streamingは
タスクを小さく分割し、ステートレスにし、クラスタ上のどのノードであっても実行可能にするということ。
明確なトポロジとしてタスクを定義する伝統的なメッセージ処理システムと異なり、
別のマシンに処理を一部移動するということが度々発生する。
このアプローチは障害が発生したり「遅いタスク」に投機的にリソースを配分するなどの対応に対し柔軟に動作する。

このアプローチ自体は本来MapReduceのようなバッチ処理に向いている措置だが、
Spark Streamingにおいてもtaskが細分化されて、RDDがメモリ上に存在いているため50〜200msの遅延で対応可能になっている。

更に、Spark Streamingの状態は伝統的なシステムの常駐オペレータではなく、耐障害性を持つRDDに保持している。
RDDパーティションは任意のノード上に存在することができ、かつ複数ノード上で並列して処理を実行することが可能。
Spark Streamingにおいては、データ局所性を極力活用できるよう配置を行っている。
この配置によって決定論的な推測とデータの並列復旧が可能となる。

これらの利点はSpark本体から引き継いでいる。
だが、私たちは現状更にレイテンシを小さくできないか検討を継続している。

4.1 アプリケーション実行

Spark Streamingのアプリケーションは1つかそれ以上のインプットストリームを基に開始する。
Spark Streamingはクライアントからレコードを受信したタイミングか、
HDFSのような外部ストレージに定期的にデータをロードするタイミングでStreamsを取得し、動作する。
尚、このデータ取得部はflumeのようなログ収集ツールでも構わない。

前者のケース(クライアントからレコードを受信するパターン)においては取得する新規データを
データ取得元に応答を返す前に2つのWorkerノードでレプリケーションしている。
なぜなら、D-Streamsはインプットデータを再計算を可能とするために保存しておく必要があるためである。
Workerの障害が発生した時、クライアントライブラリは応答が返ってきていないデータについて再送する必要がある。

全データは各Worker毎の「block store」によって管理される。
「block store」はマスターを介して他のノードの上にある「block storre」を検索可能となっている。
なぜなら、「入力値」とRDDはイミュータブルである関係上「block store」を検索することは容易だからである。
各「block store」はユニークなIDを割り振られ、各ノードは保持している各対象パーティションのユニークIDを提供している。
「block store」は基本的にメモリ上に配置するが、スペースが足りなくなった場合はLRUアルゴリズムでディスクへの退避を行っている。

いつ処理の新規インターバルを開始するかについてはノード同士がNTPによって時刻同期されていることを前提に、
各ノードがマスタに対してブロックリストを送信し、処理の終了時に受信している。
マスタはそれ以上の同期処理は行わず、各インターバルごとに出力RDDを作成するタスクを実行するよう起動している。
タスクが起動する際は他のバッチ処理スケジューラと同じように前のタスクが終了していた場合起動するようになっている。

Spark Streamingが各時間インターバルごとにタスクを実行するには元々存在していたSparkのスケジューラを用いている。
また、DryadLINQシステムのような最適化も行っている。

  • 各タスクがmapの次にmapが来るなどまとめられる構成の場合、パイプラインオペレータとしてまとめている。
  • データ局所性を基にタスクを配分している
  • RDDパーティションがネットワーク越しにshuffleを行うことを避けている。

ネットワーク越しのshuffle実行防止について、例えばreduceByKeyオペレータについて考える。
各インターバルごとに新たに追加された結果を"add"し、もっとも古いインターバルの結果を減算する必要がある。
その際、スケジューラは各インターバル間でRDDパーティションを同じ方式で分割を行い、共通のキーが使い続けられるようにする。
結果、加算の際にネットワーク越しの通信は発生しなくなる。(詳細はSparkのパーティション対応スケジューリングを参照)

現状の私たちのコードベースにおいて、Workerの障害は復旧可能だが、マスタは耐障害性を保持していない。
私たちはD-Streamsの系統グラフのバックアップを保持することで
マスターに障害が発生した場合でも処理を継続することが可能だと考えているが、
ストリーム処理の復旧を優先したためそこは後回しにしている。

4.2 ストリーム処理の最適化

私たちはSparkエンジンに対してStream処理に対応させるための最適化を行っている。

  • Block placement

ストリームデータの入力時、ランダムでレプリカを作成するとホットスポットが発生してしまうため、
現状のノードの負荷に応じてレプリカを作成するノードを選択するようにしている。

  • Network communication

私たちはSparkのデータ通信部を改造時、reduce等の一部のタスクにおいて非同期IOを使用するようにした。
その結果高速化している。

  • Timestep pipelining

現状、各インターバルごとの実行はクラスタに完全に適用していない。(例:インターバルの最後にタスクがまだ残留して走っている場合がある)
そのため、次インターバルのタスクを前インターバルのタスクが残っている場合でも実行可能としている。
上記のためにSparkスケジューラを改造し、現状実行されているタスクが生成しているRDDをインプットとしてタスクを開始できるようにしている。

インターバルごとにバッチ処理が実行される関係上、系統グラフが無限に伸びる形になってしまう。
そのため、一定時間以上経過した系統グラフはチェックポイント設定時に除去するようにした。
無限に系統グラフが肥大化することはなくなっている。

5. Fault and Straggler Recovery

D-Streamsの決定論的な性質は伝統的なストリーム処理技術では使用するのが困難な
2つの強力な復旧技術(ロスト状態の並列復旧、「遅いノード」に応じた投機的実行)を実行可能にしている。

5.1 並列復旧

ノード障害が発生した際、D-StreamsはRDDとタスクを同じように別ノードでの並行実行による復旧を許容している。
システムは定期的にRDD状態のチェックポイントを取得し、別Workerノードに非同期でレプリケーションを行っている。

例えば、ページビューカウントプログラムを走らせており、
システムが1分ごとにチェックポイントを取得するよう設定されていたとする。
すると、ノード障害が発生した際、システムは障害によってロストしたRDDパーティションを検知し、
ロストしたパーティションを最後のチェックポイントから再算出するタスクを実行させる。

多くのタスクが別々のRDDパーティションに対して実行することができるため、
クラスタ全体が状態の復旧を並列して行うことが可能となる。
そのため、並列復旧はシステム全体のレプリケーションを取ることなく素早い状態復旧が可能となっている。

並列復旧は既存のストリーム処理に適用するのは難しい。
なぜなら、「record-at-a-time」システムにおいて適用するためには
基本的なレプリケーション確保であっても複雑かつコストの高い記録プロトコルが必要となるため。
このレプリケーション/復旧処理はMapReduceのような決定論的なバッチ処理においてはシンプルになり、実行可能。

並列復旧の効果を下記の図に示す。

この図はシングルノードの上流バックアップ方式(シンプルな解析モデル)と並行復旧のコストについて比較したものである。
1分前のチェックポイントから復旧させることを前提としている。

上流バックアップのグラフにおいて、1つのアイドルマシンを受信レコードを処理しなおすことで復旧させる処理に割り振っている。
復旧には元々システムの負荷が高かった場合長い時間がかかる。
なぜなら、新しいレコードは復旧処理中も常時送信され続けるからである。

仮に、障害発生前にシステムの負荷割合が「λ」だったとする。
1分の復旧処理を行う間に、λ分だけ新しいレコードを処理することに費やされる形になる。
こうして1分ごとにλ分だけ新規処理を継続する状態で復旧に必要となる時間をt{\small up}とすると、λとt{\small up}の関係はt{\small up} =\lambda+t{\small up}\lambdaとなる。
そのため、復旧にかかる時間はt{\small up}=\frac\lambda{1-\lambda}で記述できる。

他のグラフは他のマシン上で復旧処理を新規レコードを受信しながら並列実行した場合のグラフとなる。
障害発生前にクラスタ内にN個のマシンがいた場合、N-1個のマシンが復旧処理を実施できる。
そのため、各マシンは毎分\frac\lambda{N}の時間をかけて復旧処理を行う形になる。
その上で、新規レコードを処理するために毎分\frac N{N-1} \lambda分の処理を行う必要がある。
復旧に必要となる時間をt{\small par}とすると、λとt{\small par}の関係はt{\small par}=\frac\lambda{N}+t{\small par}\frac N{N-1} \lambdaとなる。
そのため、復旧にかかる時間はt{\small par}\simeq \frac\lambda{N(1-\lambda)}で記述できる。

上記の結果より、並行復旧はノード数が増えるほど復旧時間が短縮されることがわかる。

5.2 Straggler Mitigation

障害と並んで大規模クラスタにおいて重要な関心ごととなるのは「遅いノード」問題となる。
幸いなことに、D-Streamsは「遅いノード」の軽減をバッチ処理システムと同様に投機的処理を行うことで対応できる。

このような投機的処理は「record-at-a-time」システムで実行するのは難しい。
なぜなら、ノードの状態も含めたコピーが必要となるからである。だが、「遅いノード」のコピーを取得するのは難しい。
実際、ストリーム処理のレプリケーションはFlux等においては2つのレプリカを同期させることに焦点を当てており、投機的ではない。

現状私たちは「遅いノード」を検知するためにシンプルな閾値を使用している。
ジョブ中の中間タスクの終了タイミングが1.4倍以上の時間がかかった場合、遅いノードとしてマークする。
よりよりアルゴリズムがあるのかもしれないが、現状十分な対応が行われており、「遅いノード」も秒単位の遅延で復旧が可能となっている。