这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def scan_resource_conf(self, conf):
principal = statement['Principal']
if 'Effect' in statement and statement['Effect'] == 'Deny':
continue
if 'Condition' in statement:
continue
if 'AWS' in principal:
aws = principal['AWS']
if (isinstance(aws, str) and aws == '*') or (isinstance(aws, list) and '*' in aws):
Expand Down
65 changes: 65 additions & 0 deletions checkov/terraform/checks/resource/aws/SNSCrossAccountAccess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from typing import Any

from cloudsplaining.scan.resource_policy_document import ResourcePolicyDocument

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class SNSCrossAccountAccess(BaseResourceCheck):
    """CKV_AWS_385: flag SNS topic policies that grant an IAM ARN principal unconditioned access.

    A policy fails when any ``Allow`` statement lists at least one AWS principal
    starting with ``arn:aws:iam::`` and carries no ``Condition``.

    NOTE: the account id inside the ARN is not compared with any owning account;
    every IAM ARN principal is treated the same way.
    """

    def __init__(self) -> None:
        """Pass the check's name, id, category and supported resource type to the base class."""
        super().__init__(
            name="Ensure AWS SNS topic policies do not allow cross-account access",
            id="CKV_AWS_385",
            categories=(CheckCategories.IAM,),
            supported_resources=("aws_sns_topic_policy",),
        )

    @staticmethod
    def _names_iam_arn_principal(raw_statement: Any) -> bool:
        """Return True when the raw statement lists an ``arn:aws:iam::`` AWS principal."""
        if not (raw_statement and "Principal" in raw_statement and "AWS" in raw_statement["Principal"]):
            return False

        aws_entries = raw_statement["Principal"]["AWS"]
        if isinstance(aws_entries, str):
            candidates = [aws_entries]
        elif isinstance(aws_entries, list):
            candidates = list(aws_entries)
        else:
            # Any other shape contributes no principals.
            candidates = []

        # A wildcard can never match the ARN prefix; the explicit comparison is kept for parity.
        return any(
            isinstance(entry, str) and entry.startswith("arn:aws:iam::") and entry != "*"
            for entry in candidates
        )

    def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
        """Evaluate the ``policy`` attribute of an ``aws_sns_topic_policy`` resource.

        :param conf: resource configuration; ``conf["policy"]`` is indexed and iterated,
            so it is expected to be a sequence of policy documents.
        :return: ``PASSED`` when the policy is missing/empty or no statement is flagged,
            ``FAILED`` on the first ``Allow`` statement with an IAM ARN principal and no
            conditions, ``UNKNOWN`` when the first policy entry is not a dict or when
            processing a policy raises ``TypeError``/``AttributeError``.
        """
        policies = conf.get("policy")
        if not policies:
            return CheckResult.PASSED

        # Only the first entry is inspected to decide whether the policy was parsed into dicts.
        if not isinstance(policies[0], dict):
            return CheckResult.UNKNOWN

        for policy_doc in policies:
            try:
                document = ResourcePolicyDocument(policy=policy_doc)
                for stmt in document.statements:
                    if stmt.effect != "Allow":
                        continue
                    # Conditions are only consulted once an IAM ARN principal was found.
                    if self._names_iam_arn_principal(stmt.statement) and not stmt.conditions:
                        return CheckResult.FAILED
            except (TypeError, AttributeError):
                return CheckResult.UNKNOWN

        return CheckResult.PASSED

    def get_evaluated_keys(self) -> list[str]:
        """Return the resource attribute keys this check evaluates."""
        return ["policy"]


# Module-level instance of the check; presumably constructing it is what registers
# the check with checkov — confirm in BaseResourceCheck.
check = SNSCrossAccountAccess()
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,42 @@ resource "aws_kms_key" "pass_3" {
}
POLICY
}

# Expected to pass (listed as passing in the accompanying test): both Allow
# statements, including the one with the wildcard "*" principal, carry a Condition.
resource "aws_kms_key" "pass_4" {
description = "description"

policy = <<POLICY
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::111122223333:root"
},
"Action": "kms:*",
"Resource": "*",
"Condition": {
"Bool": { "aws:MultiFactorAuthPresent": "true" }
}
},
{
"Sid": "RestrictWildcardPrincipalToAccount",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "kms:*",
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:CallerAccount": "111122223333"
}
}
}
]
}
POLICY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# fail
# Expected FAILED: the only Allow statement names a specific IAM role ARN as its
# AWS principal and has no Condition.
# NOTE(review): the "Action" array ends with a trailing comma, which strict JSON
# parsers reject — confirm the policy is still parsed as intended.
resource "aws_sns_topic_policy" "fail0" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version":"2012-10-17",
"Statement":[
{
"Principal": {
"AWS": [
"arn:aws:iam::123456789101:role/sns"
]
},
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission",
],
"Resource": "${aws_sns_topic.test.arn}"
}
]
}
POLICY
}

