読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Inboxを用いてFutureを使うよりお手軽に応答を受信する

こんにちは。

以前の投稿でActorから応答を受信するための方法について確認しましたが、
1つの応答ごとにFutureを用いる必要があり、正直な話かなり使いにくいものでした。

そのため、もっと簡単に応答を受信できるものが無いかな・・
と確認してみたところ、Inboxというコンポーネントがあり、
それを用いると応答が受信できることが確認できました。

参考:
Actors− Akka Documentation

■InboxApp.scala

object InboxApp extends App {
  override def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem.apply("ConfiguredRoutingApp")

    val actor1 = system.actorOf(Props[MessagePrintActor])

    val rootInbox = ActorDSL.inbox()
    rootInbox.send(actor1, "Test1")
    rootInbox.send(actor1, "Test2")
    rootInbox.send(actor1, "Test3")
    rootInbox.send(actor1, "Test4")

    Thread.sleep(1000)

    val msg1 = rootInbox.receive()
    println("Receive1:" + msg1)
    val msg2 = rootInbox.receive()
    println("Receive2:" + msg2)
    val msg3 = rootInbox.receive()
    println("Receive3:" + msg3)

    Thread.sleep(5000)
    system.shutdown()
  }
}

Inbox#send()メソッドを用いることで、Actorからの応答がInboxに蓄積されます。
#Watchメソッドを使った場合は何故か上手くいきませんでした(汗

蓄積後、receiveメソッドを使用することで応答を取得できます。
尚、メッセージが無い状態でreceiveメソッドを実行すると
一定時間待った後、TimeoutExceptionが発生するので、そのつもりで実装する必要があります。

上記のコードを記述して実行すると結果は下記のようになります。

akka://ConfiguredRoutingApp/user/$a: Received String Test1
akka://ConfiguredRoutingApp/user/$a: Received String Test2
akka://ConfiguredRoutingApp/user/$a: Received String Test3
akka://ConfiguredRoutingApp/user/$a: Received String Test4
Receive1:akka://ConfiguredRoutingApp/user/$a: Received String Test1
Receive2:akka://ConfiguredRoutingApp/user/$a: Received String Test2
Receive3:akka://ConfiguredRoutingApp/user/$a: Received String Test3

応答が受信できているのが確認できますね。

尚、Inboxのメッセージを受信可能な数はapplication.confの項目「akka.actor.dsl.inbox-size」で設定可能です。
そのため、設定ファイルを下記のように記述した状態で再度同様のコードを実行してみます。

■application.conf

akka.actor.dsl.inbox-size = 3

すると、実行結果は下記のようになりました。

akka://ConfiguredRoutingApp/user/$a: Received String Test1
akka://ConfiguredRoutingApp/user/$a: Received String Test2
akka://ConfiguredRoutingApp/user/$a: Received String Test3
akka://ConfiguredRoutingApp/user/$a: Received String Test4
[WARN] [08/15/2014 07:03:12.477] [ConfiguredRoutingApp-akka.actor.default-dispatcher-2] [akka://ConfiguredRoutingApp/system/dsl/inbox-1] dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is 3
Receive1:akka://ConfiguredRoutingApp/user/$a: Received String Test1
Receive2:akka://ConfiguredRoutingApp/user/$a: Received String Test2
Receive3:akka://ConfiguredRoutingApp/user/$a: Received String Test3

Inboxで容量をオーバーしたのでDropするというメッセージが出力されます。
尚、Inboxのソースを読むと下記のようになっており、上記のWARNログが出力されるのは1回のみのようです。
そのため、基本は溢れないようにサイズを設定する必要がありますね。

■Inbox.scala

  private class InboxActor(size: Int) extends Actor with ActorLogging {
    var clients = Queue.empty[Query]
    val messages = Queue.empty[Any]
    var clientsByTimeout = TreeSet.empty[Query]
    var printedWarning = false

    def enqueueMessage(msg: Any) {
      if (messages.size < size) messages enqueue msg
      else {
        if (!printedWarning) {
          log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size)
          printedWarning = true
        }
      }
    }
  }