Akka StreamsとReactive Streamsの対応は?(その1
こんにちは。
前回とりあえずAkka-Streamsを動かしてみたので、
今回はAkka-StreamsとReactive Streamsの対応を確認してみます。
1. Akka-Streamsのソース取得
まずはAkka-Streamsのソースがなければ始まりません。
そのため、前回のサンプルの依存性から辿って行って、Akka-Streamsのソースを取得します。
前回動作させたサンプルはactivator-akka-stream-scalaですから、まずはactivator-akka-stream-scalaの依存性を見てみます。
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-experimental" % "0.2" )
そこからMavenRepositoryを確認してpom.xmlをチェックすると下記の通り。
Reactive Streamsに依存している構成になっています。
尚、Reactive Streamsにはspiとtckという2つのコンポーネントがありますが、
定義を見る限り、tckはテスト用コンポーネントのようですね。(Technology Compatibility Kitの略だそうです。)
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.3</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.10</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-testkit_2.10</artifactId> <version>2.3.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> <version>2.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.scalacheck</groupId> <artifactId>scalacheck_2.10</artifactId> <version>1.10.1</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams-spi</artifactId> <version>0.3</version> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams-tck</artifactId> <version>0.3</version> <scope>test</scope> </dependency> </dependencies>
ただ、Maven Repositoryの記述に従い、https://github.com/akka/akka に行くとそこにはAkka-Streamsはありませんでした・・
そのため、branchを漁っている最中にTwitterで@yasushiaさんに「release-2.3-dev」というbranchに
ソースがある旨のコメントをいただき、見つけることができました。
・・・普通に、「releasing-akka-stream-experimental-0.3」「releasing-akka-stream-experimental-v0.3」の
どちらが正しいんだろう、って悩んでいましたので^^;
ともあれ、これでソースの入手は完了です。
2. Reactive Streamsのコンポーネントは?
Akka-Streamsのソースを確認する前に、Reactive Streamsのコンポーネントを確認しておきます。
Reactive Streamsのコンポーネント(reactive-streams-spi)は下記の6個から構成されています。
※この6個、というのは「akka-stream-experimental」が使用しているVer0.3を基にしています。
※現状の最新のReactive Streamsは既に構成が変更されていました。
- Publisher(org.reactivestreams.spiパッケージ、Interface)
- Subscriber(org.reactivestreams.spiパッケージ、Interface)
- 保持するSubscriptionを基にPublisherからのメッセージを取得するコンポーネント。
- Publisher側で提供可能となったメッセージを取得する。リクエスト(Subscription#requestMore)を実行することで需要と供給の状況を併せて取得することができる。
- 上流側の方が処理速度が大きい場合に下流側が溢れるのを防ぐためにBackPressureの機能を保持している。
- 保持メソッド
- public void onSubscribe(Subscription subscription):指定したSubscriptionを購読情報として保持する。Publisherに対して購読登録を行った場合に実行される。
- public void onNext(T element):Publisherがメッセージを提供する際に呼び出す。SubscriptionにSubscriberがメッセージ取得可能と通知していた場合に実行される。
- public void onComplete():Publisherが保持しているメッセージが終了した場合にSubscriberに通知する。
- public void onError(Throwable cause):PublisherがSubscriberへのStreamが利用不能となった場合にエラー情報を用いて呼び出す。実行後、切断処理を行う必要がある。
- Subscription(org.reactivestreams.spiパッケージ、Interface)
- PublisherとSubscriberをつなぐ情報を保持するコンポーネント。
- Subscriberは本コンポーネントのSubscription#requestMoreメソッドを呼び出すことでPublisherに対して処理可能である状態、メッセージ数を通知する。
- 保持メソッド
- public void cancel():本Subscriptionをキャンセルする。Requestされたメッセージを配信する前、かつPublisherが提供可能なメッセージを保持する場合でも発生しえる。
- public void requestMore(int elements):Subscriberが呼び出し、処理可能なメッセージ数をPublisherに通知する。引数の値はそれまで通知されていた値に累積する。
- Consumer(org.reactivestreams.apiパッケージ、Interface)
- Producer
- 論理的なSourceを示すコンポーネント。基本となる実装はProducerから取得できるPublisherを通して行われる。
- PublisherがSPIである間、ProducerがユーザレベルのAPIとなる(?)
- 保持メソッド
- public Publisher
getPublisher():Producerに対応したPublisherを取得する。本メソッドはこのAPI実装からのみ呼ばれるべき(?) - public void produceTo(Consumer
consumer): - 与えられたConsumerをProducerに接続する。これはConsumerに対応したSubscriberをProducerに対応したPublisherに接続することを意味する。
- 下記の3つのイベントのいずれかが発生するまでProducer>Consumerのメッセージの転送が行われる。
- ストリームが終了する(既に提供可能なメッセージが無い)
- Producerにエラーが発生する。
- Consumerがメッセージの受信をキャンセルする。
- public Publisher
- Processor
SPIである間・・という表現は明確になっていませんが、仮か、または開発中のコンポーネント・・ということでしょうか。
コアとなるPublisher/Subscriber/Subscriptionは完全に非同期で呼び出され、
お互いにブロックしないようにするためにあえて返り値をvoidにしているそうです。
そのあたりの使われ方をラップするためにConsumer/Producerを用意した・・ということなんでしょうかね。
詳細は全く不明です。
尚、最新のReactive StreamsにおいてはConsumer/Producerが無くなり、
ProcessorがPublisher/Subscriberを保持する形になっていました。
でもSPIパッケージ配下に配置されていることは変わらず。ちとわかりません。
ともあれ、上記に挙げたコンポーネントがAkka-Streamではどのように落としこまれているか、を見ていきます。
3. Akka-StreamsではReactive Streamsのコンポーネントをどう対応付けているか?
まずAkka-StreamsでどのソースがどのReactive Streamsを使用しているかの一覧を出してみると下記のようになりました。
※テスト系はのぞきます。
Akka-Streamsコンポーネント | Reactive-Streamsコンポーネント |
---|---|
akka.persistence.stream.PersistentPublisher | org.reactivestreams.api.Producer |
akka.persistence.stream.PersistentPublisher | org.reactivestreams.spi.Subscriber |
akka.stream.FlattenStrategy | org.reactivestreams.api.Producer |
akka.stream.FlowMaterializer | org.reactivestreams.api.Producer |
akka.stream.FlowMaterializer | org.reactivestreams.api.Consumer |
akka.stream.actor.ActorConsumer | org.reactivestreams.api.Consumer |
akka.stream.actor.ActorConsumer | org.reactivestreams.spi.Subscriber |
akka.stream.actor.ActorConsumer | org.reactivestreams.spi.Subscription |
akka.stream.actor.ActorProducer | org.reactivestreams.api.Consumer |
akka.stream.actor.ActorProducer | org.reactivestreams.api.Producer |
akka.stream.actor.ActorProducer | org.reactivestreams.spi.Publisher |
akka.stream.actor.ActorProducer | org.reactivestreams.spi.Subscriber |
akka.stream.actor.ActorProducer | org.reactivestreams.spi.Subscription |
akka.stream.impl.ActorBasedFlowMaterializer | org.reactivestreams.api.{ Consumer, Processor, Producer } |
akka.stream.impl.ActorBasedFlowMaterializer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.ActorProcessor | org.reactivestreams.api.Processor |
akka.stream.impl.ActorProcessor | org.reactivestreams.spi.Subscriber |
akka.stream.impl.ActorProcessor | org.reactivestreams.api.Consumer |
akka.stream.impl.ActorProcessor | org.reactivestreams.spi.Subscription |
akka.stream.impl.ActorProducer | org.reactivestreams.api.{ Consumer, Producer } |
akka.stream.impl.ActorProducer | org.reactivestreams.spi.{ Publisher, Subscriber } |
akka.stream.impl.BlackholeConsumer | org.reactivestreams.api.Consumer |
akka.stream.impl.BlackholeConsumer | org.reactivestreams.spi.Subscription |
akka.stream.impl.BlackholeConsumer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.ConcatAllImpl | org.reactivestreams.api.Producer |
akka.stream.impl.EmptyProducer | org.reactivestreams.api.Consumer |
akka.stream.impl.EmptyProducer | org.reactivestreams.api.Producer |
akka.stream.impl.EmptyProducer | org.reactivestreams.spi.Publisher |
akka.stream.impl.EmptyProducer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.FlowImpl | org.reactivestreams.api.Consumer |
akka.stream.impl.FlowImpl | org.reactivestreams.api.Producer |
akka.stream.impl.FlowImpl | org.reactivestreams.api.Consumer |
akka.stream.impl.FutureProducer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.FutureProducer | org.reactivestreams.spi.Subscription |
akka.stream.impl.IterableProducer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.IterableProducer | org.reactivestreams.spi.Subscription |
akka.stream.impl.Messages | org.reactivestreams.spi.Subscription |
akka.stream.impl.SplitWhenProcessorImpl | org.reactivestreams.spi.Subscription |
akka.stream.impl.StaticFanins | org.reactivestreams.api.Producer |
akka.stream.impl.StaticFanouts | org.reactivestreams.api.Producer |
akka.stream.impl.StaticFanouts | org.reactivestreams.api.Consumer |
akka.stream.impl.StaticFanouts | org.reactivestreams.spi.Subscriber |
akka.stream.impl.StaticFanouts | org.reactivestreams.spi.Subscription |
akka.stream.impl.StreamOfStreamProcessors | org.reactivestreams.spi.{ Subscriber, Subscription } |
akka.stream.impl.StreamOfStreamProcessors | org.reactivestreams.api.Producer |
akka.stream.impl.SubscriberManagement | org.reactivestreams.api |
akka.stream.impl.SubscriberManagement | org.reactivestreams.spi |
akka.stream.impl.SynchronousProducerFromIterable | org.reactivestreams.api.Consumer |
akka.stream.impl.SynchronousProducerFromIterable | org.reactivestreams.spi.Subscription |
akka.stream.impl.SynchronousProducerFromIterable | org.reactivestreams.spi.Subscriber |
akka.stream.impl.SynchronousProducerFromIterable | org.reactivestreams.spi.Publisher |
akka.stream.impl.SynchronousProducerFromIterable | org.reactivestreams.api.Producer |
akka.stream.impl.TickProducer | org.reactivestreams.spi.Subscriber |
akka.stream.impl.TickProducer | org.reactivestreams.spi.Subscription |
akka.stream.impl.Transfer | org.reactivestreams.spi.{ Subscriber, Subscription } |
akka.stream.io.StreamIO | org.reactivestreams.api.{ Processor, Producer, Consumer } |
akka.stream.io.TcpConnectionStream | org.reactivestreams.api.Processor |
akka.stream.io.TcpListenStreamActor | org.reactivestreams.api.{ Consumer, Producer } |
akka.stream.io.TcpListenStreamActor | org.reactivestreams.spi.Publisher |
akka.stream.javadsl.Duct | org.reactivestreams.api.Consumer |
akka.stream.javadsl.Duct | org.reactivestreams.api.Producer |
akka.stream.javadsl.Flow | org.reactivestreams.api.Producer |
akka.stream.javadsl.Flow | org.reactivestreams.api.Consumer |
akka.streamdsl.Duct | org.reactivestreams.api.Consumer |
akka.streamdsl.Duct | org.reactivestreams.api.Producer |
akka.streamdsl.Flow | org.reactivestreams.api.Consumer |
akka.streamdsl.Flow | org.reactivestreams.api.Producer |
・・こう見てみると、Akka-Streamsという単一のプロダクトの中でインポートの記述方法一つとっても実はコードのルールは統一されていないことがわかります。
いえ、全く重要ではないんですが、気になりました(汗
尚、ProducerはPublisherにReactive Streamsで更新されているので可能になったら入れ替える・・
とNoteで明記されている関係上、上記の構成は今後あっさり変わりそうではあります。
とはいえ、上記のコンポーネントが今後Reactive Streamsにも関わり続けることは確かだとは思います。
ともあれ、今回はここまでで、次回は上記のコンポーネントの概要を確認してみます。
その後、Reactive Streamsを具体的にどう実現しているか、の確認に入る・・予定です。