読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Clojure勉強日記(その18 ref とソフトウェアトランザクショナルメモリ

こんにちは。前回まででとりあえず関数型プログラミングの流儀の入口のようなものが完了し、
次は並行性、並列性、ロック・・と、今度は関数型を使うことによる具体的な利点を確認する内容です。

1.Clojureが用意する参照型

まず、初めにClojureが用意している参照型として下記の4つがあるそうな。

  1. ref 共有状態にある協調的・同期的な変更を管理
  2. アトム 共有状態にある非協調的・同期的な変更を管理
  3. エージェント 共有状態に対する非同期的な変更を管理
  4. var スレッドローカルな状態を管理

・・・やべぇ、どれもわからない。
と思いましたが、とりあえず順に説明されることのため、安心して進めます。

並行性、並列性、ロック

これは概念の解説で一般的な話なのでスキップ。
ただ、Clojureではプログラムのモデルを次の2つの層に分割するという話は重要なのでそれだけまとめ。

  1. 関数型モデル:変更可能な状態を持たないコード。ほとんどのコードはこれに含まれる。読む/テストする/並列化が簡単。
  2. 参照型モデル:変更可能な状態を持つコード。そのために並列化やテストが困難になる。ただ、実質的にこの要素を排除はできない。

ref とソフトウェアトランザクショナルメモリ

Clojureではほとんどのオブジェクトは変更できない。
変更可能なデータがほしい場合、変更可能であることを明記する必要があります。
一つの方法として、「変更不可なオブジェクトの変更可能な参照(ref)」を作成することとなります。

user=> (doc ref)
-------------------------
clojure.core/ref
([x] [x & options])
  Creates and returns a Ref with an initial value of x and zero or
  more options (in any order)

実際定義するとこんな感じ。
defで「参照」をバインドする形になるわけですね。

user=> (def current-state (ref "INACTIVE"))
#'user/current-state
user=> @current-state
"INACTIVE"
user=> current-state
#<Ref@3a9e7c70: "INACTIVE">
user=> (deref current-state)
"INACTIVE"

で、変更する際にはref-setを使用します。
但し、変更は「トランザクション」の中で行う必要があるため、dosyncで囲う必要が出てきます。

user=> (ref-set current-state "ACTIVE")
IllegalStateException No transaction running  clojure.lang.LockingTransaction.getEx (LockingTransaction.java:208)
user=> (dosync (ref-set current-state "ACTIVE"))
"ACTIVE"
user=> @current-state
"ACTIVE"
user=> (deref current-state)
"ACTIVE"

ここではさらっと行ってしまいましたが、
このトランザクションの仕組みはソフトウェアトランザクションメモリ(STM)であり、以下の性質を保証します。

  1. 更新はアトミック:トランザクション中で複数のrefに対する更新を行った場合、それぞれの更新の結果はトランザクション外から見た場合はまとめて一つの不可分のイベントとして観測される。
  2. 更新は一貫している:refにはバリデーション関数をつけることができ、どれか一つでも失敗した場合トランザクション全体が失敗する。
  3. 更新は独立している:トランザクションの中で他のトランザクションの途中結果が見えることはない。

トランザクション分離レベル的には「READ COMMITTED」にあたるのでしょうか。
上記の性質の結果、ACIDのうち「ACI」まで確保されることになります。

以下のようにdosync節の中で更新することで状態とフラグが同時更新されます。

user=> (def current-flag (ref "true"))
#'user/current-flag
user=> (dosync (ref-set current-state "INACTIVE") (ref-set current-flag
 "false"))
"false"
user=> @current-state
"INACTIVE"
user=> @current-flag
"false"

状態だけが更新されてフラグが古いままとか、その逆であるような状態を他のスレッドが見ることは決してないそうです。

alter

先ほどの例だと単純すぎるため、例を多少複雑化して確認します。
「発言者」「テキスト」を保持するメッセージ構造体を作成し、メッセージのリストをrefとして定義します。

user=> (defrecord Message [user text])
user.Message
user=> (new user.Message "User1" "Message1")
#user.Message{:user "User1", :text "Message1"}
user=> (def current-messages (ref ()))
#'user/current-messages

で、実際にメッセージを追加する関数を単純に考えるとrefを読みだして追加するという形になりますね。

user=> (defn add-message [msg] (dosync (ref-set current-messages (cons msg @current-messages))))
#'user/add-message
user=> (add-message
 (new user.Message "User1" "Message1"))
(#user.Message{:user "User1", :text "Message1"})
user=> (add-message
 (new user.Message "User2" "Message2"))
(#user.Message{:user "User2", :text "Message2"} #user.Message{:user "User1", :text "Message1"})

ただ、より効率的な記述方法としてalterがあります。

user=> (doc alter)
-------------------------
clojure.core/alter
([ref fun & args])
  Must be called in a transaction. Sets the in-transaction-value of
  ref to:
  (apply fun in-transaction-value-of-ref args)
  and returns the in-transaction-value of ref.

引数の順番としてはref→関数→引数という順のようです。
そのため、実際にコードに落としてみると下記のようになりますね。

user=> (defn add-message-update [msg] (dosync (alter current-messages conj msg)))
#'user/add-message-update
user=> (add-message-update (new user.Message "User3" "Message3"))
(#user.Message{:user "User3", :text "Message3"} #user.Message{:user "User2", :text "Message2"} #user.Message{:user "User1", :text "Message1"})
STMの仕組み:MVCC

そして登場しましたMVCC「Multi Version Cuncurrency Control)。
Clojureでは以下のように動くようです。

  1. トランザクションA開始時にポイントA(STMの世界でのユニークなタイムスタンプのようなもの)を取得する
  2. ポイントA時点のrefに対するプライベートコピーが作成される(つまり、トランザクション中に参照先の値が更新されることはない)
  3. プライベートコピーに対して更新が実行される(トランザクション中の値と呼ばれる)
  4. トランザクションコミット時にポイントA以降に更新が御子縄得rていた場合、トランザクションは再起動される(例外発生時はトランザクションAはロールバック
  5. トランザクションコミットするとプライベートコピーが実空間に公開される

非常に一般的と言えば一般的な内容ですね。
尚、「トランザクションの途中で他のトランザクションがalterの対象であるrefを更新しても構わないケース」の場合、
alterではなくcommuteという関数が使用可能なようです。

commute

というわけでcommuteになります。構文的にはalterと全く同じ。

clojure.core/commute
([ref fun & args])
  Must be called in a transaction. Sets the in-transaction-value of
  ref to:

但し、交換可能である操作のためのもので、更新がどういう順序で起きてもいい操作ということになります。
最終的に総計を算出する処理なんかはこれにあたるんですかね。
STMはcommuteの実行順序を自由に変えることが可能になるそうです。

user=> (defn add-message-update [msg] (dosync (commute current-messages conj msg)))
#'user/add-message-update
user=> (add-message-update (new user.Message "User1" "Message1"))
(#user.Message{:user "User1", :text "Message1"})

commuteの実行の交換はマイクロ秒のオーダーのため、今回のメッセージ追加については交換可能、となりそうです。
実際、チャットで発生するネットワーク遅延や人間の反応による遅延の方がよほど大きいですからね。

refにバリデーションを追加

ref初期化時にバリデーションを設けることでバリデーションチェックができ、
バリデーションチェックがNGとなった場合にトランザクションを中断させることが可能となります。
実際にrefのドキュメントを全て見てみると、「:validator validate-fn」という記述があります。

user=> (doc ref)
-------------------------
clojure.core/ref

([x] [x & options])

  Creates and returns a Ref with an initial value of x and zero or
  more options (in any order):

  :meta metadata-map

  :validator validate-fn

  :min-history (default 0)
  :max-history (default 10)

  If metadata-map is supplied, it will become the metadata on the
  ref. validate-fn must be nil or a side-effect-free fn of one
  argument, which will be passed the intended new state on any state
  change. If the new state is unacceptable, the validate-fn should
  return false or throw an exception. validate-fn will be called on
  transaction commit, when all refs have their final values.

  Normally refs accumulate history dynamically as needed to deal with
  read demands. If you know in advance you will need history you can
  set :min-history to ensure it will be available when first needed (instead
  of after a read fault). History is limited, and the limit can be set
  with :max-history.

そのため、実際にバリデーションを指定してみます。

; sender と text を保持する必要があるバリデータ
user=> (def validate-message-list
(partial every? #(and (:sender %) (:text %))))
#'user/validate-message-list
user=> (def messages (ref () :validator validate-message-list))
#'user/messages
user=> (defn add-message-update [msg] (dosync (alter messages conj msg)))
#'user/add-message-update
; 通常の文字列を追加するとはじく
user=> (add-message-update "not a valid message")
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
; 違う値を保持する構造体を追加するとはじく
user=> (defrecord MessageText [sender message])
user.MessageText
user=> (add-message-update (user.MessageText. "User1" "Message1"))
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
; 正しい値を保持する構造体を追加すると追加できる
user=> (add-message-update (user.Message. "User1" "Message1"))
(#user.Message{:sender "User1", :text "Message1"})
user=> (add-message-update (user.Message. "User2" "Message2"))
(#user.Message{:sender "User2", :text "Message2"} #user.Message{:sender "User1", :text "Message1"})

と、こういう形で言語のベースとしてバリデーションの仕組みも備わっていることがわかります。
書かずに機能を作成することにはかなり向いていますねぇ。

というわけで今回はここまで。
次回はまた違う状態管理の確認に入ります。