読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Stormが使うZookeeperの中ってどうなっているの?(その2

こんにちは。

前回に引き続き、Zookeeperの中を(とりあえずわかる範囲で)見て行きます。
前回はStormのクラスタを起動した状態での確認だったので、
今回はTopologyをSubmitした状態での確認ですね。

1.TopologySubmit前の下準備

Stormの機能をフルに実行した状態での確認を行いたいため、
StormStarterプロジェクトのTestWordSpoutを改造して下記のWordSpoutにし、
Ack/Fail機構を有効にした状態にします。

public class WordSpout extends BaseRichSpout {
	/**
	 * 
	 */
	private static final long serialVersionUID = -3089902525505221271L;
	public static Logger LOG = Logger.getLogger(WordSpout.class);
	boolean _isDistributed;
	SpoutOutputCollector _collector;

	public WordSpout() {
		this(true);
	}

	public WordSpout(boolean isDistributed) {
		_isDistributed = isDistributed;
	}

	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		_collector = collector;
	}

	public void close() {

	}

	public void nextTuple() {
		Utils.sleep(100);
		final String[] words = new String[] { "nathan", "mike", "jackson",
				"golda", "bertels" };
		final Random rand = new Random();
		final String word = words[rand.nextInt(words.length)];
		_collector.emit(new Values(word), UUID.randomUUID().toString());★改造★
	}

	public void ack(Object msgId) {
		LOG.info("[WordSpout] Ack " + msgId);★改造★
	}

	public void fail(Object msgId) {

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		if (!_isDistributed) {
			Map<String, Object> ret = new HashMap<String, Object>();
			ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
			return ret;
		} else {
			return null;
		}
	}
}

2.StormUIから見た状態

