読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Akka-Persistenceでイベントを保存する

こんにちは。

はじめはTypedActorをやろうとも思ったのですが、
Akka-Meetupでもあったように、TypedActorは使いにくいので通常あまり使われないようです。
#実際、クラス階層も変わるためいまいち使いにくい、というのはありました。

並行処理初心者のためのAkka入門
http://www.slideshare.net/sifue/akka-39611889

そのため、Akka-Persistenceを試してみます。

1. Akka-Persistenceでイベント情報を保存する

まずはAkka-Persistenceでイベント情報を保存/復元できるかの確認をまずは行います。
設定ファイルは下記になります。

■persistence-app.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
 }
}

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

■SamplePersistentActor.scala
動作するActorは下記になります。
受信したメッセージに応じて動作を変える・・・という形になります。

import akka.actor.ActorLogging
import akka.persistence.{SnapshotOffer, PersistentActor}

/**
 * Akka-Persistenceの確認を行うサンプルActor
 */
class SamplePersistentActor(name: String) extends PersistentActor with ActorLogging {

  override def persistenceId: String = "SamplePersistentActor" + name

  var stateCount = 0

  override def receiveCommand: SamplePersistentActor#Receive = {
    case "path" => context.sender ! self.path
    case "print" => println(self.path + ":" + stateCount)
    case "snap" => saveSnapshot(stateCount)
    case "view" => context.sender ! self.path + ":" + stateCount
    case message: String => stateCount += message.length
  }

  override def receiveRecover: SamplePersistentActor#Receive = {
    case SnapshotOffer(_, snapshot: Int) => stateCount = snapshot
    case other:Any => println(self.path + "(" + stateCount + "):" + other)
  }
}

■PersistenceApp.scala

import akka.actor.{ActorDSL, Props, ActorSystem}
import com.typesafe.config.ConfigFactory

/**
 * Akka-Persistenceの確認を行うサンプルアプリケーション
 */
object PersistenceApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/persistence-app.conf")
    implicit val system = ActorSystem.apply("PersistentApp", config)

    val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor")))
    val rootInbox = ActorDSL.inbox()

    rootInbox.send(actor, "view")
    val msg1 = rootInbox.receive()
    println("Receive1:" + msg1)

    actor ! "Test1"
    actor ! "snap"

    Thread.sleep(5000)
    system.shutdown()
  }
}

上記のアプリケーションを実行すると下記になります。
■1回目

[INFO] [10/06/2014 09:34:59.998] [main] [Remoting] Starting remoting
akka://PersistentApp/user/$a(0):RecoveryCompleted
Receive1:akka://PersistentApp/user/$a:0
[INFO] [10/06/2014 09:35:20.724] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down

■2回目

[INFO] [10/06/2014 09:36:54.234] [main] [Remoting] Starting remoting
akka://PersistentApp/user/$a(5):RecoveryCompleted
Receive1:akka://PersistentApp/user/$a:5
[INFO] [10/06/2014 09:36:59.899] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down

2回目の起動で状態が復旧しているのがわかりますね。

尚、ファイル自体は設定で指定した下記のディレクトリ配下に出力されます。

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

2. Akka-Persistenceはどのタイミングでイベントが保存される?

先ほどのアプリケーションでは"snap"メッセージを送信して明示的にファイルを保存していましたが、
snapメッセージを送付しない場合はどうなるか、も確認してみました。
■PersistenceApp.scala

import akka.actor.{ActorDSL, Props, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.persistence.Recover

/**
 * Akka-Persistenceの確認を行うサンプルアプリケーション
 */
object PersistenceApp extends App {
  override def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("conf/persistence-app.conf")
    implicit val system = ActorSystem.apply("PersistentApp", config)

    val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor")))
    val rootInbox = ActorDSL.inbox()

    rootInbox.send(actor, "view")
    val msg1 = rootInbox.receive()
    println("Receive1:" + msg1)

    actor ! "Test1"
    // メッセージ送信後、snap無しに再読み込みを実施
    actor ! Recover()

    rootInbox.send(actor, "view")
    val msg2 = rootInbox.receive()
    println("Receive2:" + msg2)

    actor ! "Test2"
    actor ! "snap"

    Thread.sleep(5000)
    system.shutdown()
  }
}

実行結果は下記の通りです。
■1回目

[INFO] [10/06/2014 09:53:57.489] [main] [Remoting] Starting remoting
akka://PersistentApp/user/$a(0):RecoveryCompleted
Receive1:akka://PersistentApp/user/$a:0
Receive2:akka://PersistentApp/user/$a:5
[INFO] [10/06/2014 09:54:03.144] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down

■2回目

[INFO] [10/06/2014 10:00:31.279] [main] [Remoting] Starting remoting
akka://PersistentApp/user/$a(0):RecoveryCompleted
Receive1:akka://PersistentApp/user/$a:0
Receive2:akka://PersistentApp/user/$a:5
[INFO] [10/06/2014 10:00:36.933] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down

Recoverメッセージを送信しても既に値が存在している場合は再読み込みは行われない。
ただ、saveSnapshotメソッドを呼び出していない場合はSystemがダウンするとファイルは保存はされていない・・・
となるようです。

後はActor単位で殺した場合はどうなるか、も気になる所ですが、
それはもっと本格的なプログラムを組んだ時に試してみます。