Genn.aiでTopologyを定義してみる
こんにちは。
前回は使い方がわからなかったGenn.aiですが、ドキュメントも公開されて使い方がわかったため、
とりあえず再度試してみることにします。
まず、gungnirのCLIを起動する前のヘルプを確認してみます。
> ./gungnir -h Unrecognized option: -h usage: gungnir -p <password> password to connect to gungnir server -u <username> user name to connect to gungnir server
・・・というわけで、ユーザIDとパスワードを指定して比較する形でした。
では、実際にCLIを起動して接続して、途中でヘルプなども確認しながらTopologyを定義してみます。
まず、「TestSchema」という名称のTupleを定義し、結果を確認してみます。
gungnir> CREATE TUPLE TestSchema (testKey STRING, testValue STRING); OK gungnir> SHOW TUPLES; [{"name":"TestAction","owner":"kimutansk","createTime":"2013-10-20T22:56:57.309Z"},{"name":"TestSchema","owner":"kimutansk","createTime":"2013-10-23T23:39:17.450Z"}]
と、定義が確認できます。
ただ、管理フィールドを定義していないTupleだったため、一度破棄して作りなおします。
gungnir> DROP TUPLE TestAction; OK gungnir> DROP TUPLE TestSchema; OK gungnir> CREATE TUPLE TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>); OK gungnir> SHOW TUPLES; [{"name":"TestTuple","owner":"kimutansk","createTime":"2013-10-23T23:55:38.868Z"}] gungnir> DESC TUPLE TestTuple; {"name":"TestTuple","fields":{"_tid":{"type":"auto detect"},"_tno":{"type":"auto detect"},"_time":{"type":"auto detect"},"tupleKey":{"type":"STRING"},"intValue":{"type":"INT"},"strValue":{"type":"STRING"},"listValue":{"type":"LIST","element":{"type":"STRING"}}},"partitioned":"-","owner":"kimutansk","createTime":"2013-10-23T23:55:38.868Z"}
ここで定義している_tid(Tracking ID), _tno(Tracking No), _time(Tupleの受付時間)はGenn.ai内部で管理情報を追加するためのもので、
定義しておくと自動的にTupleの該当フィールドに管理情報が設定されるようです。
また、DESC TUPLEで表示される"partitioned"とはStorm上で複数スレッドに分割してグルーピングを行う際のキーとなる値です。
#つまりはFieldsGrouping/groupByのキー
定義していない場合はランダム配分(ShuffleGrouping)となるそうです。
次は、実際のTopologyを定義してみます。
gungnir> FROM TestTuple USING kafka_spout() INTO Stream1; OK gungnir> FROM Stream1 FILTER intValue BETWEEN 0 AND 100 INTO Stream2; OK gungnir> FROM Stream1 FILTER intValue BETWEEN 50 AND 1000 INTO Stream3; OK gungnir> FROM Stream2 EMIT * USING kafka_emit('Stream2'); OK gungnir> FROM Stream3 EMIT * USING kafka_emit('Stream3'); OK gungnir> EXPLAIN; SPOUT_0(kafka_spout(), [TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>)]) -S-> PARTITION_1 PARTITION_1(identity grouping) -S-> FILTER_2 -S-> FILTER_3 FILTER_2(intValue BETWEEN [0, 100]) -S-> EMIT_4 FILTER_3(intValue BETWEEN [50, 1000]) -S-> EMIT_5 EMIT_4(kafka_emit(Stream2), [*]) EMIT_5(kafka_emit(Stream3), [*])
内容としてはKafkaから情報を取得し、int型のフィールドの値によって分岐させるTopologyです。
分岐後、Kafkaに対してemitを行っています。
実際にこのTopologyをSubmitしてTopologyを起動します。
起動後、Monitoringを行っています。
gungnir> SET monitor = enable; OK gungnir> SUBMIT TOPOLOGY; OK gungnir> DESC TOPOLOGY; {"id":"526a9381e4b08627b67aeccb","explain":"SPOUT_0(kafka_spout(), [TestTuple(_tid, _tno, _time, tupleKey STRING, intValue INT, strValue STRING, listValue LIST<STRING>)])\n -S-> PARTITION_1\nPARTITION_1(identity grouping)\n -S-> FILTER_2\n -S-> FILTER_3\nFILTER_2(intValue BETWEEN [0, 100])\n -S-> EMIT_4\nFILTER_3(intValue BETWEEN [50, 1000])\n -S-> EMIT_5\nEMIT_4(kafka_emit(Stream2), [*])\nEMIT_5(kafka_emit(Stream3), [*])","status":"RUNNING","owner":"kimutansk","createTime":"2013-10-25T15:51:29.500Z","summary":{"name":"gungnir_526a9381e4b08627b67aeccb","status":"ACTIVE","uptimeSecs":2,"numWorkers":1,"numExecutors":3,"numTasks":3}} gungnir> MONITOR 526a9381e4b08627b67aeccb;
起動後、以下のコマンドを実行することで実際にTupleを流し込むことが可能となります。
gungnir> TRACK TestTuple {"tupleKey":"Key1","intValue":1,"strValue":"Test1", "listValue":["List1", "List2"]} gungnir> TRACK TestTuple {"tupleKey":"Key2","intValue":5,"strValue":"Test1", "listValue":["List1", "List2"]}
・・・なのですが、実際試してみたところ、MONITORコマンドをうったウィンドウで以下のエラーが発生し、
Topologyも強制的に終了されてしまいました。
まだまだこのあたりは安定していないようです。
gungnir> MONITOR 526a9381e4b08627b67aeccb; Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 15000
ただ、Genn.ai自体がどんなものか、という概要はこれでわかるかと思います。
これでもう少しコマンド等が簡易に実行できるようになれば、実際にこれを使って解析・・・も考えられるかもしれません。
とりあえず、後は追加で何か情報が出てくるか、入力/出力がよりこなれてからになりますかね。