RAKSUL TechBlog

RAKSULグループのエンジニアが技術トピックを発信するブログです

StepFunctions を CDK + Typescript で構築するサンプル集 feat. JSONata

はじめに

こんにちは。ノバセル事業本部の星野です。

この記事は

の 6 日目です。

最近の業務で、AWS Step Functions によって Web サービスを構築する機会がありました。 Step Functions は AWS の各種サービスをつかったワークフローを構築するためのサービスです。 Lambda, ECS, AWS Batch, Aurora Serverless など様々なサービスの呼び出しをステートマシン形式で定義することができます。

このとき、AWS クラウド開発キット(CDK) を触り、いくつかのサービスを使ったワークフローを構築したのですが、非常に便利な組み合わせだなあと感動しました。

...という記事を 2021 年に書いたのですが 2024年11月に、Step Functions を記述する方法に JSONata という記述形式が追加されました。

2025年12月現在、JSONataを使った CDK + Step Functions を記述するサンプルがあまりないため、上記の過去記事を JSONata 形式で書いてみようと思います

JSONPath と JSONata

Cloud Formation や CDK で Step Functions の定義を記述する際に、QueryLanguage に従来の「JSONPath」と。新しい記述形式の「JSONata」を指定することができます。

従来の JSONPath 記法と、新規法の JSONata を比べたとき

  • JSONPath 記法では 5 つのフィールド (InputPath, Parameters,ResultSelector, ResultPath, OutputPath) が必要だった
  • JSONata 記法では 2 つのフィールド (Arguments, Output) のみだけで制御できる

と、制御に必要なフィールドが絞られ、CDKを簡潔に描けるようになるのがメリットとのことです。 また JSONata で記述する際に、文字列・数値・集計・ブール値・配列などの関数ライブラリを利用することができ、Step Functions の各ステップの入出力の変換などが行えます。 JSONPath で記述していた際にはパラメーターを加工するためだけに Lambda を作成したり、 EvaluateExpression を利用する必要がありましたが、 JSONata ではそれらが不要になるケースも多く便利そうです。

Ref. - 変数と JSONata を使った AWS Step Functions での開発者エクスペリエンスの簡素化 - Step Functions での JSONata によるデータの変換

CDK + Step Functions + Typescript をつかったサンプル

"typescript": "~5.9.3"
"aws-cdk-lib": "2.215.0"
"aws-cdk": "2.1033.0"

以上のライブラリを使用した CDK のサンプル集を以下に書いていきます。 サンプル集ということで、各ステップには強い理由がなければ Pass タスクを使っています。参考にする場合は、適当なサービスの利用に置き換えてください。

JSONata の利用方法

CDK で JSONata をつかって Step Functions を利用する方は二種類あります

  • ステートマシンのトップレベルで JSONata を使うと宣言する
  • ステートマシンのトップレベルは従来の JSONPath を使い、任意のステップで JSONata を使う

公式ドキュメントには

When creating a workflow in the console, we recommend choosing JSONata for the top-level state machine QueryLanguage. (意訳) コンソールからワークフローを構築する場合は、ステートマシンを JSONata に設定することをお勧めする

とありますが、 JSONata のサンプルコードは少なく学習コストも高いことがままあるので、ステートマシン自体は JSONPath で設定し、はまらない範囲で個々のステップを JSONata で記述するのが個人的にはお勧めかなという気がします。

// import * as sfn from 'aws-cdk-lib/aws-stepfunctions';

const step1 = new sfn.Pass(this, 'STEP1', {
  result: sfn.Result.fromObject({
    message: 'First pass completed',
  }),
});

this.stateMachine = new sfn.StateMachine(this, 'StateMachine', {
  definition: step1,
});

こう書くと JSONPath としてステートマシンが構築されますが

以下のように、ステートマシンのトップレベルで JSONata を使うことを設定したり、

// トップレベルで JSONata で書くように指定する
this.stateMachine = new sfn.StateMachine(this, 'StateMachine', {
  queryLanguage: sfn.QueryLanguage.JSONATA,
  definition: step1,
});

以下のように、個々のステップで JSONata を使うことを設定することができます。

const step1 = new sfn.Pass(this, 'STEP1', {
  result: sfn.Result.fromObject({
    message: 'First pass completed',
  }),
  resultPath: '$.firstPassResult',
});

const step1 = new sfn.Pass(this, 'STEP1', {
  queryLanguage: sfn.QueryLanguage.JSONATA,
  outputs: {
    secondPassResult: {
      success: 'success'
    }
  } 
});

this.stateMachine = new sfn.StateMachine(this, 'StateMachine', {
  definitionBody: sfn.DefinitionBody.fromChainable(step1),
});

なお、個々のステップを定義するときは

// 以下の書き方の場合は new 不要
sfn.Pass.jsonPath(this, '...', { ... })
sfn.Pass.jsonata(this, '...', { ... })

という書き方をすることで、 queryLanguage パラメーターを指定しなくても JSONPath/JSONata どちらかを使うか設定することができます。

queryLanguage パラメーターを使う以下のような

new sfn.Pass(this, 'Hoge', {
  queryLanguage: sfn.QueryLanguage.JSONATA,
  outputPath: { ... }
})

本来、JSONata 記法では使えない outputPath を設定できてしまう(デプロイ時エラーになる)のですが

sfn.Pass.jsonata(this, 'Hoge', {
  outputPath: { ... } // Type Error
})

と書くと、 Typescript の型チェックで JSONata では使えないパラメーターを使おうとしたときエラーが発生するため、早期の間違い検知ができるのでお勧めです。

順次実行

あるタスク(ステップ)が終わったら、その結果をもとに次の処理を行うというような順次処理は、 JSONPath では以下のように書けました。

// 最初のPassステート
const firstPass = new sfn.Pass(this, 'Step2', {
  result: sfn.Result.fromObject({
    message: 'First pass completed',
  }),
  resultPath: '$.firstPassResult',
});

