RAKSUL TechBlog

ラクスルグループのエンジニアが技術トピックを発信するブログです

Dagster Pipes で Rust を動かす

 


バックエンドエンジニアの渡邉です。Dagster Pipesを使ってRustバイナリ(CLI)を実行する試みを記事にしました。

この記事は ノバセル Advent Calender 1日目です。

この記事はこんな方向けです。

  • 日頃からDagsterを使ってる
  • Rustが好き
  • 必ずしもTechブログに実用性を求めない

これから紹介するコードは、Dagsterのexperimentalな機能に依存してるのでproductionで使わないでください。

コードのディレクトリ構成

まず、ディレクトリ構成を紹介します。

dagster_dir以下がDagsterプロジェクトです。Dagsterのコードは日頃使っている方は分かると思うので、ポイントとなるファイル以外省略します。おおよそここにあるような構成となっています。

ferris_says以下がRustバイナリのプロジェクトです。

root_dir/
├── dagster_dir/
│   ├── assets/
│   ⋮    ├── ferris.py
│        ⋮
│
└── ferris_says/
    ├── fake_temp/
    │   ├── context.json
    │   └── message.json
    ├── src/
    │   ├── main.rs
    │   └── pipes.rs
    ├── target
    │   ⋮
    │
    └── Cargo.toml

DagsterとRustバイナリの連携メカニズム

コードの詳細を説明する前にDagster PipesとRustバイナリ(CLI)が、どのように協調して動作するか説明します。

上記 assets/ferris.py から target ディレクトリに生成されるバイナリをsubprocessとして実行します。しかし、これだけならDagster Pipesは必要ありません。Dagster Pipesを使うことで以下を実現できます。

  • RustバイナリからDagster UIの実行ログに書き込む
  • Rustバイナリで生成したアセットのメタデータをDagsterに送信できる
  • 生成したアセットをRustバイナリでチェックし、その結果をDagsterに送信できる

これらの連携は、以下の仕組みで実現します。

DagsterからRustバイナリへのデータ送信

tempファイルを使ってDagster Jobの実行ContextをRustバイナリに渡します。ContextにはRustバイナリが生成するアセットキー(Dagster上でアセットを一意に識別)が含まれます。Rustバイナリは、Contextから読み取ったアセットキーを、Dagsterへのログ書き込みやメタデータ送信に使います。詳細は後述します。

sequenceDiagram
    participant D as Dagster
    participant T as tempファイル(Context)
    participant E as 環境変数
    participant R as Rustバイナリ

    D->>T: データを書き込む
    D->>E: tempファイルのパスをエンコードして格納
    R->>E: 環境変数の値を読み取る
    R->>R: 環境変数の値をデコード
    R->>T: デコードしたパスを使ってtempファイルにアクセス
    R->>R: tempファイルのアセットキーを読み込む

tempファイル(Context)には以下のような内容が記載されます。

{
  "asset_keys": [
    "ferris"
  ]
}

RustバイナリからDagsterへのデータ送信

データ送信の前に、tempファイルパスをDagsterからRustバイナリに連携します。ただし、ここで連携するtempファイルは上記Contextとは別のファイルで、Messageと呼ばれています。RustバイナリはMessageに書き込みを行い、DagsterはMessageからデータを読み込み、自身のDBに保存し、UIに表示します。

sequenceDiagram
    participant D as Dagster
    participant T as tempファイル(Message)
    participant E as 環境変数
    participant R as Rustバイナリ

    D->>T: tempファイルを作成
    D->>E: tempファイルのパスをエンコードして格納
    R->>E: 環境変数の値を読み取る
    R->>R: 環境変数の値をデコード
    R->>T: デコードしたパスを使ってtempファイルにデータを書き込む
    D->>T: データを読み込む
    D->>D: DBに保存
    D->>D: UIに表示

tempファイル(Message)には以下のような内容が記載されます。詳細は後述します。

{"__dagster_pipes_version":"0.1","method":"Opened","params":null}
{"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris will now speak","level":"info"}}
{"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris spoke","level":"info"}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"passed":true,"severity":"Error","check_name":"ferris_should_say_something","asset_key":"ferris","metadata":{"what_did_you_say":{"raw_value":true,"type":"Bool"}}}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"asset_key":"ferris","passed":true,"check_name":"ferris_should_speak_3_words","severity":"Warn","metadata":{"number_of_words_spoken_by_ferris":{"raw_value":3,"type":"Int"}}}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetMaterialization","params":{"metadata":{"ferris_said":{"raw_value":"what you want","type":"Text"}},"data_version":"1731819806","asset_key":"ferris"}}
{"__dagster_pipes_version":"0.1","method":"Closed","params":null}

