読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

TwitterでつぶやいたStormの雑多な情報まとめ(その6

こんにちは。

#stormjp のタグでStormの雑多な情報まとめその6です。
とりあえず、二桁いかない程度で終わらせる方向で。

55.Storm-UIでのNimbusの表示情報の大部分はZookeeperから取得している。
 (残りはNimbusのローカルファイル)
 そのため、ZookeeperがダウンするとNimbus、Storm-UIも表示できなくなる。

56.Stormにおいて、TCPコネクションはWorkerプロセス間でメッシュ状にはられる。
 Streamは上流Task→下流Taskに対してメッシュ状にはられる。
 そのため1つのTCPコネクション上に複数のStreamが存在するわけだが、
 Tupleの送信順序はTCPコネクション単位で制御される。
 そのため違うTCPコネクション上を通るTuple間同士では「追い越し」が発生しえる。
 尚、Workerプロセス間で通信を行う際にはWorkerプロセスとStreamを指定している模様。

57.TopologyのWorkerを特定のHostでのみ起動したり、特定のTaskを特定のHostでのみ起動するなどの
 調整機構は「Pluggable scheduler」で可能。
 ただしTopology起動後に適用する形になるため、起動時から設定が有効になるわけではない。
 また、Rebalanceを行うと「Pluggable scheduler」が有効ではない状態での配分となる。

58.NimbusはThriftのAPIを公開し、Storm-UIの大部分の情報はそこから取得している。
 Storm0.8.1まではNimbusとUIは同じサーバに同居することが前提だったため、
 UIが直接ローカルのファイルを読みに行く処理もあった。
 → Storm0.8.2でどうなったのか確認が必要。

59.SpoutのAck/Failメソッドの中ではブロッキングしないようにハンドリングすること。
 他のイベント全てに影響が出るケースがあるため。

60.Storm0.8系はStorm0.7系と比べてスループットが3倍以上となっている。
 理由はプロセス内のメモリ上のキューイングにLMAX Disruptorを使用したため。
 → 何をもって3倍というかは微妙。

61.処理に時間がかかったケースにおいてWorkerプロセスが異常と判断して再起動されるのは
 open/prepareメソッドの中でブロックしたケースや、Zookeeperに長時間アクセスできなかったケース。
 個々のTupleのnextTuple/execute/ack/failメソッドに時間がかかっても再起動はされない。(タイムアウトは発生するが・・・)

62.Stormにおいて、Ack/Fail機構を用いたメッセージ再送処理を搭載している場合、
 どこからのBoltがボトルネックになり始めると下記の流れに陥り、
 処理できないイベントでプロセスやネイティブ領域があふれる。
 そのため、メッセージ再送機構を用いる場合はStorm-UI等でボトルネックが発生していないか確認すること。
 1.ボトルネックとなるBoltAが発生する
 2.Tuple1のタイムアウトが発生し、Spoutから同じTuple1'が再送される
 3.BoltAは元々ボトルネックなのだが、Tuple1、Tuple1'と処理するメッセージが本来よりも増える
   Tuple1'に対してもタイムアウトが発生するとTuple1''が再送されるため、
   同じ内容のメッセージが複数Topology上に存在することになる
 4.タイムアウトが発生し続け、再送のTupleが増え続ける・・・

63.StormはTwitter内ではMesos上で動作し、解析結果の算出に使用されている。

64.1つのZookeeperクラスタ上でやろうと思えば複数のStormクラスタを動作させることも可能。
  「storm.zookeeper.root」にそのStormのルートパスが記述されているため、
  「storm.zookeeper.root」=StormAと記述されたクラスタAと、
  「storm.zookeeper.root」=StormBと記述されたクラスタBを用意すればいい。
  ・・・でも多分普通やらない。

65.Taskの並列度の設定優先度は、下記の順で決まる。
 要は、影響する範囲が大きいものほど優先度が高いということ。
  1.using the component getComponentConfiguration with the "topology.max.task.parallelism" attribute or "setMaxTaskParallelism" method
  2.using the declarer addConfiguration(s) methods with the "topology.max.task.parallelism" attribute or "setMaxTaskParallelism" method at topology creation
  3.using the setSpout/setBold parallelism_hint parameter

66.Storm0.8.0の性能参考情報
 Amazon EC2上でc1.xlargeインスタンス(ハイ CPU エクストララージ インスタンス)を用いて検証。
 1メッセージ10バイトのTupleの場合秒間1ノードあたり60万Tuple/秒の処理性能(CPUネック)
 1メッセージ100バイトのTupleの場合秒間1ノードあたり32万Tuple/秒の処理性能(ネットワークネック)
 ただし、この場合Ack/Fail機構は使用しているのかどうかは不明。

67.Boltで一度emitしたTupleは以後更新してはいけない。
 理由は、Storm0.7.2以降行われている同一Worker内の場合シリアライズ処理などを省略して性能を向上させる対応のため。
 そのため、BoltでemitするTupleはexecuteメソッドからの呼び出し時の内部のみで処理を完了させること。
 何かの集計などを行う場合も、Tupleから値を取得して集計するか、シリアライズしてファイルにはくなどの対応が必要。

68.ExecutorとTaskの関係。ExecutorはTaskを実行するためのスレッド。
 1Taskにつき最大1個まで割り振ることができる。
 このように切り分けられた経緯として、Taskの数はConsistent Hashing等に用いられるため
 Topology起動中には変更できないという事情がある。
 そのため実行リソースであるExecutorを切り分け、そちらを増やせるようにした。
 ・・・とはいえ、出来るのは「Taskに対してExecutorを少なめに割り振った縮退状態からExecutorを普通に割り振った状態にする」ということのみ。
 微妙といえば微妙。

69.Stormで何かしらのリソースリークが発生した場合、真っ先に疑うのは「Spoutからの読み取り過ぎ」。
 非同期で動作するため、Boltで処理しきれない量のTupleを読み込めてしまう。
 そのため、max_spout_pendingの設定値を設定するか、Spoutのスレッドをすくなめに生成すること。

70.Stormにはtask.hooksという形でTupleの送受信など特定のタイミングでフックしてメソッドを呼び出す機構がある。
 #一度試そうとして失敗したので動作確認はできていません・・・・

さて、今回はここまで。
3桁、行かずに終わるといいんですがw

積み残しAI:
メッセージIDとAck/Failの概念については別途まとめる
さらなるまとめサイトの必要性について
実際の実装を添付してまとめる