这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checkov/terraform/plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from checkov.terraform.checks.resource.registry import resource_registry
from checkov.terraform.context_parsers.registry import parser_registry
from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_ADDRESS
from checkov.terraform.plan_utils import create_definitions, build_definitions_context
from checkov.terraform.plan_utils import create_definitions, build_definitions_context, get_entity_id
from checkov.terraform.deep_analysis_plan_graph_manager import DeepAnalysisGraphManager

_TerraformPlanContext: TypeAlias = "dict[str, dict[str, Any]]"
Expand Down Expand Up @@ -303,7 +303,7 @@ def get_entity_context(self, definition_path: list[str], full_file_path: str, en
resource_type = definition_path[0]
resource_name = definition_path[1]
resource_type_dict = entity.get(resource_type, {})
entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS)
entity_id = get_entity_id(resource_type_dict, resource_name)
else:
entity_id = definition_path[0]
return cast("dict[str, Any]", self.context.get(full_file_path, {}).get(entity_id, {}))
Expand Down
41 changes: 32 additions & 9 deletions checkov/terraform/plan_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ def build_definitions_context(
for entity in entities:
context_parser = parser_registry.context_parsers[block_type]
definition_path = context_parser.get_entity_context_path(entity)

entity_id: str
if len(definition_path) > 1:
resource_type = definition_path[0]
resource_name = definition_path[1]
resource_type_dict = entity.get(resource_type, {})
entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS)
try:
entity_id = get_entity_id(resource_type_dict, resource_name)
except Exception as e:
logging.error(str(e))
continue
else:
entity_id = definition_path[0]

Expand All @@ -94,6 +98,17 @@ def build_definitions_context(
return definitions_context


def get_entity_id(resource_type_dict: dict[str, Any], resource_name: str) -> str:
resource_dict = resource_type_dict.get(resource_name, resource_type_dict)
if isinstance(resource_dict, dict):
entity_id = resource_dict.get(TF_PLAN_RESOURCE_ADDRESS)
else:
entity_id = resource_type_dict.get(TF_PLAN_RESOURCE_ADDRESS)
if not entity_id:
raise Exception(f'Failed get_entity_id: {resource_name} does not have {TF_PLAN_RESOURCE_ADDRESS}')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to crash here? It appears this changes the old behavior, which theoretically allowed None values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can allow None values but it can lead to unexpected behavior. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue with throwing an exception here is that we fail the entire definitions context building, not just for this entity. If you do decide to throw an exception here, perhaps you can catch it on the outside (log a warning) and continue to the next block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return str(entity_id)


def get_entity_context(
definitions: dict[str, dict[str, list[dict[str, Any]]]],
definitions_raw: dict[str, list[tuple[int, str]]],
Expand All @@ -116,14 +131,22 @@ def get_entity_context(
continue
resource_name = definition_path[1]
resource_definition = resource_type_dict.get(resource_name, resource_type_dict)
if resource_definition and resource_definition.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id:
entity_context['start_line'] = resource_definition['start_line'][0]
entity_context['end_line'] = resource_definition['end_line'][0]
entity_context["code_lines"] = definitions_raw[full_file_path][
entity_context["start_line"] : entity_context["end_line"]
]
entity_context['address'] = resource_definition[TF_PLAN_RESOURCE_ADDRESS]
if not isinstance(resource_definition, dict):
entity_context = build_entity_context(resource_type_dict)
entity_context["code_lines"] = definitions_raw[full_file_path][entity_context["start_line"]: entity_context["end_line"]]
return entity_context
elif resource_definition and resource_definition.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id:
entity_context = build_entity_context(resource_definition)
entity_context["code_lines"] = definitions_raw[full_file_path][entity_context["start_line"]: entity_context["end_line"]]
return entity_context
return entity_context


def build_entity_context(resource_dict: dict[str, Any]) -> dict[str, Any]:
    """Build the line-range/address context for a single plan resource.

    Reads the first element of ``resource_dict['start_line']`` and
    ``resource_dict['end_line']`` and the value stored under
    ``TF_PLAN_RESOURCE_ADDRESS``, and returns them under the keys
    ``start_line``, ``end_line`` and ``address``.

    All three values are fetched by subscript, so a missing key is not
    defaulted; the lookup error propagates to the caller.
    """
    # Values are evaluated top to bottom, matching the order in which the
    # fields were previously assigned one by one.
    return {
        'start_line': resource_dict['start_line'][0],
        'end_line': resource_dict['end_line'][0],
        'address': resource_dict[TF_PLAN_RESOURCE_ADDRESS],
    }


Expand Down
7 changes: 7 additions & 0 deletions tests/terraform/runner/test_plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from checkov.terraform import TFDefinitionKey
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.terraform.plan_runner import Runner, resource_registry
from checkov.terraform.plan_utils import get_entity_id


@parameterized_class([
Expand Down Expand Up @@ -961,6 +962,12 @@ def tearDown(self) -> None:
resource_registry.checks = deepcopy(self.orig_checks)
BaseCheckRegistry._BaseCheckRegistry__all_registered_checks = deepcopy(self.orig_all_registered_checks)

def test_get_entity_id(self):
    """get_entity_id returns the block's own ``__address__`` value.

    In this fixture the value stored under the resource name is the bool
    ``True`` rather than a nested dict, and the expected id is the
    ``__address__`` entry of the outer dict.
    """
    expected_entity_id = 'azure.storage_use_azuread'
    provider_block = {
        '__address__': 'azure.storage_use_azuread',
        '__end_line__': [14],
        '__start_line__': [0],
        'alias': ['storage_use_azuread'],
        'end_line': [14],
        'start_line': [0],
        'storage_use_azuread': True,
    }

    actual_entity_id = get_entity_id(provider_block, "storage_use_azuread")

    assert actual_entity_id == expected_entity_id


if __name__ == "__main__":
unittest.main()
Loading