読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Akka Actor確認(Router確認)

こんにちは。

前回Actorからの応答を受け取ることができたので、
今回はActorに対してメッセージを割り振るRouterの機能を確認してみます。

参考:
Routing − Akka Documentation

上記の参考ページによると、
RouterはRoutingを行うLogicクラスとRouting先のRouteeListが必要になるようです。
実際にRouterを適用してみると下記のようなコードになります。

■ParentActor.scala

class ParentActor(name: String, childActorList: immutable.IndexedSeq[ActorRef], routingLogic: RoutingLogic) extends Actor {
  // 初期化時に与えられたActorListに対して順に送信するよう初期化
  val routees = immutable.IndexedSeq.tabulate(childActorList.size)(i => new ActorRefRoutee(childActorList(i)))
  val router = new Router(routingLogic, routees)

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      println("ParentActor: Received String " + msg + " My name is " + name)
      router.route(msg, self)
    }
    case msg: Int => {
      println("ParentActor: Received Int " + msg + " My name is " + name)
    }
  }

■ChildActor.scala(変更なし)

class ChildActor(name: String) extends Actor {

  /** メッセージ受信時処理 */
  def receive = {
    case msg: String => {
      val message = "ChildActor: Received String " + msg + " My name is " + name
      println(message)
      sender ! message.length
    }
  }
}

実際にActorを生成する呼び出し元コードの方は下記のようになります。
■MessageSendApp.scala

object MessageSendApp extends App {
  override def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply("MessageSendApp")
    val childActor1 = system.actorOf(Props.apply(new ChildActor("child1")))
    val childActor2 = system.actorOf(Props.apply(new ChildActor("child2")))
    val childActor3 = system.actorOf(Props.apply(new ChildActor("child3")))
    val seq = immutable.IndexedSeq(childActor1,childActor2,  childActor3)

    val routingLogic = new RoundRobinRoutingLogic

    val parentActor = system.actorOf(Props.apply(new ParentActor("parent1", seq, new RoundRobinRoutingLogic)))

    parentActor ! """Test1"""
    parentActor ! """Test2"""
    parentActor ! """Test3"""
    parentActor ! """Test4"""

    Thread.sleep(5000)
    system.shutdown()
  }
}

上記のコードを実際に実行してみると下記のようになりました。

ParentActor: Received String Test1 My name is parent1
ParentActor: Received String Test2 My name is parent1
ChildActor: Received String Test1 My name is child1
ChildActor: Received String Test2 My name is child2
ParentActor: Received String Test3 My name is parent1
ParentActor: Received String Test4 My name is parent1
ChildActor: Received String Test3 My name is child3
ParentActor: Received Int 51 My name is parent1
ChildActor: Received String Test4 My name is child1
ParentActor: Received Int 51 My name is parent1
ParentActor: Received Int 51 My name is parent1
ParentActor: Received Int 51 My name is parent1

メッセージがchild1、child2、child3と順に配信され、
全てのChildActorに割り振られると次はchild1に戻ることがわかります。

尚、RoundRobinRoutingLogicの実装は下記のようになっていました。

final class RoundRobinRoutingLogic extends RoutingLogic {
  val next = new AtomicLong(0)

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else routees((next.getAndIncrement % routees.size).asInstanceOf[Int])

}

現在何通目か、というステータスのみは保持しておき、
immutableなRouteeListの剰余を取って配信していく・・という実装になっていますね。

他にもRoutingLogicは存在しますが、
とりあえずは使い方がわかったので今回はそれでよしとしようと思います。