Cludera Manager最新版をWimpyな環境に導入してみる(失敗
こんにちは。
この間Tokyo Impala Meetup 2014.10に参加してきて、実際に動きを把握したくなってきましたので、
まずは手始めとしてCludera Managerを導入してみて、環境を構築していってみます。
ただ、当然ながら自宅環境ですのでWimpyな環境ではあるのですがw
1. ハード環境
お決まりのハード環境前提です。
概要構成は下記の通り。
で、性能は下記の通りです。
Wimpyな上にホストの容量を超過するように仮想マシンを配置していますが、
まぁ気にしない方向で^^;
■Cloudera Manager用マシン
CPU : Core i5 U430 Memory : 4GB HDD : 500GB OS : CentOS6.6 64bit(Basic Server)
■ホストマシン
CPU : Core i7 3770 Memory : 32GB HDD : 2TB
■仮想マシン(cluster1、cluster2、cluster3、cluster4)
CPUコア数 : 2 Memory : 8GB HDD : 200GB OS : CentOS6.6 64bit(Basic Server)
2. 前提準備
前提として、下記の設定を行っておきます。
- iptalbles無効化
- SeLinux無効化
- hostsに各サーバを追記し、名前でサーバを引けるようにする
- ntp設定
- OpenJDKアンインストール
3.Cloudera Managerダウンロード
まずはCloudera Managerをダウンロードします。
http://www.cloudera.com/content/cloudera/en/downloads.html にアクセスし、
Cloudera Managerを選択します。
4.Cloudera Managerインストール
手順はわかったため、Cloudera Manager用マシンで下記のように実行し、インストールを行います。
尚、バージョンを明確に固定しておきたいため、latestではなく5.2.0とバージョンを指定して進めます。
# wget http://archive.cloudera.com/cm5/installer/5.2.0/cloudera-manager-installer.bin # chmod u+x cloudera-manager-installer.bin # ./cloudera-manager-installer.bin
実行するとライセンスの確認がされますので、OKをして先に進みます。
すると下記のようにインストールが開始されます。
・・・それなりに時間がかかりますが、気になる場合は
/var/log/cloudera-manager-installer 配下に下記のようなログが出力されるので、
中身をtailすれば何か問題が起こっていないか、がわかります。
中ではリポジトリの追加を行った上で、普通にyumを実行しているようですね。
■2.install-oracle-j2sdk1.7.log
読み込んだプラグイン:fastestmirror インストール処理の設定をしています Loading mirror speeds from cached hostfile * base: ftp.riken.jp * extras: ftp.riken.jp * updates: ftp.riken.jp 依存性の解決をしています --> トランザクションの確認を実行しています。 ---> Package oracle-j2sdk1.7.x86_64 0:1.7.0+update67-1 will be インストール --> 依存性解決を終了しました。 依存性を解決しました ================================================================================ パッケージ アーキテクチャ バージョン リポジトリー 容量 ================================================================================ インストールしています: oracle-j2sdk1.7 x86_64 1.7.0+update67-1 cloudera-manager 135 M トランザクションの要約 ================================================================================ インストール 1 パッケージ
インストールが完了すると下記の画面が表示されます。
ポート番号は7180番ポートを使用しており、初期アカウント/パスワードはadmin/adminのようですね。
5. Cloudera Managerクラスタ設定
では、次はCloudera Managerの設定を行い、
実際にHadoopをインストールする先のクラスタの前準備を行います。
トップページにアクセスし、ログインします。
するとエディションを聞かれるため、Cloudera Expressとして「続行」します。
その後のページを「続行」すると、インストール先のホストが聞かれるため、
「cluster[1-4]」と入力してホストを検索すると下記のようにインストール先のホストが表示されますので、「続行」します。
すると次はインストールするパッケージ形態とバージョンについて確認されます。
Parcelを使う方式で、バージョンもそのままでいいため、特に変更せずに「続行」します。
次はJDKのインストールオプションです。
元々MinimalでJDKがインストールされていないこともありますので、
インストールオプションを有効にして「続行」します。
次はSSHログイン情報です。
こちらもユーザ名とパスワード情報を入力し、「続行」します。
すると、各サーバ上でインストールが開始されます。
・・・なのですが、実行してみると下記のようにエラーになりました。
エラーを確認してみるとバージョンが一致していないようなメッセージなのですが、
yum infoで取得しているバージョンはあっているように見える・・・あれ?
性能的なエラーには見えないことからWimpyな環境に導入したから発生したエラーでは
無いとは思うのですが、原因は何なんでしょうね?
とりあえず調べてみようと思います。
akka-distributed-workersの初期化処理確認
こんにちは。
前回でPersistenceまで行ったので、今回はより大きなサンプルであるakka-distributed-workersを読み、
動作の確認を行ってみました。
ソースコードは下記の場所から取得することができます。
typesafehub/activator-akka-distributed-workers
中身を見てみるとakka-cluster、akka-remote、akka-persistenceといった通信周りや
クラスタリング周りのライブラリが使われており、規模も小さめで勉強には丁度いいように見えます。
そのため、akka-distributed-workersを順に読みながら理解を進めていきます。
あ、ただ当然まだ不慣れな関係上間違っているかもしれませんので、
気付いた場合はコメント頂けると幸いです。
1. akka-distributed-workersのActor構成
まず、akka-distributed-workersで使用されているActorは主に下記の11個です。
■固有Actor
- Frontend
- Master
- Worker
- WorkExecutor
- WorkProducer
- WorkResultConsumer
■既成Actor
- ClusterSingletonManager
- akka-clusterを利用してある特定のロールのActorを常時1つだけ有効化するActor
- SharedLeveldbStore
- テスト専用の複数のActorSystemで共有可能な状態保存Actor
- ClusterClient
- Akka-Clusterの外部からClusterの内部のActorと通信するためのActor
- ClusterSingletonProxy
- 自動的にClusterSingletonManagerが有効化しているActorへの接続を維持するActor
- DistributedPubSubMediator
- PubSubメッセージの送受信を媒介するActor
固有Actorについてはソースを読み解いていく中で確認します。
2. Main処理概要
akka-distributed-workersのmain文は下記のようになっています。
def main(args: Array[String]): Unit = { if (args.isEmpty) { startBackend(2551, "backend") Thread.sleep(5000) startBackend(2552, "backend") startWorker(0) Thread.sleep(5000) startFrontend(0) } else { val port = args(0).toInt if (2000 <= port && port <= 2999) startBackend(port, "backend") else if (3000 <= port && port <= 3999) startFrontend(port) else startWorker(port) } }
順々にメソッドを呼び出して初期化/動作を進めていく流れですね。
引数が空だったとしてどういう風に初期化/動作していくかを確認します。
3. Backendの1個目起動
はじめにBackendの1個目を起動・・・と見えるメソッドを呼んでいます。
このメソッドを実行した結果、下記のActor構成になります。
尚、黒の破線がActorSystem1個を表しています。
SharedLeveldbStoreのみは起動後に参照が取得できることの確認を行っています。
4. Backendの2個目起動
次はBackendの2個目の起動となります。
このメソッドを実行した結果、下記のActor構成になります。
ClusterSingletomManagerが複数存在するため
Master等のActorは起動されない・・・という状態になります。
5. Worker起動
次はWorkerの起動です。
起動メソッドを実行した結果、下記のActor構成となります。
WorkerがWorkExecutorとClusterClientを子Actorとして保持し、
ClusterClientが存在しているClusterに対して接続を行います。
その直後にWorkerはMasterに対して登録メッセージを送付します。
これでMasterはWorkerを認識するようになります。
6. Frontend起動
最後にFrontendの起動です。
起動メソッドを実行した結果、下記のActor構成となります。
ClusterProxyでClusterへの接続を行い、
DistributedPubSubMediatorでPubSubメッセージの受信準備を行う形になりますね。
・・・という形で起動処理が行われることがわかりました。
ですが、ClusterClientとClusterProxyが何故使い分けられているか・・はいまいちわからず。
ただ、全体の流れはある程度わかったとは思います。
次回は実際にどういう風にWorkerに対してTaskの割り振りが行われているか・・
という内容を同じように流れを図示化して試してみます。
Akka-Persistenceでイベントを保存する
こんにちは。
はじめはTypedActorをやろうとも思ったのですが、
Akka-Meetupでもあったように、TypedActorは使いにくいので通常あまり使われないようです。
#実際、クラス階層も変わるためいまいち使いにくい、というのはありました。
並行処理初心者のためのAkka入門
http://www.slideshare.net/sifue/akka-39611889
そのため、Akka-Persistenceを試してみます。
1. Akka-Persistenceでイベント情報を保存する
まずはAkka-Persistenceでイベント情報を保存/復元できるかの確認をまずは行います。
設定ファイルは下記になります。
■persistence-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } } akka.persistence.journal.leveldb.dir = "target/example/journal" akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
■SamplePersistentActor.scala
動作するActorは下記になります。
受信したメッセージに応じて動作を変える・・・という形になります。
import akka.actor.ActorLogging import akka.persistence.{SnapshotOffer, PersistentActor} /** * Akka-Persistenceの確認を行うサンプルActor */ class SamplePersistentActor(name: String) extends PersistentActor with ActorLogging { override def persistenceId: String = "SamplePersistentActor" + name var stateCount = 0 override def receiveCommand: SamplePersistentActor#Receive = { case "path" => context.sender ! self.path case "print" => println(self.path + ":" + stateCount) case "snap" => saveSnapshot(stateCount) case "view" => context.sender ! self.path + ":" + stateCount case message: String => stateCount += message.length } override def receiveRecover: SamplePersistentActor#Receive = { case SnapshotOffer(_, snapshot: Int) => stateCount = snapshot case other:Any => println(self.path + "(" + stateCount + "):" + other) } }
■PersistenceApp.scala
import akka.actor.{ActorDSL, Props, ActorSystem} import com.typesafe.config.ConfigFactory /** * Akka-Persistenceの確認を行うサンプルアプリケーション */ object PersistenceApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/persistence-app.conf") implicit val system = ActorSystem.apply("PersistentApp", config) val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor"))) val rootInbox = ActorDSL.inbox() rootInbox.send(actor, "view") val msg1 = rootInbox.receive() println("Receive1:" + msg1) actor ! "Test1" actor ! "snap" Thread.sleep(5000) system.shutdown() } }
上記のアプリケーションを実行すると下記になります。
■1回目
[INFO] [10/06/2014 09:34:59.998] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 [INFO] [10/06/2014 09:35:20.724] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
■2回目
[INFO] [10/06/2014 09:36:54.234] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(5):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 09:36:59.899] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
2回目の起動で状態が復旧しているのがわかりますね。
尚、ファイル自体は設定で指定した下記のディレクトリ配下に出力されます。
akka.persistence.journal.leveldb.dir = "target/example/journal" akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
2. Akka-Persistenceはどのタイミングでイベントが保存される?
先ほどのアプリケーションでは"snap"メッセージを送信して明示的にファイルを保存していましたが、
snapメッセージを送付しない場合はどうなるか、も確認してみました。
■PersistenceApp.scala
import akka.actor.{ActorDSL, Props, ActorSystem} import com.typesafe.config.ConfigFactory import akka.persistence.Recover /** * Akka-Persistenceの確認を行うサンプルアプリケーション */ object PersistenceApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/persistence-app.conf") implicit val system = ActorSystem.apply("PersistentApp", config) val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor"))) val rootInbox = ActorDSL.inbox() rootInbox.send(actor, "view") val msg1 = rootInbox.receive() println("Receive1:" + msg1) actor ! "Test1" // メッセージ送信後、snap無しに再読み込みを実施 actor ! Recover() rootInbox.send(actor, "view") val msg2 = rootInbox.receive() println("Receive2:" + msg2) actor ! "Test2" actor ! "snap" Thread.sleep(5000) system.shutdown() } }
実行結果は下記の通りです。
■1回目
[INFO] [10/06/2014 09:53:57.489] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 Receive2:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 09:54:03.144] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
■2回目
[INFO] [10/06/2014 10:00:31.279] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 Receive2:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 10:00:36.933] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
Recoverメッセージを送信しても既に値が存在している場合は再読み込みは行われない。
ただ、saveSnapshotメソッドを呼び出していない場合はSystemがダウンするとファイルは保存はされていない・・・
となるようです。
後はActor単位で殺した場合はどうなるか、も気になる所ですが、
それはもっと本格的なプログラムを組んだ時に試してみます。
Akka-RemoteでRoutingを行う
こんにちは。
前回、RemoteでRoutingを行おうとして上手くいかなかったため、
それをどうやって実現するのかを試してみます。
あと、Routingを行うなら異なるプロセス間を跨がないといまいち面白くないため、
名前的には微妙ですが、下記の構成にして実際にRoutingが可能か確認してみます。
RouterAppからClientAppとServerAppに対してRouting可能か確認を行う、というわけですね。
1. デプロイ先のプロセス情報を指定してデプロイ
まずはServerAppとClientAppに対してプロセスを跨ったデプロイが出来るかを確認します。
RouterAppの設定は下記のようにします。
■remote-router-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } actor.deployment { /remotePool { router = round-robin-pool nr-of-instances = 3 target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"] } } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2554 } } }
デプロイ先をtarget.nodeで指定している感じですね。
その上で、アプリケーションコードは下記のようになります。
■RemoteRouterApp.scala
import akka.actor._ import com.typesafe.config.ConfigFactory import akka.routing.FromConfig /** * Akka-Remoteを用いてRouting接続し、メッセージを送信するサンプル */ object RemoteRouterApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-router-app.conf") implicit val system = ActorSystem.apply("RemoteRouterApp", config) val inbox = ActorDSL.inbox() // デプロイ先プロセスを指定してRouterをデプロイ val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool") remoteRouterRef.tell("RemoteRouter1", inbox.getRef()) remoteRouterRef.tell("RemoteRouter2", inbox.getRef()) remoteRouterRef.tell("RemoteRouter3", inbox.getRef()) remoteRouterRef.tell("RemoteRouter4", inbox.getRef()) val received1 = inbox.receive() println("received1:" + received1) val received2 = inbox.receive() println("received2:" + received2) val received3 = inbox.receive() println("received3:" + received3) val received4 = inbox.receive() println("received4:" + received4) Thread.sleep(30000) system.shutdown() } }
上記の状態で、RemoteServerApp>RemoteClientApp>RemoteRouterAppという順でプロセスを起動します。
すると、結果は下記のようになりました。
■RemoteServerApp
[INFO] [09/28/2014 09:31:05.506] [main] [Remoting] Starting remoting [INFO] [09/28/2014 09:31:05.844] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/28/2014 09:31:05.847] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/28/2014 09:31:05.860] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/28/2014 09:31:15.068] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/28/2014 09:31:15.252] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy ## ★↓今回追加部分↓★ [INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1 [INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3 [INFO] [09/28/2014 09:31:21.816] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4 ## ★↑今回追加部分↑★ [INFO] [09/28/2014 09:32:05.894] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/28/2014 09:31:14.477] [main] [Remoting] Starting remoting [INFO] [09/28/2014 09:31:14.736] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553] [INFO] [09/28/2014 09:31:14.738] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553] received1:akka://RemoteServerApp/user/Receive: Received String Remote received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy ## ★↓今回追加部分↓★ [INFO] [09/28/2014 09:31:21.792] [RemoteClientApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2 ## ★↑今回追加部分↑★ [INFO] [09/28/2014 09:31:45.458] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteRouterApp
[INFO] [09/28/2014 09:31:21.219] [main] [Remoting] Starting remoting [INFO] [09/28/2014 09:31:21.500] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554] [INFO] [09/28/2014 09:31:21.502] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554] ## ★↓今回追加部分↓★ received1:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3 received2:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2 received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1 received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4 ## ★↑今回追加部分↑★ [INFO] [09/28/2014 09:31:51.912] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
下記のようにActorがデプロイされ、
かつRoundRobin方式でメッセージがRoutingされていることがわかりますね。
- Actor1 > RemoteServerApp
- Actor2 > RemoteClientApp
- Actor3 > RemoteServerApp
2. 既存のActorを指定してRouterを生成
次は、既に存在しているActorのパスを個別指定してRouterの生成が可能かどうかを確認してみます。
まずはRemoteServerAppの設定/コードを下記のように修正し、
下記の3パスについてActorが存在している状態にします。
- akka://RemoteServerApp/user/remotePool/$a
- akka://RemoteServerApp/user/remotePool/$b
- akka://RemoteServerApp/user/remotePool/$c
■remote-server-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } actor.deployment { /remotePool { router = round-robin-pool nr-of-instances = 3 remote = "akka.tcp://RemoteServerApp@127.0.0.1:2552" } } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } }
■RemoteServerApp.scala
import akka.actor.{Props, ActorSystem} import com.typesafe.config.ConfigFactory import akka.routing.FromConfig /** * Akka-Remoteを用いて接続を受け付けるクラス */ object RemoteServerApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-server-app.conf") implicit val system = ActorSystem.apply("RemoteServerApp", config) val actor1 = system.actorOf(Props[MessagePrintActor], "Receive") actor1 ! "Local" // ローカスプロセス上にRoutingされたActorを生成 system.actorOf(FromConfig.props(Props[MessagePrintActor]),"remotePool") Thread.sleep(60000) system.shutdown() } }
Server修正後、Router側の修正も行います。
まずは設定の変更から。
■remote-router-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } actor.deployment { /remotePool { router = round-robin-pool nr-of-instances = 3 target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"] } /remoteGroup { router = round-robin-group routees.paths = [ "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a", "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b", "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c"] } } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2554 } } }
RoundRobinの接続先をroutees.pathsで指定したという形ですね。
その上で、アプリケーションを下記のように修正します。
■RemoteRouterApp.scala
import akka.actor._ import com.typesafe.config.ConfigFactory import akka.routing.FromConfig /** * Akka-Remoteを用いてRouting接続し、メッセージを送信するサンプル */ object RemoteRouterApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-router-app.conf") implicit val system = ActorSystem.apply("RemoteRouterApp", config) val inbox = ActorDSL.inbox() // デプロイ先プロセスを指定してRouterをデプロイ val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool") remoteRouterRef.tell("RemoteRouter1", inbox.getRef()) remoteRouterRef.tell("RemoteRouter2", inbox.getRef()) remoteRouterRef.tell("RemoteRouter3", inbox.getRef()) remoteRouterRef.tell("RemoteRouter4", inbox.getRef()) val received1 = inbox.receive() println("received1:" + received1) val received2 = inbox.receive() println("received2:" + received2) val received3 = inbox.receive() println("received3:" + received3) val received4 = inbox.receive() println("received4:" + received4) // 接続先パスを指定してRouterを生成 val remoteGroupRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remoteGroup") remoteGroupRef.tell("RemoteGroup1", inbox.getRef()) remoteGroupRef.tell("RemoteGroup2", inbox.getRef()) remoteGroupRef.tell("RemoteGroup3", inbox.getRef()) remoteGroupRef.tell("RemoteGroup4", inbox.getRef()) val received5 = inbox.receive() println("received5:" + received5) val received6 = inbox.receive() println("received6:" + received6) val received7 = inbox.receive() println("received7:" + received7) val received8 = inbox.receive() println("received8:" + received8) Thread.sleep(30000) system.shutdown() } }
この状態で再度RemoteServerApp>RemoteClientApp>RemoteRouterAppという順でプロセスを起動します。
すると、結果は下記のようになりました。
■RemoteServerApp
[INFO] [09/28/2014 10:04:31.107] [main] [Remoting] Starting remoting [INFO] [09/28/2014 10:04:31.370] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/28/2014 10:04:31.372] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/28/2014 10:04:31.381] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/28/2014 10:04:40.146] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/28/2014 10:04:40.271] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy [INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1 [INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3 [INFO] [09/28/2014 10:04:46.889] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4 ## ★↓今回追加部分↓★ [INFO] [09/28/2014 10:04:46.960] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1 [INFO] [09/28/2014 10:04:46.963] [RemoteServerApp-akka.actor.default-dispatcher-14] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b] akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2 [INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c] akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3 [INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4 ## ★↑今回追加部分↑★ [INFO] [09/28/2014 10:05:31.423] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/28/2014 10:04:39.541] [main] [Remoting] Starting remoting [INFO] [09/28/2014 10:04:39.808] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553] [INFO] [09/28/2014 10:04:39.810] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553] received1:akka://RemoteServerApp/user/Receive: Received String Remote received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy [INFO] [09/28/2014 10:04:46.895] [RemoteClientApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2 [INFO] [09/28/2014 10:05:10.452] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteRouterApp
[INFO] [09/28/2014 10:04:46.326] [main] [Remoting] Starting remoting [INFO] [09/28/2014 10:04:46.597] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554] [INFO] [09/28/2014 10:04:46.599] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554] received1:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2 received2:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1 received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3 received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4 ## ★↓今回追加部分↓★ received5:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1 received6:akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2 received7:akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3 received8:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4 ## ★↑今回追加部分↑★ [INFO] [09/28/2014 10:05:17.044] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
上記のように、元々ServerAppに存在していたActorのパスを指定して接続することが可能になりました。
元々存在していない場合はnode指定で生成すればよく、
存在しているActorにRemote接続する場合はrouteeで指定すればよい・・ように見えますね。
Akka-RemoteでRemoteからActorを作成する
こんにちは。
前回に続き、今回はAkka-Remoteを使用してリモートからActorを作成する方法を確認します。
1. プログラム上でアドレスを定義してデプロイ
まずは、プログラム上でアドレスを定義してServer側にActorを生成することを試してみます。
クライアント側のアプリケーションを下記のように修正します。
■RemoteClientApp.scala
import akka.actor._ import com.typesafe.config.ConfigFactory import akka.remote.RemoteScope /** * Akka-Remoteを用いて接続し、メッセージを送信するサンプル */ object RemoteClientApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-client-app.conf") implicit val system = ActorSystem.apply("RemoteClientApp", config) val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive") val inbox = ActorDSL.inbox() remoteActorRef.tell("Remote", inbox.getRef()) val received1 = inbox.receive() println("received1:" + received1) val remoteAddress = AddressFromURIString("akka.tcp://RemoteServerApp@127.0.0.1:2552") val programRemoteRef = system.actorOf(Props[MessagePrintActor]. withDeploy(Deploy(scope = RemoteScope(remoteAddress)))) programRemoteRef.tell("RemoteDeploy", inbox.getRef()) val received2 = inbox.receive() println("received2:" + received2) Thread.sleep(10000) system.shutdown() } }
下記の記述の個所が実際にServer側にActorをデプロイしている個所になりますね。
scopeを指定することでRemoteにデプロイすることが可能になります。
ただ、Server側もデプロイしようとしているActorのクラスを保持している必要があります。
このあたりが動的にデプロイ可能なのかどうかはもっと調べてからで^^;
val remoteAddress = AddressFromURIString("akka.tcp://RemoteServerApp@127.0.0.1:2552") val programRemoteRef = system.actorOf(Props[MessagePrintActor]. withDeploy(Deploy(scope = RemoteScope(remoteAddress))))
実行結果は下記の通りです。
■RemoteServerApp
[INFO] [09/27/2014 21:15:22.041] [main] [Remoting] Starting remoting [INFO] [09/27/2014 21:15:22.363] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/27/2014 21:15:22.365] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/27/2014 21:15:22.378] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/27/2014 21:15:31.115] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/27/2014 21:15:31.266] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy [INFO] [09/27/2014 21:16:22.434] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/27/2014 21:15:30.542] [main] [Remoting] Starting remoting [INFO] [09/27/2014 21:15:30.796] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553] [INFO] [09/27/2014 21:15:30.798] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553] received1:akka://RemoteServerApp/user/Receive: Received String Remote received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy [INFO] [09/27/2014 21:15:41.402] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
リモートからActorをデプロイした場合、
Server側の「/remote/akka.tcp/〜」というパス配下にデプロイされるようですね。
2. 定義でRouterを用いたデプロイは出来る?
次は定義でRouterを用いたデプロイを行ってみます。
まずは事前準備としてクライアント側の定義ファイルを下記のように修正し、
actor.deploymentの定義を追加します。
■remote-client-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } actor.deployment { /router1 { router = round-robin-pool nr-of-instances = 3 remote = "akka.tcp://RemoteServerApp@127.0.0.1:2552" } } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2553 } } }
その上で、クライアント側のアプリケーションを下記のように修正します。
■RemoteClientApp.scala
import akka.actor._ import com.typesafe.config.ConfigFactory import akka.remote.RemoteScope /** * Akka-Remoteを用いて接続し、メッセージを送信するサンプル */ object RemoteClientApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-client-app.conf") implicit val system = ActorSystem.apply("RemoteClientApp", config) // リモートのActorに対してメッセージ送信 val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive") val inbox = ActorDSL.inbox() remoteActorRef.tell("Remote", inbox.getRef()) val received1 = inbox.receive() println("received1:" + received1) // リモートでActorをデプロイ val remoteAddress = AddressFromURIString("akka.tcp://RemoteServerApp@127.0.0.1:2552") val programRemoteRef = system.actorOf(Props[MessagePrintActor]. withDeploy(Deploy(scope = RemoteScope(remoteAddress)))) programRemoteRef.tell("RemoteDeploy", inbox.getRef()) val received2 = inbox.receive() println("received2:" + received2) // リモートでRouterをデプロイ? val remoteRouterRef = system.actorOf(Props[MessagePrintActor], "router1") remoteRouterRef.tell("RemoteRouter3", inbox.getRef()) val received3 = inbox.receive() println("received3:" + received3) remoteRouterRef.tell("RemoteRouter4", inbox.getRef()) val received4 = inbox.receive() println("received4:" + received4) remoteRouterRef.tell("RemoteRouter5", inbox.getRef()) val received5 = inbox.receive() println("received5:" + received5) remoteRouterRef.tell("RemoteRouter6", inbox.getRef()) val received6 = inbox.receive() println("received6:" + received6) Thread.sleep(10000) system.shutdown() } }
上記のコードを実行すると実行結果は下記のようになりました。
■RemoteServerApp
[INFO] [09/27/2014 21:28:52.195] [main] [Remoting] Starting remoting [INFO] [09/27/2014 21:28:52.516] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/27/2014 21:28:52.518] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/27/2014 21:28:52.528] [RemoteServerApp-akka.actor.default-dispatcher-5] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/27/2014 21:29:15.392] [RemoteServerApp-akka.actor.default-dispatcher-6] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/27/2014 21:29:15.518] [RemoteServerApp-akka.actor.default-dispatcher-6] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy [INFO] [09/27/2014 21:29:15.651] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter3 [INFO] [09/27/2014 21:29:15.659] [RemoteServerApp-akka.actor.default-dispatcher-6] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter4 [INFO] [09/27/2014 21:29:15.664] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter5 [INFO] [09/27/2014 21:29:15.668] [RemoteServerApp-akka.actor.default-dispatcher-6] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter6 [INFO] [09/27/2014 21:29:52.563] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/27/2014 21:29:14.812] [main] [Remoting] Starting remoting [INFO] [09/27/2014 21:29:15.075] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553] [INFO] [09/27/2014 21:29:15.077] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553] received1:akka://RemoteServerApp/user/Receive: Received String Remote received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy received3:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter3 received4:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter4 received5:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter5 received6:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/router1: Received String RemoteRouter6 [INFO] [09/27/2014 21:29:25.773] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
定義ファイルに記述したServerに対してMessagePrintActorをデプロイすることは出来ましたが、
Routerにはならず、単体Actorとしてのデプロイになってしまったようですね。
ともあれ、定義ファイルからアドレスを読み込んでデプロイすることは出来ました。
次回はRemoteでRouter化するためにはどうすればいいか確認してみます。
Akka-Remoteでメッセージングを行う
こんにちは。
今回はAkka-Remoteを用いて別プロセス同士でメッセージングを行う方法を確認してみます。
参考:
Remoting — Akka Documentation
役割をわかりやすくするために下記のような構成で名前をつけて作ってみます。
1. 下準備
尚、定義は下記の構成になります。
定義を行う際に注意する必要があるのは、
「通信を行うクライアント側でもenabled-transportsを設定し、
ポートも設定しておく必要がある」ということですね。
クライアント専用であればポート指定は0(自動取得)でいいのですが、
どちらにしてもサーバとほぼ同様の設定を施しておく必要があるということです。
■サーバ側
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } }
■クライアント側
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2553 } } }
2. Server側で立ち上げたActorに対してClientが接続を行う
では、まずはServer側で立ち上げたActorに対してClientが接続して通信を行うプログラムを作ります。
■RemoteServerApp.scala
import akka.actor.{Props, ActorSystem} import com.github.kimutansk.akka.exercise.routing.MessagePrintActor import com.typesafe.config.ConfigFactory /** * Akka-Remoteを用いて接続を受け付けるクラス */ object RemoteServerApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-server-app.conf") val system = ActorSystem.apply("RemoteServerApp", config) val actor1 = system.actorOf(Props[MessagePrintActor], "Receive") actor1 ! "Local" Thread.sleep(60000) system.shutdown() } }
■RemoteClientApp.scala
import akka.actor.{ActorSystem} import com.typesafe.config.ConfigFactory /** * Akka-Remoteを用いて接続し、メッセージを送信するクラス */ object RemoteClientApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-client-app.conf") val system = ActorSystem.apply("RemoteClientApp", config) val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive") remoteActorRef ! "Remote" Thread.sleep(10000) system.shutdown() } }
内容としては非常に単純で、
Server側でActorを立ち上げ、そのパスを指定してリモートから接続してメッセージを送信するという内容ですね。
実行すると、下記のようになります。
#メッセージ系の一部のログは省略
■RemoteServerApp
[INFO] [09/24/2014 06:35:10.916] [main] [Remoting] Starting remoting [INFO] [09/24/2014 06:35:11.248] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/24/2014 06:35:11.250] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/24/2014 06:35:11.282] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/24/2014 06:35:20.079] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/24/2014 06:36:11.303] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
Remote側からメッセージを受け取れているのがわかりますね。
3. Client側でServerからの応答を受け取る
先ほどの実装では、Client側でServer側からの応答を受け取ることができませんでした。
そのため、次はInboxを用いてServer側からの応答を受け取れるか確認します。
■RemoteClientApp.scala
import akka.actor.{ActorDSL, ActorSystem} import com.typesafe.config.ConfigFactory /** * Akka-Remoteを用いて接続し、メッセージを送信するサンプル */ object RemoteClientApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/remote-client-app.conf") implicit val system = ActorSystem.apply("RemoteClientApp", config) val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive") val inbox = ActorDSL.inbox() remoteActorRef.tell("Remote", inbox.getRef()) val received1 = inbox.receive() println("received1:" + received1) Thread.sleep(10000) system.shutdown() } }
上記のコードを実行すると下記のようになりました。
■RemoteServerApp
[INFO] [09/24/2014 06:43:55.454] [main] [Remoting] Starting remoting [INFO] [09/24/2014 06:43:55.822] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/24/2014 06:43:55.824] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552] [INFO] [09/24/2014 06:43:55.835] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local [INFO] [09/24/2014 06:44:02.158] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/24/2014 06:44:55.873] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/24/2014 06:44:01.531] [main] [Remoting] Starting remoting [INFO] [09/24/2014 06:44:01.803] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553] [INFO] [09/24/2014 06:44:01.805] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553] received1:akka://RemoteServerApp/user/Receive: Received String Remote [INFO] [09/24/2014 06:44:12.262] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
Client側でServer側においてsenderに対して送信したメッセージが受信できていることがわかりますね。
と、これでとりあえず基本パターンであるRemoteからメッセージを送信して、
応答を受信する、が実現できました。
ActorSystemからActorを検索し、メッセージを送信する
こんにちは。
今回は生成したActorに対してメッセージを送信するのではなく、
元々存在しているActorを取得してきてメッセージを送信することを試してみます。
これができるようになれば、元々存在しているActorSystemにアクセスし、
Actorに対してメッセージを送れるようになる、
つまりは複数のActorSystem間でのメッセージ送受信につながるはずです。
1. 1Actor取得確認
というわけで、実際にコードを書いてみます。
まず、下記の定義は継続使用します。
■application.conf
akka.actor.deployment {
/router1 {
router = round-robin-pool
nr-of-instances = 3
}
}
その上で、下記のコードを記述します。
■ReferenceApp.scala
object ReferenceApp extends App { override def main(args: Array[String]): Unit = { implicit val system = ActorSystem.apply("ReferenceApp") // router1配下に$a、$b、$cのActorが生成される val router1 = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "router1") val actor1 = system.actorSelection("akka://ReferenceApp/user/router1/$b") actor1 ! "Message1" router1 ! "Message2" router1 ! "Message3" Thread.sleep(1000) system.shutdown() } }
実行すると、下記の出力となります。(返信先が存在しないエラーは省略)
akka://ReferenceApp/user/router1/$b: Received String Message1 akka://ReferenceApp/user/router1/$a: Received String Message2 akka://ReferenceApp/user/router1/$b: Received String Message3
「Message1」はactorSelectionで取得したActorに通知しています。
そのため、router1配下に生成された$bのActorがactorSelectionで取得できたことがわかります。
尚、actorSelection部は下記のコードを記述した場合でも同様に取得することができました。
元々「system」自体が「akka://ReferenceApp」となっているので、
そこからの相対パスでも取得可能なようです。
val actor1 = system.actorSelection("/user/router1/$b")
2. 複数Actor取得確認
フルパスの指定ではなく、パターンでも取得可能なようなので、
下記のコードも試してみました。
■ReferenceApp.scala
object ReferenceApp extends App { override def main(args: Array[String]): Unit = { implicit val system = ActorSystem.apply("ReferenceApp") // router1配下に$a、$b、$cのActorが生成される val router1 = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "router1") val actors = system.actorSelection("/user/router1/*") actors ! "Message1" router1 ! "Message2" router1 ! "Message3" Thread.sleep(1000) system.shutdown() } }
すると、結果は下記のようになりました。
akka://ReferenceApp/user/router1/$a: Received String Message1 akka://ReferenceApp/user/router1/$b: Received String Message1 akka://ReferenceApp/user/router1/$c: Received String Message1 akka://ReferenceApp/user/router1/$a: Received String Message2 akka://ReferenceApp/user/router1/$b: Received String Message3
actorSelectionメソッドはどうやら複数のActorを取得することができるようですね。
で、取得した結果のオブジェクトに対してメッセージを送信すると、
条件を満たすActor全てに配信されるようです。
実際、取得されるオブジェクトもActorSelection型となっており、
複数のActorに対してメッセージを配分可能な仕組みになっているように見えます。
3. 検索の結果取得されたActorにメッセージを送信した応答を取得する
では、応答のメッセージがどうなっているのか、も確認してみます。
どうやら、以前と同様にInboxを用いることで受信メッセージの取得が可能なようです。
■ReferenceApp.scala
object ReferenceApp extends App { override def main(args: Array[String]): Unit = { implicit val system = ActorSystem.apply("ReferenceApp") // router1配下に$a、$b、$cのActorが生成される val router1 = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "router1") val actors = system.actorSelection("/user/router1/*") val rootInbox = ActorDSL.inbox() actors.tell("Path1", rootInbox.getRef()) actors.tell("Path2", rootInbox.getRef()) // 非同期処理のため、以後のreceiveがPath2の到着前に実行されるのを防止するために待ちを入れる Thread.sleep(1000) val received1 = rootInbox.receive() println("received1:" + received1) val received2 = rootInbox.receive() println("received2:" + received2) val received3 = rootInbox.receive() println("received3:" + received3) try { val received4 = rootInbox.receive() println("received4:" + received4) } catch { case ex:TimeoutException => { println("Exception Occured." + ex.getMessage)} } actors.tell("Path3", rootInbox.getRef()) val received5 = rootInbox.receive() println("received5:" + received5) router1 ! "Test1" Thread.sleep(1000) system.shutdown() } }
結果は下記のようになります。
akka://ReferenceApp/user/router1/$a: Received String Path1 akka://ReferenceApp/user/router1/$c: Received String Path1 akka://ReferenceApp/user/router1/$a: Received String Path2 akka://ReferenceApp/user/router1/$b: Received String Path1 akka://ReferenceApp/user/router1/$c: Received String Path2 akka://ReferenceApp/user/router1/$b: Received String Path2 [WARN] [08/16/2014 14:38:39.077] [ReferenceApp-akka.actor.default-dispatcher-5] [akka://ReferenceApp/system/dsl/inbox-1] dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is 3 received1:akka://ReferenceApp/user/router1/$a: Received String Path1 received2:akka://ReferenceApp/user/router1/$c: Received String Path1 received3:akka://ReferenceApp/user/router1/$a: Received String Path2 Exception Occured.deadline passed akka://ReferenceApp/user/router1/$a: Received String Path3 akka://ReferenceApp/user/router1/$b: Received String Path3 akka://ReferenceApp/user/router1/$c: Received String Path3 received5:akka://ReferenceApp/user/router1/$a: Received String Path3 akka://ReferenceApp/user/router1/$a: Received String Test1
Inboxが各々のActor($a、$b、$c)の応答をそれぞれ受信していることがわかります。
また、selectionで取得した場合は$a、$b、$cに対してメッセージが各々配信されますが、
メッセージに対する応答は$a、$b、$cで別のものとして受信することもわかります。
・・と、とりあえずActorSystemからActorを取得することができました。