这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ fabric.properties

# test assets that get created locally (20* refers to the start of a date, so this covers us for 78 years)
tests/20*
# vim
# vim
.*.sw?
.vim/
.vimspector.json
!tests/terraform/graph/variable_rendering/test_resources/tfvar_module_variables/modules/instance
Expand Down
19 changes: 9 additions & 10 deletions checkov/terraform/graph_builder/graph_to_tf_definitions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import logging
from typing import List, Dict, Any, Tuple

from checkov.common.graph.graph_builder import CustomAttributes
Expand All @@ -15,20 +16,18 @@ def convert_graph_vertices_to_tf_definitions(
tf_definitions: Dict[TFDefinitionKey, Dict[str, Any]] = {}
breadcrumbs: Dict[str, Dict[str, Any]] = {}
for vertex in vertices:
block_path = vertex.path
if not os.path.isfile(block_path):
print(f"tried to convert vertex to tf_definitions but its path does not exist: {vertex}")
if vertex.block_type == BlockType.TF_VARIABLE:
continue
block_type = vertex.block_type
if block_type == BlockType.TF_VARIABLE:

if not os.path.isfile(vertex.path):
logging.debug(f'tried to convert vertex to tf_definitions but its path does not exist: {vertex}')
continue

tf_path = TFDefinitionKey(file_path=block_path)
tf_path = TFDefinitionKey(file_path=vertex.path)
if vertex.source_module_object:
tf_path = TFDefinitionKey(file_path=block_path, tf_source_modules=vertex.source_module_object)
tf_definitions.setdefault(tf_path, {}).setdefault(block_type, []).append(vertex.config)
relative_block_path = f"/{os.path.relpath(block_path, root_folder)}"
add_breadcrumbs(vertex, breadcrumbs, relative_block_path)
tf_path = TFDefinitionKey(file_path=vertex.path, tf_source_modules=vertex.source_module_object)
tf_definitions.setdefault(tf_path, {}).setdefault(vertex.block_type, []).append(vertex.config)
add_breadcrumbs(vertex, breadcrumbs, f'/{os.path.relpath(vertex.path, root_folder)}')
return tf_definitions, breadcrumbs


Expand Down
20 changes: 12 additions & 8 deletions checkov/terraform/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,18 @@ def _build_edges_for_vertex(self, origin_node_index: int, vertex: TerraformBlock
if target_variable is not None:
self.create_edge(target_variable, origin_node_index, "default", cross_variable_edges)
elif vertex.block_type == BlockType.TF_VARIABLE:
# Assuming the tfvars file is in the same directory as the variables file (best practice)
target_variable = 0
for index in self.vertices_block_name_map.get(BlockType.VARIABLE, {}).get(vertex.name, []):
if self.get_dirname(self.vertices[index].path) == self.get_dirname(vertex.path):
target_variable = index
break
if target_variable:
self.create_edge(target_variable, origin_node_index, "default", cross_variable_edges)
# Match tfvars based on the directory for which they were loaded
target_variable = None
ldir = vertex.attributes.get('load_dir', None)
if ldir:
for index in self.vertices_block_name_map.get(BlockType.VARIABLE, {}).get(vertex.name, []):
if self.get_dirname(self.vertices[index].path) == ldir:
target_variable = index
break

if target_variable is not None:
self.create_edge(target_variable, origin_node_index, 'default', cross_variable_edges)
return

def _create_edge_from_reference(self, attribute_key: Any, origin_node_index: int, dest_node_index: int,
sub_values: List[Any], vertex_reference: TerraformVertexReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ def evaluate_vertex_attribute_from_edge(self, edge_list: List[Edge]) -> None:
origin_vertex.block_type == BlockType.VARIABLE
and destination_vertex.block_type == BlockType.TF_VARIABLE
):
# evaluate the last specified variable based on .tfvars precedence
destination_vertex = list(filter(lambda v: v.block_type == BlockType.TF_VARIABLE, map(lambda e: self.local_graph.vertices[e.dest], edge_list)))[-1]
self.update_evaluated_value(
changed_attribute_key=edge.label,
changed_attribute_value=destination_vertex.attributes["default"],
changed_attribute_value=destination_vertex.attributes['default'],
vertex=edge.origin,
change_origin_id=edge.dest,
attribute_at_dest=edge.label,
Expand Down
109 changes: 51 additions & 58 deletions checkov/terraform/tf_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(self, module_class: type[Module] = Module) -> None:
self.external_modules_source_map: Dict[Tuple[str, str], str] = {}
self.module_address_map: Dict[Tuple[str, str], str] = {}
self.loaded_files_map: dict[str, dict[str, list[dict[str, Any]]] | None] = {}
self.external_variables_data: list[tuple[str, Any, str]] = []
self.external_vars: dict[str, dict[str, tuple[Any, str]]] = {}
self.temp_tf_definition: dict[str, Any] = {}

def _init(self, directory: str,
Expand Down Expand Up @@ -164,7 +164,6 @@ def _internal_dir_load(
if not data:
continue
self.out_definitions[TFDefinitionKey(file)] = data
self.add_external_vars_from_data(data, file)

force_final_module_load = False
for i in range(0, 10):
Expand All @@ -183,24 +182,24 @@ def _internal_dir_load(

def _load_files(
self,
files: list[os.DirEntry[str]],
files: list[str],
) -> list[tuple[str, dict[str, list[dict[str, Any]]] | None]]:
def _load_file(
file: os.DirEntry[str]
file: str
) -> tuple[tuple[str, dict[str, list[dict[str, Any]]] | None], dict[str, Exception]]:
parsing_errors: dict[str, Exception] = {}
result = load_or_die_quietly(file, parsing_errors)
for path, e in parsing_errors.items():
parsing_errors[path] = e

return (file.path, result), parsing_errors
return (file, result), parsing_errors

files_to_data: list[tuple[str, dict[str, list[dict[str, Any]]] | None]] = []
files_to_parse = []
for file in files:
data = self.loaded_files_map.get(file.path)
data = self.loaded_files_map.get(file)
if data:
files_to_data.append((file.path, data))
files_to_data.append((file, data))
else:
files_to_parse.append(file)

Expand Down Expand Up @@ -489,7 +488,7 @@ def parse_hcl_module_from_multi_tf_definitions(
source_dir=source_dir,
external_modules_source_map=self.external_modules_source_map,
)
self.add_tfvars_with_source_dir(module, source, source_dir)
self.add_tfvars(module, source)
copy_of_tf_definitions = pickle_deepcopy(tf_definitions)
for tf_def in copy_of_tf_definitions:
for file_path, blocks in tf_def.items():
Expand Down Expand Up @@ -524,21 +523,13 @@ def get_new_nested_module_key(
return get_tf_definition_object_from_module_dependency(key, nested_key, module_name)

def add_tfvars(self, module: Module, source: str) -> None:
if not self.external_variables_data:
if not self.external_vars:
return
for (var_name, default, path) in self.external_variables_data:
if ".tfvars" in path:
block = [{var_name: {"default": default}}]
module.add_blocks(BlockType.TF_VARIABLE, block, path, source)

def add_tfvars_with_source_dir(self, module: Module, source: str, source_dir: str) -> None:
if not self.external_variables_data:
return
for var_name, default, path in self.external_variables_data:
if ".tfvars" in path:
if Path(source_dir) in Path(path).parents:
block = [{var_name: {"default": default}}]
module.add_blocks(BlockType.TF_VARIABLE, block, path, source)
for load_dir, i in self.external_vars.items():
for name, (default, path) in i.items():
block = [{name: {'default': default, 'load_dir': load_dir}}]
module.add_blocks(BlockType.TF_VARIABLE, block, path, source)

def get_dirname(self, path: TFDefinitionKey) -> str:
file_path = path.file_path
Expand Down Expand Up @@ -567,31 +558,21 @@ def get_module_source(
os.path.join(os.path.dirname(remove_module_dependency_from_path(file_to_load)), source))
return source

def add_external_vars_from_data(self, data: dict[str, Any], file: str) -> None:
var_blocks = data.get("variable")
if var_blocks and isinstance(var_blocks, list):
for var_block in var_blocks:
if not isinstance(var_block, dict):
continue
for var_name, var_definition in var_block.items():
if not isinstance(var_definition, dict):
continue

default_value = var_definition.get("default")
if default_value is not None and isinstance(default_value, list):
self.external_variables_data.append((var_name, default_value[0], file))

def handle_variables(
self,
dir_contents: list[os.DirEntry[str]],
vars_files: None | list[str],
specified_vars: Mapping[str, str] | None,
) -> list[os.DirEntry[str]]:
) -> list[str]:
tf_files_to_load = []
hcl_tfvars: Optional[os.DirEntry[str]] = None
json_tfvars: Optional[os.DirEntry[str]] = None
auto_vars_files: List[os.DirEntry[str]] = []
explicit_var_files: List[os.DirEntry[str]] = []
hcl_tfvars: Optional[str] = None
json_tfvars: Optional[str] = None
auto_vars_files: List[str] = []
external_vars: dict[str, tuple[Any, str]] = {}

if not dir_contents:
return []

for file in dir_contents:
try:
if not file.is_file():
Expand All @@ -600,44 +581,56 @@ def handle_variables(
continue

if file.name == "terraform.tfvars.json":
json_tfvars = file
json_tfvars = file.path
elif file.name == "terraform.tfvars":
hcl_tfvars = file
hcl_tfvars = file.path
elif file.name.endswith(".auto.tfvars.json") or file.name.endswith(".auto.tfvars"):
auto_vars_files.append(file)
elif vars_files and file.path in vars_files:
explicit_var_files.append(file)
auto_vars_files.append(file.path)
elif file.name.endswith(".tf") or file.name.endswith('.hcl'): # TODO: add support for .tf.json
tf_files_to_load.append(file)
tf_files_to_load.append(file.path)

# Terraform Variable Definition Precedence
# 1. Environment vars
for key, value in self.env_vars.items():
if not key.startswith("TF_VAR_"):
continue
self.external_variables_data.append((key[7:], value, f"env:{key}"))
if key.startswith('TF_VAR_'):
external_vars[key[7:]] = (value, f'env:{key}')

# 2. terraform.tfvars
if hcl_tfvars: # terraform.tfvars
data = load_or_die_quietly(hcl_tfvars, self.out_parsing_errors, clean_definitions=False)
if data:
self.external_variables_data.extend([(k, safe_index(v, 0), hcl_tfvars.path) for k, v in data.items()])
for k, v in data.items():
external_vars[k] = (safe_index(v, 0), hcl_tfvars)

# 3. terraform.tfvars.json
if json_tfvars: # terraform.tfvars.json
data = load_or_die_quietly(json_tfvars, self.out_parsing_errors)
if data:
self.external_variables_data.extend([(k, v, json_tfvars.path) for k, v in data.items()])
for k, v in data.items():
external_vars[k] = (v, json_tfvars)

# 4. *.auto.tfvars / *.auto.tfvars.json
auto_var_files_to_data = self._load_files(auto_vars_files)
for var_file, data in sorted(auto_var_files_to_data, key=lambda x: x[0]):
if data:
self.external_variables_data.extend([(k, v, var_file) for k, v in data.items()])
for k, v in data.items():
external_vars[k] = (v, var_file)

explicit_var_files_to_data = self._load_files(explicit_var_files)
# it's possible that os.scandir returned the var files in a different order than they were specified
# 5. --var-file arguments
if vars_files:
for var_file, data in sorted(explicit_var_files_to_data, key=lambda x: vars_files.index(x[0])):
for var_file, data in self._load_files(vars_files):
if data:
self.external_variables_data.extend([(k, v, var_file) for k, v in data.items()])
for k, v in data.items():
external_vars[k] = (v, var_file)

if specified_vars: # specified
self.external_variables_data.extend([(k, v, "manual specification") for k, v in specified_vars.items()])
# Prevent specified vars from being overridden by tfvars
if specified_vars:
for k in specified_vars.keys():
if k in external_vars:
del external_vars[k]

if external_vars:
self.external_vars[os.path.dirname(dir_contents[0].path)] = external_vars
return tf_files_to_load

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def test_tfvars(self):
}
self.go("tfvars", vars_files=['other3.tfvars', 'other2.tfvars'], different_expected=different_expected)

def test_tfvars_outside_dir(self):
self.go('tfvars_outside_dir', vars_files=['../tfvars/other1.tfvars'])

def test_account_dirs_and_modules(self):
self.go("account_dirs_and_modules")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"{\"file_path\": \"main.tf\", \"tf_source_modules\": null}": {
"resource": [
{
"aws_s3_bucket": {
"my_bucket": {
"bucket": [
"xyz"
],
"__start_line__": 5,
"__end_line__": 7,
"__address__": "aws_s3_bucket.my_bucket"
}
}
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
variable "other_var_1" {
default = "abc"
}

resource "aws_s3_bucket" "my_bucket" {
bucket = "${var.other_var_1}"
}
9 changes: 1 addition & 8 deletions tests/terraform/parser/test_new_parser_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,7 @@ def test_load_local_module_new_parser(self):
assert out_definitions[main_key]['module'][1]['mod2']['__resolved__'] == [key_idx_1]

assert parser.external_modules_source_map == {(os.path.join(directory, 'module'), 'latest'): os.path.join(directory, 'module')}
assert parser.external_variables_data == [
('versioning', True, 'manual specification'),
('__start_line__', 1, 'manual specification'),
('__end_line__', 4, 'manual specification'),
('versioning', False, 'manual specification'),
('__start_line__', 6, 'manual specification'),
('__end_line__', 9, 'manual specification')
]
assert parser.external_vars == {}
assert parser.keys_to_remove == {TFDefinitionKey(file_path=module_path)}
assert parser._parsed_directories == {
directory,
Expand Down
Loading