夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Clojure勉強日記(その20 エージェントを使った非同期更新

こんにちは。

前回に続いて状態更新ものです。
これまでまとめていたrefとatomは「同期的な更新」を扱っていました。
ここで言う同期的な更新とは、「関数呼び出しが完了した時点で状態の更新も完了している」ということを指します。

ですが、Clojureでは非同期で更新を行ってもいいケースにおいて使用できるエージェントというものがあるそうです。
実際に試してみます。

1.エージェントの使用方法

エージェントの初期化はrefと同じ構文で可能で、参照も同様です。
きちんとオブジェクトを参照するとAgentオブジェクトとして生成されています。

user=> (def current-counter (agent 0))
#'user/current-counter
user=> @current-counter
0
user=> current-counter
#<Agent@6ac006a6: 0>
user=> (deref current-counter)
0

エージェントを更新する場合はrefのdosync、atomのresetとは違い、send関数を使用します。
「非同期更新」という位置づけのため、「更新を行う関数」をキューにつめて非同期に実行する・・・というような位置づけのようです。

user=> (send current-counter inc)
#<Agent@6ac006a6: 0>
; send関数の返り値は「更新前のエージェントの値」となる。非同期なのである意味当然な動作。
user=> @current-counter
1
; send後確認すると更新されている。

「エージェントが更新されたこと」を待ち受ける関数としてawaitがありますが、
これだけ小さい更新だと当然完了していますね(汗

user=> (send current-counter inc)
#<Agent@6ac006a6: 1>
user=> (await current-counter)
nil
; 特定のエージェントに対してsendキューが完了するまで待つ関数。
user=> @current-counter
2
user=> (send current-counter inc)
#<Agent@6ac006a6: 2>
user=> (await-for 10 current-counter)
true
; タイムアウト設定して待つ関数
エージェントのバリデーション

refと同じくエージェントもバリデーション処理が可能です。
・・・なのですが、非同期処理のためどういう形でエラーが発生したことが検知できるのかは気になるところではあります。
ともあれ、やってみましょう。

user=> (def current-counter (agent 0 :validator number?))
#'user/current-counter
; 文字列を設定する関数を与えたところ、下記の3パターンの事象が発生。REPLとエージェントの動作が非同期で行われているから?
; Agentの返り値の時点でエラー
user=> (send current-counter (fn [_] "Test"))
#<Agent@33cb47cb FAILED: 0>
; Agentへのsend関数は成功
user=> (send current-counter (fn [_] "Test"))
#<Agent@33cb47cb: 0>
; 普通に例外が発生
user=> (send current-counter (fn [_] "Test1"))
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
; Agentのエラーを確認可能
user=> (agent-errors current-counter)
(#<IllegalStateException java.lang.IllegalStateException: Invalid reference state>)
user=> (class (agent-errors current-counter))
clojure.lang.PersistentList
; エラー発生後も値にはアクセス可能
user=> @current-counter
0
; クリア関数を実行すると例外も消える
user=> (clear-agent-errors current-counter)
0
user=> (agent-errors current-counter)
nil
エージェントをトランザクション中で使用する

refの時にトランザクションは再実行される可能性があるため、
トランザクションは副作用を持ってはいけないという話がありました。

ですが、トランザクションは副作用と密接にかかわっており、成功したい場合にのみ副作用を起こしたい、という場合もあります。
そのせいか、トランザクション中でエージェントにアクションを送ると
そのアクションはトランザクションが成功した場合のみ一度だけ送信されることが保証するという機構があります。
例として、トランザクション成功時にファイル出力するエージェントを試してみます。

user=> (def message-backup (agent "message-backup.clj"))
#'user/message-backup
; ファイル名を保持するエージェント
user=> (def validate-message-list
  (partial every? #(and (:sender %) (:text %))))
; バリデータ
#'user/validate-message-list
user=> (def current-messages (ref () :validator validate-message-list))
#'user/current-messages
; メッセージrefの定義
user=> (defn add-message-with-backup [msg]
  (dosync 
    (let [snapshot (commute current-messages conj msg)]
      (send-off message-backup (fn [filename] (spit filename snapshot) filename)))))
#'user/add-message-with-backup
; commuteの結果(snapshot)をファイルに出力する関数
user=> (add-message-with-backup "Test")
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
; 不正な値を渡すとバリデータではじかれ、ファイルに出力されない
user=> (add-message-with-backup
 (new user.Message "User01" "Text01"))
#<Agent@57d0258a: "message-backup.clj">
user=> (add-message-with-backup
 (new user.Message "User02" "Text02"))
#<Agent@57d0258a: "message-backup.clj">
; この時点で"message-backup.clj"には2メッセージ出力
user=> (add-message-with-backup "Invalid-Message")
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
; "message-backup.clj"はバリデーションエラーのため変化しない

新出関数としてspit(≠split)、send-offがありますが、見てみると以下の通り。

user=> (doc spit)
-------------------------
clojure.core/spit
([f content & options])
  Opposite of slurp.  Opens f with writer, writes content, then
  closes f. Options passed to clojure.java.io/writer.
; 与えたファイルパス&コンテンツをファイル出力する関数
user=> (doc send-off)
-------------------------
clojure.core/send-off
([a f & args])
  Dispatch a potentially blocking action to an agent. Returns the
  agent immediately. Subsequently, in a separate thread, the state of
  the agent will be set to the value of:
  (apply action-fn state-of-agent args)
; アクションを別スレッドで行うための関数

尚、Clojureのこのトランザクションが保証するのは「アクションをエージェントに送るまで」です。
そのため、アクションが実施されなくてもトランザクション自体は完了とみなされる。
結果、トランザクションが完了してからファイル出力されるまでラグが発生する形になります。
その辺り気をつける必要がありますね。というかその場合は素直にDB使いましょう、という形になりますが。

更新モデル

これまでやってきた状態更新の概念と周辺関数を表にすると以下のようになります。

更新方法 リファレンス(ref) アトム(atom) エージェント(agent)
統一的な更新関数 alter swap! send-off
交換可能な更新関数 commute (常時交換可能) (常時交換可能)
ブロックしない更新関数 (ブロックが前提) (ブロックが前提) send
値を設定するだけの更新関数 ref-set reset! なし

基本的には「統一的な更新関数」を使い、それ以外のケースは補助的に使う形になりそうです。
尚、varについてはスレッドローカルな状態を管理するのに使用される・・・とのことです。
それについてはまた次回に。