見出し画像

DynamoDB更新をトリガーにSlack通知する機能をローカルで開発してデプロイしてみた


はじめに

こんにちは、SHIFTの開発部門に所属している Katayama です。

今回は、DynamoDB → Lambda → Slack通知という構成で、DynamoDBへのデータ追加をトリガーにSlackに通知するサービスを serverless アプリケーションとして開発する、というのをやってみる。過去の記事同様に、serverless-offlineとその関連 plugin を導入する事で、ローカルの開発環境でも検証を行えるようにして開発していきたいと思う。

開発の流れとしては以下のようになる。

  • 事前準備

    • AWSサービスのエミュレーターLocalStackをセットアップ

    • LocalStack上にDynamoDBを作成

  • 開発

    • serverless.yamlを設定

    • serverless-offlineとpluginを導入し、ローカルでDynamoDB Streamsをエミュレートする

    • LambdaでDynamoDB Streamsからのイベントの受信確認をする

    • SlackのIncoming WebhooksでSlack通知する

  • AWSにDeploy(本記事では取り上げないが、"sls deploy"コマンドを実行するだけで簡単にできる)

※今回はあるサービスを社内向けにαリリースした際に、そのサービス利用で生成されるドキュメントの内容を管理者側で確認し、いいドキュメントが生成されているか?をDynamoDBを定期的に見に行かずとも確認できるようにするための通知機能として開発した。

事前準備

AWSサービスのエミュレーターLocalStackをセットアップ

こちらについては以前の記事と全く同じなので本記事では割愛する。

LocalStack上にDynamoDBを作成

AWS CLIがインストール済みである事を前提にしている。インストールがまだの場合は公式を参照しインストールする。クレデンシャルについては、LocalStackにリクエストをする場合は以下のようなダミーで問題ない。

$ aws configure list
      Name                    Value             Type    Location
      ----                    -----             ----    --------
   profile                     dumy             env    ['AWS_PROFILE', 'AWS_DEFAULT_PROFILE']
access_key     ****************dumy shared-credentials-file    
secret_key     ****************dumy shared-credentials-file    
    region           ap-northeast-1      config-file    ~/.aws/config

以下のようなコマンドでDynamoDBを作成できる。作成後、LatestStreamArnの部分は後にserverless.yamlの設定時に利用するのでメモしておく。

$ aws dynamodb create-table --table-name dydb-stream --attribute-definitions AttributeName=id,AttributeType=S --key-schema AttributeName=id,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE --endpoint=http://localhost:4566
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "id",
                "AttributeType": "S"
            }
        ],
        "TableName": "dydb-stream",
        "KeySchema": [
            {
                "AttributeName": "id",
                "KeyType": "HASH"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": "2023-09-05T18:30:54.069000+09:00",
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/dydb-stream",
        "TableId": "c6819541-5ef2-4cc1-bf14-40b9b6fd52cf",
        "StreamSpecification": {
            "StreamEnabled": true,
            "StreamViewType": "NEW_IMAGE"
        },
        "LatestStreamLabel": "2023-09-04T18:30:54.069",
        "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/dydb-stream/stream/2023-09-04T18:30:54.069"
    }
}

オプションの詳細は公式に書かれているが、いくつか補足する。

  • --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
    DynamoDB Streams と AWS Lambda のトリガーに書かれているように、DyanamoDBにはそのテーブルのデータ変更をトリガーにLambdaを呼び出すことができるようになっているが、DynamoDBを作成しただけではDynamoDB Streamsは作成されないので、明示的に"stream-specification"オプションでその設定をしている(DynamoDB Streamsについての詳細は公式を参照)。StreamViewTypeについては、今回は変更(データ追加・更新)後のすべてのカラムのデータをストリーミングに書き込むような設定にしている。

  • --endpoint=http://localhost:4566
    今回はローカル環境のLocalStackに対するリクエストになるのでエンドポイントを変更している。

開発

serverless.yamlを設定

DynamoDBとLambdaを作成するための設定を行う。DynamoDBについてはCloudFormationの設定方法に準じる。一部省略しているが、設定内容としては以下のようになるだろう。ちなみに、今回はLambdaをES Moduleで開発する事にしたので以前の記事のようにWebpackでのビルドはしない。

...
provider:
  name: aws
  region: ap-northeast-1
  runtime: nodejs16.x
  stage: ${opt:stage, self:custom.defaultStage}
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            # https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html#Streams.Lambda.Tutorial.CreateRole
            - dynamodb:DescribeStream
            - dynamodb:GetRecords
            - dynamodb:GetShardIterator
            - dynamodb:ListStreams
          Resource:
            - !GetAtt DynamodbStream.Arn

