夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Inboxを用いてFutureを使うよりお手軽に応答を受信する

こんにちは。

以前の投稿でActorから応答を受信するための方法について確認しましたが、
1つの応答ごとにFutureを用いる必要があり、正直な話かなり使いにくいものでした。

そのため、もっと簡単に応答を受信できるものが無いかな・・
と確認してみたところ、Inboxというコンポーネントがあり、
それを用いると応答が受信できることが確認できました。

参考:
Actors− Akka Documentation

■InboxApp.scala

object InboxApp extends App {
  override def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem.apply("ConfiguredRoutingApp")

    val actor1 = system.actorOf(Props[MessagePrintActor])

    val rootInbox = ActorDSL.inbox()
    rootInbox.send(actor1, "Test1")
    rootInbox.send(actor1, "Test2")
    rootInbox.send(actor1, "Test3")
    rootInbox.send(actor1, "Test4")

    Thread.sleep(1000)

    val msg1 = rootInbox.receive()
    println("Receive1:" + msg1)
    val msg2 = rootInbox.receive()
    println("Receive2:" + msg2)
    val msg3 = rootInbox.receive()
    println("Receive3:" + msg3)

    Thread.sleep(5000)
    system.shutdown()
  }
}

Inbox#send()メソッドを用いることで、Actorからの応答がInboxに蓄積されます。
#Watchメソッドを使った場合は何故か上手くいきませんでした(汗

蓄積後、receiveメソッドを使用することで応答を取得できます。
尚、メッセージが無い状態でreceiveメソッドを実行すると
一定時間待った後、TimeoutExceptionが発生するので、そのつもりで実装する必要があります。

上記のコードを記述して実行すると結果は下記のようになります。

akka://ConfiguredRoutingApp/user/$a: Received String Test1
akka://ConfiguredRoutingApp/user/$a: Received String Test2
akka://ConfiguredRoutingApp/user/$a: Received String Test3
akka://ConfiguredRoutingApp/user/$a: Received String Test4
Receive1:akka://ConfiguredRoutingApp/user/$a: Received String Test1
Receive2:akka://ConfiguredRoutingApp/user/$a: Received String Test2
Receive3:akka://ConfiguredRoutingApp/user/$a: Received String Test3

応答が受信できているのが確認できますね。

尚、Inboxのメッセージを受信可能な数はapplication.confの項目「akka.actor.dsl.inbox-size」で設定可能です。
そのため、設定ファイルを下記のように記述した状態で再度同様のコードを実行してみます。

■application.conf

akka.actor.dsl.inbox-size = 3

すると、実行結果は下記のようになりました。

akka://ConfiguredRoutingApp/user/$a: Received String Test1
akka://ConfiguredRoutingApp/user/$a: Received String Test2
akka://ConfiguredRoutingApp/user/$a: Received String Test3
akka://ConfiguredRoutingApp/user/$a: Received String Test4
[WARN] [08/15/2014 07:03:12.477] [ConfiguredRoutingApp-akka.actor.default-dispatcher-2] [akka://ConfiguredRoutingApp/system/dsl/inbox-1] dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is 3
Receive1:akka://ConfiguredRoutingApp/user/$a: Received String Test1
Receive2:akka://ConfiguredRoutingApp/user/$a: Received String Test2
Receive3:akka://ConfiguredRoutingApp/user/$a: Received String Test3

Inboxで容量をオーバーしたのでDropするというメッセージが出力されます。
尚、Inboxのソースを読むと下記のようになっており、上記のWARNログが出力されるのは1回のみのようです。
そのため、基本は溢れないようにサイズを設定する必要がありますね。

■Inbox.scala

  private class InboxActor(size: Int) extends Actor with ActorLogging {
    var clients = Queue.empty[Query]
    val messages = Queue.empty[Any]
    var clientsByTimeout = TreeSet.empty[Query]
    var printedWarning = false

    def enqueueMessage(msg: Any) {
      if (messages.size < size) messages enqueue msg
      else {
        if (!printedWarning) {
          log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size)
          printedWarning = true
        }
      }
    }
  }

Akka Actorの設定を設定ファイルから読み込む

こんにちは。

前回に設定ファイルの設定方法について確認したので、
今回は実際に設定ファイルからActorの定義を読み込めるか試してみます。

参考:
Configuration − Akka Documentation
Routing − Akka Documentation

1. 設定ファイルからActor(Router)の定義を読み込む

