diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index 3086d0993b04..b113ec0f1490 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -19,11 +19,25 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
             <version>${spring-kafka.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka-test</artifactId>
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java b/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
new file mode 100644
index 000000000000..f6e2ad92b894
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
@@ -0,0 +1,16 @@
+package com.baeldung.seek;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the Kafka seek examples.
+ */
+@SpringBootApplication
+public class Application {
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java b/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java
new file mode 100644
index 000000000000..6bdf3670d30b
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java
@@ -0,0 +1,50 @@
+package com.baeldung.seek;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.AbstractConsumerSeekAware;
+import org.springframework.stereotype.Component;
+
+/**
+ * Kafka listener that, on every partition assignment, seeks one record back from the end of each
+ * assigned partition and records the key/value of every message it then receives.
+ */
+@Component
+class ConsumerListener extends AbstractConsumerSeekAware {
+
+    /**
+     * Key/value pairs of the consumed records. Written by the listener container thread and read
+     * from other threads (for example tests), hence the synchronized wrapper.
+     */
+    public static final Map<String, String> MESSAGES = Collections.synchronizedMap(new HashMap<>());
+
+    /**
+     * Seeks each newly assigned partition to one record before its end.
+     *
+     * @param assignments the assigned partitions mapped to their current offsets
+     * @param callback the callback used to perform the seeks
+     */
+    @Override
+    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
+        // Keep the base class bookkeeping of seek callbacks intact before seeking.
+        super.onPartitionsAssigned(assignments, callback);
+        for (TopicPartition tp : assignments.keySet()) {
+            callback.seekRelative(tp.topic(), tp.partition(), -1, false);
+        }
+    }
+
+    /**
+     * Stores the key and value of the consumed record in {@link #MESSAGES}.
+     *
+     * @param in the consumed record
+     */
+    @KafkaListener(id = "test-seek", topics = "test-seek-topic")
+    public void listen(ConsumerRecord<String, String> in) {
+        MESSAGES.put(in.key(), in.value());
+    }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java b/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
new file mode 100644
index 000000000000..70fc5aaa0aeb
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
@@ -0,0 +1,12 @@
+package com.baeldung.seek;
+
+/**
+ * JSON payload describing a single Kafka record returned by {@link SeekController}.
+ *
+ * @param partition the partition the record was read from
+ * @param offset the offset of the record within that partition
+ * @param value the record value
+ */
+public record Response(int partition, long offset, String value) {
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java b/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
new file mode 100644
index 000000000000..3383f4cc8a75
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
@@ -0,0 +1,108 @@
+package com.baeldung.seek;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST endpoints that read from {@link #TOPIC_NAME} by seeking a short-lived consumer to an explicit
+ * offset, to the beginning, or to the end of a partition.
+ */
+@RestController
+@RequestMapping("/seek/api/v1/")
+public class SeekController {
+
+    public static final String TOPIC_NAME = "test-topic";
+
+    /** Maximum time a single poll waits for a record after seeking. */
+    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
+
+    private final DefaultKafkaConsumerFactory<String, String> consumerFactory;
+
+    public SeekController(DefaultKafkaConsumerFactory<String, String> consumerFactory) {
+        this.consumerFactory = consumerFactory;
+    }
+
+    /**
+     * Returns the record stored at the given offset of the given partition.
+     *
+     * @param partition the partition of {@link #TOPIC_NAME} to read from
+     * @param offset the offset to seek to; Kafka offsets are 64-bit, so this is a {@code long}
+     * @return 200 with the first record polled after the seek, 400 for a negative offset, or 404 when
+     *         the poll returns no record
+     */
+    @GetMapping("partition/{partition}/offset/{offset}")
+    public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition, @PathVariable("offset") long offset) {
+        if (offset < 0) {
+            // Consumer#seek rejects negative offsets; report that as a client error instead of a 500.
+            return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
+        }
+        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = assignPartition(consumer, partition);
+            consumer.seek(topicPartition, offset);
+            return pollFirstRecord(consumer);
+        }
+    }
+
+    /**
+     * Returns the first record polled after seeking to the beginning of the given partition.
+     *
+     * @param partition the partition of {@link #TOPIC_NAME} to read from
+     * @return 200 with the record, or 404 when the poll returns no record
+     */
+    @GetMapping("partition/{partition}/beginning")
+    public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
+        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = assignPartition(consumer, partition);
+            consumer.seekToBeginning(Collections.singleton(topicPartition));
+            return pollFirstRecord(consumer);
+        }
+    }
+
+    /**
+     * Returns the consumer position after seeking to the end of the given partition.
+     *
+     * @param partition the partition of {@link #TOPIC_NAME} to inspect
+     * @return 200 with the position reported by the consumer
+     */
+    @GetMapping("partition/{partition}/end")
+    public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
+        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = assignPartition(consumer, partition);
+            consumer.seekToEnd(Collections.singleton(topicPartition));
+            return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
+        }
+    }
+
+    /** Manually assigns the given partition of {@link #TOPIC_NAME} to the consumer and returns it. */
+    private TopicPartition assignPartition(Consumer<String, String> consumer, int partition) {
+        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
+        consumer.assign(Collections.singletonList(topicPartition));
+        return topicPartition;
+    }
+
+    /** Polls once and maps the first returned record to a 200 response, or answers 404 if none arrived. */
+    private ResponseEntity<Response> pollFirstRecord(Consumer<String, String> consumer) {
+        ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
+        Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
+        if (!recordIterator.hasNext()) {
+            return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+        }
+        ConsumerRecord<String, String> consumerRecord = recordIterator.next();
+        Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
+        return new ResponseEntity<>(response, HttpStatus.OK);
+    }
+
+}
diff --git a/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java
new file mode 100644
index 000000000000..f14bba50c16a
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java
@@ -0,0 +1,96 @@
+package com.baeldung.seek;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+/**
+ * Live test (requires Docker) verifying that {@link ConsumerListener} only receives the last message
+ * already present on the partition.
+ */
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+class ConsumerListenerSeekLiveTest {
+
+    /** Upper bound, in seconds, for waiting on the asynchronous listener. */
+    private static final long CONSUME_TIMEOUT_SECONDS = 30;
+
+    protected static ListAppender<ILoggingEvent> listAppender;
+
+    @Autowired
+    ConsumerListener consumerListener;
+
+    @Container
+    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+    private static KafkaProducer<String, String> testKafkaProducer;
+
+    @DynamicPropertySource
+    static void setProps(DynamicPropertyRegistry registry) {
+        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+    }
+
+    @BeforeAll
+    static void beforeAll() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        testKafkaProducer = new KafkaProducer<>(props);
+        try {
+            IntStream.range(0, 5)
+                .forEach(ConsumerListenerSeekLiveTest::sendAndWait);
+            testKafkaProducer.flush();
+        } finally {
+            // The producer is only needed to seed the topic; release its threads and sockets.
+            testKafkaProducer.close();
+        }
+    }
+
+    /** Sends "Message no : m" keyed by m to partition 0 and blocks until the send has completed. */
+    private static void sendAndWait(int m) {
+        ProducerRecord<String, String> record = new ProducerRecord<>("test-seek-topic", 0, String.valueOf(m), "Message no : %s".formatted(m));
+        try {
+            testKafkaProducer.send(record)
+                .get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenMessagesAreSent_thenLastMessageShouldBeRetrieved() throws InterruptedException {
+        Map<String, String> messages = ConsumerListener.MESSAGES;
+        // The listener consumes on its own thread, so wait (bounded) for the record to arrive.
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(CONSUME_TIMEOUT_SECONDS);
+        while (messages.isEmpty() && System.nanoTime() < deadline) {
+            Thread.sleep(100);
+        }
+        Assertions.assertEquals(1, messages.size());
+        Assertions.assertEquals("Message no : 4", messages.get("4"));
+    }
+
+}
diff --git a/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java
new file mode 100644
index 000000000000..358c79414ae5
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java
@@ -0,0 +1,112 @@
+package com.baeldung.seek;
+
+import static com.baeldung.seek.SeekController.TOPIC_NAME;
+
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Live test (requires Docker) exercising the {@link SeekController} endpoints against a Kafka
+ * container seeded with five messages on partition 0.
+ */
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@AutoConfigureWebTestClient
+class SeekControllerLiveTest {
+
+    @Autowired
+    private WebTestClient webClient;
+
+    @Container
+    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+    private static KafkaProducer<String, String> testKafkaProducer;
+
+    @DynamicPropertySource
+    static void setProps(DynamicPropertyRegistry registry) {
+        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+    }
+
+    @BeforeAll
+    static void beforeAll() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        testKafkaProducer = new KafkaProducer<>(props);
+        int partition = 0;
+        Random random = new Random();
+        try {
+            IntStream.range(0, 5)
+                .forEach(m -> {
+                    String key = String.valueOf(random.nextInt());
+                    String value = "Message no : %s".formatted(m);
+                    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partition, key, value);
+                    try {
+                        testKafkaProducer.send(record)
+                            .get();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    } catch (ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        } finally {
+            // The producer is only needed to seed the topic; release its threads and sockets.
+            testKafkaProducer.close();
+        }
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenSeekByPartitionAndOffset_thenMessageShouldBeRetrieved() {
+        this.webClient.get()
+            .uri("/seek/api/v1/partition/0/offset/2")
+            .exchange()
+            .expectStatus()
+            .isOk()
+            .expectBody(String.class)
+            .isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
+        this.webClient.get()
+            .uri("/seek/api/v1/partition/0/beginning")
+            .exchange()
+            .expectStatus()
+            .isOk()
+            .expectBody(String.class)
+            .isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenSeekByEnd_thenLatestOffsetShouldBeRetrieved() {
+        this.webClient.get()
+            .uri("/seek/api/v1/partition/0/end")
+            .exchange()
+            .expectStatus()
+            .isOk()
+            .expectBody(Long.class)
+            .isEqualTo(5L);
+    }
+
+}