package:
  patterns:
    - src
    - package.json
    - yarn.lock
    - ...

custom:
  defaultStage: local

resources:
  Resources:
    DynamodbStream:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: dydb-stream
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
        StreamSpecification:
          StreamViewType: NEW_IMAGE
functions:
  SlackNotification:
    name: slack-notification
    handler: ./src/slackNotification.handler
    timeout: 5
    environment:
      STAGE: ${self:provider.stage}
    events:
      - stream:
          enabled: true
          type: dynamodb
          arn: !GetAtt DynamodbStream.StreamArn
          batchSize: 10
          startingPosition: LATEST

上記の設定で一部補足をする。

  • provider.iam.role
    Lambdaの実行ロールを設定するためのセクションで詳細はIAM Permissions For Functionsを参照。今回はDynamoDB Streamsのストリーミングを受け取れるように公式を参照してIAMを設定している。!GetAttはCloudFormationの組み込み関数で、serverlessFWでも利用できるものになる(一部、serverlessFWでは利用できない組み込み関数もある)。

  • functions.SlackNotification.events
    serverlessFWではAWS Lambda Eventsに書かれているような簡単な方法でLambda関数のトリガー条件を設定できるが、今回はその中のDynamoDB / Kinesis Streamsに記載があるDynamoDB StreamsをLambdaのトリガーに設定している。細かいオプションについては公式を参照。

トラブルシューティング

Lambdaの開発であるあるだが、DeployするとLambdaの実行ロールが未設定でエラーになるという事がある。これはローカル開発時に利用するAWSエミュレーターであるLocalstackはIAMのチェックはされないため、気づかずにDeployしてしまうからだが、Deploy前にはLambdaが参照する他のリソースへのアクセス権(IAM)があるか?のチェックをすることが重要。

serverless-offlineとpluginを導入し、ローカルでDynamoDB Streamsをエミュレートする

ローカルの開発環境でもDynamoDB Streamsのストリーミングにデータが流れたらLambdaをトリガーできるように、エミュレーターの設定を行う。DynamoDB Streamsをエミュレートするにはserverless-offline-dynamodb-streamsが利用できる。

まずは、serverless-offline-dynamodb-streamsとserverless-offlineをプロジェクトの依存に追加する。

$ yarn add -D serverless-offline-dynamodb-streams serverless-offline

続いてserverless.yamlに以下を追記する。

...
provider:
  ...

package:
  ...

plugins:
  - serverless-offline-dynamodb-streams # <- 追記
  - serverless-offline # <- 追記

custom:
  defaultStage: local
  serverless-offline-dynamodb-streams: # <- 追記
    apiVersion: '2013-12-02'
    endpoint: http://localhost:4566
    region: ap-northeast-1
    accessKeyId: root
    secretAccessKey: root
    skipCacheInvalidation: false
    readInterval: 500

resources:
  Resources:
    DynamodbStream:
      ...
functions:
  SlackNotification:
    ...

ここまでの追加設定で、以下のようにserverless-offline-dynamodb-streamsがDynamoDB Streamsに対しGetRecordsを一定の間隔で行うようになり、実際にストリーミングにデータが流れればそれを取得し、Lambdaに流してくれるようになる。

...
localstack_chatgpt  | 2023-09-05T02:26:54.182  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS dynamodbstreams.DescribeStream => 200
localstack_chatgpt  | 2023-09-05T02:26:54.236  INFO --- [   asgi_gw_2] localstack.request.aws     : AWS dynamodbstreams.DescribeStream => 200
localstack_chatgpt  | 2023-09-05T02:26:54.257  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS dynamodbstreams.GetShardIterator => 200
localstack_chatgpt  | 2023-09-05T02:26:54.282  INFO --- [   asgi_gw_2] localstack.request.aws     : AWS dynamodbstreams.GetRecords => 200
localstack_chatgpt  | 2023-09-05T02:26:54.796  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS dynamodbstreams.GetRecords => 200
...

LambdaでDynamoDB Streamsからのイベントの受信確認をする

Lambdaの方で実際にイベントが来るか?を確認してみると、以下のようにconsole.logでDynamoDB Streamsのデータがイベントで届いていることが確認できる("Records[0].dynamodb"の部分のデータの形式の詳細は公式の例を参照)。

