夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

akka-distributed-workersの初期化処理確認

こんにちは。

前回でPersistenceまで行ったので、今回はより大きなサンプルであるakka-distributed-workersを読み、
動作の確認を行ってみました。

ソースコードは下記の場所から取得することができます。
typesafehub/activator-akka-distributed-workers

中身を見てみるとakka-cluster、akka-remote、akka-persistenceといった通信周りや
クラスタリング周りのライブラリが使われており、規模も小さめで勉強には丁度いいように見えます。
そのため、akka-distributed-workersを順に読みながら理解を進めていきます。

あ、ただ当然まだ不慣れな関係上間違っているかもしれませんので、
気付いた場合はコメント頂けると幸いです。

1. akka-distributed-workersのActor構成

まず、akka-distributed-workersで使用されているActorは主に下記の11個です。

■固有Actor

  • Frontend
  • Master
  • Worker
  • WorkExecutor
  • WorkProducer
  • WorkResultConsumer

■既成Actor

  • ClusterSingletonManager
    • akka-clusterを利用してある特定のロールのActorを常時1つだけ有効化するActor
  • SharedLeveldbStore
    • テスト専用の複数のActorSystemで共有可能な状態保存Actor
  • ClusterClient
    • Akka-Clusterの外部からClusterの内部のActorと通信するためのActor
  • ClusterSingletonProxy
    • 自動的にClusterSingletonManagerが有効化しているActorへの接続を維持するActor
  • DistributedPubSubMediator
    • PubSubメッセージの送受信を媒介するActor

固有Actorについてはソースを読み解いていく中で確認します。

2. Main処理概要

akka-distributed-workersのmain文は下記のようになっています。

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startBackend(2551, "backend")
      Thread.sleep(5000)
      startBackend(2552, "backend")
      startWorker(0)
      Thread.sleep(5000)
      startFrontend(0)
    } else {
      val port = args(0).toInt
      if (2000 <= port && port <= 2999)
        startBackend(port, "backend")
      else if (3000 <= port && port <= 3999)
        startFrontend(port)
      else
        startWorker(port)
    }
  }

順々にメソッドを呼び出して初期化/動作を進めていく流れですね。
引数が空だったとしてどういう風に初期化/動作していくかを確認します。

3. Backendの1個目起動

はじめにBackendの1個目を起動・・・と見えるメソッドを呼んでいます。
このメソッドを実行した結果、下記のActor構成になります。

尚、黒の破線がActorSystem1個を表しています。
SharedLeveldbStoreのみは起動後に参照が取得できることの確認を行っています。

4. Backendの2個目起動

次はBackendの2個目の起動となります。
このメソッドを実行した結果、下記のActor構成になります。

ClusterSingletomManagerが複数存在するため
Master等のActorは起動されない・・・という状態になります。

5. Worker起動

次はWorkerの起動です。
起動メソッドを実行した結果、下記のActor構成となります。

WorkerがWorkExecutorとClusterClientを子Actorとして保持し、
ClusterClientが存在しているClusterに対して接続を行います。

その直後にWorkerはMasterに対して登録メッセージを送付します。
これでMasterはWorkerを認識するようになります。

6. Frontend起動

最後にFrontendの起動です。
起動メソッドを実行した結果、下記のActor構成となります。

ClusterProxyでClusterへの接続を行い、
DistributedPubSubMediatorでPubSubメッセージの受信準備を行う形になりますね。

・・・という形で起動処理が行われることがわかりました。
ですが、ClusterClientとClusterProxyが何故使い分けられているか・・はいまいちわからず。
ただ、全体の流れはある程度わかったとは思います。

次回は実際にどういう風にWorkerに対してTaskの割り振りが行われているか・・
という内容を同じように流れを図示化して試してみます。