この記事は ノバセル Advent Calendar 20日目です。
はじめに
こんにちは。ノバセルでデータエンジニアをしている小川です。
DagsterのSensorを使ってTimeout時の自動リトライ機能を作成したためサンプルコードでその実装方法と利点を解説しようと思います。
背景
ノバセルでは、顧客が求めるデータ(リクエスト)に応じて、dbtで定義したデータモデルをDagster上でマテリアライズ(データ生成・変換・格納)しています。このフロー全体をDagsterで管理することで、スケジュール実行や状態管理、リトライ制御などを一元的に行っています。
大まかな処理は以下の流れです。
flowchart LR A([dbtでリクエストデータを表すモデルを作成]) --> B[リクエストデータ] C[外部データ] --> D([Dagster Jobで処理]) B --> D D --> E[Output] D --> F[完了済みリクエスト] F -.->|完了した処理は次回以降の処理対象にしない| A
- dbtで新たなリクエスト情報(処理対象のデータ範囲や条件など)を抽出し、処理要求を作成する。
- 上記のリクエスト情報をもとに、外部データを取得・変換するDagster Jobを実行し、成果物を生成する。
- 処理が完了したリクエストは「完了済み」として扱い、次回以降の処理対象から除外する。
このような仕組みのため、毎回の実行で扱うデータの内容や量が変動します。
顧客のリクエスト内容によって必要な外部データ量は大きく増える場合があり、その結果Dagster Jobの実行がTimeoutしてしまうケースが発生します。
dbtのvarsで指定する変数(たとえばcalculate_num)を用いると、一度に処理するデータ件数を制御できます。
vars: calculate_num: 10000
with fuga as ( select * from {{ ref('hoge') }} limit {{ var('calculate_num') }} )
これにより、Timeoutを回避するため必要に応じてデータ件数を減らすことは可能ですが、Timeoutのたびにこの変数を手動で調整し、再デプロイ・再実行するのは手間がかかります。
そこで、本記事ではDagsterのrun_failure_sensor機能を活用し、Timeoutが発生したときに自動で変数の値を下げてリトライを行う仕組みを構築する方法を紹介します。これにより、人手での再設定を減らし、よりスムーズな実行管理が可能になります。
使用する技術の説明
Dagster
Dagsterは、パイプラインやAsset(データ資産)をコードで定義し、スケジュール、モニタリング、エラー処理などを包括的に行えるデータオーケストレーションプラットフォームです。
私たちノバセルでも、データ処理の効率化と安定運用のためにDagsterを利用しています。
dbt
dbtは、SQLベースでデータウェアハウス上のデータモデルを定義・実行し、標準化・再現性の高いデータパイプラインを実現するツールです。
Dagsterとdbtの相性
Dagsterはデータパイプライン全体のオーケストレーションが得意で、dbtはデータ変換・モデリングに強みがあります。この2つを組み合わせることで、dbtによるモデル作成・更新をDagsterのパイプライン内で一元的にコントロールし、より高い信頼性と柔軟性を持ったデータパイプライン運用が可能となります。
デモを作成するにあたっての準備
この記事のデモは、公式チュートリアル「Using dbt with Dagster」のPart2まで完了した状態のものを改良して作っています。
Timeoutが起こるような重い処理を再現するため追加で以下を実施しました
- orders, payments, customersのseedデータを各10000件になるように追加
- 3つのstagingモデルをCROSS JOINする
very_heavy_model.sql
というモデルを追加
{{ config( materialized = 'table', ) }} with orders as ( select * from {{ ref('stg_orders') }} limit {{ var('calculate_num') }} ) , payments as ( select * from {{ ref('stg_payments') }} limit {{ var('calculate_num') }} ) , customers as ( select * from {{ ref('stg_customers') }} limit {{ var('calculate_num') }} ) , cross_joined as ( select * from orders cross join payments cross join customers ) select * from cross_joined
- 変数
calculate_num
はデフォルト10000に設定
vars: calculate_num: 10000
very_heavy_model.sql
をbuildする際にcalculate_num
を10000に設定すると計算量が多く、膨大な時間がかかります。
仕組みの概要
今回の要はDagsterのSensor機能、それもrun_failure_sensor
です。
Sensorとは外部の状態変化を検知してJobの実行をトリガーする機能です。
その中でもrun_failure_sensor
は名前の通りRunが失敗した時に反応するSensorを作成するためのものです。
実装
Asset側
完成系は以下の通りです。
from dagster import AssetExecutionContext, Config, define_asset_job from dagster_dbt import DbtCliResource, dbt_assets from .project import jaffle_shop_project DEFAULT_CALCULATE_NUM = 2000 class MyAssetConfig(Config): calculate_num: int = DEFAULT_CALCULATE_NUM @dbt_assets(manifest=jaffle_shop_project.manifest_path) def jaffle_shop_dbt_assets( context: AssetExecutionContext, dbt: DbtCliResource, config: MyAssetConfig ): calculate_num = config.calculate_num build_cmd = ["build", "--vars", f'{{"calculate_num": {calculate_num}}}'] yield from dbt.cli(build_cmd, context=context).stream() dbt_job = define_asset_job( name="dbt_job", selection=[jaffle_shop_dbt_assets], )
いくつかチュートリアルから手を加えています。
まず、MyAssetConfig
についてです。これはRun Configuration(RunConfig)というものであり、Jobの実行時に値を指定することでJobの中でその値を使って動的に処理を行うことができます。 Config
を継承することで好きな項目を追加することができます。
今回はcalculate_numを追加し、その値をdbt build
の--vars
オプションに渡すことで動的にdbtの変数の値を変更しています。実際にRunConfigに値を指定する所はこのあと出てきます。
次にdefine_asset_job
です。Sensorから実行する対象として設定できるのはJobでありAssetではダメなのでdefine_asset_job
を使ってJobにしています。
Sensor側
こちらは順序を追って見ていきます。
まずrun_failure_sensor
デコレータを使用し、ジョブが失敗した際にトリガーされるSensorを用意します。
どのJobが失敗した時にトリガーするかをmonitored_jobsに、このSensorから実行するJobをrequest_jobに指定します。
今回はdbt_jobがTimeoutで失敗した時にdbt_jobをリトライするためどちらにもdbt_jobを指定します。
from dagster import run_failure_sensor from .assets import dbt_job @run_failure_sensor(monitored_jobs=[dbt_job], request_job=dbt_job) def retry_on_timeout(context: RunFailureSensorContext): ...
次にRunFailureSensorContext
からエラーメッセージを取得し、Timeoutであるかどうかの判別をします。
Timeoutの時はRunRequestでリトライを、それ以外の場合にはSkipReasonでリトライをスキップします。
from collections.abc import Generator from dagster import run_failure_sensor, RunRequest, SkipReason, RunFailureSensorContext ... def retry_on_timeout(context: RunFailureSensorContext) -> Generator[RunRequest | SkipReason]: if _is_timeout(context=context): yield RunRequest() else: yield SkipReason("skip retry because this error is not timeout.") def _is_timeout(context: RunFailureSensorContext) -> bool: failure_events = context.get_step_failure_events() for failure_event in failure_events: # failure_eventのevent_specific_dataにはエラーメッセージが入っているためそこでTimeoutかどうかを判別 if "timeout" in str(failure_event.event_specific_data).lower() and failure_event.is_step_failure: return True return False
最後にリトライ時のrun_configを設定します。
失敗したRunのrun_configを取得し、それを半分にした値を設定してリトライするようにします。
from dagster import run_failure_sensor, RunRequest, SkipReason, RunConfig, RunFailureSensorContext from .assets import dbt_job, MyAssetConfig, DEFAULT_CALCULATE_NUM ... def retry_on_timeout(context: RunFailureSensorContext) -> Generator[RunRequest | SkipReason]: if _is_timeout(context=context): # 今実行されていたRunのcalculate_numを取得する previous_run = context.dagster_run run_config = previous_run.run_config previous_calculate_num = \ run_config["ops"]["jaffle_shop_dbt_assets"]["config"]["calculate_num"] \ if run_config else DEFAULT_CALCULATE_NUM # 以前のcalculate_numを半分にし、リトライ yield RunRequest(run_key=None ,run_config=RunConfig( ops={ "jaffle_shop_dbt_assets": MyAssetConfig( calculate_num=previous_calculate_num // 2 ) } )) else: ...
完成系は以下の通りです。
from collections.abc import Generator from dagster import run_failure_sensor, RunRequest, SkipReason, RunConfig, RunFailureSensorContext from .assets import dbt_job, MyAssetConfig, DEFAULT_CALCULATE_NUM @run_failure_sensor(monitored_jobs=[dbt_job], request_job=dbt_job) def retry_on_timeout(context: RunFailureSensorContext) -> Generator[RunRequest | SkipReason]: if _is_timeout(context=context): previous_run = context.dagster_run run_config = previous_run.run_config previous_calculate_num = \ run_config["ops"]["jaffle_shop_dbt_assets"]["config"]["calculate_num"] \ if run_config else DEFAULT_CALCULATE_NUM yield RunRequest(run_key=None ,run_config=RunConfig( ops={ "jaffle_shop_dbt_assets": MyAssetConfig( calculate_num=previous_calculate_num // 2 ) } )) else: yield SkipReason("skip retry because this error is not timeout.") def _is_timeout(context: RunFailureSensorContext) -> bool: failure_events = context.get_step_failure_events() for failure_event in failure_events: if "timeout" in str(failure_event.event_specific_data).lower() and failure_event.is_step_failure: return True return False
この仕組みのメリット
- 自動リトライで手間削減
Timeoutが発生するたびに手動でlimit
を変更しデプロイ・実行し直す必要がなくなります。Timeoutになったとしても処理数を減らしつつ勝手に処理が進んでいくので気を取られる回数が確実に減ります。 - 成功後は元の状態に復帰
limit
の変更はSensorからの実行のみに限られ、次回以降はデフォルトの設定値に戻るため、処理数を減らしたことによるパフォーマンス低下を常に強いられることはありません。
考慮すべき点
- リトライ上限の設定
limit
を下げ続けても改善しない場合、問題はデータ量以外の要因かもしれません。その場合は閾値を設け、ダメなら停止して代わりに通知する方が良いでしょう。 - 同時実行管理
リトライがスケジュール実行と重なると同時実行数が増え、パフォーマンス悪化につながる可能性があります。並列数やリソース配分を検討する必要があります。
締め
このように、Dagsterのrun_failure_sensor
は、エラー発生時の動的なパラメータ調整と自動リトライを実現する強力な仕組みです。Sensorの機能を最大限活用し、面倒なところは自動化してツールに任せてしまいましょう。
Sensorは奥が深いので是非使ってみてください!