はじめに
こんにちは。ノバセル事業本部の星野です。
この記事は
の 6 日目です。
最近の業務で、AWS Step Functions によって Web サービスを構築する機会がありました。 Step Functions は AWS の各種サービスをつかったワークフローを構築するためのサービスです。 Lambda, ECS, AWS Batch, Aurora Serverless など様々なサービスの呼び出しをステートマシン形式で定義することができます。
このとき、AWS クラウド開発キット(CDK) を触り、いくつかのサービスを使ったワークフローを構築したのですが、非常に便利な組み合わせだなあと感動しました。
...という記事を 2021 年に書いたのですが 2024年11月に、Step Functions を記述する方法に JSONata という記述形式が追加されました。
2025年12月現在、JSONataを使った CDK + Step Functions を記述するサンプルがあまりないため、上記の過去記事を JSONata 形式で書いてみようと思います
JSONPath と JSONata
Cloud Formation や CDK で Step Functions の定義を記述する際に、QueryLanguage に従来の「JSONPath」と。新しい記述形式の「JSONata」を指定することができます。
従来の JSONPath 記法と、新規法の JSONata を比べたとき
- JSONPath 記法では 5 つのフィールド (InputPath, Parameters,ResultSelector, ResultPath, OutputPath) が必要だった
- JSONata 記法では 2 つのフィールド (Arguments, Output) のみだけで制御できる
と、制御に必要なフィールドが絞られ、CDKを簡潔に描けるようになるのがメリットとのことです。 また JSONata で記述する際に、文字列・数値・集計・ブール値・配列などの関数ライブラリを利用することができ、Step Functions の各ステップの入出力の変換などが行えます。 JSONPath で記述していた際にはパラメーターを加工するためだけに Lambda を作成したり、 EvaluateExpression を利用する必要がありましたが、 JSONata ではそれらが不要になるケースも多く便利そうです。
Ref. - 変数と JSONata を使った AWS Step Functions での開発者エクスペリエンスの簡素化 - Step Functions での JSONata によるデータの変換
CDK + Step Functions + Typescript をつかったサンプル
"typescript": "~5.9.3" "aws-cdk-lib": "2.215.0" "aws-cdk": "2.1033.0"
以上のライブラリを使用した CDK のサンプル集を以下に書いていきます。 サンプル集ということで、各ステップには強い理由がなければ Pass タスクを使っています。参考にする場合は、適当なサービスの利用に置き換えてください。
JSONata の利用方法
CDK で JSONata をつかって Step Functions を利用する方は二種類あります
- ステートマシンのトップレベルで JSONata を使うと宣言する
- ステートマシンのトップレベルは従来の JSONPath を使い、任意のステップで JSONata を使う
公式ドキュメントには
When creating a workflow in the console, we recommend choosing JSONata for the top-level state machine QueryLanguage. (意訳) コンソールからワークフローを構築する場合は、ステートマシンを JSONata に設定することをお勧めする
とありますが、 JSONata のサンプルコードは少なく学習コストも高いことがままあるので、ステートマシン自体は JSONPath で設定し、はまらない範囲で個々のステップを JSONata で記述するのが個人的にはお勧めかなという気がします。
// import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; const step1 = new sfn.Pass(this, 'STEP1', { result: sfn.Result.fromObject({ message: 'First pass completed', }), }); this.stateMachine = new sfn.StateMachine(this, 'StateMachine', { definition: step1, });
こう書くと JSONPath としてステートマシンが構築されますが
以下のように、ステートマシンのトップレベルで JSONata を使うことを設定したり、
// トップレベルで JSONata で書くように指定する this.stateMachine = new sfn.StateMachine(this, 'StateMachine', { queryLanguage: sfn.QueryLanguage.JSONATA, definition: step1, });
以下のように、個々のステップで JSONata を使うことを設定することができます。
const step1 = new sfn.Pass(this, 'STEP1', { result: sfn.Result.fromObject({ message: 'First pass completed', }), resultPath: '$.firstPassResult', }); const step1 = new sfn.Pass(this, 'STEP1', { queryLanguage: sfn.QueryLanguage.JSONATA, outputs: { secondPassResult: { success: 'success' } } }); this.stateMachine = new sfn.StateMachine(this, 'StateMachine', { definitionBody: sfn.DefinitionBody.fromChainable(step1), });
なお、個々のステップを定義するときは
// 以下の書き方の場合は new 不要 sfn.Pass.jsonPath(this, '...', { ... }) sfn.Pass.jsonata(this, '...', { ... })
という書き方をすることで、 queryLanguage パラメーターを指定しなくても JSONPath/JSONata どちらかを使うか設定することができます。
queryLanguage パラメーターを使う以下のような
new sfn.Pass(this, 'Hoge', { queryLanguage: sfn.QueryLanguage.JSONATA, outputPath: { ... } })
本来、JSONata 記法では使えない outputPath を設定できてしまう(デプロイ時エラーになる)のですが
sfn.Pass.jsonata(this, 'Hoge', { outputPath: { ... } // Type Error })
と書くと、 Typescript の型チェックで JSONata では使えないパラメーターを使おうとしたときエラーが発生するため、早期の間違い検知ができるのでお勧めです。
順次実行
あるタスク(ステップ)が終わったら、その結果をもとに次の処理を行うというような順次処理は、 JSONPath では以下のように書けました。
// 最初のPassステート const firstPass = new sfn.Pass(this, 'Step2', { result: sfn.Result.fromObject({ message: 'First pass completed', }), resultPath: '$.firstPassResult', }); // 2番目のPassステート const secondPass = new sfn.Pass(this, 'Step2', { result: sfn.Result.fromObject({ status: 'success', }), resultPath: '$.secondPassResult', }); // ステートを順列に接続 const definition = firstPass.next(secondPass); // ステートマシンの作成 this.stateMachine = new sfn.StateMachine(this, 'MyStateMachine', { definitionBody: sfn.DefinitionBody.fromChainable(definition), });
flowchart TD
A((start)) --> B(FirstPass)
B --> C(SecondPass)
C --> D((end))
終端ステップである Step2 の状態の出力は
{
"firstPassResult": {
"message": "First pass completed"
},
"secondPassResult": {
"status": "success"
}
}
となります。同じ結果を得るための JSONata での記述は
const firstPass = sfn.Pass.jsonata(this, 'FirstPass', { outputs: { firstPassResult: { message: 'First pass completed', } } }); const secondPass = sfn.Pass.jsonata(this, 'SecondPass', { outputs: { firstPassResult: "{% $states.input.firstPassResult %}", secondPassResult: { status: 'success' } } }); const definition = firstPass.next(secondPass); this.stateMachine = new sfn.StateMachine(this, 'Example12StateMachine', { // 全てのステップが JSONate なので、ステートマシンの QueryLanguage を JSONate に指定することも可能 // queryLanguage: sfn.QueryLanguage.JSONATA, definitionBody: sfn.DefinitionBody.fromChainable(definition), });
という感じになります。
SecondPass の outputs では "{% $states.input.firstPassResult %}" という jsonate expression を用いて、入力のデータを取得していますがこの部分は JSONata 関数を用いて
const secondPass = sfn.Pass.jsonata(this, 'SecondPass', { outputs: "{% $merge([ $states.input , {'secondPassResult': {'status': 'success'}} ]) %}" });
のように記述することもできます。
...なのですJSONata であまりロジックを記述するのは動作確認しづらいです。程度問題にはなりますが、あまりロジックが複雑になる場合は従来と同じように Lambda によって入出力の加工を行う方が無難なのではないかなと思います。
JSONata の記述については JSONata Documentation も参照してください。
並列処理
ここまでで大体この記事で言いたいことは終わりました。
// 初期データを設定するPassステート const startPass = new sfn.Pass(this, 'StartPass', { comment: '初期データを設定', result: sfn.Result.fromObject({ input: 'Start parallel processing', timestamp: sfn.JsonPath.stringAt('$$.State.EnteredTime'), }), resultPath: '$.startData', }); // Branch 1: データ処理A const branch1Pass = new sfn.Pass(this, 'Branch1Pass', { comment: 'ブランチ1: データ処理A', result: sfn.Result.fromObject({ result: 'Processing A completed', }), resultPath: '$.branchAResult', }); // Branch 2: データ処理B const branch2Pass = new sfn.Pass(this, 'Branch2Pass', { comment: 'ブランチ2: データ処理B', result: sfn.Result.fromObject({ result: 'Processing B completed', }), resultPath: '$.branchBResult', }); // Branch 3: データ処理C const branch3Pass = new sfn.Pass(this, 'Branch3Pass', { comment: 'ブランチ3: データ処理C', result: sfn.Result.fromObject({ result: 'Processing C completed', }), resultPath: '$.branchCResult', }); // 並列処理ステート const parallelState = new sfn.Parallel(this, 'ParallelState', { comment: '3つのブランチを並列実行', resultPath: '$.parallelResults', }); // 各ブランチを並列ステートに追加 parallelState.branch(branch1Pass); parallelState.branch(branch2Pass); parallelState.branch(branch3Pass); // 並列処理の結果をマージする最終Passステート const finalPass = new sfn.Pass(this, 'FinalPass', { comment: '並列処理の結果をマージ', result: sfn.Result.fromObject({ status: 'All parallel branches completed', }), resultPath: '$.finalResult', }); // ステートを接続 const definition = startPass .next(parallelState) .next(finalPass); // ステートマシンの作成 this.stateMachine = new sfn.StateMachine(this, 'Example21StateMachine', { definitionBody: sfn.DefinitionBody.fromChainable(definition), });
flowchart TD
S((start)) --> B(StartPass)
subgraph ParallelState
direction TB
PS((start)) --> PA(Branch1Pass)
PS --> PB(Branch2Pass)
PS --> PC(Branch3Pass)
end
B --> ParallelState
ParallelState --> C(FinalPass)
C --> E((end))
キモとなっているのは
// 並列処理ステート
const parallelState = new sfn.Parallel(this, 'ParallelState', {
resultPath: '$.parallelResults',
});
parallelState.branch(branch1Pass);
parallelState.branch(branch2Pass);
parallelState.branch(branch3Pass);
の部分ですが、ここは
const parallelState = sfn.Parallel.jsonata(this, 'ParallelState', { outputs: "{% $merge([ $states.input , {'parallelResults': $states.result } ]) %}" , }); // 各ブランチを並列ステートに追加 parallelState.branch(branch1Pass); parallelState.branch(branch2Pass); parallelState.branch(branch3Pass);
と置き換えることができます。
並列処理(Parallel, Map)の中に含まれ得る Branch1Pass, Branch2Pass, Branch3Pass の結果は $states.result で参照することができて便利です。
Mapで動的に並列数を変えたい場合も
const startPass = sfn.Pass.jsonata(this, 'StartPass', { outputs: { numOfItem: "5", // 上流から適当な数字や、何らかのオブジェクトの配列が渡ってくる } }); // 上流の変数に応じて動的に並列数を決定する Map ステート const mapState = sfn.Map.jsonata(this, "MyMapState", { maxConcurrency: 3, // 上流からの入力から配列を作成してitemとして登録 (itemごとに並列に実行される) items: sfn.ProvideItems.jsonata("{% $range(0,$number($states.input.numOfItem), 1) %}"), // [0,1,2,3,4,5] の配列がつくられ、それぞれの値を入力値に並列処理が走る outputs: "{% $states.result %}" // itemの出力を配列として出力する }); // Mapの中の処理 const mapStep = sfn.Pass.jsonata(this, "MapItem", { outputs: "{% $number($states.input) + 10 %}" // テスト用の出力として、入力で渡ってきた数字に10を足す }) mapState.itemProcessor(mapStep, { mode: sfn.ProcessorMode.DISTRIBUTED, executionType: sfn.ProcessorType.STANDARD, }) // 並列処理の結果をマージする最終Passステート const finalPass = sfn.Pass.jsonata(this, 'FinalPass' ); // ステートを接続 const definition = startPass .next(mapState) // [10,11,12,13,14,15] が出力される .next(finalPass); // ステートマシンの作成(カスタムロールを使用) this.stateMachine = new sfn.StateMachine(this, 'Example22StateMachine', { queryLanguage: sfn.QueryLanguage.JSONATA, definitionBody: sfn.DefinitionBody.fromChainable(definition), comment: '並列処理のサンプル (JSONPath)', role: stateMachineRole, });
という感じで、JSONateだけで
- Mapの上流から渡ってきた入力を変形して並列実行する Item のパラメーター用に変形する
- 並列実行した Item の結果を整形する
というように使えるのが便利です。
上記サンプルコードでは、 outputs に並列処理した結果を出力させていますが、Step functions の入出力の JSON にはサイズ制限があります。処理数が動的で、JSON のサイズが肥大化する可能性のある処理の場合は、サイズを超えないようにサマリーだけを出力するなどの工夫が必要なことには注意してください。
分岐
flowchart TD
S((start)) --> B(StartPass)
B -->|odd| C(CaseA)
B -->|even| D(CaseB)
C --> E((end))
D --> E((end))
というような場合は Choice を使いますが
const firstPass = sfn.Pass.jsonata(this, 'FirstPass', { outputs: { number: 10 } }); const evenPass = sfn.Pass.jsonata(this, "Even"); const oddPass = sfn.Pass.jsonata(this, "Odd"); // 上流からの入力が偶数か奇数かで処理を分ける const choice = sfn.Choice.jsonata(this, "OddOrEven") .when( sfn.Condition.jsonata("{% $states.input.number % 2 = 0 %}"), evenPass ).otherwise(oddPass) // ステートを順列に接続 const definition = firstPass.next(choice);
というふうに、とてもシンプルに書けます。 これまでは
sfn.Condition.jsonata("{% $states.input.number % 2 = 0 %}"),
の部分を Lambda に記述するなどが必要だったのでかなり嬉しい。 どのような比較ができるかは
https://docs.jsonata.org/numeric-operators
を参考にすると良いと思います。
おわりに
以上、 JSONata をつかったサンプル集でした。 JSONata便利なのですが、ググったときに見つかる情報が JSONPath のものばかりなので参考にしていただければと思います。
こういう記事が増えて、AIコーディングでちゃんと JSONata をつかったCDKを描いてくれるようになることを切に願う。




