読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Genn.aiでTopologyを定義してみる

こんにちは。

前回は使い方がわからなかったGenn.aiですが、ドキュメントも公開されて使い方がわかったため、
とりあえず再度試してみることにします。

http://pages.genn.ai/

まず、gungnirのCLIを起動する前のヘルプを確認してみます。

> ./gungnir -h
Unrecognized option: -h
usage: gungnir
 -p <password>   password to connect to gungnir server
 -u <username>   user name to connect to gungnir server

・・・というわけで、ユーザIDとパスワードを指定して比較する形でした。

では、実際にCLIを起動して接続して、途中でヘルプなども確認しながらTopologyを定義してみます。
まず、「TestSchema」という名称のTupleを定義し、結果を確認してみます。

gungnir> CREATE TUPLE TestSchema (testKey STRING, testValue STRING);
OK
gungnir> SHOW TUPLES;
[{"name":"TestAction","owner":"kimutansk","createTime":"2013-10-20T22:56:57.309Z"},{"name":"TestSchema","owner":"kimutansk","createTime":"2013-10-23T23:39:17.450Z"}]

と、定義が確認できます。
ただ、管理フィールドを定義していないTupleだったため、一度破棄して作りなおします。

gungnir> DROP TUPLE TestAction;
OK
gungnir> DROP TUPLE TestSchema;
OK
gungnir> CREATE TUPLE TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>);
OK
gungnir> SHOW TUPLES;
[{"name":"TestTuple","owner":"kimutansk","createTime":"2013-10-23T23:55:38.868Z"}]
gungnir> DESC TUPLE TestTuple;
{"name":"TestTuple","fields":{"_tid":{"type":"auto detect"},"_tno":{"type":"auto detect"},"_time":{"type":"auto detect"},"tupleKey":{"type":"STRING"},"intValue":{"type":"INT"},"strValue":{"type":"STRING"},"listValue":{"type":"LIST","element":{"type":"STRING"}}},"partitioned":"-","owner":"kimutansk","createTime":"2013-10-23T23:55:38.868Z"}

ここで定義している_tid(Tracking ID), _tno(Tracking No), _time(Tupleの受付時間)はGenn.ai内部で管理情報を追加するためのもので、
定義しておくと自動的にTupleの該当フィールドに管理情報が設定されるようです。
また、DESC TUPLEで表示される"partitioned"とはStorm上で複数スレッドに分割してグルーピングを行う際のキーとなる値です。
#つまりはFieldsGrouping/groupByのキー

定義していない場合はランダム配分(ShuffleGrouping)となるそうです。

次は、実際のTopologyを定義してみます。

gungnir> FROM TestTuple USING kafka_spout() INTO Stream1;
OK
gungnir> FROM Stream1 FILTER intValue BETWEEN 0 AND 100 INTO Stream2;
OK
gungnir> FROM Stream1 FILTER intValue BETWEEN 50 AND 1000 INTO Stream3;
OK
gungnir> FROM Stream2 EMIT * USING  kafka_emit('Stream2');
OK
gungnir> FROM Stream3 EMIT * USING  kafka_emit('Stream3');
OK
gungnir> EXPLAIN;
SPOUT_0(kafka_spout(), [TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>)])
 -S-> PARTITION_1
PARTITION_1(identity grouping)
 -S-> FILTER_2
 -S-> FILTER_3
FILTER_2(intValue BETWEEN [0, 100])
 -S-> EMIT_4
FILTER_3(intValue BETWEEN [50, 1000])
 -S-> EMIT_5
EMIT_4(kafka_emit(Stream2), [*])
EMIT_5(kafka_emit(Stream3), [*])

内容としてはKafkaから情報を取得し、int型のフィールドの値によって分岐させるTopologyです。
分岐後、Kafkaに対してemitを行っています。

実際にこのTopologyをSubmitしてTopologyを起動します。
起動後、Monitoringを行っています。

gungnir> SET monitor = enable;
OK
gungnir> SUBMIT TOPOLOGY;
OK
gungnir> DESC TOPOLOGY;
{"id":"526a9381e4b08627b67aeccb","explain":"SPOUT_0(kafka_spout(), [TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>)])\n -S-> PARTITION_1\nPARTITION_1(identity grouping)\n -S-> FILTER_2\n -S-> FILTER_3\nFILTER_2(intValue BETWEEN [0, 100])\n -S-> EMIT_4\nFILTER_3(intValue BETWEEN [50, 1000])\n -S-> EMIT_5\nEMIT_4(kafka_emit(Stream2), [*])\nEMIT_5(kafka_emit(Stream3), [*])","status":"RUNNING","owner":"kimutansk","createTime":"2013-10-25T15:51:29.500Z","summary":{"name":"gungnir_526a9381e4b08627b67aeccb","status":"ACTIVE","uptimeSecs":2,"numWorkers":1,"numExecutors":3,"numTasks":3}}
gungnir> MONITOR 526a9381e4b08627b67aeccb;

起動後、以下のコマンドを実行することで実際にTupleを流し込むことが可能となります。

gungnir> TRACK TestTuple {"tupleKey":"Key1","intValue":1,"strValue":"Test1", "listValue":["List1", "List2"]}
gungnir> TRACK TestTuple {"tupleKey":"Key2","intValue":5,"strValue":"Test1", "listValue":["List1", "List2"]}

・・・なのですが、実際試してみたところ、MONITORコマンドをうったウィンドウで以下のエラーが発生し、
Topologyも強制的に終了されてしまいました。
まだまだこのあたりは安定していないようです。

gungnir> MONITOR 526a9381e4b08627b67aeccb;
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 15000

ただ、Genn.ai自体がどんなものか、という概要はこれでわかるかと思います。
これでもう少しコマンド等が簡易に実行できるようになれば、実際にこれを使って解析・・・も考えられるかもしれません。

とりあえず、後は追加で何か情報が出てくるか、入力/出力がよりこなれてからになりますかね。