読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Activeator-Akka-Streamのソース確認

こんにちは。

前回に引き続き、Akka StreamsのコンポーネントがReactive Streamsで
定義されたコンポーネントのどれに対応しているかを見ていきます。

・・・と思ったのですが、
Akka Streamsをまともに読んでも範囲が広すぎるため、
Akka Streamsの利用サンプルとしてあがっている「activator-akka-stream-scala」をベースにまずは確認を行っていきます。
・・・ええ、とりあえず読み始めたんですが、実際の実現例なしにいきなりソース読んで理解できるものではありませんでした(汗

ソース自体は下記から持ってきています。
https://github.com/typesafehub/activator-akka-stream-scala

BasicTransformation

ソースは下記になります。
■BasicTransformation.scala

object BasicTransformation {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
         |when an unknown printer took a galley of type and scrambled it to make a type 
         |specimen book.""".stripMargin

    Flow(text.split("\\s").toVector). // Flow生成
      // transform
      map(line => line.toUpperCase).
      // print to console (can also use ``foreach(println)``)
      foreach(transformedLine => println(transformedLine)).
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case Success(_) => system.shutdown()
        case Failure(e) =>
          println("Failure: " + e.getMessage)
          system.shutdown()
      }

  }

上記の中で、Akka-Streamに関わる箇所はFlow、FlowMaterializer、MaterializerSettingsです。
まずはFlowのソースを追ってみます。

Flowのソースを追うと、下記のように内部でIterableなオブジェクトを基に
IterableProducerNodeに渡し、その結果をFlowImplとして生成しています。
■Flow.scala

object Flow {
  def apply[T](iterable: immutable.Iterable[T]): Flow[T] = FlowImpl(IterableProducerNode(iterable), Nil)
}

その後、Flowに対してmap等のtransportを実行していくことになります。
その部分の記述についてはFlowImplとして記述されています。

■FlowImpl.scala

private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] {
  import FlowImpl._
  import Ast._

  override def map[U](f: O ⇒ U): Flow[U] = transform(())((_, in) ⇒ ((), List(f(in))))

  override def foreach(c: O ⇒ Unit): Flow[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(()))

  override def transform[S, U](zero: S)(
    f: (S, O) ⇒ (S, immutable.Seq[U]),
    onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
    isComplete: S ⇒ Boolean = (_: S) ⇒ false,
    cleanup: S ⇒ Unit = (_: S) ⇒ ()): Flow[U] =
    andThen(Transform(
      zero,
      f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])],
      onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
      isComplete.asInstanceOf[Any ⇒ Boolean],
      cleanup.asInstanceOf[Any ⇒ Unit]))
}

内容としてはScalaとしてはベーシックな内容ですね。

IterableProducerNodeは下記のようになっており、
この時点でReactive StreamsのコンポーネントであるProducerを生成するProducerNodeに落とし込まれたことがわかります。


■ActorBasedFlowMaterializer.scala

private[akka] object Ast {
  trait AstNode
  case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] {
    def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
      if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
      else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings)))
  }
}

Iteratorの中身が空だった場合に生成されるEmptyProducerは下記の通りです。
subscribeメソッドを読んだら即購読完了、という動作になっていますね。
購読のインタフェースのみ持っていて、でも最初から既に購読完了している、というProducerでした。
■EmptyProducer.scala

private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
  def getPublisher: Publisher[Nothing] = this

  def subscribe(subscriber: Subscriber[Nothing]): Unit =
    subscriber.onComplete()

  def produceTo(consumer: Consumer[Nothing]): Unit =
    getPublisher.subscribe(consumer.getSubscriber)
}

Iteratorの中身が存在した場合に生成されるActorProducerについては下記の通りです。
内部でActorPublisherが生成される作りになっています。

ActorPublisher自体も挙げます。
基本的にはActorRefの中身を見ている・・・のですが、Scala勉強中のため微妙に曖昧ではあります。
■ActorProducer.scala

class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]

private[akka] trait ActorProducerLike[T] extends Producer[T] {
  def impl: ActorRef
  override val getPublisher: Publisher[T] = {
    val a = new ActorPublisher[T](impl)
    // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
    impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]])
    a
  }

private[akka] final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {

  // The subscriber of an subscription attempt is first placed in this list of pending subscribers.
  // The actor will call takePendingSubscribers to remove it from the list when it has received the 
  // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is
  // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by
  // the shutdown method. Subscription attempts after shutdown can be denied immediately.
  private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil)

  override def subscribe(subscriber: Subscriber[T]): Unit = {
    @tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = {
      val current = pendingSubscribers.get
      if (current eq null)
        reportSubscribeError(subscriber)
      else {
        if (pendingSubscribers.compareAndSet(current, subscriber +: current))
          impl ! SubscribePending
        else
          doSubscribe(subscriber) // CAS retry
      }
    }

    doSubscribe(subscriber)
  }

  def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = {
    val pending = pendingSubscribers.getAndSet(Nil)
    assert(pending ne null, "takePendingSubscribers must not be called after shutdown")
    pending.reverse
  }

  def shutdown(reason: Option[Throwable]): Unit = {
    shutdownReason = reason
    pendingSubscribers.getAndSet(null) match {
      case null// already called earlier
      case pending ⇒ pending foreach reportSubscribeError
    }
  }

  @volatile private var shutdownReason: Option[Throwable] = None

  private def reportSubscribeError(subscriber: Subscriber[T]): Unit =
    shutdownReason match {
      case Some(e) ⇒ subscriber.onError(e)
      case None    ⇒ subscriber.onComplete()
    }
}

とりあえず、Flowがどういう経路を伝って
Reactive Streamのコンポーネントにつながるか、の概要がわかった感じですね。

ただ、この先に進むにはScala力が足りない気もしますので・・・
構造の把握はこの位にしておいて、次回からは実際に分散環境にデプロイできるか、を試してみます。