diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProcessingExceptionHandler.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProcessingExceptionHandler.java
new file mode 100644
index 000000000000..c86896302120
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProcessingExceptionHandler.java
@@ -0,0 +1,27 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
+import org.apache.kafka.streams.processor.api.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class CustomProcessingExceptionHandler implements ProcessingExceptionHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(CustomProcessingExceptionHandler.class);
+
+    @Override
+    public ProcessingHandlerResponse handle(ErrorHandlerContext errorHandlerContext, Record<?, ?> record, Exception ex) {
+        log.error("ProcessingExceptionHandler Error for record NodeId: {} | TaskId: {} | Key: {} | Value: {} | Exception: {}",
+            errorHandlerContext.processorNodeId(), errorHandlerContext.taskId(), record.key(), record.value(), ex.getMessage(), ex);
+
+        // CONTINUE drops the failed record and keeps processing; FAIL would stop the stream thread
+        return ProcessingHandlerResponse.CONTINUE;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+}
\ No newline at end of file
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProductionExceptionHandler.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProductionExceptionHandler.java
new file mode 100644
index 000000000000..2f48bf8e7a11
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/CustomProductionExceptionHandler.java
@@ -0,0 +1,27 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);
+
+    @Override
+    public ProductionExceptionHandlerResponse handle(ErrorHandlerContext context, ProducerRecord<byte[], byte[]> record, Exception exception) {
+        log.error("ProductionExceptionHandler Error producing record NodeId: {} | TaskId: {} | Topic: {} | Partition: {} | Exception: {}",
+            context.processorNodeId(), context.taskId(), record.topic(), record.partition(), exception.getMessage(), exception);
+
+        // CONTINUE acknowledges the failed send and moves on; FAIL would stop the stream thread
+        return ProductionExceptionHandlerResponse.CONTINUE;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+}
\ No newline at end of file
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/StreamExceptionHandler.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/StreamExceptionHandler.java
new file mode 100644
index 000000000000..3be70538ebe7
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/StreamExceptionHandler.java
@@ -0,0 +1,18 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamExceptionHandler implements StreamsUncaughtExceptionHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamExceptionHandler.class);
+
+    @Override
+    public StreamThreadExceptionResponse handle(Throwable exception) {
+        log.error("Stream encountered fatal exception: {}", exception.getMessage(), exception);
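+        // REPLACE_THREAD restarts the failed stream thread; SHUTDOWN_CLIENT and
+        // SHUTDOWN_APPLICATION are the stricter alternatives for unrecoverable errors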
+        return StreamThreadExceptionResponse.REPLACE_THREAD;
+    }
+}
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/User.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/User.java
new file mode 100644
index 000000000000..023634a12343
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/User.java
@@ -0,0 +1,4 @@
+package com.baeldung.kafkastreams;
+
+public record User(String id, String name, String country) {
+}
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserDeserializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserDeserializer.java
new file mode 100644
index 000000000000..dd615305bc04
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserDeserializer.java
@@ -0,0 +1,30 @@
+package com.baeldung.kafkastreams;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class UserDeserializer implements Deserializer<User> {
+    private static final Logger log = LoggerFactory.getLogger(UserDeserializer.class);
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public User deserialize(String topic, byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        try {
+            return mapper.readValue(bytes, User.class);
+        } catch (IOException ex) {
+            log.error("Error deserializing the message {} for topic {} with error message {}", bytes, topic, ex.getMessage(), ex);
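+            // Rethrowing surfaces the failure to the configured deserialization
+            // exception handler (LogAndContinueExceptionHandler in UserStreamService)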
+            throw new RuntimeException(ex);
+        }
+    }
+}
+
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerde.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerde.java
new file mode 100644
index 000000000000..e4a57e7d781f
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerde.java
@@ -0,0 +1,10 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.common.serialization.Serdes;
+
+public class UserSerde extends Serdes.WrapperSerde<User> {
+    public UserSerde() {
+        super(new UserSerializer(), new UserDeserializer());
+    }
+}
+
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerializer.java
new file mode 100644
index 000000000000..525d7e529a81
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserSerializer.java
@@ -0,0 +1,27 @@
+package com.baeldung.kafkastreams;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UserSerializer implements Serializer<User> {
+    private static final Logger log = LoggerFactory.getLogger(UserSerializer.class);
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public byte[] serialize(String topic, User user) {
+        if (user == null) {
+            return null;
+        }
+
+        try {
+            return mapper.writeValueAsBytes(user);
+        } catch (JsonProcessingException ex) {
+            log.error("Error serializing the user {} with exception {}", user, ex.getMessage(), ex);
+            throw new RuntimeException(ex);
+        }
+    }
+}
+
diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserStreamService.java b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserStreamService.java
new file mode 100644
index 000000000000..7a5c8cd750c1
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafkastreams/UserStreamService.java
@@ -0,0 +1,75 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.kstream.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+public class UserStreamService {
+    private static final Logger log = LoggerFactory.getLogger(UserStreamService.class);
+    private KafkaStreams kafkaStreams;
+
+    public void start(String bootstrapServer) {
+        StreamsBuilder builder = new StreamsBuilder();
+        KStream<String, User> userStream = builder.stream(
+            "user-topic",
+            Consumed.with(Serdes.String(), new UserSerde())
+        );
+
+        KTable<String, Long> usersPerCountry = userStream
+            .filter((key, user) ->
+                Objects.nonNull(user) && user.country() != null && !user.country().isEmpty())
+            .groupBy((key, user) -> user.country(), Grouped.with(Serdes.String(),
+                new UserSerde()))
+            .count(Materialized.as("users_per_country_store"));
+
+        usersPerCountry.toStream()
+            .peek((country, count) -> log.info("Aggregated for country {} with count {}", country, count))
+            .to("users_per_country", Produced.with(Serdes.String(), Serdes.Long()));
+
+        Properties props = getStreamProperties(bootstrapServer);
+        kafkaStreams = new KafkaStreams(builder.build(), props);
+        kafkaStreams.setUncaughtExceptionHandler(new StreamExceptionHandler());
+
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
+
+        kafkaStreams.start();
+    }
+
+    public void stop() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+    }
+
+    public void cleanUp() {
+        if (kafkaStreams != null) {
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    private static Properties getStreamProperties(String bootstrapServer) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-country-aggregator" + UUID.randomUUID());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
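+        // One handler per pipeline stage: deserialization (consuming), processing
+        // (topology logic) and production (writing to the output topic)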
+        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+            LogAndContinueExceptionHandler.class);
+        props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+            CustomProcessingExceptionHandler.class);
+        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+            CustomProductionExceptionHandler.class);
+
+        return props;
+    }
+}
+
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
index 09770148e084..56b070bb78e7 100644
--- a/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java
@@ -40,7 +40,7 @@ public void shouldTestKafkaStreams() throws InterruptedException {
             .getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
+        // Use a temporary directory for storing state, which will be automatically removed after the test.
         try {
             Path stateDirectory = Files.createTempDirectory("kafka-streams");
 
@@ -75,4 +75,4 @@ public void shouldTestKafkaStreams() throws InterruptedException {
 
         Thread.sleep(30000);
         streams.close();
-}
+}
\ No newline at end of file
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/UserStreamLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/UserStreamLiveTest.java
new file mode 100644
index 000000000000..e2b324bebe17
--- /dev/null
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafkastreams/UserStreamLiveTest.java
@@ -0,0 +1,180 @@
+package com.baeldung.kafkastreams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.*;
+import org.junit.jupiter.api.*;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+// This live test requires a running Docker daemon so that a Kafka container can be created
+
+@Testcontainers
+class UserStreamLiveTest {
+
+    @Container
+    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0"));
+    private static KafkaProducer<String, User> producer;
+    private static KafkaConsumer<String, Long> consumer;
+
+    private static UserStreamService streamService;
+
+    @BeforeAll
+    static void setup() {
+        KAFKA_CONTAINER.start();
+        streamService = new UserStreamService();
+        producer = new KafkaProducer<>(getProducerConfig());
+        consumer = new KafkaConsumer<>(getConsumerConfig());
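+        // Start the topology from a separate thread so setup returns immediately; the
+        // Awaitility timeouts in the tests absorb topology startup and rebalance delays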
+        new Thread(() -> streamService.start(KAFKA_CONTAINER.getBootstrapServers())).start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        producer.flush();
+        producer.close();
+        consumer.close();
+        streamService.stop();
+        streamService.cleanUp();
+        KAFKA_CONTAINER.stop();
+    }
+
+    @Test
+    void givenValidUserIsSent_whenStreamServiceStarts_thenAggregatedCountIsSent() {
+        producer.send(new ProducerRecord<>("user-topic", "x1", new User("1", "user1", "US")));
+        producer.send(new ProducerRecord<>("user-topic", "x2", new User("2", "user2", "DE")));
+        consumer.subscribe(List.of("users_per_country"));
+
+        Awaitility.await()
+            .atMost(45, TimeUnit.SECONDS)
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
+                Map<String, Long> counts = StreamSupport.stream(records.spliterator(), false)
+                    .collect(Collectors.toMap(ConsumerRecord::key, ConsumerRecord::value));
+
+                assertTrue(counts.containsKey("US"));
+                assertTrue(counts.containsKey("DE"));
+                assertEquals(1L, counts.get("US"));
+                assertEquals(1L, counts.get("DE"));
+            });
+    }
+
+    @Test
+    void givenValidAndNullUserIsSent_whenStreamServiceIsRunning_thenAggregatedCount() {
+        producer.send(new ProducerRecord<>("user-topic", "x3", new User("3", "user3", "IE")));
+        producer.send(new ProducerRecord<>("user-topic", "x4", null));
+
+        consumer.subscribe(List.of("users_per_country"));
+
+        Awaitility.await()
+            .atMost(30, TimeUnit.SECONDS)
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
+                Map<String, Long> counts = StreamSupport.stream(records.spliterator(), false)
+                    .collect(Collectors.toMap(ConsumerRecord::key, ConsumerRecord::value));
+
+                assertTrue(counts.containsKey("IE"));
+                assertEquals(1L, counts.get("IE"));
+            });
+    }
+
+    @Test
+    void givenInvalidUserIsSent_whenStreamServiceIsRunning_thenAggregatedCountIsEmpty() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+
+        byte[] invalidJson = "{ invalid json".getBytes(StandardCharsets.UTF_8);
+        producer.send(new ProducerRecord<>("user-topic", "x5", invalidJson));
+
+        consumer.subscribe(List.of("users_per_country"));
+        Awaitility.await()
+            .atMost(30, TimeUnit.SECONDS)
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
+                assertTrue(records.isEmpty());
+            });
+    }
+
+    @Test
+    void givenEmptyUserJsonIsSent_whenStreamServiceIsRunning_thenAggregatedCountIsEmpty() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+        byte[] emptyJson = "".getBytes(StandardCharsets.UTF_8);
+        producer.send(new ProducerRecord<>("user-topic", "x6", emptyJson));
+
+        consumer.subscribe(List.of("users_per_country"));
+        Awaitility.await()
+            .atMost(30, TimeUnit.SECONDS)
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
+                assertTrue(records.isEmpty());
+            });
+    }
+
+    @Test
+    void givenInvalidCountryUserIsSent_whenStreamServiceIsRunning_thenNoAggregatedCount() {
+        producer.send(new ProducerRecord<>("user-topic", "x7", new User("7", "user7", null)));
+        consumer.subscribe(List.of("users_per_country"));
+
+        Awaitility.await()
+            .atMost(30, TimeUnit.SECONDS)
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
+                assertTrue(records.isEmpty());
+            });
+    }
+
+    private static Properties getProducerConfig() {
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
+
+        return producerProperties;
+    }
+
+    private static Properties getConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID());
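+        // The output topic carries String country keys and Long counts, matching
+        // Produced.with(Serdes.String(), Serdes.Long()) in the topology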
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return props;
+    }
+}