SQSのメッセージ重複を回避する実装をlambdaとDynamoDBでやってみた
はじめに
こんにちは、SHIFTの開発部門に所属している Katayama です。
SQSからLambdaでSESメール配信する構成をserverless-liftでやってみたでは、以下のような構成でSQS→Lambda→SESでメール配信する構成を作るというのをやってみた。
ただ、AWS SQS(スタンダードキュー)には以下の引用の通り、メッセージは少なくとも1回は確実に配信されるが、複数の同一メッセージが配信される事もある。そのため、メール配信のようなサービスを構築したい場合、全く同じメールが2通ユーザーに届いてしまうなどのデメリットがある。
今回はこのデメリットを克服し、SQSの高スループットのメリットを最大限享受できるような実装をやってみたいと思う。
※本記事の内容は、serverlessのLambda開発環境としてserverless-webpackでトランスパイル、ESLint、エイリアス利用を設定してみたにあるようなserverless-webpackの設定や、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするで取り上げたserverless-offlineを利用したローカルの開発環境で検証できるようなセットアップが完了している事を前提にしている。
※今回の検証では以下のバージョンを使用している。
Node.js:16.17.0
serverless:3.25.1
serverless-offline:12.0.3
DynamoDBを利用して重複を排除する
考え方
まず前提として、Amazon SQS での Lambda の使用を参照すればわかるが、Lambdaで受け取るイベントのSQSのメッセージにはmd5OfBodyという属性があり、これはReceiveMessageに書かれている通り、メッセージ本文のMD5の値になっている。
ということは、md5OfBodyを見れば、SQSのメッセージのbodyが同じか、別物か?の区別ができるようになる。
今回の実装では上記のmd5OfBodyを利用して重複を排除する実装をやってみたいと思う。具体的には、DynamoDBにmd5OfBodyを保存しておき、DynamoDBにデータがあれば同じメール文のメッセージキューイベントと判断できるようにする事で、SQSからのイベントのメッセージで同一のものはskipする(同一のメール文は送信しない)という実装が実現できる。
※SQSのメッセージボディー(SendMessageのMessageBody)にはメール本文だけではなく、SESのメール送信に必要な情報(Lambda 関数の実装の章で出てきていたmailTo、subject、bodtText、createdAtなど)も含まれる。そのため、メール本文のmd5では簡単に重複してしまい、重複排除が意図した動きにならないが、メッセージボディーにはcreatedAtがあるので、メール送信用のSQSのキューを作成した時間で区別され、意図した重複排除になる想定。
※重複排除にDynamoDBを利用する意図だが、排除のためのデータは一時的なもので一定時間で揮発させたいという事があった。そのため、S3に登録してそれを select するような考え方ではなく、DynamoDBで一時的なデータを作成しそれを揮発させるような考え方をしている。
実際の実装と、serverless.yamlの設定
まず、serverless.yamlで今回利用するDynamoDBを構築するための記述を追加する(AWS::DynamoDB::Tableが参考になる)。
# serverless.yaml
# 省略
resources:
Resources:
sqsToSeSQueueTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: sqs2ses-queue-table-${sls:stage}
AttributeDefinitions:
- AttributeName: md5
AttributeType: S
KeySchema:
- AttributeName: md5
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
# 省略
上記では、md5というキーをパーティションキー(プライマリーキー)にもつテーブルを定義している。また、このテーブルのデータはDynamoDB の有効期限 (TTL) を使用して項目を期限切れにするに書かれている機能を利用できるように、TimeToLiveSpecificationの設定も行っている。これにより、ttlというキーにunix timestampを設定する事で、それを迎えたデータを期限切れにし、自動削除されるようにしている。今回、DynamoDBに保存するデータは、SQSからのメッセージイベントの重複を排除するためだけにあり、一時的に存在すればいいので、指定した短い時間で揮発させる設計にしている。
続いて、DynamoDBにデータがあればメールの送信をskipさせ、メール送信を行った場合にはDynamoDBにデータを保存する、という実装を行う。
// ./src/sqs2ses.js
// 省略
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb';
import { DateTime } from 'luxon';
// 省略
const sendmail = async (options = {}) => {
// 省略
};
// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
// 省略
const {
STAGE: stage,
REGION: region,
SES_IDENTITY_ARN: sesIdentityArn
} = process.env;
// 省略
const isLocal = stage === 'local';
const dynamoDBClient = new DynamoDBClient(
isLocal ? { endpoint: 'http://localhost:4566', region } : { region }
);
const ddbDocClient = DynamoDBDocument.from(dynamoDBClient);
const tableName = `sqs2ses-queue-table-${stage}`;
// 省略
try {
await Promise.all(
event.Records.map(async (message) => {
try {
const result = await ddbDocClient.get({
TableName: tableName,
Key: { md5: message.md5OfBody }
});
if (result.Item) {
logger.info({
message: `already send mail`,
md5OfBody: message.md5OfBody
});
return;
}
await sendmail({ sesClient, message, sesIdentityArn });
const ttl = DateTime.now().plus({ days: 1 }).toUnixInteger();
await ddbDocClient.put({
TableName: tableName,
Item: { md5: message.md5OfBody, ttl }
});
logger.info({
message: `put data`,
md5OfBody: message.md5OfBody,
ttl
});
} catch (e) {
logger.error({ message: e.message, stack: e.stack });
}
})
);
} catch (e) {
logger.error({ message: e.message, stack: e.stack });
}
};
実装としては単純で、SQSからのイベントに含まれるmd5OfBodyをキーにDynamoDBからデータを取得し、データ(Item)があれば既に送信済みのメールであると判断し、loggingだけして処理をskip。データが見つからなければメールを送信した後、DynamoDBにmd5OfBodyを保存する、という事をやっている。これによりSQSのスタンダードキューで困るメッセージの重複を防止できる。
上記で今回やりたかった実装はできているが、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするで取り上げたように、ローカル環境で検証・テストできる方が開発者体験としても品質保証の観点からもよいので、次の章ではローカル環境で今回開発した重複排除のロジックがうまく動くか?を検証・テストしてみたいと思う。
※DynamoDBのNode.jsクライアントには、DynamoDBDocumentを利用し、本来必要になる型の宣言を省略できるように実装している(詳細はここなどを参照)
ローカル環境でDynamoDBのエミュレーターを使って、重複排除ができているか?検証する
DynamoDBのエミュレーターには、事前準備 LocalStackを docker-compose で立てるで取り上げたLocalstackを利用する。
ローカル環境でLocalstackのDynamoDBに接続するには、単にLocalstackのエンドポイントをDynamoDBClientのconfigに設定するだけでいい。ただ、DynamoDBのテーブルはAWS CLIで作成する必要がある。
"aws.dynamodb.create-table": "aws dynamodb create-table --table-name sqs2ses-queue-table-local --attribute-definitions AttributeName=md5,AttributeType=S --key-schema AttributeName=md5,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint=http://localhost:4566"
テーブルを作成した後は、実際にSQSにメッセージを送信し、ローカル上でメールの送信確認をしてみる(詳細はserverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするを参照)。
上記の動画を見て分かるように、動きとしては以下に記載している通りで同一メッセージの重複が排除できている事が確認できている。
1回目のSQSへのメッセージ送信後にはメール送信が行われる(ログからそれが確認できる)
2回目のSQSへのメッセージ送信後にはメール送信が行われず、既にメール送信済みというログが出力されるだけで処理が終了している
※ちなみに、serverless-dynamodb-localというserverlessのプラグインも存在し、このプラグインではDynamoDB LocalというLocalstackとはまた別物だが、Localstack同様にDynamoDBのエミュレーターを起動し、オプションの設定によってはserverless.yamlのDynamoDBのリソース設定に基づいて、DynamoDB Localの起動時にテーブルも同時に作成してくれる。
まとめとして
今回はSQSのメッセージの重複を排除する実装をやってみた。AWS環境でSQSのメッセージキューが重複してLambdaに送信される際の検証は難しい(いつ発生するか不明)なので、ローカル環境で全く同じメッセージをSQSに送信する事で、重複排除がされ、メール送信は1回のみしか行われない事を確認できた。
重複が問題になる場合には、今回やったような方法が利用できるかもしれない。
お問合せはお気軽に
https://service.shiftinc.jp/contact/
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/