Actorの設定自体は下記のように設定ファイルに記述することで、
「router1」という名称を指定したルータのルーティング定義として読み込めるようです。

akka.actor.deployment配下が個々のActorの定義、となっているようですね。
■application.conf

akka.actor.deployment {
  /router1 {
  router = round-robin-pool
  nr-of-instances = 3
  }
}

上記の設定を行ったうえで、アプリケーション側で下記のようにコードを記述します。
■ConfiguredRoutingApp.scala

object ConfiguredRoutingApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("ConfiguredRoutingApp")
    val router1 = system.actorOf(FromConfig.props(Props[MessagePrintActor]),
      "router1")

    router1 ! "Test1"
    router1 ! "Test2"
    router1 ! "Test3"
    router1 ! "Test4"

    Thread.sleep(5000)
    system.shutdown()
  }
}

■MessagePrintActor.scala

class MessagePrintActor extends Actor {

  def receive = {
    case msg: String => {
      val message = self.path + ": Received String " + msg
      println(message)
    }
  }
}

上記の状態でプログラムを実行すると、結果は下記のようになります。

class MessagePrintActor extends Actor {
akka://ConfiguredRoutingApp/user/router1/$c: Received String Test3
akka://ConfiguredRoutingApp/user/router1/$a: Received String Test1
akka://ConfiguredRoutingApp/user/router1/$b: Received String Test2
akka://ConfiguredRoutingApp/user/router1/$a: Received String Test4

Router配下にMessagePrintActorが3つ生成され、RoundRobin方式で配信されているのがわかりますね。

尚、RoundRobinPoolは下記のようになっています。
そのため、"nr-of-instances"で指定した値を基に子のActorが生成されるのはわかります。
■RoundRobin.scala

final case class RoundRobinPool(
  override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
  override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
  override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  override val usePoolDispatcher: Boolean = false)
  extends Pool with PoolOverrideUnsetConfig[RoundRobinPool] {

  def this(config: Config) =
    this(nrOfInstances = config.getInt("nr-of-instances"),
      resizer = DefaultResizer.fromConfig(config),
      usePoolDispatcher = config.hasPath("pool-dispatcher"))

ですが、何故"router = round-robin-pool"でRoundRobinPoolが呼び出されるのかな、
と思ってreference.confを見てみますと下記のような定義がありました。

    router.type-mapping {
      from-code = "akka.routing.NoRouter"
      round-robin-pool = "akka.routing.RoundRobinPool"
      round-robin-group = "akka.routing.RoundRobinGroup"
      random-pool = "akka.routing.RandomPool"
      random-group = "akka.routing.RandomGroup"
      balancing-pool = "akka.routing.BalancingPool"
      smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"
      broadcast-pool = "akka.routing.BroadcastPool"
      broadcast-group = "akka.routing.BroadcastGroup"
      scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"
      scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"
      consistent-hashing-pool = "akka.routing.ConsistentHashingPool"
      consistent-hashing-group = "akka.routing.ConsistentHashingGroup"
    }

そのため、自分で作ったクラスを読み込ませたい場合は
フルでクラス名を指定すればOK、のようです。

Akka Actorでの設定ファイルの読み込み方を確認してみる

こんにちは。

前回でRoutingの基本をやって、次に移ろうとした所、
設定ファイルの読み込みが前提になっているような話になってきたので、
一度設定ファイルの読み込み方の確認をしてみます。

参考:
Configuration − Akka Documentation
Routing − Akka Documentation

1. 設定ファイルの読込確認

まず、基本の設定ファイルの読み込みから。
クラスパス直下にapplication.confを作成し、下記の内容で保存します。
■application.conf

akka {
  loglevel = "DEBUG"
}

その上で、実際にどのような設定が適用されているかを下記のコードで確認します。
内容としては、ActorSystemの設定を出力するのみ、となります。
■ConfiguredRoutingApp.scala

object ConfiguredRoutingApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("MessageSendApp")
    println(system.settings);
  }
}

結果は下記のようになりました。
省略していますが、クラスパスやシステム情報、といったものまで含まれていました。
自分で作成したapplication.confと、あとはreference.confをマージしたものになっているようです。

尚、reference.conf上はloglevelは「INFO」となっているため、
application.confで記述すると上書きされるようです。
■実行結果

