こんにちは、SREグループのカンタンです!GO株式会社では複雑なワークフローを実現するためにAirflowを利用しています。Kubernetesを活かして、マルチテナンシーによるセキュリティ課題を解決しシームレスな開発体験を提供するAirflow基盤を用意しました。今回はその基盤をご紹介します。
Airflow
Airflowとはワークフローを管理するためのプラットフォームです。実行したいタスクとその依存関係を一つのDAG (DAG = Directed Acyclic Graph = 有向非巡回グラフ) としてPythonファイルで定義すると、その依存関係に従ってタスクが実行されます。更に、タスクの状態、履歴やログなどをGUIで確認できます。
Kubernetes、AWS S3、BigQuery、SQL DBなど様々なインテグレーションが存在していて機能も豊富なため非常に人気があります。

bashを使って hello を出力するタスクとPythonで airflow を出力するタスクのDAGサンプル:
from datetimeimport datetime from airflowimport DAG from airflow.decoratorsimport task from airflow.operators.bashimport BashOperator # DAG = ワークフロー with DAG( dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *", ) as dag: # Operator = タスク # bashタスク hello = BashOperator(task_id="hello", bash_command="echo hello") # Pythonタスク @task() def airflow(): print("airflow") # タスクの依存関係を定義 hello >> airflow()
Amazon MWAA
SREグループはAWSを利用することが多いため、Amazon Managed Workflows for Apache Airflow (Amazon MWAA)というマネージドAirflowサービスを利用しています。
VPCとS3バケットを用意すればAmazon MWAA環境が作れます。以下のようなアーキテクチャになります:

