diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index dd1acfa30a50..3a3201af23c6 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -31,7 +31,6 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
-
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
diff --git a/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java
new file mode 100644
index 000000000000..f7a684d07336
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java
@@ -0,0 +1,38 @@
+package com.baeldung.sasl;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Kafka listener that records the value of every record received on {@link #TOPIC}.
+ */
+@Component
+public class KafkaConsumer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
+    public static final String TOPIC = "test-topic";
+
+    /**
+     * Values received so far, in arrival order. The listener writes from a container thread
+     * while callers may read from another one, hence the concurrent list.
+     */
+    public final List<String> messages = new CopyOnWriteArrayList<>();
+
+    /**
+     * Logs the incoming record and appends its value to {@link #messages}.
+     *
+     * @param consumerRecord the record handed over by the listener container
+     */
+    @KafkaListener(topics = TOPIC)
+    public void receive(ConsumerRecord<String, String> consumerRecord) {
+        // SLF4J renders the argument itself; an explicit toString() is redundant.
+        LOGGER.info("Received payload: '{}'", consumerRecord);
+        messages.add(consumerRecord.value());
+    }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java
new file mode 100644
index 000000000000..57e2ac605bb7
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java
@@ -0,0 +1,17 @@
+package com.baeldung.sasl;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point of the SASL/GSSAPI sample; configured through {@code application-sasl.yml}.
+ */
+@SpringBootApplication
+public class KafkaSaslApplication {
+
+    public static void main(String[] args) {
+        // Set before run() so Boot resolves application-sasl.* instead of application.*
+        System.setProperty("spring.config.name", "application-sasl");
+        SpringApplication.run(KafkaSaslApplication.class, args);
+    }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java
new file mode 100644
index 000000000000..873b593d4d1e
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java
@@ -0,0 +1,38 @@
+package com.baeldung.saslplaintext;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Kafka listener that records the value of every record received on {@link #TOPIC}.
+ */
+@Component
+public class KafkaConsumer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
+    public static final String TOPIC = "test-topic";
+
+    /**
+     * Values received so far, in arrival order. The listener writes from a container thread
+     * while callers may read from another one, hence the concurrent list.
+     */
+    public final List<String> messages = new CopyOnWriteArrayList<>();
+
+    /**
+     * Logs the incoming record and appends its value to {@link #messages}.
+     *
+     * @param consumerRecord the record handed over by the listener container
+     */
+    @KafkaListener(topics = TOPIC)
+    public void receive(ConsumerRecord<String, String> consumerRecord) {
+        // SLF4J renders the argument itself; an explicit toString() is redundant.
+        LOGGER.info("Received payload: '{}'", consumerRecord);
+        messages.add(consumerRecord.value());
+    }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java
new file mode 100644
index 000000000000..9ca6e2d0f8df
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java
@@ -0,0 +1,40 @@
+package com.baeldung.saslplaintext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * Publishes string messages through the injected {@link KafkaTemplate}.
+ */
+@Component
+public class KafkaProducer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
+        this.kafkaTemplate = kafkaTemplate;
+    }
+
+    /**
+     * Sends {@code message} to {@code topic} under the fixed key {@code "key"} and logs the
+     * outcome once the asynchronous send completes.
+     *
+     * @param message the payload to publish
+     * @param topic   the destination topic
+     */
+    public void sendMessage(String message, String topic) {
+        LOGGER.info("Producing message: {}", message);
+        kafkaTemplate.send(topic, "key", message)
+            .whenComplete((result, ex) -> {
+                if (ex == null) {
+                    // Report the destination topic; the payload was already logged above.
+                    LOGGER.info("Message sent to topic: {}", topic);
+                } else {
+                    LOGGER.error("Failed to send message", ex);
+                }
+            });
+    }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java
new file mode 100644
index 000000000000..4eef4c4dc2bb
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java
@@ -0,0 +1,17 @@
+package com.baeldung.saslplaintext;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point of the SASL/PLAIN sample; configured through {@code application-sasl-plaintext.yml}.
+ */
+@SpringBootApplication
+public class KafkaSaslPlaintextApplication {
+
+    public static void main(String[] args) {
+        // Set before run() so Boot resolves application-sasl-plaintext.* instead of application.*
+        System.setProperty("spring.config.name", "application-sasl-plaintext");
+        SpringApplication.run(KafkaSaslPlaintextApplication.class, args);
+    }
+}
diff --git a/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml b/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml
new file mode 100644
index 000000000000..6f0f5fe99cac
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml
@@ -0,0 +1,16 @@
+spring:
+  kafka:
+    bootstrap-servers: localhost:9092
+    properties:
+      sasl.mechanism: PLAIN
+      sasl.jaas.config: >
+        org.apache.kafka.common.security.plain.PlainLoginModule required
+        username="user1"
+        password="user1-secret";
+    security:
+      protocol: SASL_PLAINTEXT
+    consumer:
+      group-id: test-group
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application-sasl.yml
b/spring-kafka-4/src/main/resources/application-sasl.yml
new file mode 100644
index 000000000000..58d970fa6857
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-sasl.yml
@@ -0,0 +1,19 @@
+spring:
+  kafka:
+    bootstrap-servers: localhost:9092
+    properties:
+      sasl.mechanism: GSSAPI
+      sasl.jaas.config: >
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="./src/test/resources/sasl/keytabs/client.keytab"
+        principal="client@BAELDUNG.COM"
+        serviceName="kafka";
+    security:
+      protocol: "SASL_PLAINTEXT"
+    consumer:
+      group-id: test
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java b/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java
new file mode 100644
index 000000000000..1739ce983b69
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java
@@ -0,0 +1,16 @@
+package com.baeldung.sasl;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+/**
+ * Smoke test: passes when the {@link KafkaSaslApplication} context starts without errors.
+ */
+@SpringBootTest(classes = KafkaSaslApplication.class)
+class SpringContextTest {
+
+    @Test
+    void whenSpringContextIsBootstrapped_thenNoExceptions() {
+        // Intentionally empty: a successful context start-up is the assertion.
+    }
+}
diff --git a/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java
new file mode 100644
index 000000000000..0a105da4828a
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java
@@ -0,0 +1,54 @@
+package com.baeldung.saslplaintext;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+
+import static com.baeldung.saslplaintext.KafkaConsumer.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Live test: brings up the SASL/PLAIN broker from docker-compose, sends one message through
+ * {@link KafkaProducer} and waits until {@link KafkaConsumer} has recorded exactly that message.
+ */
+@Testcontainers
+@ActiveProfiles("sasl-plaintext")
+@SpringBootTest(classes = KafkaSaslPlaintextApplication.class)
+class KafkaSaslPlaintextApplicationLiveTest {
+
+    private static final File KAFKA_COMPOSE_FILE = new File("src/test/resources/sasl-plaintext/docker-compose.yml");
+    private static final String KAFKA_SERVICE = "kafka";
+    private static final int SASL_PORT = 9092;
+
+    // static: the compose environment is started once for the test class rather than being
+    // restarted for every test method.
+    @Container
+    public static DockerComposeContainer<?> container =
+        new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
+            .withExposedService(KAFKA_SERVICE, SASL_PORT, Wait.forListeningPort());
+
+    @Autowired
+    private KafkaProducer kafkaProducer;
+
+    @Autowired
+    private KafkaConsumer kafkaConsumer;
+
+    @Test
+    void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
+        String message = UUID.randomUUID().toString();
+        kafkaProducer.sendMessage(message, TOPIC);
+
+        await().atMost(Duration.ofMinutes(2))
+            .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
+    }
+}
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf b/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf
new file mode 100644
index 000000000000..7fc27888f851
--- /dev/null
+++
b/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf
@@ -0,0 +1,13 @@
+KafkaServer {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret"
+    user_admin="admin-secret"
+    user_user1="user1-secret";
+};
+
+Client {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="zookeeper"
+    password="zookeeper-secret";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf b/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf
new file mode 100644
index 000000000000..566365981ca4
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf
@@ -0,0 +1,6 @@
+Server {
+    org.apache.zookeeper.server.auth.DigestLoginModule required
+    username="zookeeper"
+    password="zookeeper-secret"
+    user_zookeeper="zookeeper-secret";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml b/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml
new file mode 100644
index 000000000000..de5a380aed90
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml
@@ -0,0 +1,29 @@
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.6.6
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
+    volumes:
+      - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
+    ports:
+      - 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:7.6.6
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    volumes:
+      - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+    ports:
+      - "9092:9092"
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/Dockerfile b/spring-kafka-4/src/test/resources/sasl/Dockerfile
new file mode 100644
index 000000000000..b2955abbffb3
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/Dockerfile
@@ -0,0 +1,16 @@
+# Use a minimal base image
+FROM debian:bullseye-slim
+
+# Non-interactive frontend keeps the krb5 package configuration from waiting for input
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y krb5-kdc krb5-admin-server krb5-user && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY config/krb5.conf /etc/krb5.conf
+COPY setup_kdc.sh /setup_kdc.sh
+
+RUN chmod +x /setup_kdc.sh
+
+EXPOSE 88 749
+
+CMD ["/setup_kdc.sh"]
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl b/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl
new file mode 100644
index 000000000000..db000b280c48
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl
@@ -0,0 +1 @@
+*/admin@BAELDUNG.COM *
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf b/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf
new file mode 100644
index 000000000000..222de53e0013
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf
@@ -0,0 +1,17 @@
+KafkaServer {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/keytabs/kafka.keytab"
+    principal="kafka/localhost@BAELDUNG.COM"
+    serviceName="kafka";
+};
+
+Client {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/keytabs/client.keytab"
+    principal="client@BAELDUNG.COM"
+    serviceName="kafka";
+};
\ No newline at end of file
diff --git
a/spring-kafka-4/src/test/resources/sasl/config/krb5.conf b/spring-kafka-4/src/test/resources/sasl/config/krb5.conf
new file mode 100644
index 000000000000..f37a9ac5080c
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/krb5.conf
@@ -0,0 +1,17 @@
+[libdefaults]
+    default_realm = BAELDUNG.COM
+    dns_lookup_realm = false
+    dns_lookup_kdc = false
+    forwardable = true
+    rdns = true
+
+[realms]
+    BAELDUNG.COM = {
+        kdc = kdc
+        admin_server = kdc
+    }
+
+[logging]
+    default = FILE:/var/log/krb5libs.log
+    kdc = FILE:/var/log/krb5kdc.log
+    admin_server = FILE:/var/log/kadmind.log
diff --git a/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf b/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf
new file mode 100644
index 000000000000..5d5028e5cc94
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf
@@ -0,0 +1,7 @@
+Server {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/keytabs/zookeeper.keytab"
+    principal="zookeeper/zookeeper.sasl_default@BAELDUNG.COM";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/docker-compose.yml b/spring-kafka-4/src/test/resources/sasl/docker-compose.yml
new file mode 100644
index 000000000000..2f084537c427
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/docker-compose.yml
@@ -0,0 +1,52 @@
+services:
+
+  kdc:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    volumes:
+      - ./config:/etc/krb5kdc
+      - ./keytabs:/etc/krb5kdc/keytabs
+      - ./config/krb5.conf:/etc/krb5.conf
+    ports:
+      - "88:88/udp"
+
+  zookeeper:
+    # Pinned (was "latest") to the release used by the sasl-plaintext compose file
+    image: confluentinc/cp-zookeeper:7.6.6
+    container_name: zookeeper
+    hostname: localhost
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
+    volumes:
+      - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
+      - ./keytabs:/etc/kafka/keytabs
+      - ./config/krb5.conf:/etc/krb5.conf
+    ports:
+      - "2181:2181"
+
+  kafka:
+    # Pinned (was "latest"): this broker is configured for ZooKeeper mode, which a moving tag may drop
+    image: confluentinc/cp-kafka:7.6.6
+    container_name: kafka
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
+      KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
+      KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
+      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    volumes:
+      - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+      - ./keytabs:/etc/kafka/keytabs
+      - ./config/krb5.conf:/etc/krb5.conf
+    depends_on:
+      - zookeeper
+      - kdc
+    ports:
+      - "9092:9092"
diff --git a/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh b/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh
new file mode 100644
index 000000000000..91bd12080cc3
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Bootstraps a throw-away KDC for realm BAELDUNG.COM: creates the database, adds the Kafka,
+# ZooKeeper and client principals, exports their keytabs, then runs the KDC and admin daemons.
+
+kdb5_util create -s -P masterpassword
+
+kadmin.local -q "addprinc -randkey kafka/localhost@BAELDUNG.COM"
+kadmin.local -q "addprinc -randkey zookeeper/zookeeper.sasl_default@BAELDUNG.COM"
+kadmin.local -q "addprinc -randkey client@BAELDUNG.COM"
+
+# ktadd appends to an existing keytab, so remove files left over from a previous run;
+# otherwise stale keys would sit next to the freshly generated ones.
+rm -f /etc/krb5kdc/keytabs/{kafka,zookeeper,client}.keytab
+
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/kafka.keytab kafka/localhost@BAELDUNG.COM"
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/zookeeper.keytab zookeeper/zookeeper.sasl_default@BAELDUNG.COM"
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/client.keytab client@BAELDUNG.COM"
+
+# krb5kdc detaches into the background; kadmind -nofork stays in the foreground as the main process.
+krb5kdc
+kadmind -nofork
\ No newline at end of file