Add a Kafka monitoring sample to spring-kafka-4: a REST endpoint that publishes
article-comment events, a listener that consumes them, and Micrometer metrics /
observation wiring for the producer and the consumer.

Review notes:
  * The XML element tags in the pom.xml and logback.xml hunks and the generic type
    arguments in the Java files (assumed <String, ArticleCommentAddedEvent>) are
    reconstructed - confirm against the real change. Context lines that could not
    be recovered are omitted and those two hunk ranges are narrowed accordingly.
  * The "index" blob hashes are carried over unchanged and no longer match the
    edited Java files.

diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index 94fbaff36b95..273079d421ba 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -27,3 +27,7 @@
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java
new file mode 100644
index 000000000000..831ebb3b763a
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java
@@ -0,0 +1,8 @@
+package com.baeldung.kafka.monitoring;
+
+/**
+ * Event payload published to Kafka when a comment is added to an article.
+ */
+public record ArticleCommentAddedEvent(String articleSlug, String articleAuthor, String comment, String commentAuthor) {
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java
new file mode 100644
index 000000000000..927a9cfc978e
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java
@@ -0,0 +1,32 @@
+package com.baeldung.kafka.monitoring;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * Consumes {@link ArticleCommentAddedEvent}s from the {@code baeldung.article-comment.added} topic.
+ */
+@Component
+public class ArticleCommentsListener {
+
+    private static final Logger log = LoggerFactory.getLogger(ArticleCommentsListener.class);
+
+    /**
+     * Logs each received event.
+     *
+     * @param event       the message payload
+     * @param traceParent value of the {@code traceparent} message header (not used by the method body)
+     */
+    @KafkaListener(
+        topics = "baeldung.article-comment.added",
+        containerFactory = "customKafkaListenerContainerFactory"
+    )
+    public void onArticleComment(ArticleCommentAddedEvent event, @Header("traceparent") String traceParent) {
+        log.info("Kafka Message Received: Comment Added: {}", event);
+        // some logic here...
+    }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java
new file mode 100644
index 000000000000..131529ccb9e4
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java
@@ -0,0 +1,64 @@
+package com.baeldung.kafka.monitoring;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST endpoint that publishes an {@link ArticleCommentAddedEvent} to Kafka for every posted comment.
+ */
+@RestController
+@RequestMapping("/api")
+public class ArticleCommentsRestController {
+
+    private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);
+
+    private final KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate;
+
+    public ArticleCommentsRestController(
+        @Qualifier("articleCommentsKafkaTemplate") KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate) {
+        this.articleCommentsKafkaTemplate = articleCommentsKafkaTemplate;
+    }
+
+    /**
+     * Publishes a comment-added event keyed by the article slug.
+     *
+     * @param articleSlug slug of the commented article, taken from the request path
+     * @param dto         comment details from the request body
+     * @return a {@code "Success"} response echoing the article slug
+     */
+    @PostMapping("/articles/{articleSlug}/comments")
+    Response addArticleComment(
+        @PathVariable("articleSlug") String articleSlug,
+        @RequestBody ArticleCommentAddedDto dto
+    ) {
+
+        log.info("HTTP Request received to save article comment: {}", dto);
+        // some logic here (eg: save to DB)
+
+        var event = new ArticleCommentAddedEvent(articleSlug, dto.articleAuthor(), dto.comment(), dto.commentAuthor());
+        // The send is asynchronous; log a failed publish instead of dropping it silently.
+        articleCommentsKafkaTemplate.send("baeldung.article-comment.added", articleSlug, event)
+            .whenComplete((result, ex) -> {
+                if (ex != null) {
+                    log.error("Failed to publish comment event for article {}", articleSlug, ex);
+                }
+            });
+
+        return new Response("Success", articleSlug);
+    }
+
+    record Response(String status, String articleSlug) {
+
+    }
+
+    record ArticleCommentAddedDto(String articleAuthor, String comment, String commentAuthor) {
+
+    }
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java
new file mode 100644
index 000000000000..ee45d8055e45
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java
@@ -0,0 +1,76 @@
+package com.baeldung.kafka.monitoring;
+
+import static java.util.Collections.singletonList;
+
+import java.util.Map;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.MicrometerConsumerListener;
+import org.springframework.kafka.core.MicrometerProducerListener;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import io.micrometer.core.instrument.ImmutableTag;
+import io.micrometer.core.instrument.MeterRegistry;
+
+/**
+ * Kafka producer and consumer beans wired with Micrometer listeners, tags and observation support.
+ */
+@Configuration
+class KafkaConfig {
+
+    /** Consumer factory built from the Boot Kafka properties, with a Micrometer listener tagged with the app name. */
+    @Bean
+    ConsumerFactory<String, ArticleCommentAddedEvent> consumerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
+        DefaultKafkaConsumerFactory<String, ArticleCommentAddedEvent> cf = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
+        cf.addListener(new MicrometerConsumerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
+        return cf;
+    }
+
+    /** Producer factory built from the Boot Kafka properties, with a Micrometer listener tagged with the app name. */
+    @Bean
+    ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
+        ProducerFactory<String, ArticleCommentAddedEvent> pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
+        pf.addListener(new MicrometerProducerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
+        return pf;
+    }
+
+    /** Template with observation enabled plus a static topic tag and a per-record article-slug tag. */
+    @Bean
+    @Qualifier("articleCommentsKafkaTemplate")
+    KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(ProducerFactory<String, ArticleCommentAddedEvent> producerFactory) {
+        var template = new KafkaTemplate<>(producerFactory);
+
+        template.setObservationEnabled(true);
+        template.setMicrometerTags(Map.of("topic", "baeldung.article-comment.added"));
+        // String.valueOf tolerates a record without a key (tag value becomes "null") instead of throwing.
+        template.setMicrometerTagsProvider(record -> Map.of("article-slug", String.valueOf(record.key())));
+
+        return template;
+    }
+
+    /** Listener container factory with observation enabled plus a static app-name tag and a per-record article-slug tag. */
+    @Bean
+    ConcurrentKafkaListenerContainerFactory<String, ArticleCommentAddedEvent> customKafkaListenerContainerFactory(ConsumerFactory<String, ArticleCommentAddedEvent> consumerFactory) {
+
+        ConcurrentKafkaListenerContainerFactory<String, ArticleCommentAddedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory);
+
+        ContainerProperties containerProps = factory.getContainerProperties();
+        containerProps.setObservationEnabled(true);
+        containerProps.setMicrometerTags(Map.of("app-name", "article-comments-app"));
+        // String.valueOf tolerates a record without a key (tag value becomes "null") instead of throwing.
+        containerProps.setMicrometerTagsProvider(record -> Map.of("article-slug", String.valueOf(record.key())));
+
+        return factory;
+    }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java
new file mode 100644
index 000000000000..949145608b75
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java
@@ -0,0 +1,19 @@
+package com.baeldung.kafka.monitoring;
+
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+
+/**
+ * Boot entry point that starts the application with the {@code monitoring} profile.
+ */
+@SpringBootApplication
+class KafkaMonitoringApplication {
+
+    public static void main(String[] args) {
+        new SpringApplicationBuilder()
+            .profiles("monitoring")
+            .sources(KafkaMonitoringApplication.class)
+            .run(args);
+    }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application-monitoring.yml b/spring-kafka-4/src/main/resources/application-monitoring.yml
new file mode 100644
index 000000000000..d2fa817ca710
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-monitoring.yml
@@ -0,0 +1,22 @@
+
+management:
+  endpoints.web.exposure.include: '*'
+  endpoint.health.show-details: always
+
+spring:
+  application:
+    name: kafka-monitoring
+  kafka:
+    bootstrap-servers: localhost:9092
+    consumer:
+      group-id: baeldung-app-1
+      auto-offset-reset: earliest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      properties:
+        spring.json.value.default.type: com.baeldung.kafka.monitoring.ArticleCommentAddedEvent
+        spring.json.trusted.packages: com.baeldung.kafka.monitoring
+
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
diff --git a/spring-kafka-4/src/main/resources/application.properties b/spring-kafka-4/src/main/resources/application.properties
deleted file mode 100644
index 8758a8d44e77..000000000000
--- a/spring-kafka-4/src/main/resources/application.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-server.port=8081
-spring.kafka.bootstrap-servers=localhost:9092
-long.message.topic.name=longMessage
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application.yml b/spring-kafka-4/src/main/resources/application.yml
new file mode 100644
index 000000000000..25b5eda0e58d
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application.yml
@@ -0,0 +1,3 @@
+server.port: 8081
+spring.kafka.bootstrap-servers: localhost:9092
+long.message.topic.name: longMessage
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/logback.xml b/spring-kafka-4/src/main/resources/logback.xml
index 7d900d8ea884..ec465592c318 100644
--- a/spring-kafka-4/src/main/resources/logback.xml
+++ b/spring-kafka-4/src/main/resources/logback.xml
@@ -5 +5 @@
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %X{traceId:-}-%X{spanId:-} %-5level %logger{36} - %msg%n
diff --git a/spring-kafka-4/src/test/resources/post_article_comment.cmd b/spring-kafka-4/src/test/resources/post_article_comment.cmd
new file mode 100644
index 000000000000..9df1df232c0a
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/post_article_comment.cmd
@@ -0,0 +1,3 @@
+curl --location "http://localhost:8081/api/articles/oop-best-practices/comments" ^
+--header "Content-Type: application/json" ^
+--data "{\"articleAuthor\": \"Andrey the Author\", \"comment\": \"Great article!\", \"commentAuthor\": \"Richard the Reader\"}"