{
    # merge of application.conf: 1,reference.conf: 8
    "akka" : {
        # reference.conf: 62
        "actor" : {
            # reference.conf: 74
            # Timeout for ActorSystem.actorOf
            "creation-timeout" : "20s",
            # reference.conf: 411
(省略)
        "loggers" : [
            # reference.conf: 17
            "akka.event.Logging$DefaultLogger"
        ],
        # application.conf: 2
        "loglevel" : "DEBUG",
        # reference.conf: 469
(省略)
}

後はapplication.confを下記のように変更して再度同じコードを実行すると、
該当のログレベルのコメントにも反映されます。
■application.conf

akka {
  # カスタムログレベル
  loglevel = "DEBUG"
}

■実行結果

        # application.conf: 3
        # カスタムログレベル
        "loglevel" : "DEBUG",
        # reference.conf: 469

2. 設定ファイル空間を指定しての読込

次は、設定ファイル空間を指定しての読込です。
まず、設定ファイルを下記のように変更します。

akka {
  # カスタムログレベル
  loglevel = "DEBUG"
}

ConfiguredRoutingApp {
  # 個別設定
  akka.loglevel = "ERROR"
}

その上で、設定を読み込むコード側を下記のように修正します。
■ConfiguredRoutingApp.scala

object ConfiguredRoutingApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    val system = ActorSystem.apply("ConfiguredRoutingApp", config.getConfig("ConfiguredRoutingApp"))
    println(system.settings);
  }
}

実行の結果、該当のログレベルは"ERROR"となります。

とりあえず、設定ファイルの記述方法と、その中のサブツリーを指定した読込が出来ることがわかりました。
次は、実際にActorに設定を適用できるか確認してみます。

Akka Actor確認(Router確認)

こんにちは。

前回Actorからの応答を受け取ることができたので、
今回はActorに対してメッセージを割り振るRouterの機能を確認してみます。

参考:
Routing − Akka Documentation

上記の参考ページによると、
RouterはRoutingを行うLogicクラスとRouting先のRouteeListが必要になるようです。
実際にRouterを適用してみると下記のようなコードになります。

■ParentActor.scala

class ParentActor(name: String, childActorList: immutable.IndexedSeq[ActorRef], routingLogic: RoutingLogic) extends Actor {
  // 初期化時に与えられたActorListに対して順に送信するよう初期化
  val routees = immutable.IndexedSeq.tabulate(childActorList.size)(i => new ActorRefRoutee(childActorList(i)))
  val router = new Router(routingLogic, routees)

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      println("ParentActor: Received String " + msg + " My name is " + name)
      router.route(msg, self)
    }
    case msg: Int => {
      println("ParentActor: Received Int " + msg + " My name is " + name)
    }
  }

■ChildActor.scala(変更なし)

class ChildActor(name: String) extends Actor {

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      val message = "ChildActor: Received String " + msg + " My name is " + name
      println(message)
      sender ! message.length
    }
  }
}

実際にActorを生成する呼び出し元コードの方は下記のようになります。
■MessageSendApp.scala

object MessageSendApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("MessageSendApp")
    val childActor1 = system.actorOf(Props.apply(new ChildActor("child1")))
    val childActor2 = system.actorOf(Props.apply(new ChildActor("child2")))
    val childActor3 = system.actorOf(Props.apply(new ChildActor("child3")))
    val seq = immutable.IndexedSeq(childActor1,childActor2,  childActor3)

    val routingLogic = new RoundRobinRoutingLogic

    val parentActor = system.actorOf(Props.apply(new ParentActor("parent1", seq, new RoundRobinRoutingLogic)))

    parentActor ! """Test1"""
    parentActor ! """Test2"""
    parentActor ! """Test3"""
    parentActor ! """Test4"""

    Thread.sleep(5000)
    system.shutdown()
  }
}

上記のコードを実際に実行してみると下記のようになりました。

ParentActor: Received String Test1 My name is parent1
ParentActor: Received String Test2 My name is parent1
ChildActor: Received String Test1 My name is child1
ChildActor: Received String Test2 My name is child2
ParentActor: Received String Test3 My name is parent1
ParentActor: Received String Test4 My name is parent1
ChildActor: Received String Test3 My name is child3
ParentActor: Received Int 51 My name is parent1
ChildActor: Received String Test4 My name is child1
ParentActor: Received Int 51 My name is parent1
ParentActor: Received Int 51 My name is parent1
ParentActor: Received Int 51 My name is parent1

メッセージがchild1、child2、child3と順に配信され、
全てのChildActorに割り振られると次はchild1に戻ることがわかります。

