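Routing with Akka-Remote
Hello.
Last time I tried to do routing over Remote and could not get it to work,
so this time I will try out how to actually achieve it.
Also, routing is not very interesting unless it crosses process boundaries,
so, although the naming is a little awkward, I will use the configuration below and check whether routing really works.
In other words, I will check whether RouterApp can route to both ClientApp and ServerApp.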
1. Deploying by specifying the target process information
First, I check whether Actors can be deployed across processes onto ServerApp and ClientApp.
The RouterApp configuration is as follows.
■remote-router-app.conf
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"]
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2554
    }
  }
}
The deployment targets are specified with target.nodes.
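For comparison, the same kind of pool can also be described in code instead of in the configuration file. The following is only a minimal sketch, assuming Akka 2.3 with akka-remote on the classpath and an ActorSystem that uses RemoteActorRefProvider; the object name RemotePoolSketch, the method createPool and the router name "remotePoolFromCode" are hypothetical and do not appear in the original sample.

```scala
import akka.actor.{ActorRef, ActorSystem, Address, AddressFromURIString, Props}
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RoundRobinPool

// Hypothetical helper: a code-based counterpart of the /remotePool deployment entry.
object RemotePoolSketch {
  // The same two nodes that target.nodes lists in remote-router-app.conf
  val nodes: List[Address] = List(
    AddressFromURIString("akka.tcp://RemoteServerApp@127.0.0.1:2552"),
    AddressFromURIString("akka.tcp://RemoteClientApp@127.0.0.1:2553"))

  // Creates a round-robin pool of 3 routees that are deployed onto the nodes above.
  // The router name is an assumption chosen so that it does not match any
  // entry under akka.actor.deployment.
  def createPool(system: ActorSystem, routeeProps: Props): ActorRef =
    system.actorOf(
      RemoteRouterConfig(RoundRobinPool(3), nodes).props(routeeProps),
      "remotePoolFromCode")
}
```

Calling RemotePoolSketch.createPool(system, Props[MessagePrintActor]) would be the rough equivalent of FromConfig with the deployment entry above. The configuration-file approach used in this article has the advantage that the target nodes can be changed without recompiling.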
With this configuration in place, the application code looks like this.
■RemoteRouterApp.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Sample that connects through a Router using Akka-Remote and sends messages
 */
object RemoteRouterApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-router-app.conf")
    implicit val system = ActorSystem("RemoteRouterApp", config)
    val inbox = ActorDSL.inbox()

    // Deploy the Router, specifying the target processes (see remote-router-app.conf)
    val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool")
    remoteRouterRef.tell("RemoteRouter1", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter2", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter3", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter4", inbox.getRef())

    // Receive the four replies in arrival order
    val received1 = inbox.receive()
    println("received1:" + received1)
    val received2 = inbox.receive()
    println("received2:" + received2)
    val received3 = inbox.receive()
    println("received3:" + received3)
    val received4 = inbox.receive()
    println("received4:" + received4)

    Thread.sleep(30000)
    system.shutdown()
  }
}
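MessagePrintActor is carried over from the previous post and is not listed here. Judging only from the log output and the replies shown below, it presumably logs the received string and sends the same text back to the sender. The following is a hypothetical reconstruction under that assumption, not the author's actual class.

```scala
import akka.actor.{Actor, ActorLogging}

// Hypothetical reconstruction: the real MessagePrintActor is not shown in this post.
// Assumed behavior (inferred from the logs): log "<own path>: Received String <text>"
// and reply with the same text to the sender.
class MessagePrintActor extends Actor with ActorLogging {
  def receive: Receive = {
    case text: String =>
      val reply = s"${self.path}: Received String $text"
      log.info(reply)
      sender() ! reply
  }
}
```

Because the reply contains self.path, the output on the RouterApp side reveals on which process each routee is actually running.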
With the above in place, I start the processes in the order RemoteServerApp > RemoteClientApp > RemoteRouterApp.
The results were as follows.
■RemoteServerApp
[INFO] [09/28/2014 09:31:05.506] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:05.844] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 09:31:05.847] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 09:31:05.860] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/28/2014 09:31:15.068] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/28/2014 09:31:15.252] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
## ★↓ added this time ↓★
[INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
[INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
[INFO] [09/28/2014 09:31:21.816] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↑ added this time ↑★
[INFO] [09/28/2014 09:32:05.894] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/28/2014 09:31:14.477] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:14.736] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553]
[INFO] [09/28/2014 09:31:14.738] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553]
received1:akka://RemoteServerApp/user/Receive: Received String Remote
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
## ★↓ added this time ↓★
[INFO] [09/28/2014 09:31:21.792] [RemoteClientApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
## ★↑ added this time ↑★
[INFO] [09/28/2014 09:31:45.458] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteRouterApp
[INFO] [09/28/2014 09:31:21.219] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:21.500] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554]
[INFO] [09/28/2014 09:31:21.502] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554]
## ★↓ added this time ↓★
received1:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
received2:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↑ added this time ↑★
[INFO] [09/28/2014 09:31:51.912] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
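The logs show that the Actors were deployed as listed below,
and that the messages were routed in round-robin fashion (RemoteRouter1 to c1, RemoteRouter2 to c2, RemoteRouter3 to c3, RemoteRouter4 back to c1).
- Actor1 (c1) > RemoteServerApp
- Actor2 (c2) > RemoteClientApp
- Actor3 (c3) > RemoteServerApp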
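The placement and routing observed above can be modeled in a few lines of plain Scala. This is only a sketch of the behavior seen in the logs (3 routees spread over 2 nodes in turn, then messages handed out in turn); it is not Akka's implementation, and the names RoundRobinSketch, routees and route are made up for illustration.

```scala
// A plain-Scala model of the observed behavior; no Akka involved.
object RoundRobinSketch {
  // Nodes in the order they appear in target.nodes
  val nodes: Vector[String] = Vector("RemoteServerApp", "RemoteClientApp")

  // nr-of-instances = 3: routees c1..c3 are assigned to the nodes in turn
  val routees: Vector[(String, String)] =
    (1 to 3).toVector.map(i => (s"c$i", nodes((i - 1) % nodes.size)))

  // Messages are handed to the routees in turn; returns (message, routee, node)
  def route(messages: Seq[String]): Seq[(String, String, String)] =
    messages.zipWithIndex.map { case (message, index) =>
      val (routee, node) = routees(index % routees.size)
      (message, routee, node)
    }

  def main(args: Array[String]): Unit =
    route((1 to 4).map(i => s"RemoteRouter$i")).foreach {
      case (message, routee, node) => println(s"$message -> $routee on $node")
    }
}
```

Running it prints RemoteRouter1 and RemoteRouter4 on c1 and RemoteRouter3 on c3 (all RemoteServerApp), and RemoteRouter2 on c2 (RemoteClientApp), which matches the logs above. Note that the order in which the replies arrive back at the inbox is not guaranteed, which is why received1 to received4 appear in a different order.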
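2. Creating a Router by specifying existing Actors
Next, I check whether a Router can be created by individually specifying the paths of Actors that already exist.
First, I modify the RemoteServerApp configuration and code as shown below
so that Actors exist at the following three paths.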
- akka://RemoteServerApp/user/remotePool/$a
- akka://RemoteServerApp/user/remotePool/$b
- akka://RemoteServerApp/user/remotePool/$c
■remote-server-app.conf
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      remote = "akka.tcp://RemoteServerApp@127.0.0.1:2552"
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}
■RemoteServerApp.scala
import akka.actor.{Props, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Application that accepts connections via Akka-Remote
 */
object RemoteServerApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-server-app.conf")
    implicit val system = ActorSystem("RemoteServerApp", config)

    val actor1 = system.actorOf(Props[MessagePrintActor], "Receive")
    actor1 ! "Local"

    // Create the routed Actors (a pool) in the local process
    system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool")

    Thread.sleep(60000)
    system.shutdown()
  }
}
After modifying the Server, I also modify the Router side.
First, the configuration change.
■remote-router-app.conf
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"]
    }
    /remoteGroup {
      router = round-robin-group
      routees.paths = [
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a",
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b",
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c"]
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2554
    }
  }
}
Here the round-robin destinations are specified with routees.paths.
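As with the pool, a group router can also be built in code from the same paths. The sketch below assumes Akka 2.3 with remoting enabled; RemoteGroupSketch, createGroup and the router name "remoteGroupFromCode" are hypothetical names used only for illustration.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.routing.RoundRobinGroup

// Hypothetical helper: a code-based counterpart of the /remoteGroup deployment entry.
object RemoteGroupSketch {
  // Paths of Actors that must already exist on RemoteServerApp
  val paths: List[String] = List(
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a",
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b",
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c")

  // A group router creates no Actors itself; it only forwards to the given paths.
  def createGroup(system: ActorSystem): ActorRef =
    system.actorOf(RoundRobinGroup(paths).props(), "remoteGroupFromCode")
}
```

The difference from the pool is visible in the signature: no routee Props are passed, because the routees are looked up by path rather than created by the router.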
With that in place, I modify the application as follows.
■RemoteRouterApp.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Sample that connects through a Router using Akka-Remote and sends messages
 */
object RemoteRouterApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-router-app.conf")
    implicit val system = ActorSystem("RemoteRouterApp", config)
    val inbox = ActorDSL.inbox()

    // Deploy the Router, specifying the target processes
    val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool")
    remoteRouterRef.tell("RemoteRouter1", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter2", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter3", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter4", inbox.getRef())

    val received1 = inbox.receive()
    println("received1:" + received1)
    val received2 = inbox.receive()
    println("received2:" + received2)
    val received3 = inbox.receive()
    println("received3:" + received3)
    val received4 = inbox.receive()
    println("received4:" + received4)

    // Create a Router by specifying the paths of the destination Actors
    val remoteGroupRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remoteGroup")
    remoteGroupRef.tell("RemoteGroup1", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup2", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup3", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup4", inbox.getRef())

    val received5 = inbox.receive()
    println("received5:" + received5)
    val received6 = inbox.receive()
    println("received6:" + received6)
    val received7 = inbox.receive()
    println("received7:" + received7)
    val received8 = inbox.receive()
    println("received8:" + received8)

    Thread.sleep(30000)
    system.shutdown()
  }
}
In this state, I again start the processes in the order RemoteServerApp > RemoteClientApp > RemoteRouterApp.
The results were as follows.
■RemoteServerApp
[INFO] [09/28/2014 10:04:31.107] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:31.370] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 10:04:31.372] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 10:04:31.381] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/28/2014 10:04:40.146] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/28/2014 10:04:40.271] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
[INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
[INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
[INFO] [09/28/2014 10:04:46.889] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↓ added this time ↓★
[INFO] [09/28/2014 10:04:46.960] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1
[INFO] [09/28/2014 10:04:46.963] [RemoteServerApp-akka.actor.default-dispatcher-14] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b] akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2
[INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c] akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3
[INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4
## ★↑ added this time ↑★
[INFO] [09/28/2014 10:05:31.423] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteClientApp
[INFO] [09/28/2014 10:04:39.541] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:39.808] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553]
[INFO] [09/28/2014 10:04:39.810] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553]
received1:akka://RemoteServerApp/user/Receive: Received String Remote
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
[INFO] [09/28/2014 10:04:46.895] [RemoteClientApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
[INFO] [09/28/2014 10:05:10.452] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
■RemoteRouterApp
[INFO] [09/28/2014 10:04:46.326] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:46.597] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554]
[INFO] [09/28/2014 10:04:46.599] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554]
received1:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↓ added this time ↓★
received5:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1
received6:akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2
received7:akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3
received8:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4
## ★↑ added this time ↑★
[INFO] [09/28/2014 10:05:17.044] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
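As shown above, it is now possible to connect by specifying the paths of Actors that already existed in ServerApp.
If the Actors do not exist yet, create them by specifying the nodes (target.nodes);
if you want to connect remotely to Actors that already exist, specify them as routees (routees.paths). At least, that is how it appears.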