こんにちは、技術部のou.gです。リリースしたのは去年だけど、弊社BIサービスのETL処理流れを簡単に紹介したいと思います。
テックスタック:
- DB: PostgreSQL
- Framework: Rails
- Gems: Kiba, postgres-copy
ETLのE(抽出):
SMPのDBから、必要となるデータを検索してCSVファイルとして落とします。
SMP::SourceModel.select(target_columns)
.where('updated_at >= timestamp ?', last_succeeded_timestamp).copy_to do |line|
source_csv_file.write line
end
ETLのT(変換・加工):
上記抽出したCSVファイルを使って、業務ロジックに合わせて、対象CSVファイルを作成します。
source Common::SMPSourceCSV, source_csv_file.path
destination Common::SMPDestinationCSV, destination_csv_file.path
transform do |row|
row[:column1] = row.delete(:column1)
row[:column2] = row.delete(:column2) || default_column2
row
end
transform Common::TransformColumn3
Common::SMPSourceCSV、Common::SMPDestinationCSVとCommon::TransformColumn3の書き方はKibaのREADMEに書いてありますので、省略します。
ETLのL(ロード)
Kibaのpost_processに加工されたCSVファイルをDWHに反映します。
post_process do
target_table_name = DWH::TragetModel.table_name
temp_table_name = "#{target_table_name}_temp"
# 同時実行の制御
DWH::TragetModel.connection
.execute("LOCK TABLE #{target_table_name} IN SHARE ROW EXCLUSIVE MODE;")
# temp tableのindex/制約等を削除すると、もっと速くなる。
DWH::TragetModel.connection
.execute("CREATE TEMP TABLE #{temp_table_name} (LIKE #{target_table_name} INCLUDING ALL) ON COMMIT DROP;")
DWH::TragetModel.copy_from(destination_csv_file.path, table: temp_table_name)
DWH::TragetModel.connection.execute(upsert_sql)
end
upsert_sqlの返却SQLは下記のようなものです。
WITH updated AS (
UPDATE target_table t
SET column1 = p.column1, column2 = p.column2, ..., columnN = p.columnN
FROM temp_copy_target_table p
WHERE t.smp_pk = p.smp_pk
RETURNING p.smp_pk
)
INSERT INTO target_table(smp_pk, column1, column2, ..., columnN)
SELECT smp_pk, column1, column2, ..., columnN
FROM temp_copy_target_table p
WHERE p.smp_pk NOT IN (select smp_pk from updated)
これで、one sqlでupsertの実現できました。
説明しやすいために、実際ソースコードの共通処理を展開しました。
以上