// 2番目のPassステート
const secondPass = new sfn.Pass(this, 'Step2', {
  result: sfn.Result.fromObject({
    status: 'success',
  }),
  resultPath: '$.secondPassResult',
});

// ステートを順列に接続
const definition = firstPass.next(secondPass);

// ステートマシンの作成
this.stateMachine = new sfn.StateMachine(this, 'MyStateMachine', {
  definitionBody: sfn.DefinitionBody.fromChainable(definition),
});
flowchart TD
    A((start)) --> B(FirstPass)
    B --> C(SecondPass)
    C --> D((end))

終端ステップである Step2 の状態の出力は

{
  "firstPassResult": {
    "message": "First pass completed"
  },
  "secondPassResult": {
    "status": "success"
  }
}

となります。同じ結果を得るための JSONata での記述は

const firstPass = sfn.Pass.jsonata(this, 'FirstPass', {
  outputs: {
    firstPassResult: {
      message: 'First pass completed',
    }
  }
});

const secondPass = sfn.Pass.jsonata(this, 'SecondPass', {
  outputs: {
    firstPassResult: "{% $states.input.firstPassResult %}",
    secondPassResult: {
      status: 'success'
    }
  } 
});

const definition = firstPass.next(secondPass);

this.stateMachine = new sfn.StateMachine(this, 'Example12StateMachine', {
  // 全てのステップが JSONate なので、ステートマシンの QueryLanguage を JSONate に指定することも可能
  // queryLanguage: sfn.QueryLanguage.JSONATA, 
  definitionBody: sfn.DefinitionBody.fromChainable(definition),
});

という感じになります。

SecondPass の outputs では "{% $states.input.firstPassResult %}" という jsonate expression を用いて、入力のデータを取得していますがこの部分は JSONata 関数を用いて

const secondPass = sfn.Pass.jsonata(this, 'SecondPass', {
  outputs: "{% $merge([ $states.input , {'secondPassResult': {'status': 'success'}} ]) %}" 
});

のように記述することもできます。

...なのですJSONata であまりロジックを記述するのは動作確認しづらいです。程度問題にはなりますが、あまりロジックが複雑になる場合は従来と同じように Lambda によって入出力の加工を行う方が無難なのではないかなと思います。

JSONata の記述については JSONata Documentation も参照してください。

並列処理

ここまでで大体この記事で言いたいことは終わりました。

// 初期データを設定するPassステート
const startPass = new sfn.Pass(this, 'StartPass', {
  comment: '初期データを設定',
  result: sfn.Result.fromObject({
    input: 'Start parallel processing',
    timestamp: sfn.JsonPath.stringAt('$$.State.EnteredTime'),
  }),
  resultPath: '$.startData',
});

// Branch 1: データ処理A
const branch1Pass = new sfn.Pass(this, 'Branch1Pass', {
  comment: 'ブランチ1: データ処理A',
  result: sfn.Result.fromObject({
    result: 'Processing A completed',
  }),
  resultPath: '$.branchAResult',
});

// Branch 2: データ処理B
const branch2Pass = new sfn.Pass(this, 'Branch2Pass', {
  comment: 'ブランチ2: データ処理B',
  result: sfn.Result.fromObject({
    result: 'Processing B completed',
  }),
  resultPath: '$.branchBResult',
});

// Branch 3: データ処理C
const branch3Pass = new sfn.Pass(this, 'Branch3Pass', {
  comment: 'ブランチ3: データ処理C',
  result: sfn.Result.fromObject({
    result: 'Processing C completed',
  }),
  resultPath: '$.branchCResult',
});

// 並列処理ステート
const parallelState = new sfn.Parallel(this, 'ParallelState', {
  comment: '3つのブランチを並列実行',
  resultPath: '$.parallelResults',
});

// 各ブランチを並列ステートに追加
parallelState.branch(branch1Pass);
parallelState.branch(branch2Pass);
parallelState.branch(branch3Pass);

// 並列処理の結果をマージする最終Passステート
const finalPass = new sfn.Pass(this, 'FinalPass', {
  comment: '並列処理の結果をマージ',
  result: sfn.Result.fromObject({
    status: 'All parallel branches completed',
  }),
  resultPath: '$.finalResult',
});

// ステートを接続
const definition = startPass
  .next(parallelState)
  .next(finalPass);

// ステートマシンの作成
this.stateMachine = new sfn.StateMachine(this, 'Example21StateMachine', {
  definitionBody: sfn.DefinitionBody.fromChainable(definition),
});
flowchart TD
    S((start)) --> B(StartPass)

    subgraph ParallelState
      direction TB
      PS((start)) --> PA(Branch1Pass)
      PS --> PB(Branch2Pass)
      PS --> PC(Branch3Pass)
    end

    B --> ParallelState
    ParallelState --> C(FinalPass)
    C --> E((end))

キモとなっているのは

// 並列処理ステート
const parallelState = new sfn.Parallel(this, 'ParallelState', {
  resultPath: '$.parallelResults',
});

parallelState.branch(branch1Pass);
parallelState.branch(branch2Pass);
parallelState.branch(branch3Pass);

の部分ですが、ここは

const parallelState = sfn.Parallel.jsonata(this, 'ParallelState', {
  outputs: "{% $merge([ $states.input , {'parallelResults': $states.result } ]) %}" ,
});

// 各ブランチを並列ステートに追加
parallelState.branch(branch1Pass);
parallelState.branch(branch2Pass);
parallelState.branch(branch3Pass);

と置き換えることができます。 並列処理(Parallel, Map)の中に含まれ得る Branch1Pass, Branch2Pass, Branch3Pass の結果は $states.result で参照することができて便利です。

Mapで動的に並列数を変えたい場合も

const startPass = sfn.Pass.jsonata(this, 'StartPass', {
  outputs: {
    numOfItem: "5", // 上流から適当な数字や、何らかのオブジェクトの配列が渡ってくる
  }
});

