diff --git a/checkov/terraform/checks/resource/aws/BedrockAgentFlowHasDescription.py b/checkov/terraform/checks/resource/aws/BedrockAgentFlowHasDescription.py new file mode 100644 index 0000000000..e580c178ca --- /dev/null +++ b/checkov/terraform/checks/resource/aws/BedrockAgentFlowHasDescription.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck + + +class BedrockAgentFlowHasDescription(BaseResourceCheck): + def __init__(self) -> None: + name = "Ensure Bedrock Agent Flow has a description" + id = "CKV_AWS_393" + supported_resources = ("aws_bedrockagent_flow",) + categories = (CheckCategories.GENERAL_SECURITY,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: + if conf.get("description"): + return CheckResult.PASSED + return CheckResult.FAILED + + def get_evaluated_keys(self) -> list[str]: + return ["description"] + + +check = BedrockAgentFlowHasDescription() diff --git a/checkov/terraform/checks/resource/aws/BedrockAgentFlowIsEncrypted.py b/checkov/terraform/checks/resource/aws/BedrockAgentFlowIsEncrypted.py new file mode 100644 index 0000000000..bac7b78a64 --- /dev/null +++ b/checkov/terraform/checks/resource/aws/BedrockAgentFlowIsEncrypted.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck + + +class BedrockAgentFlowIsEncrypted(BaseResourceCheck): + def __init__(self) -> None: + name = "Ensure Bedrock Agent Flow is encrypted with a KMS key" + id = "CKV_AWS_394" + supported_resources = ("aws_bedrockagent_flow",) + categories = (CheckCategories.ENCRYPTION,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: + if conf.get("customer_encryption_key_arn"): + return CheckResult.PASSED + return CheckResult.FAILED + + def get_evaluated_keys(self) -> list[str]: + return ["customer_encryption_key_arn"] + + +check = BedrockAgentFlowIsEncrypted() diff --git a/checkov/terraform/checks/resource/aws/FSXS3AccessPointAttachmentHasPolicy.py b/checkov/terraform/checks/resource/aws/FSXS3AccessPointAttachmentHasPolicy.py new file mode 100644 index 0000000000..75b135c012 --- /dev/null +++ b/checkov/terraform/checks/resource/aws/FSXS3AccessPointAttachmentHasPolicy.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck + + +class FSXS3AccessPointAttachmentHasPolicy(BaseResourceCheck): + def __init__(self) -> None: + name = "Ensure FSx for OpenZFS S3 Access Point Attachment has a policy" + id = "CKV_AWS_395" + supported_resources = ("aws_fsx_s3_access_point_attachment",) + categories = (CheckCategories.IAM,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: + if conf.get("s3_access_point") and conf.get("s3_access_point")[0].get("policy"): + return CheckResult.PASSED + return 
CheckResult.FAILED + + def get_evaluated_keys(self) -> list[str]: + return ["s3_access_point/[0]/policy"] + + +check = FSXS3AccessPointAttachmentHasPolicy() diff --git a/docs/GPT Documentation/Adding Checks.md b/docs/GPT Documentation/Adding Checks.md new file mode 100644 index 0000000000..d09e1df869 --- /dev/null +++ b/docs/GPT Documentation/Adding Checks.md @@ -0,0 +1,39 @@ +## Python Checks + +Each Check in checkov is responsible for identifying ONE violation in the code which we want to prevent. +We NEVER define a check which tries to identify several issues. +Each check also validates ONLY ONE ENTITY at a time, and CANNOT check for connections between multiple entities. +Each check inherits from the base class `BaseCheck` under the `checkov/common` directory. +Each check should set the following fields in the constructor: +- `name` - a one-line, accurate description of the check for a human to read and understand. +- `id` - the check's id, always in one of the formats: + 1. `CKV_<cloud-provider>_<index>` where `<cloud-provider>` would be `AWS`, `AZURE` or `GCP` based on the cloud provider. We will choose this if the check is related to a specific cloud provider. + 2. `CKV_<framework>_<index>` where `<framework>` is a 3-letter representation of the framework, like `K8S`. We will choose this option when the check is not tied to a specific cloud provider. +- `supported_resources` - tuple of resource types the check is intended to run against. +- `categories` - tuple of categories this check relates to, based on the `CheckCategories` object under `checkov/common`. + +Note that we don't want to have 2 checks with the same id, and the `<index>` should follow sequentially. So if the last check for K8S is number 75, the next check for k8s should be `CKV_K8S_76`. +The full index of available checks is maintained under [Policy Index](../5.Policy%20Index/). + +The main function which defines the check is `scan_entity_conf`, but it can also be overridden in specific frameworks to allow an easier implementation; some examples are `scan_resource_conf` in `terraform`, designed for scanning `resource` objects, and `get_inspected_key`, which is implemented in various frameworks. +Main notes: +1. `scan_entity_conf` - gets as input a dict representation of the entity we want to scan, with all attributes defined on it after the graph building, and uses the dict to decide whether the entity is defined correctly. If we use it, we should also implement `get_evaluated_keys` to specify the specific keys we checked, or override `self.evaluated_keys` inside the implementation of `scan_entity_conf`. +2. `get_inspected_key` - as opposed to `scan_entity_conf`, this function gets the same input but just returns the string which represents the jsonpath of the key it searches for. If found, the policy passes and the `evaluated_keys` field is assigned automatically. +NOTE - we shouldn't implement both `get_inspected_key` and `scan_entity_conf` together, only one of them. + +## Graph Checks +Graph checks are checks which REQUIRE CONNECTIONS BETWEEN MULTIPLE ENTITIES. +For example, if we define a service account with certain write permissions in kubernetes, by itself it's not a violation. But if we use it in a k8s resource which should have minimal permissions, it might be an issue. +Graph checks are defined in `yaml` files instead of `python` files and use the BQL syntax which is defined under the [Yaml Custom Policies documentation](../3.Custom%20Policies/YAML%20Custom%20Policies.md).
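+As a rough illustration only (the id, resource types, and attribute below are made up and do not correspond to an existing policy), a minimal graph check yaml might look roughly like this:
+```yaml
+metadata:
+  id: "CKV2_EXAMPLE_1"
+  name: "Ensure the example resource is attached to an example policy"
+  category: "GENERAL_SECURITY"
+definition:
+  and:
+    - cond_type: "attribute"
+      resource_types:
+        - "aws_example_resource"
+      attribute: "description"
+      operator: "exists"
+    - cond_type: "connection"
+      resource_types:
+        - "aws_example_resource"
+      connected_resource_types:
+        - "aws_example_policy"
+      operator: "exists"
+```
+The `connection` condition only passes when an edge exists in the graph between the two resources, which is exactly how a connection is defined for graph checks below.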
+The graph checks are usually written under the `checks` directory of the relevant IAC framework, under a directory called `graph` or `graph_checks`. +All graph checks' ids start with `CKV2_` (as opposed to `CKV_` for python checks). +We define a connection for graph checks between 2 entities by the existence of an edge between the 2 objects which represent those resources. +The implementation of all the operators which are available for graph checks is under the directory `checkov/common/graph/checks_infra/solvers`. + +## Adding Tests +Each check added to `checkov` should have a test with a passing and a failing example to make sure the check is both valid and stays updated over time. +The tests for a check of framework `X` should be located under the same folder structure as the check location but under the `tests` directory. For example, the tests for the check under `checkov/terraform/checks/resource/aws/check.py` should be under `tests/terraform/checks/resource/aws/test_check.py`. +Each such test is expected to have: +- An IAC file (or multiple files under the same directory) of the same framework which contains examples of both passing and failing resources. If there are multiple use-cases to check, then we expect an example for each of them. The resource names should contain both a description of the example and whether they are a passing or failing resource. For example - `passing_example_1_with_variable_rendering`. It's also possible to separate the examples into files of failing/passing examples and remove the `passing/failing` prefix from the names. +- If an `expected.yaml` file exists in the same folder as the examples, it should contain all passing entities' ids under the `pass` group and all failing ones under the `fail` group. In that case, the test should parse this file to decide which entities should pass or fail and assert it in the test. +- The test itself should run the matching framework's `Runner` only on the tested check and validate the results against the expected results from `expected.yaml`. An example of how to do it is `runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))`, where `runner` is the relevant `Runner` object for the framework, and `check` is the relevant instance of the check that we want to test. \ No newline at end of file diff --git a/docs/GPT Documentation/Checkov Structure.md b/docs/GPT Documentation/Checkov Structure.md new file mode 100644 index 0000000000..0d9e9880d6 --- /dev/null +++ b/docs/GPT Documentation/Checkov Structure.md @@ -0,0 +1,7 @@ +Checkov is separated into different directories based on the IAC FRAMEWORKS it supports. +This is shown under the "checkov" directory, where the "common" directory is responsible for shared code across all frameworks, and all other directories (like "terraform" or "cloudformation") contain the code related to each framework. +The main objects defined per framework are: +1. `Parser` - responsible for parsing the different code files related to the specific framework (for example `.tf` files in `terraform`) into python objects. +2. `Graph` - A graph representation built from the python objects the `Parser` created. Each created object is represented as a graph vertex using the corresponding `Block` class implementation for the framework. Edges in the graph are created based on the framework's internal rules, for example in `terraform` we can connect 2 resources by referencing their `terraform-id` (`<resource_type>.<name>`) in other resources.
Each framework has additional rules, and not all of them are implemented in checkov yet. The code for the `Graph` usually sits under `checkov/<framework>/graph`. +3. `Checks` - all of the checks which were defined for the framework. +4. `Runner` - responsible for running the `Parser` and then building a `Graph` based on the parser's result. After this process, it also scans the graph using the defined `Checks` and creates a report of all of the violations found. This is the main class of each framework and defines how it is run. diff --git a/tests/terraform/checks/resource/aws/example_BedrockAgentFlowHasDescription.tf b/tests/terraform/checks/resource/aws/example_BedrockAgentFlowHasDescription.tf new file mode 100644 index 0000000000..3d276eb5a5 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_BedrockAgentFlowHasDescription.tf @@ -0,0 +1,10 @@ +resource "aws_bedrockagent_flow" "pass" { + name = "example" + execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows" + description = "This is an example flow." +} + +resource "aws_bedrockagent_flow" "fail" { + name = "example" + execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows" +} diff --git a/tests/terraform/checks/resource/aws/example_BedrockAgentFlowIsEncrypted.tf b/tests/terraform/checks/resource/aws/example_BedrockAgentFlowIsEncrypted.tf new file mode 100644 index 0000000000..9160d10a84 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_BedrockAgentFlowIsEncrypted.tf @@ -0,0 +1,10 @@ +resource "aws_bedrockagent_flow" "pass" { + name = "example" + execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows" + customer_encryption_key_arn = "arn:aws:kms:us-east-1:123456789012:key/aea0cafc-355a-40a3-84f8-d52855ed333e" +} + +resource "aws_bedrockagent_flow" "fail" { + name = "example" + execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows" +} diff --git a/tests/terraform/checks/resource/aws/example_FSXS3AccessPointAttachmentHasPolicy.tf b/tests/terraform/checks/resource/aws/example_FSXS3AccessPointAttachmentHasPolicy.tf new file mode 100644 index 0000000000..41c91959ba --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_FSXS3AccessPointAttachmentHasPolicy.tf @@ -0,0 +1,51 @@ +resource "aws_fsx_s3_access_point_attachment" "pass" { + name = "example-attachment" + type = "OPENZFS" + + openzfs_configuration { + volume_id = "fsvol-1234567890" + + file_system_identity { + type = "POSIX" + + posix_user { + uid = 1001 + gid = 1001 + } + } + } + + s3_access_point { + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Principal = { + AWS = "*" + } + Action = "s3:GetObject" + Resource = "arn:aws:s3:::my-bucket/*" + } + ] + }) + } +} + +resource "aws_fsx_s3_access_point_attachment" "fail" { + name = "example-attachment" + type = "OPENZFS" + + openzfs_configuration { + volume_id = "fsvol-1234567890" + + file_system_identity { + type = "POSIX" + + posix_user { + uid = 1001 + gid = 1001 + } + } + } +} diff --git a/tests/terraform/checks/resource/aws/test_BedrockAgentFlowHasDescription.py b/tests/terraform/checks/resource/aws/test_BedrockAgentFlowHasDescription.py new file mode 100644 index 0000000000..d9595f045a --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_BedrockAgentFlowHasDescription.py @@ -0,0 +1,43 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter
+from checkov.terraform.runner import Runner + + +class TestBedrockAgentFlowHasDescription(unittest.TestCase): + def test(self): + # given + test_files_dir = os.path.dirname(os.path.realpath(__file__)) + + # when + report = Runner().run( + root_folder=None, + files=[os.path.join(test_files_dir, "example_BedrockAgentFlowHasDescription.tf")], + runner_filter=RunnerFilter(checks=["CKV_AWS_393"]), + ) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_bedrockagent_flow.pass", + } + failing_resources = { + "aws_bedrockagent_flow.fail", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/terraform/checks/resource/aws/test_BedrockAgentFlowIsEncrypted.py b/tests/terraform/checks/resource/aws/test_BedrockAgentFlowIsEncrypted.py new file mode 100644 index 0000000000..d456f4d3d4 --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_BedrockAgentFlowIsEncrypted.py @@ -0,0 +1,43 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner + + +class TestBedrockAgentFlowIsEncrypted(unittest.TestCase): + def test(self): + # given + test_files_dir = os.path.dirname(os.path.realpath(__file__)) + + # when + report = Runner().run( + root_folder=None, + files=[os.path.join(test_files_dir, "example_BedrockAgentFlowIsEncrypted.tf")], + runner_filter=RunnerFilter(checks=["CKV_AWS_394"]), + ) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_bedrockagent_flow.pass", + } + failing_resources = { + "aws_bedrockagent_flow.fail", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/terraform/checks/resource/aws/test_FSXS3AccessPointAttachmentHasPolicy.py b/tests/terraform/checks/resource/aws/test_FSXS3AccessPointAttachmentHasPolicy.py new file mode 100644 index 0000000000..79ae51650c --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_FSXS3AccessPointAttachmentHasPolicy.py @@ -0,0 +1,43 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner + + +class TestFSXS3AccessPointAttachmentHasPolicy(unittest.TestCase): + def test(self): + # given + test_files_dir = os.path.dirname(os.path.realpath(__file__)) + + # when + report = Runner().run( + root_folder=None, + files=[os.path.join(test_files_dir, "example_FSXS3AccessPointAttachmentHasPolicy.tf")], + runner_filter=RunnerFilter(checks=["CKV_AWS_395"]), + ) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_fsx_s3_access_point_attachment.pass", + } + failing_resources = { + "aws_fsx_s3_access_point_attachment.fail", + } + + 
passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main()