バックエンドエンジニアの渡邉です。Dagster Pipesを使ってRustバイナリ(CLI)を実行する試みを記事にしました。
この記事は ノバセル Advent Calender 1日目です。
この記事はこんな方向けです。
- 日頃からDagsterを使ってる
- Rustが好き
- 必ずしもTechブログに実用性を求めない
これから紹介するコードは、Dagsterのexperimentalな機能に依存してるのでproductionで使わないでください。
コードのディレクトリ構成
まず、ディレクトリ構成を紹介します。
dagster_dir以下がDagsterプロジェクトです。Dagsterのコードは日頃使っている方は分かると思うので、ポイントとなるファイル以外省略します。おおよそここにあるような構成となっています。
ferris_says以下がRustバイナリのプロジェクトです。
root_dir/ ├── dagster_dir/ │ ├── assets/ │ ⋮ ├── ferris.py │ ⋮ │ └── ferris_says/ ├── fake_temp/ │ ├── context.json │ └── message.json ├── src/ │ ├── main.rs │ └── pipes.rs ├── target │ ⋮ │ └── Cargo.toml
DagsterとRustバイナリの連携メカニズム
コードの詳細を説明する前にDagster PipesとRustバイナリ(CLI)が、どのように協調して動作するか説明します。
上記 assets/ferris.py
から target
ディレクトリに生成されるバイナリをsubprocessとして実行します。しかし、これだけならDagster Pipesは必要ありません。Dagster Pipesを使うことで以下を実現できます。
- RustバイナリからDagster UIの実行ログに書き込む
- Rustバイナリで生成したアセットのメタデータをDagsterに送信できる
- 生成したアセットをRustバイナリでチェックし、その結果をDagsterに送信できる
これらの連携は、以下の仕組みで実現します。
DagsterからRustバイナリへのデータ送信
tempファイルを使ってDagster Jobの実行ContextをRustバイナリに渡します。ContextにはRustバイナリが生成するアセットキー(Dagster上でアセットを一意に識別)が含まれます。Rustバイナリは、Contextから読み取ったアセットキーを、Dagsterへのログ書き込みやメタデータ送信に使います。詳細は後述します。
sequenceDiagram participant D as Dagster participant T as tempファイル(Context) participant E as 環境変数 participant R as Rustバイナリ D->>T: データを書き込む D->>E: tempファイルのパスをエンコードして格納 R->>E: 環境変数の値を読み取る R->>R: 環境変数の値をデコード R->>T: デコードしたパスを使ってtempファイルにアクセス R->>R: tempファイルのアセットキーを読み込む
tempファイル(Context)には以下のような内容が記載されます。
{ "asset_keys": [ "ferris" ] }
RustバイナリからDagsterへのデータ送信
データ送信の前に、tempファイルパスをDagsterからRustバイナリに連携します。ただし、ここで連携するtempファイルは上記Contextとは別のファイルで、Messageと呼ばれています。RustバイナリはMessageに書き込みを行い、DagsterはMessageからデータを読み込み、自身のDBに保存し、UIに表示します。
sequenceDiagram participant D as Dagster participant T as tempファイル(Message) participant E as 環境変数 participant R as Rustバイナリ D->>T: tempファイルを作成 D->>E: tempファイルのパスをエンコードして格納 R->>E: 環境変数の値を読み取る R->>R: 環境変数の値をデコード R->>T: デコードしたパスを使ってtempファイルにデータを書き込む D->>T: データを読み込む D->>D: DBに保存 D->>D: UIに表示
tempファイル(Message)には以下のような内容が記載されます。詳細は後述します。
{"__dagster_pipes_version":"0.1","method":"Opened","params":null} {"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris will now speak","level":"info"}} {"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris spoke","level":"info"}} {"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"passed":true,"severity":"Error","check_name":"ferris_should_say_something","asset_key":"ferris","metadata":{"what_did_you_say":{"raw_value":true,"type":"Bool"}}}} {"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"asset_key":"ferris","passed":true,"check_name":"ferris_should_speak_3_words","severity":"Warn","metadata":{"number_of_words_spoken_by_ferris":{"raw_value":3,"type":"Int"}}}} {"__dagster_pipes_version":"0.1","method":"ReportAssetMaterialization","params":{"metadata":{"ferris_said":{"raw_value":"what you want","type":"Text"}},"data_version":"1731819806","asset_key":"ferris"}} {"__dagster_pipes_version":"0.1","method":"Closed","params":null}
この記事で紹介するコードは、DagsterとRustバイナリの連携を、tempファイルを使って実装しています。その他の実装については以下ドキュメントを参照してください。
https://docs.dagster.io/concepts/dagster-pipes#usage
次節からはコードを説明します。
Dagsterのコード
以下環境でDagsterを実行します。
- Python: 3.12.5
- Dagster: 1.8.4
以下がRustバイナリを呼び出すDagsterのアセットです。
from dagster import ( AssetCheckSpec, AssetExecutionContext, MaterializeResult, PipesSubprocessClient, asset, ) # ...rust_binary_pathの実装と依存パッケージは省略 @asset( check_specs=[ AssetCheckSpec(name="ferris_should_say_something", asset="ferris"), AssetCheckSpec(name="ferris_should_speak_3_words", asset="ferris"), ], ) def ferris( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: cmd = [ rust_binary_path(), # Rustバイナリのフルパスを取得 "'Say, say, say'", ] return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()
rust_binary_path
で取得したパスにあるcliを pipes_subprocess_client
で実行します。
assetデコレータの check_specs
でRustバイナリで実行するAssetCheckを指定しています。
pipes_subprocess_client
はDagsterのリソースとして以下のように定義されています。
# ...省略 "pipes_subprocess_client": PipesSubprocessClient( context_injector=PipesTempFileContextInjector(), message_reader=PipesTempFileMessageReader(), ), # ...省略
既に記載した通り、ContextとMessageをtempファイルで実装していることが分かります。
Rustバイナリのコード
Rust 1.82 (edition: 2021)を使用します。
Rustバイナリのディレクトリ構成を再掲します。
ferris_says/ ├── fake_temp/ # Dagsterを介さず`cargo run`で実行するための偽temp dir │ ├── context.json # 上述のtempファイル(Context)の偽物 │ └── message.json # 上述のtempファイル(Message)の偽物 ├── src/ │ ├── main.rs │ └── pipes.rs ├── target │ ⋮ │ └── Cargo.toml
作ったバイナリは、clapで作ったCLIです。Dagsterとの連携を説明するために作成したため、実践的ではありません。実行するとstdoutにferrisが表示されます。
$ cargo run -- "what you want" Compiling ferris_says v0.1.0 (省略) Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.00s Running `target/debug/ferris_says 'what you want'` ferris_says start args: Args { words: "what you want" } context_path: "fake_temp/context.json" context_data: PipesContextData { asset_keys: ["ferris"] } messages_path: "fake_temp/messages.json" _______________ < what you want > --------------- \ \ _~^~^~_ \) / o o \ (/ '_ - _' / '-----' \ ferris_says end
以下がCargo.tomlです。
[package] name = "ferris_says" version = "0.1.0" edition = "2021" [dependencies] anyhow = "1.0.93" base64 = "0.22.1" clap = { version = "3.2.25", features = ["derive"] } ferris-says = "0.3.1" flate2 = "1.0.34" serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132"
では、具体どのように連携しているのか解説していきます。
Dagsterとの連携
以下は src/main.rs
です。main
から呼ばれるrun
関数で生成している context
オブジェクトを通じてDagsterと連携します。
mod pipes; // 記事の後半にコードを記載 use clap::Parser; use ferris_says::say; use pipes::{MetadataType, PipesAssetCheckSeverity}; use serde_json::json; use std::{ io::{stdout, BufWriter}, time::{SystemTime, UNIX_EPOCH}, }; use crate::pipes::open_dagster_pipes; #[derive(Parser, Debug)] struct Args { words: String, } fn run(args: Args) -> anyhow::Result<()> { println!("args: {:?}", args); let mut context = open_dagster_pipes(); context.log("ferris will now speak", "info"); let mut writer = BufWriter::new(stdout()); say(&args.words, args.words.len(), &mut writer)?; context.log("ferris spoke", "info"); let what_did_you_say = !args.words.is_empty(); context.report_asset_check( "ferris_should_say_something", what_did_you_say, PipesAssetCheckSeverity::Error, json!({ "what_did_you_say": {"raw_value": what_did_you_say, "type": MetadataType::Bool}, }) .into(), None, ); let number_of_words_spoken_by_ferris = args.words.split_whitespace().count(); context.report_asset_check( "ferris_should_speak_3_words", 3 <= number_of_words_spoken_by_ferris, PipesAssetCheckSeverity::Warn, json!({ "number_of_words_spoken_by_ferris": {"raw_value": number_of_words_spoken_by_ferris, "type": MetadataType::Int}, }) .into(), None, ); context.report_asset_materialization( json!({ "ferris_said": {"raw_value": args.words, "type": MetadataType::Text}, }), Some(data_version()), None, ); Ok(()) } fn data_version() -> String { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_else(|e| { panic!("clock may have gone backwards: {:?}", e); }) .as_secs() .to_string() } fn main() { // Dagster Job実行ログのstdoutタブに表示される println!("ferris_says start"); let args = Args::parse(); if let Err(e) = run(args) { // Dagster Job実行ログのstderrタブに表示される eprintln!("{e}"); } println!("ferris_says end"); }
run
関数で、context.open_dagster_pipes
メソッドを呼んでいます。このメソッドを呼ぶことでDagsterとの連携に必要なPipesContext構造体のインスタンス( context
オブジェクト)を取得します。PipesContext構造体の実装は後述します。
let mut context = open_dagster_pipes();
取得したcontextオブジェクトの log
メソッドを呼んでいます。引数がDagster Job実行ログに表示されます。
context.log("ferris will now speak", "info");
以下は log
メソッドのシグネチャです。
pub fn log(&mut self, message: &str, level: &str)
同様に report_asset_check
メソッドを呼んでいます。引数で渡したメタデータとともに、Dagster Job実行ログに表示されます。アセット詳細ページ > Checksタブにあるアセットチェック履歴にも表示されます。
let what_did_you_say = !args.words.is_empty(); context.report_asset_check( "ferris_should_say_something", what_did_you_say, PipesAssetCheckSeverity::Error, json!({ "what_did_you_say": {"raw_value": what_did_you_say, "type": MetadataType::Bool}, }) .into(), None, ); let number_of_words_spoken_by_ferris = args.words.split_whitespace().count(); context.report_asset_check( "ferris_should_speak_3_words", 3 <= number_of_words_spoken_by_ferris, PipesAssetCheckSeverity::Warn, json!({ "number_of_words_spoken_by_ferris": {"raw_value": number_of_words_spoken_by_ferris, "type": MetadataType::Int}, }) .into(), None, );
以下は report_asset_check
メソッドのシグネチャです。
pub fn report_asset_check( &mut self, check_name: &str, passed: bool, severity: PipesAssetCheckSeverity, metadata: Option<serde_json::Value>, asset_key: Option<String>, )
PipesAssetCheckSeverity
にWarningかErrorを設定できます。Warnigを設定した場合、Dagster上に警告は表示されますが、アセットのマテリアライズは成功します。
同様に report_asset_materialization
メソッドを呼んでいます。引数で渡したメタデータとともに、Dagster Job実行ログに表示されます。
context.report_asset_materialization( json!({ "ferris_said": {"raw_value": args.words, "type": MetadataType::Text}, }), data_version(), None, );
以下は report_asset_materialization
メソッドのシグネチャです。
pub fn report_asset_materialization( &mut self, metadata: serde_json::Value, data_version: String, asset_key: Option<String>, )
data_version
は、Dagsterがアセットキー毎に管理しているバージョンです。リネージ(lineage)の後続アセットのマテリアライズが必要か否かを、このバージョンの変更有無で判定しています。つまり、このメソッドの呼び出し内容をDagsterが読み込み、保存することで、アセット作成処理がコミットされます。
ここまでcontextオブジェクトを通じてDagsterとどう連携するか説明しました。
tempファイル(Message)を再掲します。各行がcontextオブジェクトのメソッドに対応していることが分かります。
{"__dagster_pipes_version":"0.1","method":"Opened","params":null} {"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris will now speak","level":"info"}} {"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris spoke","level":"info"}} {"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"passed":true,"severity":"Error","check_name":"ferris_should_say_something","asset_key":"ferris","metadata":{"what_did_you_say":{"raw_value":true,"type":"Bool"}}}} {"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"asset_key":"ferris","passed":true,"check_name":"ferris_should_speak_3_words","severity":"Warn","metadata":{"number_of_words_spoken_by_ferris":{"raw_value":3,"type":"Int"}}}} {"__dagster_pipes_version":"0.1","method":"ReportAssetMaterialization","params":{"metadata":{"ferris_said":{"raw_value":"what you want","type":"Text"}},"data_version":"1731819806","asset_key":"ferris"}} {"__dagster_pipes_version":"0.1","method":"Closed","params":null}
次節にcontextオブジェクト(PipesContext構造体のインスタンス)を実装したpipesモジュールを記載します。
pipesモジュール
src/pipes.rs
にcontextオブジェクト及び、それに関連するコードを書きました。(Rust好きなはずなので、特に解説は不要と思います)
use base64::engine::general_purpose::STANDARD; use base64::Engine; use flate2::read::ZlibDecoder; use serde::{Deserialize, Serialize}; use serde_json::json; use std::fmt::Display; use std::fs::OpenOptions; use std::io::{Read, Write}; use std::{collections::HashMap, env}; pub fn open_dagster_pipes() -> PipesContext { const FAKE_TEMP_DIR: &str = "fake_temp/"; // DAGSTER_PIPES_CONTEXTはDagsterが設定する環境変数 let context_path = env::var("DAGSTER_PIPES_CONTEXT") .map_err(anyhow::Error::from) .and_then(|path| decode_param(&path)) .map(|path| path.path) // DAGSTER_PIPES_CONTEXTがない場合(バイナリを直接実行した場合)、fake/context.jsonを使う .unwrap_or_else(|_| format!("{FAKE_TEMP_DIR}context.json")); println!("context_path: {:?}", context_path); let context_file = OpenOptions::new() .read(true) .open(context_path) .unwrap_or_else(|err| { panic!("could not open or create context file: {}", err); }); let context_data: PipesContextData = serde_json::from_reader(context_file) .unwrap_or_else(|err| panic!("could not serialize from context file: {}", err)); println!("context_data: {:?}", context_data); let messages_path = env::var("DAGSTER_PIPES_MESSAGES") .map_err(anyhow::Error::from) .and_then(|path| decode_param(&path)) .map(|path| path.path) .unwrap_or_else(|_| format!("{FAKE_TEMP_DIR}messages.json")); // contextと同様 println!("messages_path: {:?}", messages_path); let mut pipes_context = PipesContext { data: context_data, writer: PipesTempFileMessageWriter { path: messages_path.to_string(), }, }; pipes_context.report_open(); pipes_context } // Dagster Pipesがエンコードしたパラメータをデコードする // https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L367 fn decode_param(encoded: &str) -> anyhow::Result<PipesParam> { let compressed = STANDARD.decode(encoded)?; let mut decoder = ZlibDecoder::new(&compressed[..]); let mut json_str = String::new(); decoder.read_to_string(&mut json_str)?; let value = serde_json::from_str(&json_str)?; Ok(value) } #[derive(Deserialize)] struct PipesParam { path: String, } pub struct PipesContext { data: PipesContextData, writer: PipesTempFileMessageWriter, } impl PipesContext { pub fn log(&mut self, message: &str, level: &str) { let params: HashMap<String, serde_json::Value> = HashMap::from([ ("message".to_string(), json!(message)), ("level".to_string(), json!(level)), ]); let msg = PipesMessage::new(Method::Log, params.into()); self.writer.write_message(msg); } pub fn report_open(&mut self) { let msg = PipesMessage::new(Method::Opened, None); self.writer.write_message(msg); } pub fn report_close(&mut self) { let msg = PipesMessage::new(Method::Closed, None); self.writer.write_message(msg); } pub fn report_asset_check( &mut self, check_name: &str, passed: bool, severity: PipesAssetCheckSeverity, metadata: Option<serde_json::Value>, asset_key: Option<String>, ) { let asset_key = self.resolve_asset_key(asset_key); let params: HashMap<String, serde_json::Value> = HashMap::from([ ("asset_key".to_string(), json!(asset_key)), ("check_name".to_string(), json!(check_name)), ("passed".to_string(), json!(passed)), ("metadata".to_string(), metadata.unwrap_or(json!(null))), ("severity".to_string(), json!(severity)), ]); let msg = PipesMessage::new(Method::ReportAssetCheck, params.into()); self.writer.write_message(msg); } pub fn report_asset_materialization( &mut self, metadata: serde_json::Value, data_version: String, asset_key: Option<String>, ) { let asset_key = self.resolve_asset_key(asset_key); let params: HashMap<String, serde_json::Value> = HashMap::from([ ("asset_key".to_string(), json!(asset_key)), ("metadata".to_string(), metadata), ("data_version".to_string(), json!(data_version)), ]); let msg = PipesMessage::new(Method::ReportAssetMaterialization, params.into()); self.writer.write_message(msg); } fn resolve_asset_key(&mut self, asset_key: Option<String>) -> String { // asset_keyが明示的に渡されない場合、"asset_keys"先頭のアセットキーが必要であると見做す asset_key.unwrap_or_else(|| self.data.asset_keys.first().cloned().unwrap()) } } impl Drop for PipesContext { fn drop(&mut self) { self.report_close(); } } #[derive(Deserialize, Debug)] struct PipesContextData { asset_keys: Vec<String>, } #[derive(Serialize)] struct PipesMessage { __dagster_pipes_version: String, method: Method, params: Option<HashMap<String, serde_json::Value>>, } impl PipesMessage { fn new(method: Method, params: Option<HashMap<String, serde_json::Value>>) -> Self { Self { __dagster_pipes_version: "0.1".to_string(), method, params, } } } // Dagster Pipesのメッセージ種別を列挙型で実装 // https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L58 // report_custom_messageはサポートしない #[derive(Serialize)] enum Method { Opened, Closed, Log, ReportAssetMaterialization, ReportAssetCheck, } impl Display for Method { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match *self { Self::Opened => write!(f, "opened"), Self::Closed => write!(f, "closed"), Self::Log => write!(f, "log"), Self::ReportAssetMaterialization => write!(f, "report_asset_materialization"), Self::ReportAssetCheck => write!(f, "report_asset_check"), } } } struct PipesTempFileMessageWriter { path: String, } impl PipesTempFileMessageWriter { fn write_message(&mut self, message: PipesMessage) { let mut file = OpenOptions::new() .create(true) .append(true) .open(&self.path) .unwrap_or_else(|err| { panic!("could not open messages file: {}", err); }); writeln!( file, "{}", serde_json::to_string(&message).unwrap_or_else(|err| { panic!("could not serialize message: {}", err); }) ) .unwrap_or_else(|err| { panic!("could not write to messages file: {}", err); }); } } // Dagster PipesのAssetCheckSeverityを列挙型で実装 // https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L138 #[derive(Serialize, Debug, Clone)] pub enum PipesAssetCheckSeverity { Warn, Error, } impl Display for PipesAssetCheckSeverity { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::Warn => write!(f, "WARN"), Self::Error => write!(f, "ERROR"), } } } // Dagster PipesのMetadataTypeを列挙型で実装 // https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L151 // 上記MetadataTypeの値(MetadataValue)が、具体どのようなものかは、以下を参照 // https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/metadata/metadata_value.py#L40 #[derive(Serialize, Debug, Clone)] pub enum MetadataType { Text, Int, Bool, } impl Display for MetadataType { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::Text => write!(f, "text"), Self::Int => write!(f, "int"), Self::Bool => write!(f, "bool"), } } }
上記コードは以下リポジトリの内容を参考に作成しました。
https://github.com/marijncv/pg-pipes/blob/main/pg_pipes/src/pipes.rs
まとめ
DagsterからRustバイナリを実行し、連携するコードを紹介しました。これは実験的な機能に依存しているため、現時点では本番環境での使用は推奨されません。しかし、Rustの高速な処理能力を活かせる場面があれば、将来的に有用な選択肢となる可能性があります。
以上で記事を終わります。ご覧いただき、ありがとうございました。