diff --git a/spring-kafka-4/pom.xml b/spring-kafka-4/pom.xml
index 3086d0993b04..b113ec0f1490 100644
--- a/spring-kafka-4/pom.xml
+++ b/spring-kafka-4/pom.xml
@@ -19,11 +19,24 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
             <version>${spring-kafka.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka-test</artifactId>
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java b/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
new file mode 100644
index 000000000000..f6e2ad92b894
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
@@ -0,0 +1,13 @@
+package com.baeldung.seek;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java b/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java
new file mode 100644
index 000000000000..6bdf3670d30b
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/ConsumerListener.java
@@ -0,0 +1,27 @@
+package com.baeldung.seek;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.AbstractConsumerSeekAware;
+import org.springframework.stereotype.Component;
+
+@Component
+class ConsumerListener extends AbstractConsumerSeekAware {
+
+    public static final Map<String, String> MESSAGES = new HashMap<>();
+
+ @Override
+    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
+ assignments.keySet()
+ .forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
+ }
+
+ @KafkaListener(id = "test-seek", topics = "test-seek-topic")
+    public void listen(ConsumerRecord<String, String> in) {
+ MESSAGES.put(in.key(), in.value());
+ }
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java b/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
new file mode 100644
index 000000000000..70fc5aaa0aeb
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
@@ -0,0 +1,5 @@
+package com.baeldung.seek;
+
+public record Response(int partition, long offset, String value) {
+
+}
diff --git a/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java b/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
new file mode 100644
index 000000000000..3383f4cc8a75
--- /dev/null
+++ b/spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
@@ -0,0 +1,75 @@
+package com.baeldung.seek;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/seek/api/v1/")
+public class SeekController {
+
+ public static final String TOPIC_NAME = "test-topic";
+
+    private final DefaultKafkaConsumerFactory<String, String> consumerFactory;
+
+    public SeekController(DefaultKafkaConsumerFactory<String, String> consumerFactory) {
+        this.consumerFactory = consumerFactory;
+    }
+
+    @GetMapping("partition/{partition}/offset/{offset}")
+    public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition, @PathVariable("offset") int offset) {
+        try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
+            consumer.assign(Collections.singletonList(topicPartition));
+            consumer.seek(topicPartition, offset);
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+            Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
+            if (recordIterator.hasNext()) {
+                ConsumerRecord<String, String> consumerRecord = recordIterator.next();
+                Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
+                return new ResponseEntity<>(response, HttpStatus.OK);
+            }
+        }
+        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+    }
+
+    @GetMapping("partition/{partition}/beginning")
+    public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
+        try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
+            consumer.assign(Collections.singletonList(topicPartition));
+            consumer.seekToBeginning(Collections.singleton(topicPartition));
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+            Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
+            if (recordIterator.hasNext()) {
+                ConsumerRecord<String, String> consumerRecord = recordIterator.next();
+                Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
+                return new ResponseEntity<>(response, HttpStatus.OK);
+            }
+        }
+        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+    }
+
+    @GetMapping("partition/{partition}/end")
+    public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
+        try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
+            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
+            consumer.assign(Collections.singletonList(topicPartition));
+            consumer.seekToEnd(Collections.singleton(topicPartition));
+            return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
+        }
+    }
+
+}
diff --git a/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java
new file mode 100644
index 000000000000..f14bba50c16a
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/seek/ConsumerListenerSeekLiveTest.java
@@ -0,0 +1,72 @@
+package com.baeldung.seek;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+class ConsumerListenerSeekLiveTest {
+
+    protected static ListAppender<ILoggingEvent> listAppender;
+
+ @Autowired
+ ConsumerListener consumerListener;
+
+ @Container
+ private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+    private static KafkaProducer<String, String> testKafkaProducer;
+
+ @DynamicPropertySource
+ static void setProps(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+ }
+
+ @BeforeAll
+ static void beforeAll() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ testKafkaProducer = new KafkaProducer<>(props);
+ IntStream.range(0, 5)
+ .forEach(m -> {
+                ProducerRecord<String, String> record = new ProducerRecord<>("test-seek-topic", 0, String.valueOf(m), "Message no : %s".formatted(m));
+ try {
+ testKafkaProducer.send(record)
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ testKafkaProducer.flush();
+ }
+
+ @Test
+ void givenKafkaBrokerExists_whenMessagesAreSent_thenLastMessageShouldBeRetrieved() {
+        Map<String, String> messages = consumerListener.MESSAGES;
+ Assertions.assertEquals(1, messages.size());
+ Assertions.assertEquals("Message no : 4", messages.get("4"));
+ }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java b/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java
new file mode 100644
index 000000000000..358c79414ae5
--- /dev/null
+++ b/spring-kafka-4/src/test/java/com/baeldung/seek/SeekControllerLiveTest.java
@@ -0,0 +1,99 @@
+package com.baeldung.seek;
+
+import static com.baeldung.seek.SeekController.TOPIC_NAME;
+
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@AutoConfigureWebTestClient
+class SeekControllerLiveTest {
+
+ @Autowired
+ private WebTestClient webClient;
+
+ @Container
+ private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+    private static KafkaProducer<String, String> testKafkaProducer;
+
+ @DynamicPropertySource
+ static void setProps(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+ }
+
+ @BeforeAll
+ static void beforeAll() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ testKafkaProducer = new KafkaProducer<>(props);
+ int partition = 0;
+ IntStream.range(0, 5)
+ .forEach(m -> {
+ String key = String.valueOf(new Random().nextInt());
+ String value = "Message no : %s".formatted(m);
+                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partition, key, value);
+ try {
+ testKafkaProducer.send(record)
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ void givenKafkaBrokerExists_whenSeekByPartitionAndOffset_thenMessageShouldBeRetrieved() {
+ this.webClient.get()
+ .uri("/seek/api/v1/partition/0/offset/2")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectBody(String.class)
+ .isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
+ }
+
+ @Test
+ void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
+ this.webClient.get()
+ .uri("/seek/api/v1/partition/0/beginning")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectBody(String.class)
+ .isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
+ }
+
+ @Test
+ void givenKafkaBrokerExists_whenSeekByEnd_thenLatestOffsetShouldBeRetrieved() {
+ this.webClient.get()
+ .uri("/seek/api/v1/partition/0/end")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectBody(Long.class)
+ .isEqualTo(5L);
+ }
+
+}
\ No newline at end of file