akka-distributed-workersの初期化処理確認
こんにちは。
前回でPersistenceまで行ったので、今回はより大きなサンプルであるakka-distributed-workersを読み、
動作の確認を行ってみました。
ソースコードは下記の場所から取得することができます。
typesafehub/activator-akka-distributed-workers
中身を見てみるとakka-cluster、akka-remote、akka-persistenceといった通信周りや
クラスタリング周りのライブラリが使われており、規模も小さめで勉強には丁度いいように見えます。
そのため、akka-distributed-workersを順に読みながら理解を進めていきます。
あ、ただ当然まだ不慣れな関係上間違っているかもしれませんので、
気付いた場合はコメント頂けると幸いです。
1. akka-distributed-workersのActor構成
まず、akka-distributed-workersで使用されているActorは主に下記の11個です。
■固有Actor
- Frontend
- Master
- Worker
- WorkExecutor
- WorkProducer
- WorkResultConsumer
■既成Actor
- ClusterSingletonManager
- akka-clusterを利用してある特定のロールのActorを常時1つだけ有効化するActor
- SharedLeveldbStore
- テスト専用の複数のActorSystemで共有可能な状態保存Actor
- ClusterClient
- Akka-Clusterの外部からClusterの内部のActorと通信するためのActor
- ClusterSingletonProxy
- 自動的にClusterSingletonManagerが有効化しているActorへの接続を維持するActor
- DistributedPubSubMediator
- PubSubメッセージの送受信を媒介するActor
固有Actorについてはソースを読み解いていく中で確認します。
2. Main処理概要
akka-distributed-workersのmain文は下記のようになっています。
def main(args: Array[String]): Unit = { if (args.isEmpty) { startBackend(2551, "backend") Thread.sleep(5000) startBackend(2552, "backend") startWorker(0) Thread.sleep(5000) startFrontend(0) } else { val port = args(0).toInt if (2000 <= port && port <= 2999) startBackend(port, "backend") else if (3000 <= port && port <= 3999) startFrontend(port) else startWorker(port) } }
順々にメソッドを呼び出して初期化/動作を進めていく流れですね。
引数が空だったとしてどういう風に初期化/動作していくかを確認します。
3. Backendの1個目起動
はじめにBackendの1個目を起動・・・と見えるメソッドを呼んでいます。
このメソッドを実行した結果、下記のActor構成になります。
尚、黒の破線がActorSystem1個を表しています。
SharedLeveldbStoreのみは起動後に参照が取得できることの確認を行っています。
4. Backendの2個目起動
次はBackendの2個目の起動となります。
このメソッドを実行した結果、下記のActor構成になります。
ClusterSingletomManagerが複数存在するため
Master等のActorは起動されない・・・という状態になります。
5. Worker起動
次はWorkerの起動です。
起動メソッドを実行した結果、下記のActor構成となります。
WorkerがWorkExecutorとClusterClientを子Actorとして保持し、
ClusterClientが存在しているClusterに対して接続を行います。
その直後にWorkerはMasterに対して登録メッセージを送付します。
これでMasterはWorkerを認識するようになります。
6. Frontend起動
最後にFrontendの起動です。
起動メソッドを実行した結果、下記のActor構成となります。
ClusterProxyでClusterへの接続を行い、
DistributedPubSubMediatorでPubSubメッセージの受信準備を行う形になりますね。
・・・という形で起動処理が行われることがわかりました。
ですが、ClusterClientとClusterProxyが何故使い分けられているか・・はいまいちわからず。
ただ、全体の流れはある程度わかったとは思います。
次回は実際にどういう風にWorkerに対してTaskの割り振りが行われているか・・
という内容を同じように流れを図示化して試してみます。