この記事で紹介するコードは、DagsterとRustバイナリの連携を、tempファイルを使って実装しています。その他の実装については以下ドキュメントを参照してください。

https://docs.dagster.io/concepts/dagster-pipes#usage

次節からはコードを説明します。

Dagsterのコード

以下環境でDagsterを実行します。

  • Python: 3.12.5
  • Dagster: 1.8.4

以下がRustバイナリを呼び出すDagsterのアセットです。

from dagster import (
    AssetCheckSpec,
    AssetExecutionContext,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
)

# ...rust_binary_pathの実装と依存パッケージは省略

@asset(
    check_specs=[
        AssetCheckSpec(name="ferris_should_say_something", asset="ferris"),
        AssetCheckSpec(name="ferris_should_speak_3_words", asset="ferris"),
    ],
)
def ferris(
    context: AssetExecutionContext,
    pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
    cmd = [
        rust_binary_path(),  # Rustバイナリのフルパスを取得
        "'Say, say, say'",
    ]
    return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()

rust_binary_path で取得したパスにあるcliを pipes_subprocess_client で実行します。

assetデコレータの check_specs でRustバイナリで実行するAssetCheckを指定しています。

pipes_subprocess_client はDagsterのリソースとして以下のように定義されています。

# ...省略
"pipes_subprocess_client": PipesSubprocessClient(
    context_injector=PipesTempFileContextInjector(),
    message_reader=PipesTempFileMessageReader(),
),
# ...省略

既に記載した通り、ContextとMessageをtempファイルで実装していることが分かります。

Rustバイナリのコード

Rust 1.82 (edition: 2021)を使用します。

Rustバイナリのディレクトリ構成を再掲します。

ferris_says/
├── fake_temp/       # Dagsterを介さず`cargo run`で実行するための偽temp dir
│   ├── context.json # 上述のtempファイル(Context)の偽物
│   └── message.json # 上述のtempファイル(Message)の偽物
├── src/
│   ├── main.rs
│   └── pipes.rs
├── target
│   ⋮
│
└── Cargo.toml

作ったバイナリは、clapで作ったCLIです。Dagsterとの連携を説明するために作成したため、実践的ではありません。実行するとstdoutにferrisが表示されます。

$ cargo run -- "what you want"
   Compiling ferris_says v0.1.0 (省略)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.00s
     Running `target/debug/ferris_says 'what you want'`
ferris_says start
args: Args { words: "what you want" }
context_path: "fake_temp/context.json"
context_data: PipesContextData { asset_keys: ["ferris"] }
messages_path: "fake_temp/messages.json"
 _______________
< what you want >
 ---------------
        \
         \
            _~^~^~_
        \) /  o o  \ (/
          '_   -   _'
          / '-----' \
ferris_says end

以下がCargo.tomlです。

[package]
name = "ferris_says"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.93"
base64 = "0.22.1"
clap = { version = "3.2.25", features = ["derive"] }
ferris-says = "0.3.1"
flate2 = "1.0.34"
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"

では、具体どのように連携しているのか解説していきます。

Dagsterとの連携

以下は src/main.rs です。main から呼ばれるrun 関数で生成している context オブジェクトを通じてDagsterと連携します。

mod pipes; // 記事の後半にコードを記載

use clap::Parser;
use ferris_says::say;
use pipes::{MetadataType, PipesAssetCheckSeverity};
use serde_json::json;
use std::{
    io::{stdout, BufWriter},
    time::{SystemTime, UNIX_EPOCH},
};

use crate::pipes::open_dagster_pipes;

#[derive(Parser, Debug)]
struct Args {
    words: String,
}

fn run(args: Args) -> anyhow::Result<()> {
    println!("args: {:?}", args);
    let mut context = open_dagster_pipes();

    context.log("ferris will now speak", "info");

    let mut writer = BufWriter::new(stdout());
    say(&args.words, args.words.len(), &mut writer)?;

    context.log("ferris spoke", "info");

    let what_did_you_say = !args.words.is_empty();
    context.report_asset_check(
        "ferris_should_say_something",
        what_did_you_say,
        PipesAssetCheckSeverity::Error,
        json!({
            "what_did_you_say": {"raw_value": what_did_you_say, "type": MetadataType::Bool},
        })
        .into(),
        None,
    );

    let number_of_words_spoken_by_ferris = args.words.split_whitespace().count();
    context.report_asset_check(
        "ferris_should_speak_3_words",
        3 <= number_of_words_spoken_by_ferris,
        PipesAssetCheckSeverity::Warn,
        json!({
            "number_of_words_spoken_by_ferris": {"raw_value": number_of_words_spoken_by_ferris, "type": MetadataType::Int},
        })
        .into(),
        None,
    );

    context.report_asset_materialization(
        json!({
            "ferris_said": {"raw_value": args.words, "type": MetadataType::Text},
        }),
        Some(data_version()),
        None,
    );

    Ok(())
}

fn data_version() -> String {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|e| {
            panic!("clock may have gone backwards: {:?}", e);
        })
        .as_secs()
        .to_string()
}