// 上流の変数に応じて動的に並列数を決定する Map ステート
const mapState = sfn.Map.jsonata(this, "MyMapState", {
  maxConcurrency: 3,
  // 上流からの入力から配列を作成してitemとして登録 (itemごとに並列に実行される)
  items: sfn.ProvideItems.jsonata("{% $range(0,$number($states.input.numOfItem), 1) %}"), // [0,1,2,3,4,5] の配列がつくられ、それぞれの値を入力値に並列処理が走る
  outputs: "{% $states.result %}" // itemの出力を配列として出力する
});

// Mapの中の処理
const mapStep = sfn.Pass.jsonata(this, "MapItem", {
  outputs: "{% $number($states.input) + 10 %}" // テスト用の出力として、入力で渡ってきた数字に10を足す
})
mapState.itemProcessor(mapStep, {
  mode: sfn.ProcessorMode.DISTRIBUTED,
  executionType: sfn.ProcessorType.STANDARD,
})


// 並列処理の結果をマージする最終Passステート
const finalPass = sfn.Pass.jsonata(this, 'FinalPass' );

// ステートを接続
const definition = startPass
  .next(mapState) // [10,11,12,13,14,15] が出力される
  .next(finalPass);

// ステートマシンの作成(カスタムロールを使用)
this.stateMachine = new sfn.StateMachine(this, 'Example22StateMachine', {
  queryLanguage: sfn.QueryLanguage.JSONATA,
  definitionBody: sfn.DefinitionBody.fromChainable(definition),
  comment: '並列処理のサンプル (JSONPath)',
  role: stateMachineRole,
});

という感じで、JSONateだけで

  • Mapの上流から渡ってきた入力を変形して並列実行する Item のパラメーター用に変形する
  • 並列実行した Item の結果を整形する

というように使えるのが便利です。

上記サンプルコードでは、 outputs に並列処理した結果を出力させていますが、Step functions の入出力の JSON にはサイズ制限があります。処理数が動的で、JSON のサイズが肥大化する可能性のある処理の場合は、サイズを超えないようにサマリーだけを出力するなどの工夫が必要なことには注意してください。

分岐

flowchart TD
    S((start)) --> B(StartPass)

    B -->|odd| C(CaseA)
    B -->|even| D(CaseB)
    C --> E((end))
    D --> E((end))

というような場合は Choice を使いますが

const firstPass = sfn.Pass.jsonata(this, 'FirstPass', {
  outputs: {
    number: 10
  }
});

const evenPass = sfn.Pass.jsonata(this, "Even");
const oddPass = sfn.Pass.jsonata(this, "Odd");

// 上流からの入力が偶数か奇数かで処理を分ける
const choice = sfn.Choice.jsonata(this, "OddOrEven")
  .when(
    sfn.Condition.jsonata("{% $states.input.number % 2 = 0 %}"),
    evenPass
  ).otherwise(oddPass)

// ステートを順列に接続
const definition = firstPass.next(choice);

というふうに、とてもシンプルに書けます。 これまでは

sfn.Condition.jsonata("{% $states.input.number % 2 = 0 %}"),

の部分を Lambda に記述するなどが必要だったのでかなり嬉しい。 どのような比較ができるかは

https://docs.jsonata.org/numeric-operators

を参考にすると良いと思います。

おわりに

以上、 JSONata をつかったサンプル集でした。 JSONata便利なのですが、ググったときに見つかる情報が JSONPath のものばかりなので参考にしていただければと思います。

こういう記事が増えて、AIコーディングでちゃんと JSONata をつかったCDKを描いてくれるようになることを切に願う。

Federated Learning: Training Models Where the Data Lives

こんにちは、ラクスルベトナムTech LeadのMinhです。

本記事はノバセル テクノ場 出張版2025 Advent Calendar 2025の5日目の記事になります。

私は日本語が得意ではないので英語での投稿とさせてください。

1. Introduction: AI Wants More Data, but the World Says “No”

Figure 1. High-level federated learning architecture. Adapted from Nasim et al. [1].

Modern deep learning systems—especially foundation models and large language models—improve predictably with scale. Empirical scaling laws show that model performance increases as a function of data, parameters, and compute, with insufficient data becoming the dominant bottleneck as models grow larger [2][3]. Yet many of the most valuable datasets are increasingly:

  • Highly sensitive — private messages, medical images, transaction histories
  • Fragmented — distributed across devices, clinics, banks, countries
  • Heavily regulated — GDPR, HIPAA, PSD2, sector-specific banking rules, and data-localization laws

The traditional approach—centralizing everything into a data lake—is increasingly:

  • Legally constrained or prohibited
  • Politically complex, especially across business units or independent institutions
  • Operationally risky, widening the blast radius of any breach

As a result, organizations face a paradox: AI needs more data, but the world provides less access to it. Recent surveys describe Federated Learning (FL) as a promising response to this tension: a decentralized training paradigm where learning is aggregated, not the raw data [4].

2. What Is Federated Learning?

At its core, Federated Learning trains a shared model by sending the model to the data rather than sending the data to the model. Crucially, FL is not merely a distributed optimization algorithm. Modern surveys emphasize that it is a systems framework that encompasses:

  • Communication protocols
  • Client selection and orchestration
  • Robustness to failures and adversarial updates
  • Model and resource heterogeneity
  • Deployment and MLOps considerations

2.1 Two Common Federated Learning Settings

Cross-Device FL

  • Scale: Millions of edge clients (phones, browsers, wearables)
  • Challenges: tiny per-client datasets, intermittent connectivity, non-IID data distribution, resource constraints
  • Use cases: mobile keyboards, on-device personalization, recommendation and speech models

Cross-Silo FL

  • Scale: Tens to hundreds of reliable, institution-level clients (e.g., hospitals, banks)
  • Characteristics: larger datasets, stable networking, contractual governance
  • Use cases: multi-hospital diagnostics, cross-bank fraud detection, credit scoring

FL’s versatility across these settings makes it attractive as an alternative to centralized AI pipelines.

3. What Federated Learning Actually Solves

3.1 Privacy, Regulation, and Data Locality

