RAKSUL TechBlog

ラクスルグループのエンジニアが技術トピックを発信するブログです

StepFunctions を CDK + Typescript で構築するサンプル集

title: StepFunctions を CDK + Typescript で構築するサンプル集

こんにちは。ノバセル事業本部の星野です。本日は、Raksul Advent Calendar 21日目です。

最近の業務で、StepFunctions によって Web サービスを構築する機会がありました。 StepFunctions は AWS の各種サービスをつかったワークフローを構築するためのサービスです。 Lambda, ECS, AWS Batch, Aurora Serverless など様々なサービスの呼び出しをステートマシン形式で定義することができます。

 https://aws.amazon.com/jp/step-functions/

このとき、AWS クラウド開発キット(CDK) を触り、いくつかのサービスを使ったワークフローを構築したのですが、非常に便利な組み合わせだなあと感動しました。

本稿では StepFunctions を CDK を使って構築する際によくありそうな実装パターンをサンプル集としてまとめてみました。

1. CDK とは

サンプル集に入る前に CDK についてざっと説明をします。

https://aws.amazon.com/jp/cdk/

CDK は使い慣れた開発言語を使用してクラウドアプリケーションのプロビジョニングを行うためのフレームワークです。 AWS でのクラウドアプリケーションのプロビジョニングといえば、Terraform や CloudFormation を使うやり方が有名ですが、これらのツールが DSL のようなもので構成を記述するのに対し、CDK は普段使い慣れているプログラミング言語を使って構成を記述できるというメリットがあります。今回は、言語に Typescript を選んで CDK の実装を紹介しています。

StepFunctions は Terraform や CloudFormation だと JSON や YAML 形式で書き、正しいロールを紐付けるなどする必要があるのですが、 CDK は構成に関する最低限の記述だけすると、書いていない部分はいい感じに空気を読んで設定してくれる作りになっています。複数のサービスと連携させたい StepFunctions は CDK と特に相性が良いと感じました。

また Lambda や ECS タスクは、Json 形式で StepFunctions からデータの入出力から行えるため、これらのコードを AWS のサービスから切り離しやすくテストしやすくなるという利点もあります。

その他、触っていて感じ利点は、

  • エディタが型の自動補完をしてくれるので、必要な設定などがあることを気が付きやすい
  • スクリプトとして順次実行されるので、エラーになったとき、どの行で不備があったのかわかりやすい。雑に Print デバッグなどもできる
  • CloudFormation の細かく書けるが書くのが難しい問題を、 CDK ではある程度簡単にかけるし、いい感じに補完してくれる
  • IDに命名規則を設けるなど、プラグイン的な機構を簡単にコードとして書ける

書いていて思ったのですが、 Java の Maven にたいする「Gradle いいじゃん」って話とだいたい一緒です。今はあまり感じていませんが、デメリットも多分「gradle デメリット」などでググったときに見つかるものとだいたい一緒なんじゃないでしょうか。

2. StepFunctions + CDK をつかったサンプル

前置きが長くなりましたが、ここからが StepFunctions を CDK + Typescript で構成したときのサンプル集です。

今回はサンプルということで、 AWS のサービスには Lambda だけを使います。またとくに Lambda になにかさせたいわけではない箇所にはダミーのタスクとして Pass タスクを使っています。場合に応じて適当なサービスの利用に置き換えてください。

CDK セットアップまでのチュートリアルや公式のコンセプトなどについては https://aws.amazon.com/jp/getting-started/guides/setup-cdk/ などを参照してください。

2-1. 順次実行

まずはよくある順次実行を構築してみます。あるタスクが終わったら、その結果を元に次のタスクを実行するやつです。 「Elastic Transcoder の実行が終わったら、その結果を DynamoDB に保存して Email で通知する」などのようなことができそうですね。

StepFunctions Ordered Tasks

import * as cdk from '@aws-cdk/core';
import * as stepfunc from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as lambda from '@aws-cdk/aws-lambda';

export class HoshinoAdventCalendarStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Lambda samples
    const firstFunction = new lambda.Function(this, 'FirstFunction', {
      code: lambda.Code.fromInline(`
        exports.handler = (event, context, callback) => {
          console.log(event);
          callback(null, {"Payload": {"Message": "hello"}});
        };
      `),
      runtime: lambda.Runtime.NODEJS_14_X,
      handler: 'index.handler',
      timeout: cdk.Duration.seconds(120),
    });

    const secondFunction = new lambda.Function(this, 'SecondFunction', {
      code: lambda.Code.fromInline(`
        exports.handler = (event, context, callback) => {
          console.log(event);
          callback(null, {});
        };
      `),
      runtime: lambda.Runtime.NODEJS_14_X,
      handler: 'index.handler',
      timeout: cdk.Duration.seconds(120),
    });

    // definite state machine
    const upstreamJob = new tasks.LambdaInvoke(this, 'UpstreamTask', {
      lambdaFunction: firstFunction,
      outputPath: '$.Payload',
    });
    const mainJob = new tasks.LambdaInvoke(this, 'MainFunctionTask', {
      lambdaFunction: secondFunction,
      payload: stepfunc.TaskInput.fromJsonPathAt('$.Payload'),
    });
    const definition = upstreamJob.next(mainJob).next(new stepfunc.Succeed(this, 'Queued'));

    new stepfunc.StateMachine(this, 'HoshinoAdventCalendarStateMachine', {
      definition,
    });
  }
}

 

SecondFunction の CloudWatch Logs を確認してみると { Message: 'hello' } というログが出力されます。上流の FirstFunction から下流の SecondFunction にパラメーターを Json で引き渡せることができました。

最後の Queued ではステートマシンを成功状態にして停止しています。やったね。

2-2. 並列実行

次は並列実行を構成してみます。並列タスクを定義する方法には

  • Parallel は決められたタスクを並列実行させることができる。
  • Map は動的に並列数を変えて、複数タスクを実行させる。

があります。 https://docs.aws.amazon.com/cdk/api/latest/docs/aws-stepfunctions-readme.html#parallelhttps://docs.aws.amazon.com/cdk/api/latest/docs/aws-stepfunctions-readme.html#map

  • Parallel では、上流の処理が終わったときに、通知とDBの更新を並列で行い、両方が終わったときに下流のタスクを実行させる。
  • Map では、上流で取得した複数の動画を、Mapによって並列でサムネイルを作る。

という使いみちががありそうです。

ここでは試しに、「Hello」と「World」というログを吐くだけの Lambda を並列実行するステートマシンを作成してみましょう。

StepFunctions Parallel Tasks1

(以下、 import やクラス宣言の部分は変わらないため、省きます)

const helloFunction = new lambda.Function(this, 'ChoiceFunction', {
code: lambda.Code.fromInline(`
exports.handler = (event, context, callback) => {
  console.log("Hello");
  callback(null, {});
};
`),
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'index.handler',
timeout: cdk.Duration.seconds(120),
});

const worldFunction = new lambda.Function(this, 'WorldFunction', {
code: lambda.Code.fromInline(`
exports.handler = (event, context, callback) => {
  console.log("world");
  callback(null, {});
};
`),
runtime: lambda.Runtime.NODEJS_14_X,
handler: 'index.handler',
  timeout: cdk.Duration.seconds(120),
});

// Step Functions
const upstreamJob = new stepfunc.Pass(this, 'Start');
const parallel = new stepfunc.Parallel(this, 'Do the work in parallel');
const parallelJob1 = new tasks.LambdaInvoke(this, 'HelloTask', {
  lambdaFunction: helloFunction,
});
const parallelJob2 = new tasks.LambdaInvoke(this, 'WorldTask', {
    lambdaFunction: worldFunction,
});
parallel.branch(parallelJob1);
parallel.branch(parallelJob2);

const definition = upstreamJob.next(parallel).next(new stepfunc.Succeed(this, 'Queued'));

new stepfunc.StateMachine(this, 'HoshinoAdventCalendarStateMachine', {
  definition,
});

 

パラレルな処理の中には、さらにステートマシンを書くこともできます。

StepFunctions Parallel Tasks2

これくらいの複雑なステートマシンになると、 CloudFormation などで設定として記述するのが辛くなってくるので、 CDK の恩恵をかなり感じられそうです。

Parallel / Map のより詳細な動きは以下の公式ドキュメントを参考にしてください。

Parallel https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-parallel-state.html

MAP https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-map-state.html

2-3. 分岐

上流タスクの結果によって処理を分岐させたいということがあると思います。この場合 Choice を利用することでその実装が可能です。

StepFunctions ifelse Tasks1

const choiceFunction = new lambda.Function(this, 'ChoiceFunction', {
  code: lambda.Code.fromInline(`
    exports.handler = (event, context, callback) => {
      var odd = (Math.floor(Math.random(1, 2) * 1000) % 2) === 1;
      callback(null, { "Payload" : {"value": odd }});
    };
  `),
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: 'index.handler',
  timeout: cdk.Duration.seconds(120),
});

