並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要
こんにちは。
前回で初期化部分の確認が終わったため、今回は本処理の方に入ってきます。
尚、ServiceLoaderでJava製プラグインがロード出来るかについてはとりあえず一通り読んでからの方針で^^;
まず、基本構造としては
上記の図にあるRunnerが起動の起点となり、初期化を行った後に
run/cleanup/guess/previewの各々の処理に分岐する流れとなっています。
まず今回はrun処理の流れを追ってみることにします。
1. run処理の流れ概要
run処理の流れは下記のようになっています。
- 設定ファイル(YAML形式)を読み込む。
- 出力ファイル(次回実行用の設定出力先、Resume状態の出力先)の出力可能確認を行う。
- Resume状態ファイルを読み込む。
- Executorクラスを生成する。
- Resume状態にあわせて処理を実行する。
- 実行失敗した場合、Resume出力先設定がある場合はそこに状態を記録、設定が存在しない場合はTransactionをクリーンアップする。
- 実行成功した場合、Resume設定ファイルを削除する。
- 次回実行用の設定ファイルを出力する。
流れ自体は非常にオードソックスなものですね。
2. run処理を実行するクラス群
次はrun処理を実行するクラス群を確認します。
とりあえず、流れとデータの保存先が大体わかる感じまで見てみると、下記のようなクラス構成となっていました。
各クラスの役割は下記になります。
- ConfigLoader
各種設定ファイルからConfigSourceを生成する設定読込クラス。
今回の範囲ではYamlファイルを読み込んでJsonObjectに変換して返している。
- ConfigSource/TaskSource/CommitReport/ConfigDiff
設定の設定と取得が行えるインタフェース。
入れ子構造になっており、内部要素を取得したり、内部要素をEntityに変換して返すことが可能。
今回の範囲の実体はModelManagerとJsonObjectを設定したDataSourceImplであり、
ModelManagerをJsonObjectをクラスに変換する際に使用している。
- Schema
データの読み込み/書き込み先となるデータソースの
のスキーマをカラム(インデックス、名称、型情報を保持するカラム定義)のリストとして
保持するスキーマ定義クラス。
- ResumeState
前回実行失敗時に状態を保存するクラス。
下記の情報を保持する。
- ExecSession
実行時の情報を保持するクラス。
設定関連のオブジェクトの他にPluginManager、BufferAllocatorや
トランザクション時間(記録するための値)を保持する。
- LocalExecutor
ローカルで実際のInput/Output処理を実行するクラス。
与えられた設定を元にInput/Output処理を実行し、結果を記録する。
現状、embulkの実処理を実行するコアクラス。
- ProcessState
プロセスの現在の実行状態を保持するクラス。
Processorの数だけ実行情報を保持する。
保持している状態は下記のとおり。
と、こう見てみるとよく見た構造を持つ4クラス(ConfigSource/TaskSource/CommitReport/ConfigDiff)が気になりますね。
ですので、まずはこのデータソース周りを見て見ます。
3. データソース関連クラス群
データソース関連クラス群をまとめてみると、下記のようになります。
クラス構造を見てもわかるとおり、ConfigSource/TaskSource/CommitReport/ConfigDiffの実体はDataSourceImplクラスであり、
実際にデータはJacksonのObjectNodeとして保持されています。
その上で、DataSourceSerDeクラスに記述されたSerDeModule(下記)によって
各種データソースのシリアライザ/デシリアライザが紐付けられています。
この紐付けによってこれらのデータソースのクラスを指定することで
設定ファイルから読み込んでキャストし、ロードすることができるわけですね。
public static class SerDeModule extends SimpleModule { public SerDeModule(final ModelManager model) { // DataSourceImpl addSerializer(DataSourceImpl.class, new DataSourceSerializer<DataSourceImpl>()); addDeserializer(DataSourceImpl.class, new DataSourceDeserializer<DataSourceImpl>(model)); // ConfigSource addSerializer(ConfigSource.class, new DataSourceSerializer<ConfigSource>()); addDeserializer(ConfigSource.class, new DataSourceDeserializer<ConfigSource>(model)); // TaskSource addSerializer(TaskSource.class, new DataSourceSerializer<TaskSource>()); addDeserializer(TaskSource.class, new DataSourceDeserializer<TaskSource>(model)); // CommitReport addSerializer(CommitReport.class, new DataSourceSerializer<CommitReport>()); addDeserializer(CommitReport.class, new DataSourceDeserializer<CommitReport>(model)); // ConfigDiff addSerializer(ConfigDiff.class, new DataSourceSerializer<ConfigDiff>()); addDeserializer(ConfigDiff.class, new DataSourceDeserializer<ConfigDiff>(model)); } }
ModelManagerはさまざまなクラスのシリアライザ/デシリアライザを
内包したObjectMapperを保持しており、それと上記のシリアライザ/デシリアライザを連携させ、
インジェクションすることでValueObjectをデータを保持するところから
非常に短い、シンプルなコードでロードすることが可能になっています。
なお、このあたりを見ていくとSessionTask、ExecutorTaskといったTaskオブジェクトの
シリアライザ/デシリアライザ定義なども見えてくるわけですが、
そこまで見ていくときりがないんですよね(汗
そのあたりのエンティティのシリアライズ/デシリアライズ周りは動作自体にはそう影響してこない・・
と言いたいのですが、このあたりのValueObjectの管理をいかに楽に行うかが
ValueObjectを大量に扱うプロダクトの鍵ですので、このシリアライズ/デシリアライズ機構も
embulkの重要な機能のひとつになります。
とはいえ、run処理の流れとデータソース周りの概要が見えた段階で今回はここまでにして、
次回はLocalExecutorの詳細か、ModelManagerの詳細を見ていきます。
どちらにするかはまぁその時にでも。