In many industries, data simply cannot leave its source due to legal, ethical, or operational constraints:

  • Healthcare: Patient data is tightly regulated and fragmented across hospitals and imaging centers [5][6].
  • Finance: Transactions and identity data are siloed under strict confidentiality rules and regulatory oversight [7][8].
  • Consumer devices: Typed text, speech snippets, and behavioral signals are highly sensitive and difficult to centralize responsibly [9].

FL provides a practical mechanism to:

  • Train on local data while keeping it under local control
  • Comply with privacy and data-localization rules
  • Leverage diverse datasets across many participants
  • Reduce organizational risk by minimizing data movement

This is particularly attractive for enterprises seeking to de-risk AI adoption.

3.2 Personalization Without Over-Collection

User-facing products increasingly require personalization, but traditional approaches rely on intensive data collection. FL offers a cleaner alternative.

  • Gboard trains next-word prediction models using FL + secure aggregation + device-level differential privacy, allowing personalization without exposing raw keystrokes [10][11].
  • Apple’s Private Federated Learning (PFL) and the pfl-research framework generalize this for large-scale on-device learning with formal privacy protections [12].

Business value: FL enables tailored experiences without the operational or reputational risk of sending sensitive behavioral data to the cloud.

3.3 Cross-Organization Collaboration Without Data Sharing

FL enables institutions to jointly train models without exposing their data, creating new opportunities for collaboration.

  • Healthcare: Multi-hospital FL improves medical imaging performance—especially for rare diseases—while keeping data on-prem [5][6].
  • Finance: Multi-bank fraud detection and credit scoring systems use FL to combine intelligence without sharing transactions [7][8].

Strategic impact: FL transforms previously impossible partnerships into viable collaborations by aligning incentives, governance, and privacy requirements.

4. Architecture of a Federated Learning System

4.1 High-Level Architecture

Figure 2. Federated learning architecture for bird species classification. Adapted from Mulero-Pérez et al. [13].

  1. Initialization

    A central server initializes a global model (e.g., CNN, transformer) and selects clients for participation.

  2. Local Training & Model Update Transmission

    Each client trains the model on its private dataset. Data never leaves the device, satisfying privacy and data-locality constraints. Then, clients send updates (weights, gradients, or compressed representations) to the server.

  3. Aggregation

    The server aggregates updates via Federated Averaging (FedAvg) or more advanced algorithms (FedProx, FedNova, Scaffold).

  4. Iteration

    The updated global model is redistributed, and training proceeds iteratively until convergence.

4.2 Key Components

  • Federation Controller / Central Server

    Coordinates training rounds, maintains global state, enforces privacy mechanisms, and handles orchestration.

  • Clients (Edge Devices or Institutional Nodes)

    Provide local data and compute for training; may participate intermittently (cross-device) or reliably (cross-silo).

  • Local Private Data

    Remains on-device or on-prem, reducing governance overhead, breach risk, and compliance burden.

  • Global Model

    A shared model co-trained across participants, capturing collective knowledge without collecting sensitive records.

4.3 Architectural Challenges

  • System heterogeneity: varying hardware, availability, data distributions
  • Communication bottlenecks: limited bandwidth in edge environments
  • Stragglers: slow or unavailable clients
  • Privacy and security risks: model updates may leak data without DP or secure aggregation
  • Robustness: Byzantine clients or poisoned updates

5. Conclusion

Federated Learning has progressed from a conceptual idea to a mature architectural pattern for privacy‑preserving machine learning. Production deployments—such as Google’s Gboard and Apple’s PFL systems—show that FL is reliable and scalable in real‑world environments. As regulations tighten and edge computing advances, Federated Learning is positioned to become a foundational approach for building AI systems that uphold privacy, governance, and data sovereignty.

References [1] Nasim, M.D.A. et al. (2025). Principles and Components of Federated Learning Architectures. arXiv:2502.05273.

[2] Kaplan, J. et al. (2020). Scaling Laws for Neural Language Models. OpenAI.

[3] Hoffmann, J. et al. (2022). Training Compute-Optimal Large Language Models. DeepMind.

[4] Kairouz, P. et al. (2021). Advances and Open Problems in Federated Learning. Proceedings of the IEEE. arXiv:1912.04977

[5] Zhang, F. et al. (2023). Recent Methodological Advances in Federated Learning for Healthcare. arXiv:2310.02874.

[6] Rehman, M. H. U. et al. (2023). Federated Learning for Medical Imaging Radiology. British Journal of Radiology.

[7] Kennedy, C. H. et al. (2025). The Role of Federated Learning in Improving Financial Security: A Survey. IEEE GCAIoT / arXiv:2510.14991.

[8] Brundyn, A. et al. (2022). Using Federated Learning to Bridge Data Silos in Financial Services. NVIDIA Technical Blog.

[9] Google Research (2021). Predicting Text Selections with Federated Learning. Google AI Blog.

[10] Hard, A. et al. (2018). Federated Learning for Mobile Keyboard Prediction. Google Research.

[11] Xu, Z. et al. (2023). Federated Learning of Gboard Language Models with Differential Privacy. arXiv:2305.18465.

[12] Apple Machine Learning Research (2023). Private Federated Learning and pfl‑research Framework.

[13] Mulero-Pérez, D. et al. (2025). A Federated Learning Architecture for Bird Species Classification in Wetlands. Journal of Sensor and Actuator Networks, 14(4), 71. https://doi.org/10.3390/jsan14040071


PythonとOpenAI Batch APIでの長時間動画の非同期解析処理

こんにちは、ノバセル24新卒エンジニアの秦です。

本記事はノバセル テクノ場 出張版2025 Advent Calendar 2025の4日目の記事になります。

はじめに

OpenAIのAPIは、現時点では「動画ファイルそのもの」を直接入力として受け付けるインターフェースを持っていません。そのため、動画の内容をAIで解析したい場合、システム側で「動画を音声と映像フレーム(静止画)に分解し、それぞれをテキストや画像として入力する」という前処理が不可欠になります。

