diff --git a/checkov/terraform/plan_runner.py b/checkov/terraform/plan_runner.py index eab251c33f..f8d9d4f988 100644 --- a/checkov/terraform/plan_runner.py +++ b/checkov/terraform/plan_runner.py @@ -30,7 +30,7 @@ from checkov.terraform.checks.resource.registry import resource_registry from checkov.terraform.context_parsers.registry import parser_registry from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_ADDRESS -from checkov.terraform.plan_utils import create_definitions, build_definitions_context +from checkov.terraform.plan_utils import create_definitions, build_definitions_context, get_entity_id from checkov.terraform.deep_analysis_plan_graph_manager import DeepAnalysisGraphManager _TerraformPlanContext: TypeAlias = "dict[str, dict[str, Any]]" @@ -303,7 +303,7 @@ def get_entity_context(self, definition_path: list[str], full_file_path: str, en resource_type = definition_path[0] resource_name = definition_path[1] resource_type_dict = entity.get(resource_type, {}) - entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS) + entity_id = get_entity_id(resource_type_dict, resource_name) else: entity_id = definition_path[0] return cast("dict[str, Any]", self.context.get(full_file_path, {}).get(entity_id, {})) diff --git a/checkov/terraform/plan_utils.py b/checkov/terraform/plan_utils.py index f2d5aac1fe..3ed577d659 100644 --- a/checkov/terraform/plan_utils.py +++ b/checkov/terraform/plan_utils.py @@ -72,12 +72,16 @@ def build_definitions_context( for entity in entities: context_parser = parser_registry.context_parsers[block_type] definition_path = context_parser.get_entity_context_path(entity) - + entity_id: str if len(definition_path) > 1: resource_type = definition_path[0] resource_name = definition_path[1] resource_type_dict = entity.get(resource_type, {}) - entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS) + try: + entity_id = get_entity_id(resource_type_dict, resource_name) + except Exception as e: + logging.error(str(e)) + continue else: entity_id = definition_path[0] @@ -94,6 +98,17 @@ def build_definitions_context( return definitions_context +def get_entity_id(resource_type_dict: dict[str, Any], resource_name: str) -> str: + resource_dict = resource_type_dict.get(resource_name, resource_type_dict) + if isinstance(resource_dict, dict): + entity_id = resource_dict.get(TF_PLAN_RESOURCE_ADDRESS) + else: + entity_id = resource_type_dict.get(TF_PLAN_RESOURCE_ADDRESS) + if not entity_id: + raise Exception(f'Failed get_entity_id: {resource_name} does not have {TF_PLAN_RESOURCE_ADDRESS}') + return str(entity_id) + + def get_entity_context( definitions: dict[str, dict[str, list[dict[str, Any]]]], definitions_raw: dict[str, list[tuple[int, str]]], @@ -116,14 +131,22 @@ def get_entity_context( continue resource_name = definition_path[1] resource_definition = resource_type_dict.get(resource_name, resource_type_dict) - if resource_definition and resource_definition.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id: - entity_context['start_line'] = resource_definition['start_line'][0] - entity_context['end_line'] = resource_definition['end_line'][0] - entity_context["code_lines"] = definitions_raw[full_file_path][ - entity_context["start_line"] : entity_context["end_line"] - ] - entity_context['address'] = resource_definition[TF_PLAN_RESOURCE_ADDRESS] + if not isinstance(resource_definition, dict): + entity_context = build_entity_context(resource_type_dict) + entity_context["code_lines"] = definitions_raw[full_file_path][entity_context["start_line"]: entity_context["end_line"]] return entity_context + elif resource_definition and resource_definition.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id: + entity_context = build_entity_context(resource_definition) + entity_context["code_lines"] = definitions_raw[full_file_path][entity_context["start_line"]: entity_context["end_line"]] + return entity_context + return entity_context + + +def build_entity_context(resource_dict: dict[str, Any]) -> dict[str, Any]: + entity_context: dict[str, Any] = {} + entity_context['start_line'] = resource_dict['start_line'][0] + entity_context['end_line'] = resource_dict['end_line'][0] + entity_context['address'] = resource_dict[TF_PLAN_RESOURCE_ADDRESS] return entity_context diff --git a/tests/terraform/runner/test_plan_runner.py b/tests/terraform/runner/test_plan_runner.py index 13cf30706f..c1fd33b4fe 100644 --- a/tests/terraform/runner/test_plan_runner.py +++ b/tests/terraform/runner/test_plan_runner.py @@ -20,6 +20,7 @@ from checkov.terraform import TFDefinitionKey from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck from checkov.terraform.plan_runner import Runner, resource_registry +from checkov.terraform.plan_utils import get_entity_id @parameterized_class([ @@ -961,6 +962,12 @@ def tearDown(self) -> None: resource_registry.checks = deepcopy(self.orig_checks) BaseCheckRegistry._BaseCheckRegistry__all_registered_checks = deepcopy(self.orig_all_registered_checks) + def test_get_entity_id(self): + resource_type_dict = {'__address__': 'azure.storage_use_azuread', '__end_line__': [14], '__start_line__': [0], + 'alias': ['storage_use_azuread'], 'end_line': [14], 'start_line': [0], 'storage_use_azuread': True} + resource_name = "storage_use_azuread" + assert get_entity_id(resource_type_dict, resource_name) == 'azure.storage_use_azuread' + if __name__ == "__main__": unittest.main()