夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Akka Actor確認(呼び出し元での結果取得)

こんにちは。

前回、Actorからの応答を受信できないか試したところ空の結果が返ってきた・・
という結果だったのですが、これでは実際使いにくい、と調べてみたところ、案の定応答を受信する仕組みは存在しました。

akka.patterns.askをインポートした上で ? で問い合わせをかけることでFuture型の返り値を取得できるようです。

尚、下記のようにakka.patterns.askを確認してみると、暗黙設定としてtimeoutが必要になりますので、
事前に暗黙設定を定義しておく必要があります。
■AskSupport.scala

implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) // ActorRefに対して暗黙メソッドを追加

final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

  def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
    case ref: InternalActorRef ⇒
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
      else {
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.future
      }
    case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
  }

  def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) // ? をつけるとこの処理にマッピングされる。
}

というわけで、下記のようにActorとAppを修正してみます。
■HelloWorldActor.scala

class HelloWorldActor(name :String) extends Actor {
  /** Actor初期化時処理 */
  override def preStart = {println(name + " is started.")  }

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String  => {
      println("HelloWorldActor: Hello world! " + msg + " My name is " + name)
      sender ! "HelloWorldActor: Hello world! " + msg + " My name is " + name
    }
  }

  /** Actor終了時処理 */
  override def postStop = {println(name + " is stopped.")  }
}

■HelloWorldApp.scala

import akka.pattern.ask
import akka.actor.{ActorSystem, Props}
import scala.concurrent.Await
import akka.util.Timeout
import java.util.concurrent.TimeUnit

object HelloWorldApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem("HelloWorldApp")
    val helloWorldActor = system.actorOf(Props.apply(new HelloWorldActor("actor1")), "HelloWorldActor")
    implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS) // ? 実行時の暗黙タイムアウト設定

    val futureTest1 = helloWorldActor ? """Test1"""
    val futureTest2 = helloWorldActor ? """Test2"""
    val unitTest3  = helloWorldActor ! """Test3"""

    println("Test1 future is " + futureTest1)
    val resultTest1 = Await.result(futureTest1, timeout.duration).asInstanceOf[String]
    println("Test1 result is " + resultTest1)

    println("Test2 future is " + futureTest2)
    val resultTest2 = Await.result(futureTest2, timeout.duration).asInstanceOf[String]
    println("Test2 result is " + resultTest2)

    println("Test3 unit is " + unitTest3)
    println("Test3 unit class is " + unitTest3.getClass)

    Thread.sleep(5000)
    system.shutdown()
  }

上記の修正の結果、実行してみると?で実行した側は返り値がFutureとなり、
結果を取得することができました。

?で実行した場合は応答を返すことで応答を受信できます。
ですが、!で実行した場合は応答を返しても受信できないため、1通のメッセージはdeadLetters行き、ということがわかります。

actor1 is started.
Test1 future is scala.concurrent.impl.Promise$DefaultPromise@636e8707
HelloWorldActor: Hello world! Test1 My name is actor1
Test1 result is HelloWorldActor: Hello world! Test1 My name is actor1
Test2 future is scala.concurrent.impl.Promise$DefaultPromise@337ea934
HelloWorldActor: Hello world! Test2 My name is actor1
Test2 result is HelloWorldActor: Hello world! Test2 My name is actor1
HelloWorldActor: Hello world! Test3 My name is actor1
Test3 unit is ()
Test3 unit class is void
[INFO] [07/26/2014 08:19:06.119] [HelloWorldApp-akka.actor.default-dispatcher-2] [akka://HelloWorldApp/deadLetters] Message [java.lang.String] from Actor[akka://HelloWorldApp/user/HelloWorldActor#571958489] to Actor[akka://HelloWorldApp/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
actor1 is stopped.

尚、senderを使わず、単に結果を返した場合は応答は呼び出し元に返らず、
呼び出し元でタイムアウトが発生する、という動作になりました。

非同期のやり取りがこれだけで書けるというのはやはり大きいですね。