Twitter Stormの機能まとめメモ(Spoutによるデータ取得)
最近Twitter Stormについてうだうだ呟いていますので、一端まとめを。
放置しているとそのまま流れちゃうんですよねぇ。。。
TwitterでTwitter Stormについて呟く辺りが微妙な所ではありますが^^;
Spoutによるデータの取り込み
StormではStormクラスタに対してデータを取り込む際には
Spoutというコンポーネントを使用する。(下図参照)
Spoutから取り込んだデータはStormの処理管理配下に入り、
処理が保証される。
Stormが外部からデータを取り込むには、
ISpoutインタフェースを継承したクラスでnextTupleメソッドを呼ばれた際に
データをSpoutOutputCollectorに書きこむことで可能。
ちなみに、上記だけ見るとPull型のデータ取得のみに見えてしまいますが、
「SocketをSpoutに保持しておいて、nextTupleメソッドでreadする」という方式によって
通信の受信待ちをすることが可能。
尚、nextTupleは性能がStorm側で保持できるイベントがあふれるまで
ひたすら呼ばれ続けるため、
取得するものが無い場合には適度にSleepで待ちを作る必要がある。
#この辺りbacktype.storm.utils.Utilsクラスに便利メソッドがまとめられてます。
Socketからの通信待ちをする場合には普通にreadメソッドを呼んでおけば
待ち状態になるため、追加でSleepをする必要はない。
実装例(ZeroMqSpout.java : https://github.com/mykidong/storm-spring-example)
ソケットからの通信待ちを行っているSpoutです。
非常にわかりやすい構成ですね。
@Override public void nextTuple() { byte[] msg = null; if(this.socket != null) { try { msg = socket.recv(0); String msgStr = new String(msg); collector.emit(new Values(msgStr)); } catch (ZMQException e) { throw new RuntimeException(e.getCause()); } }
実装例(RandomSentenceSpout.java : https://github.com/nathanmarz/storm-starter)
一定時間ごとに文字列をはくSpoutです。
テスト用のSpoutとしてはちょうどいいかもしれませんね。
@Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); }
尚、シリアライザ/デシリアライザを作成しておけば、
collectorに書きこむ際辺りに自動変換/復元が行われるようなのですが、
その辺りはまだ調査中です。