夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Akka-Remoteでメッセージングを行う

こんにちは。

今回はAkka-Remoteを用いて別プロセス同士でメッセージングを行う方法を確認してみます。

参考:
Remoting — Akka Documentation

役割をわかりやすくするために下記のような構成で名前をつけて作ってみます。

1. 下準備

尚、定義は下記の構成になります。
定義を行う際に注意する必要があるのは、
「通信を行うクライアント側でもenabled-transportsを設定し、
 ポートも設定しておく必要がある」ということですね。

クライアント専用であればポート指定は0(自動取得)でいいのですが、
どちらにしてもサーバとほぼ同様の設定を施しておく必要があるということです。

■サーバ側

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
 }
}

■クライアント側

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
 }
}

2. Server側で立ち上げたActorに対してClientが接続を行う

では、まずはServer側で立ち上げたActorに対してClientが接続して通信を行うプログラムを作ります。

■RemoteServerApp.scala

import akka.actor.{Props, ActorSystem}
import com.github.kimutansk.akka.exercise.routing.MessagePrintActor
import com.typesafe.config.ConfigFactory

/**
 * Akka-Remoteを用いて接続を受け付けるクラス
 */
object RemoteServerApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-server-app.conf")
    val system = ActorSystem.apply("RemoteServerApp", config)
    val actor1 = system.actorOf(Props[MessagePrintActor], "Receive")

    actor1 ! "Local"

    Thread.sleep(60000)
    system.shutdown()
  }
}

■RemoteClientApp.scala

import akka.actor.{ActorSystem}
import com.typesafe.config.ConfigFactory

/**
 * Akka-Remoteを用いて接続し、メッセージを送信するクラス
 */
object RemoteClientApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-client-app.conf")
    val system = ActorSystem.apply("RemoteClientApp", config)
    val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive")
    remoteActorRef ! "Remote"

    Thread.sleep(10000)
    system.shutdown()
  }
}

内容としては非常に単純で、
Server側でActorを立ち上げ、そのパスを指定してリモートから接続してメッセージを送信するという内容ですね。
実行すると、下記のようになります。
#メッセージ系の一部のログは省略
■RemoteServerApp

[INFO] [09/24/2014 06:35:10.916] [main] [Remoting] Starting remoting
[INFO] [09/24/2014 06:35:11.248] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/24/2014 06:35:11.250] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/24/2014 06:35:11.282] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/24/2014 06:35:20.079] [RemoteServerApp-akka.actor.default-dispatcher-2] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/24/2014 06:36:11.303] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

Remote側からメッセージを受け取れているのがわかりますね。

3. Client側でServerからの応答を受け取る

先ほどの実装では、Client側でServer側からの応答を受け取ることができませんでした。
そのため、次はInboxを用いてServer側からの応答を受け取れるか確認します。
■RemoteClientApp.scala

import akka.actor.{ActorDSL, ActorSystem}
import com.typesafe.config.ConfigFactory

/**
 * Akka-Remoteを用いて接続し、メッセージを送信するサンプル
 */
object RemoteClientApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/remote-client-app.conf")
    implicit val system = ActorSystem.apply("RemoteClientApp", config)
    val remoteActorRef = system.actorSelection("akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive")

    val inbox = ActorDSL.inbox()
    remoteActorRef.tell("Remote", inbox.getRef())

    val received1 = inbox.receive()
    println("received1:" + received1)

    Thread.sleep(10000)
    system.shutdown()
  }
}

上記のコードを実行すると下記のようになりました。
■RemoteServerApp

[INFO] [09/24/2014 06:43:55.454] [main] [Remoting] Starting remoting
[INFO] [09/24/2014 06:43:55.822] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/24/2014 06:43:55.824] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteServerApp@127.0.0.1:2552]
[INFO] [09/24/2014 06:43:55.835] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Local
[INFO] [09/24/2014 06:44:02.158] [RemoteServerApp-akka.actor.default-dispatcher-3] [akka.tcp://RemoteServerApp@127.0.0.1:2552/user/Receive] akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/24/2014 06:44:55.873] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

■RemoteClientApp

[INFO] [09/24/2014 06:44:01.531] [main] [Remoting] Starting remoting
[INFO] [09/24/2014 06:44:01.803] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteClientApp@127.0.0.1:2553]
[INFO] [09/24/2014 06:44:01.805] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteClientApp@127.0.0.1:2553]
received1:akka://RemoteServerApp/user/Receive: Received String Remote
[INFO] [09/24/2014 06:44:12.262] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down

Client側でServer側においてsenderに対して送信したメッセージが受信できていることがわかりますね。

と、これでとりあえず基本パターンであるRemoteからメッセージを送信して、
応答を受信する、が実現できました。