AirflowのSchedulersがワークフローとタスクのスケジューリングを行い、Workersがタスクを実際に実行しています。タスクの実行方法がタスクの種類によります:BashやPythonタスクはWorkersに直接実行されますが、例えばAirflowのKubernetesPodOperatorタスクを利用するとWorkersがKubernetesクラスタ上のPodを作成するため主な処理をPodにオフロードできます。
Amazon MWAA環境を作成するためのサンプルTerraform設定:
locals { s3_bucket_arn = "..." security_group_ids = [...] subnet_ids = [...] eks_cluster_arns = [...] } # -------------------- # Amazon MWAA (Amazon Managed Workflows for Apache Airflow) # -------------------- resource "aws_mwaa_environment" "this" { name = "demo-airflow" # Network network_configuration { security_group_ids = local.security_group_ids subnet_ids = local.subnet_ids } webserver_access_mode = "PUBLIC_ONLY" # Environment environment_class = "mw1.small" execution_role_arn = aws_iam_role.execution_role.arn min_workers = 1 max_workers = 10 schedulers = 2 weekly_maintenance_window_start = "MON:10:00" airflow_version = "2.5.1" # S3 bucket source_bucket_arn = local.s3_bucket_arn dag_s3_path = "/" } # -------------------- # Airflow execution role # -------------------- resource "aws_iam_policy" "execution_role" { name = "demo-airflow-execution-role" path = "/" description = "Policy for demo-airflow execution role" policy = jsonencode({ "Version" : "2012-10-17" "Statement" : [ { "Sid" : "AllowAirflowPublishMetrics" "Effect" : "Allow", "Action" : "airflow:PublishMetrics", "Resource" : "arn:aws:airflow:ap-northeast-1:XXXXXXXXXX:environment/demo-airflow" }, { "Sid" : "DenyS3ListAllMyBuckets", "Effect" : "Deny", "Action" : "s3:ListAllMyBuckets", "Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ] }, { "Sid" : "AllowGetBucketObjects", "Effect" : "Allow", "Action" : [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ] }, { "Sid" : "AllowCloudWatchLog", "Effect" : "Allow", "Action" : [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource" : "arn:aws:logs:ap-northeast-1:XXXXXXXXXX:log-group:airflow-demo-airflow-*" }, { "Sid" : "AllowCloudWatchLogGroupsDescription", "Effect" : "Allow", "Action" : "logs:DescribeLogGroups", "Resource" : "*" }, { "Sid" : "ALlowCloudWatchPutMetrics", "Effect" : "Allow", "Action" : "cloudwatch:PutMetricData", "Resource" : "*" }, { "Sid" : "AllowAirflowCelerySQS", "Effect" : "Allow", "Action" : [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource" : "arn:aws:sqs:ap-northeast-1:*:airflow-celery-*" }, { "Sid" : "AllowKMS", "Effect" : "Allow", "Action" : [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "Condition" : { "StringLike" : { "kms:ViaService" : [ "sqs.ap-northeast-1.amazonaws.com" ] } }, "NotResource" : "arn:aws:kms:*:XXXXXXXXXX:key/*" }, { "Sid" : "ALlowEKSClusterAccess", "Effect" : "Allow", "Action" : "eks:DescribeCluster", "Resource" : local.eks_cluster_arns } ] }) } resource "aws_iam_role" "execution_role" { name = "demo-airflow-execution-role" assume_role_policy = <<EOF { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow.amazonaws.com", "airflow-env.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] } EOF } resource "aws_iam_role_policy_attachment" "execution_role" { role = aws_iam_role.execution_role.name policy_arn = aws_iam_policy.execution_role.arn }
Kubernetes連携
AirflowのKubernetesPodOperatorタスクを利用することでKubernetesのPodを作成できます。重い処理をPodにオフロードすることでAirflowのWorkerの負荷を下げることはもちろん、SREグループとしては既に運用しているKubernetes基盤をそのまま活かせるのが最大のメリットです(ログ収集や監視仕組み、コンテナ化、シークレット管理など)。
PodをAirflowで作成する際、コンテナのDockerイメージやメモリリソースなど、そのPodのスペックをKubernetesPodOperatorに渡す必要があります。渡し方が主に2つあります:インラインで引数として渡すか、PodのスペックをPodTemplate YAMLファイルに定義しファイルパスを指定するか。
インラインでスペックを渡す場合
from datetime import datetime from airflow import models # do not remove default_args = { 'provide_context': True } with models.DAG( dag_id='inline_pod', default_args=default_args, schedule_interval=None, ) as dag: # タスク task1 = KubernetesPodOperator( dag=dag, task_id="task1", # k8s settings in_cluster=True, config_file="kube_config.yaml", # pod settings namespace="default", image="alpine:3.18.0", image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")], cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"], labels={"foo": "bar"}, # lifecycle is_delete_operator_pod=True, get_logs=True, ) task2 = KubernetesPodOperator( dag=dag, task_id="task2", # k8s settings in_cluster=True, config_file="kube_config.yaml", # pod settings namespace="default", image="alpine:3.18.0", image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")], cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"], labels={"foo": "bar"}, # lifecycle is_delete_operator_pod=True, get_logs=True, ) # 依存関係 task1 >> task2
スペックをPodTemplate YAMLファイルとして定義する場合
pod-template.yaml ファイル
apiVersion: v1 kind: Pod metadata: namespace: default labels: foo: bar spec: containers: - name: base image: alpine:3.18.0 command: ["sh", "-c", "echo 'Default'"] resources: limits: memory: 32Mi requests: cpu: 20m memory: 32Mi imagePullSecrets: - name: my-secret
DAGファイル
from datetime import datetime from airflow import models # do not remove default_args = { 'provide_context': True } with models.DAG( dag_id='template_pod', default_args=default_args, schedule_interval=None, ) as dag: # タスク task1 = KubernetesPodOperator( dag=dag, task_id="task1", # k8s settings in_cluster=True, config_file="kube_config.yaml", # pod settings pod_template_file="pod-template.yaml", # テンプレートファイル指定 cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"], # cmdのみを上書き # lifecycle is_delete_operator_pod=True, get_logs=True, ) task2 = KubernetesPodOperator( dag=dag, task_id="task2", # k8s settings in_cluster=True, config_file="kube_config.yaml", # pod settings pod_template_file="pod-template.yaml", # テンプレートファイル指定 cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"], # cmdのみを上書き # lifecycle is_delete_operator_pod=True, get_logs=True, ) # 依存関係 task1 >> task2
インライン方法の方がシンプルですがPodのスペックが大きくなるとDAGが読みづらくなるのと、例えばDockerイメージタグのみを変更したい場合はスペックが別ファイルに分かれていた方が運用がしやすいでしょう。
課題
Airflowを本格的に利用する前に取り組まないといけない課題がいくつかありました。
自由度が高すぎて不要な依存関係を生みやすい
Airflowのインテグレーションが多くて、PythonコードでWorkerからデータベースに直接繋いでクエリを実行したり、HTTPサービスにリクエストを直接送ったりすることが簡単にできてしまいます。Airflowがよく利用されている分析用途のETL処理の文脈であれば特に懸念がないですが、GOでは様々なマイクロサービスのデータを集計したり比較したり修正したりしています。Airflowからそれぞれのマイクロサービスのデータベースを直接参照すると認証情報をAirflowに保存しないといけないのと、DBスキーマが変わった時にAirflowのDAGに反映する必要があって、メンテナンスが大変になります。更に、例えばGolangで書かれた注文管理APIサービスがあったとして、GolangのAPIサーバもPythonのAirflow DAGもデータベースを参照していて、言語がバラバラで運用が複雑になります。
Airflowからデータベースを直接参照するより、Airflowで実行したい処理を注文管理サービスの一部の機能としてジョブとして定義し、Airflowから注文管理サービスのPodを作成する形の方が運用しやすいです。Airflowはあくまでもトリガーだけになり、処理自体は注文管理サービスの一部になってAPIサーバと同様に管理できます:
- 同じgitリポジトリで管理
- 同じ言語で書く
- 同様に認証情報にアクセス
- 同じ内部関数を使ってDBにアクセス
- 同じDockerイメージで動く
- …
同様に、例えばAirflowからユーザ管理サービスにHTTPリクエストを直接送って処理させるより、ユーザ管理サービスのPodを動かして処理した方が良いです。
Airflowの様々なインテグレーションを使うよりも、Airflowを主にKubernetes Podのスケジューリングに利用しています。以下の画像では、左が様々な依存関係が生まれる直接参照の例で、右側が採用しているPod作成の例です。

