読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Twitter Stormの機能まとめメモ(Spoutによるデータ取得)

最近Twitter Stormについてうだうだ呟いていますので、一端まとめを。
放置しているとそのまま流れちゃうんですよねぇ。。。

TwitterTwitter Stormについて呟く辺りが微妙な所ではありますが^^;

Spoutによるデータの取り込み

StormではStormクラスタに対してデータを取り込む際には
Spoutというコンポーネントを使用する。(下図参照)

Spoutから取り込んだデータはStormの処理管理配下に入り、
処理が保証される。

Stormが外部からデータを取り込むには、
ISpoutインタフェースを継承したクラスでnextTupleメソッドを呼ばれた際に
データをSpoutOutputCollectorに書きこむことで可能。

ちなみに、上記だけ見るとPull型のデータ取得のみに見えてしまいますが、
「SocketをSpoutに保持しておいて、nextTupleメソッドでreadする」という方式によって
通信の受信待ちをすることが可能。

尚、nextTupleは性能がStorm側で保持できるイベントがあふれるまで
ひたすら呼ばれ続けるため、
取得するものが無い場合には適度にSleepで待ちを作る必要がある。
#この辺りbacktype.storm.utils.Utilsクラスに便利メソッドがまとめられてます。

Socketからの通信待ちをする場合には普通にreadメソッドを呼んでおけば
待ち状態になるため、追加でSleepをする必要はない。

実装例(ZeroMqSpout.java : https://github.com/mykidong/storm-spring-example

ソケットからの通信待ちを行っているSpoutです。
非常にわかりやすい構成ですね。

  @Override
  public void nextTuple() { 
    byte[] msg = null;
        
        if(this.socket != null) {
            try {
                msg = socket.recv(0);                 
                String msgStr = new String(msg);           
                
                collector.emit(new Values(msgStr));                
            } catch (ZMQException e) {                
                throw new RuntimeException(e.getCause());
            }
        }   
実装例(RandomSentenceSpout.java : https://github.com/nathanmarz/storm-starter

一定時間ごとに文字列をはくSpoutです。
テスト用のSpoutとしてはちょうどいいかもしれませんね。

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"};
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

尚、シリアライザ/デシリアライザを作成しておけば、
collectorに書きこむ際辺りに自動変換/復元が行われるようなのですが、
その辺りはまだ調査中です。