尚、RoundRobinRoutingLogicの実装は下記のようになっていました。

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong(0)

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else routees((next.getAndIncrement % routees.size).asInstanceOf[Int])

}

現在何通目か、というステータスのみは保持しておき、
immutableなRouteeListの剰余を取って配信していく・・という実装になっていますね。

他にもRoutingLogicは存在しますが、
とりあえずは使い方がわかったので今回はそれでよしとしようと思います。

Akka Actor確認(呼び出し元での結果取得)

こんにちは。

前回、Actorからの応答を受信できないか試したところ空の結果が返ってきた・・
という結果だったのですが、これでは実際使いにくい、と調べてみたところ、案の定応答を受信する仕組みは存在しました。

akka.patterns.askをインポートした上で ? で問い合わせをかけることでFuture型の返り値を取得できるようです。

尚、下記のようにakka.patterns.askを確認してみると、暗黙設定としてtimeoutが必要になりますので、
事前に暗黙設定を定義しておく必要があります。
■AskSupport.scala

implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef) // ActorRefに対して暗黙メソッドを追加

final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

  def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
    case ref: InternalActorRef ⇒
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
      else {
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.future
      }
    case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
  }

  def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout) // ? をつけるとこの処理にマッピングされる。
}

というわけで、下記のようにActorとAppを修正してみます。
■HelloWorldActor.scala

class HelloWorldActor(name :String) extends Actor {
  /** Actor初期化時処理 */
  override def preStart = {println(name + " is started.")  }

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String  => {
      println("HelloWorldActor: Hello world! " + msg + " My name is " + name)
      sender ! "HelloWorldActor: Hello world! " + msg + " My name is " + name
    }
  }

  /** Actor終了時処理 */
  override def postStop = {println(name + " is stopped.")  }
}

■HelloWorldApp.scala

import akka.pattern.ask
import akka.actor.{ActorSystem, Props}
import scala.concurrent.Await
import akka.util.Timeout
import java.util.concurrent.TimeUnit

object HelloWorldApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem("HelloWorldApp")
    val helloWorldActor = system.actorOf(Props.apply(new HelloWorldActor("actor1")), "HelloWorldActor")
    implicit val timeout = Timeout(5000, TimeUnit.MILLISECONDS) // ? 実行時の暗黙タイムアウト設定

    val futureTest1 = helloWorldActor ? """Test1"""
    val futureTest2 = helloWorldActor ? """Test2"""
    val unitTest3  = helloWorldActor ! """Test3"""

    println("Test1 future is " + futureTest1)
    val resultTest1 = Await.result(futureTest1, timeout.duration).asInstanceOf[String]
    println("Test1 result is " + resultTest1)

    println("Test2 future is " + futureTest2)
    val resultTest2 = Await.result(futureTest2, timeout.duration).asInstanceOf[String]
    println("Test2 result is " + resultTest2)

    println("Test3 unit is " + unitTest3)
    println("Test3 unit class is " + unitTest3.getClass)

    Thread.sleep(5000)
    system.shutdown()
  }

上記の修正の結果、実行してみると?で実行した側は返り値がFutureとなり、
結果を取得することができました。

?で実行した場合は応答を返すことで応答を受信できます。
ですが、!で実行した場合は応答を返しても受信できないため、1通のメッセージはdeadLetters行き、ということがわかります。

