How to Get Storm Cluster Information from Nimbus (Part 1)
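Hello.
I think I was benchmarking machines last time,
but I'm still picking out server hardware and don't have it yet, so this post is about Storm again.
A while ago I looked at the information Storm stores in Zookeeper,
and this time I wanted to find an easier way to get at that information.
...After all, it would be sad to go to all the trouble of pulling data out of Zookeeper, only to find that it looks like a lot but doesn't actually contain much.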
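In Storm, Nimbus is designed to be stateless: its state is kept in Zookeeper (or on local disk).
So the information we can get from Nimbus should, for the most part, also be held by Zookeeper,
which is why I'll start by checking how to retrieve it through Nimbus.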
Looking through Storm's API,
I found that Storm has a class called "NimbusClient" that can retrieve cluster information.
So first, let's check whether we can get any information at all.
Here is the code I wrote for that.
The code is also available in storm-monitor, so feel free to use it as a reference.
...I'll add Maven, Gradle, or sbt support at some point, too.
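import java.util.List;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.NimbusClient;

public class NimbusInfoSample { // class name is arbitrary

    /**
     * Prints the cluster summary and, for each topology, its structure and runtime info.
     *
     * @param args unused
     * @throws Exception TException (Thrift error) or NotAliveException (topology not running)
     */
    public static void main(String[] args) throws Exception {
        // Connect to Nimbus over Thrift (nimbus.thrift.port defaults to 6627)
        NimbusClient nimbusClient = new NimbusClient("192.168.100.203");
        try {
            Nimbus.Client thriftClient = nimbusClient.getClient();

            // Storm cluster summary (supervisors, Nimbus uptime, topologies)
            ClusterSummary clusterInfo = thriftClient.getClusterInfo();
            System.out.println(clusterInfo);

            // Topologies running in the cluster
            List<TopologySummary> topologies = clusterInfo.get_topologies();
            for (TopologySummary topology : topologies) {
                String topologyId = topology.get_id();
                // Topology structure: spouts, bolts, streams, groupings (StormTopology)
                System.out.println(thriftClient.getTopology(topologyId));
                // Runtime info: executors, host/port, statistics, errors (TopologyInfo)
                System.out.println(thriftClient.getTopologyInfo(topologyId));
            }
        } finally {
            nimbusClient.close();
        }
    }
}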
I ran this against a cluster in the following state.
[Screenshot: Storm UI Topology page]
The result was as follows.
...Whoa, that's way too long! (sweat)
ClusterSummary(supervisors:[SupervisorSummary(host:hyperion, uptime_secs:84991, num_workers:4, num_used_workers:3, supervisor_id:2f5998de-eaec-4470-8905-034aa1330148)], nimbus_uptime_secs:85004, topologies:[TopologySummary(id:Exclamation-1-1352702504, name:Exclamation, num_tasks:16, num_executors:16, num_workers:3, uptime_secs:84496, status:ACTIVE)]) StormTopology(spouts:{word=SpoutSpec(spout_object:<ComponentObject serialized_java:...>, common:ComponentCommon(inputs:{GlobalStreamId(componentId:__acker, streamId:__ack_fail)=<Grouping direct:NullStruct()>, GlobalStreamId(componentId:__acker, streamId:__ack_ack)=<Grouping direct:NullStruct()>}, streams:{__ack_init=StreamInfo(output_fields:[id, init-val, spout-task], direct:false), __system=StreamInfo(output_fields:[event], direct:false), default=StreamInfo(output_fields:[word], direct:false)}, parallelism_hint:10, json_conf:{"topology.tick.tuple.freq.secs":30,"topology.tasks":10})), __system=SpoutSpec(spout_object:<ComponentObject serialized_java:...>, common:ComponentCommon(inputs:{}, streams:{__tick=StreamInfo(output_fields:[rate_secs], direct:false)}, parallelism_hint:0, json_conf:{"topology.tasks":0}))}, bolts:{__acker=Bolt(bolt_object:<ComponentObject serialized_java:...>, common:ComponentCommon(inputs:{GlobalStreamId(componentId:exclaim2, streamId:__ack_fail)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim1, streamId:__ack_ack)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim1, streamId:__ack_fail)=<Grouping fields:[id]>, GlobalStreamId(componentId:word, streamId:__ack_init)=<Grouping fields:[id]>, GlobalStreamId(componentId:exclaim2, streamId:__ack_ack)=<Grouping fields:[id]>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id], direct:true), __ack_fail=StreamInfo(output_fields:[id], direct:true)}, parallelism_hint:1, json_conf:{"topology.tasks":1,"topology.tick.tuple.freq.secs":30})), exclaim1=Bolt(bolt_object:<ComponentObject 
serialized_java:...>, common:ComponentCommon(inputs:{GlobalStreamId(componentId:word, streamId:default)=<Grouping shuffle:NullStruct()>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id, ack-val], direct:false), default=StreamInfo(output_fields:[word], direct:false), __ack_fail=StreamInfo(output_fields:[id], direct:false)}, parallelism_hint:3, json_conf:{"topology.tasks":3})), exclaim2=Bolt(bolt_object:<ComponentObject serialized_java:...>, common:ComponentCommon(inputs:{GlobalStreamId(componentId:exclaim1, streamId:default)=<Grouping shuffle:NullStruct()>}, streams:{__system=StreamInfo(output_fields:[event], direct:false), __ack_ack=StreamInfo(output_fields:[id, ack-val], direct:false), default=StreamInfo(output_fields:[word], direct:false), __ack_fail=StreamInfo(output_fields:[id], direct:false)}, parallelism_hint:2, json_conf:{"topology.tasks":2}))}, state_spouts:{}) TopologyInfo(id:Exclamation-1-1352702504, name:Exclamation, uptime_secs:84496, executors:[ExecutorSummary(executor_info:ExecutorInfo(task_start:2, task_end:2), component_id:exclaim1, host:hyperion, port:6702, uptime_secs:84465, stats:ExecutorStats(emitted:{:all-time={default=2810860}, 600={default=19880}, 10800={default=343300}, 86400={default=2751440}}, transferred:{:all-time={default=2810860}, 600={default=19880}, 10800={default=343300}, 86400={default=2751440}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810860}, 600={GlobalStreamId(componentId:word, streamId:default)=19900}, 10800={GlobalStreamId(componentId:word, streamId:default)=343300}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751440}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.0507673808016052}, 600={GlobalStreamId(componentId:word, streamId:default)=0.03618090452261307}, 
10800={GlobalStreamId(componentId:word, streamId:default)=0.0481794348965919}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.05043904282848254}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810840}, 600={GlobalStreamId(componentId:word, streamId:default)=19880}, 10800={GlobalStreamId(componentId:word, streamId:default)=343280}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751420}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.06208820139175478}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06941649899396378}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.06490328594733162}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06157547738985687}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:3, task_end:3), component_id:exclaim1, host:hyperion, port:6701, uptime_secs:84468, stats:ExecutorStats(emitted:{:all-time={__system=20, default=2810840}, 600={default=19900}, 10800={default=343300}, 86400={default=2751440}}, transferred:{:all-time={__system=0, default=2810840}, 600={default=19900}, 10800={default=343300}, 86400={default=2751440}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810860}, 600={GlobalStreamId(componentId:word, streamId:default)=19900}, 10800={GlobalStreamId(componentId:word, streamId:default)=343300}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751440}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.05162832727350348}, 600={GlobalStreamId(componentId:word, streamId:default)=0.05829145728643216}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.05080104864549956}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.05134038903265199}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810840}, 
600={GlobalStreamId(componentId:word, streamId:default)=19880}, 10800={GlobalStreamId(componentId:word, streamId:default)=343260}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751420}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.06303453771826216}, 600={GlobalStreamId(componentId:word, streamId:default)=0.05231388329979879}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.0651401270174212}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06293477549774298}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:4, task_end:4), component_id:exclaim1, host:hyperion, port:6703, uptime_secs:84467, stats:ExecutorStats(emitted:{:all-time={default=2810880}, 600={default=19920}, 10800={default=343300}, 86400={default=2751460}}, transferred:{:all-time={default=2810880}, 600={default=19920}, 10800={default=343300}, 86400={default=2751460}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810880}, 600={GlobalStreamId(componentId:word, streamId:default)=19920}, 10800={GlobalStreamId(componentId:word, streamId:default)=343320}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751460}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:word, streamId:default)=0.05141450364298725}, 600={GlobalStreamId(componentId:word, streamId:default)=0.04718875502008032}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.05388558778981708}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.0508820771517667}}, executed:{:all-time={GlobalStreamId(componentId:word, streamId:default)=2810860}, 600={GlobalStreamId(componentId:word, streamId:default)=19900}, 10800={GlobalStreamId(componentId:word, streamId:default)=343280}, 86400={GlobalStreamId(componentId:word, streamId:default)=2751440}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:word, 
streamId:default)=0.06285620770867278}, 600={GlobalStreamId(componentId:word, streamId:default)=0.06532663316582915}, 10800={GlobalStreamId(componentId:word, streamId:default)=0.0636215334420881}, 86400={GlobalStreamId(componentId:word, streamId:default)=0.06211292995667723}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:5, task_end:5), component_id:exclaim2, host:hyperion, port:6702, uptime_secs:84465, stats:ExecutorStats(emitted:{:all-time={__system=20, default=4216260}, 600={default=29840}, 10800={default=514920}, 86400={default=4127140}}, transferred:{:all-time={__system=0, default=0}, 600={default=0}, 10800={default=0}, 86400={default=0}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=4216280}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29840}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=514940}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4127140}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01905471173641219}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01474530831099196}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.0214005515205655}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01889928618849857}}, executed:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=4216280}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29840}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=514940}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4127140}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02736535524206172}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.0274798927613941}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02788674408668971}, 
86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02722951002389064}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:6, task_end:6), component_id:exclaim2, host:hyperion, port:6701, uptime_secs:84468, stats:ExecutorStats(emitted:{:all-time={default=4216280}, 600={default=29840}, 10800={default=514920}, 86400={default=4127140}}, transferred:{:all-time={default=0}, 600={default=0}, 10800={default=0}, 86400={default=0}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=4216300}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29860}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=514940}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4127180}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01904513435950952}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02143335565974548}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01891482502815862}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.01887487340023939}}, executed:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=4216280}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=29840}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=514940}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=4127140}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02859392639957498}, 600={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02815013404825737}, 10800={GlobalStreamId(componentId:exclaim1, streamId:default)=0.03872295801452596}, 86400={GlobalStreamId(componentId:exclaim1, streamId:default)=0.02842161884501132}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:7, task_end:7), component_id:word, host:hyperion, port:6703, uptime_secs:84467, 
stats:ExecutorStats(emitted:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, transferred:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:8, task_end:8), component_id:word, host:hyperion, port:6702, uptime_secs:84465, stats:ExecutorStats(emitted:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825420}}, transferred:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825420}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:9, task_end:9), component_id:word, host:hyperion, port:6701, uptime_secs:84468, stats:ExecutorStats(emitted:{:all-time={default=843240}, 600={default=5960}, 10800={default=102980}, 86400={default=825400}}, transferred:{:all-time={default=843240}, 600={default=5960}, 10800={default=102980}, 86400={default=825400}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:10, task_end:10), component_id:word, host:hyperion, port:6703, uptime_secs:84467, stats:ExecutorStats(emitted:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, transferred:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, 
specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:11, task_end:11), component_id:word, host:hyperion, port:6702, uptime_secs:84465, stats:ExecutorStats(emitted:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825420}}, transferred:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825420}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:12, task_end:12), component_id:word, host:hyperion, port:6701, uptime_secs:84468, stats:ExecutorStats(emitted:{:all-time={default=843240}, 600={default=5980}, 10800={default=102960}, 86400={default=825400}}, transferred:{:all-time={default=843240}, 600={default=5980}, 10800={default=102960}, 86400={default=825400}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:13, task_end:13), component_id:word, host:hyperion, port:6703, uptime_secs:84467, stats:ExecutorStats(emitted:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825500}}, transferred:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825500}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), 
ExecutorSummary(executor_info:ExecutorInfo(task_start:14, task_end:14), component_id:word, host:hyperion, port:6702, uptime_secs:84465, stats:ExecutorStats(emitted:{:all-time={default=843200}, 600={default=5960}, 10800={default=102960}, 86400={default=825400}}, transferred:{:all-time={default=843200}, 600={default=5960}, 10800={default=102960}, 86400={default=825400}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:15, task_end:15), component_id:word, host:hyperion, port:6701, uptime_secs:84468, stats:ExecutorStats(emitted:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825380}}, transferred:{:all-time={default=843220}, 600={default=5960}, 10800={default=102980}, 86400={default=825380}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:16, task_end:16), component_id:word, host:hyperion, port:6703, uptime_secs:84467, stats:ExecutorStats(emitted:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, transferred:{:all-time={default=843320}, 600={default=5980}, 10800={default=103020}, 86400={default=825480}}, specific:<ExecutorSpecificStats spout:SpoutStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, complete_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}})>)), ExecutorSummary(executor_info:ExecutorInfo(task_start:1, task_end:1), component_id:__acker, host:hyperion, port:6703, uptime_secs:84467, stats:ExecutorStats(emitted:{:all-time={}, 600={}, 10800={}, 86400={}}, transferred:{:all-time={}, 
600={}, 10800={}, 86400={}}, specific:<ExecutorSpecificStats bolt:BoltStats(acked:{:all-time={}, 600={}, 10800={}, 86400={}}, failed:{:all-time={}, 600={}, 10800={}, 86400={}}, process_ms_avg:{:all-time={}, 600={}, 10800={}, 86400={}}, executed:{:all-time={GlobalStreamId(componentId:__system, streamId:__tick)=2820}, 600={GlobalStreamId(componentId:__system, streamId:__tick)=20}, 10800={GlobalStreamId(componentId:__system, streamId:__tick)=340}, 86400={GlobalStreamId(componentId:__system, streamId:__tick)=2760}}, execute_ms_avg:{:all-time={GlobalStreamId(componentId:__system, streamId:__tick)=0.03546099290780142}, 600={GlobalStreamId(componentId:__system, streamId:__tick)=0.0}, 10800={GlobalStreamId(componentId:__system, streamId:__tick)=0.0}, 86400={GlobalStreamId(componentId:__system, streamId:__tick)=0.03623188405797101}})>))], status:ACTIVE, errors:{word=[], __acker=[], exclaim1=[], exclaim2=[]})
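Still, one thing is clear from this output: through Nimbus you can get the configuration and statistics of each Spout/Bolt, and even the range of task IDs each executor runs (task_start/task_end).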
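The statistics in the dump are nested maps keyed first by time window (:all-time, 600, 10800 and 86400, in seconds: 10 minutes, 3 hours and 1 day, the same windows Storm UI shows) and then by stream ID. As a rough sketch of turning that into per-component totals, here is a plain-Java helper. EmittedTotals and its Row type are hypothetical stand-ins that flatten ExecutorStats.emitted; they are not Storm classes. Streams whose IDs start with "__" (such as __system) are skipped so only user streams are counted.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Storm): totals of ExecutorStats.emitted per component.
final class EmittedTotals {

    // One flattened entry of emitted (window -> stream ID -> count), tagged with its component.
    static final class Row {
        final String componentId;
        final String window;   // ":all-time", "600", "10800" or "86400"
        final String streamId; // e.g. "default" or "__system"
        final long count;

        Row(String componentId, String window, String streamId, long count) {
            this.componentId = componentId;
            this.window = window;
            this.streamId = streamId;
            this.count = count;
        }
    }

    // Sums the counts for one window per component, ignoring "__"-prefixed system streams.
    static Map<String, Long> perComponent(List<Row> rows, String window) {
        Map<String, Long> totals = new TreeMap<>();
        for (Row r : rows) {
            if (!r.window.equals(window) || r.streamId.startsWith("__")) {
                continue;
            }
            totals.merge(r.componentId, r.count, Long::sum);
        }
        return totals;
    }
}
```

With the numbers in the dump, the 600-second window gives 19880 + 19900 + 19920 = 59700 for exclaim1 and 29840 + 29840 = 59680 for exclaim2. To fill Row from the real objects, assuming the generated getters follow the same get_<field> naming as the code above, you would walk getTopologyInfo(id).get_executors() and each executor's get_stats().get_emitted().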
The output also includes host, port, stream IDs and so on, so you can tell which Spout/Bolt is running in which worker process.
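Grouping the executors by host:port makes that layout much easier to read. Below is a minimal plain-Java sketch. WorkerLayout and its Executor type are hypothetical stand-ins holding only the ExecutorSummary fields used here (component_id, host, port, and ExecutorInfo's task_start/task_end); they are not Storm classes. task_end is treated as inclusive. In this dump every executor runs exactly one task, since num_tasks and num_executors are both 16.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Storm): which executors live in which worker.
final class WorkerLayout {

    // Only the ExecutorSummary / ExecutorInfo fields needed here.
    static final class Executor {
        final String componentId;
        final String host;
        final int port;
        final int taskStart; // ExecutorInfo.task_start
        final int taskEnd;   // ExecutorInfo.task_end (inclusive)

        Executor(String componentId, String host, int port, int taskStart, int taskEnd) {
            this.componentId = componentId;
            this.host = host;
            this.port = port;
            this.taskStart = taskStart;
            this.taskEnd = taskEnd;
        }

        // "word[7]" for a single task, "word[7-9]" for a range of tasks.
        String label() {
            return taskStart == taskEnd
                    ? componentId + "[" + taskStart + "]"
                    : componentId + "[" + taskStart + "-" + taskEnd + "]";
        }
    }

    // Maps "host:port" to the executors running there, ordered by task_start.
    static Map<String, List<String>> byWorker(List<Executor> executors) {
        List<Executor> sorted = new ArrayList<>(executors);
        sorted.sort((a, b) -> Integer.compare(a.taskStart, b.taskStart));
        Map<String, List<String>> workers = new TreeMap<>();
        for (Executor e : sorted) {
            workers.computeIfAbsent(e.host + ":" + e.port, k -> new ArrayList<>())
                   .add(e.label());
        }
        return workers;
    }
}
```

Fed with the 16 executors in the dump, this should give three workers, matching num_workers:3: hyperion:6701 with exclaim1[3], exclaim2[6], word[9], word[12], word[15]; hyperion:6702 with exclaim1[2], exclaim2[5], word[8], word[11], word[14]; and hyperion:6703 with __acker[1], exclaim1[4], word[7], word[10], word[13], word[16].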
Ideally I'd also have liked to get the flow word > exclaim1 > exclaim2,
but it looks like I'll need to read through the information a bit more to tell whether that's possible.
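Looking at the StormTopology part of the dump, though, each bolt's inputs already name the component and stream it subscribes to: exclaim1 takes GlobalStreamId(componentId:word, streamId:default) and exclaim2 takes GlobalStreamId(componentId:exclaim1, streamId:default), both with shuffle grouping. So the flow may well be recoverable from that field alone. Here is a minimal sketch under that assumption, in plain Java. DataFlow and Input are hypothetical stand-ins for a "bolt ID to inputs" map, and anything whose component or stream ID starts with "__" (__acker, __ack_ack, __tick and so on) is treated as Storm-internal and dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of Storm): derives "upstream -> bolt" edges from bolt inputs.
final class DataFlow {

    // One key of ComponentCommon.inputs: the upstream component and the stream read from it.
    static final class Input {
        final String componentId;
        final String streamId;

        Input(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }
    }

    // Returns sorted, de-duplicated edges; "__"-prefixed components/streams are Storm-internal.
    static List<String> edges(Map<String, List<Input>> boltInputs) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<Input>> bolt : boltInputs.entrySet()) {
            if (bolt.getKey().startsWith("__")) {
                continue; // e.g. the __acker bolt
            }
            for (Input in : bolt.getValue()) {
                if (in.componentId.startsWith("__") || in.streamId.startsWith("__")) {
                    continue; // ack/system streams, not user data flow
                }
                result.add(in.componentId + " -> " + bolt.getKey());
            }
        }
        return new ArrayList<>(result);
    }
}
```

On the real objects, assuming the same get_<field> naming as the code above, the map would be built from getTopology(id).get_bolts(), each bolt's get_common().get_inputs(), and the get_componentId()/get_streamId() of each GlobalStreamId key. With the bolts in the dump, it should return "exclaim1 -> exclaim2" and "word -> exclaim1" (sorted alphabetically), which together read as word > exclaim1 > exclaim2.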
In any case, this tells me that, at a minimum, this much information should be stored in Zookeeper even if Nimbus dies.
Over the next few posts, I'll dig into the information that can be retrieved this way.