DRIVE CHARTでは、機械学習システムのデータパイプラインとしてAirflowを利用しています。
今回は、AWSで提供されているマネージドサービス「Amazon Managed Workflows for Apache Airflow(MWAA)」に移行したので、ちょっとしたtipsや導入・運用時のハマリポイントをまとめようと思います。
はじめに
こんにちは
スマートドライビング事業部システム開発部AI基盤グループで、皆VSCodeを使ってる中で1人寂しくVimで開発している大内です。
今回は、機械学習システムのデータパイプラインとして、AWSが提供しているAirflowのマネージドサービス「Amazon Managed Workflows for Apache Airflow(MWAA)」を、実際に導入してから数ヶ月が経過し、導入・運用時のtips、ハマリポイントや現在の運用状況も含めて書いていこうと思います。
背景
ローンチ当初はプロダクト成長が最優先で、学習で出来上がるモデルの精度検証には、本番データが必要不可欠。さらにスピード重視で開発し続けていたため運用面が考慮されておらず、学習の検証環境と本番環境が同じAirflow用サーバで動いていました。そのため本番環境に影響が出ないように開発を行うのが難しくなり、障害が多発したり、開発スピードの低下など、負債がどんどん大きくなっていました。
ローンチあるあるですよね、、、笑
プロダクトのフェーズも変わり、安定稼働と改善も求められていたので、保守・運用コストを抑えた仕組み化に取り掛かりました。
当初はAirflowから脱却し、以下のようにAWS BatchとStepFunctionを用いてワークフローを実現する予定でした。
しかしStepFunctionsでは、Airflowのような「任意のタイミングで再実行する」をできないことが判明。(厳密には異なるが、トリッキーなことをしないといけない)
学習パイプライン1回分の実行で丸1日掛かっており、学習(SageMaker)に至っては20時間近くかかります。
途中で失敗したときに最初から再実行するのは、ビジネスサイドに大きな影響を与えるので、この方法は不採用に。。。。
Airflowをセルフホスティングで運用するのも結構大変だったので、環境ごとにEC2を立てて運用するのもどうかと悩んでいた時にMWAAに出会いました。
リリースされて数ヶ月のサービスだったので、実績が少なく運用面の不安がありましたが、
- 既存のAirflowの資産を使い回すことが可能
- メンバーが使い慣れている
- セキュリティなど、他に考えなければならないことをMWAAでやってくれる
- S3に置くだけでデプロイできるシンプルな作り
など、メリットのほうが大きいのでこちらを採用しました。
AirflowとMWAA
Airflowは元はAirbnbで生まれた、データエンジニアリングパイプライン用のオープンソースワークフロー管理ツールです。依存関係のある処理をタスクとして作成し、それらをつなぎ込んでdagという形で管理します。UIが用意されていて、それがかなり見やすく、タスクの実行時間や各タスクの実行時間のガントチャートが見れたりします。
タスクはAirflow側で用意されているOperatorを使って作成します。OperatorはProviderとして豊富に用意されているので、用途にあったものを使っていく形になります。
MWAAは、2020年11月にAWSから登場したAirflowのマネージドサービスで、インスタンスやDB管理、スケーラビリティ、可用性、セキュリティ周りなど、インフラ周りで考えなければならない点をまるっと引き受けてくれます。これにより、データフローの開発に集中することができ、Airflowそのものの運用コストを抑えることが可能になります。
また、AIrflowの権限周りはIAM Roleで制御することができ、特定のロールやポリシーを持つユーザのみ実行可能、といったことが可能になります。
2021年12月時点で、公開されているバージョンは1.10.12と2.0.2の2つになります。
使い方
MWAA構築時に指定したS3のパスにdagやrequirements.txtを配置するだけです。
あとはMWAA側が勝手に更新してくれます。とても楽ですね。
ちょっとしたtips
1. MWAA上ではなるべく処理をさせない
MWAAで提供されるインスタンスのスペックはそこまで高くありません。以下がMWAAで選べるインスタイプです。
メモリは、スモールから順に、2GB、4GB、8GBです。
ラージでも4vCPUの8GBとc5.xlarge並のスペックで、これにワーカーやAirflowUIが動いてるので、ちょっとした処理でもあっという間にメモリ不足に陥ります。
シェルスクリプトなどでちょっとした処理もMWAA上で行わず、外部サービス(ECS、Lambdaなど)に切り出すようにしましょう。
2. S3へ配置するファイルは、なるべくピュアなスクリプトにする
dagや必要なモジュールをS3に配置して更新しますが、初期に入っていないpipモジュールやLinuxコマンドが結構あります。(whichコマンドが無かったのは意外でした)
今回は既存のAirflow環境からの移行だったので、dagファイルにAirflowに不要なpytorchやtensorflowの読み込みがあったので、そのまま配置するとPythonのImportErrorが発生します。
これらを切り離して、awsコマンドでまるっと配置できるように切り離していきました。
しかしプライベートリポジトリのpipモジュールなど、どうしても入れたいケースがあるかと思います。今回はそんなケースが発生したので、以下のようにrequirements.txtに追加するとインストールが実行されます。
https://${YOUR_GITHUB_ACCESS_TOKEN}@api.github.com/repos/YourOrganization/private-pip-module/zipball/v1.2.3
api.github.com、zipballを使うのがポイントです。
ハマリポイント
「これで楽に導入・運用できる!」と思いきや、都合よく進まないものです。
色々とハマりました、、、
1. 定期実行時にexecution_dateがずれる
MWAAというよりAirflowでのハマりポイントです。
Airflowでは専用のmacroを提供しており、それを用いると日付や時間に依存した処理を作りやすくなります。execution_dateを使うと実行日時を取得でき、再実行しても変わらない値です。殆どのmacroはこの値から算出されています。
前日分のデータを使って学習を行うので、yesterday_dsを使ってたのですが、これが2日分ずれるという現象が起きていました。
MWAAのバグかと思いましたが、実はAirflowの仕様でした。
以下のように1時間ごとに実行するdagがあるとします。
Airflowのスケジューリングの例
Airflowはcronのようなジョブスケジューラとは違い「期間」を処理対象にしており、dagを作成する際は、インターバル(schedule_interval)を必須になります。
そのため、タスクの実行条件は
💡 execution_date + schedule_interval ≥ 現在の日時
11時のときに実行条件を満たすのは赤線で書かれたexecution_dateが10時のとき、すなわち1つ前のタスクが実行対象になります。
この仕様に気づかず、さらに次のハマリポイントも相まって、調査にかなり時間がかかりました。。。
2. macroは常にUTCのタイムゾーンで算出される
これもAirflowのハマリポイントで、仕様になります。
スケジューリング自体はAirflowのタイムゾーンをJSTを設定しておけば日本時間で監視してくれますが、macroはUTCで扱われます。
- 例: 日本時間2021-09-01 1:00 → execution_dateは2021-08-31 16:00
既存のコードで実行日付(例でいう2021-09-01)を取得するためにtomorrow_dsを使っていたので、前述のハマリポイントを知らずにコードを読んだ時には余計に混乱しました、、笑
macroを使う場合は、以下のようにしてtimezoneを変換して使いましょう。
3. シェルスクリプトが実行できず、Permission Deniedになる
実行権限を追加してS3に配置してもパーミッションが644になり、スクリプトが実行することができません。(S3なので気づけば当たり前なのですが、、、)
AirflowのBashOperatorに直接書けば実行できるのですが、コードの量が多かったのでそれはやりたくありませんでした。色々と調べて、以下のようにbashをつける実行できたのでこちらで対応しました。
4. requirements.txtが更新されない
S3に配置すると読み込んでくれるのですが、requirements.txtはバージョン管理されるので、どのバージョンでインストールするかをMWAAに設定する必要があります。
S3にアップロード後、以下のように明示的に更新を行います。
- terraformの適用例
- CLIで適用する例
5. PythonコマンドはPython2系
dagに必要なコードをS3配置して実行した際に、BashOperatorでPythonを実行しているコードがあり、以下のようなSyntaxErrorが発生しました。
BashOperatorを使ってバージョンを出力するとなんと「2.7系」であることが判明
python script.pyのところをpython3 script.pyのように、「python3」に変更して対応しました。
6. Airflowのタスクが突然エラーになって落ちるときがある
初期運用時は問題なかったのですが、特定のタスクがたまーにエラーが発生するようになりました。メモリが原因だろうとインスタンスサイズを大きくして対応し、発生しなくなったのですが、ここ最近出るようになってきました。
psycopg2.OperationalError: server closed the connection unexpectedly
↑ が出ているのでこれが関係してそうですが、なぜ出るのかよくわかってないですね。。。
インスタンスサイズも限界があるので、省メモリな運用にしていく必要がありそうです。
今はタスクを分割して、AirflowのUIから誰でも復旧できるようにして暫定的に運用しています。
おわりに
今回は、AWSで提供されているマネージドサービス「Amazon Managed Workflows for Apache Airflow(MWAA)」のちょっとしたtips、導入・運用時のハマリポイントについて書かせていただきました。
初期の頃と比べてだいぶ落ち着いて運用できていますが、まだまだ課題は多いですね。MWAAにも登場して間もないのでノウハウがまだ無いのと、マネージドサービスならではの辛みがあるので、ノウハウを貯めつつ、改善要望をAWSにフィードバックしつつ改善していければと思っております。