diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index 94fbaff36b95..273079d421ba 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -27,6 +27,10 @@
org.springframework.kafka
spring-kafka
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java
new file mode 100644
index 000000000000..831ebb3b763a
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentAddedEvent.java
@@ -0,0 +1,5 @@
+package com.baeldung.kafka.monitoring;
+
+public record ArticleCommentAddedEvent(String articleSlug, String articleAuthor, String comment, String commentAuthor) {
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java
new file mode 100644
index 000000000000..927a9cfc978e
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsListener.java
@@ -0,0 +1,23 @@
+package com.baeldung.kafka.monitoring;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ArticleCommentsListener {
+
+ private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);
+
+ @KafkaListener(
+ topics = "baeldung.article-comment.added",
+ containerFactory = "customKafkaListenerContainerFactory"
+ )
+ public void onArticleComment(ArticleCommentAddedEvent event, @Header("traceparent") String traceParent) {
+ log.info("Kafka Message Received: Comment Added: " + event);
+ // some logic here...
+ }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java
new file mode 100644
index 000000000000..131529ccb9e4
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/ArticleCommentsRestController.java
@@ -0,0 +1,48 @@
+package com.baeldung.kafka.monitoring;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+public class ArticleCommentsRestController {
+
+ private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);
+
+ private final KafkaTemplate articleCommentsKafkaTemplate;
+
+ public ArticleCommentsRestController(
+ @Qualifier("articleCommentsKafkaTemplate") KafkaTemplate articleCommentsKafkaTemplate) {
+ this.articleCommentsKafkaTemplate = articleCommentsKafkaTemplate;
+ }
+
+ @PostMapping("/articles/{articleSlug}/comments")
+ Response addArticleComment(
+ @PathVariable("articleSlug") String articleSlug,
+ @RequestBody ArticleCommentAddedDto dto
+ ) {
+
+ log.info("HTTP Request received to save article comment: " + dto);
+ // some logic here (eg: save to DB)
+
+ var event = new ArticleCommentAddedEvent(articleSlug, dto.articleAuthor(), dto.comment(), dto.commentAuthor());
+ articleCommentsKafkaTemplate.send("baeldung.article-comment.added", articleSlug, event);
+
+ return new Response("Success", articleSlug);
+ }
+
+ record Response(String status, String articleSlug) {
+
+ }
+
+ record ArticleCommentAddedDto(String articleAuthor, String comment, String commentAuthor) {
+
+ }
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java
new file mode 100644
index 000000000000..ee45d8055e45
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaConfig.java
@@ -0,0 +1,69 @@
+package com.baeldung.kafka.monitoring;
+
+import static java.util.Collections.singletonList;
+
+import java.util.Map;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.MicrometerConsumerListener;
+import org.springframework.kafka.core.MicrometerProducerListener;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import io.micrometer.core.instrument.ImmutableTag;
+import io.micrometer.core.instrument.MeterRegistry;
+
+@Configuration
+class KafkaConfig {
+
+ @Bean
+ ConsumerFactory consumerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
+ DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
+ cf.addListener(new MicrometerConsumerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
+ return cf;
+ }
+
+ @Bean
+ ProducerFactory producerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
+ ProducerFactory pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
+ pf.addListener(new MicrometerProducerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
+ return pf;
+ }
+
+ @Bean
+ @Qualifier("articleCommentsKafkaTemplate")
+ KafkaTemplate articleCommentsKafkaTemplate(ProducerFactory producerFactory) {
+ var template = new KafkaTemplate<>(producerFactory);
+
+ template.setObservationEnabled(true);
+ template.setMicrometerTags(Map.of("topic", "baeldung.article-comment.added"));
+ template.setMicrometerTagsProvider(record -> Map.of("article-slug", record.key()
+ .toString()));
+
+ return template;
+ }
+
+ @Bean
+ ConcurrentKafkaListenerContainerFactory customKafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
+
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory);
+
+ ContainerProperties containerProps = factory.getContainerProperties();
+ containerProps.setObservationEnabled(true);
+ containerProps.setMicrometerTags(Map.of("app-name", "article-comments-app"));
+ containerProps.setMicrometerTagsProvider(record -> Map.of("article-slug", record.key()
+ .toString()));
+
+ return factory;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java
new file mode 100644
index 000000000000..949145608b75
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/monitoring/KafkaMonitoringApplication.java
@@ -0,0 +1,16 @@
+package com.baeldung.kafka.monitoring;
+
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+
+@SpringBootApplication
+class KafkaMonitoringApplication {
+
+ public static void main(String[] args) {
+ new SpringApplicationBuilder()
+ .profiles("monitoring")
+ .sources(KafkaMonitoringApplication.class)
+ .run(args);
+ }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application-monitoring.yml b/spring-kafka-4/src/main/resources/application-monitoring.yml
new file mode 100644
index 000000000000..d2fa817ca710
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application-monitoring.yml
@@ -0,0 +1,22 @@
+
+management:
+ endpoints.web.exposure.include: '*'
+ endpoint.health.show-details: always
+
+spring:
+ application:
+ name: kafka-monitoring
+ kafka:
+ bootstrap-servers: localhost:9092
+ consumer:
+ group-id: baeldung-app-1
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ properties:
+ spring.json.value.default.type: com.baeldung.kafka.monitoring.ArticleCommentAddedEvent
+ spring.json.trusted.packages: com.baeldung.kafka.monitoring
+
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
diff --git a/spring-kafka-4/src/main/resources/application.properties b/spring-kafka-4/src/main/resources/application.properties
deleted file mode 100644
index 8758a8d44e77..000000000000
--- a/spring-kafka-4/src/main/resources/application.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-server.port=8081
-spring.kafka.bootstrap-servers=localhost:9092
-long.message.topic.name=longMessage
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/application.yml b/spring-kafka-4/src/main/resources/application.yml
new file mode 100644
index 000000000000..25b5eda0e58d
--- /dev/null
+++ b/spring-kafka-4/src/main/resources/application.yml
@@ -0,0 +1,3 @@
+server.port: 8081
+spring.kafka.bootstrap-servers: localhost:9092
+long.message.topic.name: longMessage
\ No newline at end of file
diff --git a/spring-kafka-4/src/main/resources/logback.xml b/spring-kafka-4/src/main/resources/logback.xml
index 7d900d8ea884..ec465592c318 100644
--- a/spring-kafka-4/src/main/resources/logback.xml
+++ b/spring-kafka-4/src/main/resources/logback.xml
@@ -2,7 +2,7 @@
- %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+ %d{HH:mm:ss.SSS} [%thread] %X{traceId:-}-%X{spanId:-} %-5level %logger{36} - %msg%n
diff --git a/spring-kafka-4/src/test/resources/post_article_comment.cmd b/spring-kafka-4/src/test/resources/post_article_comment.cmd
new file mode 100644
index 000000000000..9df1df232c0a
--- /dev/null
+++ b/spring-kafka-4/src/test/resources/post_article_comment.cmd
@@ -0,0 +1,3 @@
+curl --location "http://localhost:8081/api/articles/oop-best-practices/comments" ^
+--header "Content-Type: application/json" ^
+--data "{\"articleAuthor\": \"Andrey the Author\", \"comment\": \"Great article!\", \"commentAuthor\": \"Richard the Reader\"}"