// Step Functions
const upstream = new tasks.LambdaInvoke(this, 'choiceTask', {
  lambdaFunction: choiceFunction,
  outputPath: '$.Payload',
});
const ok = new stepfunc.Pass(this, 'OK');
const ng = new stepfunc.Pass(this, 'NotGood');

const definition = upstream
  .next(
    new stepfunc.Choice(this, 'Odd?')
      .when(stepfunc.Condition.booleanEquals('$.Payload.value', true), ok)
      .otherwise(ng),
  );

new stepfunc.StateMachine(this, 'HoshinoAdventCalendarStateMachine', {
  definition,
});
 

実行すると、上流のLambdaタスクの出力結果を元に処理を分岐させることができました。Webコンソール上にも以下のように結果が表示されます。このように、どの経路を通った処理が実行されたのかを見れるのも良いですね。

StepFunctions ifelse Tasks2

今回は値のtrue/falseを判定しましたが、その他どんな比較が可能なのかは以下を参照してください https://docs.aws.amazon.com/cdk/api/latest/docs/aws-stepfunctions-readme.html#available-conditions

Choice.when(条件A).when(条件B).otherwise のように書いたとき if (条件A) elseif (条件B) else のように動きます。この後にさらに別のステップを追加する場合 afterwards() メソッドで条件分岐が終了することを明示する必要があることに注意してください。

const upstream.next(
  choice
    .when(conditionA, taskA)
    .when(conditionB, taskB)
    .otherwise(taskC)
    .afterwards()
).next(downstream);

 

2-4. ループ

先程の分岐サンプルコードを一部書き換えてみましょう

 const definition = upstream
   .next(
     new stepfunc.Choice(this, 'Odd?')
       .when(stepfunc.Condition.booleanEquals('$.Payload.value', true), ok)
-       .otherwise(ng),
+       .otherwise(ng, upstream),
   );

 

するとステートマシンはこのように変わります。すごい。

StepFunctions Loop Tasks

たとえば、上流の処理が失敗ステータスで終わったときに Wait をつかって 10 分間待機してもう一度上流の処理を実行するなどのリトライ処理を実装することができます。 またこのときに、上流に渡すパラメーターに { "countRun": 1 } のような値を付け加えてカウントアップさせ、この値が一定値以上のときには問答無用でステートマシンを失敗させるような作りにすることも可能です。

2-5. 例外処理

最後に例外のハンドリングをステートマシンで実装する方法です。どこかでエラーが発生して処理が止まったときの通知を共通化したいときなどに利用します。

StepFunctions Error Handling

const functionA = new lambda.Function(this, 'functionA', {
  code: lambda.Code.fromInline(`
    exports.handler = (event, context, callback) => {
      callback(null, { });
    };
  `),
  runtime: lambda.Runtime.NODEJS_14_X,
  handler: 'index.handler',
  timeout: cdk.Duration.seconds(120),
});

// Step Functions
const taskA = new tasks.LambdaInvoke(this, 'taskA', {
  lambdaFunction: functionA,
});
const taskB = new tasks.LambdaInvoke(this, 'taskB', {
  lambdaFunction: functionA,
});
const handleError = new stepfunc.Fail(this, 'errrrrrrr!!!');
taskA.addCatch(handleError);
taskB.addCatch(handleError);

const definition = taskA.next(taskB);

new stepfunc.StateMachine(this, 'HoshinoAdventCalendarStateMachine', {
  definition,
});
 

なるほどね?

3. おわりに

StepFunctions を CDK + Typescript で構築する際のパターンをまとめてみました。冒頭の繰り返しになりますが、エディタの補完機能によって思う以上にドキュメントを読まなくても書ける感じがあって楽です。

StepFunctions をつかったワークフローは非常に便利なのですが、CloudFormationなどこれまでのIaaCによって複雑なフローを記述するのが難しかったので、 CDK によって比較的手軽に書けるのはとても良いなと思いました。

ただ、CDK は2021年現在すべての StepFunctions タスクに対応しているわけではないので、非対応のクラウドサービスを使う場合はご注意ください。

本番環境に乗せるにあたっては、ここに書いてあること以外にも VPC やセキュリティグループの解決なども設定する必要があり、 CDK が書ければ世は全てこともなしというわけにはいかないのですが…そこは頑張っていきましょう。

参考

StepFunctions | AWS デベロッパーズガイド

CDK StepFunctions モジュールのドキュメント | AWS docs