読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

ストリームデータ処理サービスAmazon Kinesisについて調べた結果

こんにちは。

最近ストリームデータ処理サービスであるAmazon Kinesisのドキュメントを読んだり、
クライアントコードのソースを読んだり、実際の小さいアプリケーションを作ったりしたのですが、
その際にわかったことをとりあえずまとめておこうと思います。

1.Amazon Kinesisとは?(http://aws.amazon.com/jp/kinesis/)

Amazon Kinesis は、大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス
・大容量のストリームデータを受信し、提供することができる

2.Kinesisの構造(http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html)

Kinesisの実態は「大規模でスケール可能、メッセージが一定時間保存されるPubSub型キュー」
Kinesisでは複数のStreamを定義し、複数の種別のメッセージを保存、処理することが可能

2-1.Kinesisのコンセプト

・メッセージの生産者、消費者は各々複数用意可能でメッセージも一定期間保存されるため、信頼性の高いサービスが構築可能
Kinesisアーキテクチャは下記の通り。

・使用される用語は下記

Data Record                   : 投入する1件単位のデータを示す
Stream                        : Kinesisにおけるキュー名
Shard                         : Kinesisにおけるキュー内部のパーティション
Partition Key                 : Streamにデータを投入する際にどのShardに配分されるかを決定するキー項目
Sequence Number               : Kinesisにデータを投入した際に割り振られるデータの識別子
Amazon Kinesis Application    : Kinesisからデータを取得して処理する「消費者」
Amazon Kinesis Client Library : Kinesisを利用するクライアントライブラリ。Java製。
Application Name              : 「消費者」の名前。Kinesisクライアントライブラリのテンプレートを利用した場合にこの名前でDynamoDB上に状態管理がされ、Amazon CloudWatch metricsで確認が可能。
Producers                     : Kinesisにデータを投入する「生産者」

2-2.Amazon KinesisAPI

Kinesis Client Libraryが提供するAPIとして、下記のことが可能。
・生API
Kinesisの提供するクライアント(KinesisClient)は下記のAPIを提供する。

CreateStream   : Stream生成
ListStreams    : アカウントに紐づいたStream一覧取得
PutRecord      : Streamに対してRecord投入。Recordあたりの最大サイズは50KB
DescribeStream : Stream中のShard情報取得
GetRecords     : ShardからRecord取得。一度のリクエストで複数のRecordが取得可能
SplitShard     : Shardを2個のShardに分割する。データは親Shardと子Shardで別管理となるため重複は発生しない。
 → つまりは分割を行った場合には親Shardからデータを全て取得したことを確認した上で子Shardに対する処理を行う必要がある。
MargeShards    : 2個のShardをマージする。データ管理はSplitと同じ。
DeleteStream   : Streamを削除

・アプリケーションテンプレート(IRecordProcessor)
KinesisにおいてはIRecordProcessorという形でテンプレートが用意されており、これを継承することで
Kinesisアプリケーションの実装量を大幅に削減することが可能。
 → 実質的には「初期化処理」「Kinesisからレコードを受信した時の処理」「終了処理」の3メソッド実装ですんでしまう。

テンプレートを利用したKinesisアプリケーションの情報はDynamoDB上に保存され、CloudWatchでもメトリクス確認が可能となる。
 → そのため、確認していないが当然ながらIAMでDynamoDBを使用可能にする権限を割り振っておかないと使えない?

2-3.Amazon Kinesisの性能

1Shardあたりの性能は下記。
データのスループット自体はReadの方が大きいが、処理可能なリクエスト数はReadの方が少ない。
そのため、Read側は1回のリクエストで複数のRecordを取得することを想定したモデルと思われる。

オペレーション種別 データ量 処理リクエスト数
Write 1MB/s 1000Req/s
Read 2MB/s 5Req/s
2-4.Kinesis Client Libraryのソースを読んでわかったこと

・IRecordProcessorは内部でKinesisClientを叩いているだけであり、Kinesisに対して出来ることは変わらない。
 → IRecordProcessor1スレッドで1Shardに紐づくため、もし仮に複数のShardから受信したデータの待ち合わせなどを行う必要があれば大変。
Kinesis Clientの内部ではAmazonHttpClientにつながっているため、実際にネットワークを飛び交うのはHTTPリクエスト。

3.Kinesisを実際に使ってみてわかったこと/感想

サンプルのアプリケーションを作ってみた結果、わかったこと。
ソースは「https://github.com/kimutansk/storm-example-wordcount」参照。
#尚、過程のソースは捨ててるので、下記の内容が実装に全て含まれているわけではない。

Amazon EC2上からではなく、ローカルのマシンからでもインターネットにつながっていればKinesisとデータの送受信は可能。
 → ただし、ローカルのサーバが全開で処理をすると通信量=料金もかなりのものになり、リアルタイム処理であるがゆえに通信の遅延による影響も大きい。
   実際の運用アプリケーションを配置するならEC2上になるだろう。
Kinesisにアクセスする際に性能以上のアクセスを行うと性能超過例外が返る。
 → そのため、性能超過例外と、後はその他の例外でハンドリングは切り替える必要がある。
Kinesisからメッセージを取得する際は「どのメッセージを処理したか?」ではなく「どのシーケンス番号のメッセージまで処理を行ったか?」を用いて取得する。
 → Sequence NumberをShard上の「位置(ShardIterator)」に変換して使用する。
  → そのため、1件単位で失敗したメッセージを再取得して処理するといったものは不得手(やろうと思えば出来るが、下記の通り性能的な問題あり)
    かつ、ReadRequestの発行可能回数が少ないため、1件単位の取得を繰り返すとあっさりリミットオーバーする。

・単にShardを1個生成し、1スレッドを対応させて処理するなら非常に簡単に実装できる。
 → ドキュメントのソースをコピペして変数名を調整する位で可能。
・ただし、Shardとスレッドの対応を自前で管理する必要があるなど、それ以上のことをやろうとすると実装が増える。
・上記にも書いたがシーケンス番号を指定して取得・・が面倒で性能的にも負荷が大きいため、それを前提にしたモデルにはしないこと。
・いくつか面倒な点はあるものの、データソース自体の障害や構成変更への対応がクラウドというフィルタを介して隠蔽されているため、
 この手の対応がクライアントAPIにも含まれるApache Kafkaに比べるとシンプルなアプリケーションが構築できるのは確か。
 → この手の大規模分散システムだと付きまとう構成変更、障害をAWSが肩代わりしてくれると考えると、利用価値は大きいと思う。

大体わかったことはこんな感じでしょうか。
尚、Amazon KinesisのモデルやAPIApache Kafkaと酷似しているため、次回はKinesisとKafkaの比較とかもまとめてみようとは思います。