また、動画を解析する場合、動画の長さ自体もボトルネックになります。特に数分〜数十分に及ぶ動画の場合、リアルタイムAPI(同期処理)では大量のトークン消費によるコスト増加や、HTTPタイムアウトの発生といった課題がつきまといます。

本記事では、これらの課題を解決するために、Pythonで前処理(分割)を行い、OpenAIのBatch APIを活用して長時間動画を効率的かつ低コストに非同期処理するための実装フローを紹介します。

Batch APIを利用することで、処理完了まで時間はかかりますが(24時間以内)、通常のAPI利用に比べて50%のコスト削減が可能になります。特に大量の動画アーカイブを処理する場合や、夜間にまとめて解析を行うようなユースケースでは、このコストメリットは非常に大きくなります。

想定読者

  • OpenAI APIを使って動画解析を行いたいエンジニア
  • 長時間動画の処理コストやタイムアウトに悩んでいる方
  • Batch APIの具体的な実装イメージ(特にサイズ制限への対処)を知りたい方

この記事でわかること

  • 動画を音声と映像に分割するPython/FFmpegの実装方法
  • OpenAI Batch APIでのバッチ登録方法とサイズ制限への対応
  • 非同期処理における結果取得とエラーハンドリング

全体フロー

動画解析の全体パイプラインは以下の通りです。

graph TD
    Video[動画ファイル] -->|FFmpegで抽出| Audio[音声データ]
    Video -->|FFmpegで切り出し| Frames[映像フレーム画像]
    
    Audio -->|分割| AudioChunks[音声チャンク]
    AudioChunks -->|Whisper API| Transcript[文字起こしテキスト]
    Transcript -->|JSONL整形| AudioBatch[音声バッチファイル]
    
    Frames -->|Base64エンコード & JSONL整形| VideoBatch[映像バッチファイル]

    AudioBatch -->|アップロード & バッチ登録| BatchProcess[バッチ処理プロセス]
    VideoBatch -->|アップロード & バッチ登録| BatchProcess
    
    BatchProcess -->|待機 & ポーリング| CompletedResult[解析結果ファイル]

    CompletedResult -->|ダウンロード & 紐付け| DB[(データベース)]
  1. 動画取得: 解析対象の動画ファイルを取得します。
  2. メディア分割: 動画を「音声」と「映像(フレーム)」に分離・加工します。
  3. 文字起こし: 音声データを文字起こし(Transcription)します。
  4. Batch API登録: 映像フレームと文字起こし結果をJSONL形式に整形し、OpenAI Batch APIに登録します。
  5. 結果取得: バッチ処理完了後、結果を取得しDB等に保存します。

特に「メディア分割」と「Batch API登録」のステップにおいて、OpenAIの制限(ファイルサイズやリクエストサイズ)を考慮した設計が必要です。

1. メディア分割の実装

動画ファイルをそのままAIに投げるのではなく、PythonとFFmpegを使って適切な形式に加工していきます。

音声処理フロー(抽出・分割・文字起こし)

動画内の音声をAIで解析するためには、まず音声データをテキスト化(文字起こし)する必要があります。しかし、OpenAIのWhisper APIには25MBというファイルサイズ制限があり、長時間の動画データはそのままではアップロードできない場合があります。

また、プロダクトの要件として「処理全体の完了時間を短縮したい」という事情もありました。巨大な1つのファイルを順次処理するよりも、分割して複数のリクエストとして並列処理させる方が、トータルの待ち時間を短く抑えられるためです。

そこで、以下のステップで処理を行っていきます。

  1. 抽出: 動画から音声を抜き出す(16kHz, モノラル)。
  2. 分割: FFmpegの segment オプション等を使い、一定時間(例:300秒)ごとに分割する。
  3. 文字起こし: 分割した各チャンクをWhisper API(同期)でテキスト化する。

Batch APIには音声データ(バイナリ)を直接投げられないため、この文字起こし工程までは同期APIで事前に行っておく必要がある点に注意してください。

import subprocess
from pathlib import Path
from openai import OpenAI

client = OpenAI()

def process_audio(video_path: str, output_dir: Path, chunk_seconds: int = 300):
    output_dir.mkdir(parents=True, exist_ok=True)
    audio_path = output_dir / "audio.wav"
    
    # 1. 動画から音声を抽出
    subprocess.run([
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", video_path,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        str(audio_path)
    ], check=True)

    # 2. 音声を一定時間ごとに分割
    segment_pattern = output_dir / "chunk_%03d.wav"
    subprocess.run([
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", str(audio_path),
        "-f", "segment", "-segment_time", str(chunk_seconds), "-c", "copy",
        str(segment_pattern)
    ], check=True)

    # 3. 分割したチャンクを文字起こし
    transcriptions = []
    for chunk_path in sorted(output_dir.glob("chunk_*.wav")):
        with open(chunk_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1", 
                file=audio_file,
                response_format="text"
            )
            transcriptions.append({
                "chunk": chunk_path.name,
                "text": transcript
            })
            
    return transcriptions

映像フレームの切り出し

次に、映像解析のために動画から静止画(フレーム)を切り出します。ここでは1秒間に1枚(1fps)のペースでPNG形式として抽出する例を示します。

必要以上に画像サイズが大きいと消費トークン数が無駄になるため、要件に応じてJPEGへの変更やリサイズを行うと良いと思います。

def extract_frames(video_path: str, output_dir: Path, interval_seconds: float = 1.0):
    output_dir.mkdir(parents=True, exist_ok=True)
    frame_pattern = output_dir / "frame_%06d.png"
    
    subprocess.run([
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", video_path,
        "-vf", f"fps=1/{interval_seconds}",
        str(frame_pattern)
    ], check=True)

    return sorted(output_dir.glob("frame_*.png"))

2. OpenAI Batch APIへの登録

抽出したデータをBatch API用の形式(JSONL)に変換し、登録します。

Batch APIの仕組みとサイズ制限

