Akka-Persistenceでイベントを保存する
こんにちは。
はじめはTypedActorをやろうとも思ったのですが、
Akka-Meetupでもあったように、TypedActorは使いにくいので通常あまり使われないようです。
#実際、クラス階層も変わるためいまいち使いにくい、というのはありました。
並行処理初心者のためのAkka入門
http://www.slideshare.net/sifue/akka-39611889
そのため、Akka-Persistenceを試してみます。
1. Akka-Persistenceでイベント情報を保存する
まずはAkka-Persistenceでイベント情報を保存/復元できるかの確認をまずは行います。
設定ファイルは下記になります。
■persistence-app.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } } akka.persistence.journal.leveldb.dir = "target/example/journal" akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
■SamplePersistentActor.scala
動作するActorは下記になります。
受信したメッセージに応じて動作を変える・・・という形になります。
import akka.actor.ActorLogging import akka.persistence.{SnapshotOffer, PersistentActor} /** * Akka-Persistenceの確認を行うサンプルActor */ class SamplePersistentActor(name: String) extends PersistentActor with ActorLogging { override def persistenceId: String = "SamplePersistentActor" + name var stateCount = 0 override def receiveCommand: SamplePersistentActor#Receive = { case "path" => context.sender ! self.path case "print" => println(self.path + ":" + stateCount) case "snap" => saveSnapshot(stateCount) case "view" => context.sender ! self.path + ":" + stateCount case message: String => stateCount += message.length } override def receiveRecover: SamplePersistentActor#Receive = { case SnapshotOffer(_, snapshot: Int) => stateCount = snapshot case other:Any => println(self.path + "(" + stateCount + "):" + other) } }
■PersistenceApp.scala
import akka.actor.{ActorDSL, Props, ActorSystem} import com.typesafe.config.ConfigFactory /** * Akka-Persistenceの確認を行うサンプルアプリケーション */ object PersistenceApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/persistence-app.conf") implicit val system = ActorSystem.apply("PersistentApp", config) val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor"))) val rootInbox = ActorDSL.inbox() rootInbox.send(actor, "view") val msg1 = rootInbox.receive() println("Receive1:" + msg1) actor ! "Test1" actor ! "snap" Thread.sleep(5000) system.shutdown() } }
上記のアプリケーションを実行すると下記になります。
■1回目
[INFO] [10/06/2014 09:34:59.998] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 [INFO] [10/06/2014 09:35:20.724] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
■2回目
[INFO] [10/06/2014 09:36:54.234] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(5):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 09:36:59.899] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
2回目の起動で状態が復旧しているのがわかりますね。
尚、ファイル自体は設定で指定した下記のディレクトリ配下に出力されます。
akka.persistence.journal.leveldb.dir = "target/example/journal" akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
2. Akka-Persistenceはどのタイミングでイベントが保存される?
先ほどのアプリケーションでは"snap"メッセージを送信して明示的にファイルを保存していましたが、
snapメッセージを送付しない場合はどうなるか、も確認してみました。
■PersistenceApp.scala
import akka.actor.{ActorDSL, Props, ActorSystem} import com.typesafe.config.ConfigFactory import akka.persistence.Recover /** * Akka-Persistenceの確認を行うサンプルアプリケーション */ object PersistenceApp extends App { override def main(args: Array[String]): Unit = { val config = ConfigFactory.load("conf/persistence-app.conf") implicit val system = ActorSystem.apply("PersistentApp", config) val actor = system.actorOf(Props.apply(new SamplePersistentActor("actor"))) val rootInbox = ActorDSL.inbox() rootInbox.send(actor, "view") val msg1 = rootInbox.receive() println("Receive1:" + msg1) actor ! "Test1" // メッセージ送信後、snap無しに再読み込みを実施 actor ! Recover() rootInbox.send(actor, "view") val msg2 = rootInbox.receive() println("Receive2:" + msg2) actor ! "Test2" actor ! "snap" Thread.sleep(5000) system.shutdown() } }
実行結果は下記の通りです。
■1回目
[INFO] [10/06/2014 09:53:57.489] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 Receive2:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 09:54:03.144] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
■2回目
[INFO] [10/06/2014 10:00:31.279] [main] [Remoting] Starting remoting akka://PersistentApp/user/$a(0):RecoveryCompleted Receive1:akka://PersistentApp/user/$a:0 Receive2:akka://PersistentApp/user/$a:5 [INFO] [10/06/2014 10:00:36.933] [ForkJoinPool-5-worker-7] [Remoting] Remoting shut down
Recoverメッセージを送信しても既に値が存在している場合は再読み込みは行われない。
ただ、saveSnapshotメソッドを呼び出していない場合はSystemがダウンするとファイルは保存はされていない・・・
となるようです。
後はActor単位で殺した場合はどうなるか、も気になる所ですが、
それはもっと本格的なプログラムを組んだ時に試してみます。