夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Twitter Heronの論文でのStormの問題点は?(詳細

こんにちは。

前回あーいう形でサマリ記事を読んだので、次は実際に論文を読んでみるしかないだろう・・
ということで、Twitter Heronの論文を読んでみました。
今回は前半の、Stormの問題点を記述した個所と、設計検討結果についてです。

Twitter Heron

ただ、全文そのまま訳しているのではなく、読みながらそれなりに意訳や切り捨ては入っています。

Abstract

Stormは長い間Twitterにおけるリアルタイム解析のメイン基盤だった。
ただ、Twitterでのリアルタイム処理データ量が多様性の増加やユースケースの追加に伴って増加した結果、Stormの制約も多く明らかになった。
Twitterには共有インフラ上で動作する、よりスケールし、デバッグしやすく、性能が高く、管理しやすい基盤が必要になった。

これらの事情を受けてTwitter内でいくつかのオプションを基に検討した結果、新たなリアルタイムストリーム処理を構築する必要があると結論付けた。
本論文ではHeronと呼ばれるこの新システムの設計と実装について説明している。
Heronは現在Twitterでのストリーム処理のデファクトとなっている。

実際にHeronを開発し、運用してから結果を基に本論文を記述している。
また、併せてHeronの効率性とスケーラビリティを実証した証拠も併せて示している。

1. INTRODUCTION

Twitterでは他の組織と同じようにリアルタイムストリーム処理に多くを頼っている。
例えば、アクティブユーザのカウントや、ツイートと広告の関連性の算出など。
何年かの間はTwitterではStormをリアルタイムストリーム処理基盤として使用していた。

だが、Twitterでは共有インフラ上で他のデータサービスとリソースを共有する点や、スケーラビリティ、デバッグの容易性、管理のしやすさなど、より求められるものが増えてきた。

特に、作業の生産性で大きな課題となったのはデバッグの容易性だった。
Topologyが誤動作した場合-原因はコードの誤り、ハードウェア障害、負荷変動など様々あるのですが―、Stormでは複数コンポーネントの動作が1プロセス中に集約されているため、問題の解析が困難だった。

そのため、Twitterでは論理的な計算リソースが明確に物理プロセスに対してマッピングされる機構が求められた。
このような明確なマッピングはビジネス的に重要なTopologyにおいてはエラーがどこから発生しているか即特定できるのは非常に重要になる。

また、StormではTopologyを実行するために専用のハードの割り当てを必要とする。
このような構成はクラスタとしてのリソースを利用する際には効率が悪く、オンデマンドなスケーラビリティに対しても悪影響が大きい。
Twitterではメジャーなクラスタスケジューリングソフトを使用して、リアルタイムストリーム処理に限らない、様々なデータサービスに対して柔軟にリソースを割り当てることが必要だった。
Twitterの内部的にはメインのクラスタスケジューラとしてはAurora Schedulerを使用していた。

Stormでは新しいTopologyをプロビジョニングするには実質的に新しい隔離されたマシンリソースが必要となり、逆にそのTopologyが必要なくなった場合、割り当てられたマシンリソースは再度割り当てが行われるまでは利用されない状態となる。
このような事情から、マシンのプロビジョニングを管理することが面倒になる。
更に、TwitterのスケールにおいてはStormのスケーラビリティとリソース効率の悪さからリソースを使う側の生産性が悪化することも明確になっており、その意味でも現状を改善する必要が出てきた。

ただ、Twitterでは既に多くのアプリケーションがStormを用いて記述されており、それらのアプリケーションを更新する必要があるような改修は取ることが出来なかった。
また、ストリーム処理とバッチ処理を融合するSummingBirdのAPIとの互換性も必要だった。

様々なオプションを検討調査した後、Twitterではこれまで記述した設計目標を満たすために新たなストリーム処理システムを設計する必要があると結論付けた。
この結果開発された新しいシステムはHeronと呼ばれている。
HeronではStormとAPIの互換性があるため、StormのユーザはHeronに容易に移行することが出来る。
Twitterでは既にStormは使用しておらず、Heronがストリーム処理の基本となっている。
Heronを利用することでStormと比してパフォーマンスが大幅に向上し、よりリソース消費が低減された上に、デバッグの容易性、スケーラビリティ、管理性で大きな利点を享受している。

2. Related Work

ストリームデータ処理システムへの研究は10年ほど前に始まり、最近スケーラブルなストリーム処理へのニーズがより高まったため、実際の製品やOSSとして多くのものが出てきた。
最近ではストリーム処理は企業活動に汎用的に必要になるという流れになり、従来のデータベース製品と統合されたものも出てきている。

3. Motivation for Heron

StormはTwitterにおいて何年かリアルタイム解析の基盤を提供していました。
ただ、Twitterの規模でStormを運用すると後述する制約があることも明確になっている。
この制約を脱するためにHeronを開発した。

3.1 Storm Background

StormTopologyはSpout/Boltのグラフとして示せる。
Spoutはデータのインプットを、BoltはStreamに対する処理を示す。
SpoutはしばしばKesterlやKafkaのようなキューからデータを取得し、Boltに対するStreamを生成する。Boltはそれらを取得し定義された処理を実行する。

例として、リアルタイムアクティブユーザ(RTAC)をカウントするTopologyは下図のようになる。
f:id:kimutansk:20150712085359j:plain
Spout/BoltはTaskとして実行され、これらのTaskはExecutor上にグルーピングされる。Executorもまたグルーピングされ、1Workerプロセス中に複数存在する。
Workerプロセスは1JVMプロセスとして下図のように実行される。
f:id:kimutansk:20150712085453j:plain
1ホストは複数のWorkerプロセスを実行するが、各Workerプロセスは異なるTopologyに所属することもある。

3.2 Storm Worker Architecture: Limitations

これまで示されたように、StormのWorkerプロセスは複雑な構成となっている。
複数のプロセスがHost上で実行される。
JVMプロセス内部では各Executorは2スレッドにマッピングされる。
これらのスレッドはJVMの優先度ベースのスケジューリングアルゴリズムで実行される。
各スレッドがいくつかのTaskを実行する必要があるため、Executorは受信データに基づいてどのTaskを実行するかを判断するための別のスケジューリングアルゴリズムを導入している。
このような複数のスケジューリングとその相互作用は多くの場合、Task実行中の不確実性につながる。

加えて、各Workerプロセスが複数のTaskを実行するという複雑性もある。
例えば、KafkaSpout、Twitterの内部サービスとデータの結合を行うBolt、KVSにデータを出力するBoltのような異なるBoltが同一JVM上で実行される可能性がある。
このようなシナリオにおいてはこれらのリソース使用量を分離することは不可能となるので、特定のタスクの性能について推測することが困難とな
結果、Topology全体の動作が不安定になった際のまず取るべき手段はTopologyの再起動、ということとなる。
再起動後、上手く動作しないTaskが発生した場合、問題が発生していること自体はわかるが、根本原因を突き止めるのは他のTaskと同時に動作している関係上困難となる。

複数のタスクのログが1ファイルに出力されているため、特定のタスクに関連付けられたエラーまたは例外を特定するのは困難となる。
いくつかのタスクが他のタスクに対してより多くログを出力するようなケースにおいてはより状況は悪化する。
また、あるタスクでハンドリングされなかった例外はそれによってWorkerをダウンさせるため、同一Worker上で影響を受けていないタスクまで影響を与えてしまう。
このように、Topologyの一部でエラーが発生した際に全体に影響を与える作りとなっている。
また、異なるタスクが発生させたGC関連の問題を追うのもまた困難。

リソース割り当てにおいて、Stormは全てのWorkerプロセスが均質であることを前提にしている。
このアーキテクチャの仮定の結果、しばしばリソースの非効率やオーバープロビジョニングが発生する。例えば、3Spoutと1Boltを2Workerプロセスにスケジューリングすることを検討してほしい。
Boltが10GB、Spoutが5GBのメモリを使用する場合、最大のパターンでは1Boltと1Spoutを処理する構成となるため、1Workerあたり15GBのメモリを確保する。
だが、実際必要になるメモリは25GBのため、全体で5GBの無駄が発生する。
この問題は、Workerが多様な構成要素を含むにつれて状況は悪化する。特に、SummingBirdのような高レベルの抽象化を利用して複雑なTopologyを生成する時に問題は顕著に表れる。

Workerに大きなメモリを割り振った結果、jstackまたはヒープダンプなどの一般的なプロファイリングツールの利用が面倒になる。
Workerはヒープダンプを取得している間HeartBeatが停止するため、HeartBeatがタイムアウトし、ヒープダンプの取得が終わらないというケースが発生する。
上記のようなデバッグにおいて、Workerが大きいケースは困難となる。

ここで出てくる質問は、どうすればStormを1タスク1Workerに再構成することが出来るか、というものとなる。
Twitterではこのオプションを検討したが、このアプローチはリソース使用量に大きな非効率を発生させ、かつ求める並列度も制限されることが問題となった。
このアプローチを取るとTopology毎のWorkerプロセスの数が増大する。
ただ、Storm自体はWorkerプロセス間は均質という前提があるため、齟齬が発生する。
このモデルでは、各Workerのためのメモリ量として下記の値を確保する必要がある。
f:id:kimutansk:20150712122526j:plain

この値は最適で理想的な利用状況よりはるかに大きくなることがある。
3Spout+1Boltの先ほどの例を参照すると、各Workerの合計必要メモリ量は40GBとなり、最低限必要な25GBを大きく上回る。
加えて、Workerプロセスは各タスクの並列度に依って増加し、個々のWorkerが通信のためにポートを必要とする上にWorker間が互いに接続する必要があるため、それによってスケーラビリティに課題が発生する。

StormのWorkerはタスクとWorker本体のデータのやり取りを行う複数のスレッドとキューを保持している。
各Workerプロセスのグローバル受信スレッドは「上流」のWorkerからデータを受信し、グローバル送信スレッドは「下流」のWorkerへのデータ送信を担当している。
これらのグローバルスレッドに加えて各ExecutorはTopologyに定義されたSpout/Boltを実行するユーザスレッド、および各グローバル送信スレッドにユーザスレッドの出力を渡すローカル送信スレッドで構成されている。
従って、Stormでは各TupleはWorker内に入ってから送信されるまで4つのスレッドを通過する必要がある。(グローバル受信/グローバル送信/ユーザスレッド/ローカル通信)
この設計は大きなオーバーヘッドとキュー競合の問題につながる。

3.3 Issues with the Storm Nimbus

Nimbusはスケジューリング、監視を含むいくつかの機能と、Jarの配布を行っている。
また、併せてシステムのメトリクスコンポーネントも提供しており、いくつかのTopologyのカウンタを管理している。
従って、Nimnusは機能的に多くのことを担当しており、ボトルネックとなりやすい構成になっている。

第一に、NimbusScheculerはWorkerのための細かいレベルでのリソース予約およびリソース隔離をサポートしていない。
そのため、同じマシン上で実行している別のTopologyに属するWorkerプロセス同士が互いに干渉することがある。
このような状況は追跡不可能なパフォーマンスの問題を引き起こすことがある。
この問題を軽減するために、我々は1マシン上では1Topologyのみを実行するという形で隔離し、対処を行った。
だが、Topologyは割り当てられたハードウェアリソースを使いきることは困難である上に、このアプローチはリソースの浪費につながる。
YARNでStormを実行した場合であってもこの問題は解決しきれない。

第二に、StormはHeartBeatでZookeeperを使用している。
1Topologyに多くのWorkerが所属している状態だとZookeeperがボトルネックとなり、TopologyあたりのWorker数、およびクラスタ内の合計Topology数のスケーラビリティに課題が生じる。
この問題に対処するためにTwitterでは暫定的にHeartBeatのDaemonを開発し、そちらにHeartBeatのトラフィックをバイパスするようにしていた。
しかし、その暫定対処はHostとHeartBeatDaemonを個別にモニタリングする必要があり、運用の負担を増大させた。

最後に、Nimbusが単一障害点となっている。
Nimbusがダウンすると新たなTopologyのデプロイや既存Topologyの終了が出来ず、障害が発生したTopologyの検出、復旧も出来なくなる問題があった。

3.4 Lack of Backpressure

StormはBackPressureの機構を保持していない。
受信側のタスクが受信データ/Tupleを処理できない場合、送信側は単にTupleを削除することになってしまう。
これはFail-Fast機構とシンプルな対応ではあるが、下記のような欠点がある。

  1. Ack無効時にSpoutから取得するTupleの数を制御できずに溢れるケースが発生する。
  2. 上流コンポーネントの処理結果が失われる。
  3. システムの動作が予測しにくくなる。

3.5 Efficiency

運用環境においてはTupleの失敗、Tupleの再実行、および実行の遅れにつながるようなTopologyの実行時に性能的に予測不可能になる要素が存在する。
(データの取得速度がTopologyの処理速度を上回る場合)
このような状況において、下記のような課題が存在する。

Tupleの再実行

Tupleツリーの末端で障害が発生した場合であってもTupleツリー全てが失敗として扱われてしまう。これはTask数が多かったり、複数のTupleに分割して処理するようなTopologyの場合大きな課題となる。

GC時間の増大

データ取得速度がTopologyの処理速度を上回る場合、Tupleが大量にプロセス内に蓄積されるため、大容量メモリを確保したWorkerプロセスでGCが発生した場合、分単位の時間がかかることもある。結果、HeartBeatが失われてプロセスがkillされる。

キューの競合

通信用のキューにおいて競合がしばしば発生する。特に、Workerプロセス中で複数のExecutorが動作している場合はより顕著に表れる。

これまで述べたパフォーマンスが予測しにくく、問題が発生することに対して、Twitterは多くの場合余剰のリソースを割り振って対処していた。
あるTopologyでは600コアを平均20~30%使用する、という効率が悪い状態になっていた。Stormの性能上の課題が無ければ、本来150コア程で運用できていたはず。
加えて、これと同様の処理を最適化した状況で実行した所、75Coreでも十分賄える状況であった。実際には600Coreで運用していたことから、8倍もの余分なリソースを使用してしまっている。

4. Design Alternatives

これまでの結果から、Twitterでは下記の3つのオプションが検討された。

  1. Stormを拡張する。
  2. 既存のOSSでニーズを満たすものを採用する。
  3. 新たなストリーム処理基盤を開発する。

前節で説明した問題を解消するためにはStormの基本を書き変える必要があるため、コアコンポーネントの大規模な書き換えを必要とした。
だが、概要レベルでStormはデータをキューの束の上に載せる構成となっており、この基本的なアーキテクチャのブロックを変更することは困難だった。
このように、根幹から更新する形で既存のシステムを変更する場合、柔軟性も失われるうえ、長い開発期間も想定される。

次のオプションはApache SamzaやSpark Streamingのような既存のOSSプロダクトを採用することだった。
しかし、これらのシステムはTwitterの規模に適合するには多くの問題があった。
また、これらのシステムはStormのAPIと互換性が無かった。
別のAPIを利用して既存のTopologyを書き変える場合、移行に長い時間がかかり、移行期間に問題が発生する可能性も高い。

また、既にSummingBirdなどStormのAPIの上に開発されている、異なるライブラリの存在もある。
Twitterではストリーム処理基盤のAPIを変更するということとはスタック内の他のコンポーネントも全て変更するということを意味する。

そのため、Twitterでは最適の選択は新たなストリーム処理基盤を開発する、という結論になった。

=====
Stormの問題について詳細を記述してみました。
実際、Stormにはこれらの問題があるわけでして、こういった問題点への課題認識はReactive Streamsや、Heronに確実に受け継がれていると思います。

ちなみに、Stormのコアを書き変えるのに躊躇した理由としてClojureで開発されているということと、リードエンジニアが既にいない、ということも少なからず影響しているようには思えます。
Clojureは処理の流れを把握するレベルで読んだり、小さなプログラムを書くだけならそう難しくないですし、実際私でも出来ます。
ただ、あれを使ってそれなりの規模を持つOSSを記述するとなると専門家がいないと中々厳しいと思います・・・

で、今回は前半ですが、次回は後半、Heronのアーキテクチャについて記述してみます。
Heronの次回投稿までには多分違う投稿も挟まると思いますが、まぁそれはそれで^^;