Stormが使うZookeeperの中ってどうなっているの?(その2
こんにちは。
前回に引き続き、Zookeeperの中を(とりあえずわかる範囲で)見て行きます。
前回はStormのクラスタを起動した状態での確認だったので、
今回はTopologyをSubmitした状態での確認ですね。
1.TopologySubmit前の下準備
Stormの機能をフルに実行した状態での確認を行いたいため、
StormStarterプロジェクトのTestWordSpoutを改造して下記のWordSpoutにし、
Ack/Fail機構を有効にした状態にします。
public class WordSpout extends BaseRichSpout { /** * */ private static final long serialVersionUID = -3089902525505221271L; public static Logger LOG = Logger.getLogger(WordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public WordSpout() { this(true); } public WordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" }; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word), UUID.randomUUID().toString());★改造★ } public void ack(Object msgId) { LOG.info("[WordSpout] Ack " + msgId);★改造★ } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { if (!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
2.StormUIから見た状態
先ほどのTestWordSpoutを基にExclamationTopologyを作成し、StormクラスタにSubmitします。
StormUIから見た結果は下記のようになりました。
・・・ついでに、Ackerが実はStormUIからまだ確認できたことも知りました(汗
■Stormクラスタ
3.Zookeeperの状態
上記のStormクラスタの状態でZookeeperがどうなっているか確認します。
前回の結果、既にStormとそのサブディレクトリの存在まではわかっているため、
その中身を見て行きますね。
■supervisors
まずはSupervisorsディレクトリ配下を見てSupervisorのIDを確認します。
[zk: localhost:2181(CONNECTED) 18] ls /storm/supervisors [2f5998de-eaec-4470-8905-034aa1330148, 75292ffd-b16c-4f8a-907f-456305f293e7]
■workerbeats
[zk: localhost:2181(CONNECTED) 2] ls /storm/workerbeats [Exclamation-2-1348907510] // ★TopologyIDと同じオブジェクトが存在 [zk: localhost:2181(CONNECTED) 3] ls / /storm/workerbeats/Exclamation-2-1348907510 [2f5998de-eaec-4470-8905-034aa1330148-6700, 75292ffd-b16c-4f8a-907f-456305f293e7-6701, 2f5998de-eaec-4470-8905-034aa1330148-6701, 75292ffd-b16c-4f8a-907f-456305f293e7-6700] // ★SupervisorID-ポート名のペアが4つあるため、どうやら各Workerを示しているようです。 [zk: localhost:2181(CONNECTED) 5] ls /storm/workerbeats/Exclamation-2-1348907510/75292ffd-b16c-4f8a-907f-456305f293e7-6700 [] [zk: localhost:2181(CONNECTED) 6] get /storm/workerbeats/Exclamation-2-1348907510/75292ffd-b16c-4f8a-907f-456305f293e7-6700 ??srclojure.lang.PersistentArrayMap??==??L_metatLclojure/lang/IPersistentMap;[arrayt[Ljava/lang/Object;xrlojure.lang.APersistentMap?<w? sI_hashxp????pur[Ljava.lang.Object;??X?s)lxpsrclojure.lang.Keyword z%A(IhashL_strtLjava/lang/String;LsymtLclojure/lang/Symbol;xp???psrclojure.lang.Symbol?w? {??IhashL_metaq~L_strq~Lnameq~Lnsq~xp(?k?ptstorm-idq~ptExclamation-2-1348907510sq~7??psq~ ????pptexecutor-statspsq~????puq~srclojure.lang.PersistentVector???? N??IcntIshiftL_metaq~Lroott$Lclojure/lang/PersistentVector$Node;[tailq~xrclojure.lang.APersistentVector? TFa?#I_hashxp????psr"clojure.lang.PersistentVector$Node??$?.?? ?? ?pt time-secsq~ppsq~Pf?M cZxid = 0x23fc ctime = Sat Sep 29 17:32:22 JST 2012 mZxid = 0x3082 mtime = Sat Sep 29 18:03:28 JST 2012 pZxid = 0x23fc cversion = 0 dataVersion = 619 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5888 numChildren = 0
バイト文字列群は一部省略していますが、そのなかにSpout/Boltの名前や、
ProcessLatencyといった単語も含まれていました。
そのため、workerbeats上のオブジェクトに各Workerが処理したTupleの統計情報も保持されているようです。
ただ、メッセージIDのようなものは見えなかったため、
個々のメッセージについてはZookeeperでは管理していない模様。
各SpoutとAckerが保持しているようですね。
■errors
次にerrosディレクトリ配下を確認します。
[zk: localhost:2181(CONNECTED) 12] ls /storm/errors [Exclamation-2-1348907510] [zk: localhost:2181(CONNECTED) 13] ls /storm/errors/Exclamation-2-1348907510 [word, __acker, exclaim1, exclaim2] [zk: localhost:2181(CONNECTED) 16] get /storm/errors/Exclamation-2-1348907510/word cZxid = 0x23f1 ctime = Sat Sep 29 17:32:10 JST 2012 mZxid = 0x23f1 mtime = Sat Sep 29 17:32:10 JST 2012 pZxid = 0x23f1 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 1 numChildren = 0
これはエラーが発生していないため特にめぼしい情報はありませんでした。
■storms
次にstormsディレクトリ配下を確認します。
[zk: localhost:2181(CONNECTED) 21] ls /storm/storms [Exclamation-2-1348907510] [zk: localhost:2181(CONNECTED) 22] ls /storm/storms/Exclamation-2-1348907510 [] [zk: localhost:2181(CONNECTED) 23] get /storm/storms/Exclamation-2-1348907510 ??sr&backtype.storm.daemon.common.StormBase耕'? I?L__extmaptLjava/lang/Object;L__metaq~Lcomponent__GT_executorsq~Llaunch_time_secsq~L num_workersq~Lstatusq~L storm_nameq~xpppsrclojure.lang.PersistentArrayMap??==??L_metatLclojure/lang/IPersistentMap;[arrayt[Ljava/lang/Object;xrlojure.lang.APersistentMap?<w? sI_hashxp????pur[Ljava.lang.Object;??X?s)lxp texclaim2srjava.lang.Integer????8Ivaluexrjava.lang.Number??? ???xptexclaim1sq~ t__ackersq~ twordq~t__systemsq~ sq~ Pf??srjava.lang.Long;???#?Jvaluexq~ sq~????puq~srclojure.lang.Keyword z%A(IhashL_strtLjava/lang/String;LsymtLclojure/lang/Symbol;xpJ??t:typesrclojure.lang.Symbol?w?{??IhashL_metaq~L_strq~nameq~nsq~p??c=pttypeq~!psq~??psq~??e?pptactivept Exclamation cZxid = 0x23e6 ctime = Sat Sep 29 17:31:51 JST 2012 mZxid = 0x23e6 mtime = Sat Sep 29 17:31:51 JST 2012 pZxid = 0x23e6 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 939 numChildren = 0
StormBaseというエンティティの中に
各Spout/Boltの情報とストリームの情報が入っているようですね。
■assignments
最後にassignmentsディレクトリ配下を確認します。
[zk: localhost:2181(CONNECTED) 29] ls /storm/assignments [Exclamation-2-1348907510] [zk: localhost:2181(CONNECTED) 30] ls /storm/assignments/Exclamation-2-1348907510 [] [zk: localhost:2181(CONNECTED) 31] get /storm/assignments/Exclamation-2-1348907510 ??sr'backtype.storm.daemon.common.Assignment?1???TL__extmaptLjava/lang/Object;L__metaq~Lxecutor__GT_node_PLUS_portq~Lexecutor__GT_start_time_secsq~Lmaster_code_dirq~Lnode__GT_hostq~xpppsrclojure.lang.PersistentHashMap?????3?IcountZhasNullL_metatLclojure/lang/IPersistentMap;L nullValueq~Lroott&Lclojure/lang/PersistentHashMap$INode;xrlojure.lang.APersistentMap?<w? sI_hashxp???? ppsr0clojure.lang.PersistentHashMap$BitmapIndexedNode ????cIbitmap[arrayt[Ljava/lang/Object;Leditt-Ljava/util/concurrent/atomic/AtomicReference;xpur[Ljava.lang.Object;??X?s)lxppsq~??uq~ srclojure.lang.PersistentVector???? N??IcntIshiftL_metaq~Lroott$Lclojure/lang/PersistentVector$Node;[tailq~ xrclojure.lang.APersistentVector?TFa?#I_hashxp????psr"clojure.lang.PersistentVector$Node??$?.??[arrayq~ xpuq~ ppppppppppppppppppppppppppppppppuq~ srjava.lang.Integer????8Ivaluexrjava.lang.Number??? ???xpq~sq~????pq~uq~ t$2f5998de-eaec-4470-8905-034aa1330148sq~,sq~????pq~uq~ sq~q~!sq~????pq~uq~ t$75292ffd-b16c-4f8a-907f-456305f293e7sq~-sq~????pq~uq~ q~$sq~,ppppppsr+java.util.concurrent.atomic.AtomicReference?Wq?UxT?Lvalueq~xppppq~Qsrclojure.lang.PersistentArrayMap??==??L_metaq~[arrayq~ xq~????puq~ q~Jsq~Pf??q~Dq~Uq~>q~Uq~8q~Uq~2q~Uq~,q~Uq~&q~Uq~q~Uq~q~Ut4/opt/storm/nimbus/stormdist/Exclamation-2-1348907510sq~R????puq~ q~thyperionq~$ttethys cZxid = 0x23e7 ctime = Sat Sep 29 17:31:51 JST 2012 mZxid = 0x23e7 mtime = Sat Sep 29 17:31:51 JST 2012 pZxid = 0x23e7 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 2355 numChildren = 0
Assignmentエンティティの中にSupervisorとのマッピングや、
Supervisorノード上のファイル保存パスなどが保存されています。
基本的にはStormが動作するための情報、
及び統計情報は全てZookeeperを確認すれば取得できることが分かりました。
4.わかったことまとめ
Stormの動作情報は全てZookeeper上に保持されている
Stormの統計情報もZookeeper上に保持されている
NimbusとUIは基本的にはZookeeperの中身を表示しているだけ
NimbusはZookeeperの状態を見てどうするか決めて、
結果をZookeeperに反映するという流れになっているようです。
自分ではほとんど何も持っていないようですね。
だから、Nimbusが一時的に死んでいても再起動すれば
何事もなかったかのように動き続けるという芸当ができているようです。
また、上記の項目とは別に、
Zookeeperのクラスタを確認してファイルツリーを作成し、
オブジェクトをデシリアライズして中身を確認できるような
ライブラリが欲しくなった今日この頃だったりもします。
機会があれば、つくってみますかねぇ。