diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index 94fbaff36b95..883cda9542a4 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -28,6 +28,16 @@
spring-kafka
+
+ org.apache.avro
+ avro
+ ${apache.avro.version}
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${kafka-avro-serializer.version}
+
org.springframework.boot
@@ -58,6 +68,48 @@
+
+ org.apache.avro
+ avro-maven-plugin
+ ${apache.avro.version}
+
+ String
+
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/resources/avro
+
+ *.avsc
+
+ ${project.build.directory}/generated-sources/avro
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ ${build-helper-maven-plugin.version}
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ ${project.build.directory}/generated-sources/avro
+
+
+
+
+
org.springframework.boot
spring-boot-maven-plugin
@@ -71,6 +123,16 @@
21
3.4.4
+ 1.12.0
+ 7.9.1
+ 3.2.0
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+
+
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java
new file mode 100644
index 000000000000..78795cb5fd4c
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java
@@ -0,0 +1,37 @@
+package com.baeldung.avro.deserialization.exception;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import com.baeldung.avro.deserialization.exception.avro.Article;
+
+@SpringBootApplication
+class AvroMagicByteApp {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroMagicByteApp.class);
+
+ private final List blog = new ArrayList<>();
+
+ public static void main(String[] args) {
+ new SpringApplicationBuilder().sources(AvroMagicByteApp.class)
+ .profiles("avro-magic-byte")
+ .run(args);
+ }
+
+ @KafkaListener(topics = "baeldung.article.published")
+ public void listen(Article article) {
+ LOG.info("a new article was published: {}", article);
+ blog.add(article.getTitle());
+ }
+
+ public List getBlog() {
+ return blog;
+ }
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java
new file mode 100644
index 000000000000..d97f86228da6
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java
@@ -0,0 +1,32 @@
+package com.baeldung.avro.deserialization.exception;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+
+@Configuration
+class DlqConfig {
+
+ @Bean
+ DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
+ return new DefaultErrorHandler(dlqPublishingRecoverer);
+ }
+
+ @Bean
+ DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate bytesKafkaTemplate) {
+ return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
+ }
+
+ @Bean("bytesKafkaTemplate")
+ KafkaTemplate, ?> bytesTemplate(ProducerFactory, ?> kafkaProducerFactory) {
+ return new KafkaTemplate<>(kafkaProducerFactory, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
+ }
+
+}
diff --git a/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml b/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml
new file mode 100644
index 000000000000..ce5d473ef517
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml
@@ -0,0 +1,13 @@
+
+spring:
+ kafka:
+# bootstrap-servers <-- it'll be injected in test via Testcontainers and @ServiceConnection
+ consumer:
+ group-id: test-group
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
+ schema.registry.url: mock://test
+ specific.avro.reader: true
diff --git a/spring-kafka-4/src/main/resources/avro/Article.avsc b/spring-kafka-4/src/main/resources/avro/Article.avsc
new file mode 100644
index 000000000000..3767f3f4648d
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/avro/Article.avsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "Article",
+ "namespace": "com.baeldung.avro.deserialization.exception.avro",
+ "fields": [
+ { "name": "title", "type": "string" },
+ { "name": "author", "type": "string" },
+ { "name": "tags", "type": { "type": "array", "items": "string" } }
+ ]
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java
new file mode 100644
index 000000000000..bd1d9bff14e0
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java
@@ -0,0 +1,83 @@
+package com.baeldung.avro.deserialization.exception;
+
+import static java.time.Duration.ofSeconds;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.ActiveProfiles;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.baeldung.avro.deserialization.exception.avro.Article;
+
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
+@SpringBootTest
+@ActiveProfiles("avro-magic-byte")
+class AvroMagicByteLiveTest {
+
+ @Container
+ @ServiceConnection
+ static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));
+
+ @Autowired
+ private AvroMagicByteApp listener;
+
+ @Test
+ void whenSendingCorrectArticle_thenItsAddedToTheBlog() throws Exception {
+ avroKafkaTemplate().send("baeldung.article.published", aTestArticle("Avro Magic Byte"))
+ .get();
+
+ await().untilAsserted(() -> assertThat(listener.getBlog()).containsExactly("Avro Magic Byte"));
+ }
+
+ @Test
+ void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
+ stringKafkaTemplate().send("baeldung.article.published", "not a valid avro message!")
+ .get();
+
+ var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));
+
+ assertThat(dlqRecord.value()).isEqualTo("not a valid avro message!");
+ }
+
+ private static KafkaTemplate