actor1 is started.
Test1 future is scala.concurrent.impl.Promise$DefaultPromise@636e8707
HelloWorldActor: Hello world! Test1 My name is actor1
Test1 result is HelloWorldActor: Hello world! Test1 My name is actor1
Test2 future is scala.concurrent.impl.Promise$DefaultPromise@337ea934
HelloWorldActor: Hello world! Test2 My name is actor1
Test2 result is HelloWorldActor: Hello world! Test2 My name is actor1
HelloWorldActor: Hello world! Test3 My name is actor1
Test3 unit is ()
Test3 unit class is void
[INFO] [07/26/2014 08:19:06.119] [HelloWorldApp-akka.actor.default-dispatcher-2] [akka://HelloWorldApp/deadLetters] Message [java.lang.String] from Actor[akka://HelloWorldApp/user/HelloWorldActor#571958489] to Actor[akka://HelloWorldApp/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
actor1 is stopped.

尚、senderを使わず、単に結果を返した場合は応答は呼び出し元に返らず、
呼び出し元でタイムアウトが発生する、という動作になりました。

非同期のやり取りがこれだけで書けるというのはやはり大きいですね。

Akka Actor確認(メッセージの応答受信)

こんにちは。

今回はAkka Actorに対して送信したメッセージを受信する方法の確認をしてみます。

1. そのまま返り値は返る?

まず、receiveメソッドの結果返り値を返すように修正し、
呼び出し側でメッセージを送った際に返り値をキャストするようにしてみます。
■HelloWorldActor.scala

class HelloWorldActor(name :String) extends Actor {
  /**
   * Actor初期化時処理
   */
  override def preStart = {println(name + " is started.")  }

  /**
   * メッセージ受信時処理
   */
  def receive = {
    case msg: String  => {
      println("HelloWorldActor: Hello world! " + msg + " My name is " + name)
      "HelloWorldActor: Hello world! " + msg + " My name is " + name
    }
  }

  /**
   * Actor終了時処理
   */
  override def postStop = {println(name + " is stopped.")  }
}

■HelloWorldApp.scala

object HelloWorldApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("HelloWorldApp")
    val helloWorldActor = system.actorOf(Props.apply(new HelloWorldActor("actor1")), "HelloWorldActor")

    val result1 = helloWorldActor ! """Test1"""
    val result2 = helloWorldActor ! """Test2"""

    println("Test1 result is " + result1)
    println("Test2 result is " + result2)

    Thread.sleep(5000)
    system.shutdown()
  }
}

上記のプログラムを実行してみると結果は下記。
Actorの返り値は返ってきていません。
・・まぁ、Actorの処理自体は非同期で行われるので、考えてみればこれで受け取れるはずはないのですが。

Test1 result is ()
Test2 result is ()
actor1 is started.
HelloWorldActor: Hello world! Test1 My name is actor1
HelloWorldActor: Hello world! Test2 My name is actor1
actor1 is stopped.

2. senderに応答を渡すとどうなる?

というわけで、今度はActorが保持しているsenderに対してメッセージを渡してみます。
■HelloWorldActor.scala

class HelloWorldActor(name :String) extends Actor {
  /**
   * Actor初期化時処理
   */
  override def preStart = {println(name + " is started.")  }

  /**
   * メッセージ受信時処理
   */
  def receive = {
    case msg: String  => {
      println("HelloWorldActor: Hello world! " + msg + " My name is " + name)
      sender ! "HelloWorldActor: Hello world! " + msg + " My name is " + name
    }
  }

  /**
   * Actor終了時処理
   */
  override def postStop = {println(name + " is stopped.")  }
}

修正後、実際に実行してみますと・・?

Test1 result is ()
actor1 is started.
Test2 result is ()
HelloWorldActor: Hello world! Test1 My name is actor1
HelloWorldActor: Hello world! Test2 My name is actor1
[INFO] [07/25/2014 06:29:04.365] [HelloWorldApp-akka.actor.default-dispatcher-3] [akka://HelloWorldApp/deadLetters] Message [java.lang.String] from Actor[akka://HelloWorldApp/user/HelloWorldActor#1270036030] to Actor[akka://HelloWorldApp/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [07/25/2014 06:29:04.365] [HelloWorldApp-akka.actor.default-dispatcher-3] [akka://HelloWorldApp/deadLetters] Message [java.lang.String] from Actor[akka://HelloWorldApp/user/HelloWorldActor#1270036030] to Actor[akka://HelloWorldApp/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
actor1 is stopped.

deadLettersというActorに渡されて処理されなかったよ、というログが出力されます。
実際に下記のページを見てみると、
senderとは「senderを使用することで、アクターが最後に受け取ったメッセージの送信元となるアクターを参照することができる」とあります。
https://sites.google.com/site/scalajp/home/documentation/scala_actor_tutorial

なので、Actorを渡していないのでAkkaがデフォルトのActorであるdeadLettersを設定しており、
deadLettersが受け取っている・・という形になっているようです。

3. Actorを入れ子にしてみると?

「senderを使用することで、アクターが最後に受け取ったメッセージの送信元となるアクターを参照することができる」とあるので、
やはりActorのメッセージを受け取れるのはActor、となるようです。
そのため、親子のActorを作成して試してみます。

■ParentActor.scala

class ParentActor(name: String, child: ActorRef) extends Actor {

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      println("ParentActor: Received String " + msg + " My name is " + name)
      child ! "Hello world! " + msg + " My name is " + name
    }
    case msg: Int => {
      println("ParentActor: Received Int " + msg + " My name is " + name)
    }
  }

■ChildActor.scala