シームレスな開発体験
SREグループではKubernetes基盤を提供していて、それぞれのアプリケーション(サービス)のAPIサーバやCron Jobなどのデプロイが自動化されています。アプリケーションリポジトリにプッシュするとAPIサーバやCron Jobなどが自動的にクラスタにデプロイされます。

Airflowから作成されるPodの管理もなるべく既存の基盤に合わせることで全体の開発体験をシームレスにすると良いでしょう。特にDAGが作成するPodのDockerイメージタグをデプロイのたびに手動で更新することは難しいです。この後解決案を紹介します。
マルチテナンシー
Airflowはマルチテナントなシステムではないです。例えばAirflowのS3バケットをテナントごとフォルダ分けしアップロード権限をユーザによって最小限に設定したとしても、テナントAがアップロードしたDAGからAirflowのデータベースにアクセスし他のDAGの削除やユーザの変更ができてしまいます。
更に、Airflowを使ってKubernetes Podを作成する場合、Airflowに強いKubernetes権限を付与する必要があるため、テナントAが他のテナントのPodを作成できてしまいます。例えば:
- テナントAのDAGがWorkerから実行される
- DAGのタスクに従ってWorkerがAirflowのデータベースを直接アクセス
- テナントBのDAGが削除される
- テナントBのPodを作成してしまう

こういう問題を厳密に解決するにはAirflow環境をテナントごとに分けるしかないですがAirflowのマネージドサービスのコスト面が気になります。例えば mw1.small サイズのAmazon MWAA環境は最低でも月額$350以上かかります。10テナントがあった場合、月額$3500以上かかってしまいます。この後解決案を紹介します。
SREのAirflow基盤
これからSREグループで考えた方針と提供しているAirflow基盤について紹介したいと思います。
方針
前述の課題に取り組むように以下の方針を決めています
- Airflowは主にKubernetesのPodを作成するために利用する(データベースへの直接参照などがない)
- Podを作成する際、スペックをPodTemplate YAMLファイルで指定する
- AirflowのテナントをKubernetesネームスペースと一致させる
- 本番用のAirflow環境と開発用のAirflow環境を一つずつ用意する
- Airflow用のS3バケットのフォルダ構造を以下にする
dagsフォルダ:DAGをネームスペースごとにフォルダ分けし管理templatesフォルダ:PodTemplateをネームスペースとアプリケーションごとにフォルダ分けし管理
サンプル
airflow-bucket ├── dags │ ├── namespaceA # ネームスペースAのDAGを管理 │ │ ├── dagA-1.py │ │ └── dagA-2.py │ └── namespaceB # ネームスペースBのDAGを管理 │ ├── dagB-1.py │ ├── dagB-2.py │ └── dagB-3.py └── templates ├── namespaceA │ ├── application1 # アプリケーション1のPodTemplateを管理 │ │ └── pod-application1-template1.yaml │ └── application2 # アプリケーション2のPodTemplateを管理 │ └── pod-application2-template1.yaml └── namespaceB └── application3 # アプリケーション3のPodTemplateを管理 ├── pod-application3-template1.yaml └── pod-application3-template2.yaml
DAG管理
Airflowを利用しやすくするため、DAGをネームスペースごとのgitリポジトリで管理し、gitフローで運用しています。リポジトリにPRを出してマージすると、CIによる自動デプロイが走りS3バケットにアップロードされます。
エンジニアが慣れているgitフローで運用することでAirflowへの反映方法を気にしてなくても良くて自然な開発体験になります:
- developブランチにマージすると開発環境のAirflowに反映される
- mainブランチにマージすると本番環境のAirflowに反映される
自動デプロイすることでエンジニアにS3権限を付与する必要がないのと、GitHubのCODEOWNERSとブランチ保護ルールを設定することで本番環境へのデプロイの際にSREメンバーのレビューを必須にしていてDAG内容の確認ができます。