fn main() {
    // Dagster Job実行ログのstdoutタブに表示される
    println!("ferris_says start");
    let args = Args::parse();
    if let Err(e) = run(args) {
        // Dagster Job実行ログのstderrタブに表示される
        eprintln!("{e}");
    }
    println!("ferris_says end");
}

run 関数で、context.open_dagster_pipes メソッドを呼んでいます。このメソッドを呼ぶことでDagsterとの連携に必要なPipesContext構造体のインスタンス( context オブジェクト)を取得します。PipesContext構造体の実装は後述します。

let mut context = open_dagster_pipes();

取得したcontextオブジェクトの log メソッドを呼んでいます。引数がDagster Job実行ログに表示されます。

 context.log("ferris will now speak", "info");

以下は log メソッドのシグネチャです。

pub fn log(&mut self, message: &str, level: &str)

同様に report_asset_check メソッドを呼んでいます。引数で渡したメタデータとともに、Dagster Job実行ログに表示されます。アセット詳細ページ > Checksタブにあるアセットチェック履歴にも表示されます。

let what_did_you_say = !args.words.is_empty();
context.report_asset_check(
    "ferris_should_say_something",
    what_did_you_say,
    PipesAssetCheckSeverity::Error,
    json!({
        "what_did_you_say": {"raw_value": what_did_you_say, "type": MetadataType::Bool},
    })
    .into(),
    None,
);

let number_of_words_spoken_by_ferris = args.words.split_whitespace().count();
context.report_asset_check(
    "ferris_should_speak_3_words",
    3 <= number_of_words_spoken_by_ferris,
    PipesAssetCheckSeverity::Warn,
    json!({
        "number_of_words_spoken_by_ferris": {"raw_value": number_of_words_spoken_by_ferris, "type": MetadataType::Int},
    })
    .into(),
    None,
);

以下は report_asset_check メソッドのシグネチャです。

pub fn report_asset_check(
    &mut self,
    check_name: &str,
    passed: bool,
    severity: PipesAssetCheckSeverity,
    metadata: Option<serde_json::Value>,
    asset_key: Option<String>,
)

PipesAssetCheckSeverity にWarningかErrorを設定できます。Warnigを設定した場合、Dagster上に警告は表示されますが、アセットのマテリアライズは成功します。

同様に report_asset_materialization メソッドを呼んでいます。引数で渡したメタデータとともに、Dagster Job実行ログに表示されます。

context.report_asset_materialization(
    json!({
        "ferris_said": {"raw_value": args.words, "type": MetadataType::Text},
    }),
    data_version(),
    None,
);

以下は report_asset_materialization メソッドのシグネチャです。

pub fn report_asset_materialization(
    &mut self,
    metadata: serde_json::Value,
    data_version: String,
    asset_key: Option<String>,
)

data_version は、Dagsterがアセットキー毎に管理しているバージョンです。リネージ(lineage)の後続アセットのマテリアライズが必要か否かを、このバージョンの変更有無で判定しています。つまり、このメソッドの呼び出し内容をDagsterが読み込み、保存することで、アセット作成処理がコミットされます。

ここまでcontextオブジェクトを通じてDagsterとどう連携するか説明しました。

tempファイル(Message)を再掲します。各行がcontextオブジェクトのメソッドに対応していることが分かります。