Batch APIは、複数のAPIリクエストを1つのJSONLファイルにまとめてアップロードし、非同期で処理させる仕組みです。「ファイル作成 → アップロード → バッチ作成 → 待機 → 結果取得」というフローになります。

実装時に意識すべき制限(2025年12月時点)は以下の2点です。

  1. 1リクエストの上限: 50 MB
  2. 1バッチファイルの上限: 200 MB

実装戦略:なぜ「1フレーム1リクエスト」なのか?

今回の実装では、「映像は1リクエスト1フレームとし、それを大量に束ねて200MBごとのバッチファイルを作成する」という方針を採用しました。

もし「1リクエストに可能な限り画像を詰め込む」アプローチを取った場合、「リクエストへのパッキング(50MB制限)」と「バッチへのパッキング(200MB制限)」という2重のサイズ計算が必要になります。さらに、サーバーのリソース節約のために画像を生成しながら順次バッチ化するようなストリーム処理を行う場合、この計算ロジックは非常に複雑になってしまいます。

「1フレーム1リクエスト」であれば、画像1枚が50MBを超えることはまずないため、リクエスト単体のサイズ超過を気にせず、単純にバッチファイルの容量(200MB)だけを管理すれば良くなり、実装が大幅にシンプルになるためです。

バッチファイルの生成コード例

以下は、映像フレームをBase64エンコードし、制限サイズを超えないようにJSONLを分割生成する実装例です。 (音声テキスト用も同様に body 内にテキストを含めて生成します)

import json
import base64
from pathlib import Path

MAX_BATCH_BYTES = 170_000_000  # 200MBに対し余裕を持たせて170MB
MAX_REQUESTS_PER_BATCH = 50_000

def create_visual_batch_files(frames: list[Path], output_dir: Path):
    current_batch = []
    current_size = 0
    batch_index = 0

    for i, frame_path in enumerate(frames):
        # 画像をBase64エンコード
        image_bytes = frame_path.read_bytes()
        image_b64 = base64.b64encode(image_bytes).decode("ascii")
        data_url = f"data:image/png;base64,{image_b64}"

        # Batch APIリクエストオブジェクト
        request_item = {
            "custom_id": f"frame-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-5-mini",
                "messages": [
                    {"role": "system", "content": "この画像のシーンを詳細に描写してください。"},
                    {
                        "role": "user", 
                        "content": [{"type": "image_url", "image_url": {"url": data_url}}]
                    }
                ],
                "max_tokens": 300
            }
        }

        # JSONLの1行としてのサイズを計算
        json_line = json.dumps(request_item, ensure_ascii=False)
        line_size = len(json_line.encode("utf-8"))

        # 制限チェック(サイズ or 件数)
        if (current_size + line_size > MAX_BATCH_BYTES) or (len(current_batch) >= MAX_REQUESTS_PER_BATCH):
            _write_batch_file(current_batch, output_dir / f"batch_visual_{batch_index}.jsonl")
            current_batch = []
            current_size = 0
            batch_index += 1

        current_batch.append(request_item)
        current_size += line_size

    # 残りのリクエストを書き出し
    if current_batch:
        _write_batch_file(current_batch, output_dir / f"batch_visual_{batch_index}.jsonl")

def _write_batch_file(requests, path):
    with open(path, "w", encoding="utf-8") as f:
        for req in requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")

3. 結果の取得とハンドリング

Batch APIは非同期処理であるため、「バッチジョブの登録」の後に「バッチジョブの状態確認(ポーリング)」を一定間隔で行い、バッチジョブ完了を確認後に「結果ファイルの取得」を行うという2段階のプロセスが必要になります。

基本的にボーリングでは、バッチのステータスを定期的に確認し、completed になるのを待つことになります。しかし、ここで一つ注意点があります。

それは、「ステータスが completed = 成功」とは限らない ということです。

稀に、バッチファイルのフォーマット不正などが原因で、ステータスは completed なのに output_file_id(成功結果)が null で、error_file_id(失敗詳細)だけが返ってくるケースがあります。

そのため、単にステータスだけを見て「完了したから結果を取りに行こう!」と判断するのは危険です。これを成功とみなして処理を進めると、結果ファイルが存在しないため後続の処理でエラーになってしまいます。必ず output_file_id の有無や request_counts.failed もあわせて確認するようにしてください。

import time
from openai import OpenAI

client = OpenAI()

def wait_for_batch_completion(batch_id: str, polling_interval: int = 60):
    while True:
        batch = client.batches.retrieve(batch_id)
        print(f"Batch {batch_id} status: {batch.status}")

        if batch.status == "completed":
            # 成功ファイルがあるか確認
            if not batch.output_file_id:
                 raise Exception(f"Batch completed but no output file. Errors: {batch.error_file_id}")
            return batch
        elif batch.status == "failed":
            raise Exception(f"Batch failed: {batch.errors}")
        elif batch.status in ["cancelled", "expired"]:
            raise Exception(f"Batch ended with status: {batch.status}")
        
        time.sleep(polling_interval)

結果の保存

ダウンロードした結果ファイル(JSONL)には、各行にリクエスト時の custom_id(例: frame-0)が含まれています。

これをキーにして元の動画フレームや音声チャンクと紐付けを行い、DB等に保存します。今回は映像と音声を別バッチで処理したため、それぞれの結果リストを custom_id ベースでソート・マージして利用します。

4. 今後の展望

前述の「1フレーム1リクエスト」戦略は実装をシンプルにしますが、以下のような課題があります。

  • リクエストごとに指示用のプロンプトが必要になる
  • 1枚ごとの解釈になるため前後のフレーム関係を踏まえられない
  • 音声とも切り離されているため、音声と映像を紐づけて解析ができない

これらを解決するアプローチとして、1バッチ1リクエストにする案を考えています。 バッチの上限は気にせず、1バッチには1リクエストだけを含める形にし、1リクエストに50MBを超えない程度に複数画像を入れ込みます。

