1 change: 1 addition & 0 deletions docs/docs/tasks/4-generate-driven-adapter.md
@@ -30,6 +30,7 @@ Whether you'll use generic one also parameter `name` is required.
| rsocket | RSocket Requester | |
| s3 | AWS Simple Storage Service | |
| secrets | Secrets Manager Bancolombia | --secrets-backend [backend] <br/> Valid options for backend are "aws_secrets_manager" (default) or "vault". |
| secretskafkastrimzi | Secrets for Kafka Strimzi | --secretName [name] (optional) |
| sqs | SQS message sender | |
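
For example, you can generate this adapter with a custom secret name (a sketch; `my-kafka-secret` is a placeholder, and omitting `--secretName` falls back to the generator's default `strimzi-kafka-test`):

```shell
gradle generateDrivenAdapter --type=secretskafkastrimzi --secretName=my-kafka-secret
gradle gda --type=secretskafkastrimzi --secretName=my-kafka-secret
```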

_**This task will generate something like that:**_
13 changes: 13 additions & 0 deletions docs/docs/tasks/5-generate-entry-point.md
@@ -24,10 +24,23 @@ Whether you'll use generic one also parameter `name` is required.
| rsocket | Rsocket Controller Entry Point | |
| sqs | SQS Listener | |
| webflux | API REST (Spring Boot Starter WebFlux) | --router [true, false] default true --authorization [true,false] --from-swagger swagger.yaml --versioning [HEADER, PATH,NONE] default NONE |
| kafkastrimzi | Kafka Strimzi Consumer Entry Point | --name [name] --topicConsumer [topicName] (optional, defaults to 'test-with-registries') |

Additionally, if you use restmvc, you can specify the web server on which the application will run; the default is undertow.

## Usage example for the Kafka Strimzi Consumer
```shell
gradle generateEntryPoint --type=kafkastrimzi
gradle gep --type=kafkastrimzi
```
```shell
gradle generateEntryPoint --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic
gradle gep --type=kafkastrimzi --name=myConsumer --topicConsumer=myTopic
```

This will generate a specialized entry point for consuming Kafka messages using Strimzi, configured with the parameters you provide.
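
Judging from the generator code later in this PR, the task also appends consumer settings to the application properties; the result would look roughly like this (a sketch; the group id is taken from the project name):

```yaml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-project   # defaults to the project name
adapters:
  kafka:
    consumer:
      topic: myTopic         # the value passed via --topicConsumer
```

For restmvc, the server option mentioned above is passed as follows: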

```shell
gradle generateEntryPoint --type=restmvc --server=[serverOption]
gradle gep --type=restmvc --server=[serverOption]
```
4 changes: 2 additions & 2 deletions sh_generate_project.sh
@@ -27,12 +27,12 @@ gradle wrapper

if [ $TYPE == "reactive" ]
then
for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs"
for adapter in "asynceventbus" "binstash" "cognitotokenprovider" "dynamodb" "kms" "mongodb" "mq" "r2dbc" "redis" "restconsumer" "rsocket" "s3" "secrets" "sqs" "secretskafkastrimzi"
do
./gradlew gda --type $adapter
done

for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux"
for entry in "asynceventhandler" "graphql" "kafka" "mq" "rsocket" "sqs" "webflux" "kafkastrimzi"
do
./gradlew gep --type $entry
done
@@ -0,0 +1,29 @@
package co.com.bancolombia.factory.adapters;

import static co.com.bancolombia.Constants.APP_SERVICE;
import static co.com.bancolombia.utils.Utils.buildImplementationFromProject;

import co.com.bancolombia.exceptions.CleanException;
import co.com.bancolombia.factory.ModuleBuilder;
import co.com.bancolombia.factory.ModuleFactory;
import java.io.IOException;

public class DrivenAdapterSecretsKafkaStrimzi implements ModuleFactory {

@Override
public void buildModule(ModuleBuilder builder) throws IOException, CleanException {
String secretName = builder.getStringParam("secretName");
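// Fall back to the template's default secret name when --secretName is not provided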
if (secretName == null || secretName.isEmpty()) {
secretName = "strimzi-kafka-test";
}
builder.addParam("secretName", secretName);
builder.setupFromTemplate("driven-adapter/secrets-kafka-strimzi");
builder.appendToSettings("secrets", "infrastructure/driven-adapters");
builder
.appendToProperties("aws")
.put("region", "us-east-1")
.put("strimzi-kafka-secretName", secretName);
String dependency = buildImplementationFromProject(":secrets");
builder.appendDependencyToModule(APP_SERVICE, dependency);
}
}
@@ -0,0 +1,39 @@
package co.com.bancolombia.factory.entrypoints;

import static co.com.bancolombia.Constants.APP_SERVICE;
import static co.com.bancolombia.utils.Utils.buildImplementationFromProject;

import co.com.bancolombia.exceptions.CleanException;
import co.com.bancolombia.factory.ModuleBuilder;
import co.com.bancolombia.factory.ModuleFactory;
import co.com.bancolombia.factory.validations.ReactiveTypeValidation;
import java.io.IOException;

public class EntryPointKafkaStrimzi implements ModuleFactory {

@Override
public void buildModule(ModuleBuilder builder) throws IOException, CleanException {
builder.runValidations(ReactiveTypeValidation.class);
String name = builder.getStringParam("name");
if (name == null || name.isEmpty()) {
name = "kafkaStrimzi";
}
String topicConsumer = builder.getStringParam("topicConsumer");
if (topicConsumer == null || topicConsumer.isEmpty()) {
topicConsumer = "test-with-registries";
}
builder.addParam("name", name);

builder.addParam("topicConsumer", topicConsumer);
builder.setupFromTemplate("entry-point/kafka-strimzi-consumer");
builder.appendToSettings("kafka-consumer", "infrastructure/entry-points");
String dependency = buildImplementationFromProject(":kafka-consumer");
builder.appendDependencyToModule(APP_SERVICE, dependency);

builder
.appendToProperties("spring.kafka.consumer")
.put("bootstrap-servers", "localhost:9092")
.put("group-id", builder.getProject().getName());
builder.appendToProperties("adapters.kafka.consumer").put("topic", topicConsumer);
}
}
@@ -22,7 +22,7 @@ public final class ArchitectureValidation
private static final String FORBIDDEN_DOMAIN_CLASS_NAMES_PROP =
"arch.unit.forbiddenDomainClassNames";
private static final String FORBIDDEN_DOMAIN_NAMES =
"rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka";
"rabbit,sqs,sns,ibm,dynamo,aws,mysql,postgres,redis,mongo,rsocket,r2dbc,http,kms,s3,graphql,kafka,kafkastrimzi";

public static void inject(ModuleBuilder builder, Logger logger, Set<File> files) {
if (!FileUtils.readBooleanProperty(SKIP_PROP)) {
@@ -81,6 +81,11 @@ public void setSecretsBackend(DrivenAdapterSecrets.SecretsBackend secretsBackend
this.secretsBackend = secretsBackend;
}

@Option(option = "secretName", description = "Set the name of the secret in AWS Secrets Manager")
public void setSecretName(String secretName) {
builder.addParam("secretName", secretName);
}

@Override
protected void prepareParams() {
builder.addParam("task-param-cache-mode", cacheMode);
@@ -69,6 +69,11 @@ public void setEda(BooleanOption eda)
this.eda = eda;
}

@Option(option = "topicConsumer", description = "Set the topic for the Kafka consumer")
public void setTopicConsumer(String topicConsumer) {
builder.addParam("topicConsumer", topicConsumer);
}

@OptionValues("eda")
public List<BooleanOption> getEdaOptions() {
return Arrays.asList(BooleanOption.values());
@@ -0,0 +1,6 @@
dependencies {
implementation project(':model')
implementation 'org.springframework:spring-context'
implementation 'com.github.bancolombia:aws-secrets-manager-async:{{SECRETS_VERSION}}'
implementation 'software.amazon.awssdk:sts'
}
@@ -0,0 +1,9 @@
{
"folders": [
"infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config"
],
"files": {
"driven-adapter/secrets-kafka-strimzi/kafkaOauthSecretsConfig.java.mustache": "infrastructure/driven-adapters/secrets/src/main/java/{{packagePath}}/secrets/config/KafkaOauthSecretsConfig.java",
"driven-adapter/secrets-kafka-strimzi/build.gradle.mustache": "infrastructure/driven-adapters/secrets/build.gradle"
}
}
@@ -0,0 +1,43 @@
package {{package}}.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;

import co.com.bancolombia.model.topiccredentials.ConsumerCredentials;
import co.com.bancolombia.secretsmanager.api.GenericManagerAsync;
import co.com.bancolombia.secretsmanager.api.exceptions.SecretException;
import co.com.bancolombia.secretsmanager.config.AWSSecretsManagerConfig;
import co.com.bancolombia.secretsmanager.connector.AWSSecretManagerConnectorAsync;
import software.amazon.awssdk.regions.Region;

@Configuration
class KafkaOauthSecretsConfig {

@Bean("getSecret")
@Order(1)
GenericManagerAsync getSecretManager(@Value("${aws.region}") String region) {
return new AWSSecretManagerConnectorAsync(buildSecretsManagerConfig(region));
}

private AWSSecretsManagerConfig buildSecretsManagerConfig(String region) {
return AWSSecretsManagerConfig.builder()
.region(Region.of(region))
.cacheSize(100)
.cacheSeconds(100)
.build();
}

@Bean
@DependsOn("getSecret")
ConsumerCredentials getStrimziKafkaCredentials(
@Value("${aws.region}") String region,
@Value("${aws.strimzi-kafka-secretName}") String secretName
) throws SecretException {
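// Resolve the secret synchronously at startup so the credentials bean exists before the Kafka consumer is configured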
return getSecretManager(region)
.getSecret(secretName, ConsumerCredentials.class)
.block();
}
}
@@ -0,0 +1,13 @@
dependencies {
implementation project(':model')
implementation project(':usecase')
implementation 'io.projectreactor.kafka:reactor-kafka'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-autoconfigure'
implementation 'org.springframework.boot:spring-boot'
implementation 'org.apache.logging.log4j:log4j-api'

implementation 'io.apicurio:apicurio-registry-serdes-jsonschema-serde:2.5.11.Final'
implementation 'io.apicurio:apicurio-registry-rest-client:1.3.2.Final'
implementation 'io.apicurio:apicurio-registry:3.0.6'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package {{package}}.model.topiccredentials;

import lombok.Builder;
import lombok.Data;

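// Shape of the OAuth credentials stored in the Secrets Manager secret; field names are expected to match the secret's JSON keys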
@Data
@Builder(toBuilder = true)
public class ConsumerCredentials {
private String clientId;
private String clientSecret;
private String userPoolId;
private String tokenEndpointUri;
private String consumerScope;
private String grantType;
}
@@ -0,0 +1,12 @@
{
"folders": [
"infrastructure/entry-points/kafka-consumer/src/test/java/{{packagePath}}/kafka/consumer/config",
"domain/model/src/main/java/{{packagePath}}/model/topiccredentials"
],
"files": {
"entry-point/kafka-strimzi-consumer/build.gradle.mustache": "infrastructure/entry-points/kafka-consumer/build.gradle",
"entry-point/kafka-strimzi-consumer/kafka-config.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/config/KafkaConfig.java",
"entry-point/kafka-strimzi-consumer/kafka-consumer.java.mustache": "infrastructure/entry-points/kafka-consumer/src/main/java/{{packagePath}}/kafka/consumer/KafkaConsumer.java",
"entry-point/kafka-strimzi-consumer/consumerCredentials.java.mustache":"domain/model/src/main/java/{{packagePath}}/model/topiccredentials/ConsumerCredentials.java"
}
}
@@ -0,0 +1,93 @@
package {{package}}.kafka.consumer.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import {{package}}.model.topiccredentials.ConsumerCredentials;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import reactor.kafka.receiver.ReceiverOptions;

import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;

@Configuration
@AllArgsConstructor
@Log4j2
public class KafkaConfig {

private final ConsumerCredentials consumerCredentials;

@Bean
ReceiverOptions<String, String> kafkaReceiverOptions(
@Value(value = "${adapters.kafka.consumer.topic}") String topic,
KafkaProperties kafkaProperties,
SslBundles sslBundles) throws UnknownHostException {

ReceiverOptions<String, String> basicReceiverOptions =
ReceiverOptions.create(buildJaasConfig(kafkaProperties, sslBundles));

return basicReceiverOptions.subscription(Collections.singletonList(topic));
}

@Bean
ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
ReceiverOptions<String, String> kafkaReceiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}

public Map<String, Object> buildJaasConfig(KafkaProperties kafkaProperties, SslBundles sslBundles) {
Map<String, Object> props = kafkaProperties.buildConsumerProperties(sslBundles);
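// Build the JAAS configuration for Strimzi's OAuth login module from the credentials loaded via Secrets Manager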
String jaasConfig = String.format(
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
+ "oauth.client.id=\"%s\" "
+ "oauth.client.secret=\"%s\" "
+ "oauth.token.endpoint.uri=\"%s\" "
+ "oauth.grant.type=\"%s\";",
consumerCredentials.getClientId(),
consumerCredentials.getClientSecret(),
consumerCredentials.getTokenEndpointUri(),
consumerCredentials.getGrantType()
);

props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

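// NOTE: the string literals below are template placeholders; replace them with real, externalized configuration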
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "sslTruststoreLocation");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "sslTruststorePassword");
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "sslTruststoreType");

props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class);

props.put(SerdeConfig.REGISTRY_URL, "registryUrl");
props.put(SerdeConfig.EXPLICIT_ARTIFACT_ID, "artifactId");
props.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "artifactGroupId");
props.put(SerdeConfig.EXPLICIT_ARTIFACT_VERSION, "artifactVersion");

props.put(SerdeConfig.AUTH_CLIENT_SECRET, consumerCredentials.getClientSecret());
props.put(SerdeConfig.AUTH_CLIENT_ID, consumerCredentials.getClientId());
props.put(SerdeConfig.AUTH_TOKEN_ENDPOINT, consumerCredentials.getTokenEndpointUri());
props.put(SerdeConfig.VALIDATION_ENABLED, true);

return props;
}
}
@@ -0,0 +1,39 @@
package {{package}}.kafka.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
@Log4j2
@RequiredArgsConstructor
public class KafkaConsumer {
private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer;
//private final SomeUseCase useCase;

@EventListener(ApplicationStartedEvent.class)
public Flux<Object> listenMessages() {
return kafkaConsumer
.receiveAutoAck()
.publishOn(Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"kafka"))
.flatMap(record -> {
// map record and process
// return useCase.something(record.value())
log.info("Record received {}", record.value());
return Mono.empty();
})
.doOnError(error -> log.error("Error processing kafka record", error))
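// resubscribe on error (retry) and on completion (repeat) so the consumer keeps running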
.retry()
.repeat();
}
}