CustomProcessingExceptionHandler.java (new file, 26 lines)
package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class CustomProcessingExceptionHandler implements ProcessingExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(CustomProcessingExceptionHandler.class);

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception ex) {
        log.error("ProcessingExceptionHandler error for record NodeId: {} | TaskId: {} | Key: {} | Value: {} | Exception: {}",
            errorHandlerContext.processorNodeId(), errorHandlerContext.taskId(), record.key(), record.value(), ex.getMessage(), ex);

        // Skip the failed record and keep the stream task running
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
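
The handler above always resumes processing; the same interface can instead stop the failing task by returning FAIL. A minimal sketch of a stricter variant (hypothetical class, not part of this PR):

package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Map;

public class FailFastProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception ex) {
        // FAIL stops the stream task instead of skipping the failed record
        return ProcessingHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}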
CustomProductionExceptionHandler.java (new file, 26 lines)
package com.baeldung.kafkastreams;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(ErrorHandlerContext context, ProducerRecord<byte[], byte[]> record, Exception exception) {
        log.error("ProductionExceptionHandler error producing record NodeId: {} | TaskId: {} | Topic: {} | Partition: {} | Exception: {}",
            context.processorNodeId(), context.taskId(), record.topic(), record.partition(), exception.getMessage(), exception);

        // Drop the record that could not be produced and carry on
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
StreamExceptionHandler.java (new file, 16 lines)
package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamExceptionHandler implements StreamsUncaughtExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(StreamExceptionHandler.class);

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        log.error("Stream encountered fatal exception: {}", exception.getMessage(), exception);
        // Swap in a fresh stream thread so the application keeps running
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
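
REPLACE_THREAD is one of three possible responses: SHUTDOWN_CLIENT stops this Kafka Streams instance, and SHUTDOWN_APPLICATION stops every instance of the application. A sketch of a handler that picks a response based on the failure (hypothetical class and classification logic, not part of this PR):

package com.baeldung.kafkastreams;

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class SelectiveStreamExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        // Illustrative policy: treat wrapped runtime failures as transient and
        // replace the thread; shut this client down for anything else
        if (exception instanceof StreamsException && exception.getCause() instanceof RuntimeException) {
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
}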
User.java (new file, 4 lines)
package com.baeldung.kafkastreams;

public record User(String id, String name, String country) {
}
UserDeserializer.java (new file, 28 lines)
package com.baeldung.kafkastreams;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class UserDeserializer implements Deserializer<User> {

    private static final Logger log = LoggerFactory.getLogger(UserDeserializer.class);
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public User deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            return mapper.readValue(bytes, User.class);
        } catch (IOException ex) {
            // Render the payload as text; logging the raw byte[] would only print its identity hash
            log.error("Error deserializing the message {} for topic {}: {}", new String(bytes, StandardCharsets.UTF_8), topic, ex.getMessage(), ex);
            throw new RuntimeException(ex);
        }
    }
}

UserSerde.java (new file, 10 lines)
package com.baeldung.kafkastreams;

import org.apache.kafka.common.serialization.Serdes;

// Bundles the custom JSON serializer and deserializer into a single Serde
public class UserSerde extends Serdes.WrapperSerde<User> {

    public UserSerde() {
        super(new UserSerializer(), new UserDeserializer());
    }
}

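A quick way to sanity-check the Serde is a serialize/deserialize round trip. A minimal sketch (hypothetical class name; assumes Jackson 2.12+ for record support, which the serializer already relies on):

package com.baeldung.kafkastreams;

public class UserSerdeRoundTrip {

    public static void main(String[] args) {
        try (UserSerde serde = new UserSerde()) {
            User original = new User("1", "Alice", "DE");

            // Serialize to the JSON bytes that would go onto the topic...
            byte[] bytes = serde.serializer().serialize("user-topic", original);

            // ...and read them back; records compare by value, so this prints true
            User copy = serde.deserializer().deserialize("user-topic", bytes);
            System.out.println(original.equals(copy));
        }
    }
}
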
UserSerializer.java (new file, 27 lines)
package com.baeldung.kafkastreams;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserSerializer implements Serializer<User> {

    private static final Logger log = LoggerFactory.getLogger(UserSerializer.class);
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }

        try {
            return mapper.writeValueAsBytes(user);
        } catch (JsonProcessingException ex) {
            log.error("Error serializing the user {} with exception {}", user, ex.getMessage(), ex);
            throw new RuntimeException(ex);
        }
    }
}

UserStreamService.java (new file, 73 lines)
package com.baeldung.kafkastreams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

public class UserStreamService {

    private static final Logger log = LoggerFactory.getLogger(UserStreamService.class);
    private KafkaStreams kafkaStreams;

    public void start(String bootstrapServer) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, User> userStream = builder.stream(
            "user-topic",
            Consumed.with(Serdes.String(), new UserSerde())
        );

        // Count users per country, ignoring records without a country
        KTable<String, Long> usersPerCountry = userStream
            .filter((key, user) -> Objects.nonNull(user) && user.country() != null && !user.country().isEmpty())
            .groupBy((key, user) -> user.country(), Grouped.with(Serdes.String(), new UserSerde()))
            .count(Materialized.as("users_per_country_store"));

        usersPerCountry.toStream()
            .peek((country, count) -> log.info("Aggregated for country {} with count {}", country, count))
            .to("users_per_country", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = getStreamProperties(bootstrapServer);
        kafkaStreams = new KafkaStreams(builder.build(), props);
        kafkaStreams.setUncaughtExceptionHandler(new StreamExceptionHandler());

        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

        kafkaStreams.start();
    }

    public void stop() {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
    }

    public void cleanUp() {
        if (kafkaStreams != null) {
            kafkaStreams.cleanUp();
        }
    }

    private static Properties getStreamProperties(String bootstrapServer) {
        Properties props = new Properties();
        // Unique application.id per run, so repeated demo runs start from a clean state
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-country-aggregator-" + UUID.randomUUID());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProcessingExceptionHandler.class);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class);

        return props;
    }
}

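To see the aggregation in action without a broker, the topology can be driven with TopologyTestDriver from the kafka-streams-test-utils artifact. A minimal sketch (the topology is repeated so the snippet stands alone; class name and expected output are illustrative):

package com.baeldung.kafkastreams;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Objects;
import java.util.Properties;

public class UserStreamTopologySketch {

    public static void main(String[] args) {
        // Same topology as UserStreamService.start()
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("user-topic", Consumed.with(Serdes.String(), new UserSerde()))
            .filter((key, user) -> Objects.nonNull(user) && user.country() != null && !user.country().isEmpty())
            .groupBy((key, user) -> user.country(), Grouped.with(Serdes.String(), new UserSerde()))
            .count(Materialized.as("users_per_country_store"))
            .toStream()
            .to("users_per_country", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, User> input = driver.createInputTopic(
                "user-topic", new StringSerializer(), new UserSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                "users_per_country", new StringDeserializer(), new LongDeserializer());

            input.pipeInput("u1", new User("1", "Alice", "DE"));
            input.pipeInput("u2", new User("2", "Bob", "DE"));

            System.out.println(output.readKeyValuesToMap()); // expected: {DE=2}
        }
    }
}
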
Existing Kafka Streams test (changed hunks from shouldTestKafkaStreams):

@@ -40,7 +40,7 @@ public void shouldTestKafkaStreams() throws InterruptedException {
.getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Use a temporary directory for storing state, which will be automatically removed after the test.
try {
Path stateDirectory = Files.createTempDirectory("kafka-streams");
@@ -75,4 +75,4 @@ public void shouldTestKafkaStreams() throws InterruptedException {
Thread.sleep(30000);
streams.close();
}
}