これにより、AIが複数のフレームを一度に見られるため「動画の流れ」を考慮した解析が可能になり、音声も1フレームごとに切り分けるよりは1バッチに含めやすくなります。

まとめ

PythonとBatch APIを利用して長時間動画を処理する際のポイントは以下の通りです。

  1. 非同期処理の活用: 即時性が不要ならBatch APIでコストを大幅(50%)に削減できる。
  2. 事前分割: FFmpeg等で音声・映像を適切に分割・加工し、音声は先に文字起こししておく。
  3. サイズ制限への対応: 「1フレーム1リクエスト」等の戦略で、1リクエスト50MB、1バッチ200MBの制限を回避する実装を行う。
  4. 確実な結果取得: completed ステータスを鵜呑みにせず、必ず出力ファイルの有無を確認する。

動画のAI処理を実装する際、この記事が少しでもお役に立てば嬉しいです

MySQL 5.7 → 8.0 移行で直面した空間データの盲点と対策

はじめに

こんにちは。ラクスル Advent Calendar 2025 3 日目を担当する、ラクスル事業部 Web エンジニアの森田です。

現在ラクスルでは MySQL 5.7 を Aurora の延長サポート環境で稼働させているため、MySQL 8.0 への移行を検証しています。

その過程で、とあるクエリが MySQL 5.7 では問題なく稼働していたにもかかわらず、8.0 環境下で応答時間が大幅に悪化する事象が発生しました。今回はこの事象から得られた知見を共有します。

事象

私が開発しているエリアマーケティングチームでは、ポスティングや新聞折込といった印刷から配布までワンストップで提供するサービスを扱っています。

新聞折込サービスには、全国の新聞販売店の配布エリアをポリゴンとして DB に保持し、ユーザーが地図上で指定した円形エリアと重なる販売店を導出するロジックが存在します。

この交差判定ロジックで使用されているポリゴンを使った検索クエリの実行時間が、 3 秒前後から 12 秒前後に悪化しました。

本プロダクトでは、地図検索の応答速度が重要指標であるため、これを改善する必要がありました。

原因

SRID の違い

原因を調査したところ、MySQL 8.0 から SRID(Spatial Reference System Identifier)の扱いが変更されていると判明しました。

SRID とは、空間データがどの座標系で表現されているかを示す識別子です。例えば 4612: 日本測地系、0: 平面直交座標系を表します。

5.7 系では、以下の箇所において SRID=4612 を明示していました。

  • 検索クエリ内での SRID 指定

  • DB のカラム定義(GEOMETRY 型列)

  • 空間インデックスの定義

しかし実は、MySQL 5.7 では SRID=0 以外は実質無視され、列やインデックスの SRID に関係なく SRID=0 として扱われていました。

MySQL 5.7 doesn’t really support any other SRIDs — they are simply ignored, and SRID 0 is assumed. *1

MySQL 8.0 で SRID が正しく評価され、SRID=4612 の空間インデックスが適用されるようになった結果、クエリの計算コストが増加し、パフォーマンスが悪化しました。

影響を受けた関数

今回のユースケースでは SPATIAL INDEX が効くMBRIntersects 関数を使用して、最小外接長方形(MBR)でざっくり絞り込みを行います。円とポリゴンをそれぞれ長方形で近似して比較し、候補を大幅に絞り込むための処理です。

実際の使用イメージは以下のようになります。(販売店ポリゴンは非公開情報のため、Nano banana で生成した画像でお届けします。)

MBRIntersects関数の使用イメージ

ここで重要なのが、MBRIntersectsは SRID の指定だけでなく、クエリ内でのジオメトリの渡し方によってもパフォーマンスが変わる という点です。

挙動の比較

  • ① MySQL 5.7 / SRID 4612 を指定する場合

SRID が無視され、平面座標系として処理されます。

  • ② MySQL 8.0 / SRID 4612 を関数呼び出しで指定する場合
EXPLAIN SELECT *
FROM spatial_table
WHERE MBRIntersects(
    ST_GeomFromText('POLYGON((35.0 135.0,35.0 135.1,35.1 135.1,35.1 135.0,35.0 135.0))', 4612),
    geom_column
) = 1;

この場合、EXPLAIN の結果を見ると

key: NULL

となり、SPATIAL INDEX は認識されません。MySQL は WHERE 句のST_GeomFromText(...) を 関数呼び出しとして扱い、毎行評価される可能性がある値 と見なすためです。*2

  • ③ MySQL 8.0 / SRID 4612 を定数化して指定する場合
SET @search_area = ST_GeomFromText(
  'POLYGON((35.0 135.0,35.0 135.1,35.1 135.1,35.1 135.0,35.0 135.0))',
  4612
);
EXPLAIN SELECT *
FROM spatial_table
WHERE MBRIntersects(geom_column, @search_area);

この場合は EXPLAIN に key: index_geom_column が表示され、オプティマイザは 定数のジオメトリとして認識 して SPATIAL INDEX を参照できます。

ただし SRID 4612 の計算コストは高いため、クエリはまだ遅いです。

  • ④ MySQL 8.0 / SRID 0 を指定する場合

オプティマイザは平面座標系として扱うため、5.7 時代と同等のレスポンスになります。

この違いも踏まえて、SRID とジオメトリの渡し方によるパフォーマンス差をまとめると以下の通りになりました。

No. MySQL バージョン クエリ例 インデックス認識 パフォーマンス
5.7 ST_GeomFromText(..., 4612) を WHERE に直接使用 無効(SRID 無視) 3 秒程度
8.0 ST_GeomFromText(..., 4612) を WHERE に直接使用 無効 12 秒程度
8.0 定数化 + SRID 4612 有効 10 秒程度
8.0 SRID 0 を指定 無効 3 秒程度

解決方法

今回のユースケースでは、以下の理由から パフォーマンスを重視して ④ SRID=0 を採用し、旧 5.7 時代のレスポンスと同じ速度まで回復しました。

  1. 使用している円の最大検索半径が 20km 程度で、地球の曲率による誤差はほぼ無視できる

  2. 円形エリアを元々多角形に近似してから交差判定を行っており、元々精度を最重要視しているわけでない

