A Dumping Ground for Dreams and Junk

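A dumping ground for the daydreams ("dreams") of its administrator, a washed-up third-rate engineer, and for the dreams that fell through ("junk").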

How to Get Storm Cluster Information from Storm (Part 2)

Hello.

Before moving on to the next topic, let me at least wrap up
the following two points ^^;

What information can be retrieved?
What information cannot be retrieved?


First, the information came out as one huge blob,
so it was impossible to tell what was what; I therefore ran it through one pass of formatting.

The code is long, so I'll omit it and just show the formatted output.
(For the code, see storm-monitor.)
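As a rough idea of what "one pass of formatting" can mean, here is a minimal sketch (not the storm-monitor code) that splits a Thrift-style toString() dump into its top-level fields by tracking bracket depth. The function name split_top_level is hypothetical, and it assumes no bracket characters appear inside field values.

```python
def split_top_level(dump):
    """Split 'Name(a:1, b:[x, y], c:Z(q:1))' into ('Name', ['a:1', 'b:[x, y]', 'c:Z(q:1)'])."""
    open_idx = dump.index("(")
    name = dump[:open_idx]
    body = dump[open_idx + 1:dump.rindex(")")]
    fields, depth, start = [], 0, 0
    for i, ch in enumerate(body):
        if ch in "([{<":
            depth += 1  # entering a nested struct, list, map, or union
        elif ch in ")]}>":
            depth -= 1
        elif ch == "," and depth == 0:
            # A comma at depth 0 separates two top-level fields.
            fields.append(body[start:i].strip())
            start = i + 1
    tail = body[start:].strip()
    if tail:
        fields.append(tail)
    return name, fields


if __name__ == "__main__":
    summary = ("ClusterSummary(supervisors:[SupervisorSummary(host:hyperion, num_workers:4)], "
               "nimbus_uptime_secs:347587, topologies:[TopologySummary(id:Exclamation-1-1352702504)])")
    name, fields = split_top_level(summary)
    print(name)
    for field in fields:
        print("  " + field)
```

Applying the same function again to each field value (for example the SupervisorSummary inside the list) gives one more level of indentation.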

After formatting, the Storm cluster information looks like this.
...Yeah. It's still way too long, but it's enough to see the overall picture, so I'll stop here.

ClusterSummary(supervisors:[SupervisorSummary(host:hyperion, uptime_secs:347574, num_workers:4, num_used_workers:3, supervisor_id:2f5998de-eaec-4470-8905-034aa1330148)], nimbus_uptime_secs:347587, topologies:[TopologySummary(id:Exclamation-1-1352702504, name:Exclamation, num_tasks:16, num_executors:16, num_workers:3, uptime_secs:347079, status:ACTIVE)])

■Topology Components:Exclamation-1-1352702504
Spout:
wordComponentCommon(inputs:{GlobalStreamId(componentId:__acker, streamId:__ack_fail)=<Grouping direct:NullStruct()>, GlobalStreamId(componentId:__acker, streamId:__ack_ack)=<Grouping direct:NullStruct()>}, streams:{__ack_init=StreamInfo(output_fields:[id, init-val, spout-task], direct:false), __system=StreamInfo(output_fields:[event], direct:false), default=StreamInfo(output_fields:[word], direct:false)}, parallelism_hint:10, json_conf:{"topology.tick.tuple.freq.secs":30,"topology.tasks":10})
__systemComponentCommon(inputs:{}, streams:{__tick=StreamInfo(output_fields:[rate_secs], direct:false)}, parallelism_hint:0, json_conf:{"topology.tasks":0})

Bolt:
__ackerComponentCommon(inputs:{GlobalStreamId(componentId:exclaim2, streamId:__ack_fail)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim1, streamId:__ack_ack)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim1, streamId:__ack_fail)=<Grouping fields:[id]>, GlobalStreamId(componentId:word, streamId:__ack_init)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim2, streamId:__ack_ack)=<Grouping fields:[id]>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id], direct:true), __ack_fail=StreamInfo(output_fields:[id], direct:true)}, parallelism_hint:1, json_conf:{"topology.tasks":1,"topology.tick.tuple.freq.secs":30})
exclaim1ComponentCommon(inputs:{GlobalStreamId(componentId:word, streamId:default)=<Grouping shuffle:NullStruct()>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id, ack-val], direct:false), default=StreamInfo(output_fields:[word], direct:false), __ack_fail=StreamInfo(output_fields:[id], direct:false)}, parallelism_hint:3, json_conf:{"topology.tasks":3})
exclaim2ComponentCommon(inputs:{GlobalStreamId(componentId:exclaim1, streamId:default)=<Grouping shuffle:NullStruct()>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id, ack-val], direct:false), default=StreamInfo(output_fields:[word], direct:false), __ack_fail=StreamInfo(output_fields:[id], direct:false)}, parallelism_hint:2, json_conf:{"topology.tasks":2})

StateSpout:


■Topology Info:Exclamation-1-1352702504
ExecutorSummary:
ExecutorSummary(executor_info:ExecutorInfo(task_start:2, task_end:2), component_id:exclaim1, host:hyperion, port:6702, uptime_secs:347050, stats:ExecutorStats(emitted:{:all-time={default=11546540}, 600={default=19740}, 10800={default=348080}, 86400={default=2863140}}, transferred:{:all-time={default=11546540}, 600={default=19740}, 10800={default=348080}, 86400={default=2863140}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546540}, 600={GlobalStreamId(componentId:word, streamId:default)=19760}, 10800={GlobalStreamId(componentId:word, streamId:default)=348100}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863140}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.0514561071974808}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06376518218623482}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.05188164320597529}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.05200583974238074}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546540}, 600={GlobalStreamId(componentId:word, streamId:default)=19740}, 10800={GlobalStreamId(componentId:word, streamId:default)=348100}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863140}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.06307690442331643}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06585612968591692}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.0626831370295892}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06247686106861697}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:3, task_end:3), component_id:exclaim1, host:hyperion, port:6701, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={__system=20, default=11546460}, 600={default=19700}, 10800={default=348040}, 86400={default=2863060}}, transferred:{:all-time={__system=0, default=11546460}, 600={default=19700}, 10800={default=348040}, 86400={default=2863060}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546500}, 600={GlobalStreamId(componentId:word, streamId:default)=19700}, 10800={GlobalStreamId(componentId:word, streamId:default)=348060}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863100}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.05408911791451955}, 600={GlobalStreamId(componentId:word, streamId:default)=0.05482233502538071}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.05292191001551457}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.05748314763717649}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546480}, 600={GlobalStreamId(componentId:word, streamId:default)=19680}, 10800={GlobalStreamId(componentId:word, streamId:default)=348040}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863080}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.06457379218601686}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06605691056910569}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.06177450867716354}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06697682216354416}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:4, task_end:4), component_id:exclaim1, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=11546540}, 600={default=19760}, 10800={default=348100}, 86400={default=2863140}}, transferred:{:all-time={default=11546540}, 600={default=19760}, 10800={default=348100}, 86400={default=2863140}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546520}, 600={GlobalStreamId(componentId:word, streamId:default)=19720}, 10800={GlobalStreamId(componentId:word, streamId:default)=348080}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863100}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.05303069669476171}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06490872210953347}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.05119512755688348}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.05237679438370996}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=11546520}, 600={GlobalStreamId(componentId:word, streamId:default)=19740}, 10800={GlobalStreamId(componentId:word, streamId:default)=348080}, 86400={GlobalStreamId(componentId:word, streamId:default)=2863100}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.0658882503126483}, 600={GlobalStreamId(componentId:word, streamId:default)=0.0547112462006079}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.07762583314180647}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06771681045021131}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:5, task_end:5), component_id:exclaim2, host:hyperion, port:6702, uptime_secs:347050, stats:ExecutorStats(emitted:{:all-time={__system=20, default=17319780}, 600={default=29620}, 10800={default=522140}, 86400={default=4294700}}, transferred:{:all-time={__system=0, default=0}, 600={default=0}, 10800={default=0}, 86400={default=0}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=17319800}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29600}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=522140}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4294700}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01951754639199067}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02094594594594595}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01964990232504692}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01946119635830209}}, executed:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=17319820}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29640}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=522160}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4294720}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02833863169478667}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02024291497975709}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.03596598743680098}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02856530809924745}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:6, task_end:6), component_id:exclaim2, host:hyperion, port:6701, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=17319740}, 600={default=29560}, 10800={default=522080}, 86400={default=4294620}}, transferred:{:all-time={default=0}, 600={default=0}, 10800={default=0}, 86400={default=0}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=17319740}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29560}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=522080}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4294640}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01991600335801808}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01759133964817321}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01980539380937787}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02066762289737906}}, executed:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=17319740}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29560}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=522080}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4294640}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.029008518603628}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.03044654939106901}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.03769537235672694}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02975802395544213}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:7, task_end:7), component_id:word, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3464200}, 600={default=5920}, 10800={default=104440}, 86400={default=858980}}, transferred:{:all-time={default=3464200}, 600={default=5920}, 10800={default=104440}, 86400={default=858980}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:8, task_end:8), component_id:word, host:hyperion, port:6702, uptime_secs:347050, stats:ExecutorStats(emitted:{:all-time={default=3463820}, 600={default=5920}, 10800={default=104420}, 86400={default=858900}}, transferred:{:all-time={default=3463820}, 600={default=5920}, 10800={default=104420}, 86400={default=858900}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:9, task_end:9), component_id:word, host:hyperion, port:6701, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3463840}, 600={default=5900}, 10800={default=104400}, 86400={default=858900}}, transferred:{:all-time={default=3463840}, 600={default=5900}, 10800={default=104400}, 86400={default=858900}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:10, task_end:10), component_id:word, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104420}, 86400={default=858980}}, transferred:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104420}, 86400={default=858980}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:11, task_end:11), component_id:word, host:hyperion, port:6702, uptime_secs:347050, stats:ExecutorStats(emitted:{:all-time={default=3463780}, 600={default=5920}, 10800={default=104420}, 86400={default=858880}}, transferred:{:all-time={default=3463780}, 600={default=5920}, 10800={default=104420}, 86400={default=858880}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:12, task_end:12), component_id:word, host:hyperion, port:6701, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3463840}, 600={default=5920}, 10800={default=104420}, 86400={default=858920}}, transferred:{:all-time={default=3463840}, 600={default=5920}, 10800={default=104420}, 86400={default=858920}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:13, task_end:13), component_id:word, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104420}, 86400={default=858980}}, transferred:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104420}, 86400={default=858980}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:14, task_end:14), component_id:word, host:hyperion, port:6702, uptime_secs:347050, stats:ExecutorStats(emitted:{:all-time={default=3463800}, 600={default=5920}, 10800={default=104440}, 86400={default=858900}}, transferred:{:all-time={default=3463800}, 600={default=5920}, 10800={default=104440}, 86400={default=858900}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:15, task_end:15), component_id:word, host:hyperion, port:6701, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3463780}, 600={default=5900}, 10800={default=104400}, 86400={default=858880}}, transferred:{:all-time={default=3463780}, 600={default=5900}, 10800={default=104400}, 86400={default=858880}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:16, task_end:16), component_id:word, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104440}, 86400={default=858980}}, transferred:{:all-time={default=3464160}, 600={default=5920}, 10800={default=104440}, 86400={default=858980}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>))
ExecutorSummary(executor_info:ExecutorInfo(task_start:1, task_end:1), component_id:__acker, host:hyperion, port:6703, uptime_secs:347052, stats:ExecutorStats(emitted:{:all-time={}, 600={}, 10800={}, 86400={}}, transferred:{:all-time={}, 600={}, 10800={}, 86400={}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}}, executed:{:all-time={GlobalStreamId(componentId:__system, streamId:__tick)=11580}, 600={GlobalStreamId(componentId:__system, streamId:__tick)=40}, 10800={GlobalStreamId(componentId:__system, streamId:__tick)=360}, 86400={GlobalStreamId(componentId:__system, streamId:__tick)=2880}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:__system, streamId:__tick)=0.05008635578583765}, 600={GlobalStreamId(componentId:__system, streamId:__tick)=0.0}, 10800={GlobalStreamId(componentId:__system, streamId:__tick)=0.0}, 86400={GlobalStreamId(componentId:__system, streamId:__tick)=0.0763888888888889}})>))

ErrorInfo:
word[]
__acker[]
exclaim1[]
exclaim2[]

From the results above,
we can see that StormTopology (Topology Components) provides the following information.

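Static information about the StormTopology
  • Spout/Bolt names
  • Spout/Bolt inputs and outputs, and the grouping defined for each input
  • Definitions of the streams connecting Spouts and Bolts
  • Parallelism hints and per-component settings (parallelism_hint, json_conf)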
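To see how this static information fits together, the following minimal sketch rebuilds the user-defined data flow from the `inputs` maps in the dump above. The maps are copied by hand into plain Python dicts (a hypothetical stand-in for the ComponentCommon objects, not a Storm API), and it assumes the convention visible in the output that Storm's internal components and streams (`__acker`, `__system`, `__ack_ack`, ...) start with `__`.

```python
# Hypothetical plain-Python copy of the "inputs" maps from the ComponentCommon dump:
# target component id -> {(source component id, stream id): grouping type}
INPUTS = {
    "word": {("__acker", "__ack_fail"): "direct", ("__acker", "__ack_ack"): "direct"},
    "__system": {},
    "__acker": {
        ("exclaim2", "__ack_fail"): "fields", ("exclaim1", "__ack_ack"): "fields",
        ("exclaim1", "__ack_fail"): "fields", ("word", "__ack_init"): "fields",
        ("exclaim2", "__ack_ack"): "fields",
    },
    "exclaim1": {("word", "default"): "shuffle"},
    "exclaim2": {("exclaim1", "default"): "shuffle"},
}


def is_system_id(component_or_stream_id):
    # Assumption: Storm-internal ids use the "__" prefix, as seen in the dump.
    return component_or_stream_id.startswith("__")


def user_edges(inputs):
    """Return sorted (source, target, stream, grouping) tuples for user components only."""
    edges = []
    for target, subscriptions in inputs.items():
        if is_system_id(target):
            continue
        for (source, stream), grouping in subscriptions.items():
            if is_system_id(source) or is_system_id(stream):
                continue
            edges.append((source, target, stream, grouping))
    return sorted(edges)


if __name__ == "__main__":
    for source, target, stream, grouping in user_edges(INPUTS):
        print(f"{source} --[{stream}, {grouping}]--> {target}")
```

With the acking streams filtered out, what remains is the Exclamation topology itself: word to exclaim1 to exclaim2, both connected with shuffle grouping.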


And TopologyInfo tells us the following.

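Dynamic information about the StormTopology
  • Which process (worker, identified by host and port) each task is placed in (tasks are grouped per executor)
  • Number of tuples emitted, transferred, executed, and acked (all time, 10 minutes, 3 hours, 1 day)
  • Average tuple processing time (same windows)
  • Exceptions raised in tasks (ErrorInfo, reported per component)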
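The window keys in the stats maps are seconds: 600 s = 10 minutes, 10800 s = 3 hours, 86400 s = 1 day, plus `:all-time`. A minimal sketch, assuming the `emitted` counts of the `default` stream in the 600-second window have been copied by hand from the ExecutorSummary lines into plain Python (all names are hypothetical, not Storm API), that totals them per component, counts tasks from each executor's task range, and turns a count into a rough tuples-per-second rate:

```python
from collections import defaultdict

# Window keys used in the ExecutorStats maps (seconds), with readable labels.
WINDOW_LABELS = {"600": "10 min", "10800": "3 h", "86400": "1 day", ":all-time": "all time"}

# Hypothetical hand-copied subset of the ExecutorSummary lines:
# (component_id, task_start, task_end, emitted on "default" in the 600 s window)
EXECUTORS = [
    ("exclaim1", 2, 2, 19740), ("exclaim1", 3, 3, 19700), ("exclaim1", 4, 4, 19760),
    ("exclaim2", 5, 5, 29620), ("exclaim2", 6, 6, 29560),
    ("word", 7, 7, 5920), ("word", 8, 8, 5920), ("word", 9, 9, 5900),
    ("word", 10, 10, 5920), ("word", 11, 11, 5920), ("word", 12, 12, 5920),
    ("word", 13, 13, 5920), ("word", 14, 14, 5920), ("word", 15, 15, 5900),
    ("word", 16, 16, 5920),
    ("__acker", 1, 1, 0),  # the acker's emitted map is empty in the dump
]


def emitted_per_component(executors):
    totals = defaultdict(int)
    for component, _start, _end, emitted in executors:
        totals[component] += emitted
    return dict(totals)


def tasks_per_component(executors):
    # Each executor runs the tasks task_start..task_end (inclusive).
    counts = defaultdict(int)
    for component, start, end, _emitted in executors:
        counts[component] += end - start + 1
    return dict(counts)


def per_second(count, window="600"):
    # Only fixed-length windows can be turned into a rate.
    if window == ":all-time":
        raise ValueError(":all-time has no fixed length")
    return count / int(window)


if __name__ == "__main__":
    for component, total in sorted(emitted_per_component(EXECUTORS).items()):
        print(f"{component}: {total} tuples in {WINDOW_LABELS['600']} "
              f"(~{per_second(total):.1f}/s)")
```

The 16 tasks counted this way match num_tasks:16 in the TopologySummary. Every count in the dump is a multiple of 20, which suggests the counters are sampled, so the rates should be read as approximations.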


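Comparing the results above with what I saw when I previously peeked inside Zookeeper,
the only thing completely missing is the heartbeat information.
#Supervisor and Worker information can't be retrieved as a single unit either,
#but it can be reconstructed by merging the retrieved data, so I'll treat it as available for now.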
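As an example of that merging, here is a minimal sketch that combines the SupervisorSummary from ClusterSummary with the host/port of each ExecutorSummary to reconstruct a per-worker view. The data is hand-copied into plain Python and the function name is hypothetical. It assumes supervisors can be matched to executors by hostname (ExecutorSummary carries only `host`, not `supervisor_id`), which would break if several supervisors shared one hostname.

```python
from collections import defaultdict

# Hypothetical hand-copied SupervisorSummary from the ClusterSummary dump.
SUPERVISORS = [
    {"host": "hyperion", "supervisor_id": "2f5998de-eaec-4470-8905-034aa1330148",
     "num_workers": 4, "num_used_workers": 3},
]

# (task, component_id, host, port) from the ExecutorSummary lines.
EXECUTORS = [
    (1, "__acker", "hyperion", 6703), (2, "exclaim1", "hyperion", 6702),
    (3, "exclaim1", "hyperion", 6701), (4, "exclaim1", "hyperion", 6703),
    (5, "exclaim2", "hyperion", 6702), (6, "exclaim2", "hyperion", 6701),
    (7, "word", "hyperion", 6703), (8, "word", "hyperion", 6702),
    (9, "word", "hyperion", 6701), (10, "word", "hyperion", 6703),
    (11, "word", "hyperion", 6702), (12, "word", "hyperion", 6701),
    (13, "word", "hyperion", 6703), (14, "word", "hyperion", 6702),
    (15, "word", "hyperion", 6701), (16, "word", "hyperion", 6703),
]


def workers_by_host(supervisors, executors):
    """Group tasks by worker (host, port) and attach the supervisor's slot usage."""
    tasks_by_worker = defaultdict(list)
    for task, component, host, port in executors:
        tasks_by_worker[(host, port)].append((task, component))
    view = {}
    for sup in supervisors:
        host = sup["host"]
        workers = {port: sorted(tasks)
                   for (worker_host, port), tasks in tasks_by_worker.items()
                   if worker_host == host}
        view[host] = {
            "supervisor_id": sup["supervisor_id"],
            "free_slots": sup["num_workers"] - sup["num_used_workers"],
            "workers": dict(sorted(workers.items())),
        }
    return view


if __name__ == "__main__":
    for host, info in workers_by_host(SUPERVISORS, EXECUTORS).items():
        print(f"{host} ({info['supervisor_id']}), free slots: {info['free_slots']}")
        for port, tasks in info["workers"].items():
            print(f"  port {port}: {tasks}")
```

Three distinct ports come out (6701, 6702, 6703), which agrees with num_used_workers:3 on the supervisor and num_workers:3 on the topology.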

In other words, to know for sure whether each Worker and Task is actually running properly,
it seems you have to look inside Zookeeper.
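For reference when looking inside Zookeeper, a minimal sketch that builds the znode paths where worker heartbeats would be stored. It assumes the layout used by Storm 0.8.x (`<storm.zookeeper.root>/workerbeats/<topology-id>/<supervisor-id>-<port>`, with `/storm` as the default root); please check this against your Storm version. The function names are hypothetical, and the sketch only builds paths: it does not connect to Zookeeper or decode heartbeat data.

```python
# Assumption: Storm 0.8.x Zookeeper layout with the default storm.zookeeper.root.
DEFAULT_ZK_ROOT = "/storm"


def workerbeat_path(topology_id, supervisor_id, port, zk_root=DEFAULT_ZK_ROOT):
    """Build the (assumed) znode path of one worker's heartbeat."""
    return f"{zk_root.rstrip('/')}/workerbeats/{topology_id}/{supervisor_id}-{port}"


def expected_workerbeat_paths(topology_id, supervisor_id, ports, zk_root=DEFAULT_ZK_ROOT):
    # One heartbeat node per worker (port) the topology uses on that supervisor.
    return [workerbeat_path(topology_id, supervisor_id, p, zk_root) for p in sorted(ports)]


if __name__ == "__main__":
    for path in expected_workerbeat_paths(
            "Exclamation-1-1352702504",
            "2f5998de-eaec-4470-8905-034aa1330148",
            [6701, 6702, 6703]):
        print(path)
```

Comparing the nodes that actually exist under that path with the worker ports obtained through NimbusClient would show which workers have heartbeats at all.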

That said, NimbusClient was presumably built in the first place to serve Storm UI,
so information the UI doesn't display can't be retrieved through it either... a conclusion that is, in a sense, obvious.

So from next time on, I'll look at the contents of Zookeeper in detail.