読者です 読者をやめる 読者になる 読者になる

夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要

Embulk

こんにちは。

前回で初期化部分の確認が終わったため、今回は本処理の方に入ってきます。

尚、ServiceLoaderでJavaプラグインがロード出来るかについてはとりあえず一通り読んでからの方針で^^;

まず、基本構造としては
上記の図にあるRunnerが起動の起点となり、初期化を行った後に
run/cleanup/guess/previewの各々の処理に分岐する流れとなっています。

まず今回はrun処理の流れを追ってみることにします。

1. run処理の流れ概要

run処理の流れは下記のようになっています。

  1. 設定ファイル(YAML形式)を読み込む。
  2. 出力ファイル(次回実行用の設定出力先、Resume状態の出力先)の出力可能確認を行う。
  3. Resume状態ファイルを読み込む。
  4. Executorクラスを生成する。
  5. Resume状態にあわせて処理を実行する。
    1. 実行失敗した場合、Resume出力先設定がある場合はそこに状態を記録、設定が存在しない場合はTransactionをクリーンアップする。
    2. 実行成功した場合、Resume設定ファイルを削除する。
  6. 次回実行用の設定ファイルを出力する。

流れ自体は非常にオードソックスなものですね。

2. run処理を実行するクラス群

次はrun処理を実行するクラス群を確認します。
とりあえず、流れとデータの保存先が大体わかる感じまで見てみると、下記のようなクラス構成となっていました。

各クラスの役割は下記になります。

  • ConfigLoader

各種設定ファイルからConfigSourceを生成する設定読込クラス。
今回の範囲ではYamlファイルを読み込んでJsonObjectに変換して返している。

  • ConfigSource/TaskSource/CommitReport/ConfigDiff

設定の設定と取得が行えるインタフェース。
入れ子構造になっており、内部要素を取得したり、内部要素をEntityに変換して返すことが可能。
今回の範囲の実体はModelManagerとJsonObjectを設定したDataSourceImplであり、
ModelManagerをJsonObjectをクラスに変換する際に使用している。

  • Schema

データの読み込み/書き込み先となるデータソースの
スキーマをカラム(インデックス、名称、型情報を保持するカラム定義)のリストとして
保持するスキーマ定義クラス。

  • ResumeState

前回実行失敗時に状態を保存するクラス。
下記の情報を保持する。

    • 設定オブジェクト
    • インプットタスク情報/アウトプットタスク情報
    • インプットスキーマ/アウトプットスキーマ
    • インプットのコミット結果/アウトプットのコミット結果
  • ExecSession

実行時の情報を保持するクラス。
設定関連のオブジェクトの他にPluginManager、BufferAllocatorや
トランザクション時間(記録するための値)を保持する。

  • LocalExecutor

ローカルで実際のInput/Output処理を実行するクラス。
与えられた設定を元にInput/Output処理を実行し、結果を記録する。
現状、embulkの実処理を実行するコアクラス。

  • ProcessState

プロセスの現在の実行状態を保持するクラス。
Processorの数だけ実行情報を保持する。
保持している状態は下記のとおり。

    • 開始/終了状態
    • インプットタスク情報/アウトプットタスク情報
    • 発生例外情報
    • インプットスキーマ/アウトプットスキーマ
    • インプットのコミット結果/アウトプットのコミット結果
    • インプットの前回との設定差分/アウトプットの前回との設定差分

と、こう見てみるとよく見た構造を持つ4クラス(ConfigSource/TaskSource/CommitReport/ConfigDiff)が気になりますね。
ですので、まずはこのデータソース周りを見て見ます。

3. データソース関連クラス群

データソース関連クラス群をまとめてみると、下記のようになります。

クラス構造を見てもわかるとおり、ConfigSource/TaskSource/CommitReport/ConfigDiffの実体はDataSourceImplクラスであり、
実際にデータはJacksonのObjectNodeとして保持されています。

その上で、DataSourceSerDeクラスに記述されたSerDeModule(下記)によって
各種データソースのシリアライザ/デシリアライザが紐付けられています。
この紐付けによってこれらのデータソースのクラスを指定することで
設定ファイルから読み込んでキャストし、ロードすることができるわけですね。

public static class SerDeModule extends SimpleModule
{
    public SerDeModule(final ModelManager model)
    {
        // DataSourceImpl
        addSerializer(DataSourceImpl.class, new DataSourceSerializer<DataSourceImpl>());
        addDeserializer(DataSourceImpl.class, new DataSourceDeserializer<DataSourceImpl>(model));

        // ConfigSource
        addSerializer(ConfigSource.class, new DataSourceSerializer<ConfigSource>());
        addDeserializer(ConfigSource.class, new DataSourceDeserializer<ConfigSource>(model));

        // TaskSource
        addSerializer(TaskSource.class, new DataSourceSerializer<TaskSource>());
        addDeserializer(TaskSource.class, new DataSourceDeserializer<TaskSource>(model));

        // CommitReport
        addSerializer(CommitReport.class, new DataSourceSerializer<CommitReport>());
        addDeserializer(CommitReport.class, new DataSourceDeserializer<CommitReport>(model));

        // ConfigDiff
        addSerializer(ConfigDiff.class, new DataSourceSerializer<ConfigDiff>());
        addDeserializer(ConfigDiff.class, new DataSourceDeserializer<ConfigDiff>(model));
    }
}

ModelManagerはさまざまなクラスのシリアライザ/デシリアライザを
内包したObjectMapperを保持しており、それと上記のシリアライザ/デシリアライザを連携させ、
インジェクションすることでValueObjectをデータを保持するところから
非常に短い、シンプルなコードでロードすることが可能になっています。

なお、このあたりを見ていくとSessionTask、ExecutorTaskといったTaskオブジェクトの
リアライザ/デシリアライザ定義なども見えてくるわけですが、
そこまで見ていくときりがないんですよね(汗

そのあたりのエンティティのシリアライズ/デシリアライズ周りは動作自体にはそう影響してこない・・
と言いたいのですが、このあたりのValueObjectの管理をいかに楽に行うかが
ValueObjectを大量に扱うプロダクトの鍵ですので、このシリアライズ/デシリアライズ機構も
embulkの重要な機能のひとつになります。

とはいえ、run処理の流れとデータソース周りの概要が見えた段階で今回はここまでにして、
次回はLocalExecutorの詳細か、ModelManagerの詳細を見ていきます。

どちらにするかはまぁその時にでも。