読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

RCO Study Night "RCOにおける機械学習と次世代量子情報処理技術「量子アニーリング」" に参加してきました

こんにちは。

最近技術的な内容は大体Qiitaに書いているので
こちらは間が空いてしまっていますが、勉強会の参加記録はこちらに。

RCO Study Night "RCOにおける機械学習と次世代量子情報処理技術「量子アニーリング」"に参加してきました。

atnd.org

正直な話、ソフトウェア屋なので物理的な内容に踏み込んだ個所は理解できていないのですが、
その理解なりにわかったことを参加記録として残しておきます。

当然間違った内容も紛れているでしょうけど、まぁそれが現時点の私の理解可能なレベルということで。

参加メモ

RCOにおける機械学習(高柳 慎一)

  • アドテクはコードがそのままビジネス価値につながる
    • 特に重要な要素
      • 大量データのハンドリング
      • 多変数要素の最適化
  • 何故量子アニーリングを?
    • 最適化とは、つまりはモデルのパラメータを決定すること
      • 組み合わせ最適問題の解く手法として量子アニーリングを考える
      • ノーフリーランチの定理
        • 万能な最適化は存在しない
        • 各領域に最適なツールは個別に存在
        • つまりは、自分達の道具が必要
      • どういう領域に量子アニーリングは向くのか、実際に検証中

次世代量子情報処理技術「量子アニーリング」が拓く新時代 情報処理と物理学のハーモニー(田中 宗)

  • 組み合わせ最適問題とは?
    • 全ての組み合わせを確かめて一番いい結果を選ぶ問題
      • 2択テスト
      • 巡回セールスマン問題
    • 問題数が増えるにつれて計算量が爆発する。まともにやるととても解けない。
    • 定式化すると、離散変数を引数とする実数関数が最小値を取る条件を見つける問題
      • y = f(x1,x2,x3...xn) を最小化
  • 量子アニーリングの特長
    • 自己組織化
      • プログラミング不要、問題を与えれば自然に答えが出力
    • 統計力学理論
      • 組合せ最適化問題において、膨大なデータ処理の理論基盤
    • 量子並列性
      • 大量の情報を一度に並列処理可能
  • 自然現象は、計算(ナチュラルコンピューティング)
    • 自然現象を記述する言語、物理学
      • 運動方程式を解くと、システムの振る舞いが予言できる
        • 逆に言うと、システムの振る舞いが運動方程式の答えとなっている
    • 組合せ最適問題とは、離散変数を引数とする実数関数が最小値を取る条件を見つける問題
      • 力学:力学運動は作用と呼ばれる関数の最小値を取る軌道
      • 波動光学:光路最小条件を満たす所に光線が伝搬、屈折、干渉現象
      • 自然現象から着想を得て、計算の飛躍的発展を狙う
    • ナチュラルコンピューティング
      • 自然界のシステムを用いて、ベストな答えを探しだす
        • 粘菌コンピュータ
        • DNAコンピュータ
      • 自然現象からアルゴリズムを出す考え
    • 組合せ最適問題において、ただ結果を小さくするだけでは局所最適にはまり、ベストな答えは見つからない
      • 上がるプロセスも必要だが、それをどのように実現するか?
        • 熱による揺らぎを利用する
    • アニーリング(徐冷)
      • 合金は高温状態だとランダムな状態だが、徐々に冷却していくと物質して安定する状態=最適な状態に収斂する
    • イジングモデル
  • 量子の時代
    • 物理学とテクノロジーは下記のように発展してきた
      • 16世紀~:光学
      • 17世紀~:力学
      • 18世紀~:電磁気学
      • 19世紀~:熱力学
      • 20世紀~:量子力学
    • 熱力学:熱効果によるゆらぎ
      • 熱アニーリング
      • シミュレーテッドアニーリング
    • 量子力学:量子効果によるゆらぎ
    • 2択問題で考える
      • 熱揺らぎによる、ランダムな答え(A or B)
      • 量子揺らぎによる、「重ね合わせ」の答え(AでもありBでもある)
    • 量子効果を反映させた物理系のダイナミクスをシミュレーション
  • 何故量子アニーリングを使うのか?
    • 1億倍速い計算(1か月前のGoogle社のニュース)
      • D-Wave(初めての商用量子コンピュータ:2011~)
      • シミュレーテッドアニーリングと比べて1億倍高速
    • D-Waveの代表的数値
      • 10億円/台
      • 1nTまで減磁
        • 地磁場の5万分の1
      • 20mkまで冷却
      • 12kWの消費電力
        • スパコンは10MWオーダー、一般家庭は400Wとか
      • 1000+ 量子ビット
    • 但し、D-Waveの電力は大部分が冷却に費やされており、量子計算による消費電力はごく一部
      • この後量子ビット数=計算能力が伸びても消費電力はほとんど変わらない
    • 「1億倍速い計算」に対する見解
    • 多様な産業に展開し、無意識に使える技術まで落とし込みたい
      • 以前は数百年かかると言われていた
      • その後事情が変わり、発展が高速化
      • 講演者が現役のうちに、各産業に展開されるのでは
  • 現在リクルートとの共同研究で行っていること
    • どのような領域に量子アニーリングが使えるか、の実証研究
      • 現状、「この問題には有用」「この問題には使えない」ということはいくつか挙がっている
      • だが、「この問題領域には有用」「この問題領域には使えない」はわかっていない。
    • 量子モンテカルロ法(QMC)で量子アニーリングをシミュレーション
      • 通常コンピュータ上でQMCはシミュレーション可能
      • 計算効率は量子アニーリングに比べて落ちるが、計算量の増大ペースはほぼ同じ

感想

