夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Spark Streaming=大規模準リアルタイムストリーム処理?

こんにちは。

Sparkについて調べてみよう企画第2段(?)です。
1回目はまずSparkとは何かの概要資料を確認してみました。
その先はRDDの構造を説明している論文と、後Spark Streamingというストリーム処理基盤の資料がありました。

とりあえず、そんなわけで(?)お手軽に概要がわかりそうなSpark Streamingの方を調べてみました。

まず見てみた資料は「Overview of Spark Streaming」(http://spark.incubator.apache.org/talks/strata_spark_streaming.pdf)です。
というわけで、読んだ結果をまとめてみます。

Spark Streamingとは何か?

・100オーダーのノードにスケールする
・秒単位のレイテンシで処理を実行可能
・Sparkのバッチやインタラクティブプロセッシングと統合可能(バッチ=ノーマルなSpark?)
・シンプルなバッチのようなAPIで複雑なアルゴリズムの実装が可能
Apache Kafka、Hadoop Flume、ZeroMQ等のライブストリームからデータを取得して処理可能

開発した動機
  • 多くの重要なアプリケーションが大規模ストリーム処理とリアルタイムに近い低遅延の結果出力を必要としているため

ソーシャルネットワークのトレンド
・ウェブサイト統計解析
・不正侵入検知・・・等
これらのアプリケーションは大規模クラスタでワークロードを分散することと、リアルタイムに近い遅延での処理が求められる。

これらの要件を満たすフレームワークに求められるもの

このような複雑なストリーム処理アプリケーションに求められるものは何か?
そもそもこのようなフレームワークに対して何が求められるのか?

フレームワークの要件
  • 大規模クラスタに対応するスケーラビリティ
  • 秒単位の遅延
  • シンプルなプログラミングモデル
  • これまでのバッチ/インタラクティブな処理との統合(下記のケーススタディよりこれまでのSparkと互換性が無くなれば扱いにくいため)
  • 有効な耐障害性(Stormとの比較より、状態を保持したストリーム処理を可能とするため)
ケーススタディ:Conviva, Inc.

・HBO, ESPN, ABC, SyFy等

  • 2つの異なる処理スタックが存在

・カスタマイズ構築された分散ストリーム処理システム
 −1000以上のメトリクスを百万単位のビデオセッションに対して適用
 −数十ものノード上での処理
Hadoopバックエンドによるオフライン解析
 −日ごと、月ごとのレポート生成
 −ストリーミングシステムと似た処理を実施

ケーススタディXYZ, Inc. (いろんな会社で、という意味)

・ライブストリーム処理を実行したいという会社は問題を抱えている
・「新たな機能を実装する度にバッチとストリーム処理両方の労力が必要」
・「当然、倍のバグに対処する必要がある」
・「倍の悩みが生じる」

対処:ステートフルストリーム処理
  • 伝統的なストリーム処理システムは1時間ポイントあたり1レコードのイベント駆動処理モデル

・各ノードが更新可能な状態値を保持
・各々のレコードに対して状態を更新し、新たなレコードを送信
だが、この方式だとノードがダウンした際に状態が消えてしまう。
耐障害性確保のためには「ステートフルなストリーム処理」を実現する必要がある。

既存のストリーム処理システム
  • Storm

・レコードがノードで処理されなかった場合、再送
・各レコードを「少なくとも1回」処理
・ノード障害/タイムアウトの発生状況次第ではレコードは2回処理されるケースもある
・ノード障害で「ノードの保持する状態値」は消える

  • Storm-TridentにおいてTransactionalTopologyを用いた場合

・各レコードは1回のみ処理される
・Transaction化することで更新速度は落ちる
※(コメント)StormはTrident化しても状態を外部のソースに保持した上で秒単位のレイテンシで処理可能ではあります。なので現状の情報からするとレイテンシ上は変わらない。最後でまとめます。

これらの要件を満たすのがSpark Streaming。性質は以下。
Spark Streaming:離散ストリーム処理

非常に小さな、確定的なバッチの系列として処理することでストリームコンピューティングを実現
・ライブストリームをX秒ごとの細切れにする
・Sparkにおいてはこれらの細切れになったバッチをRDDsとして扱い、RDDに対するオペレーションで処理出来る
・最終的にこれらのRDDに対する処理結果はバッチの中で扱うことができる
・バッチのサイズを0.5秒より小さくおさえることができれば、遅延は1秒に収まる
・小規模バッチを大量に扱うことでストリーム処理を実現できれば、バッチ処理とストリーム処理を同一基盤上で扱える

アプリケーション例:Twitterからのハッシュタグ取得(フェーズ1)

Twitter Streaming APIから取得されるツイートを細切れにし、メモリ上にRDDとして保持(イミュータブル、分散化)

アプリケーション例:Twitterからのハッシュタグ取得(フェーズ2)

ツイートからハッシュタグを抽出し、RDDとして保持。各バッチに対してRDDは作成される。

アプリケーション例:Twitterからのハッシュタグ取得(フェーズ3)

抽出したハッシュタグをバッチごとにHDFSに保存。

Twitterからのハッシュタグ取得をJava実装で実現した場合の例
val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")
JavaDStream<Status> tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
JavaDstream<String> hashTags = tweets.flatMap(new Function<...> {  })
// 変換処理をFunction objectとして定義している。
hashTags.saveAsHadoopFiles("hdfs://...")
耐障害性

RDDは耐障害性を持つ元データからの処理のシーケンスを記録している
・インプットデータのバッチは複数のワーカーノードにレプリケーションされ、耐障害性を持つ
・ワーカー障害によってデータをロストした場合、再度元データから復旧される

Spark Streamingのキーとなる構成要素
  • DStream:ストリームデータを示すRDDsのシーケンス(Streaming用のRDD?)

TwitterHDFS、Kafka、Flume、ZeroMQ、Akka Actor、TCP sockets etc...

  • 変換処理:DStreamを別のDStreamに変換する処理

・標準的なRDD処理:map、countByValue、reduce、join etc...
・ステートフルな処理:window、countByValueAndWindow etc...

  • 出力処理:外部媒体へのデータ出力

・saveAsHadoopFiles:HDFSへの出力
・foreatch:個々のバッチ処理の結果毎に処理を行う

アプリケーション例2:ハッシュタグのカウント

ハッシュタグ一覧の数をカウント

アプリケーション例3:ラスト10分のハッシュタグカウント

window関数を用いることで、あるウィンドウ範囲の結果を集計することが可能

よりスマートなハッシュタグカウント

countByValueAndWindow関数を用いることで、ウィンドウ範囲の全てのバッチの結果を加算するのではなく、
ウィンドウ範囲から外れたバッチの処理結果を減算/ウィンドウ範囲に加わったバッチの処理結果を加算することで効率的に算出可能。

よりスマートなreduce処理

インクリメンタルなカウント処理は多数のreduce処理に一般化することができる
・"転置reduce"という演算が必要になる
尚、ハッシュタグカウントの処理は以下のようにreduceで表現することが可能。

hashTags.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), …)
その他興味深いオペレーション
  • 任意の状態を保持し、照合する結果を適用させ続けることが可能