# Expected FAILED: the S3 service-principal statement is conditioned, but the
# "AllowOriginalAccess" statement grants an IAM role ARN principal access with
# no Condition.
resource "aws_sns_topic_policy" "fail1" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3ToPublish",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "SNS:Publish",
"Resource": "${aws_sns_topic.test.arn}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:::your-s3-bucket-name/*"
},
"StringEquals": {
"aws:SourceAccount": "${data.aws_caller_identity.current.account_id}"
}
}
},
{
"Sid": "AllowOriginalAccess",
"Principal": {
"AWS": [
"arn:aws:iam::123456789101:role/sns"
]
},
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission"
],
"Resource": "${aws_sns_topic.test.arn}"
}
]
}
POLICY
}



# pass
# Expected PASSED for this check: the AWS principal list holds only the wildcard
# "*", which is not an arn:aws:iam:: principal.
# NOTE(review): the "Action" array ends with a trailing comma, which strict JSON
# parsers reject — confirm the policy is still parsed as intended.
resource "aws_sns_topic_policy" "pass0" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version":"2012-10-17",
"Statement":[
{
"Principal": {
"AWS": [
"*"
]
},
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission",
],
"Resource": "${aws_sns_topic.test.arn}"
}
]
}
POLICY
}

# Expected PASSED for this check: "Principal" is the bare string "*", so the
# statement has no "AWS" principal entry with an arn:aws:iam:: ARN.
# NOTE(review): the "Action" array ends with a trailing comma, which strict JSON
# parsers reject — confirm the policy is still parsed as intended.
resource "aws_sns_topic_policy" "pass1" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version":"2012-10-17",
"Statement":[
{
"Principal": "*",
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission",
],
"Resource": "${aws_sns_topic.test.arn}"
}
]
}
POLICY
}

# Expected PASSED: same statements as fail1, except that "AllowOriginalAccess"
# now carries a Condition (ArnEquals on aws:PrincipalArn), so the IAM role ARN
# principal is no longer unconditioned.
resource "aws_sns_topic_policy" "pass2" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3ToPublish",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "SNS:Publish",
"Resource": "${aws_sns_topic.test.arn}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:::your-s3-bucket-name/*"
},
"StringEquals": {
"aws:SourceAccount": "${data.aws_caller_identity.current.account_id}"
}
}
},
{
"Sid": "AllowOriginalAccess",
"Principal": {
"AWS": [
"arn:aws:iam::123456789101:role/sns"
]
},
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission"
],
"Resource": "${aws_sns_topic.test.arn}",
"Condition": {
"ArnEquals": {
"aws:PrincipalArn": "arn:aws:iam::123456789101:role/sns"
}
}
}
]
}
POLICY
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,45 @@ resource "aws_sns_topic_policy" "sns_tp1" {
POLICY
}

# The single Allow statement lists both an IAM role ARN and the wildcard "*" as
# AWS principals, and restricts them with an aws:SourceAccount Condition.
# Presumably expected to pass because of that Condition — confirm against the
# test that consumes this fixture.
resource "aws_sns_topic_policy" "sns_pass_condition" {
arn = aws_sns_topic.test.arn

policy = <<POLICY
{
"Version":"2012-10-17",
"Statement":[
{
"Sid": "AllowSpecificPrincipalsFromSourceAccount",
"Principal": {
"AWS": [
"arn:aws:iam::123456789101:role/sns",
"*"
]
},
"Effect": "Allow",
"Action": [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission"
],
"Resource": "${aws_sns_topic.test.arn}",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "123456789101"
}
}
}
]
}
POLICY
}

# Should return as unknown due to a condition parsing error.
resource "aws_sns_topic_policy" "sns_tp_unknown" {
arn = aws_sns_topic.test.arn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ def test(self):
'aws_kms_key.pass_0',
'aws_kms_key.pass_1',
'aws_kms_key.pass_2',
'aws_kms_key.pass_3'
'aws_kms_key.pass_3',
'aws_kms_key.pass_4',
}
failing_resources = {
'aws_kms_key.fail_0',
'aws_kms_key.fail_1',
'aws_kms_key.fail_2',
'aws_kms_key.fail_3',
'aws_kms_key.fail_4'
'aws_kms_key.fail_4',
}

passed_check_resources = set([c.resource for c in report.passed_checks])
Expand All @@ -36,7 +37,7 @@ def test(self):
self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)

self.assertEqual(summary['passed'], 4)
self.assertEqual(summary['passed'], 5)
self.assertEqual(summary['failed'], 5)
self.assertEqual(summary['skipped'], 0)
self.assertEqual(summary['parsing_errors'], 0)
Expand Down
Loading
Loading