diff --git a/.gitignore b/.gitignore index 312939b161..5ac2de6bfe 100644 --- a/.gitignore +++ b/.gitignore @@ -171,7 +171,8 @@ fabric.properties # test assets that get created locally (20* refers to the start of a date, so this covers us for 78 years) tests/20* -# vim +# vim +.*.sw? .vim/ .vimspector.json !tests/terraform/graph/variable_rendering/test_resources/tfvar_module_variables/modules/instance diff --git a/checkov/terraform/graph_builder/graph_to_tf_definitions.py b/checkov/terraform/graph_builder/graph_to_tf_definitions.py index fece722e31..b6b88b8259 100644 --- a/checkov/terraform/graph_builder/graph_to_tf_definitions.py +++ b/checkov/terraform/graph_builder/graph_to_tf_definitions.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import logging from typing import List, Dict, Any, Tuple from checkov.common.graph.graph_builder import CustomAttributes @@ -15,20 +16,18 @@ def convert_graph_vertices_to_tf_definitions( tf_definitions: Dict[TFDefinitionKey, Dict[str, Any]] = {} breadcrumbs: Dict[str, Dict[str, Any]] = {} for vertex in vertices: - block_path = vertex.path - if not os.path.isfile(block_path): - print(f"tried to convert vertex to tf_definitions but its path does not exist: {vertex}") + if vertex.block_type == BlockType.TF_VARIABLE: continue - block_type = vertex.block_type - if block_type == BlockType.TF_VARIABLE: + + if not os.path.isfile(vertex.path): + logging.debug(f'tried to convert vertex to tf_definitions but its path does not exist: {vertex}') continue - tf_path = TFDefinitionKey(file_path=block_path) + tf_path = TFDefinitionKey(file_path=vertex.path) if vertex.source_module_object: - tf_path = TFDefinitionKey(file_path=block_path, tf_source_modules=vertex.source_module_object) - tf_definitions.setdefault(tf_path, {}).setdefault(block_type, []).append(vertex.config) - relative_block_path = f"/{os.path.relpath(block_path, root_folder)}" - add_breadcrumbs(vertex, breadcrumbs, relative_block_path) + tf_path = TFDefinitionKey(file_path=vertex.path, tf_source_modules=vertex.source_module_object) + tf_definitions.setdefault(tf_path, {}).setdefault(vertex.block_type, []).append(vertex.config) + add_breadcrumbs(vertex, breadcrumbs, f'/{os.path.relpath(vertex.path, root_folder)}') return tf_definitions, breadcrumbs diff --git a/checkov/terraform/graph_builder/local_graph.py b/checkov/terraform/graph_builder/local_graph.py index dd2b641e07..59297bc39c 100644 --- a/checkov/terraform/graph_builder/local_graph.py +++ b/checkov/terraform/graph_builder/local_graph.py @@ -370,14 +370,18 @@ def _build_edges_for_vertex(self, origin_node_index: int, vertex: TerraformBlock if target_variable is not None: self.create_edge(target_variable, origin_node_index, "default", cross_variable_edges) elif vertex.block_type == BlockType.TF_VARIABLE: - # Assuming the tfvars file is in the same directory as the variables file (best practice) - target_variable = 0 - for index in self.vertices_block_name_map.get(BlockType.VARIABLE, {}).get(vertex.name, []): - if self.get_dirname(self.vertices[index].path) == self.get_dirname(vertex.path): - target_variable = index - break - if target_variable: - self.create_edge(target_variable, origin_node_index, "default", cross_variable_edges) + # Match tfvars based on the directory for which they were loaded + target_variable = None + ldir = vertex.attributes.get('load_dir', None) + if ldir: + for index in self.vertices_block_name_map.get(BlockType.VARIABLE, {}).get(vertex.name, []): + if self.get_dirname(self.vertices[index].path) == ldir: + target_variable = index + break + + if target_variable is not None: + self.create_edge(target_variable, origin_node_index, 'default', cross_variable_edges) + return def _create_edge_from_reference(self, attribute_key: Any, origin_node_index: int, dest_node_index: int, sub_values: List[Any], vertex_reference: TerraformVertexReference, diff --git a/checkov/terraform/graph_builder/variable_rendering/renderer.py b/checkov/terraform/graph_builder/variable_rendering/renderer.py index 62f2c93b22..f70ef0eb0c 100644 --- a/checkov/terraform/graph_builder/variable_rendering/renderer.py +++ b/checkov/terraform/graph_builder/variable_rendering/renderer.py @@ -118,11 +118,9 @@ def evaluate_vertex_attribute_from_edge(self, edge_list: List[Edge]) -> None: origin_vertex.block_type == BlockType.VARIABLE and destination_vertex.block_type == BlockType.TF_VARIABLE ): - # evaluate the last specified variable based on .tfvars precedence - destination_vertex = list(filter(lambda v: v.block_type == BlockType.TF_VARIABLE, map(lambda e: self.local_graph.vertices[e.dest], edge_list)))[-1] self.update_evaluated_value( changed_attribute_key=edge.label, - changed_attribute_value=destination_vertex.attributes["default"], + changed_attribute_value=destination_vertex.attributes['default'], vertex=edge.origin, change_origin_id=edge.dest, attribute_at_dest=edge.label, diff --git a/checkov/terraform/tf_parser.py b/checkov/terraform/tf_parser.py index 30555659cf..f57528024f 100644 --- a/checkov/terraform/tf_parser.py +++ b/checkov/terraform/tf_parser.py @@ -52,7 +52,7 @@ def __init__(self, module_class: type[Module] = Module) -> None: self.external_modules_source_map: Dict[Tuple[str, str], str] = {} self.module_address_map: Dict[Tuple[str, str], str] = {} self.loaded_files_map: dict[str, dict[str, list[dict[str, Any]]] | None] = {} - self.external_variables_data: list[tuple[str, Any, str]] = [] + self.external_vars: dict[str, dict[str, tuple[Any, str]]] = {} self.temp_tf_definition: dict[str, Any] = {} def _init(self, directory: str, @@ -164,7 +164,6 @@ def _internal_dir_load( if not data: continue self.out_definitions[TFDefinitionKey(file)] = data - self.add_external_vars_from_data(data, file) force_final_module_load = False for i in range(0, 10): @@ -183,24 +182,24 @@ def _internal_dir_load( def _load_files( self, - files: list[os.DirEntry[str]], + files: list[str], ) -> list[tuple[str, dict[str, list[dict[str, Any]]] | None]]: def _load_file( - file: os.DirEntry[str] + file: str ) -> tuple[tuple[str, dict[str, list[dict[str, Any]]] | None], dict[str, Exception]]: parsing_errors: dict[str, Exception] = {} result = load_or_die_quietly(file, parsing_errors) for path, e in parsing_errors.items(): parsing_errors[path] = e - return (file.path, result), parsing_errors + return (file, result), parsing_errors files_to_data: list[tuple[str, dict[str, list[dict[str, Any]]] | None]] = [] files_to_parse = [] for file in files: - data = self.loaded_files_map.get(file.path) + data = self.loaded_files_map.get(file) if data: - files_to_data.append((file.path, data)) + files_to_data.append((file, data)) else: files_to_parse.append(file) @@ -489,7 +488,7 @@ def parse_hcl_module_from_multi_tf_definitions( source_dir=source_dir, external_modules_source_map=self.external_modules_source_map, ) - self.add_tfvars_with_source_dir(module, source, source_dir) + self.add_tfvars(module, source) copy_of_tf_definitions = pickle_deepcopy(tf_definitions) for tf_def in copy_of_tf_definitions: for file_path, blocks in tf_def.items(): @@ -524,21 +523,13 @@ def get_new_nested_module_key( return get_tf_definition_object_from_module_dependency(key, nested_key, module_name) def add_tfvars(self, module: Module, source: str) -> None: - if not self.external_variables_data: + if not self.external_vars: return - for (var_name, default, path) in self.external_variables_data: - if ".tfvars" in path: - block = [{var_name: {"default": default}}] - module.add_blocks(BlockType.TF_VARIABLE, block, path, source) - def add_tfvars_with_source_dir(self, module: Module, source: str, source_dir: str) -> None: - if not self.external_variables_data: - return - for var_name, default, path in self.external_variables_data: - if ".tfvars" in path: - if Path(source_dir) in Path(path).parents: - block = [{var_name: {"default": default}}] - module.add_blocks(BlockType.TF_VARIABLE, block, path, source) + for load_dir, i in self.external_vars.items(): + for name, (default, path) in i.items(): + block = [{name: {'default': default, 'load_dir': load_dir}}] + module.add_blocks(BlockType.TF_VARIABLE, block, path, source) def get_dirname(self, path: TFDefinitionKey) -> str: file_path = path.file_path @@ -567,31 +558,21 @@ def get_module_source( os.path.join(os.path.dirname(remove_module_dependency_from_path(file_to_load)), source)) return source - def add_external_vars_from_data(self, data: dict[str, Any], file: str) -> None: - var_blocks = data.get("variable") - if var_blocks and isinstance(var_blocks, list): - for var_block in var_blocks: - if not isinstance(var_block, dict): - continue - for var_name, var_definition in var_block.items(): - if not isinstance(var_definition, dict): - continue - - default_value = var_definition.get("default") - if default_value is not None and isinstance(default_value, list): - self.external_variables_data.append((var_name, default_value[0], file)) - def handle_variables( self, dir_contents: list[os.DirEntry[str]], vars_files: None | list[str], specified_vars: Mapping[str, str] | None, - ) -> list[os.DirEntry[str]]: + ) -> list[str]: tf_files_to_load = [] - hcl_tfvars: Optional[os.DirEntry[str]] = None - json_tfvars: Optional[os.DirEntry[str]] = None - auto_vars_files: List[os.DirEntry[str]] = [] - explicit_var_files: List[os.DirEntry[str]] = [] + hcl_tfvars: Optional[str] = None + json_tfvars: Optional[str] = None + auto_vars_files: List[str] = [] + external_vars: dict[str, tuple[Any, str]] = {} + + if not dir_contents: + return [] + for file in dir_contents: try: if not file.is_file(): @@ -600,44 +581,56 @@ def handle_variables( continue if file.name == "terraform.tfvars.json": - json_tfvars = file + json_tfvars = file.path elif file.name == "terraform.tfvars": - hcl_tfvars = file + hcl_tfvars = file.path elif file.name.endswith(".auto.tfvars.json") or file.name.endswith(".auto.tfvars"): - auto_vars_files.append(file) - elif vars_files and file.path in vars_files: - explicit_var_files.append(file) + auto_vars_files.append(file.path) elif file.name.endswith(".tf") or file.name.endswith('.hcl'): # TODO: add support for .tf.json - tf_files_to_load.append(file) + tf_files_to_load.append(file.path) + # Terraform Variable Definition Precedence + # 1. Environment vars for key, value in self.env_vars.items(): - if not key.startswith("TF_VAR_"): - continue - self.external_variables_data.append((key[7:], value, f"env:{key}")) + if key.startswith('TF_VAR_'): + external_vars[key[7:]] = (value, f'env:{key}') + + # 2. terraform.tfvars if hcl_tfvars: # terraform.tfvars data = load_or_die_quietly(hcl_tfvars, self.out_parsing_errors, clean_definitions=False) if data: - self.external_variables_data.extend([(k, safe_index(v, 0), hcl_tfvars.path) for k, v in data.items()]) + for k, v in data.items(): + external_vars[k] = (safe_index(v, 0), hcl_tfvars) + + # 3. terraform.tfvars.json if json_tfvars: # terraform.tfvars.json data = load_or_die_quietly(json_tfvars, self.out_parsing_errors) if data: - self.external_variables_data.extend([(k, v, json_tfvars.path) for k, v in data.items()]) + for k, v in data.items(): + external_vars[k] = (v, json_tfvars) + # 4. *.auto.tfvars / *.auto.tfvars.json auto_var_files_to_data = self._load_files(auto_vars_files) for var_file, data in sorted(auto_var_files_to_data, key=lambda x: x[0]): if data: - self.external_variables_data.extend([(k, v, var_file) for k, v in data.items()]) + for k, v in data.items(): + external_vars[k] = (v, var_file) - explicit_var_files_to_data = self._load_files(explicit_var_files) - # it's possible that os.scandir returned the var files in a different order than they were specified + # 5. --var-file arguments if vars_files: - for var_file, data in sorted(explicit_var_files_to_data, key=lambda x: vars_files.index(x[0])): + for var_file, data in self._load_files(vars_files): if data: - self.external_variables_data.extend([(k, v, var_file) for k, v in data.items()]) + for k, v in data.items(): + external_vars[k] = (v, var_file) - if specified_vars: # specified - self.external_variables_data.extend([(k, v, "manual specification") for k, v in specified_vars.items()]) + # Prevent specified vars from being overridden by tfvars + if specified_vars: + for k in specified_vars.keys(): + if k in external_vars: + del external_vars[k] + if external_vars: + self.external_vars[os.path.dirname(dir_contents[0].path)] = external_vars return tf_files_to_load @staticmethod diff --git a/tests/terraform/graph/variable_rendering/test_render_scenario.py b/tests/terraform/graph/variable_rendering/test_render_scenario.py index 0b98fd87b2..0e5ebfb85d 100644 --- a/tests/terraform/graph/variable_rendering/test_render_scenario.py +++ b/tests/terraform/graph/variable_rendering/test_render_scenario.py @@ -183,6 +183,9 @@ def test_tfvars(self): } self.go("tfvars", vars_files=['other3.tfvars', 'other2.tfvars'], different_expected=different_expected) + def test_tfvars_outside_dir(self): + self.go('tfvars_outside_dir', vars_files=['../tfvars/other1.tfvars']) + def test_account_dirs_and_modules(self): self.go("account_dirs_and_modules") diff --git a/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/expected.json b/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/expected.json new file mode 100644 index 0000000000..006eb902e2 --- /dev/null +++ b/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/expected.json @@ -0,0 +1,18 @@ +{ + "{\"file_path\": \"main.tf\", \"tf_source_modules\": null}": { + "resource": [ + { + "aws_s3_bucket": { + "my_bucket": { + "bucket": [ + "xyz" + ], + "__start_line__": 5, + "__end_line__": 7, + "__address__": "aws_s3_bucket.my_bucket" + } + } + } + ] + } +} diff --git a/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/main.tf b/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/main.tf new file mode 100644 index 0000000000..dfd04f1668 --- /dev/null +++ b/tests/terraform/parser/resources/parser_scenarios/tfvars_outside_dir/main.tf @@ -0,0 +1,7 @@ +variable "other_var_1" { + default = "abc" +} + +resource "aws_s3_bucket" "my_bucket" { + bucket = "${var.other_var_1}" +} diff --git a/tests/terraform/parser/test_new_parser_modules.py b/tests/terraform/parser/test_new_parser_modules.py index 4d722b3e80..9a5a2f702e 100644 --- a/tests/terraform/parser/test_new_parser_modules.py +++ b/tests/terraform/parser/test_new_parser_modules.py @@ -110,14 +110,7 @@ def test_load_local_module_new_parser(self): assert out_definitions[main_key]['module'][1]['mod2']['__resolved__'] == [key_idx_1] assert parser.external_modules_source_map == {(os.path.join(directory, 'module'), 'latest'): os.path.join(directory, 'module')} - assert parser.external_variables_data == [ - ('versioning', True, 'manual specification'), - ('__start_line__', 1, 'manual specification'), - ('__end_line__', 4, 'manual specification'), - ('versioning', False, 'manual specification'), - ('__start_line__', 6, 'manual specification'), - ('__end_line__', 9, 'manual specification') - ] + assert parser.external_vars == {} assert parser.keys_to_remove == {TFDefinitionKey(file_path=module_path)} assert parser._parsed_directories == { directory,