読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Akka StreamsとReactive Streamsの対応は?(その1

こんにちは。

前回とりあえずAkka-Streamsを動かしてみたので、
今回はAkka-StreamsとReactive Streamsの対応を確認してみます。

1. Akka-Streamsのソース取得

まずはAkka-Streamsのソースがなければ始まりません。
そのため、前回のサンプルの依存性から辿って行って、Akka-Streamsのソースを取得します。

前回動作させたサンプルはactivator-akka-stream-scalaですから、まずはactivator-akka-stream-scalaの依存性を見てみます。

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-experimental" % "0.2"
)

そこからMavenRepositoryを確認してpom.xmlをチェックすると下記の通り。
Reactive Streamsに依存している構成になっています。
尚、Reactive Streamsにはspiとtckという2つのコンポーネントがありますが、
定義を見る限り、tckはテスト用コンポーネントのようですね。(Technology Compatibility Kitの略だそうです。)

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.3</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.10</artifactId>
    <version>2.3.2</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-testkit_2.10</artifactId>
    <version>2.3.2</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest_2.10</artifactId>
    <version>2.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.scalacheck</groupId>
    <artifactId>scalacheck_2.10</artifactId>
    <version>1.10.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-spi</artifactId>
    <version>0.3</version>
  </dependency>
  <dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck</artifactId>
    <version>0.3</version>
    <scope>test</scope>
  </dependency>
</dependencies>

ただ、Maven Repositoryの記述に従い、https://github.com/akka/akka に行くとそこにはAkka-Streamsはありませんでした・・
そのため、branchを漁っている最中にTwitterで@yasushiaさんに「release-2.3-dev」というbranchに
ソースがある旨のコメントをいただき、見つけることができました。
・・・普通に、「releasing-akka-stream-experimental-0.3」「releasing-akka-stream-experimental-v0.3」の
どちらが正しいんだろう、って悩んでいましたので^^;

ともあれ、これでソースの入手は完了です。

2. Reactive Streamsのコンポーネントは?

