DynamoDB更新をトリガーにSlack通知する機能をローカルで開発してデプロイしてみた
はじめに
こんにちは、SHIFTの開発部門に所属している Katayama です。
今回は、DynamoDB → Lambda → Slack通知という構成で、DynamoDBへのデータ追加をトリガーにSlackに通知するサービスを serverless アプリケーションとして開発する、というのをやってみる。過去の記事同様に、serverless-offlineとその関連 plugin を導入する事で、ローカルの開発環境でも検証を行えるようにして開発していきたいと思う。
開発の流れとしては以下のようになる。
事前準備
AWSサービスのエミュレーターLocalStackをセットアップ
LocalStack上にDynamoDBを作成
開発
serverless.yamlを設定
serverless-offlineとpluginを導入し、ローカルでDynamoDB Streamsをエミュレートする
LambdaでDynamoDB Streamsからのイベントの受信確認をする
SlackのIncoming WebhooksでSlack通知する
AWSにDeploy(本記事では取り上げないが、"sls deploy"コマンドを実行するだけで簡単にできる)
※今回はあるサービスを社内向けにαリリースした際に、そのサービス利用で生成されるドキュメントの内容を管理者側で確認し、いいドキュメントが生成されているか?をDynamoDBを定期的に見に行かずとも確認できるようにするための通知機能として開発した。
事前準備
AWSサービスのエミュレーターLocalStackをセットアップ
こちらについては以前の記事と全く同じなので本記事では割愛する。
LocalStack上にDynamoDBを作成
AWS CLIがインストール済みである事を前提にしている。インストールがまだの場合は公式を参照しインストールする。クレデンシャルについては、LocalStackにリクエストをする場合は以下のようなダミーで問題ない。
$ aws configure list
Name Value Type Location
---- ----- ---- --------
profile dumy env ['AWS_PROFILE', 'AWS_DEFAULT_PROFILE']
access_key ****************dumy shared-credentials-file
secret_key ****************dumy shared-credentials-file
region ap-northeast-1 config-file ~/.aws/config
以下のようなコマンドでDynamoDBを作成できる。作成後、LatestStreamArnの部分は後にserverless.yamlの設定時に利用するのでメモしておく。
$ aws dynamodb create-table --table-name dydb-stream --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE --endpoint=http://localhost:4566
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"TableName": "dydb-stream",
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": "2023-09-05T18:30:54.069000+09:00",
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/dydb-stream",
"TableId": "c6819541-5ef2-4cc1-bf14-40b9b6fd52cf",
"StreamSpecification": {
"StreamEnabled": true,
"StreamViewType": "NEW_IMAGE"
},
"LatestStreamLabel": "2023-09-04T18:30:54.069",
"LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/dydb-stream/stream/2023-09-04T18:30:54.069"
}
}
オプションの詳細は公式に書かれているが、いくつか補足する。
--stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
DynamoDB Streams と AWS Lambda のトリガーに書かれているように、DyanamoDBにはそのテーブルのデータ変更をトリガーにLambdaを呼び出すことができるようになっているが、DynamoDBを作成しただけではDynamoDB Streamsは作成されないので、明示的に"stream-specification"オプションでその設定をしている(DynamoDB Streamsについての詳細は公式を参照)。StreamViewTypeについては、今回は変更(データ追加・更新)後のすべてのカラムのデータをストリーミングに書き込むような設定にしている。--endpoint=http://localhost:4566
今回はローカル環境のLocalStackに対するリクエストになるのでエンドポイントを変更している。
開発
serverless.yamlを設定
DynamoDBとLambdaを作成するための設定を行う。DynamoDBについてはCloudFormationの設定方法に準じる。一部省略しているが、設定内容としては以下のようになるだろう。ちなみに、今回はLambdaをES Moduleで開発する事にしたので以前の記事のようにWebpackでのビルドはしない。
...
provider:
name: aws
region: ap-northeast-1
runtime: nodejs16.x
stage: ${opt:stage, self:custom.defaultStage}
iam:
role:
statements:
- Effect: Allow
Action:
# https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html#Streams.Lambda.Tutorial.CreateRole
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
Resource:
- !GetAtt DynamodbStream.Arn
package:
patterns:
- src
- package.json
- yarn.lock
- ...
custom:
defaultStage: local
resources:
Resources:
DynamodbStream:
Type: AWS::DynamoDB::Table
Properties:
TableName: dydb-stream
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
StreamSpecification:
StreamViewType: NEW_IMAGE
functions:
SlackNotification:
name: slack-notification
handler: ./src/slackNotification.handler
timeout: 5
environment:
STAGE: ${self:provider.stage}
events:
- stream:
enabled: true
type: dynamodb
arn: !GetAtt DynamodbStream.StreamArn
batchSize: 10
startingPosition: LATEST
上記の設定で一部補足をする。
provider.iam.role
Lambdaの実行ロールを設定するためのセクションで詳細はIAM Permissions For Functionsを参照。今回はDynamoDB Streamsのストリーミングを受け取れるように公式を参照してIAMを設定している。!GetAttはCloudFormationの組み込み関数で、serverlessFWでも利用できるものになる(一部、serverlessFWでは利用できない組み込み関数もある)。
functions.SlackNotification.events
serverlessFWではAWS Lambda Eventsに書かれているような簡単な方法でLambda関数のトリガー条件を設定できるが、今回はその中のDynamoDB / Kinesis Streamsに記載があるDynamoDB StreamsをLambdaのトリガーに設定している。細かいオプションについては公式を参照。
トラブルシューティング
Lambdaの開発であるあるだが、DeployするとLambdaの実行ロールが未設定でエラーになるという事がある。これはローカル開発時に利用するAWSエミュレーターであるLocalstackはIAMのチェックはされないため、気づかずにDeployしてしまうからだが、Deploy前にはLambdaが参照する他のリソースへのアクセス権(IAM)があるか?のチェックをすることが重要。
serverless-offlineとpluginを導入し、ローカルでDynamoDB Streamsをエミュレートする
ローカルの開発環境でもDynamoDB Streamsのストリーミングにデータが流れたらLambdaをトリガーできるように、エミュレーターの設定を行う。DynamoDB Streamsをエミュレートするにはserverless-offline-dynamodb-streamsが利用できる。
まずは、serverless-offline-dynamodb-streamsとserverless-offlineをプロジェクトの依存に追加する。
$ yarn add -D serverless-offline-dynamodb-streams serverless-offline
続いてserverless.yamlに以下を追記する。
...
provider:
...
package:
...
plugins:
- serverless-offline-dynamodb-streams # <- 追記
- serverless-offline # <- 追記
custom:
defaultStage: local
serverless-offline-dynamodb-streams: # <- 追記
apiVersion: '2013-12-02'
endpoint: http://localhost:4566
region: ap-northeast-1
accessKeyId: root
secretAccessKey: root
skipCacheInvalidation: false
readInterval: 500
resources:
Resources:
DynamodbStream:
...
functions:
SlackNotification:
...
ここまでの追加設定で、以下のようにserverless-offline-dynamodb-streamsがDynamoDB Streamsに対しGetRecordsを一定の間隔で行うようになり、実際にストリーミングにデータが流れればそれを取得し、Lambdaに流してくれるようになる。
...
localstack_chatgpt | 2023-09-05T02:26:54.182 INFO --- [ asgi_gw_1] localstack.request.aws : AWS dynamodbstreams.DescribeStream => 200
localstack_chatgpt | 2023-09-05T02:26:54.236 INFO --- [ asgi_gw_2] localstack.request.aws : AWS dynamodbstreams.DescribeStream => 200
localstack_chatgpt | 2023-09-05T02:26:54.257 INFO --- [ asgi_gw_1] localstack.request.aws : AWS dynamodbstreams.GetShardIterator => 200
localstack_chatgpt | 2023-09-05T02:26:54.282 INFO --- [ asgi_gw_2] localstack.request.aws : AWS dynamodbstreams.GetRecords => 200
localstack_chatgpt | 2023-09-05T02:26:54.796 INFO --- [ asgi_gw_1] localstack.request.aws : AWS dynamodbstreams.GetRecords => 200
...
LambdaでDynamoDB Streamsからのイベントの受信確認をする
Lambdaの方で実際にイベントが来るか?を確認してみると、以下のようにconsole.logでDynamoDB Streamsのデータがイベントで届いていることが確認できる("Records[0].dynamodb"の部分のデータの形式の詳細は公式の例を参照)。
export const handler = async (event) => {
console.log(event);
};
{
Records: [
{
eventSourceARN: 'arn:aws:dynamodb:ap-northeast-1:undefined:document-histories',
awsRegion: 'ap-northeast-1',
eventID: '05e30a40',
eventName: 'INSERT',
eventVersion: '1.1',
eventSource: 'aws:dynamodb',
dynamodb: [Object]
}
]
}
SlackのIncoming WebhooksでSlack通知する
LambdaでDynamoDB Streamsからのデータを受け取る部分までできれば、あとはSlack通知する部分の実装になる。Slack通知はIncoming Webhooksで行うが、詳細は公式に書かれているのでそちらを参照。エンドポイントになるURLを発行できたら後はPOSTリクエストを送るだけだが、こういう場合にはSDKを使う方が実装しやすいので@slack/webhookを利用する。
実装としては今回は以下のような形で、データの中の一部をSlackに通知するもの。
/* eslint-disable no-await-in-loop */
...
import { unmarshall } from '@aws-sdk/util-dynamodb';
import { IncomingWebhook } from '@slack/webhook';
import config from 'config';
import camelcaseKeys from 'camelcase-keys';
import { DateTime } from 'luxon';
...
const webhook = new IncomingWebhook(config.get('notification.slack.webhook'));
// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
try {
await Promise.all(
event.Records.map(async (message) => {
const data = camelcaseKeys(unmarshall(message.dynamodb.NewImage));
await webhook.send({
// https://api.slack.com/reference/messaging/attachments
attachments: [
{
mrkdwn_in: ['text'],
color: '#36a64f',
pretext: `DynamoDB Streamsの結果`,
author_name: data.userName,
title: `メアド: ${data.email}`,
text: `${data.text}`,
ts: DateTime.fromMillis(data.requestedMs).toUnixInteger()
}
]
});
})
);
} catch (e) {
logger.error({ message: e.message, e });
}
};
上記のコードについて少し補足する。
unmarshall
DynamoDBはデータ型を示す記述子がJSONの中に含まれるのが特徴。そのため、DynamoDB StreamsからLambdaに渡ってくるイベントの中身も以下のようにそのデータ型記述子がJSONのキーになっている状態になっている。
これだとSlack通知をする際にデータを取り出すのが面倒になるのでどうにかしたい。DynamoDBにLambda内から接続するのであればDynamoDBClientとドキュメントクライアントである@aws-sdk/lib-dynamodbの組み合わせでそのあたりをよしなにしてくれる。今回はDynamoDBに接続してデータを受け取るわけではないので、@aws-sdk/lib-dynamodbを利用することはできない。ただ、このライブラリの中で利用されている@aws-sdk/util-dynamodbにデータ型記述子に基づいてよしなにJavaScriptのオブジェクトに変換してくれる関数があるので、今回はそれを利用している。
webhook.send({...}) Slackのメッセージを構築する方法については、公式のビルダーツールを使うとやりやすいと思う。
上記のように実装した後、DynamoDBにデータを追加すると、以下のようにSlackに通知されることが確認できる。
まとめとして
今回はDynamoDBへのデータ追加・変更などをDynamoDB Streamsのストリーミングに流し、そのストリーミングのデータの流れをトリガーにしてLambdaをトリガーする、という事をローカルの開発環境でも検証できるようにしつつ開発してみるという事をやってみた。
以前、Kinesis Data Streamに流したデータをLambdaで受け取るイベント駆動を構築したことがあったが、DynamoDBもストリーミングの部分はどうやらKinesisと同じような感じで利用できるようだ。
今回は取り上げなかったが、公式に書かれている通り、データのフィルタリングのようなこともできるようなので、データの追加アクション時の様々なユースケースを満たせるのではないかと感じた。
《この公式ブロガーの記事一覧》
《品質や開発のお問合せはお気軽に》
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
PHOTO:UnsplashのMuha Ajjan