EC2インスタンスメタデータに対するServer-Side Request Forgery(SSRF)の対策とバイパス
はじめに
こんにちは!SHIFT DevOpsグループの横山です。
セキュリティの技術書を読んでいると、しばしばCTFより自分で対策を実装しそのバイパスを考える、というサイクルを繰り返すほうがよいとする主張を目にするためSSRFに対して実践してみました。
EC2インスタンスメタデータに対するSSRF
SSRFはアプリケーションに意図しないリクエストをさせる攻撃手法です。
実際のアプリケーションではWebhookなどにSSRFの脆弱性が存在することがあります。
以下はSSRFの脆弱性があるコード例です。
from flask import Flask, request
import requests
app = Flask(__name__)
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
res = requests.get(url, timeout=3)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
SSRFに対して脆弱なパラメータに http://169.254. 169.254/latest/meta-data/iam/security-credentials を渡すことでロール名がわかります。
取得したロール名を利用し http://169.254. 169.254/latest/meta-data/iam/security-credentials/<ロール名> を渡すことで認証情報を取得できます。
認証情報を設定し、取得したロールの権限でコマンドを実行できます。
SSRFに対する対策とバイパス
SSRFへの対策をアプリケーション側で実装します。 https://github.com/swisskyrepo/PayloadsAllTheThings/tree/master/Server Side Request Forgery#ssrf-url-for-aws を参考にし、以下のペイロードとシェルを利用し攻撃します。
#!/bin/bash
while read u
do
echo $u, $(curl --silent "http://${1}:5000/ssrf?url=${u}/latest/meta-data/hostname") \
| grep compute.internal
done < /home/taka/work/sec/ssrf/payloads/payloads.txt
何も対策をしていないコードに対する結果は以下になります。
169.254. 169.254に対して部分一致での対策をしたコードとそれに対する結果は以下になります。
from flask import Flask, request
import requests
app = Flask(__name__)
block_list = ["169.254. 169.254"]
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
if any(u in url for u in block_list):
return "Malicious url detected!"
res = requests.get(url, timeout=3)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
IPを指定された場合にグローバルIPであるかの確認による対策をしたコードとそれに対する結果は以下になります。
from urllib.parse import urlparse
from flask import Flask, request
import ipaddress
import requests
app = Flask(__name__)
def is_valid_ip(host):
ipaddr = None
try:
ipaddr = ipaddress.ip_address(host)
except ValueError:
print("Not valid ip address. Maybe domain?")
return ipaddr
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
if (ipaddr := is_valid_ip(urlparse(url).netloc)) is not None \
and not ipaddr.is_global:
return "Malicious url detected!"
res = requests.get(url, timeout=3)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
上記に加え、ドメインの場合にA/AAAAレコードにグローバルIPが登録されているかの確認(OWASP参考)による対策をしたコードとそれに対する結果は以下になります。
from urllib.parse import urlparse
import dns.resolver
from flask import Flask, request
import ipaddress
import requests
app = Flask(__name__)
def is_valid_ip(host):
ipaddr = None
try:
ipaddr = ipaddress.ip_address(host)
except ValueError:
print("Not valid ip address. Maybe domain?")
return ipaddr
# Configure the DNS resolver to use for all DNS queries
DNS_RESOLVER = dns.resolver.Resolver()
DNS_RESOLVER.nameservers = ["1.1.1.1"]
def verify_dns_records(domain, records, type):
"""
Verify if one of the DNS records resolve to a non public IP address.
Return a boolean indicating if any error has been detected.
"""
error_detected = False
if records is not None:
for record in records:
value = record.to_text().strip()
try:
ip = ipaddress.ip_address(value)
# See https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address.is_global
if not ip.is_global:
print("[!] DNS record type '%s' for domain name '%s' resolve to "
"a non public IP address '%s'!" % (type, domain, value))
error_detected = True
except ValueError:
error_detected = True
print("[!] '%s' is not valid IP address!" % value)
return error_detected
def check(domain):
"""
Return a boolean indicating if any error has been detected.
"""
error_detected = False
# Get the IPs of the current domain
# See https://en.wikipedia.org/wiki/List_of_DNS_record_types
try:
# A = IPv4 address record
ip_v4_records = DNS_RESOLVER.query(domain, "A")
except Exception as e:
ip_v4_records = None
print("[i] Cannot get A record for domain '%s': %s\n" % (domain,e))
try:
# AAAA = IPv6 address record
ip_v6_records = DNS_RESOLVER.query(domain, "AAAA")
except Exception as e:
ip_v6_records = None
print("[i] Cannot get AAAA record for domain '%s': %s\n" % (domain,e))
# Verify the IPs obtained
if verify_dns_records(domain, ip_v4_records, "A") \
or verify_dns_records(domain, ip_v6_records, "AAAA"):
error_detected = True
return error_detected
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
host = urlparse(url).netloc
if (ipaddr := is_valid_ip(host)) is not None \
and not ipaddr.is_global:
return "Malicious url detected!"
if ipaddr is None and check(host):
return "Invalid domain!"
res = requests.get(url, timeout=3)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
上記に加え、check関数においてip_v4_records、ip_v6_recordsがともにNoneのときの判定を追加したコードとそれに対する結果は以下になります。
169.254. 169.254にリダイレクトされるドメインを使用するとバイパスできます。
from urllib.parse import urlparse
import dns.resolver
from flask import Flask, request
import ipaddress
import requests
app = Flask(__name__)
def is_valid_ip(host):
ipaddr = None
try:
ipaddr = ipaddress.ip_address(host)
except ValueError:
print("Not valid ip address. Maybe domain?")
return ipaddr
# Configure the DNS resolver to use for all DNS queries
DNS_RESOLVER = dns.resolver.Resolver()
DNS_RESOLVER.nameservers = ["1.1.1.1"]
def verify_dns_records(domain, records, type):
"""
Verify if one of the DNS records resolve to a non public IP address.
Return a boolean indicating if any error has been detected.
"""
error_detected = False
if records is not None:
for record in records:
value = record.to_text().strip()
try:
ip = ipaddress.ip_address(value)
# See https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address.is_global
if not ip.is_global:
print("[!] DNS record type '%s' for domain name '%s' resolve to "
"a non public IP address '%s'!" % (type, domain, value))
error_detected = True
except ValueError:
error_detected = True
print("[!] '%s' is not valid IP address!" % value)
return error_detected
def check(domain):
"""
Return a boolean indicating if any error has been detected.
"""
error_detected = False
# Get the IPs of the current domain
# See https://en.wikipedia.org/wiki/List_of_DNS_record_types
try:
# A = IPv4 address record
ip_v4_records = DNS_RESOLVER.query(domain, "A")
except Exception as e:
ip_v4_records = None
print("[i] Cannot get A record for domain '%s': %s\n" % (domain,e))
try:
# AAAA = IPv6 address record
ip_v6_records = DNS_RESOLVER.query(domain, "AAAA")
except Exception as e:
ip_v6_records = None
print("[i] Cannot get AAAA record for domain '%s': %s\n" % (domain,e))
# Verify the IPs obtained
if (ip_v4_records is None and ip_v6_records is None) or \
(verify_dns_records(domain, ip_v4_records, "A") \
or verify_dns_records(domain, ip_v6_records, "AAAA")):
error_detected = True
return error_detected
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
host = urlparse(url).netloc
if (ipaddr := is_valid_ip(host)) is not None \
and not ipaddr.is_global:
return "Malicious url detected!"
if ipaddr is None and check(host):
return "Invalid domain!"
res = requests.get(url, timeout=3)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
上記に加えrequests.getの引数にallow_redirects=Falseを追加し、リダイレクト不可にしたコードとそれに対する結果は以下になります。
from urllib.parse import urlparse
import dns.resolver
from flask import Flask, request
import ipaddress
import requests
app = Flask(__name__)
def is_valid_ip(host):
ipaddr = None
try:
ipaddr = ipaddress.ip_address(host)
except ValueError:
print("Not valid ip address. Maybe domain?")
return ipaddr
# Configure the DNS resolver to use for all DNS queries
DNS_RESOLVER = dns.resolver.Resolver()
DNS_RESOLVER.nameservers = ["1.1.1.1"]
def verify_dns_records(domain, records, type):
"""
Verify if one of the DNS records resolve to a non public IP address.
Return a boolean indicating if any error has been detected.
"""
error_detected = False
if records is not None:
for record in records:
value = record.to_text().strip()
try:
ip = ipaddress.ip_address(value)
# See https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address.is_global
if not ip.is_global:
print("[!] DNS record type '%s' for domain name '%s' resolve to "
"a non public IP address '%s'!" % (type, domain, value))
error_detected = True
except ValueError:
error_detected = True
print("[!] '%s' is not valid IP address!" % value)
return error_detected
def check(domain):
"""
Return a boolean indicating if any error has been detected.
"""
error_detected = False
# Get the IPs of the current domain
# See https://en.wikipedia.org/wiki/List_of_DNS_record_types
try:
# A = IPv4 address record
ip_v4_records = DNS_RESOLVER.query(domain, "A")
except Exception as e:
ip_v4_records = None
print("[i] Cannot get A record for domain '%s': %s\n" % (domain,e))
try:
# AAAA = IPv6 address record
ip_v6_records = DNS_RESOLVER.query(domain, "AAAA")
except Exception as e:
ip_v6_records = None
print("[i] Cannot get AAAA record for domain '%s': %s\n" % (domain,e))
# Verify the IPs obtained
if (ip_v4_records is None and ip_v6_records is None) or \
(verify_dns_records(domain, ip_v4_records, "A") \
or verify_dns_records(domain, ip_v6_records, "AAAA")):
error_detected = True
return error_detected
@app.route('/')
def index():
return "ssrf demo!"
@app.route('/ssrf')
def ssrf():
url = request.args.get('url', 'http://127.0.0.1:5000')
host = urlparse(url).netloc
if (ipaddr := is_valid_ip(host)) is not None \
and not ipaddr.is_global:
return "Malicious url detected!"
if ipaddr is None and check(host):
return "Invalid domain!"
res = requests.get(url, timeout=3, allow_redirects=False)
return res.text
if __name__ == '__main__':
app.run(host='0.0.0.0')
IMDSv2による対策
IMDSv1を無効化しIMDSv2のみ有効にするとPUTリクエストによりトークンを取得し、有効時間内のトークンをヘッダーとして指定しないとメタデータに対するリクエストが無効となります。
コードについては何も対策をせず、IMDSv1を無効化しIMDSv2のみ有効にした状態に対する結果は以下になります。
IMDSv1を無効化しIMDSv2のみが有効な場合でもPHPではgopherプロトコルを利用することでバイパスできるようなため、gopherプロトコルを利用してローカルでテストした結果が以下になります。
requestsライブラリーではリダイレクトも含め、http、httpsプロトコルしかサポートされていないため、gopherプロトコルによるIMDSv2のバイパスは不可でした。
おわりに
対策の実装とそのバイパスを繰り返すことで、対策がされているからと言ってその対策は必ずしも完璧ではない可能性があるということを学びました。
自分の攻撃者としての限界を実際の攻撃者に反映して考えることは危険なため、ネットワークレベルでの対策や最小権限の設定などについても考慮する必要があるということについても再実感しました。
参考
https://blog.tokumaru.org/2018/12/introduction-to-ssrf-server-side-request-forgery.html
https://blog.tokumaru.org/2019/12/defense-ssrf-amazon-ec2-imdsv2.html
お問合せはお気軽に
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
PHOTO:UnsplashのPakata Goh