まとめ

今回の検証で、空間データの扱いがバージョン間で大きく変わることは盲点でした。

今後も、地図や空間データを扱う機能のパフォーマンスを保ちながら、UX を改善していくために、このような細かい仕様変更にも注意していきたいと考えています。

新卒テクニカルサポートエンジニアから開発職へ。入社約1ヶ月間で変わった5つの当たり前

こんにちは。 2025年11月1日にラクスル株式会社へ入社し、開発エンジニアをしています。

前職では外資系企業にてテクニカルサポートエンジニアとして1年7ヶ月勤務し、お客様の技術的な問題の解決をお手伝いしておりました。 その中で「自分で価値を創る開発職へ」と決意し、現在に至ります。

学生時代、開発インターンに参加経験はあるものの、正社員としては初めての開発職です。この20日間は、毎日新しい壁にぶつかり、戸惑いながらも、エンジニアとしての「あたりまえ」がガラリと変わるのを感じています。

今回は、その20日間で特に「意識が変わった」「重要性を改めて認識した」5つのポイントに絞って、その学びを共有させていただきます。

1 見ている時間軸が「過去・現在」から「未来の運用」へとシフトした

テクニカルサポートエンジニア時代では すでに起きている問題(過去の事象) を扱っていました。 ユーザーが困っている技術的な問題を分析し、原因の特定や、何かしらの解決策を提示しておりました。 そのため焦点は過去にありました。

一方で現在は、問題が起こる前に、未来を想像すること が多いと感じました。

一例ですがコードレビュー時にいただいたコメントが未来視点の内容が多かったです。

そのため何かしら仕様を決める際やコードを書く際も、「この設計が将来的に拡張しにくくないか?等の未来」を意識的に考えるようになりました。

開発職の方からすれば当たり前かもしれません。

ただ他業種から転職してきた私にとって、この視点の変化は、非常に大きな違いとして感じています。

2 正解を判断する軸が多すぎるから、仮説を立ててすぐ相談

テクニカルサポートエンジニアでは、技術的な課題を解決するための提案を探す際、仮説を立て、検証環境で検証することで、その正しさを確認することができました。

しかし開発の世界では、どちらが明確な正解かを決めるのが難しいと感じました。

例えば、実現したいことに対する実装方針をAとBで迷ったとします。どちらも作りたいものは作れます。しかし「ビジネス価値」「保守性」「開発速度」といった複数の判断軸が存在するため、現時点の私では判断に迷う場面がありました。

そのため、現時点では「xxの理由でAだと思っているのですが、xxという観点で判断しかねています。アドバイスいただけないでしょうか?」と言ったように、可能な限り判断軸を明記して相談することを意識するようになりました。

3 ゴールが技術的課題の解消から、ビジネスの貢献へシフト

テクニカルサポートエンジニア時代、技術的な課題の解消が主なゴールでした。

しかし現在は、「技術」を求めるだけでなく、「その技術がもたらすビジネスへの貢献度」 がゴールの基準になっていると感じます。

具体的には現在のゴールは、「ビジネス価値、保守コスト、開発速度」 といった複数の軸で評価し、何かしらのビジネス課題の解決に貢献するコードを書くことだと感じました。

実際に実装方針に関して相談をした際「その選択がビジネスにどんな影響を与えるか」が議論されていることを見かけました。

そのため技術的なことを考えるだけではなく、その先にある「ビジネス課題の解決への貢献」を意識的に考えるようになりました。

4 既存コードの理解には「技術」だけでなく「背景」も理解

テクニカルサポートエンジニア時代、既存のコードや設定を読む際、その目的は「技術的な構造(なぜこう動いているか)」を理解することでした。

しかし、開発職として機能のコードを読んでいると、現状の仕様を理解しても「なぜその選択がとられたのか」「なぜ他の選択がとられなかったのか」はコードだけでは読み取るのが難しいと感じることがありました。

そのためコードを正しく理解し、変更するためには、技術的観点だけではなく、その機能が生まれた背景にあるビジネスの文脈(なぜその仕様になったか) を把握することも必要だと感じました。

5 チームで働くための「意識的な情報発信」(Working Out Loud)

テクニカルサポートエンジニア時代も情報発信は意識していましたが、現在はさらに情報発信の重要度が大きくなったと感じております。

現在の仕事は一人で完結することはなく、必ずどこかで他の方の協力を得ます(コードレビュー等)。

このとき、「何を考えて、なぜその設計に至ったか」という思考の過程が共有されていないと、チームメンバーの時間を奪ってしまい、無駄なコミュニケーションが発生すると感じました。

そのため連携やコミュニケーションがしやすいように、以下2点を意識するようになりました。 - 口頭で話してた際には、その内容をどこかしらに文章を残す - 誰でも状況を追いやすいように、自分の思考の過程をSlackに記録(Working Out Loud)

どの粒度で報告し、どの粒度で情報を残した方が良いかはっきりはわからないです。ただ情報は残されれば残されているほど良い、困ることにはつながらないと思うため、可能な限り情報を残すために、上記2点を意識しました。

特に私は入社したてなので、「今、何を、どう考えているか」を伝えるため、意識的に情報を発信するSlackでのWoking Out Loudを心がけています。

実際にWoking Out Loudを行なっていたおかげで、早期に困り事を助けていただいたり、情報を共有いただけることがありました。

終わりに

たった20日ですが、テクニカルサポートエンジニアと開発職では、

見ている時間軸、気にするポイントが違うことを肌で感じました。

テクニカルサポートエンジニアでの考え方がまったく役に立たないというよりは、違いを感じやすいからこそ、開発職として重要なことを気づきやすい ように思いました。

この記事が同じようにキャリアチェンジを考えている人や新しい環境で頑張る誰かの参考になれば嬉しいです。

これからもっとプロダクトに貢献できるよう、日々精進してまいります。