Akka-Streamsのソースを確認する前に、Reactive Streamsのコンポーネントを確認しておきます。
Reactive Streamsのコンポーネント(reactive-streams-spi)は下記の6個から構成されています。
※この6個、というのは「akka-stream-experimental」が使用しているVer0.3を基にしています。
※現状の最新のReactive Streamsは既に構成が変更されていました。

  • Publisher(org.reactivestreams.spiパッケージ、Interface)
    • メッセージの提供元コンポーネント。Subscriberのリクエスト(requestMore)に応じてメッセージを提供する。
    • Publisher側でsubscribeしたメッセージはSubscriberに送信される。
    • 保持メソッド
      • public void subscribe(Subscriber subscriber):指定したSubscriberを購読者として登録する。
  • Subscriber(org.reactivestreams.spiパッケージ、Interface)
    • 保持するSubscriptionを基にPublisherからのメッセージを取得するコンポーネント
    • Publisher側で提供可能となったメッセージを取得する。リクエスト(Subscription#requestMore)を実行することで需要と供給の状況を併せて取得することができる。
      • 上流側の方が処理速度が大きい場合に下流側が溢れるのを防ぐためにBackPressureの機能を保持している。
    • 保持メソッド
      • public void onSubscribe(Subscription subscription):指定したSubscriptionを購読情報として保持する。Publisherに対して購読登録を行った場合に実行される。
      • public void onNext(T element):Publisherがメッセージを提供する際に呼び出す。SubscriptionにSubscriberがメッセージ取得可能と通知していた場合に実行される。
      • public void onComplete():Publisherが保持しているメッセージが終了した場合にSubscriberに通知する。
      • public void onError(Throwable cause):PublisherがSubscriberへのStreamが利用不能となった場合にエラー情報を用いて呼び出す。実行後、切断処理を行う必要がある。
  • Subscription(org.reactivestreams.spiパッケージ、Interface)
    • PublisherとSubscriberをつなぐ情報を保持するコンポーネント
    • Subscriberは本コンポーネントのSubscription#requestMoreメソッドを呼び出すことでPublisherに対して処理可能である状態、メッセージ数を通知する。
    • 保持メソッド
      • public void cancel():本Subscriptionをキャンセルする。Requestされたメッセージを配信する前、かつPublisherが提供可能なメッセージを保持する場合でも発生しえる。
      • public void requestMore(int elements):Subscriberが呼び出し、処理可能なメッセージ数をPublisherに通知する。引数の値はそれまで通知されていた値に累積する。
  • Consumer(org.reactivestreams.apiパッケージ、Interface)
    • 論理的なSinkを示すコンポーネント。基本となる実装はConsumerから取得できるSubscriberを通して行われる。
    • SubscriberがSPIである間、ConsumerがユーザレベルのAPIとなる(?)
    • 保持メソッド
      • public Subscriber getSubscriber():Consumerに対応したSubscriberを取得する。本メソッドはこのAPI実装からのみ呼ばれるべき(?)
  • Producer
    • 論理的なSourceを示すコンポーネント。基本となる実装はProducerから取得できるPublisherを通して行われる。
    • PublisherがSPIである間、ProducerがユーザレベルのAPIとなる(?)
    • 保持メソッド
      • public Publisher getPublisher():Producerに対応したPublisherを取得する。本メソッドはこのAPI実装からのみ呼ばれるべき(?)
      • public void produceTo(Consumer consumer):
        • 与えられたConsumerをProducerに接続する。これはConsumerに対応したSubscriberをProducerに対応したPublisherに接続することを意味する。
        • 下記の3つのイベントのいずれかが発生するまでProducer>Consumerのメッセージの転送が行われる。
        • ストリームが終了する(既に提供可能なメッセージが無い)
        • Producerにエラーが発生する。
        • Consumerがメッセージの受信をキャンセルする。
  • Processor
    • メッセージを受信し、変換して送信する、という流れを保持するコンポーネント
    • APIの実装はProcessorを生成し、ProducerとConsumerに対して相互に接続を行うためのFactoryメソッドを提供する。

SPIである間・・という表現は明確になっていませんが、仮か、または開発中のコンポーネント・・ということでしょうか。

コアとなるPublisher/Subscriber/Subscriptionは完全に非同期で呼び出され、
お互いにブロックしないようにするためにあえて返り値をvoidにしているそうです。
そのあたりの使われ方をラップするためにConsumer/Producerを用意した・・ということなんでしょうかね。
詳細は全く不明です。

尚、最新のReactive StreamsにおいてはConsumer/Producerが無くなり、
ProcessorがPublisher/Subscriberを保持する形になっていました。

でもSPIパッケージ配下に配置されていることは変わらず。ちとわかりません。

ともあれ、上記に挙げたコンポーネントがAkka-Streamではどのように落としこまれているか、を見ていきます。

3. Akka-StreamsではReactive Streamsのコンポーネントをどう対応付けているか?

まずAkka-StreamsでどのソースがどのReactive Streamsを使用しているかの一覧を出してみると下記のようになりました。
※テスト系はのぞきます。

Akka-Streamsコンポーネント Reactive-Streamsコンポーネント
akka.persistence.stream.PersistentPublisher org.reactivestreams.api.Producer
akka.persistence.stream.PersistentPublisher org.reactivestreams.spi.Subscriber
akka.stream.FlattenStrategy org.reactivestreams.api.Producer
akka.stream.FlowMaterializer org.reactivestreams.api.Producer
akka.stream.FlowMaterializer org.reactivestreams.api.Consumer
akka.stream.actor.ActorConsumer org.reactivestreams.api.Consumer
akka.stream.actor.ActorConsumer org.reactivestreams.spi.Subscriber
akka.stream.actor.ActorConsumer org.reactivestreams.spi.Subscription
akka.stream.actor.ActorProducer org.reactivestreams.api.Consumer
akka.stream.actor.ActorProducer org.reactivestreams.api.Producer
akka.stream.actor.ActorProducer org.reactivestreams.spi.Publisher
akka.stream.actor.ActorProducer org.reactivestreams.spi.Subscriber
akka.stream.actor.ActorProducer org.reactivestreams.spi.Subscription
akka.stream.impl.ActorBasedFlowMaterializer org.reactivestreams.api.{ Consumer, Processor, Producer }
akka.stream.impl.ActorBasedFlowMaterializer org.reactivestreams.spi.Subscriber
akka.stream.impl.ActorProcessor org.reactivestreams.api.Processor
akka.stream.impl.ActorProcessor org.reactivestreams.spi.Subscriber
akka.stream.impl.ActorProcessor org.reactivestreams.api.Consumer
akka.stream.impl.ActorProcessor org.reactivestreams.spi.Subscription
akka.stream.impl.ActorProducer org.reactivestreams.api.{ Consumer, Producer }
akka.stream.impl.ActorProducer org.reactivestreams.spi.{ Publisher, Subscriber }
akka.stream.impl.BlackholeConsumer org.reactivestreams.api.Consumer
akka.stream.impl.BlackholeConsumer org.reactivestreams.spi.Subscription
akka.stream.impl.BlackholeConsumer org.reactivestreams.spi.Subscriber
akka.stream.impl.ConcatAllImpl org.reactivestreams.api.Producer
akka.stream.impl.EmptyProducer org.reactivestreams.api.Consumer
akka.stream.impl.EmptyProducer org.reactivestreams.api.Producer
akka.stream.impl.EmptyProducer org.reactivestreams.spi.Publisher
akka.stream.impl.EmptyProducer org.reactivestreams.spi.Subscriber
akka.stream.impl.FlowImpl org.reactivestreams.api.Consumer
akka.stream.impl.FlowImpl org.reactivestreams.api.Producer
akka.stream.impl.FlowImpl org.reactivestreams.api.Consumer
akka.stream.impl.FutureProducer org.reactivestreams.spi.Subscriber
akka.stream.impl.FutureProducer org.reactivestreams.spi.Subscription
akka.stream.impl.IterableProducer org.reactivestreams.spi.Subscriber
akka.stream.impl.IterableProducer org.reactivestreams.spi.Subscription
akka.stream.impl.Messages org.reactivestreams.spi.Subscription
akka.stream.impl.SplitWhenProcessorImpl org.reactivestreams.spi.Subscription
akka.stream.impl.StaticFanins org.reactivestreams.api.Producer
akka.stream.impl.StaticFanouts org.reactivestreams.api.Producer
akka.stream.impl.StaticFanouts org.reactivestreams.api.Consumer
akka.stream.impl.StaticFanouts org.reactivestreams.spi.Subscriber
akka.stream.impl.StaticFanouts org.reactivestreams.spi.Subscription
akka.stream.impl.StreamOfStreamProcessors org.reactivestreams.spi.{ Subscriber, Subscription }
akka.stream.impl.StreamOfStreamProcessors org.reactivestreams.api.Producer
akka.stream.impl.SubscriberManagement org.reactivestreams.api
akka.stream.impl.SubscriberManagement org.reactivestreams.spi
akka.stream.impl.SynchronousProducerFromIterable org.reactivestreams.api.Consumer
akka.stream.impl.SynchronousProducerFromIterable org.reactivestreams.spi.Subscription
akka.stream.impl.SynchronousProducerFromIterable org.reactivestreams.spi.Subscriber
akka.stream.impl.SynchronousProducerFromIterable org.reactivestreams.spi.Publisher
akka.stream.impl.SynchronousProducerFromIterable org.reactivestreams.api.Producer
akka.stream.impl.TickProducer org.reactivestreams.spi.Subscriber
akka.stream.impl.TickProducer org.reactivestreams.spi.Subscription
akka.stream.impl.Transfer org.reactivestreams.spi.{ Subscriber, Subscription }
akka.stream.io.StreamIO org.reactivestreams.api.{ Processor, Producer, Consumer }
akka.stream.io.TcpConnectionStream org.reactivestreams.api.Processor
akka.stream.io.TcpListenStreamActor org.reactivestreams.api.{ Consumer, Producer }
akka.stream.io.TcpListenStreamActor org.reactivestreams.spi.Publisher
akka.stream.javadsl.Duct org.reactivestreams.api.Consumer
akka.stream.javadsl.Duct org.reactivestreams.api.Producer
akka.stream.javadsl.Flow org.reactivestreams.api.Producer
akka.stream.javadsl.Flow org.reactivestreams.api.Consumer
akka.streamdsl.Duct org.reactivestreams.api.Consumer
akka.streamdsl.Duct org.reactivestreams.api.Producer
akka.streamdsl.Flow org.reactivestreams.api.Consumer
akka.streamdsl.Flow org.reactivestreams.api.Producer

・・こう見てみると、Akka-Streamsという単一のプロダクトの中でインポートの記述方法一つとっても実はコードのルールは統一されていないことがわかります。
いえ、全く重要ではないんですが、気になりました(汗

尚、ProducerはPublisherにReactive Streamsで更新されているので可能になったら入れ替える・・
とNoteで明記されている関係上、上記の構成は今後あっさり変わりそうではあります。
とはいえ、上記のコンポーネントが今後Reactive Streamsにも関わり続けることは確かだとは思います。

ともあれ、今回はここまでで、次回は上記のコンポーネントの概要を確認してみます。
その後、Reactive Streamsを具体的にどう実現しているか、の確認に入る・・予定です。