
夢とガラクタの集積場

A dumping ground for the daydreams ("dreams") and the dashed dreams ("junk") of its maintainer, a washed-out third-rate engineer.

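Routing with Akka-Remote

Hello.

Last time I tried to do routing over Akka-Remote and could not get it to work,
so this time I will work out how to actually do it.

Routing is not very interesting unless it crosses process boundaries,
so, although the names are a little awkward, I will use the following setup to check whether routing works:
RouterApp routes messages to actors in both ClientApp and ServerApp.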

1. Deploying by specifying the target processes

First, check whether actors can be deployed across processes to ServerApp and ClientApp.
The RouterApp configuration is as follows.

■remote-router-app.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"]
    }
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2554
    }
  }
}

The deployment targets are given in target.nodes.
With that in place, the application code looks like this.

■RemoteRouterApp.scala

import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Sample that connects through a router over Akka-Remote and sends messages
 */
object RemoteRouterApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-router-app.conf")
    implicit val system = ActorSystem.apply("RemoteRouterApp", config)

    val inbox = ActorDSL.inbox()
    // Deploy the router, with the target processes specified in the config
    val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]),
      "remotePool")
    remoteRouterRef.tell("RemoteRouter1", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter2", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter3", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter4", inbox.getRef())
    val received1 = inbox.receive()
    println("received1:" + received1)
    val received2 = inbox.receive()
    println("received2:" + received2)
    val received3 = inbox.receive()
    println("received3:" + received3)
    val received4 = inbox.receive()
    println("received4:" + received4)

    Thread.sleep(30000)
    system.shutdown()
  }
}

With the above in place, start the processes in the order RemoteServerApp > RemoteClientApp > RemoteRouterApp.
The results were as follows.
■RemoteServerApp

[INFO] [09/28/2014 09:31:05.506] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:05.844] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 09:31:05.847] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 09:31:05.860] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/28/2014 09:31:15.068] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/28/2014 09:31:15.252] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
## ★↓ Added this time ↓★
[INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
[INFO] [09/28/2014 09:31:21.792] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
[INFO] [09/28/2014 09:31:21.816] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↑ Added this time ↑★
[INFO] [09/28/2014 09:32:05.894] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

■RemoteClientApp

[INFO] [09/28/2014 09:31:14.477] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:14.736] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553]
[INFO] [09/28/2014 09:31:14.738] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553]
received1:akka://RemoteServerApp/user/Receive: Received String Remote
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
## ★↓ Added this time ↓★
[INFO] [09/28/2014 09:31:21.792] [RemoteClientApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
## ★↑ Added this time ↑★
[INFO] [09/28/2014 09:31:45.458] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

■RemoteRouterApp

[INFO] [09/28/2014 09:31:21.219] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 09:31:21.500] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554]
[INFO] [09/28/2014 09:31:21.502] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554]
## ★↓ Added this time ↓★
received1:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
received2:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↑ Added this time ↑★
[INFO] [09/28/2014 09:31:51.912] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

The logs show that the three routees were deployed as listed below,
and that the four messages were routed round-robin (c1, c2, c3, then c1 again).

  • Actor1 (c1) > RemoteServerApp
  • Actor2 (c2) > RemoteClientApp
  • Actor3 (c3) > RemoteServerApp
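
The placement and routing order seen in the logs can be modelled in a few lines of plain Scala. This is only a sketch of the observed behaviour, not Akka's implementation: `RoundRobinSketch`, `deploy` and `route` are hypothetical names, and cycling through the node list is an assumption that happens to match the three routees above.

```scala
object RoundRobinSketch {
  // Node addresses copied from target.nodes in remote-router-app.conf.
  val nodes = Vector(
    "akka.tcp://RemoteServerApp@127.0.0.1:2552",
    "akka.tcp://RemoteClientApp@127.0.0.1:2553")

  // Routee names as they appear in the logs.
  val routees = Vector("c1", "c2", "c3")

  /** Assign each routee to a node by cycling through the node list (assumed). */
  def deploy(routees: Seq[String], nodes: Seq[String]): Map[String, String] =
    routees.zipWithIndex.map { case (r, i) => r -> nodes(i % nodes.size) }.toMap

  /** Pick the routee for the n-th message (0-based) in round-robin order. */
  def route(n: Int, routees: Seq[String]): String =
    routees(n % routees.size)

  def main(args: Array[String]): Unit = {
    val placement = deploy(routees, nodes)
    (0 until 4).foreach { n =>
      val r = route(n, routees)
      println(s"RemoteRouter${n + 1} -> $r on ${placement(r)}")
    }
  }
}
```

Under these assumptions the fourth message wraps around to c1 on RemoteServerApp, which is what the RemoteRouter4 log line shows.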

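2. Creating a router by specifying existing actors

Next, check whether a router can be created by listing the paths of actors that already exist.

First, modify the RemoteServerApp configuration and code as shown below,
so that actors exist at the following three paths.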

  • akka://RemoteServerApp/user/remotePool/$a
  • akka://RemoteServerApp/user/remotePool/$b
  • akka://RemoteServerApp/user/remotePool/$c

■remote-server-app.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      remote = "akka.tcp://RemoteServerApp@127.0.0.1:2552"
    }
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

■RemoteServerApp.scala

import akka.actor.{Props, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Application that accepts connections over Akka-Remote
 */
object RemoteServerApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-server-app.conf")
    implicit val system = ActorSystem.apply("RemoteServerApp", config)
    val actor1 = system.actorOf(Props[MessagePrintActor], "Receive")

    actor1 ! "Local"

    // Create a router whose routees live in the local process
    system.actorOf(FromConfig.props(Props[MessagePrintActor]), "remotePool")

    Thread.sleep(60000)
    system.shutdown()
  }
}
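
The routees created by this pool live under /user/remotePool on RemoteServerApp. Another process addresses them by swapping the local akka://RemoteServerApp prefix for the system's remote address. A minimal sketch of that mapping in plain Scala follows; `RouteePathSketch` and `toRemotePath` are hypothetical helpers written for illustration, not part of Akka.

```scala
object RouteePathSketch {
  // Remote address of RemoteServerApp, as configured in remote-server-app.conf.
  val serverAddress = "akka.tcp://RemoteServerApp@127.0.0.1:2552"

  /** Turn a local path such as akka://System/user/x into its remote form. */
  def toRemotePath(localPath: String, remoteAddress: String): String = {
    val prefix = "akka://"
    require(localPath.startsWith(prefix), s"not a local actor path: $localPath")
    // Drop "akka://<system>" and keep everything from the first "/" after it.
    val rest = localPath.drop(prefix.length)
    val slash = rest.indexOf('/')
    require(slash >= 0, s"path has no elements: $localPath")
    remoteAddress + rest.substring(slash)
  }

  def main(args: Array[String]): Unit =
    Seq("$a", "$b", "$c")
      .map(name => s"akka://RemoteServerApp/user/remotePool/$name")
      .map(toRemotePath(_, serverAddress))
      .foreach(println)
}
```

The resulting strings have the same shape as the entries a group router expects in routees.paths.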

With the server modified, the router side is updated as well.
The configuration change comes first.
■remote-router-app.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  actor.deployment {
    /remotePool {
      router = round-robin-pool
      nr-of-instances = 3
      target.nodes = ["akka.tcp://RemoteServerApp@127.0.0.1:2552", "akka.tcp://RemoteClientApp@127.0.0.1:2553"]
    }

    /remoteGroup {
      router = round-robin-group
      routees.paths = [
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a",
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b",
        "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c"]
    }
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2554
    }
  }
}

Here the round-robin destinations are given in routees.paths.
The application is then modified as follows.
■RemoteRouterApp.scala

import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig

/**
 * Sample that connects through a router over Akka-Remote and sends messages
 */
object RemoteRouterApp {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-router-app.conf")
    implicit val system = ActorSystem.apply("RemoteRouterApp", config)

    val inbox = ActorDSL.inbox()
    // Deploy the router, with the target processes specified in the config
    val remoteRouterRef = system.actorOf(FromConfig.props(Props[MessagePrintActor]),
      "remotePool")
    remoteRouterRef.tell("RemoteRouter1", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter2", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter3", inbox.getRef())
    remoteRouterRef.tell("RemoteRouter4", inbox.getRef())
    val received1 = inbox.receive()
    println("received1:" + received1)
    val received2 = inbox.receive()
    println("received2:" + received2)
    val received3 = inbox.receive()
    println("received3:" + received3)
    val received4 = inbox.receive()
    println("received4:" + received4)

    // Create a router from the paths of existing actors; a group router
    // does not create routees, so no routee Props are passed
    val remoteGroupRef = system.actorOf(FromConfig.props(), "remoteGroup")
    remoteGroupRef.tell("RemoteGroup1", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup2", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup3", inbox.getRef())
    remoteGroupRef.tell("RemoteGroup4", inbox.getRef())
    val received5 = inbox.receive()
    println("received5:" + received5)
    val received6 = inbox.receive()
    println("received6:" + received6)
    val received7 = inbox.receive()
    println("received7:" + received7)
    val received8 = inbox.receive()
    println("received8:" + received8)

    Thread.sleep(30000)
    system.shutdown()
  }
}
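
For comparison, both routers could also be declared in code instead of in remote-router-app.conf. The following is a sketch under stated assumptions rather than the setup used in this post: it targets the Akka 2.3-era classic API (`RemoteRouterConfig`, `RoundRobinPool`, `RoundRobinGroup`, `AddressFromURIString`), `ProgrammaticRouters` and `EchoActor` are hypothetical names, and `EchoActor` merely stands in for MessagePrintActor, which is not shown here. The router names differ from the configured ones on purpose, because a deployment section in the config takes precedence over settings given in code.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, AddressFromURIString, Props}
import akka.remote.routing.RemoteRouterConfig
import akka.routing.{RoundRobinGroup, RoundRobinPool}

object ProgrammaticRouters {
  // Hypothetical stand-in for MessagePrintActor: replies with what it received.
  // For remote deployment the class must also be on the target's classpath.
  class EchoActor extends Actor {
    def receive = { case msg => sender() ! msg }
  }

  // Same values as target.nodes in remote-router-app.conf.
  val nodeUris = List(
    "akka.tcp://RemoteServerApp@127.0.0.1:2552",
    "akka.tcp://RemoteClientApp@127.0.0.1:2553")

  // Same values as routees.paths in remote-router-app.conf.
  val routeePaths = List(
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a",
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b",
    "akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c")

  /** Pool router: creates three routees, deployed onto the given nodes. */
  def createPool(system: ActorSystem): ActorRef = {
    val nodes = nodeUris.map(AddressFromURIString(_))
    system.actorOf(
      RemoteRouterConfig(RoundRobinPool(3), nodes).props(Props[EchoActor]),
      "remotePoolFromCode")
  }

  /** Group router: sends to actors that already exist at the given paths. */
  def createGroup(system: ActorSystem): ActorRef =
    system.actorOf(RoundRobinGroup(routeePaths).props(), "remoteGroupFromCode")
}
```

Keeping the node and path lists in configuration, as this post does, has the advantage that the topology can change without recompiling.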

With these changes in place, start the processes again in the order RemoteServerApp > RemoteClientApp > RemoteRouterApp.
The results were as follows.
■RemoteServerApp

[INFO] [09/28/2014 10:04:31.107] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:31.370] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 10:04:31.372] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/28/2014 10:04:31.381] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/28/2014 10:04:40.146] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/28/2014 10:04:40.271] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a] akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
[INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
[INFO] [09/28/2014 10:04:46.888] [RemoteServerApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
[INFO] [09/28/2014 10:04:46.889] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1] akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↓ Added this time ↓★
[INFO] [09/28/2014 10:04:46.960] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1
[INFO] [09/28/2014 10:04:46.963] [RemoteServerApp-akka.actor.default-dispatcher-14] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$b] akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2
[INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$c] akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3
[INFO] [09/28/2014 10:04:46.964] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/remotePool/$a] akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4
## ★↑ Added this time ↑★
[INFO] [09/28/2014 10:05:31.423] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

■RemoteClientApp

[INFO] [09/28/2014 10:04:39.541] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:39.808] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553]
[INFO] [09/28/2014 10:04:39.810] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553]
received1:akka://RemoteServerApp/user/Receive: Received String Remote
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteClientApp@127.0.0.1:2553/user/$a: Received String RemoteDeploy
[INFO] [09/28/2014 10:04:46.895] [RemoteClientApp-akka.actor.default-dispatcher-4] [akka.tcp://RemoteClientApp@127.0.0.1:2553/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2] akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
[INFO] [09/28/2014 10:05:10.452] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

■RemoteRouterApp

[INFO] [09/28/2014 10:04:46.326] [main] [Remoting] Starting remoting
[INFO] [09/28/2014 10:04:46.597] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteRouterApp@127.0.0.1:2554]
[INFO] [09/28/2014 10:04:46.599] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteRouterApp@127.0.0.1:2554]
received1:akka://RemoteClientApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c2: Received String RemoteRouter2
received2:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter1
received3:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c3: Received String RemoteRouter3
received4:akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1: Received String RemoteRouter4
## ★↓ Added this time ↓★
received5:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup1
received6:akka://RemoteServerApp/user/remotePool/$b: Received String RemoteGroup2
received7:akka://RemoteServerApp/user/remotePool/$c: Received String RemoteGroup3
received8:akka://RemoteServerApp/user/remotePool/$a: Received String RemoteGroup4
## ★↑ Added this time ↑★
[INFO] [09/28/2014 10:05:17.044] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

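As shown above, the router can now connect to actors that already existed in ServerApp by specifying their paths.
If the routees do not exist yet, specify nodes (target.nodes) and let the router create them;
to connect remotely to actors that already exist, specify them as routees (routees.paths). At least, that is how it appears.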
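
The two cases can be told apart in the logs by the actor path alone: routees deployed from RouterApp appear under /remote/ followed by the deploying system's address, while the pre-existing routees keep their plain /user/remotePool/... paths. A small plain-Scala sketch that decodes such paths is shown below; `RemotePathSketch`, `RemoteDeployed` and `parse` are hypothetical names, and the pattern only covers the path shapes seen in this post.

```scala
object RemotePathSketch {
  /** Origin of a remote-deployed actor, as encoded in its path on the hosting system. */
  final case class RemoteDeployed(hostSystem: String, originAddress: String, logicalPath: String)

  // Shape: akka://<host system>/remote/<protocol>/<system@host:port>/<logical path>
  private val Pattern = """akka://([^/]+)/remote/([^/]+)/([^/]+)(/.*)""".r

  /** Returns Some(...) for a remote-deployed actor path, None for an ordinary one. */
  def parse(path: String): Option[RemoteDeployed] = path match {
    case Pattern(host, protocol, origin, logical) =>
      Some(RemoteDeployed(host, s"$protocol://$origin", logical))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val samples = Seq(
      "akka://RemoteServerApp/remote/akka.tcp/RemoteRouterApp@127.0.0.1:2554/user/remotePool/c1",
      "akka://RemoteServerApp/user/remotePool/$a")
    samples.foreach(p => println(s"$p => ${parse(p)}"))
  }
}
```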