diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml index 94fbaff36b95..883cda9542a4 100644 --- a/spring-kafka-4/pom.xml +++ b/spring-kafka-4/pom.xml @@ -28,6 +28,16 @@ spring-kafka + + org.apache.avro + avro + ${apache.avro.version} + + + io.confluent + kafka-avro-serializer + ${kafka-avro-serializer.version} + org.springframework.boot @@ -58,6 +68,48 @@ + + org.apache.avro + avro-maven-plugin + ${apache.avro.version} + + String + + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + + *.avsc + + ${project.build.directory}/generated-sources/avro + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/avro + + + + + org.springframework.boot spring-boot-maven-plugin @@ -71,6 +123,16 @@ 21 3.4.4 + 1.12.0 + 7.9.1 + 3.2.0 + + + confluent + https://packages.confluent.io/maven/ + + + \ No newline at end of file diff --git a/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java new file mode 100644 index 000000000000..78795cb5fd4c --- /dev/null +++ b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/AvroMagicByteApp.java @@ -0,0 +1,37 @@ +package com.baeldung.avro.deserialization.exception; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; + +import com.baeldung.avro.deserialization.exception.avro.Article; + +@SpringBootApplication +class AvroMagicByteApp { + + private static final Logger LOG = LoggerFactory.getLogger(AvroMagicByteApp.class); + + private final List blog = new ArrayList<>(); + + public static void main(String[] args) { + new SpringApplicationBuilder().sources(AvroMagicByteApp.class) + .profiles("avro-magic-byte") + .run(args); + } + + @KafkaListener(topics = "baeldung.article.published") + public void listen(Article article) { + LOG.info("a new article was published: {}", article); + blog.add(article.getTitle()); + } + + public List getBlog() { + return blog; + } +} \ No newline at end of file diff --git a/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java new file mode 100644 index 000000000000..d97f86228da6 --- /dev/null +++ b/spring-kafka-4/src/main/java/com/baeldung/avro/deserialization/exception/DlqConfig.java @@ -0,0 +1,32 @@ +package com.baeldung.avro.deserialization.exception; + +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; + +@Configuration +class DlqConfig { + + @Bean + DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) { + return new DefaultErrorHandler(dlqPublishingRecoverer); + } + + @Bean + DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate bytesKafkaTemplate) { + return new DeadLetterPublishingRecoverer(bytesKafkaTemplate); + } + + @Bean("bytesKafkaTemplate") + KafkaTemplate bytesTemplate(ProducerFactory kafkaProducerFactory) { + return new KafkaTemplate<>(kafkaProducerFactory, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName())); + } + +} diff --git a/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml b/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml new file mode 100644 index 000000000000..ce5d473ef517 --- /dev/null +++ b/spring-kafka-4/src/main/resources/application-avro-magic-byte.yml @@ -0,0 +1,13 @@ + +spring: + kafka: +# bootstrap-servers <-- it'll be injected in test via Testcontainers and @ServiceConnection + consumer: + group-id: test-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + properties: + spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer + schema.registry.url: mock://test + specific.avro.reader: true diff --git a/spring-kafka-4/src/main/resources/avro/Article.avsc b/spring-kafka-4/src/main/resources/avro/Article.avsc new file mode 100644 index 000000000000..3767f3f4648d --- /dev/null +++ b/spring-kafka-4/src/main/resources/avro/Article.avsc @@ -0,0 +1,10 @@ +{ + "type": "record", + "name": "Article", + "namespace": "com.baeldung.avro.deserialization.exception.avro", + "fields": [ + { "name": "title", "type": "string" }, + { "name": "author", "type": "string" }, + { "name": "tags", "type": { "type": "array", "items": "string" } } + ] +} \ No newline at end of file diff --git a/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java new file mode 100644 index 000000000000..bd1d9bff14e0 --- /dev/null +++ b/spring-kafka-4/src/test/java/com/baeldung/avro/deserialization/exception/AvroMagicByteLiveTest.java @@ -0,0 +1,83 @@ +package com.baeldung.avro.deserialization.exception; + +import static java.time.Duration.ofSeconds; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.context.ActiveProfiles; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import com.baeldung.avro.deserialization.exception.avro.Article; + +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +@SpringBootTest +@ActiveProfiles("avro-magic-byte") +class AvroMagicByteLiveTest { + + @Container + @ServiceConnection + static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0")); + + @Autowired + private AvroMagicByteApp listener; + + @Test + void whenSendingCorrectArticle_thenItsAddedToTheBlog() throws Exception { + avroKafkaTemplate().send("baeldung.article.published", aTestArticle("Avro Magic Byte")) + .get(); + + await().untilAsserted(() -> assertThat(listener.getBlog()).containsExactly("Avro Magic Byte")); + } + + @Test + void whenSendingMalformedMessage_thenSendToDLQ() throws Exception { + stringKafkaTemplate().send("baeldung.article.published", "not a valid avro message!") + .get(); + + var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L)); + + assertThat(dlqRecord.value()).isEqualTo("not a valid avro message!"); + } + + private static KafkaTemplate avroKafkaTemplate() { + return new KafkaTemplate<>(kafkaProducerFactory()); + } + + private static KafkaTemplate stringKafkaTemplate() { + return new KafkaTemplate<>(kafkaProducerFactory(), Map.of(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + } + + private static DefaultKafkaProducerFactory kafkaProducerFactory() { + return new DefaultKafkaProducerFactory<>( + Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(), "schema.registry.url", "mock://test")); + } + + private static ConsumerRecord listenForOneMessage(String topic, Duration timeout) { + return KafkaTestUtils.getOneRecord(kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout); + } + + private static Article aTestArticle(String title) { + return new Article(title, "John Doe", List.of("avro", "kafka", "spring")); + } + +}