・ユーザ別の状態を保持し、ユーザごとのツイートで状態を更新させる

tweets.updateStateByKey(tweet => updateMood(tweet))
  • DStreamの中で任意のRDD処理を実行させることが可能

・入力ツイートをスパムファイルとJoinし、フィルタリングをかける

 tweets.transform(tweetsRDD => {
		tweetsRDD.join(spamHDFSFile).filter(...)
})
Spark Streamingのパフォーマンス
  • 6GB/secのデータを100ノード上で1秒未満のレイテンシで処理可能

・100streamのデータを4コアのAWS EC2インスタンス100ノードで処理した場合

Storm/Apache S4との比較

Storm以上のスループットを実現できた
・Spark Streaming: 670k records/second/node
・Storm: 115k records/second/node
Apache S4: 7.5k records/second/node

※(コメント)実はこの結果は微妙で、Storm-Tridentではなく単なるStormとの比較になっているように見えます。
 Storm-Tridentはレイテンシは増えるもののスループットは増大するため、その辺どちらかを明記せずに比較してもあまり意味はない・・・

実アプリケーション事例:Conviva

・1〜2秒のレイテンシを実現
・数百万のビデオセッションを処理
クラスタサイズに応じてリニアにスケール

実アプリケーション事例:Mobile Millennium Project
  • GPS計測結果を基にオンライン機械学習を用いて局所的な移動時間を算出

GPS観測結果を基にしたマルコフ連鎖モンテカルロ法シミュレーション
・CPU負荷が高く、有用な計算速度を出すためには数十のマシンが必要
クラスタサイズに応じてリニアにスケール

目指すものは1スタックで様々な処理を実現すること

・SparkShell / PySparkを用いて問題解析のための柔軟なデータ検索を実現
スタンドアローン実行時と同じプログラムでプロダクションログの解析を可能とする
・Spark Streamingでコードを記述することで、実際のライブログストリーム上から問題を検出する
※この辺いまいちうまく訳せていません。というか通りません。

まとめ

Spark Streamingとはストリーム処理基盤で、以下のことを実現する。
・大規模クラスタ上でスケールする
・秒単位のレイテンシを達成する
・シンプルなプログラミングモデルで実現
バッチ処理インタラクティブな処理を統合
・状態を保持する処理において有用な耐障害性を保持


・・・・と、こんな感じでした。
データストリームを小さい複数のバッチに区切ることでストリーム処理とバッチ処理を同じ基盤上で動作させるというコンセプトは面白かったです。

後は、Storm(Storm-Trident)使いとして書いておかなければならないことが。
Sparkをこれまで見て思うことは、おそらく耐障害性、レイテンシはStorm-TridentもSparkもあまり差はないように見えます。
スループットについては前提条件が微妙なため、コメントはできない状態ですね。

ただ、Storm-Tridentと比べてSpark Streamingの方がプログラミングのモデルとしてわかりやすいのは確かだと思います。

なので、あとはApache MesosHadoop HDFS といったものも絡んできて構成要素が多い
Sparkの実行基盤をどう簡単に構築管理できるかが比較時の悩みどころになりますかね。

ともあれ、これでSpark、Spark Streamingの概要はわかりました。
後は・・・SparkとSpark Streamingの論文を1本ずつ位流し読みしてから実際の構築に入ってみようと思います。
また読んだらまとめますね。