夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

SpringXD=リアルタイム解析やバッチ処理同士をつなぐ汎用、分散、拡張可能なデータ統合基盤?

こんにちは。

最近色々手を出し過ぎな気もしますが、気になったものがあったのでちとまとめてみます。
それは、Spring XDです。
http://www.springsource.org/spring-xd

ぱっとトップページを見てみるとやたらと使えそうに見えたので、
実際にチュートリアルを動かし、アーキテクチャ資料を確認してみます。

1.Spring XDって?

トップページを確認してみると、以下とありました。

Spring XDは汎用、分散で拡張可能なサービスで、
データインテグレーション、リアルタイム解析、バッチ処理、データ出力に用いることができる。
Spring XDのゴールはビッグデータの複雑さに対応することである。
実世界のビッグデータアプリケーション構築における複雑さは
主に複数の異なるシステムを様々なユースケースをまたいで一つのシステムに結合しようとするところから生じている。

包括的なビッグデータソリューションを構築する際、以下のような共通的な機能を実現する必要がある。

  • 多様で、分散したデータソースからHDFSやSplunkのようなデータストアに対して高スループットで投入
  • リアルタイム解析(例:メトリクス結合や値のカウント)
  • バッチジョブのワークフロー管理。これらのジョブは典型的なエンタープライズシステム(例:RDBMS)だけでなくHadoopの処理(例:MapReduce、Hive、Pig)も結合する。
  • HDFS、RFBMS、NoSQLとったデータソースから高スループットでデータを取得

Spring XDのねらいはこれらのユースケースに対するワンストップなソリューションを提供することにある。

・・・端的に言うと、複数のデータソースをまとめたり、出力したりして、途中に処理をかませることができる汎用基盤、といったノリでしょうか。
これだけ聞くとApache Camelと非常に似たものを感じますが、ただ分散する機能を備えているあたり、そのまま同じというわけでもなさそうです。

2.簡単なサンプルを動かしてみる

では、とりあえずページにあるチュートリアル的なことを流してみます。
とりあえず、ファイルをダウンロードして展開します。

・・・なのですが、ファイルが156MBとなり、やたらと重いです。
原因としては、以下のような点があるようですが・・・まぁ、複数フレームワークのライブラリを持つと、そりゃ重いですよね。
・redis本体やVMware vFabric GemFireのライブラリを保持している
・xd配下にSpring系、Hadoop系、Tomcat系のライブラリを丸々保持
・shell配下にSpring系、Hadoop系のライブラリを丸々保持

$ cd /usr/local/src
$ wget http://repo.springsource.org/libs-milestone-local/org/springframework/xd/spring-xd/1.0.0.M2/spring-xd-1.0.0.M2.zip
$ unzip spring-xd-1.0.0.M2.zip
$ mv spring-xd-1.0.0.M2 /opt/
$ cd /opt
$ ln -s spring-xd-1.0.0.M2 spring-xd

