はじめまして、AI技術開発部分析グループの伊田です。
MoTではタクシー配車アプリのKPIなどを筆頭にBIツール「Looker」でレポートを作成し、Slackに日々配信しています。この時、稀にSlack配信が失敗する場合があります。Lookerにはリトライ機構がないため配信が失敗した場合は障害対応として手動でレポートを再配信する運用が発生します。
本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介したいと思います。
記事の構成
本記事は下記の流れでスケジュール配信の自動リトライ手順を説明します。
- スケジュール配信の自動リトライ概要
- 配信失敗ジョブの取得設定(Look)
- Looker APIによる「スケジュール配信の自動リトライ」の実装
スケジュール配信の自動リトライ概要
自動リトライ手順
スケジュール配信が失敗したジョブを自動リトライする方法について、Lookerコミュニティで対応方法が回答されています。
https://discourse.looker.com/t/schedule/15591
それによると
- 配信が失敗したジョブを抽出するLookの作成
- Lookデータ取得APIで1から失敗したスケジュールのScheduled Plan IDを取得
- 2で取得したScheduled Plan IDに対してAPIのrun_onceを実行する
となります。
※ Look = 1帳票の単位と思って頂ければ差し支えありません
※ Scheduled Plan ID = 配信設定した際にLookerが裏側で持つスケジュール配信IDです
この対応方法に則って自動リトライを構築していきます。
尚、1, 2 はLookerのシステム情報を参照する必要があるためLookerの「see_system_activity」権限が必要です(Developer権限に内包されている)。つまりAPI実行ユーザーにも「see_system_activity」権限が必要です。
基盤構成
1はLookerの機能そのものですが、2, 3はPythonで実装し、ワークフローエンジンであるAirflowを用いて定期実行するようにしたいと思います。
分析基盤全体の流れとしては下記です。
- (0時過ぎ) Airflowで夜間に日次バッチ処理を実行して分析基盤を更新する
- (9時までに順次配信) Lookerで作成したレポートをSlackにレポート配信する
- (10時起動) Airflowで配信が失敗したジョブをLooker APIを利用して抽出し、失敗したジョブが存在すればスケジュール配信APIを実行する
※本記事ではAirflowの説明はしません

配信失敗ジョブの取得設定(Look)
配信失敗ジョブを取得するためのLookを作成します。
Explore > System Activity > Scheduled Plan を選択します。

Explore画面で配信失敗ジョブを取得するための設定を行います。

まずは、Scheduled job や Scheduled Plan からどんなデータが取得できるか確認します。
DIMENSIONSから
- Scheduled job.Created Time(配信ジョブ実行日時)
- Scheduled job.Finalized Time(配信ジョブ完了日時)
- Scheduled Plan.ID(スケジュール配信ID)
- Scheduled job.ID(スケジュールジョブID)
- Scheduled job.Status(配信ジョブ実行結果)
- Scheduled Plan.Run Once(Yes: 1回だけ送信 / テスト配信 No: スケジュール配信)
- Scheduled job.Name(配信ジョブ名) ※業務関連の名称が設定されているため黒塗りにしています
- Scheduled Plan.Dashboard ID(Dashboardに配信設定している場合)
- Scheduled Plan.Look ID(Lookに配信設定をしている場合)
を選択します。

どういったデータが取得できるか肌感覚がつかめたので実際にAPI用のLookに落としこんでいきます。
DIMENSIONSから
- Scheduled job.Created Time(配信ジョブ実行日時)
- Scheduled Plan.ID(スケジュール配信ID)
- Scheduled job.Name(配信ジョブ名)
を選択します。
次にFILTERSで
- Scheduled job.Created Time 「が過去」 「10」 「時間」 (10時起動なので分析基盤更新タイミングである0時から10時までの配信失敗ジョブのみに絞る)
- Scheduled Plan.Run Once 「が次である」 「No」 (スケジュール配信のみ)
- Scheduled job.Status 「が次の値に等しい(=) 」 「failure」 (配信ジョブ実行結果が失敗のみ)
を設定します。

Lookを保存後、APIを実行するときにLookIDが必要なので控えておきます。
弊社環境では look_id = 442 です。
※LookIDはURL部分で確認できます

Looker APIの実装
Looker APIの処理を実装するにあたり、いきなり実装に入るのではなくLooker APIの紹介、APIキーの取得方法を確認した上で実装していきます。流れとしては下記の通りです。
Looker APIの紹介
APIを実装する前にLooker APIを簡単に紹介させて頂きます。不要な方は飛ばしてください。
公式ドキュメント
https://docs.looker.com/reference/api-and-integration/api-reference/v3.1
Lookからデータを取得する[Run Look]
https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#run_look
スケジュール配信再送[Run Scheduled Plan Once by Id]
ドキュメントだけでなく実際にAPIを試す環境もあります。コード実装せずにAPIを試せるのでとても助かります。
https://your-base-url.looker.com:19999/api-docs/index.html

Run Look を試す場合は SDK run_look(look_id, result_format) から look_id = 442, json をセットして Try it out! ボタンを押下すればすぐに結果が返ってきます。Python実装としては、 run_lookで得られた scheduled_plan_id をパラメータにして run_scheduled_plan_once_by_id を実行することになります。

Looker APIキーの取得
管理 > Users(Users) > Edit を選択します。

API3 Keys Edit Keys を選択します。

New API3 Key を選択してAPI Keyを発行します。
今後は、 Client ID と Client Secret を利用してAPIを実装します。