{"__dagster_pipes_version":"0.1","method":"Opened","params":null}
{"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris will now speak","level":"info"}}
{"__dagster_pipes_version":"0.1","method":"Log","params":{"message":"ferris spoke","level":"info"}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"passed":true,"severity":"Error","check_name":"ferris_should_say_something","asset_key":"ferris","metadata":{"what_did_you_say":{"raw_value":true,"type":"Bool"}}}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetCheck","params":{"asset_key":"ferris","passed":true,"check_name":"ferris_should_speak_3_words","severity":"Warn","metadata":{"number_of_words_spoken_by_ferris":{"raw_value":3,"type":"Int"}}}}
{"__dagster_pipes_version":"0.1","method":"ReportAssetMaterialization","params":{"metadata":{"ferris_said":{"raw_value":"what you want","type":"Text"}},"data_version":"1731819806","asset_key":"ferris"}}
{"__dagster_pipes_version":"0.1","method":"Closed","params":null}

次節にcontextオブジェクト(PipesContext構造体のインスタンス)を実装したpipesモジュールを記載します。

pipesモジュール

src/pipes.rs にcontextオブジェクト及び、それに関連するコードを書きました。(Rust好きなはずなので、特に解説は不要と思います)

use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use flate2::read::ZlibDecoder;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::{collections::HashMap, env};

pub fn open_dagster_pipes() -> PipesContext {
    const FAKE_TEMP_DIR: &str = "fake_temp/";

    // DAGSTER_PIPES_CONTEXTはDagsterが設定する環境変数
    let context_path = env::var("DAGSTER_PIPES_CONTEXT")
        .map_err(anyhow::Error::from)
        .and_then(|path| decode_param(&path))
        .map(|path| path.path)
        // DAGSTER_PIPES_CONTEXTがない場合(バイナリを直接実行した場合)、fake/context.jsonを使う
        .unwrap_or_else(|_| format!("{FAKE_TEMP_DIR}context.json"));
    println!("context_path: {:?}", context_path);

    let context_file = OpenOptions::new()
        .read(true)
        .open(context_path)
        .unwrap_or_else(|err| {
            panic!("could not open or create context file: {}", err);
        });

    let context_data: PipesContextData = serde_json::from_reader(context_file)
        .unwrap_or_else(|err| panic!("could not serialize from context file: {}", err));
    println!("context_data: {:?}", context_data);

    let messages_path = env::var("DAGSTER_PIPES_MESSAGES")
        .map_err(anyhow::Error::from)
        .and_then(|path| decode_param(&path))
        .map(|path| path.path)
        .unwrap_or_else(|_| format!("{FAKE_TEMP_DIR}messages.json")); // contextと同様
    println!("messages_path: {:?}", messages_path);

    let mut pipes_context = PipesContext {
        data: context_data,
        writer: PipesTempFileMessageWriter {
            path: messages_path.to_string(),
        },
    };

    pipes_context.report_open();

    pipes_context
}

// Dagster Pipesがエンコードしたパラメータをデコードする
// https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L367
fn decode_param(encoded: &str) -> anyhow::Result<PipesParam> {
    let compressed = STANDARD.decode(encoded)?;

    let mut decoder = ZlibDecoder::new(&compressed[..]);
    let mut json_str = String::new();
    decoder.read_to_string(&mut json_str)?;

    let value = serde_json::from_str(&json_str)?;

    Ok(value)
}

#[derive(Deserialize)]
struct PipesParam {
    path: String,
}

pub struct PipesContext {
    data: PipesContextData,
    writer: PipesTempFileMessageWriter,
}

impl PipesContext {
    pub fn log(&mut self, message: &str, level: &str) {
        let params: HashMap<String, serde_json::Value> = HashMap::from([
            ("message".to_string(), json!(message)),
            ("level".to_string(), json!(level)),
        ]);
        let msg = PipesMessage::new(Method::Log, params.into());
        self.writer.write_message(msg);
    }

    pub fn report_open(&mut self) {
        let msg = PipesMessage::new(Method::Opened, None);
        self.writer.write_message(msg);
    }

    pub fn report_close(&mut self) {
        let msg = PipesMessage::new(Method::Closed, None);
        self.writer.write_message(msg);
    }

    pub fn report_asset_check(
        &mut self,
        check_name: &str,
        passed: bool,
        severity: PipesAssetCheckSeverity,
        metadata: Option<serde_json::Value>,
        asset_key: Option<String>,
    ) {
        let asset_key = self.resolve_asset_key(asset_key);

        let params: HashMap<String, serde_json::Value> = HashMap::from([
            ("asset_key".to_string(), json!(asset_key)),
            ("check_name".to_string(), json!(check_name)),
            ("passed".to_string(), json!(passed)),
            ("metadata".to_string(), metadata.unwrap_or(json!(null))),
            ("severity".to_string(), json!(severity)),
        ]);

        let msg = PipesMessage::new(Method::ReportAssetCheck, params.into());

        self.writer.write_message(msg);
    }

