こんにちは。 バックオフィス基盤1グループの福永です。
普段は決済基盤の開発や不正利用対策に従事しています。 業務の中でDBに5億件のデータを取り込むバッチを作成しました。 今回はバッチ作成にあたり意識したこと、工夫点などをご紹介します。
概要
タクシーアプリ『GO』の不正利用をより効果的に防ぐため、現在の対策に加え、 ユーザーのメタデータを使った新しいチェック機能を導入します。
この新機能では、不正判定を行うためのデータとして約5億件の判定用データを用います。このデータとユーザーのメタデータを照合することで、不正利用を判断する仕組みです。
ここで用いる判定用データはcsvファイルなので、検索に不向きです。 そこで、csvファイルをDBに取り込むバッチを作成しました。
この約5億件という大量のデータをDBに保存する作業には時間がかかりますが、データ取り込み中も既存のAPIなどを稼働させ続ける必要があり、DBへの負荷を最小限に抑えなければなりません。
この「高速かつ低負荷」という要件を実現するために、「ストリーミング×バッファリング×並列処理」を組み合わせた実装を採用しています。
前提条件
設計
ストリーミング
メモリ効率の観点からストリーミングを採用しました。 ストリーミングなら少しずつ連続的に読み込むことができ、効率がよいです。 逆にファイル丸ごと一度にダウンロードした場合、ファイルをメモリ上に展開する必要がありOut Of Memoryのリスクがあります。
バッファリング
ストリーミングで読み込んだレコードごとにハッシュ化などの変換ロジックを適用した後、すぐにデータベースに書き込むのではなく、アプリケーション内の小さなバッファ(数万行)に一時的に溜めます。 その後バルクインサートでまとめて保存することで、よりDBの処理時間を短縮します。
- サンプルコード Golang
// processData: ストリーミングで読み込み、バッファリングして書き込む func processData(inputStream io.Reader) error { // DB書き込み用のバッファ(スライス)を初期化 // 処理のたびにメモリ確保が発生しないよう、初期容量をMaxBatchSizeに設定 recordsBuffer := make([]TrustRecord, 0, MaxBatchSize) // ストリーム(ファイル)を1行ずつ読み込む scanner := bufio.NewScanner(inputStream) for scanner.Scan() { line := scanner.Text() // --- 1. ストリーミング読み込みと変換ロジック適用 --- // 1レコードずつ読み込み、ハッシュ化などの変換ロジックを適用 record := convertLineToRecord(line) // 行データをバッファに追加 recordsBuffer = append(recordsBuffer, record) // --- 2. バッファリングとバルクインサート --- // バッファが規定サイズに達したら、DBに一括書き込み if len(recordsBuffer) >= MaxBatchSize { if err := BulkInsertToDB(recordsBuffer); err != nil { return err // エラー発生時は即座に処理を中断 (堅牢性のため) } // バッファをクリアし、次のバッチ処理に備える recordsBuffer = recordsBuffer[:0] } } // ループ終了後、バッファに残っている最後のバッチを書き込む if len(recordsBuffer) > 0 { if err := BulkInsertToDB(recordsBuffer); err != nil { return err } } return nil }
並列処理
Apache Airflowをオーケストレーション層として採用し、処理の並列化を図りました。 Apache Airflowは、ワークフローをプログラムによって定義し、スケジューリング、監視するためのオープンソースのプラットフォームです。
5億件のデータは数百個のファイルに分割されているので、 前処理のバッチ→各ファイルを取り込むバッチを並列処理→後処理バッチ のような流れで処理します。今回は最大10podが並列で動くよう設定しました。 またAirflowはエラーになったバッチの確認や、失敗したところから再スタートなどの操作がGUI上でできるのが便利です。
(出典: Apache Airflowドキュメント)
その他細かい考慮点
KVSとRDBの選定理由
検討初期段階では、データの柔軟性や部分的な更新の容易さなどを考慮しました。必要なデータの選別、暗号化、ハッシュ化の処理を後から変更したり、一部のデータに欠損があった場合の更新処理をやりやすくする必要がありました。 また、既存の基盤がRDBメインであり、関連するモニタリングツールやノウハウがすでに充実している。さらに、学習コストが発生せず、運用コストを最小限に抑えられると判断したため、RDBを選定しました。
PostgreSQLでのデータ取り込み方法の選定理由
PostgreSQLにおいて大量のデータを一括保存する方法として、主に「COPYコマンド」と「Bulk Insert」の2種類を比較検討しました。 COPYコマンドは一般的に最も高速なデータ取り込み方法です。 しかし今回は、データを取り込む際に値のハッシュ化や保存要否のロジック判定を挟む必要があったことから、Bulk Insertを選択しました。
TIPS
性能検証
ghzを利用し、データ取り込みバッチ実行中に、既存の検索APIのパフォーマンスが悪化しないかを検証しました。 詳しくはこちらの記事でも紹介しています。
リストパーティショニング
毎月、新しいバージョンのデータ追加が必要なため、既存データに影響がないようリストパーティショニングを導入しています。これにより検索スピード(パーティション・プルーニング)や堅牢性、データ削除の容易性などのメリットがあります。
結果
5億件を約6時間で取り込むことができました。 また以下の対応により、さらに時間を短縮できると思います。
COPYコマンドの利用
PostgreSQLへのデータ取り込みについて、上述で値操作やロジックを挟むためCOPYコマンドの利用はNGとしましたが、 対象ファイルから修正を加えたCOPYコマンド用のファイルを生成し、その後COPYコマンドで一気に取り込む方法があります。 (このブログを書いている中で思いつきました。) ただし一部データの破損など不具合があった際はTRUNCATEで削除した後に再実行する必要があり、リカバリの粒度が荒くなることに注意です。
並列数の調整
Apache Airflowの並列数を10に設定していますが、DBのメモリ使用率が80%近くで高止まりしていました。これ以上増やしてもロック待ちやスワップにより帰って処理速度が低下する可能性があります。 並列数を減らしてリソースに余裕を持たせることで、安定した書き込み速度になると考えられます。
終わりに
本記事では、5億件という大規模な外部データの取り込みについて、「高速かつ低負荷」の両立を目指す方法をご紹介しました。 私自身、あまりやったことがない分野だったので多くの学びがありました。 この記事が誰かの役に立てたら幸いです。