先ほどのTestWordSpoutを基にExclamationTopologyを作成し、StormクラスタにSubmitします。
StormUIから見た結果は下記のようになりました。
・・・ついでに、Ackerが実はStormUIからまだ確認できたことも知りました(汗

■Stormクラスタ

■ExclamationTopology

■TestWordSpout

■Bolt

■Acker

3.Zookeeperの状態

上記のStormクラスタの状態でZookeeperがどうなっているか確認します。
前回の結果、既にStormとそのサブディレクトリの存在まではわかっているため、
その中身を見て行きますね。

■supervisors
まずはSupervisorsディレクトリ配下を見てSupervisorのIDを確認します。

[zk: localhost:2181(CONNECTED) 18] ls /storm/supervisors
[2f5998de-eaec-4470-8905-034aa1330148, 75292ffd-b16c-4f8a-907f-456305f293e7]

■workerbeats

[zk: localhost:2181(CONNECTED) 2] ls /storm/workerbeats
[Exclamation-2-1348907510] // ★TopologyIDと同じオブジェクトが存在
[zk: localhost:2181(CONNECTED) 3] ls / /storm/workerbeats/Exclamation-2-1348907510
[2f5998de-eaec-4470-8905-034aa1330148-6700, 75292ffd-b16c-4f8a-907f-456305f293e7-6701, 2f5998de-eaec-4470-8905-034aa1330148-6701, 75292ffd-b16c-4f8a-907f-456305f293e7-6700]
// ★SupervisorID-ポート名のペアが4つあるため、どうやら各Workerを示しているようです。
[zk: localhost:2181(CONNECTED) 5] ls /storm/workerbeats/Exclamation-2-1348907510/75292ffd-b16c-4f8a-907f-456305f293e7-6700
[]
[zk: localhost:2181(CONNECTED) 6] get /storm/workerbeats/Exclamation-2-1348907510/75292ffd-b16c-4f8a-907f-456305f293e7-6700
??srclojure.lang.PersistentArrayMap??==??L_metatLclojure/lang/IPersistentMap;[arrayt[Ljava/lang/Object;xrlojure.lang.APersistentMap?<w?
sI_hashxp????pur[Ljava.lang.Object;??X?s)lxpsrclojure.lang.Keyword	
z%A(IhashL_strtLjava/lang/String;LsymtLclojure/lang/Symbol;xp???psrclojure.lang.Symbol?w?
{??IhashL_metaq~L_strq~Lnameq~Lnsq~xp(?k?ptstorm-idq~ptExclamation-2-1348907510sq~7??psq~
????pptexecutor-statspsq~????puq~srclojure.lang.PersistentVector????
N??IcntIshiftL_metaq~Lroott$Lclojure/lang/PersistentVector$Node;[tailq~xrclojure.lang.APersistentVector?
TFa?#I_hashxp????psr"clojure.lang.PersistentVector$Node??$?.??
??
?pt	time-secsq~ppsq~Pf?M
cZxid = 0x23fc
ctime = Sat Sep 29 17:32:22 JST 2012
mZxid = 0x3082
mtime = Sat Sep 29 18:03:28 JST 2012
pZxid = 0x23fc
cversion = 0
dataVersion = 619
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5888
numChildren = 0

バイト文字列群は一部省略していますが、そのなかにSpout/Boltの名前や、
ProcessLatencyといった単語も含まれていました。

そのため、workerbeats上のオブジェクトに各Workerが処理したTupleの統計情報も保持されているようです。
ただ、メッセージIDのようなものは見えなかったため、
個々のメッセージについてはZookeeperでは管理していない模様。

各SpoutとAckerが保持しているようですね。

■errors
次にerrosディレクトリ配下を確認します。

[zk: localhost:2181(CONNECTED) 12] ls /storm/errors
[Exclamation-2-1348907510]
[zk: localhost:2181(CONNECTED) 13] ls /storm/errors/Exclamation-2-1348907510
[word, __acker, exclaim1, exclaim2]
[zk: localhost:2181(CONNECTED) 16] get /storm/errors/Exclamation-2-1348907510/word

cZxid = 0x23f1
ctime = Sat Sep 29 17:32:10 JST 2012
mZxid = 0x23f1
mtime = Sat Sep 29 17:32:10 JST 2012
pZxid = 0x23f1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

これはエラーが発生していないため特にめぼしい情報はありませんでした。

■storms
次にstormsディレクトリ配下を確認します。

[zk: localhost:2181(CONNECTED) 21] ls /storm/storms
[Exclamation-2-1348907510]
[zk: localhost:2181(CONNECTED) 22] ls /storm/storms/Exclamation-2-1348907510
[]
[zk: localhost:2181(CONNECTED) 23] get /storm/storms/Exclamation-2-1348907510
??sr&backtype.storm.daemon.common.StormBase耕'? I?L__extmaptLjava/lang/Object;L__metaq~Lcomponent__GT_executorsq~Llaunch_time_secsq~L
num_workersq~Lstatusq~L
storm_nameq~xpppsrclojure.lang.PersistentArrayMap??==??L_metatLclojure/lang/IPersistentMap;[arrayt[Ljava/lang/Object;xrlojure.lang.APersistentMap?<w?
sI_hashxp????pur[Ljava.lang.Object;??X?s)lxp
texclaim2srjava.lang.Integer????8Ivaluexrjava.lang.Number???
???xptexclaim1sq~
t__ackersq~
twordq~t__systemsq~
sq~
Pf??srjava.lang.Long;???#?Jvaluexq~
sq~????puq~srclojure.lang.Keyword	z%A(IhashL_strtLjava/lang/String;LsymtLclojure/lang/Symbol;xpJ??t:typesrclojure.lang.Symbol?w?{??IhashL_metaq~L_strq~nameq~nsq~p??c=pttypeq~!psq~??psq~??e?pptactivept
Exclamation
cZxid = 0x23e6
ctime = Sat Sep 29 17:31:51 JST 2012
mZxid = 0x23e6
mtime = Sat Sep 29 17:31:51 JST 2012
pZxid = 0x23e6
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 939
numChildren = 0

StormBaseというエンティティの中に
各Spout/Boltの情報とストリームの情報が入っているようですね。

■assignments
最後にassignmentsディレクトリ配下を確認します。

[zk: localhost:2181(CONNECTED) 29] ls /storm/assignments
[Exclamation-2-1348907510]
[zk: localhost:2181(CONNECTED) 30] ls /storm/assignments/Exclamation-2-1348907510
[]
[zk: localhost:2181(CONNECTED) 31] get /storm/assignments/Exclamation-2-1348907510
??sr'backtype.storm.daemon.common.Assignment?1???TL__extmaptLjava/lang/Object;L__metaq~Lxecutor__GT_node_PLUS_portq~Lexecutor__GT_start_time_secsq~Lmaster_code_dirq~Lnode__GT_hostq~xpppsrclojure.lang.PersistentHashMap?????3?IcountZhasNullL_metatLclojure/lang/IPersistentMap;L	nullValueq~Lroott&Lclojure/lang/PersistentHashMap$INode;xrlojure.lang.APersistentMap?<w?
sI_hashxp????	ppsr0clojure.lang.PersistentHashMap$BitmapIndexedNode
????cIbitmap[arrayt[Ljava/lang/Object;Leditt-Ljava/util/concurrent/atomic/AtomicReference;xpur[Ljava.lang.Object;??X?s)lxppsq~??uq~
srclojure.lang.PersistentVector????
N??IcntIshiftL_metaq~Lroott$Lclojure/lang/PersistentVector$Node;[tailq~	xrclojure.lang.APersistentVector?TFa?#I_hashxp????psr"clojure.lang.PersistentVector$Node??$?.??[arrayq~	xpuq~
 ppppppppppppppppppppppppppppppppuq~
srjava.lang.Integer????8Ivaluexrjava.lang.Number???
???xpq~sq~????pq~uq~
t$2f5998de-eaec-4470-8905-034aa1330148sq~,sq~????pq~uq~
sq~q~!sq~????pq~uq~
t$75292ffd-b16c-4f8a-907f-456305f293e7sq~-sq~????pq~uq~
q~$sq~,ppppppsr+java.util.concurrent.atomic.AtomicReference?Wq?UxT?Lvalueq~xppppq~Qsrclojure.lang.PersistentArrayMap??==??L_metaq~[arrayq~	xq~????puq~
q~Jsq~Pf??q~Dq~Uq~>q~Uq~8q~Uq~2q~Uq~,q~Uq~&q~Uq~q~Uq~q~Ut4/opt/storm/nimbus/stormdist/Exclamation-2-1348907510sq~R????puq~
q~thyperionq~$ttethys
cZxid = 0x23e7
ctime = Sat Sep 29 17:31:51 JST 2012
mZxid = 0x23e7
mtime = Sat Sep 29 17:31:51 JST 2012
pZxid = 0x23e7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2355
numChildren = 0

Assignmentエンティティの中にSupervisorとのマッピングや、
Supervisorノード上のファイル保存パスなどが保存されています。

基本的にはStormが動作するための情報、
及び統計情報は全てZookeeperを確認すれば取得できることが分かりました。

4.わかったことまとめ

Stormの動作情報は全てZookeeper上に保持されている
Stormの統計情報もZookeeper上に保持されている
NimbusとUIは基本的にはZookeeperの中身を表示しているだけ

NimbusはZookeeperの状態を見てどうするか決めて、
結果をZookeeperに反映するという流れになっているようです。

自分ではほとんど何も持っていないようですね。
だから、Nimbusが一時的に死んでいても再起動すれば
何事もなかったかのように動き続けるという芸当ができているようです。

また、上記の項目とは別に、
Zookeeperのクラスタを確認してファイルツリーを作成し、
オブジェクトをデシリアライズして中身を確認できるような
ライブラリが欲しくなった今日この頃だったりもします。

機会があれば、つくってみますかねぇ。