PythonのETLライブラリ「DLT」を導入してみた話

PythonのETLライブラリ「DLT」を導入してみた話

今回は、BigQuery にデータを取り込む用途で DLTという Python ライブラリを導入してみた経験について軽く紹介します。

導入するまでの経緯

データプラットフォームチームでは、多様なデータを扱っており、さまざまな外部ソースからデータを GCPGoogle Cloud Platform)環境に取り込んでいます。これらのデータは、分析やレポート作成をはじめとした用途のために加工され、BigQueryのテーブルやviewとして社内で提供されています。

データソースや提供パターンの多様性に伴い、データ取り込みや変換用のツールも数多く存在していました。それらは異なる時期に、異なる担当者によって、異なる書き方や構造で作られており、コーディングスタイルや管理方法もまちまちでした。中には、ELT(Extract, Load, Transform)処理を担う、大きくて複雑な自前実装のライブラリも含まれていました。

その結果、コードの可読性や保守性が低く、物によっては処理内容を理解するだけでも苦労する状態になっていました。将来的なメンテナンス性・保守性を考え、まずはこれらの取り込み処理を共通の仕組みに統一したい、そしてコードをリファクタリングして複雑性を下げたい、という方針になりました。

その過程で、「BigQuery のテーブルにデータを取り込む」という共通ニーズを担えるライブラリとしてDLTを知りました。コード例を見ると、これまで手動で書いていた処理を、より短く・シンプルに定義できそうだと感じ、試しに導入してみることにしました。

DLTによる最小構成の実装例

以下は、架空の渋滞情報 API から道路の渋滞データを取得し、BigQueryに取り込む最小限の実装例です。

import dlt
import json
import logging
import requests

logger = logging.getLogger(__name__)

def fetch_data(date: str):
    api_endpoint = "https://[my_api_endpoint]"
    res = requests.get(api_endpoint, params={"date": date})
    return json.loads(res.content)["Results"]

@dlt.resource
def get_traffic_delays(date: str):
    raw_data = fetch_data(date)
    delay_data = raw_data.get("traffic_delays")  # dict のリスト
    yield from delay_data  # generator として返す必要がある

class DelayDataExtractor:
    def __init__(self, bq_project: str):
        self.bq_project = bq_project

    def load_to_bigquery(self, load_date: str):
        pipeline = dlt.pipeline(
            pipeline_name="traffic_delay_pipeline",
            destination=dlt.destinations.bigquery(
                project_id=self.bq_project
            ),
            dataset_name="road_data"
        )

        # DLTが返す、読み込み処理の成功・失敗・所有時間などについての情報
        load_details = pipeline.run(
            get_traffic_delays(load_date),
            table_name="road_traffic_delays",
            write_disposition="append"
        )

        return load_details

if __name__ == "__main__":
    test_date = "2025-10-25"
    extractor = DelayDataExtractor("my_bq_project")
    load_details = extractor.load_to_bigquery(test_date)
    logger.info(load_details)

このような最小構成で必要な要素は、以下の3点です。 1. データ取得元の定義(@dlt.resource)→ dictのリストをyieldする形で定義する 2. データ取り込み用パイプラインの定義(dlt.pipeline) 3. パイプラインの実行(pipeline.run

dlt.source など他のAPIもありますが、単純なデータ取り込み用途であれば、このようなミニマルな構成で十分なケースが多いと感じました。

例から分かる通り、BigQueryへの書き込み処理は pipeline.run の1行で完結します。dlt.resource からyieldされたdictのデータ型は自動で推論され、テーブルのスキーマも自動生成されるため、これまで手動で書いていたスキーマ定義や型変換の処理が不要になります。

気を付ける必要があったポイント

カラム名がsnake_caseに自動変換されてしまう

DLTは、リソースから渡されるdictのフィールド名を元にテーブルのスキーマを自動生成しますが、デフォルトでは カラム名をsnake_caseに変換します。

例えば、以下のようなデータがあった場合:

{"ResourceType": "bucket", "DownloadTime": "2025-10-25"}

DLT が作成するBigQueryテーブルのカラム名は、 - resource_type - download_time

のように変換されていました。

この挙動はデフォルト設定によるものですが、以下の環境変数を設定することで、カラム名をdictの情報をそのまま使用するようにできます。

os.environ["SCHEMA__NAMING"] = "direct"

まとめ

今回は、BigQueryへのデータ取り込み用途でDLTライブラリを導入した経験をまとめました。

まだ導入しているデータパイプラインの数は多くありませんが、BigQuery への取り込み方法を統一できたことで、コードの見通しがかなり良くなったとすでに感じています。

現時点ではBigQueryへの取り込み部分に限定して利用していますが、今後は他にどのような使い方ができるのかも調査していく予定です。