From 171618860b4889af81bca50fd857a6b69d2311dc Mon Sep 17 00:00:00 2001 From: Rob McGregor <38837341+exitcode0@users.noreply.github.com> Date: Thu, 6 Nov 2025 18:07:38 +1100 Subject: [PATCH] feat(terraform_plan): support scanning deleted resources Previously, it was not possible to check a resource's configuration when the plan indicated the resource would be deleted entirely. This prevented custom checks from enforcing conventions around deletions. We now process pure delete actions from resource_changes. The "before" state is made available as the resource configuration, with __change_actions__ set to ["delete"]. --- checkov/terraform/plan_parser.py | 104 ++++++++++++++++++ .../Terraform Plan Scanning.md | 24 +++- .../pure_delete_secret.py | 26 +++++ .../tfplan.json | 79 +++++++++++++ tests/terraform/runner/test_plan_runner.py | 30 +++++ 5 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 tests/terraform/runner/extra_tf_plan_checks/pure_delete_secret.py create mode 100644 tests/terraform/runner/resources/plan_with_pure_deleted_resources/tfplan.json diff --git a/checkov/terraform/plan_parser.py b/checkov/terraform/plan_parser.py index 985b96c6e5..d93272921f 100644 --- a/checkov/terraform/plan_parser.py +++ b/checkov/terraform/plan_parser.py @@ -503,6 +503,89 @@ def _add_references(obj: dict[str, Any], conf: dict[str, Any], return_resource: return_resource.setdefault(CustomAttributes.REFERENCES, []).append(conf_value["references"]) +def _process_deleted_resources( + template: dict[str, Any], + resource_changes: dict[str, dict[str, Any]], + processed_addresses: set[str], +) -> list[dict[str, dict[str, Any]]]: + """ + Process resources that have pure delete actions and don't appear in planned_values. + + When a resource is being purely deleted (not replaced), it doesn't appear in the + plan's planned_values section. However, it does exist in resource_changes with its + "before" state. This function extracts these resources so they can be checked by + policy rules that need to validate deletion conditions (e.g., ensuring a resource + is in an acceptable state before deletion). + + :param template: The full terraform plan template + :param resource_changes: Map of resource addresses to their change information + :param processed_addresses: Set of resource addresses already processed from planned_values + :return: List of resource blocks for deleted resources, formatted for checkov checks + """ + deleted_blocks: list[dict[str, dict[str, Any]]] = [] + + for address, change_info in resource_changes.items(): + # Avoid duplicate processing - resources in planned_values are already handled + if address in processed_addresses: + continue + + actions = change_info.get("change", {}).get("actions", []) + + # Only process pure deletes - other change types should already be in planned_values + if actions != ["delete"]: + continue + + resource_type = change_info.get("type") + resource_name = change_info.get("name") + mode = change_info.get("mode") + + if not resource_type or not resource_name: + continue + + # Data sources are not managed and don't need deletion validation + if mode != "managed": + continue + + # Use the "before" state to populate resource config for policy checks + before_values = change_info.get("change", {}).get("before") + if not before_values or not isinstance(before_values, dict): + continue + + # Find configuration to get expressions for proper value transformation + conf = next( + ( + x + for x in template.get("configuration", {}).get("root_module", {}).get("resources", []) + if x.get("type") == resource_type and x.get("name") == resource_name + ), + None, + ) + + # Transform before state using _hclify to match the format used for other resources + expressions = conf.get("expressions") if conf else None + resource_conf = _hclify( + obj=before_values, + conf=expressions, + resource_type=resource_type, + ) + + # Add special attributes for policy checks to detect and validate deletions + resource_conf[TF_PLAN_RESOURCE_ADDRESS] = address # e.g., "aws_s3_bucket.example" + resource_conf[TF_PLAN_RESOURCE_CHANGE_ACTIONS] = actions # ["delete"] + resource_conf[TF_PLAN_RESOURCE_CHANGE_KEYS] = resource_changes.get(address, {}).get(TF_PLAN_RESOURCE_CHANGE_KEYS, []) + + # Create the nested dict structure: { "resource_type": { "resource_name": { ...config... } } } + resource_block = { + resource_type: { + resource_name: resource_conf + } + } + + deleted_blocks.append(resource_block) + + return deleted_blocks + + def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tuple[Optional[Dict[str, Any]], Optional[List[Tuple[int, str]]]]: """ :type tf_plan_file: str - path to plan file @@ -517,6 +600,9 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl resource_changes = _get_resource_changes(template=template) + # Track which resources we've already processed from planned_values + processed_addresses = set() + for resource in template.get("planned_values", {}).get("root_module", {}).get("resources", []): conf = next( ( @@ -536,6 +622,12 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl tf_definition["resource"].append(resource_block) elif block_type == "data": tf_definition["data"].append(resource_block) + + # Track this resource address as processed (regardless of preparation status) + # to prevent duplicate processing as a pure delete + if resource.get("address"): + processed_addresses.add(resource["address"]) + child_modules = template.get("planned_values", {}).get("root_module", {}).get("child_modules", []) root_module_conf = template.get("configuration", {}).get("root_module", {}) # Terraform supports modules within modules so we need to search @@ -547,6 +639,18 @@ def parse_tf_plan(tf_plan_file: str, out_parsing_errors: Dict[str, str]) -> Tupl ) for block_type, resource_blocks in module_blocks.items(): tf_definition[block_type].extend(resource_blocks) + + # Process deleted resources that weren't in planned_values + # Note: Module resource addresses from planned_values are not explicitly tracked above. + # Based on Terraform's plan structure, resources in planned_values should not simultaneously + # appear as pure deletes, but the deduplication check handles any edge cases. + deleted_blocks = _process_deleted_resources( + template=template, + resource_changes=resource_changes, + processed_addresses=processed_addresses, + ) + tf_definition["resource"].extend(deleted_blocks) + return tf_definition, template_lines diff --git a/docs/7.Scan Examples/Terraform Plan Scanning.md b/docs/7.Scan Examples/Terraform Plan Scanning.md index 23be257ea3..a6f168c489 100644 --- a/docs/7.Scan Examples/Terraform Plan Scanning.md +++ b/docs/7.Scan Examples/Terraform Plan Scanning.md @@ -52,7 +52,13 @@ Following checks will be ignored; To check if a resource will be deleted or changed (further change values can be found [here](https://www.terraform.io/internals/json-format#change-representation)) the change actions values can be accessed via the attribute name `__change_actions__`. -Ex. Python +Checkov supports checking both: +- **Replacement operations**: Resources with actions `["create", "delete"]` that appear in `planned_values` +- **Pure delete operations**: Resources with actions `["delete"]` that only exist in `resource_changes` + +For pure deletions, the resource configuration will contain the "before" state (the resource's values before deletion), allowing checks to validate deletion conditions. + +Ex. Python - Prevent deletion of specific resources ```python def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: actions = conf.get("__change_actions__") @@ -61,6 +67,22 @@ Ex. Python return CheckResult.PASSED ``` +Ex. Python - Validate resource state before deletion +```python + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + actions = conf.get("__change_actions__") + # Only check resources being deleted + if isinstance(actions, list) and "delete" in actions: + # Check the resource's status before deletion + status = conf.get("status") + if isinstance(status, list): + status = status[0] if status else None + # Fail if deleting an active resource + if status == "ACTIVE": + return CheckResult.FAILED + return CheckResult.PASSED +``` + Ex. YAML ```yaml cond_type: attribute diff --git a/tests/terraform/runner/extra_tf_plan_checks/pure_delete_secret.py b/tests/terraform/runner/extra_tf_plan_checks/pure_delete_secret.py new file mode 100644 index 0000000000..1ada03193f --- /dev/null +++ b/tests/terraform/runner/extra_tf_plan_checks/pure_delete_secret.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck +from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_CHANGE_ACTIONS + + +class SecretNotPureDeleted(BaseResourceCheck): + def __init__(self) -> None: + name = "Ensure Secret is not deleted (pure delete test)" + id = "CUSTOM_PURE_DELETE_1" + supported_resources = ("aws_secretsmanager_secret",) + categories = (CheckCategories.GENERAL_SECURITY,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + actions = conf.get(TF_PLAN_RESOURCE_CHANGE_ACTIONS) + # Check for pure delete action + if isinstance(actions, list) and actions == ["delete"]: + return CheckResult.FAILED + return CheckResult.PASSED + + +scanner = SecretNotPureDeleted() diff --git a/tests/terraform/runner/resources/plan_with_pure_deleted_resources/tfplan.json b/tests/terraform/runner/resources/plan_with_pure_deleted_resources/tfplan.json new file mode 100644 index 0000000000..0dea000b9c --- /dev/null +++ b/tests/terraform/runner/resources/plan_with_pure_deleted_resources/tfplan.json @@ -0,0 +1,79 @@ +{ + "format_version": "1.1", + "terraform_version": "1.2.4", + "planned_values": { + "root_module": { + "resources": [] + } + }, + "resource_changes": [ + { + "address": "aws_secretsmanager_secret.deleted", + "mode": "managed", + "type": "aws_secretsmanager_secret", + "name": "deleted", + "provider_name": "registry.terraform.io/hashicorp/aws", + "change": { + "actions": [ + "delete" + ], + "before": { + "arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:test-secret", + "description": "Test secret to be deleted", + "force_overwrite_replica_secret": false, + "kms_key_id": "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012", + "name": "test-secret", + "recovery_window_in_days": 30, + "tags": { + "Environment": "test" + } + }, + "after": null, + "after_unknown": {}, + "before_sensitive": { + "replica": [], + "rotation_rules": [], + "tags": {}, + "tags_all": {} + }, + "after_sensitive": false + } + } + ], + "configuration": { + "provider_config": { + "aws": { + "name": "aws", + "full_name": "registry.terraform.io/hashicorp/aws", + "expressions": { + "profile": { + "constant_value": "dev2" + }, + "region": { + "constant_value": "us-west-2" + } + } + } + }, + "root_module": { + "resources": [ + { + "address": "aws_secretsmanager_secret.deleted", + "mode": "managed", + "type": "aws_secretsmanager_secret", + "name": "deleted", + "provider_config_key": "aws", + "expressions": { + "name": { + "constant_value": "test-secret" + }, + "kms_key_id": { + "constant_value": "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012" + } + }, + "schema_version": 0 + } + ] + } + } +} diff --git a/tests/terraform/runner/test_plan_runner.py b/tests/terraform/runner/test_plan_runner.py index c1fd33b4fe..07e34751cc 100644 --- a/tests/terraform/runner/test_plan_runner.py +++ b/tests/terraform/runner/test_plan_runner.py @@ -694,6 +694,36 @@ def test_runner_extra_check(self): failed_check = next(check for check in report.failed_checks if check.check_id == "CUSTOM_DELETE_1") self.assertEqual(failed_check.details, ["some great details"]) + def test_runner_pure_delete_check(self): + """Test that resources with pure delete actions (not replacements) are processed correctly""" + # given + current_dir = Path(__file__).parent + tf_dir_path = str(current_dir / "resources/plan_with_pure_deleted_resources") + extra_checks_dir_path = [str(current_dir / "extra_tf_plan_checks")] + + # when + report = Runner().run( + root_folder=tf_dir_path, + external_checks_dir=extra_checks_dir_path, + runner_filter=RunnerFilter(checks=["CUSTOM_PURE_DELETE_1"]) + ) + + # then + summary = report.get_summary() + + # CUSTOM_PURE_DELETE_1 should fail for the secret with pure delete action + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["passed"], 0) + + # Verify the specific resource that failed + failed_resource_ids = [check.resource for check in report.failed_checks] + self.assertIn("aws_secretsmanager_secret.deleted", failed_resource_ids) + + # Verify the check details + secret_check = next((c for c in report.failed_checks if c.check_id == "CUSTOM_PURE_DELETE_1"), None) + self.assertIsNotNone(secret_check) + self.assertEqual(secret_check.resource, "aws_secretsmanager_secret.deleted") + def test_runner_nested_child_modules_with_connections(self): # given tf_file_path = Path(__file__).parent / "resources/plan_nested_child_modules_with_connections/tfplan.json"