这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions spring-kafka-4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.baeldung.seek;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    /**
     * Bootstraps the Spring application context for the seek examples module.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.baeldung.seek;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
class ConsumerListener extends AbstractConsumerSeekAware {

    /**
     * Records received by the listener, keyed by message key.
     * Written from the Kafka listener thread and read from test threads, so it
     * must be a thread-safe map; a plain HashMap would risk lost updates and
     * stale reads across threads.
     */
    public static final Map<String, String> MESSAGES = new ConcurrentHashMap<>();

    /**
     * On every partition assignment, seeks each assigned partition back by one
     * record (relative to the current position) so the listener re-consumes
     * the most recent message.
     */
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet()
            .forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
    }

    /** Stores each received record in {@link #MESSAGES} under its key. */
    @KafkaListener(id = "test-seek", topics = "test-seek-topic")
    public void listen(ConsumerRecord<String, String> in) {
        MESSAGES.put(in.key(), in.value());
    }
}
5 changes: 5 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.baeldung.seek;

/**
 * Immutable payload returned by the seek endpoints: the partition and offset a
 * record was read from, plus its value.
 *
 * @param partition the Kafka partition the record came from
 * @param offset    the record's offset within that partition
 * @param value     the record's value
 */
public record Response(int partition, long offset, String value) {

}
75 changes: 75 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.baeldung.seek;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/seek/api/v1/")
public class SeekController {

    public static final String TOPIC_NAME = "test-topic";

    private final DefaultKafkaConsumerFactory<String, String> consumerFactory;

    public SeekController(DefaultKafkaConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /**
     * Reads the single record at the given partition/offset.
     *
     * @param partition the partition to read from
     * @param offset    the absolute offset to seek to
     * @return 200 with the record, or 404 if no record arrives within the poll timeout
     */
    @GetMapping("partition/{partition}/offset/{offset}")
    public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition, @PathVariable("offset") int offset) {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seek(topicPartition, offset);
            return pollForFirstRecord(consumer);
        }
    }

    /**
     * Reads the first (earliest) record of the given partition.
     *
     * @param partition the partition to read from
     * @return 200 with the record, or 404 if no record arrives within the poll timeout
     */
    @GetMapping("partition/{partition}/beginning")
    public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seekToBeginning(Collections.singleton(topicPartition));
            return pollForFirstRecord(consumer);
        }
    }

    /**
     * Returns the end offset (position after the last record) of the given partition.
     *
     * @param partition the partition to inspect
     * @return 200 with the end offset
     */
    @GetMapping("partition/{partition}/end")
    public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seekToEnd(Collections.singleton(topicPartition));
            return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
        }
    }

    // Polls once (100 ms) and maps the first returned record, if any, to a
    // 200 response; otherwise answers 404. Shared by the two record-fetching
    // endpoints, which previously duplicated this logic verbatim.
    private ResponseEntity<Response> pollForFirstRecord(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
        if (recordIterator.hasNext()) {
            ConsumerRecord<String, String> consumerRecord = recordIterator.next();
            Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
            return new ResponseEntity<>(response, HttpStatus.OK);
        }
        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }

    // Centralizes the cast from the factory's Consumer to KafkaConsumer so it
    // appears in exactly one place.
    private KafkaConsumer<String, String> createConsumer() {
        return (KafkaConsumer<String, String>) consumerFactory.createConsumer();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.baeldung.seek;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ConsumerListenerSeekLiveTest {

    // Autowired to ensure the listener bean is created and registered before
    // the assertions run. (The previously declared ListAppender field was
    // never used and has been removed.)
    @Autowired
    ConsumerListener consumerListener;

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    private static KafkaProducer<String, String> testKafkaProducer;

    // Points Spring Kafka at the Testcontainers broker before the context starts.
    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    // Publishes five messages (keys "0".."4") to partition 0 of the listener's
    // topic, synchronously, before any test runs.
    @BeforeAll
    static void beforeAll() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        testKafkaProducer = new KafkaProducer<>(props);
        IntStream.range(0, 5)
            .forEach(m -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("test-seek-topic", 0, String.valueOf(m), "Message no : %s".formatted(m));
                try {
                    // Block on get() so every record is acknowledged before the tests start.
                    testKafkaProducer.send(record)
                        .get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        testKafkaProducer.flush();
    }

    @Test
    void givenKafkaBrokerExists_whenMessagesAreSent_thenLastMessageShouldBeRetrieved() {
        // Static member accessed via the class, not an instance reference.
        Map<String, String> messages = ConsumerListener.MESSAGES;
        // seekRelative(-1) rewinds one record, so only the last message ("4") is consumed.
        Assertions.assertEquals(1, messages.size());
        Assertions.assertEquals("Message no : 4", messages.get("4"));
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.baeldung.seek;

import static com.baeldung.seek.SeekController.TOPIC_NAME;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class SeekControllerLiveTest {

    @Autowired
    private WebTestClient webClient;

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    private static KafkaProducer<String, String> testKafkaProducer;

    // Points Spring Kafka at the Testcontainers broker before the context starts.
    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    // Publishes five messages with random keys to partition 0 of the
    // controller's topic, synchronously, before any test runs.
    @BeforeAll
    static void beforeAll() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        testKafkaProducer = new KafkaProducer<>(props);
        // Single Random instance hoisted out of the loop instead of one per iteration.
        Random random = new Random();
        int partition = 0;
        IntStream.range(0, 5)
            .forEach(m -> {
                String key = String.valueOf(random.nextInt());
                String value = "Message no : %s".formatted(m);
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partition, key, value);
                try {
                    // Block on get() so every record is acknowledged before the tests start.
                    testKafkaProducer.send(record)
                        .get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        testKafkaProducer.flush();
    }

    @Test
    void givenKafkaBrokerExists_whenSeekByPartitionAndOffset_thenMessageShouldBeRetrieved() {
        this.webClient.get()
            .uri("/seek/api/v1/partition/0/offset/2")
            .exchange()
            .expectStatus()
            .isOk()
            .expectBody(String.class)
            .isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
    }

    @Test
    void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
        this.webClient.get()
            .uri("/seek/api/v1/partition/0/beginning")
            .exchange()
            .expectStatus()
            .isOk()
            .expectBody(String.class)
            .isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
    }

    @Test
    void givenKafkaBrokerExists_whenSeekByEnd_thenLatestOffsetShouldBeRetrieved() {
        this.webClient.get()
            .uri("/seek/api/v1/partition/0/end")
            .exchange()
            .expectStatus()
            .isOk()
            .expectBody(Long.class)
            .isEqualTo(5L);
    }

}