ディレクトリ構成
ディレクトリ構成は下記になります。
共通系処理を実装した後に本丸のスケジュール配信リトライ処理を実装していきます。
base_directory |-- looker | └- looker_api.py : 共通系処理 └-- looker_job_rerunner.py : スケジュール配信リトライ処理
最終的にAirflowのDAGから looker_job_rerunner.py をPythonOperatorで読み込みます。
Looker API共通処理の実装
looker_api.py ではログイン処理とリクエスト処理(get, post)を実装します。
※今回は取得(get)、新規作成(post)だけですが、必要に応じて更新(patch)、削除(delete)のリクエスト処理を実装してください
エンドポイント https://your-base-url.looker.com:19999/api/3.1/login に client_id と client_secret を利用してログインした後に access_token を保持します。
client_id と client_secret はAirflowの管理画面から設定してPG上で取得するようにしています。
import requests from airflow import models headers = {'content-type': 'application/json'} class LookerAPI(): """ https://docs.looker.com/reference/api-and-integration/api-reference/v3.1 LookerAPIクラス ログイン処理および各種LookerAPIのエンドポイントへの接続 """ def __init__(self, base_url='https://your-base-url.looker.com:19999/api/3.1'): self.base_url = base_url self.base_params = { 'verify_ssl': True, 'timeout': 120 } url = self.base_url + '/login' params = self.base_params.copy() params['client_id'] = models.Variable.get('looker_client_id') params['client_secret'] = models.Variable.get('looker_client_secret') response_json = self.post(url=url, params=params) if 'access_token' in response_json: self.base_params['access_token'] = response_json['access_token'] else: # 後ろ4文字のみ残してマスク mask_client_secret = ('X' * (len(params['client_secret']) - 4)) + params['client_secret'][-4:] raise Exception(f"Looker API Login Failed client_id:{params['client_id']} / client_secret:{mask_client_secret}") def get(self, url, params) -> dict: response = requests.get(url=url, params=params, headers=headers) if response.status_code != 200: raise Exception(f'status code: {response.status_code} / {response.json()}') return response.json() def post(self, url, params, json={}) -> dict: response = requests.post(url=url, params=params, headers=headers, json=json) if response.status_code != 200: raise Exception(f'status code: {response.status_code} / {response.json()}') return response.json()
Looker APIによる「スケジュール配信の自動リトライ」の実装
looker_job_rerunner.py ではスケジュール配信の自動リトライ処理を実装します。
各エンドポイントは次の通りです。
Look取得
https://your-base-url.looker.com:19999/api/3.1/looks/:look_id/run/json
スケジュール配信
https://your-base-url.looker.com:19999/api/3.1/scheduled_plans/:scheduled_plan_id/run_once
尚、Look取得時に念の為キャッシュを破棄するパラメータを設定しています。
import logging from base_directory.looker.looker_api import LookerAPI logging.getLogger().setLevel(logging.INFO) class LookerJobRerunner(): """ Lookerの定時スケジュール配信で失敗となったジョブを取得し、再実行するためのクラス 制限として、ジョブの再実行まではするが再実行となったジョブが配信成功するかどうかはわからない """ # look_id 442 はスケジュール配信が失敗したジョブを取得するための帳票ID def __init__(self, look_id=442): self.api = LookerAPI() self.look_id_for_fetch = look_id def _fetch_scheduled_plan(self) -> [dict]: url = f'{self.api.base_url}/looks/{self.look_id_for_fetch}/run/json' params = self.api.base_params.copy() params['cache'] = False return self.api.get(url=url, params=params) def _run_once_scheduled_plan(self, scheduled_plan_id): url = f'{self.api.base_url}/scheduled_plans/{scheduled_plan_id}/run_once' params = self.api.base_params.copy() self.api.post(url=url, params=params) def fetch_with_failure_and_retry(self): logging.info('fetch_scheduled_plan') scheduled_plan_dict_list = self._fetch_scheduled_plan() logging.info('run_once_scheduled_plan') for scheduled_plan_dict in scheduled_plan_dict_list: logging.info(scheduled_plan_dict) scheduled_plan_id = scheduled_plan_dict['scheduled_plan.id'] # 必要に応じて scheduled_job.name も取得 try: _ = self._run_once_scheduled_plan(scheduled_plan_id=scheduled_plan_id) except Exception as e: """ ここのExceptionに来るときは基本的に対処しようがないとき スケジュール配信が失敗したあとで配信ジョブを一度削除して再作成したため対象の配信IDが見つからない など """ logging.error(f'scheduled_plan_id: {scheduled_plan_id}') logging.error(f'Exception: {e}') return 200
Airflowの実装については割愛しますが、以上の実装を日次10時に起動することで「スケジュール配信の自動リトライ」を実現しています。
実際に自動リトライされたシーン
9時30分ごろにいつもなら出ているレポートが出ていないと連絡を頂きました。
10時に「スケジュール配信の自動リトライ」ジョブが起動することで運用チームが障害対応をせずに済んでいます。
※メンションに気づいた時は既に再配信されていたという状況です

まとめ
本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介しました。欲しい情報をLookであらかじめ定義しておき、Looker APIを利用してデータを取得、スケジュール配信を自動リトライする処理を実装しました。この記事をもとにLookerの運用負荷が少しでも軽減されれば幸いです。
また、Looker APIを応用すれば他にも色々なことができるので本記事が参考になればと思います。