量子力学は素人向けの本を1冊読んだくらいの前提知識でしたが、シュレーディンガーの猫で有名なシュレーディンガー方程式がここで出てるとは、と正直驚いた発表でした。(そこかい

ただ、量子アニーリングとは何なのか、何に使えるのか、何故使うのかと、あとは現状の進展度合いの概要がわかったという意味で非常に興味深かった勉強会でした。

勉強会での講演者である田中先生は今年のうちに一般向けの書籍も記述されるとのことですので、そのあたりを楽しみに、気長に情報は追っておこうと思います。
一般向けの、になるとは思いますが。

後は、エンジニアが全員機械学習のコードを書けて、発表の中で挙がった4要素を兼ね備えるとはさすがリクルートコミュニケーションズ、マッチョなエンジニアが揃っていそうな会社ですね。
そちらは現状私自身は初歩の機械学習アルゴリズムを実装できるレベルでしかないので、勉強は重ねておく必要はありそうです。

2016/01/19追記:
発表資料公開されていました。

speakerdeck.com


www.slideshare.net

AWS NACLの設定をCLIから行ってみた

AWSのNACLを試してみたのですが、
ネット上を調べたところ日本語の実例情報がそれほどなかったため、
とりあえずまとめておこう、ということでまとめてみました。

1. NACLとは?

下記のページにもありますが、
「サブネットのインバウンドトラフィックとアウトバウンドトラフィックを制御するファイアウォールとして動作するセキュリティのオプションレイヤー」です。

docs.aws.amazon.com


セキュリティグループと何が違うのか、というのは下記のページにあります。docs.aws.amazon.com


簡単にまとめてしまうと、こんなイメージかと。

セキュリティグループ NACL
インスタンスレベルで動作 サブネットレベルで動作
ステートフル: ルールに関係なく、返されたトラフィックが自動的に許可 ステートレス: 返されたトラフィックがルールによって明示的に許可

2. どんなネットワークを構築したいか?

今回構築したいネットワークは下の図のようになります。

1VPC内でマスターのサブネットとスレーブのサブネットが複数存在する構成で、
マスターのサブネットは全スレーブサブネットと通信可能、
スレーブのサブネットはスレーブ間では直接通信は出来ない、というものです。

尚、これらのサブネットは全てパブリックサブネットなので、外部との通信は可能です。
外部との間はセキュリティグループで明示的に許可しないと通信は出来ないのでそちらはセキュリティグループで。
今回はVPC内の通信をNACLで一律ガード出来ないか、ということを考えています。
f:id:kimutansk:20150923053913p:plain

3. 構築前準備(サブネットの作成)

まずはVPCを作成した上でインターネットゲートウェイをアタッチします。
単にCLIVPCを作成した場合、インターネットゲートウェイがアタッチされておらず、外部と通信が出来ないからです。

# aws ec2 create-vpc --cidr-block 10.0.0.0/16
{
    "Vpc": {
        "InstanceTenancy": "default",
        "State": "pending",
        "VpcId": "vpc-9342fcf6", ★アタッチ時に使用★
        "CidrBlock": "10.0.0.0/16",
        "DhcpOptionsId": "dopt-e9c9258c"
    }
}
# aws ec2 create-internet-gateway
{
    "InternetGateway": {
        "Tags": [],
        "InternetGatewayId": "igw-434b9e26", ★アタッチ時に使用★
        "Attachments": []
    }
}
# aws ec2 attach-internet-gateway --internet-gateway-id igw-434b9e26 --vpc-id vpc-9342fcf6

その後、ルーティング設定を行います。
こちらもCLIVPCを作成した場合インターネットゲートウェイがアタッチされていない関係上、
初期のルートテーブルにはインターネットゲートウェイへのルートが存在しないためです。

# aws ec2 describe-route-tables --filters Name=vpc-id,Values=vpc-9342fcf6
{
    "RouteTables": [
        {
            "Associations": [
                {
                    "RouteTableAssociationId": "rtbassoc-c86efcad",
                    "Main": true,
                    "RouteTableId": "rtb-c3a229a6"
                }
            ],
            "RouteTableId": "rtb-c3a229a6",
            "VpcId": "vpc-9342fcf6",
            "PropagatingVgws": [],
            "Tags": [],
            "Routes": [
                {
                    "GatewayId": "local",
                    "DestinationCidrBlock": "10.0.0.0/16",
                    "State": "active",
                    "Origin": "CreateRouteTable"
                }
            ]
        }
    ]
}
# aws ec2 create-route --route-table-id rtb-c3a229a6 --destination-cidr-block 0.0.0.0/0 --gateway-id igw-434b9e26
{
    "Return": true
}

上記完了後、サブネットを作成します。

# aws ec2 create-subnet --vpc-id vpc-9342fcf6 --cidr-block 10.0.0.0/24
{
    "Subnet": {
        "VpcId": "vpc-9342fcf6",
        "CidrBlock": "10.0.0.0/24",
        "State": "pending",
        "AvailabilityZone": "ap-northeast-1c",
        "SubnetId": "subnet-0adb7253",
        "AvailableIpAddressCount": 251
    }
}
# aws ec2 create-subnet --vpc-id vpc-9342fcf6 --cidr-block 10.0.1.0/24
{
    "Subnet": {
        "VpcId": "vpc-9342fcf6",
        "CidrBlock": "10.0.1.0/24",
        "State": "pending",
        "AvailabilityZone": "ap-northeast-1c",
        "SubnetId": "subnet-02db725b",
        "AvailableIpAddressCount": 251
    }
}
# aws ec2 create-subnet --vpc-id vpc-9342fcf6 --cidr-block 10.0.2.0/24
{
    "Subnet": {
        "VpcId": "vpc-9342fcf6",
        "CidrBlock": "10.0.2.0/24",
        "State": "pending",
        "AvailabilityZone": "ap-northeast-1c",
        "SubnetId": "subnet-39db7260",
        "AvailableIpAddressCount": 251
    }
}
# aws ec2 create-subnet --vpc-id vpc-9342fcf6 --cidr-block 10.0.3.0/24
{
    "Subnet": {
        "VpcId": "vpc-9342fcf6",
        "CidrBlock": "10.0.3.0/24",
        "State": "pending",
        "AvailabilityZone": "ap-northeast-1c",
        "SubnetId": "subnet-3adb7263",
        "AvailableIpAddressCount": 251
    }
}

ここまでで、下記のようにネットワークの枠までは出来た状態になっています。
f:id:kimutansk:20150923062015p:plain

4. NACLの設定

では、各サブネットに対してNACLの設定を行います。
行う設定としては、各スレーブサブネットに対して下記の設定を行えばいい・・となるはずです。
※ルール#の順に並んでいて、タイプ/プロトコル/ポート範囲は全部、インバウンド/アウトバウンド共通

送信元/送信先 可否 説明
10.0.1.0/24 許可 自サブネット内の通信を許可
10.0.0.0/24 許可 マスタサブネットとの通信を許可
10.0.0.0/16 拒否 上記に当てはまらないVPC内の通信を拒否
0.0.0.0/0 許可 外部との通信は許可
0.0.0.0/0 拒否 デフォルトルール

これまでの手順の結果4サブネットに対して同一NACLが適用されているため、個別のNACLを作成して設定します。
はじめに4サブネットに対して割り振られたNACLの紐づけを行っているNetworkAclAssociationIdを取得しておきます。

# aws ec2 describe-network-acls --filters Name=vpc-id,Values=vpc-9342fcf6
{
    "NetworkAcls": [
        {
            "Associations": [
                {
                    "SubnetId": "subnet-3adb7263",
                    "NetworkAclId": "acl-c7c24ca2",
                    "NetworkAclAssociationId": "aclassoc-d128ccb5"
                },
                {
                    "SubnetId": "subnet-02db725b",
                    "NetworkAclId": "acl-c7c24ca2",
                    "NetworkAclAssociationId": "aclassoc-cc28cca8"
                },
                {
                    "SubnetId": "subnet-0adb7253",
                    "NetworkAclId": "acl-c7c24ca2",
                    "NetworkAclAssociationId": "aclassoc-fe28cc9a"
                },
                {
                    "SubnetId": "subnet-39db7260",
                    "NetworkAclId": "acl-c7c24ca2",
                    "NetworkAclAssociationId": "aclassoc-d528ccb1"
                }
            ],
            "NetworkAclId": "acl-c7c24ca2",
            "VpcId": "vpc-9342fcf6",
            "Tags": [],
            "Entries": [
                {
                    "CidrBlock": "0.0.0.0/0",
                    "RuleNumber": 100,
                    "Protocol": "-1",
                    "Egress": true,
                    "RuleAction": "allow"
                },
                {
                    "CidrBlock": "0.0.0.0/0",
                    "RuleNumber": 32767,
                    "Protocol": "-1",
                    "Egress": true,
                    "RuleAction": "deny"
                },
                {
                    "CidrBlock": "0.0.0.0/0",
                    "RuleNumber": 100,
                    "Protocol": "-1",
                    "Egress": false,
                    "RuleAction": "allow"
                },
                {
                    "CidrBlock": "0.0.0.0/0",
                    "RuleNumber": 32767,
                    "Protocol": "-1",
                    "Egress": false,
                    "RuleAction": "deny"
                }
            ],
            "IsDefault": true
        }
    ]
}

NetworkAclAssociationIdは取得できたため、追加のNACLを作成します。
マスタサブネットは元々存在しているNACLで問題ないため、スレーブサブネット分の作成です。

# aws ec2 create-network-acl --vpc-id vpc-9342fcf6
{
    "NetworkAcl": {
        "Associations": [],
        "NetworkAclId": "acl-fac14f9f",
        "VpcId": "vpc-9342fcf6",
        "Tags": [],
        "Entries": [
            {
                "CidrBlock": "0.0.0.0/0",
                "RuleNumber": 32767,
                "Protocol": "-1",
                "Egress": true,
                "RuleAction": "deny"
            },
            {
                "CidrBlock": "0.0.0.0/0",
                "RuleNumber": 32767,
                "Protocol": "-1",
                "Egress": false,
                "RuleAction": "deny"
            }
        ],
        "IsDefault": false
    }
}
★以後NetworkAclId以外は同一のため省略
# aws ec2 create-network-acl --vpc-id vpc-9342fcf6
{
~~~~
        "NetworkAclId": "acl-c4c14fa1",
~~~~
}
[root@host1 ~ 06:41:40]# aws ec2 create-network-acl --vpc-id vpc-9342fcf6
{
~~~~
        "NetworkAclId": "acl-c7c14fa2",
~~~~
}

CLIで作成した場合、「全許容」のNACLエントリは存在しないため、それも追加する必要がありそうですね。

NACL設定が作成されたため、次はエントリの追加を行います。
こちらも3NACL分書くと長くなるため、1NACL分のみ。
1NACL分設定完了したら後はNetworkAclIdとサブネット内通信のCIDRを変えて実行すればOKです。

★自サブネット内の通信を許可
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --ingress --rule-number 10 --protocol -1 --cidr-block 10.0.1.0/24 --rule-action allow
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --egress --rule-number 10 --protocol -1 --cidr-block 10.0.1.0/24 --rule-action allow
★マスタサブネットとの通信を許可
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --ingress --rule-number 20 --protocol -1 --cidr-block 10.0.0.0/24 --rule-action allow
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --egress --rule-number 20 --protocol -1 --cidr-block 10.0.0.0/24 --rule-action allow
★上記に当てはまらないVPC内の通信を拒否
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --ingress --rule-number 30 --protocol -1 --cidr-block 10.0.0.0/16 --rule-action deny
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --egress --rule-number 30 --protocol -1 --cidr-block 10.0.0.0/16 --rule-action deny
★外部との通信は許可
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --ingress --rule-number 100 --protocol -1 --cidr-block 0.0.0.0/0 --rule-action allow
# aws ec2 create-network-acl-entry --network-acl-id acl-fac14f9f --egress --rule-number 100 --protocol -1 --cidr-block 0.0.0.0/0 --rule-action allow

NACLのエントリ追加が終わったため、次はサブネットに対してNACLの紐づけを行います。
先ほど取得したNetworkAclAssociationIdを用いてサブネットに対するNACLの紐づけを行います。
尚、レスポンスの通り、このコマンドを実行するとNetworkAclAssociationId自体も変わるようです。

# aws ec2 replace-network-acl-association --association-id aclassoc-cc28cca8 --network-acl-id acl-fac14f9f
{
    "NewAssociationId": "aclassoc-5733d733"
}
# aws ec2 replace-network-acl-association --association-id aclassoc-d528ccb1 --network-acl-id acl-c4c14fa1
{
    "NewAssociationId": "aclassoc-5c33d738"
}
# aws ec2 replace-network-acl-association --association-id aclassoc-d128ccb5 --network-acl-id acl-c7c14fa2
{
    "NewAssociationId": "aclassoc-2433d740"
}

これまででNACLの設定が出来ましたので、実際に通信をして確認してみます。

5. 動作確認

では、各サブネットにインスタンスを起動して確認してみます。
まずはインスタンスを起動します。(※レスポンスは省略)

# aws ec2 run-instances --image-id ami-9a2fb89a --key-name default-keypair --instance-type t2.micro --subnet-id subnet-0adb7253 --associate-public-ip-address
# aws ec2 run-instances --image-id ami-9a2fb89a --key-name default-keypair --instance-type t2.micro --subnet-id subnet-02db725b --associate-public-ip-address
# aws ec2 run-instances --image-id ami-9a2fb89a --key-name default-keypair --instance-type t2.micro --subnet-id subnet-39db7260 --associate-public-ip-address
# aws ec2 run-instances --image-id ami-9a2fb89a --key-name default-keypair --instance-type t2.micro --subnet-id subnet-3adb7263 --associate-public-ip-address

その後、外部からSSHログインが可能なように各インスタンスにセキュリティグループを付与してログインします。
で、実際に各通信を試してみた結果は下記のようになりました。

マスタサブネット
★外部への通信確認→OK
[ec2-user@ip-10-0-0-243 ~]$ curl http://www.google.com
<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>302 Moved</TITLE></HEAD><BODY>
<H1>302 Moved</H1>
The document has moved
<A HREF="http://www.google.co.jp/?gfe_rd=cr&amp;ei=DtsBVrCDG8Wm8wevxYbYCA">here</A>.
</BODY></HTML>
★マスタサブネットへの通信確認→OK
[ec2-user@ip-10-0-0-243 ~]$ ping -c 1 10.0.0.243
PING 10.0.0.243 (10.0.0.243) 56(84) bytes of data.
64 bytes from 10.0.0.243: icmp_seq=1 ttl=64 time=0.030 ms

--- 10.0.0.243 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.030/0.030/0.030/0.000 ms
★スレーブサブネット1への通信確認→OK
[ec2-user@ip-10-0-0-243 ~]$ ping -c 1 10.0.1.10
PING 10.0.1.10 (10.0.1.10) 56(84) bytes of data.
64 bytes from 10.0.1.10: icmp_seq=1 ttl=64 time=0.602 ms

--- 10.0.1.10 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.602/0.602/0.602/0.000 ms
★スレーブサブネット2への通信確認→OK
[ec2-user@ip-10-0-0-243 ~]$ ping -c 1 10.0.2.109
PING 10.0.2.109 (10.0.2.109) 56(84) bytes of data.
64 bytes from 10.0.2.109: icmp_seq=1 ttl=64 time=0.547 ms

--- 10.0.2.109 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.547/0.547/0.547/0.000 ms
★スレーブサブネット3への通信確認→OK
[ec2-user@ip-10-0-0-243 ~]$ ping -c 1 10.0.3.168
PING 10.0.3.168 (10.0.3.168) 56(84) bytes of data.
64 bytes from 10.0.3.168: icmp_seq=1 ttl=64 time=0.563 ms

--- 10.0.3.168 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.563/0.563/0.563/0.000 ms
スレーブサブネット1
★外部への通信確認→OK
[ec2-user@ip-10-0-1-10 ~]$ curl http://www.google.com
<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>302 Moved</TITLE></HEAD><BODY>
<H1>302 Moved</H1>
The document has moved
<A HREF="http://www.google.co.jp/?gfe_rd=cr&amp;ei=eNoBVrzBE8f98wfngau4DA">here</A>.
</BODY></HTML>
★マスタサブネットへの通信確認→OK
[ec2-user@ip-10-0-1-10 ~]$ ping -c 1 10.0.0.243
PING 10.0.0.243 (10.0.0.243) 56(84) bytes of data.
64 bytes from 10.0.0.243: icmp_seq=1 ttl=64 time=0.506 ms

--- 10.0.0.243 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.506/0.506/0.506/0.000 ms
★スレーブサブネット1への通信確認→OK
[ec2-user@ip-10-0-1-10 ~]$ ping -c 1 10.0.1.10
PING 10.0.1.10 (10.0.1.10) 56(84) bytes of data.
64 bytes from 10.0.1.10: icmp_seq=1 ttl=64 time=0.032 ms

--- 10.0.1.10 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.032/0.032/0.032/0.000 ms
★スレーブサブネット2への通信確認→NG
[ec2-user@ip-10-0-1-10 ~]$ ping -c 1 10.0.2.109
PING 10.0.2.109 (10.0.2.109) 56(84) bytes of data.

--- 10.0.2.109 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
★スレーブサブネット3への通信確認→NG
[ec2-user@ip-10-0-1-10 ~]$ ping -c 1 10.0.3.168
PING 10.0.3.168 (10.0.3.168) 56(84) bytes of data.

--- 10.0.3.168 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
スレーブサブネット2
★外部への通信確認→OK
[ec2-user@ip-10-0-2-109 ~]$ curl http://www.google.com
<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>302 Moved</TITLE></HEAD><BODY>
<H1>302 Moved</H1>
The document has moved
<A HREF="http://www.google.co.jp/?gfe_rd=cr&amp;ei=D9sBVqijJ8am8wfhkYWABg">here</A>.
</BODY></HTML>
★マスタサブネットへの通信確認→OK
[ec2-user@ip-10-0-2-109 ~]$ ping -c 1 10.0.0.243
PING 10.0.0.243 (10.0.0.243) 56(84) bytes of data.
64 bytes from 10.0.0.243: icmp_seq=1 ttl=64 time=0.641 ms

--- 10.0.0.243 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.641/0.641/0.641/0.000 ms
★スレーブサブネット1への通信確認→NG
[ec2-user@ip-10-0-2-109 ~]$ ping -c 1 10.0.1.10
PING 10.0.1.10 (10.0.1.10) 56(84) bytes of data.

--- 10.0.1.10 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
★スレーブサブネット2への通信確認→OK
[ec2-user@ip-10-0-2-109 ~]$ ping -c 1 10.0.2.109
PING 10.0.2.109 (10.0.2.109) 56(84) bytes of data.
64 bytes from 10.0.2.109: icmp_seq=1 ttl=64 time=0.025 ms

--- 10.0.2.109 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.025/0.025/0.025/0.000 ms
★スレーブサブネット3への通信確認→NG
[ec2-user@ip-10-0-2-109 ~]$ ping -c 1 10.0.3.168
PING 10.0.3.168 (10.0.3.168) 56(84) bytes of data.

--- 10.0.3.168 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
スレーブサブネット3
★外部への通信確認→OK
[ec2-user@ip-10-0-3-168 ~]$ curl http://www.google.com
<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>302 Moved</TITLE></HEAD><BODY>
<H1>302 Moved</H1>
The document has moved
<A HREF="http://www.google.co.jp/?gfe_rd=cr&amp;ei=ENsBVtnxOen98wf7iJm4Cw">here</A>.
</BODY></HTML>
★マスタサブネットへの通信確認→OK
[ec2-user@ip-10-0-3-168 ~]$ ping -c 1 10.0.0.243
PING 10.0.0.243 (10.0.0.243) 56(84) bytes of data.
64 bytes from 10.0.0.243: icmp_seq=1 ttl=64 time=0.457 ms

--- 10.0.0.243 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.457/0.457/0.457/0.000 ms
★スレーブサブネット1への通信確認→NG
[ec2-user@ip-10-0-3-168 ~]$ ping -c 1 10.0.1.10
PING 10.0.1.10 (10.0.1.10) 56(84) bytes of data.

--- 10.0.1.10 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
★スレーブサブネット2への通信確認→NG
[ec2-user@ip-10-0-3-168 ~]$ ping -c 1 10.0.2.109
PING 10.0.2.109 (10.0.2.109) 56(84) bytes of data.

--- 10.0.2.109 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 10000ms
★スレーブサブネット3への通信確認→OK
[ec2-user@ip-10-0-3-168 ~]$ ping -c 1 10.0.3.168
PING 10.0.3.168 (10.0.3.168) 56(84) bytes of data.
64 bytes from 10.0.3.168: icmp_seq=1 ttl=64 time=0.024 ms

--- 10.0.3.168 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.024/0.024/0.024/0.000 ms

まとめると下記のようになります。

インスタンス 外部への通信 マスタサブネットへの通信 スレーブサブネット1への通信 スレーブサブネット2への通信 スレーブサブネット3への通信
マスタサブネットインスタンス OK OK OK OK OK
スレーブサブネット1インスタンス OK OK OK NG NG
スレーブサブネット2インスタンス OK OK NG OK NG
スレーブサブネット3インスタンス OK OK NG NG OK

とりあえず、NACLで目的としたVPC内の直接通信を防ぐ設定は成功したようです。
NACLエントリの作成やサブネットへの紐づけ等コマンドをうつ回数は多くなりましたが、CLIで実際に出来ることも確認できました。

ただ、この手の構築はCloudformationでやってみたいところですので、機会があればそれも出来ないか確認してみますか。

Twitterの新ストリーム処理基盤、Heronのアーキテクチャは?(詳細

こんにちは。

前回論文の前半部、Stormの問題点を読みましたが、
今回は中盤部、Twitter Heronのアーキテクチャについてです。
あと、後半部のStormとHeronの性能比較については下記のページでまとめているのの
事例が増えただけでしたので、とりあえず省略する方向で^^;kimutansk.hatenablog.com

では、前回の続きです。

5. Heron

5.1 Data Model and API

Heronの主要な設計目標はStormのAPI互換性を維持すること。
そのため、HeronのデータモデルはStormと同様のものとなる。
StormのようにHeronはTopologyを実行し、SpoutとBoltの有向非循環グラフとなる。
同様に、SpoutはTopology内に入力Tupleを生成or外部から取得し、Boltは実際の計算処理を行う。

Heron Topologyはデータベースシステム内の論理実行計画に相当する。
データベースシステムと同様に、この論理実行計画は実際に実行する前に物理実行計画に変換される。
Topologyの一部として、プログラマはタスク(Spout/Bolt)と、並列度、あとはTupleがタスク間をどう流れるかを示すグルーピングを指定する。
実際のTopology、各コンポーネントの並列度、グループ化の仕様からマシン上で実際に実行される物理実行計画は構成される。

HeronのTuple処理セマンティクスはStormのそれと下記のように似ている。

  • 1. At most once

全てのTupleは最大1回実行される。Topologyの状況次第では欠落することもあるが、複数回処理されないことは保証される。

  • 2. At least once

全てのTupleは最低1回実行される。Topologyの状況次第では複数回実行されることもあるが、少なくとも1回実行されることは保証される。

5.2 Architecture overview

Heronに対して求められる要素は、下記がある。

  • タスク管理の容易性向上
  • 開発者の開発生産性向上
  • 性能の予測可能性向上

そのため、我々はTwitterのスケール上でシステム内の異なるコンポーネントの綺麗な抽象化を行った上で、コンポーネント間の相互接続をしつつ動作させることが出来るアーキテクチャを構築する必要があった。

Heronの全体的なアーキテクチャは下図の通り。
f:id:kimutansk:20150715052542j:plain

ユーザはHeronのコマンドラインツールを利用してAuroraにTopology(Heron APIを利用して作成したSpout/Bolt群)をデプロイします。
AuroraはMesos上で実行される汎用的なサービススケジューラである。
だが、Heron Topologyは基本Auroraで動作するが、YARNやAmazon ECS上でも同様にデプロイすることが出来た。
この設計は、Nimbusがスケジューリングにも利用されていたという問題状況を脱するため。
Twitterの内製スケジューラのAuroraやその他のオープンソースのスケジューラ(YARN等)が洗練されてきているため、自前でスケジューラを作るのではなく、これらを利用して自分達は別のものを開発した方が有効だと判断した。

Heron Topologyは下図のようにいくつかのコンテナからなるAurora Jobとして構成される。
f:id:kimutansk:20150715052711j:plain

1コンテナはTopology Masterと呼ばれるプロセスを実行する。
残りのコンテナはStream Manager、Metrics Managerと、実際のSpout/Boltを実行するいくつかのHeron Instanceというプロセスから構成される。
複数のコンテナは単一の物理ホスト上で起動可能。
これらのコンテナはクラスタ内の各ノードの残リソース量にに応じてAuroraによってノードを跨いでスケジューリング/配置される。
TwitterではAuroraはリソース配分をLinux cgroupsを用いて行っている。)
また、Topology Masterの待機系も可用性向上のため利用することが出来る。
Topologyのメタデータ(起動ユーザ、稼働時刻、プロセス配置等)はZookeeperに保存される。

Heron InstanceはJavaで書かれたユーザコードを実行する必要があるため、Javaで記述されている。コンテナ内のHeron Instance毎にJVMが起動する形となる。
Heron Instanceは互いにprotocol buffers (protobufs)を用いて通信を行う。

5.3 Topology Master

Topology Master(TM)はTopology全体を管理するためのプロセス。
これはTopologyの状態を見るための単一のエンドポイントを提供するためであり、立ち位置としてはYARNのApplication Masterとよく似ている。
TMは起動時にZookeeper上にエフェメラルノードを作成し、外部からTMを検出可能としている。
エフェメラルノードを利用するのは下記2つの目的がある。

  1. あるTopologyに対して複数のTMが同時にMasterとなり、異なるViewを提供することを防止するため。
  2. TMに所属する他のプロセスがTMを検出可能とするため。

TMはTopologyのメトリクスを提供するエンドポイントとしても動作する。ただし、TMはデータ処理の経路上には存在しないため、そのことがボトルネックにはならないことを留意されたい。

5.4 Stream Manager

Stream Manager (SM)の主な機能は効率的にTupleのルーティングを行うことにある。
各Heron Instance(HI) は同一コンテナ上のSMに接続し、Tupleの送受信を行う。
Topology中の各SMは相互にコネクションをはり、{ \displaystyle O(k^2) }のコネクションから構成されるネットワークを構築し、通信を行う。
HIの数をnとすると、nはkより明確に大きくなる。SMが相互接続を行うことによって、{ \displaystyle O(k^2) }の物理コネクション上に{ \displaystyle O(n^2) }の論理コネクションを多重化して配置したオーバレイネットワークを構築したモデルになっている。
また、同一コンテナ中のHI同士の通信はローカルの短絡機構を用いてルーティングされる。

5.4.1 Topology Backpressure

Stormと異なり、Heronは動的にTopologyに流れるデータ流量を制御するBackPressure機構を有している。
この機構は異なるコンポーネントが異なる速度でデータ処理を行うTopologyにおいて重要な機構となる。
例えば、下流の実行状況が遅延したり、データのずれによって実行が遅れたケースにおいてデータパイプラインがどうなるかを考えて欲しい。
なお、このケースにおいては上流のコンポーネントの速度低下が問題になることはないとする。その場合、下流でデータを処理しきれなかった時点でバッファ上の長いキューを構成するか、Tupleを廃棄するほか無くなる。
Tupleが破棄されるということは、上流コンポーネントによる処理は行われているため、データの損失は勿論、パフォーマンス的な損失も大きい。
BackPressure機構は上記のような事情から、上流コンポーネントの速度を遅くするために必要とされる。

そのため、我々は実際にどういう方針で実装するかを下記のオプションから検討を行った。

  • TCP BackPressure

この方式では、HIから他の上流コンポーネントに対してBackPressureを伝播させるのにTCP Window機構を利用する。
HIとSM(同一コンテナ上)はTCPソケットを用いて接続するため、SMとデータを送受信する速度はHIの処理速度と等しくなる。
HIの速度が停止している場合は受信バッファが一杯になる。
SMはHI側の受信バッファの溢れを送信バッファの状態から検知し、他のSMと上流のHIに伝播させる。

この場合、速度が低下したHIがキャッチアップした場合のみ、BackPressure状態が解除されることになる。

この単純なTCP BackPressure機構は実装が容易となる。だが、この方式によるHI間のBackPressure機構はHI同士の論理コネクションがSM間の物理コネクション上にオーバーレイされているため、実際は上手くは動作しなかった。
多重化によって誤って違う上流HIの速度低下を招くだけでなく、異なる下流HIのストリームの速度低下も招いてしまった。結果、1か所の速度低下がTopology全体に大きく影響をあたえ、かつBackPressure状態の解除も非常に遅いものとなった。

  • Spout BackPressure

この方式ではSMがTopologyが外部から取得するデータ量を減少させるため、同一コンテナ上に存在するSpoutの速度を絞る。
この方式はSMとHIの間のTCP BackPressureを組み合わせて使用される。
SMは同一コンテナに所属するHIの処理速度が低下したのを検知した場合、ローカルに存在するSpoutを特定してデータの読み取りを停止する。
この方式はSMがローカルから受信するSpoutのバッファ(=Spoutの送信バッファ)を一杯になるため、Spoutからの受信メッセージを結果的にブロックする形になる。
影響を受けたSMはローカルに存在するSpoutを絞るための「BackPressure開始」メッセージを他SMに送信する。
他のSMはこのメッセージを受信すると、ローカルに存在するSpoutがTupleを読み込まないよう制御する。
速度が低下したHIの処理が追いついた場合、ローカルのSMは「BackPressure終了」のメッセージを他SMに送信し、他のSMはこのメッセージの受信に依ってローカルのSpoutからのデータ取得を再開する。
この方式は最上位のコンポーネントであるSpoutを直接ブロックする形になる。
この方式は必要以上にSpoutの処理速度を低下させる可能性があるため、最適ではないかもしれない。ただ、必要になったタイミングで即上流コンポーネントの処理速度を低下させることが出来るため、必要十分な機能でもある。
この方式の潜在的な欠点はBackPressure機構の開始終了を制御する特殊メッセージとなる。
しかし、この方式はTopologyの保持するSpout/Boltの数によってBackPressure機構の有効無効にかかる時間が変動しないという利点もある。

  • Stage-by-Stage Backpressure

Topologyは複数のステージからなる、とみなすことが出来る。
この方式では、Topologyにおいて、1段目を示すSpoutに達するまでBackPressureを1ステージ毎に伝播させる。
SpoutBackPressure方式と同様に、この方式はSMとローカルのHIのTCP BackPressure機構によって実現される。だが、SM間で利用されるBackPressureのメッセージの内容が異なってくる。

5.4.2 Implementation

Heronでは最終的に「Spout BackPressure」の方式を採用した。
この機構は実際に上手く機能した。また、Topologyのどこかが詰まった場合にそのイベントの発生元を確認することでBackPressureの発動の根本原因を突き止めることを容易にするデバッグ容易性も提供することが出来た。

全てのソケットチャネルはHighWaterMarkとLowWaterMarkによってサイズが制限されるアプリケーションレベルのバッファに紐づけられている。
バッファのサイズがHighWaterMarkに達した場合、LowWaterMarkを下回るまでBackPressureが有効化される。この設計を取った根拠としては、BackPressureの緩和に伴う極端なデータ出力/入力から来る急速なブレからTopologyを守るためとなっている。

この設計の結果、TupleがSpoutから送信された場合、Heronはプロセスまたはマシンの障害のシナリオの間を除き、それをDropしないということが確定される。そのため、Tupleの失敗をより確定的にすることが出来る。

TopologyがBackPressure状態になった場合、最も遅いコンポーネントにあわせてしばらく処理が行われることになる。
そのシチュエーションが継続した場合、Spoutが取得したデータが「Source」キューとして蓄積されることになる。Topologyの設定に応じて、Spoutは蓄積された古いデータを削除することも可能となる。

5.5 Heron Instance

Spout/BoltといったTopologyを構成するメインの処理はHeron Instance(HI)に切りだされて行われる。
StormのWorkerと違い、HIは単一のタスク(Spout/Bolt)を実行するJVMプロセスである。
このような設計にすることによって、各Spout/Boltのイベントやログのシーケンスを1タスク単位で見ることが出来るため、デバッグやプロファイリングが容易になる。
データ移動の複雑さはHIからSMに移譲されているため、HIを将来的に他の言語で開発することも可能になるだろう。
HIを実装するにあたり、「シングルスレッドモデル」「2スレッドモデル」のオプションを検討した。以後にその経過を記述する。

5.5.1 Single-threaded approach

シングルスレッドの設計においては、メインスレッドがローカルSMへの接続を維持し、Tupleを待つ。
Tupleが到着するとユーザロジックコードがメインスレッドによって呼び出される。
ユーザロジックコードが出力Tupleを生成する場合はバッファリングされている。
バッファが特定の閾値を超えるとローカルSMに通知される。
この方式はシンプルであるという利点を持つが、下記のようにユーザロジックコードがメインの処理を阻害する可能性があるという潜在的リスクも有する。

  • スリープシステムコールの呼び出し
  • ファイルやネットワークとの入出力待ち
  • スレッド同期待ち

我々はこの方式でまず実装を行い、上記のようなブロッキングはメトリクスの報告等必要な定期処理のために望ましくないものであることに気付いた。
ブロッキングの持続時間が変わる可能性もあるため、予期しない動作にもつながってしまう。
メトリクスが定期的に収集できなかった場合、収集元は確実にHIが「悪い」状態であるか否かを判断、トラブルシューティングを行うことができなくなる。

5.5.2 Two-threaded approach

下図に示すように、この設計方式ではHIはGatewayスレッドとタスク実行スレッドの2つのスレッドを保持している。
f:id:kimutansk:20150716052408j:plain
GatewayスレッドはHIと外部との全ての通信とデータの移動を制御することを受け持っている。同様に、ローカルSMとメトリクスマネージャへのTCP接続も維持している。
加えて、ローカルSMから送信されたTupleを受信する責任も持つ。これらの受信Tupleはタスク実行スレッドに送信され、実際の処理が行われる。

タスク実行スレッドにおいてはユーザロジックコードを実行する。
タスク実行スレッドが開始されると、それぞれSpout/Boltのどちらを実行しているかに依ってopen/prepareメソッドを呼び出して初期化処理を行う。
Tupleを受信した時、Boltを実行しているタスク実行スレッドはexecuteメソッドを呼び出し、Tupleの処理を行う。
Spoutの場合はデータソースからデータを取得するnextTupleメソッドを継続的に呼び出し、TopologyにTupleとしてこのデータを送信する。
Spout/Boltから送信されたTupleはローカルSMにTupleを転送するGatewayスレッドに送信される。

Tupleを処理することに加えて、タスク実行スレッドは「Tupleの処理する」「Tupleの送信数」「TupleへのAck送信数」「Tuple処理にかかった処理時間」等、いくつかのメトリクス情報の収集も行う。

先ほどのHeronのスレッド構成の図にに示されたように、Gatewayスレッドとタスク実行スレッドは3つの一方向キューを用いて通信を行う。

Gatewayスレッドはdata-inキューをタスク実行スレッドが処理するTupleを送信するために用いる。
タスク実行スレッドはdata-outキューをTopologyの他要素に対して送信したいTupleをGatewayスレッドに送信するために用いる。
metrics-outキューはタスク実行スレッドが収集したメトリクス情報をGatewayスレッドに渡すために用いる。

data-inキュー、data-outキューのサイズは制限がかけられている。
data-inキューのサイズがこの値を超過した場合、GatewayスレッドはローカルSMからの読み取りを中止する。
この動作によって、ローカルSMのBackPressure機能をトリガする形になる。

data-outキューのサイズが制限を超えた場合もGatewayスレッドはローカルSMがこれ以上データを受信できないことを想定することが出来、タスク実行スレッド側でこれ以上Tupleの処理/通信を行うべきでないことがわかる。

我々は制限付きキューを保持したTopologyを運用環境で実行した場合に予期しないGCの問題に遭遇した。
それまで全て正常に動作していたものの、ネットワークの切断によってGatewayスレッドがdata-outキューからデータを送信できない状態になった。
Tupleはdata-outキューにバックアップされはじめた。
このような状況はHIのメモリ上限に達するという結果をもたらした。ネットワークが復旧した際にGatewayスレッドはローカルSMに対してデータを送信するだけでなく、並行してデータの受信も再開する。
GatewayスレッドがTupleを送信する前にローカルSMからデータの受信を始めてしまうとdata-outキューで既にメモリの上限に達していた場合にGCを頻発させてパフォーマンスの低下を招いてしまう。

このようなGCの問題を回避するためには、data-outキューとdata-inキューのサイズを定期的に確認し、それによって制限値を増減させる対処を取っている。
キューの容量は継続的にサイズが増大し続ける場合はその時点の最終容量の半分まで容量が低減される。
キューの容量が安定して一定の値に戻るか、または0になるまでこの機構は定期的に呼び出される。
キューの容量が0になると新しいTupleを投入することが可能となり、結果新たなTupleを生成することにもなる。結果として、GCの問題から回復することも容易となる。
同様に、キュー内に保持されたTupleが設定されたリミットよりも小さくなった場合、設定された制限値か最大値に達するまでキャパシティを徐々に増大させることも行っている。

5.6 Metrics Manager

Metrics Manager(MM)はシステムの全コンポーネントからメトリクスを収集し、出力する。
これらのメトリクスにはTopologyのシステムメトリクスとユーザメトリクスが含まれている。
MMは各コンテナ上に存在し、SMとHIはそのMMに対してメトリクスの通知を行う。

収集されたメトリクスは各コンテナから内製のモニタリングシステムに送信される。
MMは併せて外部UIに表示するための情報をTopology Masterに送信する。
このようにコンテナ単位でMMを保持してそこから外部にメトリクスを送信するという仕組みによって柔軟に他のモニタリングシステム(GanliaやGraphite等に将来的に対応予定)に対する対応が可能となっている。

5.7 Startup Sequence and Failure Scenarios

TopologyがHeronに対してsubmitされると、初期化シーケンスが起動する。

Submit時、リソーススケジューラ(TwitterではAurora)はクラスタ上に存在する各マシン上のリソース状況を確認し、必要なリソースを保持しているマシンに対してコンテナの割り振りを行う。
Topology Master(TM)は一つ目のコンテナとして起動し、Zookeeperに対してディスカバリ用のエフェメラルノードを登録する。
一方、各コンテナ中のStream Manager(SM)はTMを検出するためにZookeeperに対して問い合わせを行う。
SMはTMに接続し、定期的にHeartBeatを送信する。

全てのSMが接続されると、TMは各コンテナに対してTopologyのタスク(Spout/Bolt)の割り当てを行う。
これを物理実行計画と呼んでいる。
割り当てが完了すると、SMは互いを検出するためにTMから物理実行計画を取得する。
現状、SMは互いに接続を行いメッシュ状のネットワークを構成している。

一方、HIは同一コンテナ上のSMを検出して物理実行計画の一部を取得して処理を開始する。これらの初期化ステップが完了後、データ/TupleはTopologyを流れ始める。

障害発生時の保護のため、TMはZookeeperに対して物理実行計画を保存する。

Topology実行中には複数の障害シナリオが存在し、Topologyの部分的に影響を与えるものから、全体に影響を与えるものもまである。
これらのシナリオはプロセスダウン、コンテナ障害、ハード障害といった要素で構成される。

TMがダウンした場合、コンテナはダウンしたプロセスを再起動させる。TMはZookeeperから状態を復旧する。
TMの待機系が存在する場合はTM待機系がマスターとなり、再起動したTMが待機系となる。
TMに接続を行っているSMはZookeeperから新たなTMを検出し、接続する。

SMがダウンした場合、TMと同様に同一コンテナ上で再起動される。起動後、SMはTMに接続して物理実行計画を確認し、変化が発生していないかを確認する。
他のSMは障害が発生したSMと接続が切断されてしまうが、新たなSMの位置を示す実行計画を取得し、それを基に新たなSMに接続する。

HIがダウンした場合はコンテナ内で再起動され、ローカルのSMと再接続する。
その際にSMから物理実行計画を取得し、自分が何を担当すべきだったかを認識して再度ユーザロジックコードを実行開始する。

コンテナレベルでの障害が発生して別マシンにコンテナが配置された場合、新たなコンテナ中のSMはTMを検出し、SMやHIの障害のシナリオと同じ流れに従う。

5.8 Architecture Features: Summary

最後に、Heronのアーキテクチャを見るにあたっては下記の要素に注目してほしい。

  1. リソースのプロビジョニング(コンテナやTM)がクラスタマネージャから制御可能なように抽象化され、共有のインフラ上での動作が容易になっていること。
  2. 各Heron Instanceがシングルタスクを実行するように動いているため、デバッグやjstask、ヒープダンプといった解析が容易になっていること。
  3. Topologyのコンポーネントのメトリクスが分離され、かつ透過的に取得できるようになっているため、システム内でどのプロセスに問題があるかを明確にマッピングすることが出来ること。
  4. コンポーネントレベルでリソース割り当てを可能にしたことに依り、Heronにおいては必要なリソースを明確にすることが出来、オーバープロビジョニングやリソースの無駄を回避できること。
  5. Topology毎にTopology Masterが存在することにより、各Topologyは互いに独立して動作/管理が可能になった。また、あるTopologyでの障害(主にユーザロジックコード上のBoltで発生)は他のTopologyに影響を与えなくなった。
  6. BackPressureの機構により、一定の処理速度と、性能の予測が立てられるようになった。また、BackPressureはTopologyのコンテナ群を別のコンテナ群にマイグレーションする際のキー技術にもなりえる。
  7. 単一障害点が無くなった。

6. Heron in Production

実際にHeronをプロダクション環境で動作させるにあたって、以下のような追加機能が必要になる。

  1. Topologyを容易に操作可能とする機能
  2. Topologyのメトリクス、傾向を確認可能とする機能
  3. Heron Instanceで発生した例外を確認可能とする機能
  4. Topologyのログを確認可能とする機能

これらの機能に対応するために、Twitterでは以下のコンポーネントを追加開発した。

  1. Heron Tracker

Heron TrackerはTopologyに関する様々な情報にアクセスするためのGatewayとして動作する。
具体的にはTopologyがメタデータを保存しているZookeeperとのインタフェースと、Topologyに関する追加情報を収集する。
TrackerはZookeeperの状態を監視し、Topologyの起動/終了や、物理実行計画の変更(例えば、コンテナが別ホストに移動する等)を追跡し続けている。
これらの情報に加えて、TrackerはTopology Masterを検出し、TMが保持している追加の関連メトリクスを取得することにも使用している。

Trackerは明確に定義されたRest APIを用いてこれらの情報を公開しており、追加のツールを用いてデータの取得を行うことも容易になっている。
APIはTopologyの論理実行計画/物理実行計画、ユーザ定義メトリクス/システムメトリクスも含む各種メトリクス、各HIのログに対するリンク、Aurora上のJobページのリンク等を提供する。
TrackerはAuroraのServiceとして実行され、複数インスタンス上で実行することで耐障害性を確保している。APIのリクエストはこれらのインスタンス間でバランシングされる。

  1. Heron UI
  2. Heron Viz

これらの追加コンポーネントの実システム上の配置は下図のようになる。
f:id:kimutansk:20150716052440j:plain

6.1 Heron Tracker

Heron TrackerはTopologyに関する様々な情報にアクセスするためのGatewayとして動作する。
具体的にはTopologyがメタデータを保存しているZookeeperとのインタフェースと、Topologyに関する追加情報を収集する。
TrackerはZookeeperの状態を監視し、Topologyの起動/終了や、物理実行計画の変更(例えば、コンテナが別ホストに移動する等)を追跡し続けている。
これらの情報に加えて、TrackerはTopology Masterを検出し、TMが保持している追加の関連メトリクスを取得することにも使用している。

Trackerは明確に定義されたRest APIを用いてこれらの情報を公開しており、追加のツールを用いてデータの取得を行うことも容易になっている。
APIはTopologyの論理実行計画/物理実行計画、ユーザ定義メトリクス/システムメトリクスも含む各種メトリクス、各HIのログに対するリンク、Aurora上のJobページのリンク等を提供する。
TrackerはAuroraのServiceとして実行され、複数インスタンス上で実行することで耐障害性を確保している。APIのリクエストはこれらのインスタンス間でバランシングされる。

6.2 Heron UI

HeronのユーザはビジュアライズされたUIを用いてTopologyの状態をインタラクティブに確認することが出来る。
Heron UIはHeron Tracker APIを使用し、Topologyの論理実行計画/物理実行計画を視覚的に表現し、表示する。
論理実行計画は一意に色分けされた各ノードと有向非巡回グラフが表示される。
物理実行計画はホストを表す内側の同心円、コンテナを描く中間の円、およびコンポーネントを示す外側の円として表示される。

ユーザはこれらのUIを用いてドリルダウン的にTupleの送信数/完了数/実行時遅延/Ack数/Fail数といったメトリクス情報を10分、1時間、3時間、起動からの総計といった時間単位で区切って確認することが可能。

これらの機能に加えて、Heron UIはインスタンスに関連付けられたログや例外を表示するためのアクセスリンクを提供し、デバッグ容易性を高めている。

下図に5段階のTopologyの可視化の一部を示す。
f:id:kimutansk:20150717060619j:plain

6.3 Heron Viz

Heron VizはMetrics Managerから収集したメトリクスを確認するダッシュボードを生成するサービスである。
Vizは定期的にTrackerにアクセスし、新規Topologyが存在するかを確認している。
新規のTopologyが存在した場合、Vizと呼ばれるグラフ生成のためのHTTP APIを使用し、Dashboardのグラフを生成する。
ダッシュボードを生成するために、VizはTopologyの論理実行計画を取得し、収集したメトリクスがどのコンポーネントマッピングされるかを特定する。(Soput/Boltの存在や、各コンポーネントがどれだけのインスタンスを保持しているかがわからないと詳細情報をマッピングできないため)

大きく分けるとVizのTopologyダッシュボードの表示するメトリクスは以下のメトリクスにカテゴライズ出来る。

  1. 健全性監視メトリクス
  2. リソース監視メトリクス
  3. コンポーネントメトリクス
  4. Stream Managerメトリクス

健全性監視メトリクスではTopology全体の遅延状況や、SpoutにおけるTuple処理失敗カウント、SMの生存状態といった内容が確認可能。

リソース監視メトリクスではCPUの割り当て量と実使用量、メモリの割り当て量と実使用量、GCに費やした時間といった内容が確認可能。

コンポーネントメトリクスでは各Spoutに対してTupleの送信数/Fail数/Ack数といった情報を含む。
また、TupleがSpoutから送信されてから最下流のBoltで処理完了するまでのEndtoEndのレイテンシも含んでいる。
併せて、各Boltに対してTupleの処理数/Ack数/送信数やBoltでの処理平均時間といった情報も確認可能になっている。

Stream Managerメトリクスでは各SMからメトリクスを収集し、HIからの受信Tuple数/送信Tuple数、他SMやHIと送受信する過程でDropしたTuple数、BackPressureが有効になった総時間といった内容が確認可能。

ダッシュボードの部分サンプルは下図の通り。
f:id:kimutansk:20150717062926j:plain

6.4 Heron@Twitter

Twitterでは既にStormは使用しておらず、Heronがストリーム処理の基本となっている。
ここ数カ月で既に数百のTopologyを複数のデータセンターで運用している。
これらのTopologyで何十TBのデータを処理し、何十億の出力を行っている。

Topologyは多様な構成となっており、Topologyの多くはSpout/Boltが3段以下(Spout>Boltの階層が3段階、ということと思われます)のものとなっている。
ただ、それ以上の階層を持つTopologyも存在しており、最長のものは8段に達する。

これらのTopologyの使用事例は多様であり、下記のようなものを含む。

  1. フィルタリング
  2. Twitter上の複数のストリームを統合(例えば、複数のストリームの総合値)
  3. 機械学習アルゴリズム(回帰、関連付け、クラスタリング

Twitterでは様々なグループがHeronを使用している。
これらのグループはユーザサービス、収益成長率、検索、コンテンツ発見といったチームで構成される。

また、これら全体をStormからHeronに移行した結果、インフラの利用効率が大幅に向上し、ストリーム処理に使用するハードを3分の1程に削減することが出来た。

======

と、こんな感じでHeronのアーキテクチャ部を読んでみました。
色々設計の検討過程も含めてわかるというのはやはり読んでいて面白いですし、参考になりますね。
Stormと同様で相変わらずググラビリティが最悪な状態は改善されていませんでしたが。

あとは、Topology(元々はStormのTopology)のSpout/Boltの段数が大部分は3以下、Twitter機械学習なども含めても最大8、というのは違う意味で参考になりました。
つまりは元々のStorm上でTopologyを作成する際もコンポーネントの段数は基本それくらいにおさえておくのがリソース効率的に優れている、ということなのだと思います。

これまで読んできましたが、今使っているStormをより効率よく使うための情報も多分に含まれていたため、現時点でもかなり有用な内容だったと思います。

あとは、Heronが実際に公開されるのを待つばかりですが・・
おそらく、TwitterのブログでもHeronの記事に「OSS Product」とタグ付けされていたため、公開される流れはあるように思われます。
それを楽しみに待つことにしましょうか。


それでは。

Docker-Registrator(Normal/internal)でConsulに登録される内容は?

こんにちは。

最近、Dockerでマルチコンテナのクラスタを組もう、ということをやっています。
で、そこで課題になってくるのがIPアドレスが一定しない、ということですよね。

ですので、そのためにConsul等のサービスディスカバリの仕組みを使う形になります。
ただ、Dockerコンテナの内部からConsulに登録するのはいまいち面倒・・ということで、
自動登録が可能なDocker Registratorを2パターン試してみました。

とりあえず、出来ると構成は下記のようになる・・はず。
f:id:kimutansk:20150712191036p:plain

尚、OSはCentOS7、あとFirewalldは無効化してiptablesを用いています。
理由は後ほど(次回以降?)

1. OS側準備

まずはDockerインストールなどの諸々のセットアップを行います。
尚、rootユーザでログインを予め許容しておき、rootユーザでセットアップを行います。
Selinux無効化

# setenforce 0
# sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

IPv6無効化

# echo "net.ipv6.conf.all.disable_ipv6 = 1" >> /etc/sysctl.conf
# echo "net.ipv6.conf.default.disable_ipv6 = 1" >> /etc/sysctl.conf
# sysctl -p

■各ホストの名称追加

# echo "192.168.100.231 host1" >> /etc/hosts
# echo "192.168.100.232 host2" >> /etc/hosts
# echo "192.168.100.233 host3" >> /etc/hosts

■firewalldを無効化&iptablesを有効化

# yum install iptables-services
# systemctl status firewalld
# systemctl stop firewalld
# systemctl disable firewalld
# systemctl enable iptables
# systemctl start iptables
# systemctl status iptables

■Dockerインストール&有効化

# curl -O -sSL https://get.docker.com/rpm/1.7.0/centos-7/RPMS/x86_64/docker-engine-1.7.0-1.el7.centos.x86_64.rpm
# yum localinstall --nogpgcheck docker-engine-1.7.0-1.el7.centos.x86_64.rpm
# systemctl enable docker.service
# systemctl restart docker

2. Atlasを使ってConsul Clusterを組んでみる

どうせですので、Atlasを使ってConsul Clusterを組んでみることにします。
まず、下記のページにアクセスしてアカウントを作成します。
「Sign up & Tutorials」からアカウントは作成可能です。
Atlas by HashiCorp

で、作成してログインすると下記のようなページに遷移します。
f:id:kimutansk:20150712193546j:plain
右上の自分のアカウント名をクリックすると設定画面に飛ぶので、そこから「Tokens」をクリックすると下記の画面に飛びますので、名前を指定してTokenを発行します。
尚、一度発行した後は2度と見れないため、発行時にきちんと残しておきましょう。
f:id:kimutansk:20150712193842j:plain
という形で、Tokenの発行が出来ました。これを用いて構築します。

では、各ホストでiptablesの許容定義追加とconsulインストールと設定を行います。
余分な設定かもしれませんが、念のため。
iptables許容定義追加

# iptables -I INPUT 5 -p tcp --dport 53 -j ACCEPT
# iptables -I INPUT 5 -p udp --dport 53 -j ACCEPT
# iptables -I INPUT 5 -p tcp --dport 8300 -j ACCEPT
# iptables -I INPUT 5 -p tcp --dport 8301 -j ACCEPT
# iptables -I INPUT 5 -p udp --dport 8301 -j ACCEPT
# iptables -I INPUT 5 -p tcp --dport 8302 -j ACCEPT
# iptables -I INPUT 5 -p udp --dport 8302 -j ACCEPT
# iptables -I INPUT 5 -p tcp --dport 8400 -j ACCEPT
# iptables -I INPUT 5 -p tcp --dport 8500 -j ACCEPT
# iptables -I INPUT 5 -p udp --dport 8500 -j ACCEPT
# /usr/libexec/iptables/iptables.init save

■consulインストール&設定

# rpm -iUvh http://dl.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm
# yum install -y jq
# wget https://dl.bintray.com/mitchellh/consul/0.5.2_linux_amd64.zip
# wget https://dl.bintray.com/mitchellh/consul/0.5.2_web_ui.zip
# unzip 0.5.2_linux_amd64.zip
# mkdir /opt/consul
# mv consul /opt/consul/
# ln -s /opt/consul/consul /usr/local/sbin/consul
# mkdir /opt/consul/data
# unzip 0.5.2_web_ui.zip
# mv dist /opt/consul/webui
# mkdir /etc/consul.d/
# echo '{"ports": {"dns": 53}, "recursor": "8.8.8.8" }' | jq . > /etc/consul.d/consul.json

では、先ほど発行したTokenを用いてconsul clusterを構築してみます。
尚、「-atlas」と「-atlas-token」の値はダミーです。

実際は、「-atlas」にはAtlasのアカウントID/Token名称を、
「-atlas-token」には発行したTokenを設定します。

# export HOST_IP=$(ifconfig eno16780032 | grep 'inet ' | awk '{ print $2  }')
# nohup consul agent -server -bootstrap-expect 3  -atlas-join -atlas=consul/test -atlas-token=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX -client ${HOST_IP} -data-dir /opt/consul/data -ui-dir /opt/consul/webui -config-dir /etc/consul.d -config-file /etc/consul.d/consul.json &

すると・・・?
コンソール出力(nohupなので正確にはそこから読んでます)に下記のように出力され、クラスタが構築されたことが確認できました。
素晴らしい。

2015/07/12 20:08:13 [INFO] scada-client: connect requested (capability: http)
2015/07/12 20:08:47 [INFO] scada-client: auto-joining LAN with [192.168.100.232 192.168.100.233]

メンバー一覧にもきちんと表示されます。

# consul members -rpc-addr=${HOST_IP}:8400
Node   Address               Status  Type    Build  Protocol  DC
host1  192.168.100.231:8301  alive   server  0.5.2  2         dc1
host2  192.168.100.232:8301  alive   server  0.5.2  2         dc1
host3  192.168.100.233:8301  alive   server  0.5.2  2         dc1

3.Registrator導入&登録内容確認

では、次にRegistratorの導入を行い、実際にconsulに登録される内容を見てみます。
■Registrator起動

# docker run -d -v /var/run/docker.sock:/tmp/docker.sock -h registrator --name registrator progrium/registrator consul://${HOST_IP}:8500
# docker ps
CONTAINER ID        IMAGE                  COMMAND                CREATED              STATUS              PORTS               NAMES
bb2639c9710e        progrium/registrator   "/bin/registrator co   About a minute ago   Up About a minute                       registrator

Registratorが起動できたので、適当なコンテナに名前つけてExposeして起動してみます。
どうせですので、SSHを使えるようにしたCentOS7で。
ということで、下記のDockerfileを用意します。
■Dockerfile

FROM centos:centos7

RUN yum -y install passwd openssh-server initscripts

RUN /usr/sbin/sshd-keygen
RUN sed -ri 's/#PermitRootLogin yes/PermitRootLogin yes/g' /etc/ssh/sshd_config
RUN echo 'root:rootroot' | chpasswd

EXPOSE 22

■コンテナビルド&起動

# docker build -t centos7ssh .
※host1で実施
# docker run -p 22 -d --hostname=container1 --name=container1 centos7ssh /usr/sbin/sshd -D
※host2で実施
# docker run -p 22 -d --hostname=container2 --name=container2 centos7ssh /usr/sbin/sshd -D
※host3で実施
# docker run -p 22 -d --hostname=container3 --name=container3 centos7ssh /usr/sbin/sshd -D

こうすると、下記のようにconsulに「centos7ssh」というサービスが追加されて、情報が取得できるようになります。

# curl -s http://${HOST_IP}:8500/v1/catalog/service/centos7ssh | jq .
[
  {
    "ServicePort": 32770, // ★ホストのポート番号★
    "ServiceAddress": "",
    "ServiceTags": null,
    "ServiceName": "centos7ssh",
    "ServiceID": "registrator:container1:22",
    "Address": "192.168.100.231", // ★ホストのIPアドレス★
    "Node": "host1" // ★ホストのホスト名★
  },
  {
    "ServicePort": 32768,
    "ServiceAddress": "",
    "ServiceTags": null,
    "ServiceName": "centos7ssh",
    "ServiceID": "registrator:container2:22",
    "Address": "192.168.100.232",
    "Node": "host2"
  },
  {
    "ServicePort": 32768,
    "ServiceAddress": "",
    "ServiceTags": null,
    "ServiceName": "centos7ssh",
    "ServiceID": "registrator:container3:22",
    "Address": "192.168.100.233",
    "Node": "host3"
  }
]

ただ、ちょっと厄介なこともあります。
上記の結果を見ればわかるように、centos7sshはNodeの値がホストのものとなっています。
つまり、下記のようにホストのサービスとして登録されているわけですね。

# curl -s http://${HOST_IP}:8500/v1/catalog/node/host1 | jq .
{
  "Services": {
    "registrator:container1:22": {
      "Port": 32770,
      "Address": "",
      "Tags": null,
      "Service": "centos7ssh",
      "ID": "registrator:container1:22"
    },
    "consul": {
      "Port": 8300,
      "Address": "",
      "Tags": [],
      "Service": "consul",
      "ID": "consul"
    }
  },
  "Node": {
    "Address": "192.168.100.231",
    "Node": "host1"
  }
}

そのため、「DNSからコンテナの名称を指定して引く」ことができません。
結果、コンテナの名前を指定して接続をするということが出来ず、
コンテナ同士で特定のコンテナを指定して名前解決をするということが出来ないわけですね。

# dig @${HOST_IP} container1.node.consul ANY
※結果が取得できない。

コンテナ同士で通信が出来ればいいポートについても全部Exposeすればいいのかもしれませんが、
それはそれでいまいち取り回しが悪いように思えます。

4.Registrator(internal)登録内容確認

コンテナの名前を指定してDNSを引く方法はないのか、と思って調べると案の定ありました。sttts.github.io

マルチホスト上のコンテナが通信可能なネットワークを構築可能なweaveで、
Registratorに対してPullRequestを投げており、internalという設定が出来るようです。

ですので、一度コンテナを全てkill&rmして、下記のコマンドを試してみます。

# docker run -d -v /var/run/docker.sock:/tmp/docker.sock -h registrator --name registrator progrium/registrator  -internal consul://${HOST_IP}:8500
※host1で実施
# docker run -p 22 -d --hostname=container1 --name=container1 centos7ssh /usr/sbin/sshd -D
※host2で実施
# docker run -p 22 -d --hostname=container2 --name=container2 centos7ssh /usr/sbin/sshd -D
※host3で実施
# docker run -p 22 -d --hostname=container3 --name=container3 centos7ssh /usr/sbin/sshd -D

すると、今度はNodeとしてコンテナが登録されています。

# curl -s http://${HOST_IP}:8500/v1/catalog/nodes | jq .
[
  {
    "Address": "172.17.0.11",
    "Node": "container1"
  },
  {
    "Address": "172.17.0.9",
    "Node": "container2"
  },
  {
    "Address": "172.17.0.9",
    "Node": "container3"
  },
  {
    "Address": "192.168.100.231",
    "Node": "host1"
  },
  {
    "Address": "192.168.100.232",
    "Node": "host2"
  },
  {
    "Address": "192.168.100.233",
    "Node": "host3"
  }
]

そのため、下記のように「コンテナの名前を指定して、consul DNSからコンテナのアドレスを取得する」ことが可能になります。
#今はマルチホストのコンテナネットワーク化していないため、IPアドレス被っていたりはしていますが。
これで、「コンテナ同士が名前を引いてIPアドレスを特定できる」ということが可能になるわけですね。

# dig @${HOST_IP} container1.node.consul ANY
container1.node.consul. 0       IN      A       172.17.0.11

StormやZookeeper、Kafka等、クラスタの構成プロセス同士がノード単位で名前を指定して通信する・・
というパターンでConsul DNSを用いて上手く名前解決できそうな感じです。

実際全て構築するのはまた大変そうですが、出来そうな見込みは立った、という感じでしょうか。


尚、参考までに、Serviceのポート番号はContainerのものが用いられているようです。

# curl -s http://${HOST_IP}:8500/v1/catalog/service/centos7ssh | jq .
[
  {
    "ServicePort": 22, // ★Containerのポート番号★
    "ServiceAddress": "",
    "ServiceTags": [],
    "ServiceName": "centos7ssh",
    "ServiceID": "container1:container1:22",
    "Address": "172.17.0.11", // ★ContainerのIPアドレス★
    "Node": "container1" // ★Containerのホスト名★
  },
  {
    "ServicePort": 22,
    "ServiceAddress": "",
    "ServiceTags": [],
    "ServiceName": "centos7ssh",
    "ServiceID": "container2:container2:22",
    "Address": "172.17.0.9",
    "Node": "container2"
  },
  {
    "ServicePort": 22,
    "ServiceAddress": "",
    "ServiceTags": [],
    "ServiceName": "centos7ssh",
    "ServiceID": "container3:container3:22",
    "Address": "172.17.0.9",
    "Node": "container3"
  }
]

Twitter Heronの論文でのStormの問題点は?(詳細

こんにちは。

前回あーいう形でサマリ記事を読んだので、次は実際に論文を読んでみるしかないだろう・・
ということで、Twitter Heronの論文を読んでみました。
今回は前半の、Stormの問題点を記述した個所と、設計検討結果についてです。

Twitter Heron

ただ、全文そのまま訳しているのではなく、読みながらそれなりに意訳や切り捨ては入っています。

Abstract

Stormは長い間Twitterにおけるリアルタイム解析のメイン基盤だった。
ただ、Twitterでのリアルタイム処理データ量が多様性の増加やユースケースの追加に伴って増加した結果、Stormの制約も多く明らかになった。
Twitterには共有インフラ上で動作する、よりスケールし、デバッグしやすく、性能が高く、管理しやすい基盤が必要になった。

これらの事情を受けてTwitter内でいくつかのオプションを基に検討した結果、新たなリアルタイムストリーム処理を構築する必要があると結論付けた。
本論文ではHeronと呼ばれるこの新システムの設計と実装について説明している。
Heronは現在Twitterでのストリーム処理のデファクトとなっている。

実際にHeronを開発し、運用してから結果を基に本論文を記述している。
また、併せてHeronの効率性とスケーラビリティを実証した証拠も併せて示している。

1. INTRODUCTION

Twitterでは他の組織と同じようにリアルタイムストリーム処理に多くを頼っている。
例えば、アクティブユーザのカウントや、ツイートと広告の関連性の算出など。
何年かの間はTwitterではStormをリアルタイムストリーム処理基盤として使用していた。

だが、Twitterでは共有インフラ上で他のデータサービスとリソースを共有する点や、スケーラビリティ、デバッグの容易性、管理のしやすさなど、より求められるものが増えてきた。

特に、作業の生産性で大きな課題となったのはデバッグの容易性だった。
Topologyが誤動作した場合-原因はコードの誤り、ハードウェア障害、負荷変動など様々あるのですが―、Stormでは複数コンポーネントの動作が1プロセス中に集約されているため、問題の解析が困難だった。

そのため、Twitterでは論理的な計算リソースが明確に物理プロセスに対してマッピングされる機構が求められた。
このような明確なマッピングはビジネス的に重要なTopologyにおいてはエラーがどこから発生しているか即特定できるのは非常に重要になる。

また、StormではTopologyを実行するために専用のハードの割り当てを必要とする。
このような構成はクラスタとしてのリソースを利用する際には効率が悪く、オンデマンドなスケーラビリティに対しても悪影響が大きい。
Twitterではメジャーなクラスタスケジューリングソフトを使用して、リアルタイムストリーム処理に限らない、様々なデータサービスに対して柔軟にリソースを割り当てることが必要だった。
Twitterの内部的にはメインのクラスタスケジューラとしてはAurora Schedulerを使用していた。

Stormでは新しいTopologyをプロビジョニングするには実質的に新しい隔離されたマシンリソースが必要となり、逆にそのTopologyが必要なくなった場合、割り当てられたマシンリソースは再度割り当てが行われるまでは利用されない状態となる。
このような事情から、マシンのプロビジョニングを管理することが面倒になる。
更に、TwitterのスケールにおいてはStormのスケーラビリティとリソース効率の悪さからリソースを使う側の生産性が悪化することも明確になっており、その意味でも現状を改善する必要が出てきた。

ただ、Twitterでは既に多くのアプリケーションがStormを用いて記述されており、それらのアプリケーションを更新する必要があるような改修は取ることが出来なかった。
また、ストリーム処理とバッチ処理を融合するSummingBirdのAPIとの互換性も必要だった。

様々なオプションを検討調査した後、Twitterではこれまで記述した設計目標を満たすために新たなストリーム処理システムを設計する必要があると結論付けた。
この結果開発された新しいシステムはHeronと呼ばれている。
HeronではStormとAPIの互換性があるため、StormのユーザはHeronに容易に移行することが出来る。
Twitterでは既にStormは使用しておらず、Heronがストリーム処理の基本となっている。
Heronを利用することでStormと比してパフォーマンスが大幅に向上し、よりリソース消費が低減された上に、デバッグの容易性、スケーラビリティ、管理性で大きな利点を享受している。

2. Related Work

ストリームデータ処理システムへの研究は10年ほど前に始まり、最近スケーラブルなストリーム処理へのニーズがより高まったため、実際の製品やOSSとして多くのものが出てきた。
最近ではストリーム処理は企業活動に汎用的に必要になるという流れになり、従来のデータベース製品と統合されたものも出てきている。

3. Motivation for Heron

StormはTwitterにおいて何年かリアルタイム解析の基盤を提供していました。
ただ、Twitterの規模でStormを運用すると後述する制約があることも明確になっている。
この制約を脱するためにHeronを開発した。

3.1 Storm Background

StormTopologyはSpout/Boltのグラフとして示せる。
Spoutはデータのインプットを、BoltはStreamに対する処理を示す。
SpoutはしばしばKesterlやKafkaのようなキューからデータを取得し、Boltに対するStreamを生成する。Boltはそれらを取得し定義された処理を実行する。

例として、リアルタイムアクティブユーザ(RTAC)をカウントするTopologyは下図のようになる。
f:id:kimutansk:20150712085359j:plain
Spout/BoltはTaskとして実行され、これらのTaskはExecutor上にグルーピングされる。Executorもまたグルーピングされ、1Workerプロセス中に複数存在する。
Workerプロセスは1JVMプロセスとして下図のように実行される。
f:id:kimutansk:20150712085453j:plain
1ホストは複数のWorkerプロセスを実行するが、各Workerプロセスは異なるTopologyに所属することもある。

3.2 Storm Worker Architecture: Limitations

これまで示されたように、StormのWorkerプロセスは複雑な構成となっている。
複数のプロセスがHost上で実行される。
JVMプロセス内部では各Executorは2スレッドにマッピングされる。
これらのスレッドはJVMの優先度ベースのスケジューリングアルゴリズムで実行される。
各スレッドがいくつかのTaskを実行する必要があるため、Executorは受信データに基づいてどのTaskを実行するかを判断するための別のスケジューリングアルゴリズムを導入している。
このような複数のスケジューリングとその相互作用は多くの場合、Task実行中の不確実性につながる。

加えて、各Workerプロセスが複数のTaskを実行するという複雑性もある。
例えば、KafkaSpout、Twitterの内部サービスとデータの結合を行うBolt、KVSにデータを出力するBoltのような異なるBoltが同一JVM上で実行される可能性がある。
このようなシナリオにおいてはこれらのリソース使用量を分離することは不可能となるので、特定のタスクの性能について推測することが困難とな
結果、Topology全体の動作が不安定になった際のまず取るべき手段はTopologyの再起動、ということとなる。
再起動後、上手く動作しないTaskが発生した場合、問題が発生していること自体はわかるが、根本原因を突き止めるのは他のTaskと同時に動作している関係上困難となる。

複数のタスクのログが1ファイルに出力されているため、特定のタスクに関連付けられたエラーまたは例外を特定するのは困難となる。
いくつかのタスクが他のタスクに対してより多くログを出力するようなケースにおいてはより状況は悪化する。
また、あるタスクでハンドリングされなかった例外はそれによってWorkerをダウンさせるため、同一Worker上で影響を受けていないタスクまで影響を与えてしまう。
このように、Topologyの一部でエラーが発生した際に全体に影響を与える作りとなっている。
また、異なるタスクが発生させたGC関連の問題を追うのもまた困難。

リソース割り当てにおいて、Stormは全てのWorkerプロセスが均質であることを前提にしている。
このアーキテクチャの仮定の結果、しばしばリソースの非効率やオーバープロビジョニングが発生する。例えば、3Spoutと1Boltを2Workerプロセスにスケジューリングすることを検討してほしい。
Boltが10GB、Spoutが5GBのメモリを使用する場合、最大のパターンでは1Boltと1Spoutを処理する構成となるため、1Workerあたり15GBのメモリを確保する。
だが、実際必要になるメモリは25GBのため、全体で5GBの無駄が発生する。
この問題は、Workerが多様な構成要素を含むにつれて状況は悪化する。特に、SummingBirdのような高レベルの抽象化を利用して複雑なTopologyを生成する時に問題は顕著に表れる。

Workerに大きなメモリを割り振った結果、jstackまたはヒープダンプなどの一般的なプロファイリングツールの利用が面倒になる。
Workerはヒープダンプを取得している間HeartBeatが停止するため、HeartBeatがタイムアウトし、ヒープダンプの取得が終わらないというケースが発生する。
上記のようなデバッグにおいて、Workerが大きいケースは困難となる。

ここで出てくる質問は、どうすればStormを1タスク1Workerに再構成することが出来るか、というものとなる。
Twitterではこのオプションを検討したが、このアプローチはリソース使用量に大きな非効率を発生させ、かつ求める並列度も制限されることが問題となった。
このアプローチを取るとTopology毎のWorkerプロセスの数が増大する。
ただ、Storm自体はWorkerプロセス間は均質という前提があるため、齟齬が発生する。
このモデルでは、各Workerのためのメモリ量として下記の値を確保する必要がある。
f:id:kimutansk:20150712122526j:plain

この値は最適で理想的な利用状況よりはるかに大きくなることがある。
3Spout+1Boltの先ほどの例を参照すると、各Workerの合計必要メモリ量は40GBとなり、最低限必要な25GBを大きく上回る。
加えて、Workerプロセスは各タスクの並列度に依って増加し、個々のWorkerが通信のためにポートを必要とする上にWorker間が互いに接続する必要があるため、それによってスケーラビリティに課題が発生する。

StormのWorkerはタスクとWorker本体のデータのやり取りを行う複数のスレッドとキューを保持している。
各Workerプロセスのグローバル受信スレッドは「上流」のWorkerからデータを受信し、グローバル送信スレッドは「下流」のWorkerへのデータ送信を担当している。
これらのグローバルスレッドに加えて各ExecutorはTopologyに定義されたSpout/Boltを実行するユーザスレッド、および各グローバル送信スレッドにユーザスレッドの出力を渡すローカル送信スレッドで構成されている。
従って、Stormでは各TupleはWorker内に入ってから送信されるまで4つのスレッドを通過する必要がある。(グローバル受信/グローバル送信/ユーザスレッド/ローカル通信)
この設計は大きなオーバーヘッドとキュー競合の問題につながる。

3.3 Issues with the Storm Nimbus

Nimbusはスケジューリング、監視を含むいくつかの機能と、Jarの配布を行っている。
また、併せてシステムのメトリクスコンポーネントも提供しており、いくつかのTopologyのカウンタを管理している。
従って、Nimnusは機能的に多くのことを担当しており、ボトルネックとなりやすい構成になっている。

第一に、NimbusScheculerはWorkerのための細かいレベルでのリソース予約およびリソース隔離をサポートしていない。
そのため、同じマシン上で実行している別のTopologyに属するWorkerプロセス同士が互いに干渉することがある。
このような状況は追跡不可能なパフォーマンスの問題を引き起こすことがある。
この問題を軽減するために、我々は1マシン上では1Topologyのみを実行するという形で隔離し、対処を行った。
だが、Topologyは割り当てられたハードウェアリソースを使いきることは困難である上に、このアプローチはリソースの浪費につながる。
YARNでStormを実行した場合であってもこの問題は解決しきれない。

第二に、StormはHeartBeatでZookeeperを使用している。
1Topologyに多くのWorkerが所属している状態だとZookeeperがボトルネックとなり、TopologyあたりのWorker数、およびクラスタ内の合計Topology数のスケーラビリティに課題が生じる。
この問題に対処するためにTwitterでは暫定的にHeartBeatのDaemonを開発し、そちらにHeartBeatのトラフィックをバイパスするようにしていた。
しかし、その暫定対処はHostとHeartBeatDaemonを個別にモニタリングする必要があり、運用の負担を増大させた。

最後に、Nimbusが単一障害点となっている。
Nimbusがダウンすると新たなTopologyのデプロイや既存Topologyの終了が出来ず、障害が発生したTopologyの検出、復旧も出来なくなる問題があった。

3.4 Lack of Backpressure

StormはBackPressureの機構を保持していない。
受信側のタスクが受信データ/Tupleを処理できない場合、送信側は単にTupleを削除することになってしまう。
これはFail-Fast機構とシンプルな対応ではあるが、下記のような欠点がある。

  1. Ack無効時にSpoutから取得するTupleの数を制御できずに溢れるケースが発生する。
  2. 上流コンポーネントの処理結果が失われる。
  3. システムの動作が予測しにくくなる。

3.5 Efficiency

運用環境においてはTupleの失敗、Tupleの再実行、および実行の遅れにつながるようなTopologyの実行時に性能的に予測不可能になる要素が存在する。
(データの取得速度がTopologyの処理速度を上回る場合)
このような状況において、下記のような課題が存在する。

Tupleの再実行

Tupleツリーの末端で障害が発生した場合であってもTupleツリー全てが失敗として扱われてしまう。これはTask数が多かったり、複数のTupleに分割して処理するようなTopologyの場合大きな課題となる。

GC時間の増大

データ取得速度がTopologyの処理速度を上回る場合、Tupleが大量にプロセス内に蓄積されるため、大容量メモリを確保したWorkerプロセスでGCが発生した場合、分単位の時間がかかることもある。結果、HeartBeatが失われてプロセスがkillされる。

キューの競合

通信用のキューにおいて競合がしばしば発生する。特に、Workerプロセス中で複数のExecutorが動作している場合はより顕著に表れる。

これまで述べたパフォーマンスが予測しにくく、問題が発生することに対して、Twitterは多くの場合余剰のリソースを割り振って対処していた。
あるTopologyでは600コアを平均20~30%使用する、という効率が悪い状態になっていた。Stormの性能上の課題が無ければ、本来150コア程で運用できていたはず。
加えて、これと同様の処理を最適化した状況で実行した所、75Coreでも十分賄える状況であった。実際には600Coreで運用していたことから、8倍もの余分なリソースを使用してしまっている。

4. Design Alternatives

これまでの結果から、Twitterでは下記の3つのオプションが検討された。

  1. Stormを拡張する。
  2. 既存のOSSでニーズを満たすものを採用する。
  3. 新たなストリーム処理基盤を開発する。

前節で説明した問題を解消するためにはStormの基本を書き変える必要があるため、コアコンポーネントの大規模な書き換えを必要とした。
だが、概要レベルでStormはデータをキューの束の上に載せる構成となっており、この基本的なアーキテクチャのブロックを変更することは困難だった。
このように、根幹から更新する形で既存のシステムを変更する場合、柔軟性も失われるうえ、長い開発期間も想定される。

次のオプションはApache SamzaやSpark Streamingのような既存のOSSプロダクトを採用することだった。
しかし、これらのシステムはTwitterの規模に適合するには多くの問題があった。
また、これらのシステムはStormのAPIと互換性が無かった。
別のAPIを利用して既存のTopologyを書き変える場合、移行に長い時間がかかり、移行期間に問題が発生する可能性も高い。

また、既にSummingBirdなどStormのAPIの上に開発されている、異なるライブラリの存在もある。
Twitterではストリーム処理基盤のAPIを変更するということとはスタック内の他のコンポーネントも全て変更するということを意味する。

そのため、Twitterでは最適の選択は新たなストリーム処理基盤を開発する、という結論になった。

=====
Stormの問題について詳細を記述してみました。
実際、Stormにはこれらの問題があるわけでして、こういった問題点への課題認識はReactive Streamsや、Heronに確実に受け継がれていると思います。

ちなみに、Stormのコアを書き変えるのに躊躇した理由としてClojureで開発されているということと、リードエンジニアが既にいない、ということも少なからず影響しているようには思えます。
Clojureは処理の流れを把握するレベルで読んだり、小さなプログラムを書くだけならそう難しくないですし、実際私でも出来ます。
ただ、あれを使ってそれなりの規模を持つOSSを記述するとなると専門家がいないと中々厳しいと思います・・・

で、今回は前半ですが、次回は後半、Heronのアーキテクチャについて記述してみます。
Heronの次回投稿までには多分違う投稿も挟まると思いますが、まぁそれはそれで^^;

Twitter Heronの論文でのStormの問題とHeronの利点は?(サマリ

こんにちは。

前回TwitterBlogのHeronの記事を読み込んでみたので、
次は論文を読むか、とはりきってみた所、有料だったので撃沈した今日この頃です。
この後開発が進んでいくことを考えると今買って読んでしまうか悩みますね・・

と思っていた所、下記のPaperを読んだ結果のサマリが投稿されているサイトが見つかったので、
実際論文読むかの参考という意味でも読んでみます。

blog.acolyer.org

ただ、そのまま挙げているわけではなく、Nathanさんのブログの記事云々とか等、
一部省略している所もあります。

1. Twitterでは既にStormを使用していない。

Twitterでは既にStormは使用しておらず、Heronがストリーム処理の基本となっている。
ここ数カ月で既に数百のTopologyを複数のデータセンターで運用している。
=====
このあたりは、さすがStormとAPI互換を保った成果が出ている、という感じですね。
=====

何十TBのデータを処理し、何十億の出力を行っているが、
スループット、レイテンシの大幅が改善がされた上で、CPUリソースの利用率は削減できている。

Heronを開発するにあたってStormを改造して使用するか、の検討も行われたが、
Stormの限界も下記のように明らかになり、打開するにはコア部分から作り直す必要があった。

  1. Taskのスケジューリングが複数のメカニズムによって成り立っており、複雑かつ予測しにくい。
  2. 各WorkerプロセスはTaskを複数個、複数種類保持しており、各Task毎のパフォーマンスやリソース使用量を切り分けることが困難。
  3. ログファイルに各Taskのログが混在して出力されるため、特定のTaskのエラーを追う際に他のTaskのログに埋もれて追いにくい。
  4. 1Taskで例外がハンドリングされなかった場合、Workerプロセスごと死んでしまう。
  5. 各Workerプロセスは全て均質なものとして扱っているため、存在するリソースに応じた最適化が困難で、しばしばリソース超過が発生する。
  6. Workerプロセスに多くのメモリが割り当てられているため、プロファイリングツールを使うのが困難。実際使った場合、HeartBeatが停止することでSupervisorによってWorkerプロセスが殺されてプロファイル結果を取得できないケースが多々ある。
  7. 上記の事情を受けて仮にStormでWorkerプロセスが1Taskを実行するように再設計した場合、リソースの非効率や並列度の確保の困難が発生する。
  8. 各Tupleがプロセス内の処理を完了するためには4スレッドを通過しなければならず、オーバーヘッドや競合の問題を発生させる。
  9. Nimbusが機能的に過負荷に陥りやすく、運用における大きなボトルネックとなる。
  10. あるマシン上で複数の異なるTopologyに所属するWorkerプロセスが動作しており、個々のTopologyのリソース解析を困難にする上に、互いにリソースの干渉を発生させていた。Twitterではその問題に対処するために1マシンには1Topologyのみがデプロイされるようにしていたが、これは当然非常に効率が悪い。
  11. Nimbusが単一障害点となっている。Nimbusがダウンすると新たなTopologyのデプロイや既存Topologyの終了が出来ず、障害が発生したTopologyの検出、復旧も出来なくなる。
  12. BackPressureの機構を保持していない。結果、Ack無効時にSpoutから取得するTupleの数を制御できずに溢れるケースが発生する。結果上流コンポーネントの処理が不明確な状態になったり、無効になってしまうことがあった。
  13. Tupleツリーの末端で障害が発生した場合であってもTupleツリー全てが失敗として扱われてしまう。
  14. 大容量メモリを確保したWorkerプロセスでGCが発生した場合、分単位の時間がかかることもある。(結果、HeartBeatが失われてkillされる。)
  15. 通信用のキューにおいて競合がしばしば発生する。特に、Workerプロセス中で複数のExecutorが動作している場合はより顕著に表れる。
  16. これまで述べたパフォーマンスが予測しにくく、問題が発生することに対して、Twitterは多くの場合余剰のリソースを割り振って対処していた。あるTopologyでは600コアを平均20~30%使用する、という効率が悪い状態になっていた。Stormの性能上の課題が無ければ、本来150コア程で運用できていたはず。

これらのニーズへの対応のため、Twitterは高性能/スケーラブルで共有のインフラ上で動作するストリーム処理基盤を求めていた。
=====
いや、いい感じにボコボコにされてますね(汗)。Storm。
まぁ、大部分はその通りなんですが、WorkerプロセスでGCに分単位の時間がかかるようなメモリ量を割り振る等、明らかにこれ最初の前提がおかしいだろ、というのもあるのでちと微妙な感じですね。
=====

2. Heron入門

Heronはコンテナベースの実装となっている。
Heron TopologyはMesos上のAurora Scheduler(Apache Aurora)で動作しているが、YARNやAmazon ECS上でも同様にデプロイすることが出来た。

Auroraのような外部のスケジューラを使用するようにしたのはStormでNimbusがスケジューリングを行っていた状況からの脱却のため。
Twitterの内製スケジューラのAuroraやその他のオープンソースのスケジューラ(YARN等)が洗練されてきているため、自前でスケジューラを作るのではなく、これらを利用して自分達は別のものを開発した方が有効だと判断した。

Heronでは1コンテナ上で下記のように複数のプロセスが起動している。
f:id:kimutansk:20150705221358p:plain

1コンテナ目はTopology Masterと呼ばれるプロセスを起動する。
残りのコンテナは各自がデータのルーティングを行うStreamManager、メトリクスの収集/レポートを行うMetricsManagerといくつかのHeron instance(Spout/Bolt等ユーザロジックを実行)を起動する。
複数のコンテナが1物理ホスト上に割り振られる。これらはAuroraベースのスケジューラで配分され、クラスタのリソースをより有効活用できる配置となる。(Twitterにおいて、AuroraはコンテナをLinux cgroupsで実現)
Topology Masterは待機系のプロセスも起動可能になっており、これによって耐障害性を高めている。
Topologyのメタデータ(起動ユーザ、稼働時刻、プロセス配置等)はZookeeperに保存される。

Stream ManagerはTupleのルーティングを担当する。
あるTopologyに所属するStreamManagerは相互に接続しており、 O(k2)の接続を確立している。(K=Stream Managerプロセス数=コンテナ数)
コンテナ中のHeron Instanceは同一コンテナのStream Managerと接続している。Heron Instance(n個)はStream Manager(k個)よりも数は多い。
そのため、O(n2)の論理的な接続がO(k2)の物理的な接続の上に多重化されて乗っている形になる。

HeronはBackPressureの機構を保持している。Stream Managerが同一コンテナ上に存在するSpoutのデータ取得量を制御することによって実現している。
BackPressureはバッファサイズが最高水準(High Water Mark)に達した時点で発動し、バッファサイズが最低水準(Low Water Mark)になるまで継続する。
この設計とした理由としては、BackPressureの緩和に伴う極端なデータ出力/入力から来る急速なブレからTopologyを守るためとなっている。

Heron InstanceはSpout/Boltの処理を担当する。
各Instanceが1JVMプロセスとなっており、各Heron InstanceはGatewayスレッドとExecutionスレッドの2スレッドを保持している。
GatewayスレッドはHeron Instanceの全データ入出力を担当している。
GatewayスレッドがStream ManagerやMetrics ManagerとのTCP接続を維持し、Stream Managerから受信したTupleをExecutorに渡し、結果を受け取ってStream Managerに送信する役割を持っている。

=====
コンテナと形で区切られてはいるものの、JVMプロセスの数自体はStorm時代と比べてかなり多くなっているようです。
ただ、1プロセス1Taskとなっているのでどこに問題があるか、等もわかりやすくはなるとは思いますが・・・ 実際にリソース効率はどういいんでしょうかね。

=====

3. HeronがStormより優れている点は?

これらの構造を基に組み上げられたHeronがStormより優れている点としては、下記が挙げられる。

  1. リソースのプロビジョニングはクラスタマネージャのレイヤで抽象化され、共有インフラを有効に活用することができる。
  2. 各Heron Instanceが1Taskしか実行しないため、デバッグが容易になった。
  3. メトリクス収集の単位がSpout/Boltと合致したことによって、システム内で発生した障害や性能問題に対して透過的に扱い、解析をすることが可能になった。
  4. HeronはTopologyの1Task単位で必要なリソースを指定可能となっているため、OverProvisioningが発生しない。
  5. 各TopologyのTopologyMasterが独立したことで、特定のTopologyの障害が他のTopologyに対して一切波及しなくなった。
  6. BackPressureの機構により、一定の処理速度と、性能の予測が立てられるようになった。また、BackPressureはTopologyのコンテナ群を別のコンテナ群にマイグレーションする際のキー技術にもなっている。
  7. 単一障害点が無くなった。

論文中においても、どの指標においてもHeronはStormを上回っていた。


・・・というわけで、記事を読んでみました。

こっそりBackPressureがコンテナのマイグレーションにもつながっていることが書かれていますね。
動作中の更新などもより動的に行えるようになるのかもしれません。
単に性能的な面だけでなく、そういった扱いやすさの面でも改善はされてそうですので、楽しみですね。

Twitter HeronはStormに比べてどう進化しているのか?

こんにちは。

今月頭、TwitterがHeronという新しいリアルタイム解析基盤について発表していました。
読んでみると、StormとAPIの互換性を保ったまま新しいHeronというリアルタイム解析基盤を開発したそうな。blog.twitter.com


ですので、一度Heronの記事を読んでまとめて、Stormと比較しておこうと思います。
StormもOSS化されて4年近く経過し、ストリーム処理プロダクトも世代交代の時期に来ているようですので、その意味でのまとめとしても。

その前に、そもそもStormって?

2011年にTwitterOSS化した耐障害性を持つ分散ストリーム処理基盤です。
どういうものかは下記あたりの資料を読むのが私が何か下手に書くよりわかりやすいと思います^^;
初めて広く広まったストリーム処理基盤のOSSで、その分野の走りだったのではないか、と考えています。

www.slideshare.net
www.slideshare.net

Stormの課題は?

StormがOSS化されたのは2011年のため、様々な所で使用され、問題も多く発生してきました。
いくつかの発表(下記)や、私自身が使った感覚からすると、下記のような問題があります。

www.slideshare.net
www.slideshare.net

1. 上流のコンポーネントの処理性能の方が下流より高い場合、溢れる。

StormにおいてSpout/Boltは完全に非同期で、かつ常時全速で動作し続けます。
結果、上流に存在するSpoutの方が処理性能が高かった場合、その差分がどんどんプロセスの中に溜まり続け、結果溢れてプロセスが飛ぶ・・ということが発生します。
ですので、上流のコンポーネントより下流のコンポーネントの方が処理量が多くなるようにチューニングする必要がありました。

2. メッセージの基本配分方式がラウンドロビンのため、効率が悪い。

Stormにおいて、メッセージの基本配分パターンは下流のコンポーネントに対してラウンドロビンで配分する、というものです。
これは一見上手く分散するように見えて、プロセス間通信によるロスを考慮していないため、効率が悪い。
グルーピング定義の調整とコンポーネントのスレッドの数のチューニングでプロセス間通信を削減することは可能ですが、逆に言うとそれだけ中身がわかっていないと性能は引き出しにくいわけですね。

3. 状態管理等に負荷が集中する個所があり、スケーラビリティに課題がある。

Stormは処理の統計情報や生存情報をZooKeeperに保存します。
ですが、その頻度と量が多いため、クラスタの規模が大きくなった場合にZooKeeperをあふれさせるということが良くあります。
また、起動時にNimbusという管理サーバからアプリケーションを取得するのですが、アプリケーションのファイルサイズやクラスタの規模によってはその取得に時間がかかり、アプリケーションの起動に時間がかかったり、タイムアウトすることがあります。
#特に、Stormクラスタ上で依存性の問題を除去するためにアプリケーションをFatJarにしておくと発生しやすい。

4. デバッグが大変

これは分散システムだったら何でもそうだろ、といえば実際そうなのですが、Stormでも大変でした。
ログが複数のサーバに配分されている上に、StormはWorkerプロセスが死んだ場合に自動で復旧させるので、問題があるようなんだけどプロセスは問題なく動いているように見える・・・という状況が多々ありました。
一応、Workerプロセスにリモートデバッグをかけるという力技もやってやれないことはないのですが、それでも厄介なことには変わりありません。

・・尚、ここまで一切Heronの記事については考慮せずに書いています。
ですので、リアルなStormの課題を書くことができたかと。
で、ようやくこれからがTwitter Heronです。

Twitter Heronとは?

意義とアプローチ

ストリーム処理基盤は大容量のデータを常時解析するのに有用で、下記のような性質が求められる。

  1. 数十億件/分のイベントを処理できること
  2. 秒未満のレイテンシと、スケール時に挙動が予測できる構成
  3. 障害発生時にハンドリングが容易
  4. スパイク発生や特定個所で詰まった場合に耐えられること
  5. デバッグが容易
  6. シンプルなデプロイ方式

上記のような性質を満たすシステムを構築するために、TwitterではStormの拡張や他OSSの利用も含めて検討を行ったが、結論としては新たな基盤の開発を採用した。
理由としては、Twitterで求めていた上記のような要求に対して、Stormのコア部分が追随できなくなっていたことと、他OSSもスケーラビリティ/スループット/レイテンシの面で見合わなかったため。

だが、StormのAPIと互換性を保てない基盤を開発してしまうと、既に開発済みのTopologyもマイグレーションが必要になり、そもそもモデルも大きく変更されてしまう。
そのため、TwitterではStormのAPIと互換性を保った新たなストリーム基盤を開発する方針となった。

=====
Stormのコア部分が要求に追従しきれなくなった、はわかりますが、Twitterであっても既存のTopologyの移行には課題が大きいというのが意外でした。

Heronの概要

Heronを開発するにあたって、Twitterでの目標は下記だった。

  1. パフォーマンスの予測精度の向上
  2. 開発効率の向上
  3. 管理の容易性

Heronのアーキテクチャは下図の通りとなる。
ユーザはTopologyを開発すると、StormのAPIを用いてSchedulerにSubmitを行う。
Schedulerは複数のContainerから構成されるプロセスとして各Topologyを実行する。
ContainerのうちいずれかはTopology ManagerとしてTopologyの管理を行う。
残りのContainerは各自がデータのルーティングを行うStreamManager、メトリクスの収集/レポートと「Heron instances」と呼ばれるSpout/Boltを実行するプロセッサの数を把握するMetricsManagerを実行する。
これらのContainerはクラスタ内のノードのリソース状況に応じてSchedulerによって配分される。
Topologyの物理配置状況や実行構成詳細といったメタデータはZooKeeper上で管理されている。

f:id:kimutansk:20150628160402p:plain
f:id:kimutansk:20150628160417p:plain

=====
メタデータ、という形でZooKeeperに保存する情報を絞ったように見えます。これでZooKeeper死亡、は起こりにくくなる・・?
=====

Heronで特筆すべき特徴としては下記がある。

リソースマネージャ非依存

Schedulerを抽象化することにより、アダプタを記述することでMesos、YARN、またはそれ以外の個別スケジューリングフレームワークの上で実行することが可能になっている。

スパイクや混雑への対応として、BackPressureの機能を保持

Heronではスパイクや特定のコンポーネントが詰まった場合への対応として、BackPressureの機能を保持している。
処理できる量のデータのみを上流コンポーネントに要求することで、変動に対応可能となっている。
=====
Reactive Streamsで出たBack Pressureですが、さらっと取り込まれているあたりはさすがではありますね。上流のコンポーネントが性能高くて溢れる、ということもこれで防げそうです。
=====

デバッグの容易性

各Task(Spout/Boltにマッピングされるもの?)は各プロセス内で全て動作し、1プロセス内で解析が可能となっている。
結果、動作の把握や性能の解析が容易になっている。
加えて、TopologyのUIで下図のようなメトリクスを見ることも可能になっている。
f:id:kimutansk:20150628163257p:plain
f:id:kimutansk:20150628163303p:plain
=====
プロセスに各Taskを全て保持することで、1プロセス内で処理が完結するルートを設けたということですね。
これはデバッグが容易になる他にもプロセス間通信を削減できるので性能向上に大きく寄与しそうです。

=====

Stormとの互換性

HeronはStormと完全な互換性を保っている。
そのため、Storm上で開発したTopologyをコード修正なしでHeron上に移行することが可能になっている。

スケーラビリティと低遅延

Heronは大規模なTopologyにおいて、高いスループットと低レイテンシを両立している。
それにより、システムはより大規模なTopologyを扱えるようになっている。

Heronの性能

Heronの性能を確認するために、2013年10月の時点のOSS版Stormを用いてWordCountTopologyを用いて比較を行った。
150万のWordをカウントし、Ackを有効化した状態で性能比較結果は下記のようになった。

f:id:kimutansk:20150628164653p:plain

  • レイテンシ

f:id:kimutansk:20150628164702p:plain

スループットの図からわかる通り、StormもHeronも並列度の追加に応じてほぼ線形に性能が向上している。
だが、Heronの方が全検証パターンにおいて10~14倍のスループットを誇っていた。
同様に、レイテンシについてもHeronはStormの5~15分の1のレイテンシで処理を行うことが出来た。

=====
2013年10月ということはその時点だとStormの通信はデフォルトZeroMQで行われています。
2014年3月の時点でデフォルトがNettyに差し替えられて性能が跳ね上がっているのですが、それを入れ込んでいないのはちとずるい比較ですかね。
Making Storm fly with Netty | Yahoo Engineering
とはいえ、Nettyを適用し、その上でチューニングを行ったStormと比べても性能が高い、というのは確かでしょう。
ただ、倍率自体は後で記述されているハードの削減効果にある3倍位に見ておくのが無難なように思えます。

=====

TwitterでのHeronの使用

Twitterにおいて、Heronは既にメインのストリーム処理基盤として使用されている。
100以上のTopologyがHeron上で動作し、ハードウェアを3分の1程に削減することが出来た。
結果、リソースを有効に活用できている。

まとめ

とりあえずStormの課題を挙げてみてからHeronの記事を再度読んでみましたが、見事にStormの課題に対応されているように見えます。
現状論文による発表と、Twitter内部での使用のみですが、OSSとして公開されるのが楽しみですね。

とはいえ、現状Heronの公開について具体的な話が出ているわけではなく、Stormのコミュニティにおいて、Heronの開発成果をフィードバックしてほしい、という話が出ている位のようです。
ただ、もし公開されれば、と考えると非常に楽しみですね。