展開が終わったため、Spring XDのシングルノードモードを起動してみます。
Tomcatを起動して、その上でのAPとして動いているようです。
あと、Spring XDのXDは「eXtreme Data」のようです。コンソールを見ると。ええ。わかりませんよね(汗

$ cd /opt/spring-xd/xd/bin
$ ./xd-singlenode
INFO: No access restrictor found, access to all MBean is allowed
22:47:21,509  INFO main stream.StreamServer:137 - initialized server: context=, servlet=xd
9 08, 2013 10:47:21 午後 org.apache.catalina.core.AprLifecycleListener init
情報: The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
9 08, 2013 10:47:22 午後 org.apache.coyote.AbstractProtocol init
情報: Initializing ProtocolHandler ["http-bio-8080"]
9 08, 2013 10:47:22 午後 org.apache.catalina.core.StandardService startInternal
情報: Starting service Tomcat
9 08, 2013 10:47:22 午後 org.apache.catalina.core.StandardEngine startInternal
情報: Starting Servlet Engine: Apache Tomcat/7.0.35

9 08, 2013 10:47:58 午後 org.apache.catalina.util.SessionIdGenerator createSecureRandom
情報: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [35,483] milliseconds.
9 08, 2013 10:47:58 午後 org.apache.coyote.AbstractProtocol start
情報: Starting ProtocolHandler ["http-bio-8080"]
22:47:58,226  INFO main stream.StreamServer:163 - started embedded tomcat adapter

 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
1.0.0.M2                         eXtreme Data

Running in Local Mode on port: 8080  JMX port: 8778

Documentation: https://github.com/SpringSource/spring-xd/wiki

サーバを起動後、今度は別ターミナルを立ち上げてshellから操作を行います。
どうやら、「./xd-singlenode」で立ち上げるのはベースのサーバで、操作用のコンソールがshellのようです。
・・・ただ、このあたりのインタフェースはRESTへのインタフェースとのため、RESTアクセス用のクライアントを作ればどこからでもアクセス可能なようですね。
モデルとしては、ベースのサーバプロセスを立ち上げておき、操作用のコンソールでデータ統合用の定義を入れ込んで動作させるというもののようです。

$ cd /opt/spring-xd/shell/bin
$ ./xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
1.0.0.M2 | Admin Server Target: http://localhost:8080
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>help
* ! - Allows execution of operating system (OS) commands
* // - Inline comment markers (start of line only)
* ; - Inline comment markers (start of line only)
* admin config info - Show the XD admin server being used
* admin config server - Configure the XD admin server to use
* aggregatecounter delete - Delete an aggregate counter
* aggregatecounter display - Display aggregate counter values by chosen interval and resolution(minute, hour)
* aggregatecounter list - List all available aggregate counter names
* clear - Clears the console
* cls - Clears the console
* counter delete - Delete the counter with the given name
* counter display - Display the value of a counter
* counter list - List all available counter names
* date - Displays the local date and time
* exit - Exits the shell
* fieldvaluecounter delete - Delete the field-value-counter with the given name
* fieldvaluecounter display - Display the value of a field-value-counter
* fieldvaluecounter list - List all available field-value-counter names
* gauge delete - Delete a gauge
* gauge display - Display the value of a gauge
* gauge list - List all available gauge names
* hadoop config fs - Sets the Hadoop namenode
* hadoop config info - Returns basic info about the Hadoop configuration
* hadoop config jt - Sets the Hadoop job tracker
* hadoop config load - Loads the Hadoop configuration from the given resource
* hadoop config props get - Returns the value of the given Hadoop property
* hadoop config props list - Returns (all) the Hadoop properties
* hadoop config props set - Sets the value for the given Hadoop property
* hadoop fs cat - Copy source paths to stdout
* hadoop fs chgrp - Change group association of files
* hadoop fs chmod - Change the permissions of files
* hadoop fs chown - Change the owner of files
* hadoop fs copyFromLocal - Copy single src, or multiple srcs from local file system to the destination file system. Same as put
* hadoop fs copyMergeToLocal - Takes a source directory and a destination file as input and concatenates files in src into the destination local file
* hadoop fs copyToLocal - Copy files to the local file system. Same as get
* hadoop fs count - Count the number of directories, files, bytes, quota, and remaining quota
* hadoop fs cp - Copy files from source to destination. This command allows multiple sources as well in which case the destination must be a directory
* hadoop fs du - Displays sizes of files and directories contained in the given directory or the length of a file in case its just a file
* hadoop fs expunge - Empty the trash
* hadoop fs get - Copy files to the local file system
* hadoop fs ls - List files in the directory
* hadoop fs mkdir - Create a new directory
* hadoop fs moveFromLocal - Similar to put command, except that the source localsrc is deleted after it's copied
* hadoop fs mv - Move source files to destination in the HDFS
* hadoop fs put - Copy single src, or multiple srcs from local file system to the destination file system
* hadoop fs rm - Remove files in the HDFS
* hadoop fs setrep - Change the replication factor of a file
* hadoop fs tail - Display last kilobyte of the file to stdout
* hadoop fs text - Take a source file and output the file in text format
* hadoop fs touchz - Create a file of zero length
* help - List all commands usage
* http post - POST data to http endpoint
* job create - Create a job
* job deploy - Deploy a previously created job
* job destroy - Destroy an existing job
* job list - List all jobs
* job undeploy - Un-deploy a previously deployed job
* quit - Exits the shell
* richgauge delete - Delete the richgauge
* richgauge display - Display Rich Gauge value
* richgauge list - List all available richgauge names
* script - Parses the specified resource file and executes its commands
* stream create - Create a new stream definition
* stream deploy - Deploy a previously created stream
* stream destroy - Destroy an existing stream
* stream list - List created streams
* stream undeploy - Un-deploy a previously deployed stream
* system properties - Shows the shell's properties
* tap create - Create a tap
* tap destroy - Destroy an existing tap
* tap list - List all taps
* trigger create - Create a new trigger with a given cron expression
* trigger destroy - Destroy an existing trigger
* trigger list - List all triggers
* version - Displays shell version

では、実際にサンプルのデータフローを定義してみます。
実行するコマンドは「stream create --definition "time | log" --name ticktest」。
これは「time」というインプット(1秒おきにTickを実行するもの)を「log」というアウトプットに流し込む「ticktest」というstreamを定義するというものです。
DSL的になっており、「|」はインプットとアウトプットをつなぐために用いているようですね。

xd:>stream create --definition "time | log" --name ticktest
Created new stream 'ticktest'

操作コンソール側で実行すると、サーバ側で以下のログ出力が始まり、コマンドに応じてストリームが生成されて動き出したのがわかります。

9 08, 2013 10:50:58 午後 org.apache.catalina.core.ApplicationContext log
情報: Initializing Spring FrameworkServlet 'xd'
22:51:17,895  INFO http-bio-8080-exec-3 job.JobPlugin:76 - Configuring module with the following properties: {xd.stream.name=ticktest, xd.trigger.execute_on_startup=true}
22:51:18,349  INFO http-bio-8080-exec-3 module.SimpleModule:130 - initialized module: SimpleModule [name=log, type=sink, group=ticktest, index=1]
22:51:18,354  INFO http-bio-8080-exec-3 module.SimpleModule:144 - started module: SimpleModule [name=log, type=sink, group=ticktest, index=1]
22:51:18,355  INFO http-bio-8080-exec-3 module.ModuleDeployer:127 - launched sink module: ticktest:log:1
22:51:18,431  INFO http-bio-8080-exec-3 job.JobPlugin:76 - Configuring module with the following properties: {xd.stream.name=ticktest, xd.trigger.execute_on_startup=true}
22:51:18,795  INFO http-bio-8080-exec-3 module.SimpleModule:130 - initialized module: SimpleModule [name=time, type=source, group=ticktest, index=0]
22:51:18,805  INFO http-bio-8080-exec-3 module.SimpleModule:144 - started module: SimpleModule [name=time, type=source, group=ticktest, index=0]
22:51:18,806  INFO http-bio-8080-exec-3 module.ModuleDeployer:127 - launched source module: ticktest:time:0
22:51:18,827  WARN task-scheduler-1 logger.ticktest:141 - 2013-09-08 22:51:18
22:51:19,831  WARN task-scheduler-1 logger.ticktest:141 - 2013-09-08 22:51:19
22:51:20,832  WARN task-scheduler-2 logger.ticktest:141 - 2013-09-08 22:51:20
22:51:21,834  WARN task-scheduler-1 logger.ticktest:141 - 2013-09-08 22:51:21
22:51:22,836  WARN task-scheduler-3 logger.ticktest:141 - 2013-09-08 22:51:22
22:51:23,837  WARN task-scheduler-2 logger.ticktest:141 - 2013-09-08 22:51:23
22:51:24,840  WARN task-scheduler-4 logger.ticktest:141 - 2013-09-08 22:51:24

「time」という名前の「source」から、「log」という名前の「sink」に流し込んでいるようですね。
で、操作コンソール側で以下のコマンドを実行すると、streamが終了します。

xd:>stream destroy --name ticktest
Destroyed stream 'ticktest'

その際、サーバ側では以下の出力が行われます。

23:16:05,774  WARN task-scheduler-2 logger.ticktest:141 - 2013-09-08 23:16:05
23:16:06,775  WARN task-scheduler-3 logger.ticktest:141 - 2013-09-08 23:16:06
23:16:07,776  WARN task-scheduler-3 logger.ticktest:141 - 2013-09-08 23:16:07
23:16:08,162  INFO http-bio-8080-exec-5 module.SimpleModule:155 - stopped module: SimpleModule [name=time, type=source, group=ticktest, index=0]
23:16:08,166  INFO http-bio-8080-exec-5 module.SimpleModule:155 - stopped module: SimpleModule [name=log, type=sink, group=ticktest, index=1]

3.分散モード(の概要)

分散モードにおいては、「admin」というプロセスを1プロセス立ち上げ、後は「container」という実際の処理を行うプロセスを立ち上げて処理するモデルとなります。
その際、RedisかRabbitMQをadmin→containerのタスク配置を行う際のキューとして使用するそうです。
詳細は「4.Spring XDのアーキテクチャ」で。

4.Spring XDのアーキテクチャ

と、ここまででとりあえずどんな動作なのかがわかってきたため、Spring XDのアーキテクチャの個所を読んでみます。

Spring XDにはシングルノードモードとマルチノードモードがある。
シングルノードモードは管理コンソールとコンテナが1プロセス上で動作するモードで、開発やテスト時の簡易動作用となる。
マルチノードモードは管理コンソールからクラスタ上に存在するコンテナプロセス群を制御する構成となる。
コンテナがクラスタ内で分散配置されることにより、タスクが分散実行される。

ランタイムアーキテクチャ

Spring XDのランタイム構成はXD AdminとXD Containerからなる。
これらは独自DSLで動作定義を行うようになっており、XD AdminのHTTPインタフェースからXD Containterに対して処理を割り振ることが可能。
XD Adminはタスクを「processing modules」にマッピングする。
ここでのModuleは最小実行単位であり、SpringのApplicationContextとして実現される。

マルチノードモードの分散ランタイムはModuleをXD Containerに対して配分する。
ModuleはXD Containerをまたいで動作し、XD Containerは複数のModuleを動作させることが可能。
シングルノードモードでは1プロセス内でXD AdminとXD Container、各Moduleが全て動作する。

  • DIRTランタイム

Spring XDではこのシンプルな分散アーキテクチャをDistributed Integration Runtime(DIRT)と呼ぶ。
XD Adminは定義された処理をModule単位に分割し、共有Redis queue上に公開する。
XD Containerはラウンドロビン方式で各々モジュール定義を取得し、動作する。
この動作は非常に単純な動作であり、大抵のユースケースにおいては最適ではない。
そのため、Module毎のグルーピング等を今後のリリースで実装する予定である。

尚、現状のバージョン1.0ではXD Admin1プロセスとXD Containerプロセス1プロセス以上、という構成をサポートしている。
1.1ではHaoop YARNやCloudFoundryのような異なる分散実行基盤の上で動作する予定である。

シングルノードランタイム

シングルノードモードでは1プロセス内でXD AdminとXD Container、各Moduleが全て動作する。
XD AdminとXD Containerの間の通信もメモリ内のキューで行われる。

とりあえず、大体どんな構成で動くかはわかりました。

5.Spring XD上で動作するコンポーネント概要

Spring XD上で動作するModuleとしては、
「source(データ入力)」「processor(データ処理)」「sink(データ出力)」の3種類のModuleがあります。
で、現状以下のような実装Moduleが存在するようです。
sourceやsinkについては作り方も大体想像がつくからいいとして、後はprocessorの分岐のサンプルとかがもちっと欲しいところですね。

  • source(データ入力)

HTTP/Tail/File/Twitter Search/Twitter Stream/Gemfire CQ/Syslog/TCP/JMS/RabbitMQ/Time/MQTT

  • processor(データ処理)

Filter/JSON Field Value Filter/Transform/JSON Field Extractor/Script/Splitter/Aggregator

  • sink(データ出力)

Log/File/HDFS/JDBC/TCP/RabbitMQ/GemFire Server/Splunk Server/MQTT

後は、Dynamic Routerという仕組みを用いてsource>processor>sinkの流れを分岐させることができるようです。
分岐のための条件定義はDSL上で定義する簡易なものと、groovyscriptで定義する複雑な定義が可能なものがあるとのこと。
かなり動的に出来るように見えますね。

6.類似プロダクトとの比較

後は類似プロダクトとのわかっている限りでの比較をしてみます。
やっている内容的には非常にApache Camelと似ているため、Camelとの比較になりますね。

まず、現状source/processor/sinkの多彩さについてはCamelの方が歴史も長い関係上上です。
数が違いすぎますね・・・ ただ、メジャーなものはSpring XDもそろえているのであまり影響はないのかもしれません。

定義のしやすさについてはコンソールでの実行の繰り返しとなり全体像が見えにくいSpring XDに比べてCamelの方が上かと。
起動時に定義をファイルに書いておけばそれを読み込むようなものがあれば変わってきそうではありますが。

但し、Camelは一度定義して起動するとストリームの追加などは定義を修正して再起動が必要となります。
柔軟さについては起動したままいくらでも定義が可能なSpring XDの方が上になりますね。

加えて、複数プロセス間で協調する仕組みもCamel単体としては備えていません。
この点についてはSpring XDの方が上・・・と言いたいところですが、現状は分散の仕組みが不完全なため、微妙です。

・・・後発であるため最終的な到達点はSpring XDの方が上になりそうですが、
現状は機能が足りなかったり不完全なため、現実的に使う分にはCamelになりそう・・・というのが実際のところだと思います。
ただ、この先どうなるかは非常に楽しみですね。