データ競合を気にしながらS3イベントを処理してみた 〜Rust・Lambda・DynamoDBを添えて〜
この記事は KINTOテクノロジーズアドベントカレンダー2024 の6日目の記事です🎅🎄
はじめに
こんにちは。KINTO FACTORY開発グループの上原(@penpen_77777)です。
2024年の7月に入社し、KINTO FACTORYのバックエンドの開発を担当しています。
今回は、業務の中でS3イベントを処理する際に注意すべきだったデータ競合とその解決策についてサンプルコードを通じてご紹介します。
今回想定する読者
- AWSのS3イベントが重複して通知されたり、通知の順序が入れ替わることに悩んでいる方
- Rust、S3、DynamoDB、Lambda、Terraformについて基本的な知識がある方
- サンプルコードを読む際にこの辺りの知識があると理解しやすいです
S3イベントの概要
S3イベント[1]とは、S3へのオブジェクトをアップロード、削除などの操作をトリガーに発生するイベントのことです。
S3イベントをLambda関数やSNSなどで検知することによって、S3にまつわる様々な処理を自動化できます。
S3イベントの問題点
S3イベントを処理する上で注意するべきなのは、イベントが重複して通知されたり順序が入れ替わったりすることがあるという点です。
例えば、同一オブジェクトキーに対してオブジェクト削除後にオブジェクト作成する処理を考えてみましょう。
この場合、オブジェクトの削除イベントが先に通知され、その後に作成イベントが通知されることが期待されます(同一オブジェクトに対して削除→作成した場合の期待するS3イベントの受信順序の図を参照)
しかし、S3イベントは順序が保証されないため、作成イベントが先に通知され、後に削除イベントが通知されることがあります(同一オブジェクトに対して削除→作成した場合の起こりうるS3イベントの受信順序の図を参照)
結果、オブジェクトを削除するイベントによる処理結果が最新になってしまい、処理内容によってはデータの一貫性が保証されないという問題が発生することがあります。
この問題の解決策としてS3イベントに含まれるsequencerキーを使ってイベントの順序を保証する方法があります[2]。
イベントのシーケンスを決定する方法の 1 つとして、sequencer キーがあります。イベントが発生した順序でイベント通知が届く保証はありません。ただし、オブジェクト (PUT) を作成するイベントからの通知 と削除オブジェクトは sequencer を含みます。これは、特定のオブジェクトキーのイベントの順序を決定するために使用できます。
同じオブジェクトキーに対する 2 つのイベント通知の sequencer の文字列を比較すると、sequencer の 16 進値が大きいほうのイベント通知が後に発生したイベントであることがわかります。イベント通知を使用して Amazon S3 オブジェクトの別のデータベースまたはインデックスを維持している場合は、イベント通知を処理するたびに sequencer の値を比較し、保存することを推奨します。
次の点に注意してください。
複数のオブジェクトキーのイベントの順序を決定するために sequencer を使用することはできません。
sequencer の長さが異なる場合があります。これらの値を比較するには、最初に短い方の値を右に 0 と挿入してから、辞書式比較を実行します。
まとめると以下の通りです。
- sequencerはオブジェクトのPUTやDELETEイベントに含まれる値で、イベントの順序を決定するために使用可能
- sequencerを辞書式比較し、値が大きい方が後に発生したイベント
- 長さが異なる場合は、短い方の値の右側に0を挿入してから比較
- 複数のオブジェクト同士のS3イベント順序を決定するために使用できない
- 同一オブジェクトに対するPUTやDELETEイベントの順序を決定するために使用する
例えば、Rust上でS3イベントのシーケンサ比較を実装する場合は以下のように実装できます。
S3のシーケンサの性質を表現するための構造体S3Sequencer
のフィールドとコンストラクタを定義します。
// 1. 構造体S3Sequencerを定義する
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct S3Sequencer {
bucket_name: String,
object_key: String,
sequencer: String,
}
// 2. S3Sequencerのコンストラクタを定義する
// バケット名、オブジェクトキー、シーケンサを引数に取る
impl S3Sequencer {
pub fn new(bucket_name: &str, objcet_key: &str, sequencer: &str) -> Self {
Self {
bucket_name: bucket_name.to_owned(),
object_key: objcet_key.to_owned(),
sequencer: sequencer.to_owned(),
}
}
}
次に、イベントの前後関係をS3Sequencerの大小を比較することで判別させるため、PartialOrd
トレイト[3]と
PartialEq
トレイト[4] を実装していきます。
この2つのトレイトを実装すれば、以下のように==
や<
、>
などの比較演算子を使ってシーケンサの大小を比較できます。
let seq1 = S3Sequencer::new("bucket1", "object1", "abc123");
let seq2 = S3Sequencer::new("bucket1", "object1", "abc124");
if seq1 < seq2 {
println!("seq1はseq2より古いイベントです");
} else if seq1 == seq2 {
println!("seq1とseq2は同じイベントです");
} else {
println!("seq1はseq2より新しいイベントです");
}
PartialOrdトレイトの実装に必要なpartial_cmpメソッドは以下のように実装します。
impl PartialOrd for S3Sequencer {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// バケット名が異なるシーケンサは比較できない
if self.bucket_name != other.bucket_name {
return None;
}
// オブジェクトキーが異なるシーケンサは比較できない
if self.object_key != other.object_key {
return None;
}
// 長い方に合わせて、短い方の末尾に0を追加して比較
let max_len = std::cmp::max(self.sequencer.len(), other.sequencer.len());
let self_sequencer = self
.sequencer
.chars()
.chain(std::iter::repeat('0'))
.take(max_len);
let other_sequencer = other
.sequencer
.chars()
.chain(std::iter::repeat('0'))
.take(max_len);
Some(self_sequencer.cmp(other_sequencer))
}
バケット名およびオブジェクトキーが異なるシーケンサの比較に意味はないため、それぞれが異なる場合はearly returnでNoneを返します。
バケット名とオブジェクトキーが同じになっていることを確認できたらシーケンサの比較に入るわけですが、以下の順で処理をしていきます。
- シーケンサの長さを比較して、長い方の長さを
max_len
に格納する max_len
に合わせて、短い方のシーケンサの末尾に0を追加する- 2で作成したシーケンサを辞書順で比較して、大小を返す
PartialEqトレイトは以下のように実装します。
impl PartialEq for S3Sequencer {
fn eq(&self, other: &Self) -> bool {
self.partial_cmp(other)
.map_or(false, |o| o == std::cmp::Ordering::Equal)
}
}
PartialOrdトレイトのpartial_cmpメソッドを使って、シーケンサの比較結果が等しいかどうかを判定しています。
以上の実装により、S3イベントのシーケンサを比較できるようになりました。
データ一貫性を考慮したS3イベント処理の実装例
アーキテクチャ図
ここからは、シーケンサを用いてS3イベントの順序を保証する方法をサンプルコードを交えながら紹介します。
サンプルコードではS3バケットにアップロードされた画像ファイルをLambda上でグレイスケールに変換して、出力用のS3バケットに保存する処理を行います。
以下にアーキテクチャ図を示します。
入力画像用バケットに画像ファイルがアップロードされると、S3イベントを通じてLambda関数がトリガーされます。起動したLambda関数は、DynamoDBを見て処理中でないことを確認し、処理中フラグを立てて画像ファイルを処理します。処理が完了したら、処理中フラグを解除して次の画像ファイルの処理を待ちます。
作成・削除のイベントの通知の順序が逆転してしまうと、本来存在するはずの画像が誤って削除されるといった問題が発生します。
例えば、以下の流れで処理を行うと想定し実装したとします。
- 画像ファイルAが入力用バケットから削除される
- 画像ファイルAが入力用バケットに再度アップロードされる
- Lambdaが削除イベント(1に対応)を受信、画像ファイルAを出力用バケットから削除する
- Lambdaが作成イベント(2に対応)を受信、画像ファイルAを処理しグレースケールに変換して出力用バケットに保存する
しかし、S3イベントでは3と4の通知順序が逆転する可能性があるため、以下のような流れになることがあります。
- 画像ファイルAが入力用バケットから削除される
- 画像ファイルAが入力用バケットに再度アップロードされる
- Lambdaが作成イベント(2に対応)を受信、画像ファイルAを処理しグレースケールに変換して出力用バケットに保存する
- Lambdaが削除イベント(1に対応)を受信、画像ファイルAを出力用バケットから削除する
この場合、入力用バケットには画像ファイルAが存在するにもかかわらず、出力用バケットにはグレースケール化した画像ファイルA'が存在しないという問題が発生します。
このような問題を防ぐために、S3イベントのシーケンサを使って排他処理を実装します。
加えてDynamoDBによって画像処理状況を管理させることも排他処理の実装には必要です。
DynamoDBの条件付き書き込みを使って、処理中フラグを立てることで、複数のLambda関数が同時に同じ画像ファイルを処理することを防ぎます。
今回のサンプルコードはGitHubに公開しています。以下のリンクからご確認ください。
(実行にはAWSインフラの構築が必要ですが、terraformコードにより容易に試せるようにしております)
Rustによるサンプルコードの実装
今回はRustを使ってLambda関数を実装します。Lambda関数の実装にはcargo-lambdaを使用するのが便利です。
cargo-lambdaの詳しい使用方法については割愛します
エントリーポイントの作成
cargo-lambdaで初期化コマンドを叩くと以下のようにmain.rsが自動的に生成されます。
cargo lambda init
use lambda_runtime::{
run, service_fn,
tracing::{self},
Error,
};
mod handler;
mod image_task;
mod lock;
mod s3_sequencer;
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();
run(service_fn(handler::function_handler)).await
}
handler:function_handler
をLambda関数のエントリポイントとして指定しているため、handler.rs
に実装を記述します。
function_handler
は以下のように実装します。
use crate::image_task::ImageTask;
use aws_lambda_events::event::s3::S3Event;
use lambda_runtime::{tracing::info, Error, LambdaEvent};
pub async fn function_handler(event: LambdaEvent<S3Event>) -> Result<(), Error> {
// S3イベントをImageTaskに変換する
let tasks: Vec<_> = event
.payload
.records
.into_iter()
.map(ImageTask::try_from)
.collect::<Result<Vec<_>, _>>()?;
// futures::future::join_allで実行するタスクを作成する
let execute_tasks = tasks.iter().map(|task| task.execute());
// join_allで全てのタスクを実行・待機する
// 実行結果をretに格納する
let ret = futures::future::join_all(execute_tasks).await;
// 実行結果をログに出力する
for (t, r) in tasks.iter().zip(&ret) {
info!("object_key: {}, Result: {:?}", t.object_key, r);
}
// エラーがある場合はエラーを返す
if ret.iter().any(|r| r.is_err()) {
return Err("Some tasks failed".into());
}
// 正常終了
Ok(())
}
- S3イベントのベクタをImageTask構造体のベクタに変換します。変換方法はTryFromトレイトを実装しているため、try_fromメソッドを呼ぶだけで良いです。
- ImageTask構造体のベクタを元に、画像処理タスクを作成します。
- tokioクレートの
join_all
関数を使って、全てのタスクを並列実行します。 - 3の
join_all
で帰ってきた結果をログに出力します。 - エラーがある場合はエラーを返却し、Lambda関数を異常終了させます。
- エラーがなければ正常終了します。
画像処理の実装
1で使用するImageTask構造体は以下のように定義されており、Lambdaの実行に必要な情報を保持しています。
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum TaskType {
Grayscale,
Delete,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ImageTask {
pub bucket_name: String,
#[serde(rename = "id")]
pub object_key: String,
pub sequencer: S3Sequencer,
pub task_type: TaskType,
pub processing: bool,
}
フィールド名 | 説明 |
---|---|
bucket_name | S3バケット名 |
object_key | オブジェクトキー |
sequencer | S3イベントのsequencer |
task_type | タスクの種類を示す列挙体(Grayscale, Delete) |
processing | 処理中フラグ |
具体的な画像処理についてはImageTask構造体のexecuteメソッド内で実装します。
impl ImageTask {
pub async fn execute(&self) -> Result<(), Error> {
// 1. ロックを取得する
let lock = S3Lock::new(&self).await?;
// 2. タスクの種類に応じて処理を行う
match self.task_type {
TaskType::Grayscale => {
// 画像をグレースケールに変換し、出力バケットに保存する
let body = lock.read_input_object().await?;
let format = image::ImageFormat::from_path(&self.object_key)?;
let img = image::load_from_memory_with_format(&body, format)?;
let img = img.grayscale();
let mut buf = Vec::new();
img.write_to(&mut Cursor::new(&mut buf), format)?;
lock.write_output_object(buf).await?;
}
// 画像を出力用バケットから削除する
TaskType::Delete => lock.delete_output_object().await?,
}
// 3. ロックを解放する
lock.free().await?;
Ok(())
}
}
- S3のオブジェクトのデータ不整合が起きないように排他処理をかける
- タスクの種類に応じて処理を行う
- 元のバケットにファイルが追加された場合は、画像をグレースケールに変換し、出力用バケットに保存する
- ファイルが削除された場合には、出力用バケットからファイルを削除する
- 処理が終わったらロックを解放する
ロック処理を実装する
- ロック処理を実装するため、S3Lock構造体を定義します。
pub struct S3Lock {
dynamodb_client: aws_sdk_dynamodb::Client,
table_name: String,
s3_client: aws_sdk_s3::Client,
input_bucket_name: String,
input_object_key: String,
output_bucket_name: String,
output_object_key: String,
}
具体的なロック取得処理はコンストラクタに実装します。
少しコードが長いですが、ざっくり言うと以下の通りです。
- DynamoDBに書き込みが成功したらロックを取得できたとみなす。
- 書き込みに失敗した場合は2秒ごとにリトライする。
- 30秒以上ロックが取れない場合はタイムアウトする。
以下にロック処理のシーケンス図を示します。
コンストラクタ内のコードは以下の通りです。
impl S3Lock {
pub async fn new(task: &ImageTask) -> Result<Self, Error> {
let table_name = std::env::var("DYNAMODB_TABLE_NAME").unwrap();
let output_bucket_name = std::env::var("OUTPUT_BUCKET_NAME").unwrap();
let require_lock_timeout = Duration::from_secs(
std::env::var("REQUIRE_LOCK_TIMEOUT")
.unwrap_or_else(|_| "30".to_string())
.parse::<u64>()
.unwrap(),
);
let interval_retry_time = Duration::from_secs(
std::env::var("RETRY_INTERVAL")
.unwrap_or_else(|_| "2".to_string())
.parse::<u64>()
.unwrap(),
);
let config = aws_config::load_defaults(aws_config::BehaviorVersion::v2024_03_28()).await;
let s3_client = aws_sdk_s3::Client::new(&config);
let dynamodb_client = aws_sdk_dynamodb::Client::new(&config);
// ロックを取得する
// 実行時間を計測する
let start = Instant::now();
loop {
// 30秒以上ロックが取れない場合はタイムアウトする
if start.elapsed() > require_lock_timeout {
return Err("Failed to acquire lock, timeout".into());
}
// 強力な読み取り整合性を利用してDynamoDBからシーケンサを取得する
let item = dynamodb_client
.get_item()
.table_name(table_name.clone())
.key("id", AttributeValue::S(task.object_key.clone()))
.consistent_read(true)
.send()
.await?;
// 取得したアイテムが存在する場合はシーケンサを比較する
if let Some(item) = item.item {
let item: ImageTask = from_item(item)?;
if task.sequencer <= item.sequencer {
// 自分自身が古いシーケンサの場合は処理する必要がないのでスキップする
return Err("Old sequencer".into());
}
// 自分自身が新しいシーケンサの場合は他の処理が終わるまで待機する
if item.processing {
warn!(
"Waiting for other process to finish task, retrying, remaining time: {:?}",
require_lock_timeout - start.elapsed()
);
thread::sleep(interval_retry_time);
continue;
}
}
// DynamoDBに条件付き書き込みでロックを取得する
// その際にレコードが存在していたらprocessingフラグがfalseの場合のみ書き込む
let resp = dynamodb_client
.put_item()
.table_name(table_name.clone())
.set_item(Some(to_item(&task).unwrap()))
.condition_expression("attribute_not_exists(id) OR processing = :false")
.expression_attribute_values(":false", AttributeValue::Bool(false))
.send()
.await;
// 取得できたらループを抜け処理を続行する
// 取得できなかった場合はロックが取れるまでリトライを繰り返す
match resp {
Ok(_) => break,
Err(SdkError::ServiceError(e)) => match e.err() {
PutItemError::ConditionalCheckFailedException(_) => {
warn!(
"Failed to acquire lock, retrying, remaining time: {:?}",
require_lock_timeout - start.elapsed()
);
thread::sleep(Duration::from_secs(2));
continue;
}
_ => return Err(format!("{:?}", e).into()),
},
Err(e) => return Err(e.into()),
}
}
return Ok(Self {
dynamodb_client,
output_bucket_name,
s3_client,
table_name,
input_bucket_name: task.bucket_name.clone(),
input_object_key: task.object_key.clone(),
output_object_key: task.object_key.clone(),
});
}
}
ロックを解除する処理は以下のように実装しており、processingフラグをfalseに更新することでロックを解除します。
impl S3Lock {
pub async fn free(self) -> Result<(), Error> {
// DynamoDBのロックを解放する
// processingフラグのみを更新する
self.dynamodb_client
.update_item()
.table_name(self.table_name)
.key("id", AttributeValue::S(self.input_object_key))
.update_expression("SET processing = :false")
.expression_attribute_values(":false", AttributeValue::Bool(false))
.send()
.await?;
Ok(())
}
}
S3オブジェクトを触るのにロックを強制させたいため、S3LockにS3のオブジェクトを操作するメソッドを生やしています[5]。
impl S3Lock {
pub async fn read_input_object(&self) -> Result<Vec<u8>, Error> {
// S3オブジェクトを取得する
let object = self
.s3_client
.get_object()
.bucket(&self.input_bucket_name)
.key(&self.input_object_key)
.send()
.await?;
let body = object.body.collect().await?.to_vec();
Ok(body)
}
pub async fn write_output_object(&self, buf: Vec<u8>) -> Result<(), Error> {
// S3オブジェクトを保存する
let byte_stream = ByteStream::from(buf);
self.s3_client
.put_object()
.bucket(&self.output_bucket_name)
.key(&self.output_object_key)
.body(byte_stream)
.send()
.await?;
Ok(())
}
pub async fn delete_output_object(&self) -> Result<(), Error> {
// S3オブジェクトを削除する
self.s3_client
.delete_object()
.bucket(&self.output_bucket_name)
.key(&self.output_object_key)
.send()
.await?;
Ok(())
}
}
実際に動かしてみる
サンプルコードを実際に動かしてみましょう。
グレースケールにしたい画像を用意しなければならないわけですが、今回は兵庫県立公園あわじ花さじき[6]の画像を使用します。
インフラ構築
まずはterraform applyしてAWSインフラを構築します。
GitHubレポジトリをクローンして、以下のコマンドを実行してください。
cd terraform
# variables.tfやprovider.tfをよしなに修正しておく
terraform init
terraform apply
S3バケットに画像ファイルをアップロード
インフラ構築ができたら、入力用のS3バケットに画像ファイルをアップロードします。
アップロードが終わるとLambda関数の処理が始まり、DynamoDBのテーブルにアイテムが追加されます。
処理が終わると出力用のS3バケットに画像ファイルが保存され、DynamoDBのアイテムのprocessing
フラグがfalseになります。
出力用のS3バケットに画像が追加され、グレースケールに変換されていることが確認できます。
S3バケットから画像ファイルを削除
入力用バケットからオブジェクトが削除されると、出力用バケットからもオブジェクトが削除されます。
排他処理が効いているか確認
DynamoDBに追加されるアイテムのprocessing
フラグがtrueになると、処理すべきS3イベントが飛んできたとしてもその処理は待機します。
この挙動を確かめるためにDynamoDBのprocessing
フラグをわざとtrueにして、同じ名前のファイルをアップロードしてみます。
CloudWatch Logsを見ると、新たに発生したS3イベントが処理されずに他の処理の完了を待機していることがわかります。
DynamoDBのprocessing
フラグをfalseに戻すと、処理が再開されます。
排他処理のおかげで、削除イベントとアップロードイベントがほぼ同時に発生しても処理の順序が保証されます。
まとめ
今回はS3イベントの順序を保証するためのシーケンサを利用した、画像処理のサンプルコードを紹介しました。
S3イベントの順序を保証するためには、シーケンサを利用してイベントの順序を比較する必要があります。
自分の趣味でRustでサンプルコードを実装してみましたが、他の言語でも同様の実装が可能なはずです。
ぜひ参考にしてみてください。
関連記事 | Related Posts
We are hiring!
【データエンジニア】データエンジニアリングG/名古屋・大阪
データ分析部について※データ分析部は、データエンジニアリングGが所属している部門です。KINTOにおいて開発系部門発足時から設置されているチームであり、それほど経営としても注力しているポジションです。
【KINTO FACTORYバックエンドエンジニア】KINTO FACTORY開発G/大阪
KINTO FACTORYについて自動車のソフトウェア、ハードウェア両面でのアップグレードを行う新サービスです。トヨタ・レクサスの車をお持ちのお客様にOTAやハードウェアアップデートを通してリフォーム、アップグレード、パーソナライズなどを提供し購入後にも進化続ける自動車を提供するモビリティ業界における先端のサービスの開発となります。