class ChildActor(name: String) extends Actor {

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      val message = "ChildActor: Received String " + msg + " My name is " + name
      println(message)
      sender ! message.length
    }
  }
}

■MessageSendApp.scala

object MessageSendApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("MessageSendApp")
    val childActor = system.actorOf(Props.apply(new ChildActor("child1")))
    val parentActor = system.actorOf(Props.apply(new ParentActor("parent1", childActor)))

    parentActor ! """Test1"""
    parentActor ! """Test2"""

    Thread.sleep(5000)
    system.shutdown()
  }
}

これで実行してみると下記のようになりました。

ParentActor: Received String Test1 My name is parent1
ParentActor: Received String Test2 My name is parent1
ChildActor: Received String Hello world! Test1 My name is parent1 My name is child1
ChildActor: Received String Hello world! Test2 My name is parent1 My name is child1
ParentActor: Received Int 83 My name is parent1
ParentActor: Received Int 83 My name is parent1

ParentActorがChildActorからの応答を受け取っていることがわかります。
ちなみにChildActor側のコードを見てもらえればわかると思いますが、
「senderを使用した」他には特に何も設定を行っていません。
このあたりを自動的にやってくれるのはいいですね。

4. senderには何が入っている?

最後に、senderにはどういう情報が入っているかが気になったので、
ChildActorを修正してsenderの内容を出力してみます。
■ChildActor.scala

class ChildActor(name: String) extends Actor {

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      val message = "ChildActor: Received String " + msg + " My name is " + name
      println(message)
      println(sender)
      println(sender.getClass)
      sender ! message.length
    }
  }
}

すると・・?

ParentActor: Received String Test1 My name is parent1
ParentActor: Received String Test2 My name is parent1
ChildActor: Received String Hello world! Test1 My name is parent1 My name is child1
Actor[akka://MessageSendApp/user/$b#-1505499634]
class akka.actor.RepointableActorRef
ChildActor: Received String Hello world! Test2 My name is parent1 My name is child1
ParentActor: Received Int 83 My name is parent1
Actor[akka://MessageSendApp/user/$b#-1505499634]
class akka.actor.RepointableActorRef
ParentActor: Received Int 83 My name is parent1

senderには型が「akka.actor.RepointableActorRef」で、Akka上のActorを識別する情報が入っていることがわかりました。

Receiveする際の型を切り替えてやれば、
メッセージのやり取りを延々継続して処理をし続けることも可能になりますね。

Akka Actor確認(パラメータ、初期化/終了時処理)

こんにちは。

内容的にScalaの復習も兼ねていますがまぁお気になさらず。

前回とりあえず最低限動作するものを作成しました。
今回はActorに対してパラメータを渡す方法と、
初期化処理/終了時処理をどう記述するかを確認してみます。

Actorのソースを見ると、下記のように初期化処理/終了時処理用のメソッドが用意されています。
■Actor.scala

trait Actor {
// (省略)
  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()
// (省略)
  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()
// (省略)
}

そのため、初期化処理/終了時処理を記述し、かつパラメータを渡して初期化する方式に修正します。
#今回は併せて受信したメッセージが文字列の場合にのみ処理を行うよう修正

■HelloWorldActor.scala

class HelloWorldActor(name :String) extends Actor {
  /**
   * Actor初期化時処理
   */
  override def preStart = {println(name + " is started.")  }

  /**
   * メッセージ受信時処理
   */
  def receive = {
    case msg: String  => { println("HelloWorldActor: Hello world! " + msg + " My name is " + name) }
  }

  /**
   * Actor終了時処理
   */
  override def postStop = {println(name + " is stopped.")  }
}

■HelloWorldApp.scala

object HelloWorldApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("HelloWorldApp")
    val helloWorldActor = system.actorOf(Props.apply(new HelloWorldActor("actor1")), "HelloWorldActor")

    helloWorldActor ! """Test1"""
    helloWorldActor ! """Test2"""
    helloWorldActor ! 1

    system.shutdown()
  }
}

上記のコードを作成して実行すると下記の結果が得られます。

actor1 is started.
HelloWorldActor: Hello world! Test1 My name is actor1
HelloWorldActor: Hello world! Test2 My name is actor1
actor1 is stopped.

初期化/終了時の処理が実行されていることが確認でき、
パラメータとして渡した名前についてもActor側で保持して動作していることがわかります。

ただ、receiveメソッド中で処理されなかった
メッセージについては特にエラーなどは返って来ない・・?
とりあえず、こちらについては次回確認してみます。