export const handler = async (event) => {
	console.log(event);
};
{
  Records: [
    {
      eventSourceARN: 'arn:aws:dynamodb:ap-northeast-1:undefined:document-histories',
      awsRegion: 'ap-northeast-1',
      eventID: '05e30a40',
      eventName: 'INSERT',
      eventVersion: '1.1',
      eventSource: 'aws:dynamodb',
      dynamodb: [Object]
    }
  ]
}

SlackのIncoming WebhooksでSlack通知する

LambdaでDynamoDB Streamsからのデータを受け取る部分までできれば、あとはSlack通知する部分の実装になる。Slack通知はIncoming Webhooksで行うが、詳細は公式に書かれているのでそちらを参照。エンドポイントになるURLを発行できたら後はPOSTリクエストを送るだけだが、こういう場合にはSDKを使う方が実装しやすいので@slack/webhookを利用する。

実装としては今回は以下のような形で、データの中の一部をSlackに通知するもの。

/* eslint-disable no-await-in-loop */
...
import { unmarshall } from '@aws-sdk/util-dynamodb';
import { IncomingWebhook } from '@slack/webhook';
import config from 'config';
import camelcaseKeys from 'camelcase-keys';
import { DateTime } from 'luxon';

...
const webhook = new IncomingWebhook(config.get('notification.slack.webhook'));

// eslint-disable-next-line import/prefer-default-export
export const handler = async (event) => {
	try {
		await Promise.all(
			event.Records.map(async (message) => {
				const data = camelcaseKeys(unmarshall(message.dynamodb.NewImage));
				await webhook.send({
					// https://api.slack.com/reference/messaging/attachments
					attachments: [
						{
							mrkdwn_in: ['text'],
							color: '#36a64f',
							pretext: `DynamoDB Streamsの結果`,
							author_name: data.userName,
							title: `メアド: ${data.email}`,
							text: `${data.text}`,
							ts: DateTime.fromMillis(data.requestedMs).toUnixInteger()
						}
					]
				});
			})
		);
	} catch (e) {
		logger.error({ message: e.message, e });
	}
};

上記のコードについて少し補足する。

  • unmarshall
    DynamoDBはデータ型を示す記述子がJSONの中に含まれるのが特徴。そのため、DynamoDB StreamsからLambdaに渡ってくるイベントの中身も以下のようにそのデータ型記述子がJSONのキーになっている状態になっている。

これだとSlack通知をする際にデータを取り出すのが面倒になるのでどうにかしたい。DynamoDBにLambda内から接続するのであればDynamoDBClientとドキュメントクライアントである@aws-sdk/lib-dynamodbの組み合わせでそのあたりをよしなにしてくれる。今回はDynamoDBに接続してデータを受け取るわけではないので、@aws-sdk/lib-dynamodbを利用することはできない。ただ、このライブラリの中で利用されている@aws-sdk/util-dynamodbにデータ型記述子に基づいてよしなにJavaScriptのオブジェクトに変換してくれる関数があるので、今回はそれを利用している。

  • webhook.send({...}) Slackのメッセージを構築する方法については、公式のビルダーツールを使うとやりやすいと思う。

上記のように実装した後、DynamoDBにデータを追加すると、以下のようにSlackに通知されることが確認できる。

まとめとして

今回はDynamoDBへのデータ追加・変更などをDynamoDB Streamsのストリーミングに流し、そのストリーミングのデータの流れをトリガーにしてLambdaをトリガーする、という事をローカルの開発環境でも検証できるようにしつつ開発してみるという事をやってみた。

以前、Kinesis Data Streamに流したデータをLambdaで受け取るイベント駆動を構築したことがあったが、DynamoDBもストリーミングの部分はどうやらKinesisと同じような感じで利用できるようだ。

今回は取り上げなかったが、公式に書かれている通り、データのフィルタリングのようなこともできるようなので、データの追加アクション時の様々なユースケースを満たせるのではないかと感じた。

《この公式ブロガーの記事一覧》


執筆者プロフィール:Katayama Yuta
認証認可(SHIFTアカウント)や課金決済のプラットフォーム開発に従事。リードエンジニア。
経歴としては、SaaS ERPパッケージベンダーにて開発を2年経験。
SHIFTでは、GUIテストの自動化やUnitテストの実装などテスト関係の案件や、DevOpsの一環でCICD導入支援をする案件にも従事。その後現在のプラットフォーム開発に参画。

《品質や開発のお問合せはお気軽に》

SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/

SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/

SHIFTの導入事例
https://service.shiftinc.jp/case/

お役立ち資料はこちら
https://service.shiftinc.jp/resources/

SHIFTの採用情報はこちら

PHOTO:UnsplashMuha Ajjan