Cognitoのカスタム認証フローでSRPを使ってみた
はじめに
こんにちは。SHIFTのインフラ・アーキテクトの岡田です。
AWSを使用したインフラの設計・構築に携わっています。
最近Cognitoのユーザープールを使用した認証システムに関わる機会が有り、面白く感じました。そこで今回少し勉強して、認証を行う際、MFAに対応するためカスタム認証チャレンジを使用する実装に取り組んでみました。
また、パスワード認証にはAWSで推奨されているSRP(Secure Remote Password)を使ってみました。使用したサービスはLambda、言語はPythonです。
ユースケースは、OpenID Connectのユーザー認証で、CognitoのHosted UIではなく独自のユーザーインタフェースを使用する場合などです。
「サーバーサイドのCognito認証で、カスタム認証フローとSRPを使いたい」という方の参考になれば幸いです。
システム構成
システム構成は以下となります。
ブラウザーからのリクエストをLambda関数が受けて、ユーザーを認証するためCognitoのAPIを呼び出します。
CognitoはLambdaトリガーを呼び出しながら、カスタム認証チャレンジを処理します。カスタム認証チャレンジについてはAWSの公式ドキュメントカスタム認証チャレンジの Lambda トリガーをご覧ください。
RedisはSRPで使用するuser_id_for_srpと、カスタム認証フローで使用するsessionを格納するために使用します。
また、MFAとしてOTPをSESを使用してメール送信しています。
認証フロー
それぞれのコンポーネント間でやり取りされる情報を次のシークエンス図に示します。サーバーサイドの実装のため、Cognito APIはAdmin系のものを使用できます。例えばInitiateAuthの代わりにAdminInitiateAuthを使用しています。
Lambda関数は、パスワード認証を行うものと、OTP認証を行うものの2つがあるので、順番に説明します。
パスワード認証
SRPを使用するのはこのLambda関数です。
SRPの仕様は公式文書Using the Secure Remote Password (SRP) Protocol for TLS Authenticationに記載されていますが、私はこれを読んでもよく理解できませんでした。幸いCognitoを使用するSRPのPython実装Python library for using AWS Cognito. With support for SRP(warrant)が公開されているので、そのソースコードと公式文書を見比べながら理解していきました。
基本的にwarrantのソースコードを使用しましたが、カスタム認証フローとAdmin系APIを使用するために一部修正する必要があったので、その部分を中心に説明していきます。
パスワード認証を行うLambda関数の実装例を次に示します。AWSSRPがwarrantのクラス、authenticate_userがその中の主要なメソッドです。
# user_idとpasswordをリクエストボディから取得するコード(省略)
u = AWSSRP(pool_id=user_pool_id, client_id=client_id, client_secret=client_secret, username=user_id, password=password)
try:
response, user_id_for_srp = u.authenticate_user()
except Exception as e:
# 例外を処理するコード(省略)
session = response["Session"]
data["session"] = session
data["user_id_for_srp"] = user_id_for_srp
# dataをRedisに格納するコード(省略)
return {
"statusCode": 200
}
authenticate_userの中でシークエンス図の2から13までが実行されます。authenticate_userのソースコードを次に示します。
response = boto_client.admin_initiate_auth(
AuthFlow='CUSTOM_AUTH',
AuthParameters=auth_params,
ClientId=self.client_id,
UserPoolId=self.pool_id
)
if response['ChallengeName'] == self.PASSWORD_VERIFIER_CHALLENGE:
user_id_for_srp = response['ChallengeParameters']['USER_ID_FOR_SRP']
challenge_response = self.process_challenge(response['ChallengeParameters'])
session = response["Session"]
response = boto_client.admin_respond_to_auth_challenge(
ClientId=self.client_id,
UserPoolId=self.pool_id,
Session=session,
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE,
ChallengeResponses=challenge_response)
return response, user_id_for_srp
最初にadmin_initiate_authを呼び出します(シークエンス図2)が、ここでのwarrantからの変更点は以下の2点です。
APIを元のコードのinitiate_authからadmin_initiate_authに変更した。そのため引数にUserPoolIdを追加した。
AuthFlowをカスタム認証フローを表すCUSTOM_AUTHに変更した。
admin_initiate_authが実行されるとLambdaトリガーのDefine auth challengeが呼び出されます(シークエンス図3)。
実装例を次に示します。if文の最初の分岐が選択され、responseのchallengeNameに"PASSWORD_VERIFIER"がセットされて返却されます(シークエンス図4)。
if (len(event["request"]["session"]) == 1) and (event["request"]["session"][0]["challengeName"] == "SRP_A"):
event["response"]["challengeName"] = "PASSWORD_VERIFIER"
event["response"]["issueTokens"] = False
event["response"]["failAuthentication"] = False
elif (len(event["request"]["session"]) == 2) and (event["request"]["session"][1]["challengeName"] == "PASSWORD_VERIFIER") and (event["request"]["session"][1]["challengeResult"] is True):
event["response"]["challengeName"] = "CUSTOM_CHALLENGE"
event["response"]["issueTokens"] = False
event["response"]["failAuthentication"] = False
elif (len(event["request"]["session"]) >= 3) and (event["request"]["session"][len(event["request"]["session"]) - 1]["challengeName"] == "CUSTOM_CHALLENGE") and (event["request"]["session"][len(event["request"]["session"]) - 1]["challengeResult"] is True):
event["response"]["issueTokens"] = True
event["response"]["failAuthentication"] = False
elif (len(event["request"]["session"]) >= 3) and (event["request"]["session"][len(event["request"]["session"]) - 1]["challengeName"] == "CUSTOM_CHALLENGE") and (event["request"]["session"][len(event["request"]["session"]) - 1]["challengeResult"] is False):
event["response"]["challengeName"] = "CUSTOM_CHALLENGE"
event["response"]["issueTokens"] = False
event["response"]["failAuthentication"] = False
else:
event["response"]["issueTokens"] = False
event["response"]["failAuthentication"] = True
return event
authenticate_userに制御が戻り(シークエンス図5)、admin_respond_to_auth_challengeを呼び出します(シークエンス図6)が、ここでのwarrantからの変更点は以下の2点です。
APIを元のコードのrespond_to_auth_challengeからadmin_respond_to_auth_challengeに変更した。引数にUserPoolIdとカスタム認証フローのためのSessionを追加した。
OTP認証のLambda関数で必要となるuser_id_for_srpをadmin_initiate_authのresponseのChallengeParametersから取得した。
また、admin_respond_to_auth_challengeの引数のchallenge_responseの値はprocess_challengeメソッドで計算していますが、その中でSECRET_HASHキーの値を得るためにget_secret_hashメソッドを呼び出している部分があります。その引数として与えるUSERNAMEはemailアドレスのようなエイリアスではなく、本当のUSERNAME(admin_initiate_authのレスポンスのChallengeParametersの中から取得できます)である必要があります。該当するwarrantの変更部分を以下に示します。
if self.client_secret is not None:
response.update({
"SECRET_HASH":
# self.get_secret_hash(self.username, self.client_id, self.client_secret)})
self.get_secret_hash(challenge_parameters['USERNAME'], self.client_id, self.client_secret)})
admin_respond_to_auth_challengeの呼び出しでSRP自体は完了しますが、今回はカスタム認証フローを使用しているため、再度LambdaトリガーのDefine auth challengeが呼び出されます(シークエンス図7)。
今度はif文の2番目の分岐が選択され、responseのchallengeNameに"CUSTOM_CHALLENGE"がセットされて返却されます(シークエンス図の8)。するとCognitoが自動的にLambdaトリガーのCreate auth challengeを呼び出します(シークエンス図8)。実装例を次に示します。
OTPコードを生成し、SESを使ってemailを送信後、CognitoにOTPコードと有効期限を返却します(シークエンス図10~12)。
if event["request"]["challengeName"] == "CUSTOM_CHALLENGE":
# OTPコードを生成する。
email_address = event["request"]["userAttributes"]["email"]
otp_code = (OTPコードの生成ロジック)
# 期限を設定する。
expire = datetime.datetime.now() + datetime.timedelta(minutes=10)
expire_str = expire.strftime("%Y-%m-%d %H:%M:%S")
# emailを送信する。
try:
response = client.send_email(
Destination={
'ToAddresses': [
email_address,
],
},
Message={
'Body': {
'Text': {
'Charset': "UTF-8",
'Data': "OTPコード: " + otp_code,
},
},
'Subject': {
'Charset': "UTF-8",
'Data': "OTPコード",
},
},
Source=formataddr((Header(sender, 'ISO-2022-JP').encode(), sender_addr)),
)
except ClientError as e:
# 例外を処理するコード(省略)
event["response"]["privateChallengeParameters"] = {
"otpCode": otp_code,
"expire": expire_str
}
return event
パスワード認証を行うLambda関数に制御が戻り(シークエンス図13)、Redisにuser_id_for_srpとsessionを格納し(シークエンス図14)、ブラウザーに正常終了を返却します(シークエンス図15)。
OTP認証
OTP認証を行うLambda関数の実装例を次に示します。
def get_secret_hash(username, client_id, client_secret):
message = bytearray(username + client_id, 'utf-8')
hmac_obj = hmac.new(bytearray(client_secret, 'utf-8'), message, hashlib.sha256)
return base64.standard_b64encode(hmac_obj.digest()).decode('utf-8')
# ユーザーが入力したOTPを取得するコード(省略)
# Redisからuser_id_for_srpとsessionを取り出すコード(省略)
try:
response = client.admin_respond_to_auth_challenge(
UserPoolId=user_pool_id,
ClientId=client_id,
ChallengeName="CUSTOM_CHALLENGE",
ChallengeResponses={
'USERNAME': user_id_for_srp,
'ANSWER': otp_code,
'SECRET_HASH': get_secret_hash(user_id_for_srp, client_id, client_secret)
},
Session=session
)
except Exception as e:
# 例外を処理するコード(省略)
if "AuthenticationResult" in response.keys():
# OTPが一致していた場合、レスポンスからtokenを取得する。
authentication_result = response["AuthenticationResult"]
access_token = authentication_result["AccessToken"]
refresh_token = authentication_result["RefreshToken"]
id_token = authentication_result["IdToken"]
else:
# OTPが一致していなかった場合、エラーを返却するコード(省略)
ユーザーが入力したOTPをリクエストボディから取得した後、パスワード認証Lambda関数がRedisに格納したuser_id_for_srpとsessionを取り出します(シークエンス図18)。次いでadmin_respond_to_auth_challengeを呼び出します(シークエンス図19)。
今度はLambdaトリガーのVerify auth challengeが呼び出され(シークエンス図20)、入力したOTPが正しいかチェックします。実装例を次に示します。
expected_otp_code = event["request"]["privateChallengeParameters"]["otpCode"]
expire_str = event["request"]["privateChallengeParameters"]["expire"]
expire = datetime.datetime.strptime(expire_str, "%Y-%m-%d %H:%M:%S")
otp_code = event["request"]["challengeAnswer"]
if datetime.datetime.now() >= expire:
event["response"]["answerCorrect"] = False
elif otp_code == expected_otp_code:
event["response"]["answerCorrect"] = True
else:
event["response"]["answerCorrect"] = False
return event
有効期限内でかつ入力したOTPが正しい場合、responseのanswerCorrectにTrueをセットし、返却します(シークエンス図21)。
するとLambdaトリガーのDefine auth challengeが再度呼び出されます(シークエンス図22)。今度は3番目の分岐が選択され、responseのissueTokensをTrueにセットして返却します(シークエンス図23)。
OTP認証Lambda関数に制御が戻り(シークエンス図24)、responseからトークンを抽出後、ブラウザーにリダイレクトを返却します(シークエンス図25)。
なお、OTPの有効期限が超過した場合、もしくは入力したOTPが間違っていた時、Verify auth challengeはresponseのanswerCorrectにFalseをセットします。その場合Define auth challengeでは4番目の分岐が選択され、responseのissueTokensをFalseにセットして返却します。この時、admin_respond_to_auth_challengeのresponseにはAuthenticationResultは含まれません(従ってトークンは返却されません)。
まとめ
Cognitoのカスタム認証フローとSRPを組み合わせるとどのような実装になるか、試してみました。Lambdaで使用できるSRPの実装がAWSでは用意されていないので始めは途方にくれましたが、warrantが見つかって何とか実装できました。warrantを開発した人たちは本当に優秀だと思います。
Cognitoのカスタム認証フローについても最初は戸惑いましたが、Lambdaトリガーへの入力をひとつひとつ確認することで、理解を深めることが出来ました。
Lambdaトリガーには今回取り上げたカスタム認証フロー以外に、ユーザーのサインアップ時やIDトークンの生成時に使用できるものなど、多くの種類があります。次回はそれらの実装にもチャレンジしてみようと思います。
お問合せはお気軽に
https://service.shiftinc.jp/contact/
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/