PodTemplate管理
SREが提供しているKubernetes基盤ではアプリケーションリポジトリにプッシュすると、
Airflowから作成されるPodも同様にデプロイできるように以下の方針にしています:
- AirflowからPodを作成する際、PodのスペックをPodTemplate YAMLファイルとして定義する
- PodTemplateファイルをアプリケーションリポジトリで管理する
- アプリケーションリポジトリにプッシュすることでPodTemplateが自動的にS3にデプロイされる (ビルドされたDockerイメージタグを反映させた上で)

そうすることで、アプリケーションリポジトリにプッシュすることだけでAPIサーバもCron JobもAirflowのPodTemplateもデプロイされます。リバートする必要があった場合には、APIサーバもCron JobもAirflowのPodTemplateもリバートされます。DAG管理と同様に、エンジニアが慣れているgitフローで運用すれば良くて、Airflowのことを気にする必要がないです。
全体図
Kubernetesクラスタへのデプロイ方法、DAGのデプロイ方法とPodTemplateのデプロイ方法をまとめた全体図になります。

セキュリティ観点
テナントごとのAirflow環境ではなく、一つの環境だけを用意しマルチテナントと近い形にすることで料金を大幅に抑えています。以下の方針でセキュリティが担保されています。
DAGを本番環境にデプロイする際にSREレビューが必須になるため、Airflowデータベースを直接参照したり別のネームスペースのPodを作成したりすることを防げます。レビュー不足で他のテナントに影響を与える可能性が残っていますがリスクを抑えています。
DAGのデプロイもPodTemplateのデプロイも自動化されているため、エンジニアに権限を付与する必要がないのと、CIのIAM権限を最小限にできます。
開発体験
慣れているgitフローで運用することだけでデプロイやリバートなどができますのでAirflowの利用がかなり楽になっています。
DAG管理:
- DAGがネームスペースごとの専用リポジトリで管理されているため、他のネームスペースとコンフリクトが発生しない
- gitフローで自然にデプロイされる
- 開発環境の場合はSREレビューが必須ではないためブロッカーにならない
アプリケーションコード管理:
- AirflowからアプリケーションのPodを作成することで、Pythonのコードを用意する必要がなくて、アプリケーションと同じ言語、コード、Dockerイメージを利用できる
PodTemplate管理:
- APIサーバやCron Jobなど、既存のワークロードと同じタイミングでデプロイ・リバートできる
- gitフローで自然にデプロイされる
DAGを本番環境にデプロイするためにSREメンバーのレビューが必須になるため、開発者の作業が止まる可能性があります。この辺りが唯一妥協しているポイントになります。Airflowは将来的にマルチテナントになるようで、今後のリリースに期待しています!
終わりに
AirflowとKubernetesの相性を活かしたシームレスな開発体験を提供するAirflow基盤を用意しました。Airflowのマルチテナンシー課題を考慮しセキュリティを妥協しないマルチテナントと近い形の基盤に至りました。Amazon MWAAを利用していますがGCP Cloud Composerなど他のAirflowサービスにも適用できるためこの記事がご参考になれば幸いです!