見出し画像

SQSのメッセージ重複を回避する実装をlambdaとDynamoDBでやってみた

はじめに

こんにちは、SHIFTの開発部門に所属している Katayama です。

SQSからLambdaでSESメール配信する構成をserverless-liftでやってみたでは、以下のような構成でSQS→Lambda→SESでメール配信する構成を作るというのをやってみた。

ただ、AWS SQS(スタンダードキュー)には以下の引用の通り、メッセージは少なくとも1回は確実に配信されるが、複数の同一メッセージが配信される事もある。そのため、メール配信のようなサービスを構築したい場合、全く同じメールが2通ユーザーに届いてしまうなどのデメリットがある。

少なくとも1回の配信: メッセージは少なくとも 1 回は確実に配信されますが、ときどき複数のメッセージのコピーが配信されることもあります。
キューのタイプ

Amazon SQS は、冗長性と高可用性を確保するため、メッセージのコピーが複数のサーバーに保存されます。まれではありますが、メッセージを受信または削除するときに、メッセージのコピーが保存されているサーバーの 1 台が使用できない場合があります。 この場合、使用できないサーバーではメッセージのコピーが削除されず、メッセージの受信時に、そのメッセージコピーをもう一度受け取る場合があります。アプリケーションがべき等になるよう設計する必要があります (同じメッセージを繰り返し処理した場合にも悪影響が発生しないように設計する必要があります)。
少なくとも 1 回の配信より引用

今回はこのデメリットを克服し、SQSの高スループットのメリットを最大限享受できるような実装をやってみたいと思う。

※本記事の内容は、serverlessのLambda開発環境としてserverless-webpackでトランスパイル、ESLint、エイリアス利用を設定してみたにあるようなserverless-webpackの設定や、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするで取り上げたserverless-offlineを利用したローカルの開発環境で検証できるようなセットアップが完了している事を前提にしている。

※今回の検証では以下のバージョンを使用している。

  • Node.js:16.17.0

  • serverless:3.25.1

  • serverless-offline:12.0.3

DynamoDBを利用して重複を排除する

考え方

まず前提として、Amazon SQS での Lambda の使用を参照すればわかるが、Lambdaで受け取るイベントのSQSのメッセージにはmd5OfBodyという属性があり、これはReceiveMessageに書かれている通り、メッセージ本文のMD5の値になっている。

An MD5 digest of the message body. For information about MD5, see RFC1321.

ということは、md5OfBodyを見れば、SQSのメッセージのbodyが同じか、別物か?の区別ができるようになる。

今回の実装では上記のmd5OfBodyを利用して重複を排除する実装をやってみたいと思う。具体的には、DynamoDBにmd5OfBodyを保存しておき、DynamoDBにデータがあれば同じメール文のメッセージキューイベントと判断できるようにする事で、SQSからのイベントのメッセージで同一のものはskipする(同一のメール文は送信しない)という実装が実現できる。

※SQSのメッセージボディー(SendMessageのMessageBody)にはメール本文だけではなく、SESのメール送信に必要な情報(Lambda 関数の実装の章で出てきていたmailTo、subject、bodtText、createdAtなど)も含まれる。そのため、メール本文のmd5では簡単に重複してしまい、重複排除が意図した動きにならないが、メッセージボディーにはcreatedAtがあるので、メール送信用のSQSのキューを作成した時間で区別され、意図した重複排除になる想定。

※重複排除にDynamoDBを利用する意図だが、排除のためのデータは一時的なもので一定時間で揮発させたいという事があった。そのため、S3に登録してそれを select するような考え方ではなく、DynamoDBで一時的なデータを作成しそれを揮発させるような考え方をしている。

実際の実装と、serverless.yamlの設定

まず、serverless.yamlで今回利用するDynamoDBを構築するための記述を追加する(AWS::DynamoDB::Tableが参考になる)。

# serverless.yaml
# 省略
resources:
  Resources:
    sqsToSeSQueueTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: sqs2ses-queue-table-${sls:stage}
        AttributeDefinitions:
          - AttributeName: md5
            AttributeType: S
        KeySchema:
          - AttributeName: md5
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TimeToLiveSpecification:
          AttributeName: ttl
          Enabled: true
# 省略

上記では、md5というキーをパーティションキー(プライマリーキー)にもつテーブルを定義している。また、このテーブルのデータはDynamoDB の有効期限 (TTL) を使用して項目を期限切れにするに書かれている機能を利用できるように、TimeToLiveSpecificationの設定も行っている。これにより、ttlというキーにunix timestampを設定する事で、それを迎えたデータを期限切れにし、自動削除されるようにしている。今回、DynamoDBに保存するデータは、SQSからのメッセージイベントの重複を排除するためだけにあり、一時的に存在すればいいので、指定した短い時間で揮発させる設計にしている。

続いて、DynamoDBにデータがあればメールの送信をskipさせ、メール送信を行った場合にはDynamoDBにデータを保存する、という実装を行う。

// ./src/sqs2ses.js
// 省略
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb';
import { DateTime } from 'luxon';
// 省略
const sendmail = async (options = {}) => {
	// 省略
};

// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
	// 省略
	const {
		STAGE: stage,
		REGION: region,
		SES_IDENTITY_ARN: sesIdentityArn
	} = process.env;
	// 省略

	const isLocal = stage === 'local';

	const dynamoDBClient = new DynamoDBClient(
		isLocal ? { endpoint: 'http://localhost:4566', region } : { region }
	);
	const ddbDocClient = DynamoDBDocument.from(dynamoDBClient);
	const tableName = `sqs2ses-queue-table-${stage}`;

	// 省略
	try {
		await Promise.all(
			event.Records.map(async (message) => {
				try {
					const result = await ddbDocClient.get({
						TableName: tableName,
						Key: { md5: message.md5OfBody }
					});
					if (result.Item) {
						logger.info({
							message: `already send mail`,
							md5OfBody: message.md5OfBody
						});
						return;
					}

					await sendmail({ sesClient, message, sesIdentityArn });

					const ttl = DateTime.now().plus({ days: 1 }).toUnixInteger();
					await ddbDocClient.put({
						TableName: tableName,
						Item: { md5: message.md5OfBody, ttl }
					});
					logger.info({
						message: `put data`,
						md5OfBody: message.md5OfBody,
						ttl
					});
				} catch (e) {
					logger.error({ message: e.message, stack: e.stack });
				}
			})
		);
	} catch (e) {
		logger.error({ message: e.message, stack: e.stack });
	}
};

実装としては単純で、SQSからのイベントに含まれるmd5OfBodyをキーにDynamoDBからデータを取得し、データ(Item)があれば既に送信済みのメールであると判断し、loggingだけして処理をskip。データが見つからなければメールを送信した後、DynamoDBにmd5OfBodyを保存する、という事をやっている。これによりSQSのスタンダードキューで困るメッセージの重複を防止できる。

上記で今回やりたかった実装はできているが、serverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするで取り上げたように、ローカル環境で検証・テストできる方が開発者体験としても品質保証の観点からもよいので、次の章ではローカル環境で今回開発した重複排除のロジックがうまく動くか?を検証・テストしてみたいと思う。

※DynamoDBのNode.jsクライアントには、DynamoDBDocumentを利用し、本来必要になる型の宣言を省略できるように実装している(詳細はここなどを参照)

ローカル環境でDynamoDBのエミュレーターを使って、重複排除ができているか?検証する

DynamoDBのエミュレーターには、事前準備 LocalStackを docker-compose で立てるで取り上げたLocalstackを利用する。

ローカル環境でLocalstackのDynamoDBに接続するには、単にLocalstackのエンドポイントをDynamoDBClientのconfigに設定するだけでいい。ただ、DynamoDBのテーブルはAWS CLIで作成する必要がある。

"aws.dynamodb.create-table": "aws dynamodb create-table --table-name sqs2ses-queue-table-local --attribute-definitions AttributeName=md5,AttributeType=S --key-schema AttributeName=md5,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint=http://localhost:4566"

テーブルを作成した後は、実際にSQSにメッセージを送信し、ローカル上でメールの送信確認をしてみる(詳細はserverless-offlineとそのpluginを利用し、SQS→Lambda→SESのメール送信をローカル環境で検証できるようにするを参照)。

上記の動画を見て分かるように、動きとしては以下に記載している通りで同一メッセージの重複が排除できている事が確認できている。

  • 1回目のSQSへのメッセージ送信後にはメール送信が行われる(ログからそれが確認できる)

  • 2回目のSQSへのメッセージ送信後にはメール送信が行われず、既にメール送信済みというログが出力されるだけで処理が終了している

※ちなみに、serverless-dynamodb-localというserverlessのプラグインも存在し、このプラグインではDynamoDB LocalというLocalstackとはまた別物だが、Localstack同様にDynamoDBのエミュレーターを起動し、オプションの設定によってはserverless.yamlのDynamoDBのリソース設定に基づいて、DynamoDB Localの起動時にテーブルも同時に作成してくれる。

まとめとして

今回はSQSのメッセージの重複を排除する実装をやってみた。AWS環境でSQSのメッセージキューが重複してLambdaに送信される際の検証は難しい(いつ発生するか不明)なので、ローカル環境で全く同じメッセージをSQSに送信する事で、重複排除がされ、メール送信は1回のみしか行われない事を確認できた。

重複が問題になる場合には、今回やったような方法が利用できるかもしれない。


執筆者プロフィール:Katayama Yuta
認証認可(SHIFTアカウント)や課金決済のプラットフォーム開発に従事。リードエンジニア。 経歴としては、SaaS ERPパッケージベンダーにて開発を2年経験。 SHIFTでは、GUIテストの自動化やUnitテストの実装などテスト関係の案件に従事したり、DevOpsの一環でCICD導入支援をする案件にも従事。その後現在のプラットフォーム開発に参画。

お問合せはお気軽に
https://service.shiftinc.jp/contact/

SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/

SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/

SHIFTの導入事例
https://service.shiftinc.jp/case/

お役立ち資料はこちら
https://service.shiftinc.jp/resources/

SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/