diff --git a/docs/docs/tasks/4-generate-driven-adapter.md b/docs/docs/tasks/4-generate-driven-adapter.md
index c8aeb1f8..cc98f038 100644
--- a/docs/docs/tasks/4-generate-driven-adapter.md
+++ b/docs/docs/tasks/4-generate-driven-adapter.md
@@ -30,6 +30,7 @@ Whether you'll use generic one also parameter `name` is required.
| rsocket | RSocket Requester | |
| s3 | AWS Simple Storage Service | |
| secrets | Secrets Manager Bancolombia | --secrets-backend [backend] Valid options for backend are "aws_secrets_manager" (default) or "vault". |
+| secretskafkastrimzi | Secrets for Kafka Strimzi | --secretName [name] (optional, default 'strimzi-kafka-test'; see the example below) |
| sqs | SQS message sender | |
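+
+For example, to generate the Kafka Strimzi secrets adapter with a custom secret name (the name `my-kafka-secret` is illustrative; when `--secretName` is omitted it defaults to `strimzi-kafka-test`):
+
+```shell
+gradle generateDrivenAdapter --type=secretskafkastrimzi --secretName=my-kafka-secret
+gradle gda --type=secretskafkastrimzi --secretName=my-kafka-secret
+```
+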
_**This task will generate something like that:**_
diff --git a/docs/docs/tasks/5-generate-entry-point.md b/docs/docs/tasks/5-generate-entry-point.md
index d4d3d190..3faa51aa 100644
--- a/docs/docs/tasks/5-generate-entry-point.md
+++ b/docs/docs/tasks/5-generate-entry-point.md
@@ -24,10 +24,23 @@ Whether you'll use generic one also parameter `name` is required.
| rsocket | Rsocket Controller Entry Point | |
| sqs | SQS Listener | |
| webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true --authorization [true,false] --from-swagger swagger.yaml --versioning [HEADER, PATH,NONE] default NONE |
+| kafkastrimzi | Kafka Strimzi Consumer Entry Point | --name [name] (optional, default 'kafkaStrimzi') --topicConsumer [topicName] (optional, default 'test-with-registries') |
Additionally, if you'll use a restmvc, you can specify the web server on which the application will run. By default,
undertow.
+## Usage example for Kafka Strimzi Consumer
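+
+Using the default consumer name (`kafkaStrimzi`) and topic (`test-with-registries`):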
+```shell
+gradle generateEntryPoint --type=kafkastrimzi
+gradle gep --type=kafkastrimzi
+```
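+
+To override the default consumer name and topic: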
+```shell
+gradle generateEntryPoint --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic
+gradle gep --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic
+```
+
+This will generate an entry point that consumes Kafka messages from a Strimzi-secured cluster (OAuth over SASL_SSL), using the provided consumer name and topic.
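+
+The generator also appends the consumer configuration to the application properties: `spring.kafka.consumer.bootstrap-servers` (default `localhost:9092`), `spring.kafka.consumer.group-id` (defaulting to the project name) and `adapters.kafka.consumer.topic` (the value of `--topicConsumer`).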
+
```shell
gradle generateEntryPoint --type=restmvc --server=[serverOption]
gradle gep --type=restmvc --server=[serverOption]
diff --git a/sh_generate_project.sh b/sh_generate_project.sh
index 0bcb9f0a..f8347134 100755
--- a/sh_generate_project.sh
+++ b/sh_generate_project.sh
@@ -27,12 +27,12 @@ gradle wrapper
if [ $TYPE == "reactive" ]
then
- for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs"
+ for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs" "secretskafkastrimzi"
do
./gradlew gda --type $adapter
done
- for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux"
+ for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux" "kafkastrimzi"
do
./gradlew gep --type $entry
done
diff --git a/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java b/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java
new file mode 100644
index 00000000..bc92f4aa
--- /dev/null
+++ b/src/main/java/co/com/bancolombia/factory/adapters/DrivenAdapterSecretsKafkaStrimzi.java
@@ -0,0 +1,29 @@
+package co.com.bancolombia.factory.adapters;
+
+import static co.com.bancolombia.Constants.APP_SERVICE;
+import static co.com.bancolombia.utils.Utils.buildImplementationFromProject;
+
+import co.com.bancolombia.exceptions.CleanException;
+import co.com.bancolombia.factory.ModuleBuilder;
+import co.com.bancolombia.factory.ModuleFactory;
+import java.io.IOException;
+
+public class DrivenAdapterSecretsKafkaStrimzi implements ModuleFactory {
+
+ @Override
+ public void buildModule(ModuleBuilder builder) throws IOException, CleanException {
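+    // --secretName is optional; fall back to the default "strimzi-kafka-test" when it is not provided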
+ String secretName = builder.getStringParam("secretName");
+ if (secretName == null || secretName.isEmpty()) {
+ secretName = "strimzi-kafka-test";
+ }
+ builder.addParam("secretName", secretName);
+ builder.setupFromTemplate("driven-adapter/secrets-kafka-strimzi");
+ builder.appendToSettings("secrets", "infrastructure/driven-adapters");
+ builder
+ .appendToProperties("aws")
+ .put("region", "us-east-1")
+ .put("strimzi-kafka-secretName", secretName);
+ String dependency = buildImplementationFromProject(":secrets");
+ builder.appendDependencyToModule(APP_SERVICE, dependency);
+ }
+}
diff --git a/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java b/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java
new file mode 100644
index 00000000..5443b4a2
--- /dev/null
+++ b/src/main/java/co/com/bancolombia/factory/entrypoints/EntryPointKafkaStrimzi.java
@@ -0,0 +1,39 @@
+package co.com.bancolombia.factory.entrypoints;
+
+import static co.com.bancolombia.Constants.APP_SERVICE;
+import static co.com.bancolombia.utils.Utils.buildImplementationFromProject;
+
+import co.com.bancolombia.exceptions.CleanException;
+import co.com.bancolombia.factory.ModuleBuilder;
+import co.com.bancolombia.factory.ModuleFactory;
+import co.com.bancolombia.factory.validations.ReactiveTypeValidation;
+import java.io.IOException;
+
+public class EntryPointKafkaStrimzi implements ModuleFactory {
+
+ @Override
+ public void buildModule(ModuleBuilder builder) throws IOException, CleanException {
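+    // This entry point is only supported in reactive projects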
+ builder.runValidations(ReactiveTypeValidation.class);
+ String name = builder.getStringParam("name");
+ if (name == null || name.isEmpty()) {
+ name = "kafkaStrimzi";
+ }
+ String topicConsumer = builder.getStringParam("topicConsumer");
+ if (topicConsumer == null || topicConsumer.isEmpty()) {
+ topicConsumer = "test-with-registries";
+ }
+ builder.addParam("name", name);
+
+ builder.addParam("topicConsumer", topicConsumer);
+ builder.setupFromTemplate("entry-point/kafka-strimzi-consumer");
+ builder.appendToSettings("kafka-consumer", "infrastructure/entry-points");
+ String dependency = buildImplementationFromProject(":kafka-consumer");
+ builder.appendDependencyToModule(APP_SERVICE, dependency);
+
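+    // Default local consumer settings; the group id defaults to the project name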
+ builder
+ .appendToProperties("spring.kafka.consumer")
+ .put("bootstrap-servers", "localhost:9092")
+ .put("group-id", builder.getProject().getName());
+ builder.appendToProperties("adapters.kafka.consumer").put("topic", topicConsumer);
+ }
+}
diff --git a/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java b/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java
index 79a6713b..fb445914 100644
--- a/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java
+++ b/src/main/java/co/com/bancolombia/factory/validations/architecture/ArchitectureValidation.java
@@ -22,7 +22,7 @@ public final class ArchitectureValidation {
private static final String FORBIDDEN_DOMAIN_CLASS_NAMES_PROP =
"arch.unit.forbiddenDomainClassNames";
private static final String FORBIDDEN_DOMAIN_NAMES =
- "rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka";
+ "rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka,kafkastrimzi";
public static void inject(ModuleBuilder builder, Logger logger, Set files) {
if (!FileUtils.readBooleanProperty(SKIP_PROP)) {
diff --git a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java
index 10cb3ced..18221ee7 100644
--- a/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java
+++ b/src/main/java/co/com/bancolombia/task/GenerateDrivenAdapterTask.java
@@ -81,6 +81,11 @@ public void setSecretsBackend(DrivenAdapterSecrets.SecretsBackend secretsBackend
this.secretsBackend = secretsBackend;
}
+ @Option(option = "secretName", description = "Set the name of the secret in AWS Secrets Manager")
+ public void setSecretName(String secretName) {
+ builder.addParam("secretName", secretName);
+ }
+
@Override
protected void prepareParams() {
builder.addParam("task-param-cache-mode", cacheMode);
diff --git a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java
index 9ddf7951..8af0487b 100644
--- a/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java
+++ b/src/main/java/co/com/bancolombia/task/GenerateEntryPointTask.java
@@ -69,6 +69,11 @@ public void setEda(BooleanOption eda) {
this.eda = eda;
}
+ @Option(option = "topicConsumer", description = "Set the topic for the Kafka consumer")
+ public void setTopicConsumer(String topicConsumer) {
+ builder.addParam("topicConsumer", topicConsumer);
+ }
+
@OptionValues("eda")
public List getEdaOptions() {
return Arrays.asList(BooleanOption.values());
diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache
new file mode 100644
index 00000000..f16c7770
--- /dev/null
+++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/build.gradle.mustache
@@ -0,0 +1,6 @@
+dependencies {
+ implementation project(':model')
+ implementation 'org.springframework:spring-context'
+ implementation 'com.github.bancolombia:aws-secrets-manager-async:{{SECRETS_VERSION}}'
+ implementation 'software.amazon.awssdk:sts'
+}
diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json b/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json
new file mode 100644
index 00000000..bcbc4bac
--- /dev/null
+++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/definition.json
@@ -0,0 +1,9 @@
+{
+ "folders": [
+ "infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config"
+ ],
+ "files": {
+ "driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache": "infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config/KafkaOauthSecretsConfig.java",
+ "driven-adapter/secrets-kafka-strimzi/build.gradle.mustache": "infrastructure/driven-adapters/secrets/build.gradle"
+ }
+}
diff --git a/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache
new file mode 100644
index 00000000..693a385e
--- /dev/null
+++ b/src/main/resources/driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache
@@ -0,0 +1,43 @@
+package {{package}}.secrets.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.core.annotation.Order;
+
+import co.com.bancolombia.model.topiccredentials.ConsumerCredentials;
+import co.com.bancolombia.secretsmanager.api.GenericManagerAsync;
+import co.com.bancolombia.secretsmanager.api.exceptions.SecretException;
+import co.com.bancolombia.secretsmanager.config.AWSSecretsManagerConfig;
+import co.com.bancolombia.secretsmanager.connector.AWSSecretManagerConnectorAsync;
+import software.amazon.awssdk.regions.Region;
+
+@Configuration
+class KafkaOauthSecretsConfig {
+
+ @Bean("getSecret")
+ @Order(1)
+ GenericManagerAsync getSecretManager(@Value("${aws.region}") String region) {
+ return new AWSSecretManagerConnectorAsync(buildSecretsManagerConfig(region));
+ }
+
+ private AWSSecretsManagerConfig buildSecretsManagerConfig(String region) {
+ return AWSSecretsManagerConfig.builder()
+ .region(Region.of(region))
+ .cacheSize(100)
+ .cacheSeconds(100)
+ .build();
+ }
+
+ @Bean
+ @DependsOn("getSecret")
+ ConsumerCredentials getStrimziKafkaCredentials(
+ @Value("${aws.region}") String region,
+ @Value("${aws.strimzi-kafka-secretName}") String secretName
+ ) throws SecretException {
+ return getSecretManager(region)
+ .getSecret(secretName, ConsumerCredentials.class)
+ .block();
+ }
+}
diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache
new file mode 100644
index 00000000..535f47ef
--- /dev/null
+++ b/src/main/resources/entry-point/kafka-strimzi-consumer/build.gradle.mustache
@@ -0,0 +1,13 @@
+dependencies {
+ implementation project(':model')
+ implementation project(':usecase')
+ implementation 'io.projectreactor.kafka:reactor-kafka'
+ implementation 'org.springframework.kafka:spring-kafka'
+ implementation 'org.springframework.boot:spring-boot-autoconfigure'
+ implementation 'org.springframework.boot:spring-boot'
+ implementation 'org.apache.logging.log4j:log4j-api'
+
+ implementation 'io.apicurio:apicurio-registry-serdes-jsonschema-serde:2.5.11.Final'
+ implementation 'io.apicurio:apicurio-registry-rest-client:1.3.2.Final'
+ implementation 'io.apicurio:apicurio-registry:3.0.6'
+}
diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache
new file mode 100644
index 00000000..7771a174
--- /dev/null
+++ b/src/main/resources/entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache
@@ -0,0 +1,15 @@
+package {{package}}.model.topiccredentials;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class ConsumerCredentials {
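+    // Field names are expected to match the keys of the secret retrieved from AWS Secrets Manager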
+ private String clientId;
+ private String clientSecret;
+ private String userPoolId;
+ private String tokenEndpointUri;
+ private String consumerScope;
+ private String grantType;
+}
diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json b/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json
new file mode 100644
index 00000000..2f749a9e
--- /dev/null
+++ b/src/main/resources/entry-point/kafka-strimzi-consumer/definition.json
@@ -0,0 +1,12 @@
+{
+ "folders": [
+ "infrastructure/entry-points/kafka-consumer/src/test/java/{{packagePath}}/kafka/consumer/config",
+ "domain/model/src/main/java/{{packagePath}}/model/topiccredentials"
+ ],
+ "files": {
+ "entry-point/kafka-strimzi-consumer/build.gradle.mustache": "infrastructure/entry-points/kafka-consumer/build.gradle",
+ "entry-point/kafka-strimzi-consumer/kafka-config.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/config/KafkaConfig.java",
+ "entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/KafkaConsumer.java",
+ "entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache":"domain/model/src/main/java/{{packagePath}}/model/topiccredentials/ConsumerCredentials.java"
+ }
+}
diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache
new file mode 100644
index 00000000..818a185e
--- /dev/null
+++ b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-config.java.mustache
@@ -0,0 +1,93 @@
+package {{package}}.kafka.consumer.config;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
+
+import {{package}}.model.topiccredentials.ConsumerCredentials;
+import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
+import lombok.AllArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import reactor.kafka.receiver.ReceiverOptions;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Map;
+
+@Configuration
+@AllArgsConstructor
+@Log4j2
+public class KafkaConfig {
+
+ private final ConsumerCredentials consumerCredentials;
+
+ @Bean
+ ReceiverOptions kafkaReceiverOptions(
+ @Value(value = "${adapters.kafka.consumer.topic}") String topic,
+ KafkaProperties kafkaProperties,
+ SslBundles sslBundles) throws UnknownHostException {
+
+ ReceiverOptions basicReceiverOptions =
+ ReceiverOptions.create(buildJaasConfig(kafkaProperties, sslBundles));
+
+ return basicReceiverOptions.subscription(Collections.singletonList(topic));
+ }
+
+ @Bean
+ ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate(
+ ReceiverOptions kafkaReceiverOptions) {
+ return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
+ }
+
+ public Map buildJaasConfig(KafkaProperties kafkaProperties, SslBundles sslBundles) {
+ Map props = kafkaProperties.buildConsumerProperties(sslBundles);
+ String jaasConfig = String.format(
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
+ + "oauth.client.id=\"%s\" "
+ + "oauth.client.secret=\"%s\" "
+ + "oauth.token.endpoint.uri=\"%s\" "
+ + "oauth.grant.type=\"%s\";",
+ consumerCredentials.getClientId(),
+ consumerCredentials.getClientSecret(),
+ consumerCredentials.getTokenEndpointUri(),
+ consumerCredentials.getGrantType()
+ );
+
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+
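+    // Replace the placeholder values below (truststore, bootstrap servers, group id and registry settings) with your environment configuration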
+    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "sslTruststoreLocation");
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "sslTruststorePassword");
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "sslTruststoreType");
+
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
+
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class);
+
+ props.put(SerdeConfig.REGISTRY_URL, "registryUrl");
+ props.put(SerdeConfig.EXPLICIT_ARTIFACT_ID, "artifactId");
+ props.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "artifactGroupId");
+ props.put(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "artifactVersion");
+
+ props.put(SerdeConfig.AUTH_CLIENT_SECRET, consumerCredentials.getClientSecret());
+ props.put(SerdeConfig.AUTH_CLIENT_ID, consumerCredentials.getClientId());
+ props.put(SerdeConfig.AUTH_TOKEN_ENDPOINT, consumerCredentials.getTokenEndpointUri());
+ props.put(SerdeConfig.VALIDATION_ENABLED, true);
+
+ return props;
+ }
+}
diff --git a/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache
new file mode 100644
index 00000000..572c8bab
--- /dev/null
+++ b/src/main/resources/entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache
@@ -0,0 +1,39 @@
+package {{package}}.kafka.consumer;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+@Component
+@Log4j2
+@RequiredArgsConstructor
+public class KafkaConsumer {
+ private final ReactiveKafkaConsumerTemplate kafkaConsumer;
+ //private final SomeUseCase useCase;
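+    // Inject here the use case that will process the consumed events (SomeUseCase is a placeholder)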
+
+ @EventListener(ApplicationStartedEvent.class)
+ public Flux