読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Spark Streamingの論文を読んでみます(3章

こんにちは。

以下論文を読んでみようの続きで、今回は第3章です。

「Discretized Streams: A Fault-Tolerant Model for Scalable Stream Processing」
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf

内容としては、「Discretized Streams (D-Streams)」ということで、
今回の肝である離散ストリームについて書かれた章となります。

3. Discretized Streams (D-Streams)

私たちのモデルは伝統的なストリーム処理が抱えている問題を
メッセージの内部状態に関わらずオペレーションを完全に決定論的にすることで解消する。
伝統的なストリーム処理での常駐する状態を保持し、ストリーム間のメッセージ順に依存するオペレータの代わりに、
D-Steamsはステートレスで決定論的なタスクの短い塊を実行することで代替する。

このタスク間で状態を保持する耐障害性を持ったデータ構造(RDDs)は決定論的に再演算を行うことができる。
RDDsを用いることで並列復旧障害など有効な障害復旧手法を実施可能になる。
耐障害性だけでなく、このモデルは明確な一貫性確保、シンプルなAPIバッチ処理との統合など重要な利点をもたらす。

言い表すと、私たちのこのストリーム処理を扱うモデルは「離散時間間隔で決定論的なバッチ計算のシリーズ」と言える。
一定時間間隔で受信したデータはクラスタをまたいだ耐障害性を確保した領域に保持される。
一定間隔が終了すると、そのデータセットは決定論的な並行オペレーション・・・map、reduce、groupByといった
新たなデータセットを生成するか、他プログラムへのアウトプットを行うオペレーションによって処理される。

私たちはそのデータセットの結果をRDDsという領域に保持する。
RDDsはこれまでも説明したように、データの「系統情報」を保持することでレプリケーションを行わずに
耐障害性を確保可能な高速アクセス可能なデータセットとなる。
下記の図の(b)のように。

ユーザはこれらのデータセットをD-Streamsという形でプログラム中で定義し、操作を行う。
D-Streamsはイミュータブルで分割されたデータセットの配列として表現され、決定論的なオペレータによって処理される。
これらのオペレータは新たなD-Streamsを生成し、RDD形式の中間データ(内部状態)も生成するケースがある。

私たちはSpark StreamingプログラムをURL毎にページビューイベントをカウントするプログラムを用いて説明する。
Spark StreamingはD-StreamsをScala言語におけるDryadLINQやFlumeJavaのような関数型APIを用いて生成する。
尚、これはSparkを拡張して実現している。

コードは以下のようになる。

pageViews  =  readStream("http://...",  "1s")
ones  =  pageViews.map(event  =>  (event.url,  1))
counts  =  ones.runningReduce((a,  b)  =>  a  +  b)

上記のコードはまずpageViewsというD-Streamsをhttp越しにStreamを受信することで生成する。
pageViewsは1秒ごとに取得が行われる。

その後、イベントをイベントのURLと1をペアとしたonesというD-Streamsを生成する。

onesに対しては個数を加算してマージするrunningReduceオペレータを用いてcountsの生成を行う。
上記コード中のmap/runningReduceに対して与えている引数はScalaの関数クロージャ

このプログラムを実行することで、システムはmapタスクを毎秒受信したイベントに対して実行する。
その後reduceタスクが起動し、mapの結果とRDDに保持されている前回のreduceの結果のマージを行う。
これらのタスクは実行の結果新たなRDDを生成し、そこに更新後のカウント情報を保持する。

もし障害や「遅いノード」が発生した場合、D-StreamsとRDDは自分自身が保持する系統グラフを基に再計算を行う。
#現状障害や「遅いノード」をどうやって検知するかはないですが、そこが重要なポイントにはなりそうです。
このSparkシステムは各データセットがどんなグルーピングを用いてパーティション分割されているかも下記の図のように保持している。

ノード障害が発生した場合、ロストしたRDDパーティションをタスクを再実行することでクラスタ内に存在するデータから復旧する。
#いまいちこの辺がわからないところで、ストリームとして取得したデータは二度と取得ができないため、
#定期的なチェックポイント+系統グラフを保持しているだけでは復旧できないはずなんですが・・・
RDD自体をどこかに毎回バックアップする=「上流バックアップ方式」と同じ方式を取らざるを得ないはず・・・

Sparkシステムは一定間隔(RDDが5回生成される度・・など)ごとにRDDをチェックポイントとして保存し、無限な再計算を防止している。
だが、これらは全データに対して行う必要はないため、復旧は高速、かつ並列に実行され、ノード間をまたぐこともない。

同じような方法でもしノードの処理が遅くなった場合、投機的にタスクのコピーを生成して実行することができる。
コピーした結果、決定論的に元のタスクと同じ結果を生成する。
このようにすることで、D-Streamsは全体トポロジを複製することなく障害や「遅いノード」から復旧することができる。

この章の残りで処理保証についてとD-Streamのプログラミングインタフェースをより詳細に記述する。
実際の実装方式は4章で記述する。

3.1 タイミング考慮

注意点として、D-Streamsはイベントがシステムに到着したタイミングでデータセットに記録している。
これはシステムがいつでも新しいバッチを開始するために必要で、
かつ、同じ場所で生成されるレコードは同じストリームとして扱うためにも必要となる。
(例:同じデータセンターで動作するサービスは同じデータセンターからのデータとして扱う)
これは意味論としては特に問題はない。

他のアプリケーションにおいては開発者は「外部タイムスタンプ(イベント発生元の時刻)」に
基づいてレコードをグルーピングしたいケースがあるかもしれない。
その場合はネットワークの遅延などによって実際に到着する時刻とタイムスタンプは前後する可能性がある。
D-Streamsはそのようなケースにおいては以下の2つのハンドリング方法が可能になっている。

1.
システムは各プロセスにおけるバッチが開始するまでに「余剰時間」を確保して待つことができる。
待つことにより、レコード送信に遅れが発生した場合であっても正しいバッチに含めることが可能。
但し、この場合はバッチ起動までに必ず一定の空き時間が生じてしまう。

2.
ユーザプログラム側で遅延レコードの補正をアプリケーションレベルで行う。
例えば、「t」「t+1」の間にクリックされた回数をカウントするアプリケーションがあったと仮定する。
D-Streamsのインターバル時間を1秒とおいた場合、Spark Streamingは「t」「t+1」の間に受信したレコードのカウントを行うため、
「t+1」のタイミングでクリックされた情報は落ちる。
そのため、次の間隔において外部タイムスタンプが「t」「t+1」の間に収まっているイベントの計算を行い、結果の更新を行う必要がある。

例えば、「t」「t+1」の間のカウント処理を新たにt+5のタイミングで実施することを考える。
その場合、「t」〜「t+5」のタイミングで受信したレコードの中から総計を算出する必要がある。
この処理はインクリメンタルなreduceを実行することでより効率的に実行可能。
前回の「t」「t+1」のカウント結果に新たに受信したイベント分のみを加算すればいいことになる。
このアプローチは “order-independent processing”と似ている。

このタイミング問題はストリーム処理全般が内在している問題で、外部システムから取得する際の遅延を常に考慮する必要がある。
これらの問題についてはストリームデータベース等で詳しく触れられている。
これらの技術の多くはD-Streamsでも実装されており、ユーザが逐一この問題について考える必要はない。
ただ、この論文においてはこれ以上の詳細については触れない。

3.2 D-Streamsオペレータ

Spark StreamingでD-Streamsを使用するにあたり、ユーザは「ドライバプログラム」を記述し、
1つまたはそれ以上のstreamsに対して関数型APIを通して処理を記述する必要がある。
このプログラムにおいて1つまたはそれ以上の外部ストリームからの入力を定義することが可能。
定義可能な例として、ポート上での待ち受けや、HDFSに対する定期的なロード等がある。
D-Streamsに対して実行可能なオペレータは以下の種類に分類分けすることができる。

  • Transformationオペレータ

このオペレータは1つまたはそれ以上の親D-Streamsから新しいD-Streamsを生成する。
これらはステートレス変換処理のケースにおいては親RDDにのみ依存する処理を定義可能。
ステートフル変換処理のケースにおいては更に過去のデータを用いることが可能。

  • Outputオペレータ

このオペレータは外部システムに出力を行う。
例えば、saveオペレーションでD-Streamsに含まれる各RDDをデータベースに出力することが可能。

D-Streamsはステートレス変換処理については典型的なバッチ処理フレームワークと同様の演算を扱うことができる。
(map、reduce、groupby、join)
尚、Spark StreamingにおいてはSparkの変換処理を再利用している。
例えば、プログラムが標準的なMapReduceであるワードカウントをD-Streamsの各インターバルごとに実施する場合、
以下のようにコードを記述することができる。

words  =  sentences.flatMap(s  =>  s.split("  "))
pairs  =  words.map(w  =>  (w,  1))
counts  =  pairs.reduceByKey((a,  b)  =>  a  +  b)

加えて、D-Streamsは複数インターバルをまたいだ処理のためいくつかのステートフルな変換処理の提供も行っている。
このステートフルな変換処理は典型的なストリーム処理の技術であるスライディングウィンドウを基に実装されている。
ステートフルな変換処理は以下の変換処理を持つ。

  • Windowing

windowオペレータはRDD中の過去のスライディングウィンドウを基に全レコードをグループ化する。
例えば、「sentences.window("5s")」というコードを実行するとD-Streams中のRDDから「0〜5」「1〜6」という形で取得するようになる。

  • Incremental aggregation

統合処理の共通的なユースケースにおいてはcountやmaxをスライディングウィンドウに対して実行する形になる。
D-Streamsはいくつかの拡張系を持つオペレータ「reduceByWindow」によってインクリメンタルな演算を可能としている。
シンプルな例として、marge処理を行って複数の値を結合する例を考える。
その場合、コードには以下のように記述する。下記のコードは過去5秒分の合計値を算出するオペレーションとなる。

pairs.reduceByWindow("5s",  (a,  b)  =>  a  +  b)

この処理はインターバル内のカウント処理は1回のみしか実行されないが、
5インターバル分の値を加算して結果を算出するという処理は下記の図(a)のように毎回実行される。

尚、もし統合処理を行うaggregate関数が可逆処理だった場合、
より効果的なsubtractingパターンを用いて上記図(b)のようにインクリメンタル処理を行うことができる。

pairs.reduceByWindow("5s",  (a,b)  =>  a+b,  (a,b)  =>  a-b)
  • State tracking

しばしば過去の状態を確認してレスポンス/状態に反映しなければならないアプリケーションが存在する。
例えば、ビデオデリバリーシステムで現在アクティブなセッションを追跡するプログラムを考える。
セッションが開始した際には「join」イベントを受信し、セッション終了時には「exit」イベントを受信するとする。
尚、この解析を発展させると「ビットレート閾値をオーバーしているセッションはいくつか?」等もカウントすることが可能。

D-Streamsはtrackオペレータを提供している。
これはstreamsを(Key, Event)というレコードから(Key, State)というストリームに変換するもので、
以下3つの引数を取る。

  1. 初期化関数。存在しないKeyに対するイベントを受信した際に実行される。
  2. 状態更新関数。Keyと現状のState、受信Eventを基にどのようなStateになるかを記述する。
  3. 古い状態を破棄するタイムアウト

例えば、アクティブセッションをカウントするための(ClientId, Event)を保持するstreamを変換する関数は下記のようになる。

sessions  =  events.track(
(key,  ev)  =>  1, //  initialize  function
(key,  st,  ev)  =>  ev==Exit  ?  null  :  1,//  update
"30s") //  timeout
counts  =  sessions.count()   //  a  stream  of  ints

上記のコードは各クライアントに対して"1"という状態をアクティブな場合保持し、
ログアウトする場合nullとして更新する処理として記述している。

これらのオペレータはバッチ処理を実行するSparkの段階で既に実装されていたもので、
RDDに適用するか、親Streamsに適用するかだけが異なってくる。
例えば、下図はtrack関数で構築されたRDDsが古い状態とのマージをどのように行っているかを示している。

Spark Streamingのプログラムは最終的にoutputオペレータを呼び出し、
Spark Streamingから外部のシステム(ダッシュボード等)に結果を送信する。

その際に私たちは2つのオペレータを提供している。

save:D-Streams中のRDDを外部システムに書き込む(例:HDFS、HBase)

foreachRDD:RDDに対してユーザ定義したコードを実行する。
例えば、ユーザは「counts.foreachRDD(rdd => print(rdd.getTop(K)))」
というコードを記述することでトップのKカウントを出力することができる。

3.3 一貫性

#この章については一応読んでみましたが、かなり訳が怪しいです。
#StormやS4が状態矛盾を気にしない云々も書かれていますが、
#その場合StormではRedis等のデータキャッシュを使ってハンドリングするわけで、いまいち問題としているポイントがわからない状態です。
#ページビューのデータがそもそもおかしくなるならSpark Streamingだろうが補正はできないわけですし。
ノード間の状態一貫性は各レコードを
プロセス間で分離して処理するストリーム処理では問題となる。
例えば、国ごとのページビューをカウントする分散システムを考える。
その場合、ページビューイベントは国ごとの統計を取るノードとは別ノードに送信される。

もし英国の統計を取るノードが途中でダウンし、フランスのノードの後ろに下がった場合、
その段階での各ノード間の状態スナップショットに差分が出る可能性が出てくる。
#おそらく、国ごとの経路があって、途中で通った経路についてもページビューとして含めるモデルを想定している?
英国のカウント値は古いストリームからの受信分も反映するものの、
フランスの後に下がった後の値は一般的にフランスより小さくなるだろう。

いくつかのシステム(Borealis)等においてはノード間の同期を行うことで対処を行っているが、
StormやS4のようなシステムではこの影響を無視する。

D-Streamsを用いた場合、セマンティクスはクリアになる。
各インターバルでの出力結果はそれまでに受信した結果を全て反映したものになるためである。
もしアウトプットが複数ノードに分散していたとしても、
決定論的なバッチジョブの結果出力したものなので、ずれが発生することはない。

こういった事情があるため、Spark Streamingの分散状態管理は容易になり、
障害発生時に「厳密に1回しか処理しない」ということも達成することができる。

3.4 バッチ処理インタラクティブな処理の統合

D-Streamsはバッチ処理と同じデータモデル(RDDs)を用いることでバッチ処理と同様の障害復旧メカニズムを用いることができ、
2つの処理をシームレスに結合することも可能となっている。
Spark Streamingは上記の2処理の統合のための有効な要素をいくつか提供している。

1つ目として、D-Steamsはバッチ処理で生成された静的なRDDsと結合することが可能。
例えば、受信したメッセージを以前計算されたスパムフィルタと結合したり、過去データと比較することが可能。

2つ目として、ユーザはD-Streamsのプログラムを過去データを利用したバッチモードで実行することが可能。
このモードを使用することで、過去データとの容易なマージが可能になっている。

3つ目として、ユーザはD-Streamsに対するインタラクティブクエリを
動作中のプログラムに対してScalaコンソールを接続することで実行可能。
SparkはSclaインタプリタ改造版を保持しており、
RDDsに対してアドホッククエリを秒単位の遅延で実行することが可能な作りになっている。
Streamingプログラムに対してインタプリタで接続することでそのタイミングのスナップショットに対して
アドホックなクエリを実行することが可能になる。

例として、ユーザは以下のクエリを実行することで、指定した時間帯の人気ワードを取得することが可能。

counts.slice("21:00",  "21:05").topK(10)

開発者がオフライン処理(Hadoop等)とオンライン処理の両方の開発経験者より、
この機能が大きな価値を持つという議論に至っている。
同じコードベース内でバッチ処理とストリーム処理を同時にかけるというのは
別々のシステム上で開発して実行することに比べて開発時間を節約可能。

ストリーム処理システムにおいてインタラクティブにクエリを実行できるということはそれ以上に有用なポイントを持っている。
それは、実行中の処理のデータに対する解析が容易になるということ。
初めに想定されていないクエリを実行することも可能であるため、ストリームに後から新規のクエリを追加することも可能。

このインタラクティブな実行が出来ない場合、メモリ上に既に存在する情報であるにもかかわらず、
最終的な結果を確認できるのは数十分後という形になってしまう。

3.5 結果まとめ

D-Streams概要章の最後に「record-at-a-time」システムとの比較結果を下記の表にまとめておく。
最も大きな違いはバッチ処理を分割することで決定論的な塊のまま処理するか、イベントを1件ずつ処理するかの違いとなるだろう。
バッチ化することでバッチ処理を走らせるインターバルという待ち時間は発生するものの、有効な障害復旧が可能となる。
実際、「record-at-a-time」システムにおいても同期や一貫性確保、遅れデータへの対応のために
Spark Streamingの秒単位の遅延と同じように、数百ミリ秒単位の遅延が発生してくる。

=====
一貫性の章についてはさっぱりでしたが、それ以外は大体わかった・・・とは思います。
D-Streams自体が一定のインターバルごとに時間を区切って実行する概念のため、
レスポンスタイムについては既存のストリーム処理には敵わないという形で明記しているのは興味深かったです。
言うなれば、Sparkの実行単位を細かく区切ることで、バッチ処理のレスポンスタイムを高めた存在がSpark Streamingなわけで、
なのでスループットが他のストリーム処理プロダクトをしのぐというのも納得がいく話ではありますね。
その代わり、レスポンスタイムは数秒レベルでは発生してしまうという。