diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index dd1acfa30a50..3a3201af23c6 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -31,7 +31,6 @@
org.springframework.boot
spring-boot-starter-actuator
-
org.apache.avro
avro
diff --git a/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java
new file mode 100644
index 000000000000..f7a684d07336
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaConsumer.java
@@ -0,0 +1,24 @@
+package com.baeldung.sasl;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class KafkaConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
+ public static final String TOPIC = "test-topic";
+ public final List messages = new ArrayList<>();
+
+ @KafkaListener(topics = TOPIC)
+ public void receive(ConsumerRecord consumerRecord) {
+ LOGGER.info("Received payload: '{}'", consumerRecord.toString());
+ messages.add(consumerRecord.value());
+ }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java
new file mode 100644
index 000000000000..57e2ac605bb7
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/sasl/KafkaSaslApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.sasl;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaSaslApplication {
+
+ public static void main(String[] args) {
+ System.setProperty("spring.config.name", "application-sasl");
+ SpringApplication.run(KafkaSaslApplication.class, args);
+ }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java
new file mode 100644
index 000000000000..873b593d4d1e
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaConsumer.java
@@ -0,0 +1,24 @@
+package com.baeldung.saslplaintext;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class KafkaConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
+ public static final String TOPIC = "test-topic";
+ public final List messages = new ArrayList<>();
+
+ @KafkaListener(topics = TOPIC)
+ public void receive(ConsumerRecord consumerRecord) {
+ LOGGER.info("Received payload: '{}'", consumerRecord.toString());
+ messages.add(consumerRecord.value());
+ }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java
new file mode 100644
index 000000000000..9ca6e2d0f8df
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaProducer.java
@@ -0,0 +1,29 @@
+package com.baeldung.saslplaintext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+public class KafkaProducer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
+ private final KafkaTemplate kafkaTemplate;
+
+ public KafkaProducer(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ public void sendMessage(String message, String topic) {
+ LOGGER.info("Producing message: {}", message);
+ kafkaTemplate.send(topic, "key", message)
+ .whenComplete((result, ex) -> {
+ if (ex == null) {
+ LOGGER.info("Message sent to topic: {}", message);
+ } else {
+ LOGGER.error("Failed to send message", ex);
+ }
+ });
+ }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java
new file mode 100644
index 000000000000..4eef4c4dc2bb
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.saslplaintext;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaSaslPlaintextApplication {
+
+ public static void main(String[] args) {
+ System.setProperty("spring.config.name", "application-sasl-plaintext");
+ SpringApplication.run(KafkaSaslPlaintextApplication.class, args);
+ }
+}
diff --git a/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml b/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml
new file mode 100644
index 000000000000..6f0f5fe99cac
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-sasl-plaintext.yml
@@ -0,0 +1,16 @@
+spring:
+ kafka:
+ bootstrap-servers: localhost:9092
+ properties:
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: >
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="user1"
+ password="user1-secret";
+ security:
+ protocol: SASL_PLAINTEXT
+ consumer:
+ group-id: test-group
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application-sasl.yml b/spring-kafka-4/src/main/resources/application-sasl.yml
new file mode 100644
index 000000000000..58d970fa6857
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-sasl.yml
@@ -0,0 +1,19 @@
+spring:
+ kafka:
+ bootstrap-servers: localhost:9092
+ properties:
+ sasl.mechanism: GSSAPI
+ sasl.jaas.config: >
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="./src/test/resources/sasl/keytabs/client.keytab"
+ principal="client@BAELDUNG.COM"
+ serviceName="kafka";
+ security:
+ protocol: "SASL_PLAINTEXT"
+ consumer:
+ group-id: test
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java b/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java
new file mode 100644
index 000000000000..1739ce983b69
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/sasl/SpringContextTest.java
@@ -0,0 +1,15 @@
+package com.baeldung.sasl;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(classes = KafkaSaslApplication.class)
+class SpringContextTest {
+
+ @Test
+ void whenSpringContextIsBootstrapped_thenNoExceptions() {
+ }
+}
diff --git a/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java
new file mode 100644
index 000000000000..0a105da4828a
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/saslplaintext/KafkaSaslPlaintextApplicationLiveTest.java
@@ -0,0 +1,48 @@
+package com.baeldung.saslplaintext;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+
+import static com.baeldung.saslplaintext.KafkaConsumer.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@Testcontainers
+@ActiveProfiles("sasl-plaintext")
+@SpringBootTest(classes = KafkaSaslPlaintextApplication.class)
+class KafkaSaslPlaintextApplicationLiveTest {
+
+ private static final File KAFKA_COMPOSE_FILE = new File("src/test/resources/sasl-plaintext/docker-compose.yml");
+ private static final String KAFKA_SERVICE = "kafka";
+ private static final int SASL_PORT = 9092;
+
+ @Container
+ public DockerComposeContainer> container =
+ new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
+ .withExposedService(KAFKA_SERVICE, SASL_PORT, Wait.forListeningPort());
+
+ @Autowired
+ private KafkaProducer kafkaProducer;
+
+ @Autowired
+ private KafkaConsumer kafkaConsumer;
+
+ @Test
+ void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
+ String message = UUID.randomUUID().toString();
+ kafkaProducer.sendMessage(message, TOPIC);
+
+ await().atMost(Duration.ofMinutes(2))
+ .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
+ }
+}
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf b/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf
new file mode 100644
index 000000000000..7fc27888f851
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl-plaintext/config/kafka_server_jaas.conf
@@ -0,0 +1,13 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret"
+ user_user1="user1-secret";
+};
+
+Client {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="zookeeper"
+ password="zookeeper-secret";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf b/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf
new file mode 100644
index 000000000000..566365981ca4
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl-plaintext/config/zookeeper_jaas.conf
@@ -0,0 +1,6 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="zookeeper"
+ password="zookeeper-secret"
+ user_zookeeper="zookeeper-secret";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml b/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml
new file mode 100644
index 000000000000..de5a380aed90
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl-plaintext/docker-compose.yml
@@ -0,0 +1,29 @@
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.6.6
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
+ volumes:
+ - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
+ ports:
+ - 2181
+
+ kafka:
+ image: confluentinc/cp-kafka:7.6.6
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
+ KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+ KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ volumes:
+ - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+ ports:
+ - "9092:9092"
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/Dockerfile b/spring-kafka-4/src/test/resources/sasl/Dockerfile
new file mode 100644
index 000000000000..b2955abbffb3
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/Dockerfile
@@ -0,0 +1,15 @@
+# Use a minimal base image
+FROM debian:bullseye
+
+RUN apt-get update && \
+ apt-get install -y krb5-kdc krb5-admin-server krb5-user && \
+ rm -rf /var/lib/apt/lists/*
+
+COPY config/krb5.conf /etc/krb5.conf
+COPY setup_kdc.sh /setup_kdc.sh
+
+RUN chmod +x /setup_kdc.sh
+
+EXPOSE 88 749
+
+CMD ["/setup_kdc.sh"]
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl b/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl
new file mode 100644
index 000000000000..db000b280c48
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/kadm5.acl
@@ -0,0 +1 @@
+*/admin@BAELDUNG.COM *
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf b/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf
new file mode 100644
index 000000000000..222de53e0013
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/kafka_server_jaas.conf
@@ -0,0 +1,17 @@
+KafkaServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/etc/kafka/keytabs/kafka.keytab"
+ principal="kafka/localhost@BAELDUNG.COM"
+ serviceName="kafka";
+};
+
+Client {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/etc/kafka/keytabs/client.keytab"
+ principal="client@BAELDUNG.COM"
+ serviceName="kafka";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/config/krb5.conf b/spring-kafka-4/src/test/resources/sasl/config/krb5.conf
new file mode 100644
index 000000000000..f37a9ac5080c
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/krb5.conf
@@ -0,0 +1,17 @@
+[libdefaults]
+ default_realm = BAELDUNG.COM
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ forwardable = true
+ rdns = true
+
+[realms]
+ BAELDUNG.COM = {
+ kdc = kdc
+ admin_server = kdc
+ }
+
+[logging]
+ default = FILE:/var/log/krb5libs.log
+ kdc = FILE:/var/log/krb5kdc.log
+ admin_server = FILE:/var/log/kadmind.log
diff --git a/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf b/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf
new file mode 100644
index 000000000000..5d5028e5cc94
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/config/zookeeper_jaas.conf
@@ -0,0 +1,7 @@
+Server {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/etc/kafka/keytabs/zookeeper.keytab"
+ principal="zookeeper/zookeeper.sasl_default@BAELDUNG.COM";
+};
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/resources/sasl/docker-compose.yml b/spring-kafka-4/src/test/resources/sasl/docker-compose.yml
new file mode 100644
index 000000000000..2f084537c427
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/docker-compose.yml
@@ -0,0 +1,50 @@
+services:
+
+ kdc:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ volumes:
+ - ./config:/etc/krb5kdc
+ - ./keytabs:/etc/krb5kdc/keytabs
+ - ./config/krb5.conf:/etc/krb5.conf
+ ports:
+ - "88:88/udp"
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ container_name: zookeeper
+ hostname: localhost
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
+ volumes:
+ - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
+ - ./keytabs:/etc/kafka/keytabs
+ - ./config/krb5.conf:/etc/krb5.conf
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:latest
+ container_name: kafka
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
+ KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
+ KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
+ KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ volumes:
+ - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+ - ./keytabs:/etc/kafka/keytabs
+ - ./config/krb5.conf:/etc/krb5.conf
+ depends_on:
+ - zookeeper
+ - kdc
+ ports:
+ - 9092:9092
diff --git a/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh b/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh
new file mode 100644
index 000000000000..91bd12080cc3
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/sasl/setup_kdc.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+kdb5_util create -s -P masterpassword
+
+kadmin.local -q "addprinc -randkey kafka/localhost@BAELDUNG.COM"
+kadmin.local -q "addprinc -randkey zookeeper/zookeeper.sasl_default@BAELDUNG.COM"
+kadmin.local -q "addprinc -randkey client@BAELDUNG.COM"
+
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/kafka.keytab kafka/localhost@BAELDUNG.COM"
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/zookeeper.keytab zookeeper/zookeeper.sasl_default@BAELDUNG.COM"
+kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/client.keytab client@BAELDUNG.COM"
+
+krb5kdc
+kadmind -nofork
\ No newline at end of file