From b8f817059c026e474553b70a086c2e8c47f9a404 Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Mon, 1 Sep 2025 11:32:06 -0500 Subject: [PATCH 1/6] feat(eda-kaizen): kafka strimzi ep y da --- docs/docs/tasks/4-generate-driven-adapter.md | 1 + docs/docs/tasks/5-generate-entry-point.md | 13 +++++++ sh_generate_project.sh | 4 +- .../DrivenAdapterSecretsKafkaStrimzi.java | 29 ++++++++++++++ .../entrypoints/EntryPointKafkaStrimzi.java | 39 +++++++++++++++++++ .../architecture/ArchitectureValidation.java | 2 +- .../task/GenerateDrivenAdapterTask.java | 7 +++- .../task/GenerateEntryPointTask.java | 7 +++- ...GenerateDrivenAdapterTaskReactiveTest.java | 13 +++++++ .../GenerateEntryPointTaskReactiveTest.java | 14 +++++++ 10 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java create mode 100644 src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java diff --git a/docs/docs/tasks/4-generate-driven-adapter.md b/docs/docs/tasks/4-generate-driven-adapter.md index c8aeb1f8..c0d9d9a3 100644 --- a/docs/docs/tasks/4-generate-driven-adapter.md +++ b/docs/docs/tasks/4-generate-driven-adapter.md @@ -30,6 +30,7 @@ Whether you'll use generic one also parameter `name` is required. | rsocket | RSocket Requester | | | s3 | AWS Simple Storage Service | | | secrets | Secrets Manager Bancolombia | --secrets-backend [backend]
Valid options for backend are "aws_secrets_manager" (default) or "vault". | +| secretskafkastrimzi | Secrets para Kafka Strimzi | --secretName [name] | | sqs | SQS message sender | | _**This task will generate something like that:**_ diff --git a/docs/docs/tasks/5-generate-entry-point.md b/docs/docs/tasks/5-generate-entry-point.md index d4d3d190..0988cbec 100644 --- a/docs/docs/tasks/5-generate-entry-point.md +++ b/docs/docs/tasks/5-generate-entry-point.md @@ -24,10 +24,23 @@ Whether you'll use generic one also parameter `name` is required. | rsocket | Rsocket Controller Entry Point | | | sqs | SQS Listener | | | webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true --authorization [true,false] --from-swagger swagger.yaml --versioning [HEADER, PATH,NONE] default NONE | +| kafka-strimzi-consumer | Kafka Strimzi Consumer Entry Point | --name [name] --topicConsumer [topicName] (opcional, por defecto 'test-with-registries') | Additionally, if you'll use a restmvc, you can specify the web server on which the application will run. By default, undertow. +## Ejemplo de uso para Kafka Strimzi Consumer +```shell +gradle generateEntryPoint --type=kafkastrimzi +gradle gep --type=kafkastrimzi +``` +```shell +gradle generateEntryPoint --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic +gradle gep --type=kafka-strimzi-consumer --name=myConsumer --topicConsumer=myTopic +``` + +This will generate a specialized entry point for consuming Kafka messages using Strimzi, with custom parameters. + ```shell gradle generateEntryPoint --type=restmvc --server=[serverOption] gradle gep --type=restmvc --server=[serverOption] diff --git a/sh_generate_project.sh b/sh_generate_project.sh index 0bcb9f0a..f8347134 100755 --- a/sh_generate_project.sh +++ b/sh_generate_project.sh @@ -27,12 +27,12 @@ gradle wrapper if [ $TYPE == "reactive" ] then - for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs" + for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs" "secretskafkastrimzi" do ./gradlew gda --type $adapter done - for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux" + for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux" "kafkastrimzi" do ./gradlew gep --type $entry done diff --git a/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java b/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java new file mode 100644 index 00000000..bc92f4aa --- /dev/null +++ b/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java @@ -0,0 +1,29 @@ +package co.com.bancolombia.factory.adapters; + +import static co.com.bancolombia.Constants.APP_SERVICE; +import static co.com.bancolombia.utils.Utils.buildImplementationFromProject; + +import co.com.bancolombia.exceptions.CleanException; +import co.com.bancolombia.factory.ModuleBuilder; +import co.com.bancolombia.factory.ModuleFactory; +import java.io.IOException; + +public class DrivenAdapterSecretsKafkaStrimzi implements ModuleFactory { + + @Override + public void buildModule(ModuleBuilder builder) throws IOException, CleanException { + String secretName = builder.getStringParam("secretName"); + if (secretName == null || secretName.isEmpty()) { + secretName = "strimzi-kafka-test"; + } + builder.addParam("secretName", secretName); + builder.setupFromTemplate("driven-adapter/secrets-kafka-strimzi"); + builder.appendToSettings("secrets", "infrastructure/driven-adapters"); + builder + .appendToProperties("aws") + .put("region", "us-east-1") + .put("strimzi-kafka-secretName", secretName); + String dependency = buildImplementationFromProject(":secrets"); + builder.appendDependencyToModule(APP_SERVICE, dependency); + } +} diff --git a/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java b/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java new file mode 100644 index 00000000..5443b4a2 --- /dev/null +++ b/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java @@ -0,0 +1,39 @@ +package co.com.bancolombia.factory.entrypoints; + +import static co.com.bancolombia.Constants.APP_SERVICE; +import static co.com.bancolombia.utils.Utils.buildImplementationFromProject; + +import co.com.bancolombia.exceptions.CleanException; +import co.com.bancolombia.factory.ModuleBuilder; +import co.com.bancolombia.factory.ModuleFactory; +import co.com.bancolombia.factory.validations.ReactiveTypeValidation; +import java.io.IOException; + +public class EntryPointKafkaStrimzi implements ModuleFactory { + + @Override + public void buildModule(ModuleBuilder builder) throws IOException, CleanException { + builder.runValidations(ReactiveTypeValidation.class); + String name = builder.getStringParam("name"); + if (name == null || name.isEmpty()) { + name = "kafkaStrimzi"; + } + String topicConsumer = builder.getStringParam("topicConsumer"); + if (topicConsumer == null || topicConsumer.isEmpty()) { + topicConsumer = "test-with-registries"; + } + builder.addParam("name", name); + + builder.addParam("topicConsumer", topicConsumer); + builder.setupFromTemplate("entry-point/kafka-strimzi-consumer"); + builder.appendToSettings("kafka-consumer", "infrastructure/entry-points"); + String dependency = buildImplementationFromProject(":kafka-consumer"); + builder.appendDependencyToModule(APP_SERVICE, dependency); + + builder + .appendToProperties("spring.kafka.consumer") + .put("bootstrap-servers", "localhost:9092") + .put("group-id", builder.getProject().getName()); + builder.appendToProperties("adapters.kafka.consumer").put("topic", topicConsumer); + } +} diff --git a/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java b/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java index 79a6713b..fb445914 100644 --- a/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java +++ b/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java @@ -22,7 +22,7 @@ public final class ArchitectureValidation { private static final String FORBIDDEN_DOMAIN_CLASS_NAMES_PROP = "arch.unit.forbiddenDomainClassNames"; private static final String FORBIDDEN_DOMAIN_NAMES = - "rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka"; + "rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka,kafkastrimzi"; public static void inject(ModuleBuilder builder, Logger logger, Set files) { if (!FileUtils.readBooleanProperty(SKIP_PROP)) { diff --git a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java index 10cb3ced..0b331873 100644 --- a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java +++ b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java @@ -63,7 +63,7 @@ public void setTech(String tech) { @OptionValues("tech") public List getTechOptions() { - return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq"); + return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq", "secretskafkastrimzi"); } @Option(option = "cache-mode", description = "Set value for cache type") @@ -81,6 +81,11 @@ public void setSecretsBackend(DrivenAdapterSecrets.SecretsBackend secretsBackend this.secretsBackend = secretsBackend; } + @Option(option = "secretName", description = "Set the name of the secret in AWS Secrets Manager") + public void setSecretName(String secretName) { + builder.addParam("secretName", secretName); + } + @Override protected void prepareParams() { builder.addParam("task-param-cache-mode", cacheMode); diff --git a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java index 9ddf7951..c0e4ebd4 100644 --- a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java +++ b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java @@ -69,6 +69,11 @@ public void setEda(BooleanOption eda) { this.eda = eda; } + @Option(option = "topicConsumer", description = "Set the topic for the Kafka consumer") + public void setTopicConsumer(String topicConsumer) { + builder.addParam("topicConsumer", topicConsumer); + } + @OptionValues("eda") public List getEdaOptions() { return Arrays.asList(BooleanOption.values()); @@ -81,7 +86,7 @@ public void setTech(String tech) { @OptionValues("tech") public List getTechOptions() { - return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq"); + return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq", "kafkastrimzi"); } @OptionValues("server") diff --git a/src/test/java/co/com/bancolombia/task/GenerateDrivenAdapterTaskReactiveTest.java b/src/test/java/co/com/bancolombia/task/GenerateDrivenAdapterTaskReactiveTest.java index 982d82a5..4062a77e 100644 --- a/src/test/java/co/com/bancolombia/task/GenerateDrivenAdapterTaskReactiveTest.java +++ b/src/test/java/co/com/bancolombia/task/GenerateDrivenAdapterTaskReactiveTest.java @@ -299,4 +299,17 @@ void generateDrivenAdapterCognitoTokenProvider() throws IOException, CleanExcept "src/main/java/co/com/bancolombia/cognito/config/CognitoTokenProviderConfig.java", "src/main/java/co/com/bancolombia/cognito/CognitoTokenProvider.java"); } + + @Test + void generateEntryPointReactiveKafkaStrimzi() throws IOException, CleanException { + // Arrange + task.setType("SECRETSKAFKASTRIMZI"); + // Act + task.execute(); + // Assert + assertFilesExistsInDir( + TEST_DIR + "/infrastructure/driven-adapters/secrets/", + "build.gradle", + "src/main/java/co/com/bancolombia/secrets/config/KafkaOauthSecretsConfig.java"); + } } diff --git a/src/test/java/co/com/bancolombia/task/GenerateEntryPointTaskReactiveTest.java b/src/test/java/co/com/bancolombia/task/GenerateEntryPointTaskReactiveTest.java index 7e59c87e..ee9e4cf0 100644 --- a/src/test/java/co/com/bancolombia/task/GenerateEntryPointTaskReactiveTest.java +++ b/src/test/java/co/com/bancolombia/task/GenerateEntryPointTaskReactiveTest.java @@ -273,4 +273,18 @@ void generateEntryPointReactiveWebWithNoneStrategy() throws IOException, CleanEx "src/main/java/co/com/bancolombia/api/RouterRest.java", "src/test/java/co/com/bancolombia/api/RouterRestTest.java"); } + + @Test + void generateEntryPointReactiveKafkaStrimzi() throws IOException, CleanException { + // Arrange + task.setType("KAFKASTRIMZI"); + // Act + task.execute(); + // Assert + assertFilesExistsInDir( + TEST_DIR + "/infrastructure/entry-points/kafka-consumer/", + "build.gradle", + "src/main/java/co/com/bancolombia/kafka/consumer/KafkaConsumer.java", + "src/main/java/co/com/bancolombia/kafka/consumer/config/KafkaConfig.java"); + } } From fef9158bca8038c8213ffe38a30f538e4819c37f Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Mon, 1 Sep 2025 17:11:21 -0500 Subject: [PATCH 2/6] feat(eda-kaizen): kafka strimzi ep y da resources --- docs/docs/tasks/4-generate-driven-adapter.md | 1 - .../build.gradle.mustache | 6 ++ .../secrets-kafka-strimzi/definition.json | 9 ++ .../kafkaOauthSecretsConfig.java.mustache | 43 +++++++++ .../build.gradle.mustache | 13 +++ .../consumerCredentials.java.mustache | 15 +++ .../kafka-strimzi-consumer/definition.json | 12 +++ .../kafka-config.java.mustache | 93 +++++++++++++++++++ .../kafka-consumer.java.mustache | 39 ++++++++ 9 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache create mode 100644 src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json create mode 100644 src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache create mode 100644 src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache create mode 100644 src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache create mode 100644 src/main/resources/entry-point/kafka-strimzi-consumer/definition.json create mode 100644 src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache create mode 100644 src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache diff --git a/docs/docs/tasks/4-generate-driven-adapter.md b/docs/docs/tasks/4-generate-driven-adapter.md index c0d9d9a3..c8aeb1f8 100644 --- a/docs/docs/tasks/4-generate-driven-adapter.md +++ b/docs/docs/tasks/4-generate-driven-adapter.md @@ -30,7 +30,6 @@ Whether you'll use generic one also parameter `name` is required. | rsocket | RSocket Requester | | | s3 | AWS Simple Storage Service | | | secrets | Secrets Manager Bancolombia | --secrets-backend [backend]
Valid options for backend are "aws_secrets_manager" (default) or "vault". | -| secretskafkastrimzi | Secrets para Kafka Strimzi | --secretName [name] | | sqs | SQS message sender | | _**This task will generate something like that:**_ diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache new file mode 100644 index 00000000..05c9757d --- /dev/null +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache @@ -0,0 +1,6 @@ +dependencies { + implementation project(':model') + implementation 'org.springframework:spring-context' + implementation 'com.github.bancolombia:aws-secrets-manager-async:4.4.31' + implementation 'software.amazon.awssdk:sts:2.20.0' +} diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json b/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json new file mode 100644 index 00000000..bcbc4bac --- /dev/null +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json @@ -0,0 +1,9 @@ +{ + "folders": [ + "infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config" + ], + "files": { + "driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache": "infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config/KafkaOauthSecretsConfig.java", + "driven-adapter/secrets-kafka-strimzi/build.gradle.mustache": "infrastructure/driven-adapters/secrets/build.gradle" + } +} diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache new file mode 100644 index 00000000..f37ef3d6 --- /dev/null +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache @@ -0,0 +1,43 @@ +package {{package}}.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.annotation.Order; + +import co.com.bancolombia.model.topiccredentials.ConsumerCredentials; +import co.com.bancolombia.secretsmanager.api.GenericManagerAsync; +import co.com.bancolombia.secretsmanager.api.exceptions.SecretException; +import co.com.bancolombia.secretsmanager.config.AWSSecretsManagerConfig; +import co.com.bancolombia.secretsmanager.connector.AWSSecretManagerConnectorAsync; +import software.amazon.awssdk.regions.Region; + +@Configuration +class KafkaOauthSecretsConfig { + + @Bean("getSecret") + @Order(1) + GenericManagerAsync getSecretManager(@Value("${aws.region}") String region) { + return new AWSSecretManagerConnectorAsync(buildSecretsManagerConfig(region)); + } + + private AWSSecretsManagerConfig buildSecretsManagerConfig(String region) { + return AWSSecretsManagerConfig.builder() + .region(Region.of(region)) + .cacheSize({{cacheSize}}) + .cacheSeconds({{cacheSeconds}}) + .build(); + } + + @Bean + @DependsOn("getSecret") + ConsumerCredentials getStrimziKafkaCredentials( + @Value("${aws.region}") String region, + @Value("${aws.strimzi-kafka-secretName}") String secretName + ) throws SecretException { + return getSecretManager(region) + .getSecret(secretName, ConsumerCredentials.class) + .block(); + } +} diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache new file mode 100644 index 00000000..535f47ef --- /dev/null +++ b/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache @@ -0,0 +1,13 @@ +dependencies { + implementation project(':model') + implementation project(':usecase') + implementation 'io.projectreactor.kafka:reactor-kafka' + implementation 'org.springframework.kafka:spring-kafka' + implementation 'org.springframework.boot:spring-boot-autoconfigure' + implementation 'org.springframework.boot:spring-boot' + implementation 'org.apache.logging.log4j:log4j-api' + + implementation 'io.apicurio:apicurio-registry-serdes-jsonschema-serde:2.5.11.Final' + implementation 'io.apicurio:apicurio-registry-rest-client:1.3.2.Final' + implementation 'io.apicurio:apicurio-registry:3.0.6' +} diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache new file mode 100644 index 00000000..7771a174 --- /dev/null +++ b/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache @@ -0,0 +1,15 @@ +package {{package}}.model.topiccredentials; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder(toBuilder = true) +public class ConsumerCredentials { + private String clientId; + private String clientSecret; + private String userPoolId; + private String tokenEndpointUri; + private String consumerScope; + private String grantType; +} diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json b/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json new file mode 100644 index 00000000..2f749a9e --- /dev/null +++ b/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json @@ -0,0 +1,12 @@ +{ + "folders": [ + "infrastructure/entry-points/kafka-consumer/src/test/java/{{packagePath}}/kafka/consumer/config", + "domain/model/src/main/java/{{packagePath}}/model/topiccredentials" + ], + "files": { + "entry-point/kafka-strimzi-consumer/build.gradle.mustache": "infrastructure/entry-points/kafka-consumer/build.gradle", + "entry-point/kafka-strimzi-consumer/kafka-config.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/config/KafkaConfig.java", + "entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/KafkaConsumer.java", + "entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache":"domain/model/src/main/java/{{packagePath}}/model/topiccredentials/ConsumerCredentials.java" + } +} diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache new file mode 100644 index 00000000..818a185e --- /dev/null +++ b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache @@ -0,0 +1,93 @@ +package {{package}}.kafka.consumer.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.ssl.SslBundles; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; + +import {{package}}.model.topiccredentials.ConsumerCredentials; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; +import reactor.kafka.receiver.ReceiverOptions; + +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Map; + +@Configuration +@AllArgsConstructor +@Log4j2 +public class KafkaConfig { + + private final ConsumerCredentials consumerCredentials; + + @Bean + ReceiverOptions kafkaReceiverOptions( + @Value(value = "${adapters.kafka.consumer.topic}") String topic, + KafkaProperties kafkaProperties, + SslBundles sslBundles) throws UnknownHostException { + + ReceiverOptions basicReceiverOptions = + ReceiverOptions.create(buildJaasConfig(kafkaProperties, sslBundles)); + + return basicReceiverOptions.subscription(Collections.singletonList(topic)); + } + + @Bean + ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate( + ReceiverOptions kafkaReceiverOptions) { + return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions); + } + + public Map buildJaasConfig(KafkaProperties kafkaProperties, SslBundles sslBundles) { + Map props = kafkaProperties.buildConsumerProperties(sslBundles); + String jaasConfig = String.format( + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + + "oauth.client.id=\"%s\" " + + "oauth.client.secret=\"%s\" " + + "oauth.token.endpoint.uri=\"%s\" " + + "oauth.grant.type=\"%s\";", + consumerCredentials.getClientId(), + consumerCredentials.getClientSecret(), + consumerCredentials.getTokenEndpointUri(), + consumerCredentials.getGrantType() + ); + + props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "ubicacion certificado ssl"); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "sslTruststorePassword"); + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "sslTruststoreType"); + + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers"); + props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class); + + props.put(SerdeConfig.REGISTRY_URL, "registryUrl"); + props.put(SerdeConfig.EXPLICIT_ARTIFACT_ID, "artifactId"); + props.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "artifactGroupId"); + props.put(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "artifactVersion"); + + props.put(SerdeConfig.AUTH_CLIENT_SECRET, consumerCredentials.getClientSecret()); + props.put(SerdeConfig.AUTH_CLIENT_ID, consumerCredentials.getClientId()); + props.put(SerdeConfig.AUTH_TOKEN_ENDPOINT, consumerCredentials.getTokenEndpointUri()); + props.put(SerdeConfig.VALIDATION_ENABLED, true); + + return props; + } +} diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache new file mode 100644 index 00000000..572c8bab --- /dev/null +++ b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache @@ -0,0 +1,39 @@ +package {{package}}.kafka.consumer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@Component +@Log4j2 +@RequiredArgsConstructor +public class KafkaConsumer { + private final ReactiveKafkaConsumerTemplate kafkaConsumer; + //private final SomeUseCase useCase; + + @EventListener(ApplicationStartedEvent.class) + public Flux listenMessages() { + return kafkaConsumer + .receiveAutoAck() + .publishOn(Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "kafka")) + .flatMap(record -> { + // map record and process + // return useCase.something(record.value()) + log.info("Record received {}", record.value()); + return Mono.empty(); + }) + .doOnError(error -> log.error("Error processing kafka record", error)) + .retry() + .repeat(); + } +} From 91108e92abcebf3169c7fac284c53a90b5dd84d0 Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Tue, 2 Sep 2025 14:14:24 -0500 Subject: [PATCH 3/6] feat(eda-kaizen): values as default --- .../kafkaOauthSecretsConfig.java.mustache | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache index f37ef3d6..693a385e 100644 --- a/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache @@ -25,8 +25,8 @@ class KafkaOauthSecretsConfig { private AWSSecretsManagerConfig buildSecretsManagerConfig(String region) { return AWSSecretsManagerConfig.builder() .region(Region.of(region)) - .cacheSize({{cacheSize}}) - .cacheSeconds({{cacheSeconds}}) + .cacheSize(100) + .cacheSeconds(100) .build(); } From 3ec7402d5c5636b119eeddb1c563707a209f6e87 Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Tue, 2 Sep 2025 15:17:45 -0500 Subject: [PATCH 4/6] feat(eda-kaizen): comments corrections --- docs/docs/tasks/5-generate-entry-point.md | 4 ++-- .../co/com/bancolombia/task/GenerateDrivenAdapterTask.java | 2 +- .../java/co/com/bancolombia/task/GenerateEntryPointTask.java | 2 +- .../secrets-kafka-strimzi/build.gradle.mustache | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/docs/tasks/5-generate-entry-point.md b/docs/docs/tasks/5-generate-entry-point.md index 0988cbec..3faa51aa 100644 --- a/docs/docs/tasks/5-generate-entry-point.md +++ b/docs/docs/tasks/5-generate-entry-point.md @@ -24,7 +24,7 @@ Whether you'll use generic one also parameter `name` is required. | rsocket | Rsocket Controller Entry Point | | | sqs | SQS Listener | | | webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true --authorization [true,false] --from-swagger swagger.yaml --versioning [HEADER, PATH,NONE] default NONE | -| kafka-strimzi-consumer | Kafka Strimzi Consumer Entry Point | --name [name] --topicConsumer [topicName] (opcional, por defecto 'test-with-registries') | +| kafkastrimzi | Kafka Strimzi Consumer Entry Point | --name [name] --topicConsumer [topicName] (optional, for default 'test-with-registries') | Additionally, if you'll use a restmvc, you can specify the web server on which the application will run. By default, undertow. @@ -36,7 +36,7 @@ gradle gep --type=kafkastrimzi ``` ```shell gradle generateEntryPoint --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic -gradle gep --type=kafka-strimzi-consumer --name=myConsumer --topicConsumer=myTopic +gradle gep --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic ``` This will generate a specialized entry point for consuming Kafka messages using Strimzi, with custom parameters. diff --git a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java index 0b331873..18221ee7 100644 --- a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java +++ b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java @@ -63,7 +63,7 @@ public void setTech(String tech) { @OptionValues("tech") public List getTechOptions() { - return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq", "secretskafkastrimzi"); + return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq"); } @Option(option = "cache-mode", description = "Set value for cache type") diff --git a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java index c0e4ebd4..8af0487b 100644 --- a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java +++ b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java @@ -86,7 +86,7 @@ public void setTech(String tech) { @OptionValues("tech") public List getTechOptions() { - return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq", "kafkastrimzi"); + return Arrays.asList("kafka", "rabbitmq", "kafka,rabbitmq"); } @OptionValues("server") diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache index 05c9757d..851f2716 100644 --- a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache @@ -1,6 +1,6 @@ dependencies { implementation project(':model') implementation 'org.springframework:spring-context' - implementation 'com.github.bancolombia:aws-secrets-manager-async:4.4.31' - implementation 'software.amazon.awssdk:sts:2.20.0' + implementation 'com.github.bancolombia:aws-secrets-manager-async:{{SECRETS_VERSION}}' + implementation 'software.amazon.awssdk:*' } From 355072e62d986b0075fee4f0fed0b859143aa74b Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Tue, 2 Sep 2025 15:27:32 -0500 Subject: [PATCH 5/6] feat(eda-kaizen): comments corrections --- docs/docs/tasks/4-generate-driven-adapter.md | 1 + .../driven-adapter/secrets-kafka-strimzi/build.gradle.mustache | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/docs/tasks/4-generate-driven-adapter.md b/docs/docs/tasks/4-generate-driven-adapter.md index c8aeb1f8..cc98f038 100644 --- a/docs/docs/tasks/4-generate-driven-adapter.md +++ b/docs/docs/tasks/4-generate-driven-adapter.md @@ -30,6 +30,7 @@ Whether you'll use generic one also parameter `name` is required. | rsocket | RSocket Requester | | | s3 | AWS Simple Storage Service | | | secrets | Secrets Manager Bancolombia | --secrets-backend [backend]
Valid options for backend are "aws_secrets_manager" (default) or "vault". | +| secretskafkastrimzi | Secrets for Kafka Strimzi | --secretName [name] (Opcional) | | sqs | SQS message sender | | _**This task will generate something like that:**_ diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache index 851f2716..7c319965 100644 --- a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache @@ -2,5 +2,5 @@ dependencies { implementation project(':model') implementation 'org.springframework:spring-context' implementation 'com.github.bancolombia:aws-secrets-manager-async:{{SECRETS_VERSION}}' - implementation 'software.amazon.awssdk:*' + implementation 'software.amazon.awssdk:' } From d016ca7e46e7ffe7cf26314b076efdd635af8bf7 Mon Sep 17 00:00:00 2001 From: Harold Parra Date: Tue, 2 Sep 2025 16:21:11 -0500 Subject: [PATCH 6/6] feat(eda-kaizen): comments corrections --- .../driven-adapter/secrets-kafka-strimzi/build.gradle.mustache | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache index 7c319965..f16c7770 100644 --- a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache +++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache @@ -2,5 +2,5 @@ dependencies { implementation project(':model') implementation 'org.springframework:spring-context' implementation 'com.github.bancolombia:aws-secrets-manager-async:{{SECRETS_VERSION}}' - implementation 'software.amazon.awssdk:' + implementation 'software.amazon.awssdk:sts' }