这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spring-kafka-4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.baeldung.kafka.monitoring;

/**
 * Immutable payload describing a comment that was added to an article.
 *
 * @param articleSlug   slug of the article the comment belongs to
 * @param articleAuthor author of the article
 * @param comment       text of the comment
 * @param commentAuthor author of the comment
 */
public record ArticleCommentAddedEvent(
        String articleSlug,
        String articleAuthor,
        String comment,
        String commentAuthor) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.baeldung.kafka.monitoring;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer for the {@code baeldung.article-comment.added} topic.
 *
 * <p>Uses the {@code customKafkaListenerContainerFactory} container factory bean.
 */
@Component
public class ArticleCommentsListener {

    // The logger category must be this class; it previously pointed at the REST controller,
    // which attributed consumer log lines to the wrong component.
    private static final Logger log = LoggerFactory.getLogger(ArticleCommentsListener.class);

    /**
     * Handles a single "comment added" event by logging it.
     *
     * @param event       the deserialized event payload
     * @param traceParent value of the {@code traceparent} record header; not used by the body.
     *                    NOTE(review): {@code @Header} is required by default, so a record
     *                    without this header would presumably fail before reaching this
     *                    method - confirm that is intended.
     */
    @KafkaListener(
        topics = "baeldung.article-comment.added",
        containerFactory = "customKafkaListenerContainerFactory"
    )
    public void onArticleComment(ArticleCommentAddedEvent event, @Header("traceparent") String traceParent) {
        // Parameterized logging avoids building the message when INFO is disabled.
        log.info("Kafka Message Received: Comment Added: {}", event);
        // some logic here...
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.baeldung.kafka.monitoring;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint that accepts article comments and publishes an
 * {@link ArticleCommentAddedEvent} to Kafka for each one.
 */
@RestController
@RequestMapping("/api")
public class ArticleCommentsRestController {

    private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);

    /** Topic the comment-added events are published to. */
    private static final String COMMENT_ADDED_TOPIC = "baeldung.article-comment.added";

    private final KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate;

    /**
     * @param articleCommentsKafkaTemplate template used to publish comment-added events
     */
    public ArticleCommentsRestController(
        @Qualifier("articleCommentsKafkaTemplate") KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate) {
        this.articleCommentsKafkaTemplate = articleCommentsKafkaTemplate;
    }

    /**
     * Accepts a new comment for an article and publishes the corresponding event,
     * keyed by the article slug.
     *
     * <p>The send is asynchronous: the response is returned without waiting for the
     * broker acknowledgement, so a publish failure is only logged.
     *
     * @param articleSlug slug of the article, taken from the request path
     * @param dto         comment data from the request body
     * @return a response carrying the status {@code "Success"} and the article slug
     */
    @PostMapping("/articles/{articleSlug}/comments")
    Response addArticleComment(
        @PathVariable("articleSlug") String articleSlug,
        @RequestBody ArticleCommentAddedDto dto
    ) {

        log.info("HTTP Request received to save article comment: {}", dto);
        // some logic here (eg: save to DB)

        var event = new ArticleCommentAddedEvent(articleSlug, dto.articleAuthor(), dto.comment(), dto.commentAuthor());
        // Do not drop the returned future: without a callback, asynchronous send failures
        // would go completely unnoticed.
        articleCommentsKafkaTemplate.send(COMMENT_ADDED_TOPIC, articleSlug, event)
            .whenComplete((result, failure) -> {
                if (failure != null) {
                    log.error("Failed to publish comment-added event for article '{}'", articleSlug, failure);
                }
            });

        return new Response("Success", articleSlug);
    }

    /** Response body returned by {@link #addArticleComment}. */
    record Response(String status, String articleSlug) {

    }

    /** Request body accepted by {@link #addArticleComment}. */
    record ArticleCommentAddedDto(String articleAuthor, String comment, String commentAuthor) {

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.baeldung.kafka.monitoring;

import static java.util.Collections.singletonList;

import java.util.Map;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerConsumerListener;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;

/**
 * Kafka producer/consumer wiring with Micrometer listeners, observation and custom tags.
 */
@Configuration
class KafkaConfig {

    private static final String APP_NAME_TAG = "app-name";
    private static final String APP_NAME = "article-comments-app";
    private static final String ARTICLE_SLUG_TAG = "article-slug";
    /** Tag value used when a record carries no key. */
    private static final String UNKNOWN_ARTICLE_SLUG = "unknown";

    /**
     * Consumer factory built from the Spring Boot Kafka properties, with a Micrometer
     * listener tagged with the application name.
     */
    @Bean
    ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
        cf.addListener(new MicrometerConsumerListener<>(meterRegistry, singletonList(new ImmutableTag(APP_NAME_TAG, APP_NAME))));
        return cf;
    }

    /**
     * Producer factory built from the Spring Boot Kafka properties, with a Micrometer
     * listener tagged with the application name.
     */
    @Bean
    ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
        ProducerFactory<String, ArticleCommentAddedEvent> pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
        pf.addListener(new MicrometerProducerListener<>(meterRegistry, singletonList(new ImmutableTag(APP_NAME_TAG, APP_NAME))));
        return pf;
    }

    /**
     * Template for publishing {@link ArticleCommentAddedEvent}s with observation enabled,
     * a static {@code topic} tag and a per-record {@code article-slug} tag.
     */
    @Bean
    @Qualifier("articleCommentsKafkaTemplate")
    KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(ProducerFactory<String, ArticleCommentAddedEvent> producerFactory) {
        var template = new KafkaTemplate<>(producerFactory);

        template.setObservationEnabled(true);
        template.setMicrometerTags(Map.of("topic", "baeldung.article-comment.added"));
        template.setMicrometerTagsProvider(record -> articleSlugTag(record.key()));

        return template;
    }

    /**
     * Listener container factory with observation enabled, a static {@code app-name} tag
     * and a per-record {@code article-slug} tag.
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        ContainerProperties containerProps = factory.getContainerProperties();
        containerProps.setObservationEnabled(true);
        containerProps.setMicrometerTags(Map.of(APP_NAME_TAG, APP_NAME));
        containerProps.setMicrometerTagsProvider(record -> articleSlugTag(record.key()));

        return factory;
    }

    /**
     * Builds the per-record {@code article-slug} tag from a record key.
     *
     * <p>Kafka records may have a {@code null} key; calling {@code toString()} on it would
     * throw, and {@code Map.of} rejects {@code null} values, so a fallback value is used.
     *
     * @param recordKey the record key, possibly {@code null}
     * @return a single-entry map holding the {@code article-slug} tag
     */
    private static Map<String, String> articleSlugTag(Object recordKey) {
        String slug = recordKey == null ? UNKNOWN_ARTICLE_SLUG : recordKey.toString();
        return Map.of(ARTICLE_SLUG_TAG, slug);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.baeldung.kafka.monitoring;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;

/**
 * Entry point of the Kafka monitoring application; starts it with the
 * {@code monitoring} profile.
 */
@SpringBootApplication
class KafkaMonitoringApplication {

    /**
     * Configures the application builder and runs it.
     *
     * @param args command-line arguments passed through to the application
     */
    public static void main(String[] args) {
        SpringApplicationBuilder appBuilder = new SpringApplicationBuilder();
        appBuilder.profiles("monitoring");
        appBuilder.sources(KafkaMonitoringApplication.class);
        appBuilder.run(args);
    }

}
22 changes: 22 additions & 0 deletions spring-kafka-4/src/main/resources/application-monitoring.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

# Settings for the "monitoring" profile.
# Actuator: every web endpoint is exposed and health details are always shown.
# NOTE(review): convenient for a local demo; presumably should be narrowed elsewhere - confirm.
management:
endpoints.web.exposure.include: '*'
endpoint.health.show-details: always

spring:
application:
name: kafka-monitoring
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: baeldung-app-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Values are read as JSON; the properties below set the default target type and
# restrict trusted packages to com.baeldung.kafka.monitoring.
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.value.default.type: com.baeldung.kafka.monitoring.ArticleCommentAddedEvent
spring.json.trusted.packages: com.baeldung.kafka.monitoring

# Producer side: String keys, JSON-serialized values.
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
3 changes: 0 additions & 3 deletions spring-kafka-4/src/main/resources/application.properties

This file was deleted.

3 changes: 3 additions & 0 deletions spring-kafka-4/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
server.port: 8081
spring.kafka.bootstrap-servers: localhost:9092
long.message.topic.name: longMessage
2 changes: 1 addition & 1 deletion spring-kafka-4/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
<pattern>%d{HH:mm:ss.SSS} [%thread] %X{traceId:-}-%X{spanId:-} %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
Expand Down
3 changes: 3 additions & 0 deletions spring-kafka-4/src/test/resources/post_article_comment.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
curl --location "http://localhost:8081/api/articles/oop-best-practices/comments" ^
--header "Content-Type: application/json" ^
--data "{\"articleAuthor\": \"Andrey the Author\", \"comment\": \"Great article!\", \"commentAuthor\": \"Richard the Reader\"}"