- はじめに:Dagsterとは🐙
- Dagster Cloudの概要と導入チュートリアル🔧
- Dagsterパイプラインのコンセプト♻
- Dagster Cloudのパイプライン実行環境 🔀
- おわりに:Dagsterのネクストステップ🔜
Data Engineeringチームの岩崎です。 今年の9月よりRaksulに業務委託として参画させていただいております。普段は様々な企業様にてETLパイプラインの構築やシステム設計などでお仕事させて頂いております。もしよろしければ私の活動についてもご覧ください。
今回は、Dagster Cloudの検証で得た知見を元にDagster Cloudの導入からそのコンセプトの一部を紹介させていただければと思います。
はじめに:Dagsterとは🐙
Dagsterとはデータパイプラインツールの一つです。 Raksulでは取り扱う膨大なデータの処理や分析に日々改善を続けており、Dagsterもその一環としてデータ分析処理改善のため導入検証を進めています。 今回は、Dagsterのマネージドサービスである Dagster Cloudを中心にその設定や導入と、コンセプトについて簡単に紹介します。
データパイプラインにはすでに有名どころとしてAirflowがございますが、DagsterはAirflowを機能的に乗り越えるべく、DAGの視認性・可読性の問題、テスト容易性、複雑な依存関係の解消など、現代的な分析業務に即した構成を目指し開発されました。 国内ではまだ導入事例が少ないのですが、海外ではすでにモダンデータスタックの一つとして確固たる地位を得ているため、国内での知名度を上げるためここで取り上げさせていただきました。
Dagster Cloudの概要と導入チュートリアル🔧
Dagster CloudはDagsterのマネージドシステムです。サービスがスタートしたのは2022/8/9とまだ日が浅いですが、Dagster 単体としてはすでに三年以上の開発実績があり、海外では導入実績例も多数・開発コミュニティでの議論も活発で短い期間でリリースが繰り返されています。 DagsterはDockerやk8s、AWSのECSなどといった形でも動作させることができますが、ここでは、以降ECSで動作させることを想定して説明していきます。
Dagster Cloudのシステム概要
チュートリアルの前に、Dagsterのシステム概要について簡単に触れておきたいと思います。 左側Customer EnvironmentではAgentがDagsterのシステムを統括しており、一方でUser Code(Job)はデータパイプライン処理を指していて、外部のリソースに対してアクセスやデータ処理を行います。Dagster Agent、User Code(Job)はそれぞれDocker Imageの中に必要なコードとモジュールを導入した上で提供する形となっております。
右側Dagster CloudではAgent APIがCustomer Environment側のAgentとの通信を担っています。運用に必要なWebUIやオーケストレーション、実行スケジューリングやジョブ実行履歴などのメタデータの構成・蓄積はマネージドで提供されており、構成管理を意識する必要はございません。
導入チュートリアル
Dagster Cloudで最初のパイプライン実行までの基本的な流れについて、設定の順に従って紹介します。
Create a Dagster Cloud account and organization
Dagster Cloudへの登録は、googleアカウント・githubアカウントなどで可能です。登録すると、まずOrganization名を決める必要があり、以降も登場するDeploymentに対応する名称となります。
Select a deployment type
Dagster Cloud構成の提供形態はHybrid Deployment、Serverless Deploymentの2種類があります。
Hybrid Deploymentとは、Dagsterの実行環境は自前で準備した環境を用い、構成情報の管理はDagster Cloudに任せる提供形態のことです。自環境として利用できるのは、現在k8s、ECS、Docker、localの選択肢がございます。 一方で、Serverless DeploymentとはDagsterの構成環境を全てDagster Cloud内に構築する方式で、ユーザが準備するのはデータを保全するストレージのみになります。
ここではHybrid DeploymentをAWS ECS環境で構築した場合を中心に紹介していきたいと思います。
なお、Dagster Cloudの利用コストについてはこちらにも記載されていますが、以下の図の通りになります(※登録開始から30日以内は料金がかかりません)。 パイプライン処理を実行している時間(per minute)毎に$0.03(Hybrid)または$0.04(Serverless)かかる従量課金制となっており、環境がスケールするにつれてコストが割安になっていく方式です。
エンタープライズプランについては公表されておりませんが、一般的なSaasサービスのエンタープライズ価格相当と考えて問題ありません。エンタープライズプランだとDeployment数を増やせたり、認証認可に任意のSAML方式を利用したりなどできることの制限が緩和されます。
Deploy Your Code
自環境において、まずはDagster AgentをECSとして立ち上げます。ちなみにDagster Agentは公式のDocker Imageも使用可能です。 また、ここから先はAWSで環境構築する場合Cloud Formationを用いて自動的に構築することも可能です。 Cloud Formationの手順は公式にも掲載されているためそちらをご覧いただくとして、ここではCloud Formationの設定内容を簡単に説明していきたいと思います。
ネットワーク構成
Cloud Formationを使用してネットワーク構成を組む場合、自動的にVPCの構築、Route53、Service Discoveryなどが設定されます。
Dagster Agent(ECS Service)
Dagsterではパイプライン実行環境はDockerで提供します。Dagster AgentとDagster Task(パイプライン実行環境)はECS Serviceとして常時起動した状態となります。
Dagster Cloud内で通信に必要なAgentのTokenが提供されており、そのTokenをDagsterのDocker Agentに環境変数として適用すると、自環境のDagster AgentとDagster Cloudの通信が始まります。
上記に挙げた構成は、全てDagster Agentが持つdagster.yaml内にも記載されています。
※ branch_deployments: trueにしなければBranch Deploymentできないため注意してください。
instance_class: module: dagster_cloud class: DagsterCloudAgentInstance dagster_cloud_api: url: https://<ORGANIZATION名>.agent.dagster.cloud agent_token: agent:<ORGANIZATION名>:xxxxxxxxxxxxxxxxxxxxxx deployment: <ORGANIZATION名> branch_deployments: true user_code_launcher: module: dagster_cloud.workspace.ecs class: EcsUserCodeLauncher config: cluster: <ECS Cluster名> subnets: [<subnet ID>,.....] service_discovery_namespace_id: <Namespace名> execution_role_arn: arn:aws:iam::<Account ID>:role/<Task実行ロール名> task_role_arn: <Taskロール名> log_group: /ecs/<Log Stream名>
Launch a run
環境が構築されるとパイプラインのジョブを実行する準備が整います。 Workspaceから任意のパイプラインを選択し、JobのページでLaunchpad タブをクリックします。 遷移した画面で右下のLaunch Runをクリックしてジョブを実行します。
Create a Branch Deployment
後ほど詳細に説明しますが、Deploymentの種類にはProduction DeploymentとBranch Deploymentがあり、ここまでの作業はProduction Deploymentでの説明でした。しかし、一連の作業を開発環境で検証したい場合も当然あると思います。そのためにBranch Deploymentが用意されています。
このステップは実際の設定作業でもスキップ可能であり、後でいつでもセットアップが可能です。
Dagsterパイプラインのコンセプト♻
DagsterのパイプラインはPythonコードで記述します。パイプラインの基本的な構成要素はそれに対応したUIと機能が提供されており、以下の図はパイプラインの持つ各機能と、それら機能をディレクトリ単位で分けた構成の一例です。
ここでは、特に中心的な機能 Repository・Assets・Jobs・Graphs・Opsを簡単に紹介します。
なお、少し古めの記事ですと似たような機能が違う表現で紹介されていたりしますが、現在こちらで紹介した機能が最新版となっています。 それぞれに対応する機能を実装する際にはデコレータを用います。また、他にもいくつか紹介し切れない機能もございますが、詳細はドキュメントをご確認ください。
Repository
Repositoryとは、一つのパイプラインを構成する要素を定義する単位のことです。これから紹介するAssets、Jobs、Graphsなどの構成をRepositoryの中に指定することで一つのパイプラインとして表現します。UI上からはWorkspaceなどからパイプラインとそれぞれの構成要素を確認できます。
assets
アセットとは、Dagsterの外部にあるデータ資産を指します。Dagster上では一つのデータの塊の単位をアセットとして管理できます。
例えばSQLクエリ発行による結果をアセットとして保持したり、データ内容を確認したり、データ処理時間をグラフで確認したり等、視覚的・一覧的な構成管理に役立ちます。アセットには上に挙げたようなDBのテーブルや、機械学習用のデータモデル、Slackのチャンネルなど様々な情報が対象になります。
ちなみに、Dagsterでアセットだけ最新の情報を取得したい場合があると思います。その時にはMaterializeという操作でリフレッシュ可能です。
op
OpはDagsterのパイプラインの処理の一つの単位です。以前まではsolidと呼ばれる単位で記述されていましたがバージョン0.12以降から変わりました。Dagster パイプラインのUIでは一つの処理のユニットとして表現され、定義内容や設定情報、処理時間、入力、出力情報など様々な情報を確認することができます。
job
ジョブはパイプラインのジョブの実行起点です。opで定義した処理をjobとしてあらためて定義すると、パイプラインからジョブ実行できるようになります。 実行の際にはLaunch a runの項目で説明したようにLaunch Padから指定します。 また、今回は紹介しませんがユニットテストやlocal環境でのテストにも対応しています。
sensor
センサーとは、処理による状態変化を監視するトリガー設定のことです。 ジョブ実行完了が非同期であったり、処理の返却値以外に次の処理に遷移する実行契機を設けたい場合など、任意で指定した状態変化に基づいた処理を実行できます。例えば以下のような使い方をします。
・s3バケットにファイルがputされたら実行開始する
・他のジョブが特定のアセットをマテリアライズするたびにジョブ実行する
・外部システムがダウンしたときにジョブ実行する
graph
グラフは各opの処理を結合や分岐を定義します。処理の分岐結合や、アセットの分岐結合に用います。
Dagster Cloudのパイプライン実行環境 🔀
Dagster Cloudが提供する環境におけるDeployment、Location、Workspaceという関係性は少し複雑で、それぞれ以下のような関係性となっております。
図における太枠、(n)と表している箇所は複数個作成可能であることを示しています。各リソースと対応する環境は以下の関係となっています。
Repository : Production Deployment = 1 : 1
Pull Request : Branch Deployment = 1 : 1
Deployment : Locations = 1 : n
Locations : Workspace = 1 : n
Deployment
Githubリポジトリ上のコードをDocker ImageにまとめてDagster Cloud上とAWS環境にデプロイすることを指します。 Dagster Cloudでは大きくProduction DeploymentとBranch Deploymentが存在し、Production DeploymentはGithub Repositoryと1 : 1で対応しております。Production Deploymentできる数を増やしたい場合は、Enterprise Planが必要になります。
Github Actionsでproductionまたはbranch deployが提供されており、Dagster CloudではGithub Actionsでの提供方式が一般的です。Github Actionsを使わない場合、デプロイはCLI dagster-cloudコマンドラインから実行します。
Github Actionsで用意されているアクションとしてはmaster/mainブランチに対するpushをトリガーにdeployする方式と、develop branchのPull Requestに対する変更操作をトリガーにdeployする方式がございます。
2022/12/02現在、githubで記載されている設定情報のSetup Secretsは少し誤っていて、ORGANIZATION_IDの設定にはORGANIZATION IDがpied-piperだったらhttps://dagster.cloud/pied-piperもしくは https://pied-piper.dagster.cloudという記載となっていますが、実際には以下の設定となっております。
名前 | 設定内容 |
---|---|
DAGSTER_CLOUD_URL | https://dagster.cloud/<ORGANIZATION ID> の形で設定 |
ORGANIZATION_ID | https://<ORGANIZATION ID> .dagster.cloud の形で設定 |
また、提供されている方式ではクレデンシャル情報の設定を示していたりしますが、最近ではOpenID Connectによるクレデンシャル不要な方式も提供されているのでそちらの方がより安全にアクセス可能と考えます。
また、Branch Deployment環境の話ですが、以下の機能は制限され使用できません。
・Schedule
・Sensors
・Backfills(部分的にアセットを読み込ませてデータを補完する機能)
Locations
Locationsとは、一つのDocker Imageで構成される環境を指します。Dagsterの実行環境はDockerで構成されますが、LocationsをDeploymentの中に複数作成することが可能です。設定の仕方は以下を参考にしてください。また、dagster_cloud.yaml内で環境変数を設定することも可能です。
location_name: <任意のLocation名> image: <ACCOUNT ID>.dkr.ecr.ap-northeast-1.amazonaws.com/<リポジトリ名> code_source: package_name: <パッケージのディレクトリ> git: commit_hash: <HASH> url: <Github_URL>
なお、今現在Dagster CloudではLocations毎で別々のTask Roleを設定することができず、Dagster Agentが持つTask定義ファイル一つで全ての同一のTask定義を用いていますが、私がサポートにFeature Requestを投げて近日中にLocationごとのTask定義を適用できるようにご対応いただけるとのことでしたので今しばらくお待ちください。
Workspace
Workspaceとは一つのパイプラインの構成単位を指します。Dagster CloudのLocationsの中にWorkspaceを複数作ることができます。 Workspaceを読み込む際には、workspace.yamlへの記述が必要になります。
Workspaceは複数指定することが可能で、指定した数のパイプラインを作成することができます。Workspaceの指定の仕方として、他にもコード単位でも指定可能です。
load_from: - python_package: <プロジェクト名>
おわりに:Dagsterのネクストステップ🔜
いかがでしたでしょうか。どんなツールでも導入コストはございますが、Dagsterは一度理解すれば直観的にその内容を理解しやすく、概念も洗練されたサービスとなっております。今後、より詳細に概念を説明する記事も執筆予定です。本記事が理解のためのコストを下げるための一助となれば幸いです。