【後編】StripeのWebhookエンドポイントをAPI Gateway→Lambda→Kinesisのserverless(サーバーレス)で構築し、ローカル環境での検証もできるようにしてみた
はじめに
こんにちは、SHIFTの開発部門に所属している Katayama です。
【前編】Stripe の Webhook エンドポイントを API Gateway→Lambda→Kinesis の serverless(サーバーレス)で構築し、ローカル環境での検証もできるようにしてみたの続きで、本記事では、Lambda→Kinesis の部分の実装と、全体を通してのローカルの開発環境で検証の部分を取り上げる。
※今回の検証では以下のバージョンを使用している
Node.js:16.17.0
serverless:3.25.1
serverless-offline:12.0.3
serverless-offline-kinesis:6.2.3
Lambda→Kinesis の部分を構築する
前編でも触れていたが、サーバーレスの開発は大きく分けて以下の 2 つになる。
①API Gateway→Lambda の部分
②Lambda→Kinesis の部分
① については、前編の記事を参照ください。
② では、Lmabda から Kinesis にデータを流す Lambda の実装と、Kinesis に意図したデータが流せているか?の確認目的でレシーバーになる Lambda 関数(流れてきたデータを受け取りそれをログに出力する簡易的な Lmabda 関数)も別途実装してみようと思う。
※①・② を通しで確認するローカルの開発環境で検証として、一覧の流れ(API Gateway→Lambda→Kinesis のイベント駆動でデータが意図して流れるか?)を確認するのを最後にやってみる。
Kinesis にデータを流す Producer を実装する
前編の記事で取り上げた実装により、Stripe の Webhook イベントの中身のデータを Lambda 関数で受け取る実装まではできていた。その続きの実装として、ここでは Kinesis Data Stream にデータを流す部分を実装していく。
実装としては、kinesis クラスに putRecord を行うロジックは押し込める形で実装してみた。
// src/lib/kinesis.js
// 省略
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
import { nanoid } from "nanoid";
export default class Kinesis {
constructor(options = {}) {
// 省略
const { stage, region, streamName } = options;
const isLocal = stage === "local";
this.kinesisClient = new KinesisClient(
isLocal ? { region, endpoint: "http://localhost:4566" } : { region }
);
this.streamName = streamName;
}
async putRecord(options = {}) {
// 省略
const { category, type, data } = options;
const kinesisData = {
category,
type,
service: "aws-node-serverless-stripe-webhook",
payload: { version: 1, data },
};
const response = await this.kinesisClient.send(
new PutRecordCommand({
Data: new TextEncoder().encode(JSON.stringify(kinesisData)),
PartitionKey: nanoid(),
StreamName: this.streamName,
})
);
return response;
}
}
// src/stripe2kinesis.js
import Kinesis from "@/kinesis";
// 省略
export const handler = async (event) => {
try {
// 省略
logger.info({ type, payload });
await kinesis.putRecord({ category: "stripe", type, data: payload });
return { status: 202 };
} catch (e) {
logger.error({ message: e.message, stack: e.stack });
return { status: 500 };
}
};
実装に関して補足する。
Data: new TextEncoder().encode(JSON.stringify(kinesisData))
Dataに書かれている通り、Uint8Array を要求しているのでTextEncoderによるエンコードを行っている(ちなみに、Kinesis に渡るデータはここに記載があると通り、Base64 でエンコードされたバイナリデータなので AWS の SDK 内で Uint8Array→Base64 の変換が行われていると思われる)PartitionKey: nanoid()
Kinesis Data Stream におけるパーティションキーはストリーム内のデータをシャード単位でグループ化=どのシャードにデータを流すか?を決めるのに使われる。今回は 1 つのシャードしかない想定なので実行毎に異なるキーを発行するようにしている(What is partition key in AWS Kinesis all about?に書かれている通り、PartitionKey はシャードが 1 つの場合には特に意味をもたないが、複数のシャードがある場合、処理の分散化において重要になる)
上記のように実装できた所で、「API Gateway からのリクエストを受け取り、Kinesis に流すデータを取り出す」の章で検証した時のように、Stripe のイベントを作成してみると、以下のように Localstack のログで PutRecord ができている事が確認できる。
また、AWS CLI でストリームのデータを取得(GetRecords)して Base64 でデコードすると、意図したデータがストリームに流せている事が確認できる。
study@localhost:~/workspace/learn-serverless (stripe-webhooks *)
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-internal-events-local --endpoint=http://localhost:4566
{
"ShardIterator": "AAAAAAAAAAEEzIvKnw++xr7JpF5fqK+qCHCYAUHDAlHAieOikP9vft9MwFGJRIaOR/mbqDWk7hpP5y88VqnZyQRh8YHsb9WL86posxLnC2VUDoKNghsNktav3zVRWoHjj9X/K3OCNp9GZL/QcaRXf6lJpN9qppWapdAHlUPSfOm9VmzrHwAoWd4yuwQMYHJ/7dgGMH18FBxqI+jF0ysWYxjtXcFfIHDvSmtacZlrQN9h9cMMxHjc5Q=="
}
study@localhost:~/workspace/learn-serverless (stripe-webhooks *)
$ aws kinesis get-records --shard-iterator AAAAAAAAAAEEzIvKnw++xr7JpF5fqK+qCHCYAUHDAlHAieOikP9vft9MwFGJRIaOR/mbqDWk7hpP5y88VqnZyQRh8YHsb9WL86posxLnC2VUDoKNghsNktav3zVRWoHjj9X/K3OCNp9GZL/QcaRXf6lJpN9qppWapdAHlUPSfOm9VmzrHwAoWd4yuwQMYHJ/7dgGMH18FBxqI+jF0ysWYxjtXcFfIHDvSmtacZlrQN9h9cMMxHjc5Q== --endpoint=http://localhost:4566
{
"Records": [
{
...,
"Data": "eyJjYXRlZ29yeSI6InN0cmlwZSIsInR5cGUiOiJjdXN0b21lci5jcmVhdGVkIiwic2VydmljZSI6ImF3cy1ub2RlLXNlcnZlcmxlc3Mtc3RyaXBlLXdlYmhvb2siLCJwYXlsb2FkIjp7InZlcnNpb24iOjEsImRhdGEiOnsiaWQiOiJjdXNfTjBjZzBXaEZmNDJoZ0YiLCJvYmplY3QiOiJjdXN0b21lciIsImFkZHJlc3MiOm51bGwsImJhbGFuY2UiOjAsImNyZWF0ZWQiOjE2NzE0MjU5NDksImN1cnJlbmN5IjpudWxsLCJkZWZhdWx0X3NvdXJjZSI6bnVsbCwiZGVsaW5xdWVudCI6ZmFsc2UsImRlc2NyaXB0aW9uIjoiKGNyZWF0ZWQgYnkgU3RyaXBlIENMSSkiLCJkaXNjb3VudCI6bnVsbCwiZW1haWwiOm51bGwsImludm9pY2VfcHJlZml4IjoiRTg3MUQxQkUiLCJpbnZvaWNlX3NldHRpbmdzIjp7ImN1c3RvbV9maWVsZHMiOm51bGwsImRlZmF1bHRfcGF5bWVudF9tZXRob2QiOm51bGwsImZvb3RlciI6bnVsbCwicmVuZGVyaW5nX29wdGlvbnMiOm51bGx9LCJsaXZlbW9kZSI6ZmFsc2UsIm1ldGFkYXRhIjp7fSwibmFtZSI6bnVsbCwibmV4dF9pbnZvaWNlX3NlcXVlbmNlIjoxLCJwaG9uZSI6bnVsbCwicHJlZmVycmVkX2xvY2FsZXMiOltdLCJzaGlwcGluZyI6bnVsbCwidGF4X2V4ZW1wdCI6Im5vbmUiLCJ0ZXN0X2Nsb2NrIjpudWxsfX19",
"PartitionKey": "GQKK09zn7PlCQNsOf6yGd",
"EncryptionType": "NONE"
}
],
...
}
study@localhost:~/workspace/learn-serverless (stripe-webhooks *)
$ echo 'eyJjYXRlZ29yeSI6InN0cmlwZSIsInR5cGUiOiJjdXN0b21lci51cGRhdGVkIiwic2VydmljZSI6ImF3cy1ub2RlLXNlcnZlcmxlc3Mtc3RyaXBlLXdlYmhvb2siLCJwYXlsb2FkIjp7InZlcnNpb24iOjEsImRhdGEiOnsiaWQiOiJjdXNfTjBjZzBXaEZmNDJoZ0YiLCJvYmplY3QiOiJjdXN0b21lciIsImFkZHJlc3MiOm51bGwsImJhbGFuY2UiOjAsImNyZWF0ZWQiOjE2NzE0MjU5NDksImN1cnJlbmN5IjpudWxsLCJkZWZhdWx0X3NvdXJjZSI6bnVsbCwiZGVsaW5xdWVudCI6ZmFsc2UsImRlc2NyaXB0aW9uIjoiKGNyZWF0ZWQgYnkgU3RyaXBlIENMSSkiLCJkaXNjb3VudCI6bnVsbCwiZW1haWwiOm51bGwsImludm9pY2VfcHJlZml4IjoiRTg3MUQxQkUiLCJpbnZvaWNlX3NldHRpbmdzIjp7ImN1c3RvbV9maWVsZHMiOm51bGwsImRlZmF1bHRfcGF5bWVudF9tZXRob2QiOm51bGwsImZvb3RlciI6bnVsbCwicmVuZGVyaW5nX29wdGlvbnMiOm51bGx9LCJsaXZlbW9kZSI6ZmFsc2UsIm1ldGFkYXRhIjp7ImZvbyI6ImJhciJ9LCJuYW1lIjpudWxsLCJuZXh0X2ludm9pY2Vfc2VxdWVuY2UiOjEsInBob25lIjpudWxsLCJwcmVmZXJyZWRfbG9jYWxlcyI6W10sInNoaXBwaW5nIjpudWxsLCJ0YXhfZXhlbXB0Ijoibm9uZSIsInRlc3RfY2xvY2siOm51bGx9fX0=' | base64 -d
{"category":"stripe","type":"customer.updated","service":"aws-node-serverless-stripe-webhook","payload":{"version":1,"data":{"id":"cus_N0cg0WhFf42hgF","object":"customer","address":null,"balance":0,"created":1671449949,"currency":null,"default_source":null,"delinquent":false,"description":"(created by Stripe CLI)","discount":null,"email":null,"invoice_prefix":"E871D1BE","invoice_settings":{"custom_fields":null,"default_payment_method":null,"footer":null,"rendering_options":null},"livemode":false,"metadata":{"foo":"bar"},"name":null,"next_invoice_sequence":1,"phone":null,"preferred_locales":[],"shipping":null,"tax_exempt":"none","test_clock":null}}}
これで確認としてはできているが、いちいちコマンドを実行して確認するのも面倒なので、Kinesis Data Stream の Consumer になる Lmabda を実装してデータを確認してみようと思う。
ローカルの開発環境で検証を簡単にするために、Kinesis のデータを受け取る Consumer を実装する
まずは、Kinesis とのイベントソースマッピングを持つ Lambda 関数を serverless.yaml に新しく定義する。
# serverless.yaml
# 省略
functions:
# 省略
# ローカルの開発環境での検証用
localOnlyKinesisConsumer:
name: localOnlyKinesisConsumer
handler: support/kinesis-consumer.handler
events:
- stream:
type: kinesis
arn: arn:aws:kinesis:${self:provider.region}:000000000000:stream/${file(./env/${self:provider.stage}.json):KINESIS_STREAM_NAME}
batchSize: 100
startingPosition: LATEST
ただ、この状態ではローカルの開発環境で検証はできないので、serverless-offline-kinesis(ローカルの開発環境で Kinesis Data Stream にデータが流れた際、イベントソースマッピングを設定している Lambda 関数を自動で起動してくれる serverless offline の plugin)を新規に導入し、Kinesis にデータが流れたら、イベントソースマッピングを持つ Lambda が起動するように設定を追加する。
# serverless.yaml
# 省略
plugins:
- serverless-webpack
- serverless-offline-kinesis # <- ここを追記
- serverless-offline
custom:
# 省略
serverless-offline-kinesis:
endpoint: http://localhost:4566
region: ${self:provider.region}
accessKeyId: local
secretAccessKey: local
ここまで設定すると、以下のような実装をした Lmabda 関数(kinesis-consumer.js)で event を console.log で出力させてみると、Kinesis からイベントデータが Lambda に渡ってくる所まで確認できる。
// ./support/kinesis-consumer
export const handler = async (event) => {
try {
console.log(event);
} catch (e) {
console.error(e);
}
};
Running Authorization function for post /local/webhook (λ: authorizerFunc)
(λ: authorizerFunc) RequestId: 657a8aac-373b-404b-8d79-5185b13a41bb Duration: 387.73 ms Billed Duration: 388 ms
Authorization function returned a successful response: (λ: authorizerFunc)
POST /local/webhook (λ: stripe2kinesis)
(λ: stripe2kinesis) RequestId: 00e95d13-9586-43c9-b004-bfb6197aed18 Duration: 1323.32 ms Billed Duration: 1324 ms
{
Records: [
{
kinesis: [Object],
eventSource: 'aws:kinesis',
eventID: 'shardId-000000000000:49636242008595608693328728591756196462849345486980120578',
invokeIdentityArn: 'arn:aws:iam::serverless:role/offline',
eventVersion: '1.0',
eventName: 'aws:kinesis:record',
eventSourceARN: 'arn:aws:kinesis:ap-northeast-1:000000000000:kinesis-internal-events-local',
awsRegion: undefined
}
]
}
(λ: localOnlyKinesisConsumer) RequestId: 5a84ac81-60c1-48ef-bc17-c4b9b9cf853c Duration: 115.92 ms Billed Duration: 116 ms
最後に、Kinesis のデータを取り出し、Buffer.from で Base64 でデコードして Producer で送ったデータと同じか?を確かめられるように実装を変えた後、Stripe の Webhook イベントをテストで発生させて、それが API Gateway→Lmabda→Kinesis と流れていった後、意図したデータを Consumer で受け取れるか?を確認してみると、以下の動画のように意図した通り動いてることが確認できる。
// ./support/kinesis-consumer
export const handler = async (event) => {
try {
await Promise.all(
// eslint-disable-next-line array-callback-return
event.Records.map((record) => {
try {
const { data: eventData } = record.kinesis;
const { category, type, payload } = JSON.parse(
Buffer.from(eventData, "base64")
);
console.log("category", category);
console.log("type", type);
console.log("payload", payload);
} catch (e) {
console.error(e);
}
})
);
} catch (e) {
console.error(e);
}
};
以上で、前編・後編に分かれて取り上げていた内容(API Gateway→Lambda の部分を構築する、Lambda→Kinesis の部分を構築する)を組み合わせて、ローカルの開発環境で API Gateway→Lambda→Kinesis の構成の検証も問題なくできた事が確認できたと思う。
まとめとして
今回は前編・後編の記事を通じて、Stripe のの Webhook エンドポイントとして、API Gateway→Lambda→Kinesis のサーバレスアプリケーションの構築をやってみて、ローカルの開発環境で検証も行ってみた。Stripe の Webhook イベントの処理の部分を Web アプリケーションから分離しつつ、Kinesis を利用する事で、Consumer も複数配置できるので、マイクロサービスが複数あり、Stripe のイベントでデータを書き換えたいなどの要望に応えられる構成になったのではないかと思った。
今回は Stripe のエンドポイントだったが、それに限らずリクエストを受け取りそれをイベント駆動で非同期で処理するというアーキテクチャはままあると思うので、今回の構築を参考に他の部分でも Kinesis を利用したイベント駆動の構築をやってみたいと思った。
《この公式ブロガーの記事一覧》