这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions checkov/terraform/plan_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,89 @@ def _add_references(obj: dict[str, Any], conf: dict[str, Any], return_resource:
return_resource.setdefault(CustomAttributes.REFERENCES, []).append(conf_value["references"])


def _process_deleted_resources(
template: dict[str, Any],
resource_changes: dict[str, dict[str, Any]],
processed_addresses: set[str],
) -> list[dict[str, dict[str, Any]]]:
"""
Process resources that have pure delete actions and don't appear in planned_values.

When a resource is being purely deleted (not replaced), it doesn't appear in the
plan's planned_values section. However, it does exist in resource_changes with its
"before" state. This function extracts these resources so they can be checked by
policy rules that need to validate deletion conditions (e.g., ensuring a resource
is in an acceptable state before deletion).

:param template: The full terraform plan template
:param resource_changes: Map of resource addresses to their change information
:param processed_addresses: Set of resource addresses already processed from planned_values
:return: List of resource blocks for deleted resources, formatted for checkov checks
"""
deleted_blocks: list[dict[str, dict[str, Any]]] = []

for address, change_info in resource_changes.items():
# Avoid duplicate processing - resources in planned_values are already handled
if address in processed_addresses:
continue

actions = change_info.get("change", {}).get("actions", [])

# Only process pure deletes - other change types should already be in planned_values
if actions != ["delete"]:
continue

resource_type = change_info.get("type")
resource_name = change_info.get("name")
mode = change_info.get("mode")

if not resource_type or not resource_name:
continue

# Data sources are not managed and don't need deletion validation
if mode != "managed":
continue

# Use the "before" state to populate resource config for policy checks
before_values = change_info.get("change", {}).get("before")
if not before_values or not isinstance(before_values, dict):
continue

# Find configuration to get expressions for proper value transformation
conf = next(
(
x
for x in template.get("configuration", {}).get("root_module", {}).get("resources", [])
if x.get("type") == resource_type and x.get("name") == resource_name
),
None,
)

# Transform before state using _hclify to match the format used for other resources
expressions = conf.get("expressions") if conf else None
resource_conf = _hclify(
obj=before_values,
conf=expressions,
resource_type=resource_type,
)

# Add special attributes for policy checks to detect and validate deletions
resource_conf[TF_PLAN_RESOURCE_ADDRESS] = address # e.g., "aws_s3_bucket.example"
resource_conf[TF_PLAN_RESOURCE_CHANGE_ACTIONS] = actions # ["delete"]
resource_conf[TF_PLAN_RESOURCE_CHANGE_KEYS] = resource_changes.get(address, {}).get(TF_PLAN_RESOURCE_CHANGE_KEYS, [])

# Create the nested dict structure: { "resource_type": { "resource_name": { ...config... } } }
resource_block = {
resource_type: {
resource_name: resource_conf
}
}

deleted_blocks.append(resource_block)

return deleted_blocks


def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tuple[Optional[Dict[str, Any]], Optional[List[Tuple[int, str]]]]:
"""
:type tf_plan_file: str - path to plan file
Expand All @@ -517,6 +600,9 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl

resource_changes = _get_resource_changes(template=template)

# Track which resources we've already processed from planned_values
processed_addresses = set()

for resource in template.get("planned_values", {}).get("root_module", {}).get("resources", []):
conf = next(
(
Expand All @@ -536,6 +622,12 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl
tf_definition["resource"].append(resource_block)
elif block_type == "data":
tf_definition["data"].append(resource_block)

# Track this resource address as processed (regardless of preparation status)
# to prevent duplicate processing as a pure delete
if resource.get("address"):
processed_addresses.add(resource["address"])

child_modules = template.get("planned_values", {}).get("root_module", {}).get("child_modules", [])
root_module_conf = template.get("configuration", {}).get("root_module", {})
# Terraform supports modules within modules so we need to search
Expand All @@ -547,6 +639,18 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl
)
for block_type, resource_blocks in module_blocks.items():
tf_definition[block_type].extend(resource_blocks)

# Process deleted resources that weren't in planned_values
# Note: Module resource addresses from planned_values are not explicitly tracked above.
# Based on Terraform's plan structure, resources in planned_values should not simultaneously
# appear as pure deletes, but the deduplication check handles any edge cases.
deleted_blocks = _process_deleted_resources(
template=template,
resource_changes=resource_changes,
processed_addresses=processed_addresses,
)
tf_definition["resource"].extend(deleted_blocks)

return tf_definition, template_lines


Expand Down
24 changes: 23 additions & 1 deletion docs/7.Scan Examples/Terraform Plan Scanning.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ Following checks will be ignored;

To check if a resource will be deleted or changed (further change values can be found [here](https://www.terraform.io/internals/json-format#change-representation)) the change actions values can be accessed via the attribute name `__change_actions__`.

Ex. Python
Checkov supports checking both:
- **Replacement operations**: Resources with actions `["create", "delete"]` that appear in `planned_values`
- **Pure delete operations**: Resources with actions `["delete"]` that only exist in `resource_changes`

For pure deletions, the resource configuration will contain the "before" state (the resource's values before deletion), allowing checks to validate deletion conditions.

Ex. Python - Prevent deletion of specific resources
```python
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
actions = conf.get("__change_actions__")
Expand All @@ -61,6 +67,22 @@ Ex. Python
return CheckResult.PASSED
```

Ex. Python - Validate resource state before deletion
```python
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
actions = conf.get("__change_actions__")
# Only check resources being deleted
if isinstance(actions, list) and "delete" in actions:
# Check the resource's status before deletion
status = conf.get("status")
if isinstance(status, list):
status = status[0] if status else None
# Fail if deleting an active resource
if status == "ACTIVE":
return CheckResult.FAILED
return CheckResult.PASSED
```

Ex. YAML
```yaml
cond_type: attribute
Expand Down
26 changes: 26 additions & 0 deletions tests/terraform/runner/extra_tf_plan_checks/pure_delete_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_CHANGE_ACTIONS


class SecretNotPureDeleted(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure Secret is not deleted (pure delete test)"
id = "CUSTOM_PURE_DELETE_1"
supported_resources = ("aws_secretsmanager_secret",)
categories = (CheckCategories.GENERAL_SECURITY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
actions = conf.get(TF_PLAN_RESOURCE_CHANGE_ACTIONS)
# Check for pure delete action
if isinstance(actions, list) and actions == ["delete"]:
return CheckResult.FAILED
return CheckResult.PASSED


scanner = SecretNotPureDeleted()
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"format_version": "1.1",
"terraform_version": "1.2.4",
"planned_values": {
"root_module": {
"resources": []
}
},
"resource_changes": [
{
"address": "aws_secretsmanager_secret.deleted",
"mode": "managed",
"type": "aws_secretsmanager_secret",
"name": "deleted",
"provider_name": "registry.terraform.io/hashicorp/aws",
"change": {
"actions": [
"delete"
],
"before": {
"arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:test-secret",
"description": "Test secret to be deleted",
"force_overwrite_replica_secret": false,
"kms_key_id": "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012",
"name": "test-secret",
"recovery_window_in_days": 30,
"tags": {
"Environment": "test"
}
},
"after": null,
"after_unknown": {},
"before_sensitive": {
"replica": [],
"rotation_rules": [],
"tags": {},
"tags_all": {}
},
"after_sensitive": false
}
}
],
"configuration": {
"provider_config": {
"aws": {
"name": "aws",
"full_name": "registry.terraform.io/hashicorp/aws",
"expressions": {
"profile": {
"constant_value": "dev2"
},
"region": {
"constant_value": "us-west-2"
}
}
}
},
"root_module": {
"resources": [
{
"address": "aws_secretsmanager_secret.deleted",
"mode": "managed",
"type": "aws_secretsmanager_secret",
"name": "deleted",
"provider_config_key": "aws",
"expressions": {
"name": {
"constant_value": "test-secret"
},
"kms_key_id": {
"constant_value": "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"
}
},
"schema_version": 0
}
]
}
}
}
30 changes: 30 additions & 0 deletions tests/terraform/runner/test_plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,36 @@ def test_runner_extra_check(self):
failed_check = next(check for check in report.failed_checks if check.check_id == "CUSTOM_DELETE_1")
self.assertEqual(failed_check.details, ["some great details"])

def test_runner_pure_delete_check(self):
"""Test that resources with pure delete actions (not replacements) are processed correctly"""
# given
current_dir = Path(__file__).parent
tf_dir_path = str(current_dir / "resources/plan_with_pure_deleted_resources")
extra_checks_dir_path = [str(current_dir / "extra_tf_plan_checks")]

# when
report = Runner().run(
root_folder=tf_dir_path,
external_checks_dir=extra_checks_dir_path,
runner_filter=RunnerFilter(checks=["CUSTOM_PURE_DELETE_1"])
)

# then
summary = report.get_summary()

# CUSTOM_PURE_DELETE_1 should fail for the secret with pure delete action
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["passed"], 0)

# Verify the specific resource that failed
failed_resource_ids = [check.resource for check in report.failed_checks]
self.assertIn("aws_secretsmanager_secret.deleted", failed_resource_ids)

# Verify the check details
secret_check = next((c for c in report.failed_checks if c.check_id == "CUSTOM_PURE_DELETE_1"), None)
self.assertIsNotNone(secret_check)
self.assertEqual(secret_check.resource, "aws_secretsmanager_secret.deleted")

def test_runner_nested_child_modules_with_connections(self):
# given
tf_file_path = Path(__file__).parent / "resources/plan_nested_child_modules_with_connections/tfplan.json"
Expand Down