    pub fn report_asset_materialization(
        &mut self,
        metadata: serde_json::Value,
        data_version: String,
        asset_key: Option<String>,
    ) {
        let asset_key = self.resolve_asset_key(asset_key);

        let params: HashMap<String, serde_json::Value> = HashMap::from([
            ("asset_key".to_string(), json!(asset_key)),
            ("metadata".to_string(), metadata),
            ("data_version".to_string(), json!(data_version)),
        ]);

        let msg = PipesMessage::new(Method::ReportAssetMaterialization, params.into());

        self.writer.write_message(msg);
    }

    fn resolve_asset_key(&mut self, asset_key: Option<String>) -> String {
        // asset_keyが明示的に渡されない場合、"asset_keys"先頭のアセットキーが必要であると見做す
        asset_key.unwrap_or_else(|| self.data.asset_keys.first().cloned().unwrap())
    }
}

impl Drop for PipesContext {
    fn drop(&mut self) {
        self.report_close();
    }
}

#[derive(Deserialize, Debug)]
struct PipesContextData {
    asset_keys: Vec<String>,
}

#[derive(Serialize)]
struct PipesMessage {
    __dagster_pipes_version: String,
    method: Method,
    params: Option<HashMap<String, serde_json::Value>>,
}

impl PipesMessage {
    fn new(method: Method, params: Option<HashMap<String, serde_json::Value>>) -> Self {
        Self {
            __dagster_pipes_version: "0.1".to_string(),
            method,
            params,
        }
    }
}

// Dagster Pipesのメッセージ種別を列挙型で実装
// https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L58
// report_custom_messageはサポートしない
#[derive(Serialize)]
enum Method {
    Opened,
    Closed,
    Log,
    ReportAssetMaterialization,
    ReportAssetCheck,
}

impl Display for Method {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match *self {
            Self::Opened => write!(f, "opened"),
            Self::Closed => write!(f, "closed"),
            Self::Log => write!(f, "log"),
            Self::ReportAssetMaterialization => write!(f, "report_asset_materialization"),
            Self::ReportAssetCheck => write!(f, "report_asset_check"),
        }
    }
}

struct PipesTempFileMessageWriter {
    path: String,
}

impl PipesTempFileMessageWriter {
    fn write_message(&mut self, message: PipesMessage) {
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .unwrap_or_else(|err| {
                panic!("could not open messages file: {}", err);
            });

        writeln!(
            file,
            "{}",
            serde_json::to_string(&message).unwrap_or_else(|err| {
                panic!("could not serialize message: {}", err);
            })
        )
        .unwrap_or_else(|err| {
            panic!("could not write to messages file: {}", err);
        });
    }
}

// Dagster PipesのAssetCheckSeverityを列挙型で実装
// https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L138
#[derive(Serialize, Debug, Clone)]
pub enum PipesAssetCheckSeverity {
    Warn,
    Error,
}

impl Display for PipesAssetCheckSeverity {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Warn => write!(f, "WARN"),
            Self::Error => write!(f, "ERROR"),
        }
    }
}

// Dagster PipesのMetadataTypeを列挙型で実装
// https://github.com/dagster-io/dagster/blob/1.8.4/python_modules/dagster-pipes/dagster_pipes/__init__.py#L151
// 上記MetadataTypeの値(MetadataValue)が、具体どのようなものかは、以下を参照
// https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/metadata/metadata_value.py#L40
#[derive(Serialize, Debug, Clone)]
pub enum MetadataType {
    Text,
    Int,
    Bool,
}

impl Display for MetadataType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Text => write!(f, "text"),
            Self::Int => write!(f, "int"),
            Self::Bool => write!(f, "bool"),
        }
    }
}

上記コードは以下リポジトリの内容を参考に作成しました。

https://github.com/marijncv/pg-pipes/blob/main/pg_pipes/src/pipes.rs

まとめ

DagsterからRustバイナリを実行し、連携するコードを紹介しました。これは実験的な機能に依存しているため、現時点では本番環境での使用は推奨されません。しかし、Rustの高速な処理能力を活かせる場面があれば、将来的に有用な選択肢となる可能性があります。

以上で記事を終わります。ご覧いただき、ありがとうございました。