Activeator-Akka-Streamのソース確認
こんにちは。
前回に引き続き、Akka StreamsのコンポーネントがReactive Streamsで
定義されたコンポーネントのどれに対応しているかを見ていきます。
・・・と思ったのですが、
Akka Streamsをまともに読んでも範囲が広すぎるため、
Akka Streamsの利用サンプルとしてあがっている「activator-akka-stream-scala」をベースにまずは確認を行っていきます。
・・・ええ、とりあえず読み始めたんですが、実際の実現例なしにいきなりソース読んで理解できるものではありませんでした(汗
ソース自体は下記から持ってきています。
https://github.com/typesafehub/activator-akka-stream-scala
BasicTransformation
ソースは下記になります。
■BasicTransformation.scala
object BasicTransformation { def main(args: Array[String]): Unit = { implicit val system = ActorSystem("Sys") val text = """|Lorem Ipsum is simply dummy text of the printing and typesetting industry. |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, |when an unknown printer took a galley of type and scrambled it to make a type |specimen book.""".stripMargin Flow(text.split("\\s").toVector). // Flow生成 // transform map(line => line.toUpperCase). // print to console (can also use ``foreach(println)``) foreach(transformedLine => println(transformedLine)). onComplete(FlowMaterializer(MaterializerSettings())) { case Success(_) => system.shutdown() case Failure(e) => println("Failure: " + e.getMessage) system.shutdown() } }
上記の中で、Akka-Streamに関わる箇所はFlow、FlowMaterializer、MaterializerSettingsです。
まずはFlowのソースを追ってみます。
Flowのソースを追うと、下記のように内部でIterableなオブジェクトを基に
IterableProducerNodeに渡し、その結果をFlowImplとして生成しています。
■Flow.scala
object Flow { def apply[T](iterable: immutable.Iterable[T]): Flow[T] = FlowImpl(IterableProducerNode(iterable), Nil) }
その後、Flowに対してmap等のtransportを実行していくことになります。
その部分の記述についてはFlowImplとして記述されています。
■FlowImpl.scala
private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] { import FlowImpl._ import Ast._ override def map[U](f: O ⇒ U): Flow[U] = transform(())((_, in) ⇒ ((), List(f(in)))) override def foreach(c: O ⇒ Unit): Flow[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(())) override def transform[S, U](zero: S)( f: (S, O) ⇒ (S, immutable.Seq[U]), onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, isComplete: S ⇒ Boolean = (_: S) ⇒ false, cleanup: S ⇒ Unit = (_: S) ⇒ ()): Flow[U] = andThen(Transform( zero, f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], isComplete.asInstanceOf[Any ⇒ Boolean], cleanup.asInstanceOf[Any ⇒ Unit])) }
内容としてはScalaとしてはベーシックな内容ですね。
IterableProducerNodeは下記のようになっており、
この時点でReactive StreamsのコンポーネントであるProducerを生成するProducerNodeに落とし込まれたことがわかります。
■ActorBasedFlowMaterializer.scala
private[akka] object Ast { trait AstNode case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] { def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings))) } }
Iteratorの中身が空だった場合に生成されるEmptyProducerは下記の通りです。
subscribeメソッドを読んだら即購読完了、という動作になっていますね。
購読のインタフェースのみ持っていて、でも最初から既に購読完了している、というProducerでした。
■EmptyProducer.scala
private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { def getPublisher: Publisher[Nothing] = this def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete() def produceTo(consumer: Consumer[Nothing]): Unit = getPublisher.subscribe(consumer.getSubscriber) }
Iteratorの中身が存在した場合に生成されるActorProducerについては下記の通りです。
内部でActorPublisherが生成される作りになっています。
ActorPublisher自体も挙げます。
基本的にはActorRefの中身を見ている・・・のですが、Scala勉強中のため微妙に曖昧ではあります。
■ActorProducer.scala
class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] private[akka] trait ActorProducerLike[T] extends Producer[T] { def impl: ActorRef override val getPublisher: Publisher[T] = { val a = new ActorPublisher[T](impl) // Resolve cyclic dependency with actor. This MUST be the first message no matter what. impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]]) a } private[akka] final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { // The subscriber of an subscription attempt is first placed in this list of pending subscribers. // The actor will call takePendingSubscribers to remove it from the list when it has received the // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by // the shutdown method. Subscription attempts after shutdown can be denied immediately. private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil) override def subscribe(subscriber: Subscriber[T]): Unit = { @tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = { val current = pendingSubscribers.get if (current eq null) reportSubscribeError(subscriber) else { if (pendingSubscribers.compareAndSet(current, subscriber +: current)) impl ! SubscribePending else doSubscribe(subscriber) // CAS retry } } doSubscribe(subscriber) } def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = { val pending = pendingSubscribers.getAndSet(Nil) assert(pending ne null, "takePendingSubscribers must not be called after shutdown") pending.reverse } def shutdown(reason: Option[Throwable]): Unit = { shutdownReason = reason pendingSubscribers.getAndSet(null) match { case null ⇒ // already called earlier case pending ⇒ pending foreach reportSubscribeError } } @volatile private var shutdownReason: Option[Throwable] = None private def reportSubscribeError(subscriber: Subscriber[T]): Unit = shutdownReason match { case Some(e) ⇒ subscriber.onError(e) case None ⇒ subscriber.onComplete() } }
とりあえず、Flowがどういう経路を伝って
Reactive Streamのコンポーネントにつながるか、の概要がわかった感じですね。
ただ、この先に進むにはScala力が足りない気もしますので・・・
構造の把握はこの位にしておいて、次回からは実際に分散環境にデプロイできるか、を試してみます。